Workers
Worker is an object which runs specified function (target) in a loop.
Contents
Common
Worker parameters
All workers support the following initial parameters:
- name worker name (default: name of target function if specified, otherwise: auto-generated UUID)
- func target function (default: worker.run)
- priority worker thread priority
- o special object, passed as-is to target (e.g. object worker is running for)
- on_error a function which is called, if target raises an exception
- on_error_kwargs kwargs for on_error function
- supervisor alternative task supervisor
- poll_delay worker poll delay (default: task supervisor poll delay)
Methods
-
class
neotasker.
BackgroundWorker
(name=None, fn=None, **kwargs) -
is_active
() Check if worker is active
Returns: True if worker is active, otherwise False
-
is_started
() Check if worker is started
-
is_stopped
() Check if worker is stopped
-
restart
(*args, **kwargs) Restart worker, all arguments will be passed to target function as-is
-
start
(**kwargs) Start worker, all arguments will be passed to target function as-is
-
stop
(wait=True) Stop worker
-
Overriding parameters at startup
Initial parameters name, priority and o can be overriden during worker startup (first two - as _name and _priority)
myworker.start(_name='worker1')
Executor function
Worker target function is either specified with annotation or named run (see examples below). The function should always have **kwargs param.
Executor function gets in args/kwargs:
- all parameters worker.start has been started with.
- _worker current worker object
- _name current worker name
- _task_id if target function is started in multiprocessing pool - ID of current task (for thread pool, task id = thread name).
Note
If target function return False, worker stops itself.
Asynchronous target function
Executor function can be asynchronous, in this case it’s executed inside task supervisor loop, no new thread is started and priority is ignored.
When background_worker decorator detects asynchronous function, class BackgroundAsyncWorker is automatically used instead of BackgroundWorker (BackgroundQueueWorker, BackgroundEventWorker and BackgroundIntervalWorker support synchronous functions out-of-the-box).
Additional worker parameter loop (_loop at startup) may be specified to put target function inside external async loop.
Note
To prevent interference between supervisor event loop and targets, it’s strongly recommended to specify own async event loop or create aloop.
Multiprocessing target function
To use multiprocessing, task supervisor mp pool must be created.
If target method run is defined as static, workers automatically detect this and use multiprocessing pool of task supervisor to launch target.
Note
As target is started in separate process, it doesn’t have an access to self object.
Additionally, method process_result must be defined in worker class to process target result. The method can stop worker by returning False value.
Example, let’s define BackgroundQueueWorker. Python multiprocessing module can not pick execution function defined via annotation, so worker class is required. Create it in separate module as Python multiprocessing can not pick methods from the module where the worker is started:
Warning
Multiprocessing target function should always finish correctly, without any exceptions otherwise callback function is never called and task become “freezed” in pool.
myworker.py
class MyWorker(BackgroundQueueWorker):
# executed in another process via task_supervisor
@staticmethod
def run(task, *args, **kwargs):
# .. process task
return '<task result>'
def process_result(self, result):
# process result
main.py
from myworker import MyWorker
worker = MyWorker()
worker.start()
# .....
worker.put_threadsafe('task')
# .....
worker.stop()
Workers
BackgroundWorker
Background worker is a worker which continuously run target function in a loop without any condition. Loop of this worker is synchronous and is started in separate thread instantly.
# with annotation - function becomes worker target
from neotasker import background_worker
@background_worker
def myfunc(*args, **kwargs):
print('I am background worker')
# with class
from neotasker import BackgroundWorker
class MyWorker(BackgroundWorker):
def run(self, *args, **kwargs):
print('I am a worker too')
myfunc.start()
myworker2 = MyWorker()
myworker2.start()
# ............
# stop first worker
myfunc.stop()
# stop 2nd worker, don't wait until it is really stopped
myworker2.stop(wait=False)
BackgroundAsyncWorker
Similar to BackgroundWorker but used for async target functions. Has additional parameter loop= (_loop in start function) to specify either async event loop or aloop object. By default either task supervisor event loop or task supervisor default aloop is used.
# with annotation - function becomes worker target
from neotasker import background_worker
@background_worker
async def async_worker(**kwargs):
print('I am async worker')
async_worker.start()
# with class
from neotasker import BackgroundAsyncWorker
class MyWorker(BackgroundAsyncWorker):
async def run(self, *args, **kwargs):
print('I am async worker too')
worker = MyWorker()
worker.start()
BackgroundQueueWorker
Background worker which gets data from asynchronous queue and passes it to synchronous or Asynchronous target.
Queue worker is created as soon as annotator detects q=True or queue=True param. Default queue is asyncio.queues.Queue. If you want to use e.g. priority queue, specify its class instead of just True.
# with annotation - function becomes worker target
from neotasker import background_worker
@background_worker(q=True)
def f(task, **kwargs):
print('Got task from queue: {}'.format(task))
@background_worker(q=asyncio.queues.PriorityQueue)
def f2(task, **kwargs):
print('Got task from queue too: {}'.format(task))
# with class
from neotasker import BackgroundQueueWorker
class MyWorker(BackgroundQueueWorker):
def run(self, task, *args, **kwargs):
print('my task is {}'.format(task))
f.start()
f2.start()
worker3 = MyWorker()
worker3.start()
f.put_threadsafe('task 1')
f2.put_threadsafe('task 2')
worker3.put_threadsafe('task 3')
put method is used to put task into worker’s queue. The method is thread-safe.
BackgroundEventWorker
Background worker which runs asynchronous loop waiting for the event and launches synchronous or asynchronous target when it’s happened.
Event worker is created as soon as annotator detects e=True or event=True param.
# with annotation - function becomes worker target
from neotasker import background_worker
@background_worker(e=True)
def f(task, **kwargs):
print('happened')
# with class
from neotasker import BackgroundEventWorker
class MyWorker(BackgroundEventWorker):
def run(self, *args, **kwargs):
print('happened')
f.start()
worker3 = MyWorker()
worker3.start()
f.trigger_threadsafe()
worker3.trigger_threadsafe()
trigger_threadsafe method is used to put task into worker’s queue. The method is thread-safe. If worker is triggered from the same asyncio loop, trigger method can be used instead.
BackgroundIntervalWorker
Background worker which runs synchronous or asynchronous target function with the specified interval or delay.
Worker initial parameters:
- interval run target with a specified interval (in seconds)
- delay delay between target launches
- delay_before delay before target launch
Parameters interval and delay can not be used together. All parameters can be overriden during startup by adding _ prefix (e.g. worker.start(_interval=1))
Background interval worker is created automatically, as soon as annotator detects one of the parameters above:
@background_worker(interval=1)
def myfunc(**kwargs):
print('I run every second!')
@background_worker(interval=1)
async def myfunc2(**kwargs):
print('I run every second and I am async!')
myfunc.start()
myfunc2.start()
As well as event worker, BackgroundIntervalWorker supports manual target triggering with worker.trigger() and worker.trigger_threadsafe()
Sometimes it’s required to call worker target function manually, without triggering - in example, to return result to the user. If you want interval worker to calculate the next scheduled execution from the time when target function was manually executed, use skip=True parameter when triggering:
def apifunc_trigger_worker_manually():
myfunc2.trigger_threadsafe(skip=True)
return myfunc2.run()
The call acts exactly like usual triggering, except worker skip the next target function execution once.