worker: Celery Worker Setup

Functions

custom_dumps(string)

Serialize string to JSON, honoring objects' for_json() serializer method

custom_loads(string)

Deserialize string, recreating objects

setup_new_celery_process([sender, conf])

This hook is called when a celery worker is initialized

Classes

AsyncTask

Task class with support for async tasks

Documentation

class bioconda_utils.bot.worker.AsyncTask[source]

Bases: celery.app.task.Task

Task class with support for async tasks

We override celery.Task with our own version, with some extra features and defaults:

  • Since we already use a lot of async stuff elsewhere, it’s useful to allow the run method of tasks be async. This Task class detects if the method provided is a coroutine and runs it inside the asyncio event loop.

    >>> @app.task(bind=True)
    ... async def mytask(self):
    ...     await self.async_init()
    ...     ...
    
  • Provide access to a GitHubAppHandler instance shared at least within the worker process.

    This is a little tedious. Since the task may be spawned some time after the webhook that created it was triggered, the tokens we obtained inside the webserver may have expired. To avoid wasting API calls on continuously recreating those tokens, the Task class maintains its own copy.

  • Default to acks_late = True. The reason we use Celery at all is so that spawned tasks can survive a shutdown of the app.

acks_late = True

Our tasks should be re-run if they don’t finish

async async_init()[source]

Init things that need to be run inside the loop

This happens during binding -> on load.

async async_pre_run(args, _kwargs)[source]

Per-call async initialization

Prepares the ghapi property for tasks.

FIXME: doesn’t replace kwargs

bind(app=None)[source]

Intercept binding of task to (celery) app

Here we take the half-finished generated Task class and replace the async run method with a sync run method that executes the original method inside the asyncio loop.
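The swap described here can be sketched as follows. This is an illustration, not the actual bioconda-utils code; wrap_async_run and work are hypothetical names. The idea is to detect a coroutine function and replace it with a synchronous callable that drives it to completion on an event loop:

```python
import asyncio
import inspect

def wrap_async_run(run):
    """Return a sync callable for a coroutine function; pass plain
    functions through unchanged. (Illustrative sketch only.)
    """
    if not inspect.iscoroutinefunction(run):
        return run

    def sync_run(*args, **kwargs):
        # Run the coroutine on a fresh loop and return its result.
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(run(*args, **kwargs))
        finally:
            loop.close()

    return sync_run

async def work(x):
    # Stand-in for an async task body.
    await asyncio.sleep(0)
    return x * 2
```

After wrapping, Celery can call the result like any ordinary synchronous run method: wrap_async_run(work)(21) returns 42, while a non-coroutine function is returned unchanged.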

ghapi = None

Access the GitHub API

ghappapi = None

Access the GitHub App API

property loop

Get the async event loop, creating a new one if necessary
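A lazily created loop property of this kind might look like the following sketch. LoopHolder is a hypothetical stand-in, not the actual Task class:

```python
import asyncio

class LoopHolder:
    """Sketch of a property that lazily creates an event loop and
    replaces it if the previous one was closed. (Illustrative only.)
    """
    _loop = None

    @property
    def loop(self):
        # Create a loop on first access, or after the old one closed.
        if self._loop is None or self._loop.is_closed():
            self._loop = asyncio.new_event_loop()
        return self._loop
```

Repeated accesses return the same loop until it is closed, at which point a fresh one is created transparently.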

abstract run(*_args, **_kwargs)[source]

The task's actual run method. Will be replaced during bind.

bioconda_utils.bot.worker.custom_dumps(string)[source]

Serialize string to JSON, honoring objects' for_json() serializer method
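A for_json-aware serializer can be sketched like this. This is illustrative only; the Point class and the default hook are hypothetical, not the actual bioconda-utils implementation:

```python
import json

def custom_dumps(obj):
    """Serialize obj to JSON, calling for_json() on objects that
    provide it. (Sketch, not the actual implementation.)
    """
    def default(o):
        # json.dumps calls this for anything it cannot serialize itself.
        if hasattr(o, "for_json"):
            return o.for_json()
        raise TypeError(
            f"Object of type {type(o).__name__} is not JSON serializable")
    return json.dumps(obj, default=default)

class Point:
    """Hypothetical example class exposing for_json()."""
    def __init__(self, x=0, y=0):
        self.x, self.y = x, y
    def for_json(self):
        return {"x": self.x, "y": self.y}

custom_dumps(Point(1, 2))  # '{"x": 1, "y": 2}'
```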

bioconda_utils.bot.worker.custom_loads(string)[source]

Deserialize string, recreating objects

JSON objects (dicts) containing a __type__ and a __module__ field are turned back into objects: the module is imported, the type is looked up, and it is instantiated by passing the dict previously produced by obj.for_json() to __init__().
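The convention just described can be illustrated with a minimal loader. This is a sketch under the __type__/__module__ convention above, not the actual implementation; the Fraction example merely demonstrates the mechanism:

```python
import importlib
import json

def custom_loads(string):
    """Deserialize string, recreating objects tagged with
    __type__ and __module__. (Illustrative sketch only.)
    """
    def object_hook(dct):
        # Called for every JSON object; plain dicts pass through.
        if "__type__" in dct and "__module__" in dct:
            module = importlib.import_module(dct.pop("__module__"))
            cls = getattr(module, dct.pop("__type__"))
            # Remaining keys are the for_json() payload, passed to __init__.
            return cls(**dct)
        return dct
    return json.loads(string, object_hook=object_hook)
```

For example, the string '{"__type__": "Fraction", "__module__": "fractions", "numerator": 1, "denominator": 3}' deserializes to fractions.Fraction(1, 3), while untagged dicts are returned unchanged.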

bioconda_utils.bot.worker.setup_new_celery_process(sender=None, conf=None, **_kwargs)[source]

This hook is called when a celery worker is initialized

Here we make sure that the GPG signing key is installed