neotasker

Lightweight Python library for modern thread / multiprocessing pooling and task processing via asyncio.

Neotasker is a lightweight variation of the atasker library: tasks don’t have priorities, go directly to ThreadPoolExecutor and are standard Python future objects. This library is useful for high-load projects with lightweight tasks, as the majority of tasks are proxied directly to the pool.

Neotasker works on top of ThreadPoolExecutor and asyncio and provides additional features:

  • Easy thread pool and asyncio loops initialization
  • Interval, queue and event-based workers
  • Built-in integration with aiosched

Install

pip3 install neotasker

Sources: https://github.com/alttch/neotasker

Documentation: https://neotasker.readthedocs.io/

Code examples

Start/stop

from neotasker import task_supervisor

# set pool size
# min_size='max' means pre-spawn all pool threads
task_supervisor.set_thread_pool(min_size='max', max_size=20)
task_supervisor.start()
# ...
# start workers, other threads etc.
# ...
# optionally block current thread
task_supervisor.block()

# stop from any thread
task_supervisor.stop()

Executing future

You may work with the neotasker.thread_pool object directly or use the task_supervisor.spawn function, which is mapped directly to thread_pool.submit.

from neotasker import thread_pool, task_supervisor

thread_pool.start()

def mytask(a, b, c):
    print(f'I am working in the background! {a} {b} {c}')
    return 777

task = task_supervisor.spawn(mytask, 1, 2, c=3)

# get future result
result = task.result()
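Since spawn is documented as a direct mapping to thread_pool.submit, the call pattern matches the standard library's ThreadPoolExecutor.submit. The sketch below uses a plain ThreadPoolExecutor as a stand-in for neotasker's pool (an assumption made so it runs without neotasker installed). It shows that positional and keyword arguments pass through unchanged and that the result comes back through a standard future.

```python
from concurrent.futures import ThreadPoolExecutor

def mytask(a, b, c):
    # same signature as in the example above, but returns a computed value
    return a + b + c

# stand-in for neotasker's thread pool; spawn(mytask, 1, 2, c=3) is
# documented to make the same submit() call on the supervisor's pool
with ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(mytask, 1, 2, c=3)
    result = future.result()  # blocks until the task has finished

print(result)  # 6
```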

Creating asyncio loop

from neotasker import thread_pool, task_supervisor

thread_pool.start()
task_supervisor.create_aloop('default', default=True)

# The loop will run until the supervisor is stopped
# Spawn a coroutine (coro) from another thread:

task_supervisor.get_aloop().spawn_coroutine_threadsafe(coro)

Worker examples

from neotasker import background_worker, task_supervisor

task_supervisor.start()
# we need to create at least one aloop to start workers
task_supervisor.create_aloop('default', default=True)
# create one more async loop
task_supervisor.create_aloop('loop2')

@background_worker
def worker1(**kwargs):
    print('I am a simple background worker')

@background_worker
async def worker_async(**kwargs):
    print('I am async background worker')

@background_worker(interval=1, loop='loop2')
def worker2(**kwargs):
    print('I run every second!')

@background_worker(queue=True)
def worker3(task, **kwargs):
    print('I run when there is a task in my queue')

@background_worker(event=True)
def worker4(**kwargs):
    print('I run when triggered')

worker1.start()
worker_async.start()
worker2.start()
worker3.start()
worker4.start()

worker3.put_threadsafe('todo1')
worker4.trigger_threadsafe()

from neotasker import BackgroundIntervalWorker

class MyWorker(BackgroundIntervalWorker):

    def run(self, **kwargs):
        print('I am custom worker class')

worker5 = MyWorker(interval=0.1, name='worker5')
worker5.start()

Task supervisor

Task supervisor is a component which manages the task thread pool and runs task schedulers (workers).

Usage

When the neotasker package is imported, a default task supervisor is created automatically.

from neotasker import task_supervisor

# thread pool
task_supervisor.set_thread_pool(max_size=20)
task_supervisor.start()

Warning

Task supervisor must be started before any scheduler/worker or task.

Pool size

Parameters min_size and max_size set the actual system thread pool size. If max_size is not specified, it’s set to pool_size + reserve_normal + reserve_high. It’s recommended to set max_size slightly larger manually to leave room for critical tasks.

By default, max_size is CPU count * 5. You may use the argument min_size='max' to automatically set the minimum pool size equal to max_size.

Note

Pool size can be changed while the task supervisor is running.

Poll delay

Poll delay is a delay (in seconds) used only by event and queue workers, to prevent asyncio loop spamming, and by some other methods such as start/stop.

Lower poll delay = higher CPU usage; higher poll delay = slower reaction.

The default poll delay is 0.01 seconds. It can be changed with:

task_supervisor.poll_delay = 0.1 # set poll delay to 100ms
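The trade-off can be seen in a minimal asyncio polling loop. This is an illustrative sketch of the general technique, not neotasker’s worker code; poll_until_set and simulate are hypothetical names. A shorter delay wakes the loop more often (more CPU work) but notices the change sooner.

```python
import asyncio

async def poll_until_set(flag, poll_delay):
    # check the flag, then sleep poll_delay seconds; count the wake-ups
    wakeups = 0
    while not flag['set']:
        wakeups += 1
        await asyncio.sleep(poll_delay)
    return wakeups

async def simulate(poll_delay):
    flag = {'set': False}
    poller = asyncio.create_task(poll_until_set(flag, poll_delay))
    await asyncio.sleep(0.2)  # the awaited condition becomes true after 200 ms
    flag['set'] = True
    return await poller

fast = asyncio.run(simulate(0.01))  # many wake-ups, notices within ~10 ms
slow = asyncio.run(simulate(0.05))  # few wake-ups, may notice up to ~50 ms late
print(fast, slow)
```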
Blocking

Task supervisor is started in its own thread. If you want to block the current thread, you may use the method

task_supervisor.block()

which simply sleeps while the task supervisor is active.
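The documented behavior of block() (sleep while the supervisor is active) can be reproduced with a threading.Event that stop() sets. MiniSupervisor below is a hypothetical stand-in written for illustration, not neotasker’s implementation.

```python
import threading
import time

class MiniSupervisor:
    """Hypothetical stand-in showing the block()/stop() interplay."""

    def __init__(self):
        self._stopped = threading.Event()

    def block(self):
        # sleep until stop() is called, without busy-waiting
        self._stopped.wait()

    def stop(self):
        # may be called from any thread
        self._stopped.set()

sup = MiniSupervisor()
started = time.monotonic()
threading.Timer(0.1, sup.stop).start()  # stop from another thread in 100 ms
sup.block()  # returns only after stop() has run
elapsed = time.monotonic() - started
print(f'unblocked after {elapsed:.2f} s')
```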

Stopping task supervisor
task_supervisor.stop(wait=True, stop_schedulers=True, cancel_tasks=False)

Params:

  • wait – wait until tasks and scheduler coroutines finish. If wait=<number>, the task supervisor waits at most wait seconds for coroutines to finish. However, if schedulers (workers) are requested to stop or task threads are currently running, the stop method waits for them to finish without a time limit.
  • stop_schedulers – before stopping the main event loop, the task supervisor calls the stop method of all running schedulers.
  • cancel_tasks – if specified, the task supervisor tries to forcibly cancel all scheduler coroutines.
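The difference between a bounded wait (wait=<number>) and waiting for running threads without a limit resembles concurrent.futures.wait with and without a timeout. This standard library sketch is only an analogy for the documented semantics, not the supervisor’s shutdown code; sleepy is a hypothetical task.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def sleepy(seconds):
    time.sleep(seconds)
    return seconds

pool = ThreadPoolExecutor(max_workers=2)
futures = [pool.submit(sleepy, 0.05), pool.submit(sleepy, 0.5)]

# bounded: give the tasks at most 0.2 s, like wait=0.2
done, not_done = wait(futures, timeout=0.2)
print(len(done), len(not_done))  # 1 1

# unbounded: block until every running task has finished
pool.shutdown(wait=True)
```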
aloops: async targets and tasks

It’s usually unsafe to run both scheduler (worker) target functions and custom tasks in the supervisor’s event loop. Workers use the event loop by default, and if anything blocks it, the program may freeze.

To avoid this, it’s strongly recommended to create independent async loops for your custom tasks. The neotasker supervisor has a built-in engine for async loops, called “aloops”: each aloop runs in a separate thread and doesn’t interfere with the supervisor event loop or with other aloops.

Create

If you plan to use async worker target functions, create an aloop:

a = task_supervisor.create_aloop('myworkers', default=True, daemon=True)
# the loop is started instantly by default; to prevent this, add param start=False
# and then use
# task_supervisor.start_aloop('myworkers')

To determine which thread a target function is running in, simply get the thread name: aloop threads are named “supervisor_aloop_<name>”.
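Conceptually, an aloop is an asyncio event loop running in a dedicated thread. The MiniAloop class below is a hypothetical sketch of that idea using only the standard library; it borrows the documented supervisor_aloop_<name> thread naming so the name check can be seen in action, but it does not reproduce neotasker’s aloop class.

```python
import asyncio
import threading

class MiniAloop:
    """Hypothetical sketch: an asyncio event loop running in its own thread."""

    def __init__(self, name):
        self.loop = asyncio.new_event_loop()
        # same naming scheme the text documents for aloop threads
        self.thread = threading.Thread(
            target=self._run, name=f'supervisor_aloop_{name}', daemon=True)
        self.thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def stop(self):
        # stop the loop from outside its thread, wait for the thread, clean up
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()

async def whoami():
    # report the name of the thread this coroutine is running in
    return threading.current_thread().name

aloop = MiniAloop('myworkers')
thread_name = asyncio.run_coroutine_threadsafe(whoami(), aloop.loop).result()
print(thread_name)  # supervisor_aloop_myworkers
aloop.stop()
```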

Using with workers

Workers automatically launch async target functions in the default aloop; another aloop can be specified with loop= at init or _loop= at startup.

Executing own coroutines

aloops have two methods for executing your own coroutines:

# put coroutine to loop
task = aloop.spawn_coroutine_threadsafe(coro)

# blocking wait for result from coroutine
result = aloop.run_coroutine_threadsafe(coro)
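In standard asyncio terms, the difference between the two methods resembles calling asyncio.run_coroutine_threadsafe and either keeping the returned future or waiting on it immediately. spawn_coroutine and run_coroutine below are hypothetical stdlib helpers written for illustration; they assume an event loop already running in a background thread, as an aloop does.

```python
import asyncio
import threading

# a background event loop standing in for an aloop
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

def spawn_coroutine(coro):
    # non-blocking: schedule coro and return a concurrent.futures.Future
    return asyncio.run_coroutine_threadsafe(coro, loop)

def run_coroutine(coro):
    # blocking: schedule coro and wait for its result in the calling thread
    return asyncio.run_coroutine_threadsafe(coro, loop).result()

async def add(a, b):
    await asyncio.sleep(0.01)
    return a + b

task = spawn_coroutine(add(1, 2))  # returns immediately
result = run_coroutine(add(3, 4))  # waits for the coroutine to finish
print(task.result(), result)  # 3 7

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```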
Other supervisor methods

Note

It’s not recommended to create/start/stop aloops without the supervisor.

# set default aloop
task_supervisor.set_default_aloop(aloop)

# get aloop by name
task_supervisor.get_aloop(name)

# stop aloop (not required, supervisor stops all aloops at shutdown)
task_supervisor.stop_aloop(name)

# get aloop async event loop object for direct access
aloop.get_loop()
Multiprocessing

It’s possible to replace task_supervisor.thread_pool with a ProcessPoolExecutor object, and most functions will continue to work properly.
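The swap works because ThreadPoolExecutor and ProcessPoolExecutor share the concurrent.futures.Executor interface. The run_all function below is a hypothetical example of code that depends only on submit(), so either pool class can be passed in; it is demonstrated with threads so the snippet runs anywhere.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor

def square(x):
    return x * x

def run_all(executor, fn, items):
    # depends only on Executor.submit(), so any pool class can be passed in
    futures = [executor.submit(fn, item) for item in items]
    return [f.result() for f in futures]

# both pool classes implement the same abstract Executor interface
print(issubclass(ThreadPoolExecutor, Executor),
      issubclass(ProcessPoolExecutor, Executor))  # True True

with ThreadPoolExecutor(max_workers=2) as pool:
    squares = run_all(pool, square, [2, 3, 4])
print(squares)  # [4, 9, 16]
```

Passing a ProcessPoolExecutor works the same way, with the usual multiprocessing caveats: the function and its arguments must be picklable (e.g. module-level functions), and on platforms that spawn new processes the pool should be created under an if __name__ == '__main__': guard.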

Custom task supervisor
from neotasker import TaskSupervisor

# ID is used only for logging
my_supervisor = TaskSupervisor(id='my_supervisor')

class MyTaskSupervisor(TaskSupervisor):
    # .......
    pass

# if ID is not set, random UUID will be assigned
my_supervisor2 = MyTaskSupervisor()
Putting own tasks

The task supervisor method spawn is mapped directly to thread_pool.submit and returns a standard future object.

You may also access task_supervisor.thread_pool directly.

Creating own schedulers

Own task scheduler (worker) can be registered in task supervisor with:

task_supervisor.register_scheduler(scheduler)

where scheduler is a scheduler object which should implement at least the stop (regular) and loop (async) methods.
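Beyond these two method names, the interface the supervisor expects isn't spelled out here, so the class below is a hypothetical sketch: loop() is a coroutine that keeps working until stop() flips a flag, and a small asyncio driver plays the supervisor’s role. In a real program the object would be passed to task_supervisor.register_scheduler() instead.

```python
import asyncio

class TickScheduler:
    """Hypothetical scheduler exposing the two documented methods."""

    def __init__(self, interval):
        self.interval = interval
        self.ticks = 0
        self._active = True

    async def loop(self):
        # async method: does the scheduler's work until stop() is called
        while self._active:
            self.ticks += 1
            await asyncio.sleep(self.interval)

    def stop(self):
        # regular method: only signals loop() to finish
        self._active = False

async def main():
    scheduler = TickScheduler(interval=0.01)
    runner = asyncio.create_task(scheduler.loop())
    await asyncio.sleep(0.05)
    scheduler.stop()  # what a supervisor would do at shutdown
    await runner      # loop() exits after its current sleep
    return scheduler

sched = asyncio.run(main())
print(sched.ticks)
```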

The task supervisor can also register synchronous schedulers/workers, but it can only stop them when the stop method is called:

task_supervisor.register_sync_scheduler(scheduler)

To unregister schedulers from task supervisor, use unregister_scheduler and unregister_sync_scheduler methods.

Async jobs

neotasker has built-in integration with aiosched, a simple and fast async job scheduler.

aiosched schedulers can be automatically started inside an aloop:

async def test1():
    print('I am lightweight async job')

task_supervisor.create_aloop('jobs')
# if aloop is not specified, the default aloop is used
task_supervisor.create_async_job_scheduler('default', aloop='jobs',
                                           default=True)
# create async job
job1 = task_supervisor.create_async_job(target=test1, interval=0.1)
# cancel async job
task_supervisor.cancel_async_job(job=job1)
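aiosched’s internals aren’t shown in this document; as a rough standard library analogy, an interval job can be modeled as an asyncio task that awaits the target every interval seconds until it is cancelled. interval_job and lightweight_job below are hypothetical names, not the aiosched API.

```python
import asyncio

calls = []

async def lightweight_job():
    calls.append('run')

async def interval_job(target, interval):
    # await the target, sleep for the interval, repeat until cancelled
    while True:
        await target()
        await asyncio.sleep(interval)

async def main():
    job = asyncio.create_task(interval_job(lightweight_job, 0.1))
    await asyncio.sleep(0.35)
    job.cancel()  # comparable to cancelling the async job
    try:
        await job
    except asyncio.CancelledError:
        pass
    return job

job1 = asyncio.run(main())
print(len(calls), job1.cancelled())
```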

Note

aiosched jobs are lightweight: they don’t report any statistics and don’t check whether the job is already running.

Workers

A worker is an object which runs a specified function (target) in a loop.

Common
Worker parameters

All workers support the following initial parameters:

  • name – worker name (default: name of the target function if specified, otherwise an auto-generated UUID)
  • func – target function (default: worker.run)
  • priority – worker thread priority
  • o – special object, passed as-is to the target (e.g. the object the worker is running for)
  • on_error – a function which is called if the target raises an exception
  • on_error_kwargs – kwargs for the on_error function
  • supervisor – alternative task supervisor
  • poll_delay – worker poll delay (default: task supervisor poll delay)
Methods
class neotasker.BackgroundWorker(name=None, fn=None, **kwargs)
is_active()

Check if worker is active

Returns: True if worker is active, otherwise False
is_started()

Check if worker is started

is_stopped()

Check if worker is stopped

restart(*args, **kwargs)

Restart worker; all arguments are passed to the target function as-is

start(**kwargs)

Start worker; all arguments are passed to the target function as-is

stop(wait=True)

Stop worker

Overriding parameters at startup

Initial parameters name, priority and o can be overridden during worker startup (the first two as _name and _priority):

myworker.start(_name='worker1')
Executor function

The worker target function is either specified with annotation or named run (see examples below). The function should always accept **kwargs.

The executor function receives in args/kwargs:

  • all parameters worker.start has been called with
  • _worker – current worker object
  • _name – current worker name
  • _task_id – if the target function is started in a multiprocessing pool, the ID of the current task (for a thread pool, the task ID is the thread name)

Note

If the target function returns False, the worker stops itself.
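The stop-on-False rule and the reason targets need **kwargs can be shown with a plain thread loop. run_worker below is a hypothetical sketch, not neotasker’s worker code, and it assumes that only the value False (not other falsy values such as None) stops the loop.

```python
import threading

def run_worker(target, name, **start_kwargs):
    """Hypothetical worker loop: call target until it returns False."""
    while True:
        # the target receives start() kwargs plus worker info such as _name,
        # which is why targets should always accept **kwargs
        if target(_name=name, **start_kwargs) is False:
            break

calls = []

def target(**kwargs):
    calls.append(kwargs['_name'])
    # returning exactly False stops the worker; None keeps it running
    return False if len(calls) >= 3 else None

t = threading.Thread(target=run_worker, args=(target, 'worker1'))
t.start()
t.join()
print(calls)  # ['worker1', 'worker1', 'worker1']
```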

Asynchronous target function

The executor function can be asynchronous; in this case it’s executed inside the task supervisor loop, no new thread is started and priority is ignored.

When the background_worker decorator detects an asynchronous function, the BackgroundAsyncWorker class is automatically used instead of BackgroundWorker (BackgroundQueueWorker, BackgroundEventWorker and BackgroundIntervalWorker support both synchronous and asynchronous functions out of the box).

An additional worker parameter, loop (_loop at startup), may be specified to run the target function inside an external async loop.

Note

To prevent interference between the supervisor event loop and targets, it’s strongly recommended to specify your own async event loop or create an aloop.

Multiprocessing target function

To use multiprocessing, the task supervisor mp pool must be created.

If the target method run is defined as static, workers automatically detect this and use the task supervisor’s multiprocessing pool to launch the target.

Note

As the target is started in a separate process, it doesn’t have access to the self object.

Additionally, a process_result method must be defined in the worker class to process the target result. The method can stop the worker by returning False.

For example, let’s define a BackgroundQueueWorker. The Python multiprocessing module can not pickle an execution function defined via annotation, so a worker class is required. Create it in a separate module, as Python multiprocessing can not pickle methods from the module where the worker is started:

Warning

A multiprocessing target function should always finish correctly, without any exceptions; otherwise the callback function is never called and the task becomes “frozen” in the pool.

myworker.py

from neotasker import BackgroundQueueWorker

class MyWorker(BackgroundQueueWorker):

    # executed in another process via task_supervisor
    @staticmethod
    def run(task, *args, **kwargs):
        # .. process task
        return '<task result>'

    def process_result(self, result):
        # process result; returning False stops the worker
        pass

main.py

from myworker import MyWorker

worker = MyWorker()
worker.start()
# .....
worker.put_threadsafe('task')
# .....
worker.stop()
Workers
BackgroundWorker

Background worker is a worker which continuously runs the target function in a loop without any condition. The loop of this worker is synchronous and is started instantly in a separate thread.

# with annotation - function becomes worker target
from neotasker import background_worker

@background_worker
def myfunc(*args, **kwargs):
    print('I am background worker')

# with class
from neotasker import BackgroundWorker

class MyWorker(BackgroundWorker):

    def run(self, *args, **kwargs):
        print('I am a worker too')

myfunc.start()

myworker2 = MyWorker()
myworker2.start()

# ............

# stop first worker
myfunc.stop()
# stop 2nd worker, don't wait until it is really stopped
myworker2.stop(wait=False)
BackgroundAsyncWorker

Similar to BackgroundWorker, but used for async target functions. It has an additional parameter loop= (_loop in the start function) to specify either an async event loop or an aloop object. By default, either the task supervisor event loop or the task supervisor default aloop is used.

# with annotation - function becomes worker target
from neotasker import background_worker

@background_worker
async def async_worker(**kwargs):
    print('I am async worker')

async_worker.start()

# with class
from neotasker import BackgroundAsyncWorker

class MyWorker(BackgroundAsyncWorker):

    async def run(self, *args, **kwargs):
        print('I am async worker too')

worker = MyWorker()
worker.start()
BackgroundQueueWorker

Background worker which gets data from an asynchronous queue and passes it to a synchronous or asynchronous target.

A queue worker is created as soon as the annotator detects the q=True or queue=True param. The default queue is asyncio.queues.Queue. If you want to use e.g. a priority queue, specify its class instead of just True.

# with annotation - function becomes worker target
import asyncio
from neotasker import background_worker

@background_worker(q=True)
def f(task, **kwargs):
    print('Got task from queue: {}'.format(task))

@background_worker(q=asyncio.queues.PriorityQueue)
def f2(task, **kwargs):
    print('Got task from queue too: {}'.format(task))

# with class
from neotasker import BackgroundQueueWorker

class MyWorker(BackgroundQueueWorker):

    def run(self, task, *args, **kwargs):
        print('my task is {}'.format(task))


f.start()
f2.start()
worker3 = MyWorker()
worker3.start()
f.put_threadsafe('task 1')
f2.put_threadsafe('task 2')
worker3.put_threadsafe('task 3')

The put_threadsafe method is used to put a task into the worker’s queue. The method is thread-safe.
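asyncio.Queue itself isn’t thread-safe, so a thread-safe put generally has to be handed over to the loop’s own thread. The sketch below shows that standard pattern; worker_loop and put_threadsafe are illustrative names and don’t claim to mirror neotasker’s internals, and the None sentinel exists only to end the example.

```python
import asyncio
import threading

received = []

async def worker_loop(queue):
    # wait for tasks and hand each one to the target (here: list.append)
    while True:
        task = await queue.get()
        if task is None:  # sentinel used only to end this sketch
            break
        received.append(task)

async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    def put_threadsafe(task):
        # schedule put_nowait on the loop instead of calling it directly
        loop.call_soon_threadsafe(queue.put_nowait, task)

    def produce():
        # runs in a plain thread, outside the event loop
        for task in ('task 1', 'task 2', None):
            put_threadsafe(task)

    producer = threading.Thread(target=produce)
    producer.start()
    await worker_loop(queue)
    producer.join()

asyncio.run(main())
print(received)  # ['task 1', 'task 2']
```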

BackgroundEventWorker

Background worker which runs an asynchronous loop waiting for an event and launches a synchronous or asynchronous target when the event happens.

An event worker is created as soon as the annotator detects the e=True or event=True param.

# with annotation - function becomes worker target
from neotasker import background_worker

@background_worker(e=True)
def f(**kwargs):
    print('happened')

# with class
from neotasker import BackgroundEventWorker

class MyWorker(BackgroundEventWorker):

    def run(self, *args, **kwargs):
        print('happened')


f.start()
worker3 = MyWorker()
worker3.start()
f.trigger_threadsafe()
worker3.trigger_threadsafe()

The trigger_threadsafe method is used to trigger the worker. The method is thread-safe. If the worker is triggered from the same asyncio loop, the trigger method can be used instead.
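The same hand-over idea applies to events: asyncio.Event isn’t thread-safe, so a trigger coming from another thread is usually scheduled onto the loop, while code already running in the loop can call event.set() directly. event_worker and trigger_threadsafe below are hypothetical stdlib names, not neotasker’s implementation.

```python
import asyncio
import threading

fired = []

async def event_worker(event):
    # wait for the event, clear it so it can be re-triggered, run the target
    await event.wait()
    event.clear()
    fired.append('happened')

async def main():
    event = asyncio.Event()
    loop = asyncio.get_running_loop()

    def trigger_threadsafe():
        # set the event from the loop's own thread
        loop.call_soon_threadsafe(event.set)

    worker = asyncio.create_task(event_worker(event))
    trigger = threading.Thread(target=trigger_threadsafe)
    trigger.start()
    await worker
    trigger.join()

asyncio.run(main())
print(fired)  # ['happened']
```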

BackgroundIntervalWorker

Background worker which runs synchronous or asynchronous target function with the specified interval or delay.

Worker initial parameters:

  • interval – run the target with the specified interval (in seconds)
  • delay – delay between target launches
  • delay_before – delay before target launch

Parameters interval and delay can not be used together. All parameters can be overridden during startup by adding the _ prefix (e.g. worker.start(_interval=1)).
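The difference between interval and delay is easiest to see as launch times for a target that always takes the same time to run. launch_times is a hypothetical model, not neotasker code, and it assumes interval is measured from one launch to the next while delay is measured from the end of one run to the next launch.

```python
def launch_times(n, duration, interval=None, delay=None):
    """Hypothetical model of when a target starts, given a fixed run duration.

    Assumes interval is measured launch-to-launch and delay is measured
    from the end of one run to the next launch.
    """
    if interval is not None and delay is not None:
        raise ValueError('interval and delay can not be used together')
    if interval is None and delay is None:
        raise ValueError('specify interval or delay')
    times = []
    t = 0.0
    for _ in range(n):
        times.append(round(t, 6))
        t += interval if interval is not None else duration + delay
    return times

print(launch_times(3, duration=0.3, interval=1))  # [0.0, 1.0, 2.0]
print(launch_times(3, duration=0.3, delay=1))     # [0.0, 1.3, 2.6]
```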

A background interval worker is created automatically as soon as the annotator detects one of the parameters above:

@background_worker(interval=1)
def myfunc(**kwargs):
    print('I run every second!')

@background_worker(interval=1)
async def myfunc2(**kwargs):
    print('I run every second and I am async!')

myfunc.start()
myfunc2.start()

Like the event worker, BackgroundIntervalWorker supports manual target triggering with worker.trigger() and worker.trigger_threadsafe().

Sometimes it’s necessary to call the worker target function manually, without triggering, for example to return a result to the user. If you want the interval worker to calculate the next scheduled execution from the time when the target function was manually executed, use the skip=True parameter when triggering:

def apifunc_trigger_worker_manually():
    myfunc.trigger_threadsafe(skip=True)
    return myfunc.run()

The call acts exactly like usual triggering, except that the worker skips the next target function execution once.
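The effect of skip=True on the schedule can be modeled with a fake clock. IntervalWorkerModel below is a hypothetical, simplified model that assumes every wake-up (scheduled or triggered) restarts the interval from that moment; it doesn’t reproduce neotasker’s actual timing code.

```python
class IntervalWorkerModel:
    """Hypothetical model of an interval worker driven by a fake clock."""

    def __init__(self, target, interval):
        self.target = target
        self.interval = interval
        self.next_run = interval  # first run one interval after t=0

    def _wake(self, now, skip=False):
        if not skip:
            self.target(now)
        # the next execution is always scheduled from the wake-up time
        self.next_run = now + self.interval

    def tick(self, now):
        # called as time passes; runs the target once its time has come
        if now >= self.next_run:
            self._wake(now)

    def trigger(self, now, skip=False):
        self._wake(now, skip=skip)

calls = []
worker = IntervalWorkerModel(calls.append, interval=1.0)
worker.tick(1.0)                # scheduled run at t=1.0
calls.append(1.5)               # the API calls the target manually at t=1.5
worker.trigger(1.5, skip=True)  # and asks the worker to skip its next run
worker.tick(2.0)                # nothing happens: next run is due at t=2.5
worker.tick(2.5)                # scheduled run at t=2.5
print(calls)  # [1.0, 1.5, 2.5]
```

Without skip=True the manual call at t=1.5 would be followed by a scheduled run at t=2.0, only half an interval later.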

Task collections

Task collections are useful when you need to run a pack of tasks, e.g. on program startup or shutdown. Currently, collections support running task functions only either in the foreground (one by one) or as threads.

Function priority must be specified as a number (lower = higher priority).

FunctionCollection

Simple collection of functions.

from neotasker import FunctionCollection

def error(**kwargs):
    import traceback
    traceback.print_exc()

startup = FunctionCollection(on_error=error)

@startup
def f1():
    return 1

@startup(priority=100)
def f2():
    return 2

# lower number = higher priority
@startup(priority=10)
def f3():
    return 3

result, all_ok = startup.execute()
class neotasker.FunctionCollection(**kwargs)
Parameters:
  • on_error – function launched when a function in the collection raises an exception
  • on_error_kwargs – additional kwargs for on_error function
  • include_exceptions – include exceptions into final result dict
append(f, priority=None)

Append function without annotation

Parameters:
  • f – function
  • priority – function priority
execute()

Run all functions in collection

Returns: a tuple ({'<function>': '<function_return>', …}, ALL_OK), where ALL_OK is True if no function raised an exception
remove(f)

Remove function

Parameters: f – function
run()

Run all functions in collection

Returns: result dict as

{'<function>': '<function_return>', …}
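The priority ordering and the ALL_OK flag can be pictured with a few lines of plain Python. execute() below is a hypothetical sketch, not FunctionCollection’s code; it keys results by function name for readability, which may differ from the keys the real collection uses.

```python
def execute(functions, on_error=None):
    """Hypothetical sketch: run (priority, function) pairs, lowest number first."""
    result = {}
    all_ok = True
    # sorted() is stable, so equal priorities keep their insertion order
    for _priority, fn in sorted(functions, key=lambda item: item[0]):
        try:
            result[fn.__name__] = fn()
        except Exception:
            all_ok = False
            if on_error is not None:
                on_error()
    return result, all_ok

def f1():
    return 1

def f2():
    return 2

def f3():
    raise ValueError('boom')

errors = []

def error():
    errors.append('error')

result, all_ok = execute([(100, f1), (10, f2), (50, f3)], on_error=error)
print(result, all_ok)  # {'f2': 2, 'f1': 1} False
```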

Thread local proxy

from neotasker import g

if not g.has('db'):
    # new_db_connection() is a placeholder for your own connection factory
    g.set('db', new_db_connection())

It supports the following methods:

class neotasker.LocalProxy

Simple proxy for threading.local namespace

clear(attr)

Clear (delete) thread-local attribute

Parameters: attr – attribute name
get(attr, default=None)

Get thread-local attribute

Parameters:
  • attr – attribute name
  • default – default value if attribute is not set
Returns:

attribute value or default value

has(attr)

Check if thread-local attribute exists

Parameters: attr – attribute name
Returns: True if attribute exists, False if not
set(attr, value)

Set thread-local attribute

Parameters:
  • attr – attribute name
  • value – attribute value to set
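The documented methods map naturally onto threading.local. The LocalProxy class below is a hypothetical re-implementation written for illustration, not neotasker’s source; the demo shows that a value set in one thread is invisible in another.

```python
import threading

class LocalProxy:
    """Hypothetical sketch of the documented has/get/set/clear methods."""

    def __init__(self):
        self._local = threading.local()

    def has(self, attr):
        return hasattr(self._local, attr)

    def get(self, attr, default=None):
        return getattr(self._local, attr, default)

    def set(self, attr, value):
        setattr(self._local, attr, value)

    def clear(self, attr):
        if hasattr(self._local, attr):
            delattr(self._local, attr)

g = LocalProxy()
g.set('db', 'main-thread connection')

seen_in_thread = []

def worker():
    # each thread has its own namespace, so 'db' isn't visible here yet
    seen_in_thread.append(g.has('db'))
    g.set('db', 'worker connection')

t = threading.Thread(target=worker)
t.start()
t.join()
print(g.get('db'), seen_in_thread)  # main-thread connection [False]
```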

Debugging

The library uses the logger “neotasker” to log all events.

Additionally, to get debug messages, call neotasker.set_debug().
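Since “neotasker” is an ordinary logger name in the standard logging module, its records can be routed with stdlib handlers. The snippet below is a generic logging sketch (StringIO is used only so the output can be inspected, and the message is emitted by the snippet itself); it doesn’t replace neotasker.set_debug(), which should still be called to get the library’s debug messages.

```python
import io
import logging

# attach a handler to the library's named logger and lower its level
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter('%(name)s %(levelname)s %(message)s'))
logger = logging.getLogger('neotasker')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# any record emitted under this logger name now reaches the handler
logger.debug('example message')
print(stream.getvalue().strip())  # neotasker DEBUG example message
```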