k3jobq
Concurrent job queue manager with multi-threading support. Process a series of inputs with worker functions concurrently.
k3jobq is a component of the pykit3 project, a Python 3 toolkit.
Installation
pip install k3jobq
Quick Start
import k3jobq
def add1(args):
    return args + 1
def printarg(args):
    print(args)
# Process inputs through worker pipeline
k3jobq.run([0, 1, 2], [add1, printarg])
# Output:
# 1
# 2
# 3
# Use multiple threads for a worker
k3jobq.run(range(100), [add1, (printarg, 4)]) # 4 threads for printarg
# Keep results in order
k3jobq.run(range(10), [add1, printarg], keep_order=True)
API Reference
k3jobq
EmptyRst
Bases: object
A worker function returns this value to cancel a task.
Returning EmptyRst passes nothing to the next worker group::
    def worker_black_hole(args):
        return k3jobq.EmptyRst
If a worker returns None, None is passed to the next worker.
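The difference between the two return values can be modeled in a few lines. The sketch below is a single-threaded, stdlib-only model and not k3jobq's implementation; `DROP` and `run_pipeline` are hypothetical names, with `DROP` standing in for `EmptyRst`.

```python
# Hypothetical stand-in for k3jobq.EmptyRst: a unique sentinel object.
DROP = object()


def run_pipeline(inputs, workers):
    """Pass each input through every worker; stop early when a worker drops it."""
    finished = []
    for item in inputs:
        for worker in workers:
            item = worker(item)
            if item is DROP:
                break  # cancelled: later workers never see this item
        else:
            finished.append(item)  # the item reached the end of the pipeline
    return finished


def drop_odd(n):
    # Odd numbers are cancelled, even numbers are passed on.
    return DROP if n % 2 else n


def to_none(n):
    # None is an ordinary value, so it is still passed on.
    return None


print(run_pipeline(range(4), [drop_odd, to_none]))
```

Only the even inputs reach the end of this model pipeline, and each arrives as `None`, which shows that `None` is a result like any other while the sentinel removes the element entirely.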
Source code in k3jobq/jobq.py
JobManager
Bases: object
JobManager is the internal implementation of run. It lets the user separate
worker management from input management. E.g., run(range(3), [_echo]) is the same as::
    def _echo(args):
        print(args)
        return args
    jm = k3jobq.JobManager([_echo])
    for i in range(3):
        jm.put(i)
    jm.join()
JobManager creates threads for the worker functions and waits for input to
be fed with JobManager.put().
The arguments are the same as those of k3jobq.run.
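The put/join pattern described here can be sketched with the standard library alone. This is a simplified single-stage model under stated assumptions, not the JobManager source; `MiniManager` and its `_STOP` sentinel are hypothetical names.

```python
import queue
import threading


class MiniManager:
    """Hypothetical single-stage model of the put()/join() pattern."""

    _STOP = object()  # sentinel telling a worker thread to exit

    def __init__(self, worker, n_threads=2):
        self._worker = worker
        self._queue = queue.Queue()
        # Threads are created up front and wait for input.
        self._threads = [
            threading.Thread(target=self._loop, daemon=True)
            for _ in range(n_threads)
        ]
        for t in self._threads:
            t.start()

    def _loop(self):
        while True:
            item = self._queue.get()
            if item is self._STOP:
                return
            self._worker(item)

    def put(self, item):
        self._queue.put(item)

    def join(self):
        # One sentinel per thread, queued behind all pending input.
        for _ in self._threads:
            self._queue.put(self._STOP)
        for t in self._threads:
            t.join()


results = []
manager = MiniManager(results.append, n_threads=3)
for i in range(5):
    manager.put(i)
manager.join()
print(sorted(results))
```

Because the sentinels are queued after the real input, every element is processed before the threads exit, which is the guarantee a caller expects from join().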
Source code in k3jobq/jobq.py
join(timeout=None)
Wait for all workers to finish.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | is the same as | None |
Returns:
| Type | Description |
|---|---|
| | None |
Source code in k3jobq/jobq.py
put(elt)
Put anything as an input element.
Source code in k3jobq/jobq.py
set_thread_num(worker, n)
Change the number of threads for a worker on the fly.
If JobManager.join() has already been called, set_thread_num does nothing
and returns silently.
When the thread number is increased, new threads are created.
When the thread number is reduced, this function does not stop any worker
thread, because Python has no reliable way to shut down a running thread.
Instead, each worker thread checks whether it should keep running in _exec()
and _exec_in_order(), by comparing its thread_index against the running
thread index range, running_index_range.
Args:
    worker:
        is the callable passed in when creating JobManager.
        set_thread_num searches the job manager for this worker.
        If no worker ``is`` the one passed in, it raises a
        ``JobWorkerNotFound`` error.
    n(int): specifies the expected number of threads for this ``worker``.
        New threads are created and started before this function returns.
        Threads that are to be removed exit after they finish the job
        they are doing.
Returns:
| Type | Description |
|---|---|
| | None |
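The cooperative shutdown described above can be illustrated with a small stdlib-only model. It is a sketch under assumptions, not the k3jobq source: `ResizablePool`, its attributes, and the exact way the index range is updated are hypothetical. Only the idea that each thread compares its own index against a running range comes from the description.

```python
import queue
import threading


class ResizablePool:
    """Hypothetical model: threads retire themselves by checking an index range."""

    def __init__(self, worker, n):
        self._worker = worker
        self._queue = queue.Queue()
        self._lock = threading.Lock()
        self._range = (0, 0)  # [start, end): thread indexes allowed to run
        self.threads = {}
        self.set_thread_num(n)

    def set_thread_num(self, n):
        with self._lock:
            start, end = self._range
            if n > end - start:
                # Grow: new threads are started before this method returns.
                new_end = start + n
                for index in range(end, new_end):
                    t = threading.Thread(
                        target=self._loop, args=(index,), daemon=True
                    )
                    self.threads[index] = t
                    t.start()
                self._range = (start, new_end)
            else:
                # Shrink: only move the lower bound; no thread is killed here.
                self._range = (end - n, end)

    def _loop(self, index):
        while True:
            with self._lock:
                start, _ = self._range
            if index < start:
                return  # retired: exit between jobs, never in the middle of one
            try:
                item = self._queue.get(timeout=0.05)
            except queue.Empty:
                continue
            self._worker(item)

    def put(self, item):
        self._queue.put(item)


pool = ResizablePool(print, 4)
pool.set_thread_num(1)  # threads 0..2 exit once they notice the new range
```

The design choice is that shrinking is only a bookkeeping change: a thread with an index below the range finishes its current job, sees that it is no longer wanted, and returns on its own.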
Source code in k3jobq/jobq.py
limit_job_speed(max_job_speed, job_step=1)
limit_job_speed is a worker that limits the speed at which jobs are processed.
It sleeps whenever the job throughput exceeds the specified value::
    k3jobq.run(range(1000), [
        (k3jobq.limit_job_speed(100, 1), 1),  # at most 100 jobs per second
        (empty, 10),
    ])
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_job_speed | float | a float or a function giving the maximum job execution speed. If it is a function, its return value is used. | required |
| job_step | int | the step length of a job. | 1 |
Returns:
| Type | Description |
|---|---|
| | the |
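One way such a throttling worker could be built is sketched below. It is an illustration of the technique, not the code in k3jobq/works.py: `make_speed_limiter` is a hypothetical name, the injectable `clock` and `sleep` parameters exist only to make the sketch testable, and passing the element through unchanged is an assumption.

```python
import time


def make_speed_limiter(max_job_speed, job_step=1,
                       clock=time.monotonic, sleep=time.sleep):
    """Return a pass-through worker that sleeps to stay under max_job_speed."""
    started = clock()
    done = 0

    def limiter(args):
        nonlocal done
        done += job_step
        # max_job_speed may be a number or a zero-argument function.
        speed = max_job_speed() if callable(max_job_speed) else max_job_speed
        # Earliest moment at which `done` jobs are allowed to have passed.
        allowed_at = started + done / speed
        delay = allowed_at - clock()
        if delay > 0:
            sleep(delay)
        return args  # assumption: the element is handed on unchanged

    return limiter


limiter = make_speed_limiter(1000)
print([limiter(i) for i in range(3)])
```

The counter in this sketch is not protected by a lock, so it is only meant to run in a single thread, matching the `(..., 1)` thread count used in the example above.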
Source code in k3jobq/works.py
run(input_it, workers, keep_order=False, timeout=None, probe=None)
Process each element of the input, one by one, with the functions in workers.
Args:
    input_it(iterable): input elements to process.
    workers: list of functions, or ``tuple`` of ``(function, nr_of_thread)``,
        or ``tuple`` of ``(function, nr_of_thread, dispatcher_func)``.
        A worker function accepts exactly one argument and returns one value.
        The argument is one element from the input (for the first worker) or a
        result from the previous worker.
        A typical worker would be defined like::
            def worker_foo(args):
                result = foo(args)
                return result
        A worker function can also be a generator, in which case k3jobq
        iterates over all elements of the returned iterator and passes each
        one to the next worker::
            def worker_iter(args):
                for elt in args:
                    yield elt
        A ``dispatcher`` is specified by the user to control how args are
        dispatched to different workers.
        It accepts one argument ``args``, the same ``args`` passed to a worker
        function, and returns an ``int`` indicating which of the next workers to use.
        A ``dispatcher`` is used to balance related workload across workers.
        A dispatcher function may be defined like::
            def dispatch(args):
                return hash(args) % 5
        A user-defined dispatcher is used when **concurrency** and **partial-order**
        are both required:
        - The ``args`` passed to the same worker are guaranteed to be
          executed in order.
        - Different workers run in parallel.
        If a ``dispatcher`` is specified, it implies ``keep_order=True``.
        Thus a worker group with a dispatcher also puts its results into the
        input queue of the next worker group with order kept.
    keep_order(bool): specifies whether elements must be processed in order.
        Keeping them in order affects performance. By default it is ``False``.
    timeout(float): specifies the max time to run. ``None`` means to wait
        until all jobs are done. By default it is ``None``.
        If k3jobq exceeds ``timeout`` before finishing, it returns after all
        workers finish their current job.
    probe: is a dictionary to collect stats. By default it is ``None``.
        If it is a valid dictionary, ``k3jobq`` writes stats of running jobs to it.
        ``k3jobq.stat()`` can be used to obtain stat data.
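The partial-order guarantee of a dispatcher can be shown with a stdlib-only model. This is a sketch of the idea, assuming one FIFO queue per worker thread; it is not how k3jobq is necessarily implemented, and `run_dispatched`, `record`, and `events` are hypothetical names.

```python
import queue
import threading

_STOP = object()  # sentinel telling a thread to exit


def run_dispatched(inputs, worker, n_threads, dispatcher):
    """Route each element to the thread chosen by dispatcher."""
    queues = [queue.Queue() for _ in range(n_threads)]

    def loop(q):
        while True:
            item = q.get()
            if item is _STOP:
                return
            worker(item)

    threads = [threading.Thread(target=loop, args=(q,)) for q in queues]
    for t in threads:
        t.start()
    for item in inputs:
        # Elements with the same dispatcher value share one FIFO queue,
        # so a single thread processes them in input order.
        queues[dispatcher(item) % n_threads].put(item)
    for q in queues:
        q.put(_STOP)
    for t in threads:
        t.join()


seen = {"a": [], "b": [], "c": []}


def record(args):
    key, seq = args
    seen[key].append(seq)


events = [(key, seq) for seq in range(5) for key in "abc"]
run_dispatched(events, record, 2, lambda args: ord(args[0]))
print(seen)
```

Events with the same key always land on the same thread, so each per-key list comes out in sequence order even though the two threads run in parallel.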
Returns:
| Type | Description |
|---|---|
| | None |
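Why keeping order costs performance can be seen in a small model built on `concurrent.futures`. The sketch only illustrates the trade-off and says nothing about how k3jobq coordinates its threads; `ordered_results` and `slow_add1` are hypothetical names.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def ordered_results(worker, inputs, n_threads):
    """Run worker concurrently but collect results in input order."""
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        futures = [pool.submit(worker, item) for item in inputs]
        # Waiting on futures in submission order restores the input order,
        # at the cost of stalling behind the slowest early element.
        return [f.result() for f in futures]


def slow_add1(n):
    time.sleep(random.uniform(0, 0.01))  # jobs finish in unpredictable order
    return n + 1


print(ordered_results(slow_add1, range(10), 4))
```

Results that finish early have to wait for every element ahead of them, which is the overhead an unordered pipeline avoids.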
Source code in k3jobq/jobq.py
stat(probe)
Get stats about a running or finished k3jobq session. The stats are returned as a dictionary like::
    {
        'in': 10,     # number of elements all workers have started processing.
        'out': 8,     # number of elements all workers have finished processing.
        'doing': 2,   # number of elements all workers are processing.
        'workers': [
            {
                'name': 'example.worker_foo',
                'input': {'size': 3, 'capa': 1024},        # input queue
                'nr_worker': 2,                            # number of worker threads
                'coordinator': {'size': 3, 'capa': 1024},  # present only when keep_order=True
            },
            ...
        ]
    }
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| probe | dict | is the dictionary passed into | required |
Returns:
| Type | Description |
|---|---|
| | dict of stat. |
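A dictionary of this shape is easy to inspect with ordinary Python. The helper below is a hypothetical example that relies only on the keys shown in the sample above; `fullest_input_queue`, the `sample` data, and the second worker entry are made up for illustration.

```python
def fullest_input_queue(stats):
    """Return (name, fill_ratio) of the worker whose input queue is fullest."""

    def fill(worker):
        q = worker["input"]
        return q["size"] / q["capa"]

    busiest = max(stats["workers"], key=fill)
    return busiest["name"], fill(busiest)


# Hand-written sample in the documented layout, not real k3jobq output.
sample = {
    "in": 10,
    "out": 8,
    "doing": 2,
    "workers": [
        {"name": "example.worker_foo",
         "input": {"size": 3, "capa": 1024}, "nr_worker": 2},
        {"name": "example.worker_bar",
         "input": {"size": 512, "capa": 1024}, "nr_worker": 1},
    ],
}

print(fullest_input_queue(sample))
```

A worker whose input queue stays close to its capacity is a likely bottleneck and a candidate for more threads.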
Source code in k3jobq/jobq.py
License
The MIT License (MIT) - Copyright (c) 2015 Zhang Yanpo (张炎泼)