
k3jobq

Concurrent job queue manager with multi-threading support. Process a series of inputs with worker functions concurrently.

k3jobq is a component of the pykit3 project, a Python 3 toolkit set.

Installation

pip install k3jobq

Quick Start

import k3jobq

def add1(args):
    return args + 1

def printarg(args):
    print(args)

# Process inputs through a worker pipeline
k3jobq.run([0, 1, 2], [add1, printarg])
# Output:
# 1
# 2
# 3

# Use multiple threads for a worker
k3jobq.run(range(100), [add1, (printarg, 4)])  # 4 threads for printarg

# Keep results in order
k3jobq.run(range(10), [add1, printarg], keep_order=True)

API Reference

k3jobq

EmptyRst

Bases: object

A worker function returns this value to cancel a task. When a worker returns EmptyRst, nothing is passed to the next worker group:

def worker_black_hole(args):
    return k3jobq.EmptyRst

If a worker returns None, None is passed to the next worker like any other value. Only EmptyRst cancels the task.
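To make the difference between EmptyRst and None concrete, here is a sequential, single-threaded sketch. EMPTY and run_pipeline are hypothetical names used only for illustration. This is not k3jobq's implementation, which runs each worker group in its own threads. It only models the rule stated above: the sentinel stops a task, while None travels on.

```python
# EMPTY is a hypothetical stand-in for k3jobq.EmptyRst: like EmptyRst, the
# class object itself is returned as a unique "cancel this task" marker.
class EMPTY(object):
    pass


def run_pipeline(inputs, workers):
    """Push each input through all workers in turn (sequential illustration)."""
    outputs = []
    for args in inputs:
        for worker in workers:
            args = worker(args)
            if args is EMPTY:
                break  # task cancelled: later workers never see it
        else:
            outputs.append(args)  # reached the end; None is kept like any value
    return outputs


def drop_odd(args):
    # Cancel odd inputs, pass even ones on unchanged.
    return EMPTY if args % 2 else args


def none_for_zero(args):
    # Returning None does not cancel anything: None is passed on.
    return None if args == 0 else args


result = run_pipeline(range(5), [drop_odd, none_for_zero])
# result == [None, 2, 4]
```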

Source code in k3jobq/jobq.py
class EmptyRst(object):
    """
    A worker function return this value to cancel a task.
    By returning ``EmptyRst``, nothing is passed to next worker group::

        def worker_back_hole(args):
            return k3jobq.EmptyRst

    If ``None`` is returned by a worker, ``None`` is passed to next worker.
    """

JobManager

Bases: object

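JobManager is the internal implementation of run. It lets the user separate worker management from input management. For example, run(range(3), [_echo]) is equivalent to:

import k3jobq

def _echo(args):
    print(args)
    return args

jm = k3jobq.JobManager([_echo])
for i in range(3):
    jm.put(i)

jm.join()

JobManager creates the threads for the worker functions, then waits for input to be fed with JobManager.put().

The workers, probe and keep_order arguments are the same as in k3jobq.run. queue_size (default 1024) sets the capacity of the first input queue. Input is fed with put() instead of an input iterable, and the timeout is passed to join().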
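The split between feeding input with put() and shutting down with join() can be illustrated with a minimal, single-stage sketch built on the standard queue and threading modules. MiniJobManager and FINISH are hypothetical names. The sketch mirrors only the put()/join() shape and the "one Finish marker per thread" shutdown used by JobManager.join(). It is not k3jobq's real internals: it has no chained worker groups, and it collects results into a list.

```python
import queue
import threading

FINISH = object()  # hypothetical end-of-input marker, one per worker thread


class MiniJobManager(object):
    """Illustrative single-stage job manager: N threads share one input queue."""

    def __init__(self, worker, n_thread=1, queue_size=1024):
        self.worker = worker
        self.input_queue = queue.Queue(queue_size)
        self.results = []
        self.results_lock = threading.Lock()
        self.threads = [threading.Thread(target=self._loop, daemon=True)
                        for _ in range(n_thread)]
        for th in self.threads:
            th.start()  # threads exist before any input is fed

    def _loop(self):
        while True:
            args = self.input_queue.get()
            if args is FINISH:
                return  # each thread consumes exactly one FINISH and exits
            rst = self.worker(args)
            with self.results_lock:
                self.results.append(rst)

    def put(self, elt):
        self.input_queue.put(elt)

    def join(self, timeout=None):
        # One FINISH per thread, queued after all real input (FIFO order),
        # so every element is processed before the threads exit.
        for _ in self.threads:
            self.input_queue.put(FINISH)
        for th in self.threads:
            th.join(timeout)


jm = MiniJobManager(lambda x: x * 10, n_thread=3)
for i in range(5):
    jm.put(i)
jm.join()
```

With several threads the results may arrive in any order, which is why keep_order exists in the real library.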

Source code in k3jobq/jobq.py
class JobManager(object):
    """
    JobManager is the internal impl of ``run`` and let user separate worker
    management and input management. E.g., ``run(range(3), [_echo])`` is same as::

        def _echo(args):
            print args
            return args

        jm = k3jobq.JobManager([_echo])
        for i in range(3):
            jm.put(i)

        jm.join()

    ``JobManager`` creates threads for worker functions, and wait for input to
    be fed with ``JobManager.put()``.

    The arguments are the same as `k3jobq.run`.
    """

    def __init__(self, workers, queue_size=1024, probe=None, keep_order=False):
        if probe is None:
            probe = {}

        self.workers = workers
        self.head_queue = _make_q(queue_size)
        self.probe = probe
        self.keep_order = keep_order

        self.worker_groups = []

        self.probe.update(
            {
                "worker_groups": self.worker_groups,
                "probe_lock": threading.RLock(),
                "in": 0,
                "out": 0,
            }
        )

        self.make_worker_groups()

    def make_worker_groups(self):
        inq = self.head_queue

        workers = self.workers + [_blackhole]
        for i, worker in enumerate(workers):
            if callable(worker):
                worker = (worker, 1)

            # worker callable, n_thread, dispatcher
            worker = (worker + (None,))[:3]

            worker, n, dispatcher = worker

            wg = WorkerGroup(i, worker, n, inq, dispatcher, self.probe, self.keep_order)

            self.worker_groups.append(wg)
            inq = wg.output_queue

    def set_thread_num(self, worker, n):
        """
        Change number of threads for a worker on the fly.

        If job manager has called ``JobManager.join()``.
        ``set_thread_num`` does nothing and just returns silently.

        When thread number is increased, new threads are created.
        If thread number is reduced, we do not stop worker thread in this
        function.
        Because we do not have a steady way to shutdown a running thread in
        python.
        Worker thread checks if it should continue running in _exec() and
        _exec_in_order(), by checking its thread_index against running thread
        index range ``running_index_range``.

        Args:

            worker:
                is the callable passed in when creating JobManager.
                It searches in job manager for the worker.
                If there is not such a worker ``is`` the one passed in, it raise a
                ``JobWorkerNotFound`` error.

            n(int): specifies the expected number of threads for this ``worker``.
                New threads are created and started before this function returns.
                The threads that will be removed are removed after they finishes
                the job they are doing.

        Returns:
            None
        """

        assert n > 0
        assert isinstance(n, int)

        for wg in self.worker_groups:
            """
            In python2, `x = X(); x.meth is x.meth` results in a `False`.
            Every time to retrieve a method, python creates a new **bound** function.

            We must use == to test function equality.

            See https://stackoverflow.com/questions/15977808/why-dont-methods-have-reference-equality
            """

            if wg.worker != worker:
                continue

            if wg.dispatcher is not None:
                raise JobWorkerError("worker-group with dispatcher does not allow to change thread number")

            with wg.worker_group_lock:
                if wg.exiting:
                    logger.info("worker group exiting. Thread number change not allowed")
                    break

                s, e = wg.running_index_range
                oldn = e - s

                if n < oldn:
                    s += oldn - n
                elif n > oldn:
                    e += n - oldn
                else:
                    break

                wg.running_index_range = [s, e]
                wg.add_worker_thread()

                logger.info(
                    "thread number is set to {n}, thread index: {idx}, running threads: {ths}".format(
                        n=n,
                        idx=list(range(wg.running_index_range[0], wg.running_index_range[1])),
                        ths=sorted(wg.threads.keys()),
                    )
                )
                break

        else:
            raise JobWorkerNotFound(worker)

    def put(self, elt):
        """
        Put anything as an input element.
        """
        self.head_queue.put(elt)

    def join(self, timeout=None):
        """
        Wait for all worker to finish.

        Args:
            timeout(float): is the same as `k3jobq.run`

        Returns:
            None
        """

        endtime = time.time() + (timeout or 86400 * 365)

        for wg in self.worker_groups:
            with wg.worker_group_lock:
                # prevent adding or removing thread
                wg.exiting = True
                ths = list(wg.threads.values())

            if wg.dispatcher is None:
                # put nr = len(threads) Finish
                for th in ths:
                    wg.input_queue.put(Finish)
            else:
                wg.input_queue.put(Finish)
                # wait for dispatcher to finish or jobs might be lost
                wg.dispatcher_thread.join(endtime - time.time())

                for qs in wg.dispatch_queues:
                    qs["input"].put(Finish)

            for th in ths:
                th.join(endtime - time.time())

            if wg.queue_of_output_q is not None:
                wg.queue_of_output_q.put(Finish)
                wg.coordinator_thread.join(endtime - time.time())

            # if join timeout, let threads quit at next loop
            wg.running = False

    def stat(self):
        return stat(self.probe)

join(timeout=None)

Wait for all workers to finish.

Parameters:

- timeout (float, default None): the same as the timeout of k3jobq.run. None (or 0) means there is effectively no time limit.

Returns:

None
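JobManager.join() computes one deadline up front and gives each successive thread join only the time that is left. This means timeout bounds the whole shutdown rather than each thread separately. Below is a standalone sketch of that pattern. join_all is a hypothetical helper, not part of k3jobq. Like the `timeout or 86400 * 365` expression in the source, it treats a timeout of 0 the same as None.

```python
import threading
import time


def join_all(threads, timeout=None):
    """Join threads against one shared deadline; return those still alive."""
    # Same deadline rule as JobManager.join(): None (or 0) means about a year.
    endtime = time.time() + (timeout or 86400 * 365)
    for th in threads:
        # Each join only gets the time that is left; clamping at 0 means an
        # expired deadline just checks the remaining threads without waiting.
        th.join(max(0, endtime - time.time()))
    return [th for th in threads if th.is_alive()]


stop = threading.Event()
fast = threading.Thread(target=lambda: None)
slow = threading.Thread(target=stop.wait, daemon=True)  # blocks until stop is set
fast.start()
slow.start()

alive = join_all([fast, slow], timeout=0.2)  # returns after about 0.2 s
stop.set()  # let the slow thread exit
```

The library itself does not clamp at 0. It relies on Thread.join() treating a negative timeout as "do not wait". After the deadline it marks each worker group as not running, so threads quit at their next loop.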

put(elt)

Put anything as an input element.

set_thread_num(worker, n)

Change number of threads for a worker on the fly.

If JobManager.join() has already been called, set_thread_num does nothing and returns silently (it only logs a message).

When the thread number is increased, new threads are created. When it is reduced, this function does not stop any worker thread, because Python has no reliable way to shut down a running thread. Instead, each worker thread checks in _exec() and _exec_in_order() whether its thread_index still falls within running_index_range, and stops running if it does not.

Args:

worker:
    the callable passed in when creating the JobManager.
    The job manager searches its worker groups for this worker, comparing
    with == rather than is, because bound methods are re-created on every
    attribute access.
    If no worker matches, it raises a JobWorkerNotFound error.
    If the matching worker group has a dispatcher, it raises a
    JobWorkerError, because such a group does not allow its thread number
    to change.

n(int): the expected number of threads for this worker. It must be a
    positive int.
    New threads are created and started before this function returns.
    Threads that are to be removed exit after they finish the job they
    are doing.

Returns:

None
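The running_index_range bookkeeping described above can be reproduced as a pure function. resize_range and is_running are hypothetical names. resize_range copies the arithmetic in set_thread_num(): shrinking moves the start index forward, and growing moves the end index. is_running is only an assumption about the shape of the check done in _exec() and _exec_in_order(), whose code is not shown here.

```python
def resize_range(running_index_range, n):
    """Return the new [start, end) thread-index range for n threads."""
    assert isinstance(n, int) and n > 0
    s, e = running_index_range
    oldn = e - s
    if n < oldn:
        s += oldn - n  # retire the lowest indexes; those threads exit later
    elif n > oldn:
        e += n - oldn  # new indexes; threads are started for them
    return [s, e]


def is_running(thread_index, running_index_range):
    # Assumed shape of the check a worker thread makes before its next job.
    s, e = running_index_range
    return s <= thread_index < e


rng = resize_range([0, 4], 2)  # [2, 4]: threads 0 and 1 will stop
rng = resize_range(rng, 5)     # [2, 7]: indexes 4, 5 and 6 are added
```

Because indexes only move forward, a retired thread never becomes "running" again, even if the thread number later grows.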

limit_job_speed(max_job_speed, job_step=1)

limit_job_speed returns a worker function that limits the speed at which jobs are processed. The worker sleeps whenever the job throughput would exceed the specified value, and passes its argument through unchanged:

import k3jobq

def empty(args):  # placeholder worker: pass args through unchanged
    return args

# limit the first stage to at most 100 jobs per second, in a single thread
k3jobq.run(range(1000), [(k3jobq.limit_job_speed(100, 1), 1), (empty, 10)])

Parameters:

- max_job_speed (float or callable, required): the maximum job speed, in jobs per second. If it is a callable, it is called for each job and its return value is used.
- job_step (int, default 1): the number of jobs each call counts as.

Returns:

A worker function that returns the args passed in.
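The throttling rule in _limit can be checked on its own. After job_num jobs, at least job_num / max_speed seconds must have elapsed since start_time. If less time has passed, the worker sleeps for the difference. The sketch below is a simplified, hypothetical re-implementation (make_limiter). It uses an injectable clock and sleep so its behaviour is deterministic. It omits the log-and-reset that the real worker performs every 5 seconds (tick_time).

```python
import time


def make_limiter(max_job_speed, job_step=1, clock=time.time, sleep=time.sleep):
    """Return a pass-through worker that keeps throughput <= max_job_speed/s."""
    stat = {"start_time": clock(), "job_num": 0}

    def _limit(job_args):
        max_speed = max_job_speed() if callable(max_job_speed) else max_job_speed
        stat["job_num"] += job_step
        # Minimum time that must have elapsed for this many jobs.
        min_ts = stat["job_num"] * 1.0 / max_speed
        itv_ts = clock() - stat["start_time"]
        if itv_ts < min_ts:
            sleep(min_ts - itv_ts)
        return job_args  # the job itself passes through unchanged

    return _limit


class FakeClock(object):
    """Deterministic clock for the example: sleeping just advances time."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


fake = FakeClock()
limit = make_limiter(10, clock=fake.time, sleep=fake.sleep)  # 10 jobs/s
outs = [limit(i) for i in range(20)]  # 20 jobs take about 2.0 fake seconds

fake2 = FakeClock()
# Callable speed and job_step=2: one call counts as 2 jobs at 5 jobs/s.
limit2 = make_limiter(lambda: 5, job_step=2, clock=fake2.time, sleep=fake2.sleep)
limit2("a")  # needs at least 0.4 fake seconds
```

The real worker keeps its counters in a plain dict without a lock. That is one reason the documented example runs the limiter in a single thread.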

Source code in k3jobq/works.py
def limit_job_speed(max_job_speed, job_step=1):
    """
    ``limit_job_speed`` is a worker that limits the speed at which jobs are processed.
    It falls into sleep if the job throughput exceeds the specified value::

        k3jobq.run(range(1000), [
                (k3jobq.limit_job_speed(100, 1), 1), # execute 100 job per second
                (empty, 10),
        ])

    Args:
        max_job_speed(float): is a float or function, represents the maximum
            execution job speed. If it is a function, use the return value of the
            function.

        job_step(int): represents the step length of a job, the default is 1.

    Returns:
        the ``args`` passed in.
    """

    speed_stat = {
        "start_time": time.time(),
        "job_num": 0,
        "tick_time": 5,
    }

    def _limit(job_args):
        try:
            max_speed = max_job_speed
            if callable(max_job_speed):
                max_speed = max_job_speed()

            speed_stat["job_num"] += job_step

            min_ts = speed_stat["job_num"] * 1.0 / max_speed
            itv_ts = time.time() - speed_stat["start_time"]
            if itv_ts < min_ts:
                time.sleep(min_ts - itv_ts)

            now = time.time()
            if now - speed_stat["start_time"] >= speed_stat["tick_time"]:
                speed = speed_stat["job_num"] * 1.0 / (now - speed_stat["start_time"])

                logger.info(
                    "current speed: {speed}/s max_speed:{max_speed}/s job_num:{job_num} start_time:{start_time}".format(
                        speed=round(speed, 3),
                        max_speed=max_speed,
                        job_num=speed_stat["job_num"],
                        start_time=speed_stat["start_time"],
                    )
                )

                speed_stat["start_time"] = now
                speed_stat["job_num"] = 0

        except Exception as e:
            logger.exception("error occurred limit job speed: " + repr(e))

        return job_args

    return _limit

run(input_it, workers, keep_order=False, timeout=None, probe=None)

Process the elements of input_it one by one with the functions in workers.

Args:

input_it(iterable): input elements to process.

workers: a list of functions, or tuples of (function, nr_of_thread),
    or tuples of (function, nr_of_thread, dispatcher_func).

    A worker function accepts exactly one argument and returns one value.
    For the first worker, the argument is one element from input_it; for
    each later worker, it is a result of the previous worker.
    A typical worker would be defined like:

        def worker_foo(args):
            result = foo(args)
            return result

    A worker function can also return an iterator (for example, it can be
    a generator function). In that case, k3jobq iterates over all elements
    of the returned iterator and passes each of them to the next worker:

        def worker_iter(args):
            for elt in args:
                yield elt

    A dispatcher is a user-specified function that controls how args are
    dispatched to the threads of a worker group. It accepts one argument,
    args (the same args passed to the worker function), and returns an int
    indicating which of the group's worker threads should process it.
    A dispatcher is used to balance related workload across worker threads.

    A dispatcher function may be defined like:

        def dispatch(args):
            return hash(args) % 5

    A user-defined dispatcher is used when both **concurrency** and
    **partial order** are required:

    -   args dispatched to the same worker thread are guaranteed to be
        processed in order,

    -   while different worker threads run in parallel.

    Specifying a dispatcher implies keep_order=True for that worker group.
    Thus a worker group with a dispatcher also puts its results into the
    input queue of the next worker group in order.

keep_order(bool): whether elements must be kept in order. Keeping them
    in order costs some performance. Defaults to False.

timeout(float): the maximum time to run, in seconds. None means to wait
    until all jobs are done. Defaults to None. A timeout of 0 is treated
    the same as None.

    If the timeout expires before all jobs are done, run stops waiting
    and returns; the worker threads are told to quit after they finish
    their current job.

probe: a dictionary to collect stats. Defaults to None.
    If it is a dict, k3jobq writes stats of running jobs into it;
    k3jobq.stat(probe) can then be used to obtain the stat data.

Returns:

None
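The partial-order guarantee of a dispatcher can be illustrated with plain threads. Each worker thread owns a private queue, the dispatcher's return value picks the queue, and a single consumer per queue processes its items in FIFO order. The sketch below uses a hypothetical name, dispatch_run, and models only that routing. It is not k3jobq's implementation: it omits the coordinator that passes results on to the next worker group in order. It also assumes the dispatcher returns a valid thread index.

```python
import queue
import threading

FINISH = object()  # hypothetical end-of-input marker


def dispatch_run(inputs, worker, dispatcher, n_thread):
    """Send each input to the queue picked by dispatcher(args); one thread per queue.

    dispatcher is assumed to return an index in range(n_thread).
    Returns, per thread, the worker results in the order they were produced.
    """
    queues = [queue.Queue() for _ in range(n_thread)]
    processed = [[] for _ in range(n_thread)]

    def loop(idx):
        # A single consumer per queue, so items routed to it run in FIFO order.
        while True:
            args = queues[idx].get()
            if args is FINISH:
                return
            processed[idx].append(worker(args))

    threads = [threading.Thread(target=loop, args=(i,)) for i in range(n_thread)]
    for th in threads:
        th.start()
    for args in inputs:
        queues[dispatcher(args)].put(args)
    for q in queues:
        q.put(FINISH)
    for th in threads:
        th.join()
    return processed


# Events for 6 users; all events of one user must be handled in sequence.
events = [(user, seq) for seq in range(4) for user in range(6)]
per_thread = dispatch_run(events, lambda args: args, lambda args: args[0] % 3, 3)
```

Users 0 and 3 share thread 0, so their events interleave there, but each user's own events stay in sequence. The three threads still run in parallel.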

Source code in k3jobq/jobq.py
def run(input_it, workers, keep_order=False, timeout=None, probe=None):
    """
    Process element in ``input`` one by one with functions in ``workers``.

    Args:

        input(iterable): input elts to process.

        workers: list of functions, or ``tuple`` of ``(function, nr_of_thread)``,
            or ``tuple`` of ``(function, nr_of_thread, dispatcher_func)``.

            A worker function accepts exactly one argument and return one value.
            The argument is one elt from input(the first worker) or a result
            from the previous worker.
            A typical worker would be defined like::

                def worker_foo(args):
                    result = foo(args)
                    return result

            A worker function can also be an iterator, in which case, k3jobq
            iterates all elements from returned iterator and pass it to next
            worker::

                def worker_iter(args):
                    for elt in args:
                        yield elt

            A ``dispatcher`` is specified by user to control how to dispatch args to
            different workers.
            It accepts one argument ``args``, same as the ``args`` passed to a worker
            function,  and return a ``int`` indicating which of next workers to used.
            A ``dispatcher`` is used to balance related work load to workers.

            A dispatcher function may be defined like::

                def dispatch(args):
                    return hash(args) % 5

            A user-defined dipatcher is used when **concurrency** and **partial-order**
            are both required:

            -   The ``args`` passed to a same worker is guaranteed to be
                executed in order.

            -   While different workers runs in parallel.

            If a ``dispatcher`` specified, it implies ``keep_order=True``.

            Thus a worker group with dispatcher also put result into the input
            queue of next worker group with order kept.

        keep_order(bool): specifies whether elements must be processed in order.
            Keeping them in order affects performance. By default it is ``False``.

        timeout(float): specifies the max time to run. ``None`` means to wait
            until all jobs are done. By default it is ``None``.

            If k3jobq exceeds ``timeout`` before finishing, it returns after all workers
            finishing their current job.

        probe: is a dictionary to collect stats. By defaul it is ``None``.
            If it is a valid dictionary, ``k3jobq`` writes stats of running jobs to it.
            ``k3jobq.stat()`` can be used to obtain stat data.

    Returns:
        None
    """

    mgr = JobManager(workers, probe=probe, keep_order=keep_order)

    try:
        for args in input_it:
            mgr.put(args)

    finally:
        mgr.join(timeout=timeout)

stat(probe)

Get stats about a running or finished k3jobq session. The stats are returned as a dictionary like:

{
    'in': 10,       # number of elements all workers started processing.
    'out': 8,       # number of elements all workers finished processing.
    'doing': 2,     # number of elements being processed (in - out).
    'workers': [
        {
            'name': 'example:worker_foo',       # "<module>:<function name>"
            'input': {'size': 3, 'capa': 1024}, # input queue
            'nr_worker': 2,                     # number of worker threads
            'coordinator': {'size': 3, 'capa': 1024}, # present only when keep_order=True
            'dispatcher': [{'input': {...}, 'output': {...}}, ...], # present only with a dispatcher
        },
        ...
    ]
}

Parameters:

- probe (dict, required): the dictionary passed into k3jobq.run.

Returns:

A dict of stats. For a JobManager, jm.stat() returns the same data for the manager's own probe.
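The fields of the returned dictionary come from simple arithmetic done under probe_lock. doing is in minus out. Each queue is reported as its current size and capacity. A worker's name is its module and function name joined by a colon. Below is a sketch in that spirit that uses only the standard queue and threading modules. q_stat, worker_name and summarize are hypothetical helpers. They mirror k3jobq's internal _q_stat and the naming code in stat(), but they are not those functions.

```python
import queue
import threading


def q_stat(q):
    # Assumed equivalent of the {'size': ..., 'capa': ...} queue entries.
    return {"size": q.qsize(), "capa": q.maxsize}


def worker_name(func):
    # Same format as stat() builds: "<module>:<function name>".
    return (func.__module__ or "builtin") + ":" + func.__name__


def summarize(probe):
    # Read both counters under the lock so "in" and "out" are consistent.
    with probe["probe_lock"]:
        return {"in": probe["in"], "out": probe["out"],
                "doing": probe["in"] - probe["out"]}


def worker_foo(args):
    return args


q = queue.Queue(1024)
for i in range(3):
    q.put(i)
probe = {"probe_lock": threading.RLock(), "in": 10, "out": 8}
```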

Source code in k3jobq/jobq.py
def stat(probe):
    """
    Get stat about a running or finished k3jobq session.
    stat returned is in a dictionary like::

        {
            'in': 10,       # number of elements all workers started processing.
            'out': 8,       # number of elements all workers finished processing.
            'doing': 2,     # number of elements all workers is processing.
            'workers': [
                {
                    'name': 'example.worker_foo',
                    'input': {'size': 3, 'capa': 1024}, # input queue
                    'nr_worker': 2,                     # number of worker thread
                    'coordinator': {'size': 3, 'capa': 1024}, # presents only when keep_order=True
                },
                ...
            ]
        }

    Args:
        probe(dict): is the dictionary passed into ``k3jobq.run``.

    Returns:
        dict of stat.
    """

    with probe["probe_lock"]:
        rst = {
            "in": probe["in"],
            "out": probe["out"],
            "doing": probe["in"] - probe["out"],
            "workers": [],
        }

    # exclude the _start and _end
    for wg in probe["worker_groups"][:-1]:
        o = {}
        wk = wg.worker
        o["name"] = (wk.__module__ or "builtin") + ":" + wk.__name__
        o["input"] = _q_stat(wg.input_queue)
        if wg.dispatcher is not None:
            o["dispatcher"] = [
                {
                    "input": _q_stat(qs["input"]),
                    "output": _q_stat(qs["output"]),
                }
                for qs in wg.dispatch_queues
            ]

        if wg.queue_of_output_q is not None:
            o["coordinator"] = _q_stat(wg.queue_of_output_q)

        s, e = wg.running_index_range
        o["nr_worker"] = e - s

        rst["workers"].append(o)

    return rst

License

The MIT License (MIT) - Copyright (c) 2015 Zhang Yanpo (张炎泼)