aiopipe

Utilities for Asynchronous Processing

Classes

AsyncFilter(pipeline, *_args, **_kwargs)

Function object type called by Scanner

AsyncPipeline([threads])

Processes items in an asyncio pipeline

AsyncRequests([cache_fn])

Provides helpers for async access to URLs

Exceptions

EndProcessing

Raised by AsyncFilter to tell AsyncPipeline to stop processing

EndProcessingItem(item, *args)

Raised to indicate that an item should not be processed further

Documentation

exception bioconda_utils.aiopipe.EndProcessing[source]

Raised by AsyncFilter to tell AsyncPipeline to stop processing

exception bioconda_utils.aiopipe.EndProcessingItem(item, *args)[source]

Raised to indicate that an item should not be processed further

log(uselogger=<Logger bioconda_utils.aiopipe (INFO)>, level=None)[source]

Print the message using the provided logging function

property name

Name of class
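
As a quick orientation, the following sketch (not taken from the bioconda-utils sources; the filter class and its conditions are hypothetical) contrasts the two exceptions from within an AsyncFilter subclass (documented below): EndProcessingItem drops only the offending item, while EndProcessing asks the whole pipeline to shut down.

    from bioconda_utils.aiopipe import (
        AsyncFilter, EndProcessing, EndProcessingItem
    )

    class SanityCheck(AsyncFilter):  # hypothetical filter
        async def apply(self, recipe):
            if recipe is None:                       # hypothetical per-item problem
                raise EndProcessingItem(recipe)      # skip this item, keep running
            if getattr(recipe, "abort_run", False):  # hypothetical global problem
                raise EndProcessing()                # stop the whole pipeline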

class bioconda_utils.aiopipe.AsyncFilter(pipeline, *_args, **_kwargs)[source]

Function object type called by Scanner

abstract async apply(recipe)[source]

Process a recipe. Returns False if processing should stop

async async_init()[source]

Called inside loop before processing

Return type:

None

finalize()[source]

Called at the end of a run

Return type:

None
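
The sketch below (hypothetical, not from the bioconda-utils sources) shows the shape of a concrete filter and when each hook runs: async_init once inside the loop before processing, apply once per item, and finalize at the end of the run.

    from bioconda_utils.aiopipe import AsyncFilter

    class CountItems(AsyncFilter):  # hypothetical filter
        async def async_init(self):
            # runs inside the event loop before any item is processed
            self.seen = 0

        async def apply(self, recipe):
            # runs once per item; returning False would stop processing
            self.seen += 1

        def finalize(self):
            # runs once at the end of the run
            print(f"processed {self.seen} items")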

class bioconda_utils.aiopipe.AsyncPipeline(threads=None)[source]

Processes items in an asyncio pipeline

loop

our asyncio loop

threads

number of threads to use

io_sem: Semaphore

semaphore limiting IO parallelism

conda_sem: Semaphore

semaphore ensuring that no more than one conda process runs at a time (used e.g. by the PyPI update code when running conda skeleton)

filters: List[AsyncFilter]

the filters successively applied to each item

proc_pool_executor

executor running tasks in separate Python processes

add(filt, *args, **kwargs)[source]

Adds a filter to this pipeline

Return type:

None

run()[source]

Enters the asyncio loop and manages shutdown.

Return type:

bool

async process(item)[source]

Applies the filters to an item

Return type:

bool

async run_io(func, *args)[source]

Run func with args in the thread pool executor

async run_sp(func, *args)[source]

Run func with args in the process pool executor
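
Putting these pieces together, a pipeline is assembled by registering filter classes with add() and then calling run(), which enters the asyncio loop. The sketch is hypothetical: it assumes add() instantiates the given filter class with the pipeline as its first argument (consistent with the AsyncFilter(pipeline, ...) signature above), and it leaves out how items are fed in, which in bioconda-utils is handled by a concrete subclass such as the Scanner.

    from bioconda_utils.aiopipe import AsyncPipeline

    pipeline = AsyncPipeline(threads=4)  # threads is optional
    pipeline.add(SanityCheck)            # filter classes from the sketches above;
    pipeline.add(CountItems)             # extra add() arguments go to the filter
    ok = pipeline.run()                  # blocks until the run completes, returns bool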

class bioconda_utils.aiopipe.AsyncRequests(cache_fn=None)[source]

Provides helpers for async access to URLs

USER_AGENT = 'bioconda/bioconda-utils'

Used as the user agent in HTTP requests and as the requester in GitHub API requests

session: ClientSession

aiohttp session (only exists while running)

cache: Optional[Dict[str, Dict[str, str]]]

optional cache of fetched results (e.g. checksums)

async get_text_from_url(url)[source]

Fetch content at url and return as text

  • On non-permanent errors (429, 502, 503, 504), the GET is retried up to 10 times, with wait times increasing according to the Fibonacci series.

  • Permanent errors raise a ClientResponseError.

Return type:

str
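
A usage sketch (hypothetical) for fetching text from inside a coroutine; it assumes req is an AsyncRequests instance whose aiohttp session has already been opened (e.g. by a running pipeline) and that get_text_from_url is awaitable like the other helpers in this class.

    async def homepage_mentions_release(req, url, version):
        # transient errors (429, 502, 503, 504) are retried internally with
        # Fibonacci back-off; permanent errors raise ClientResponseError
        text = await req.get_text_from_url(url)
        return version in text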

async get_checksum_from_url(url, desc)[source]

Compute sha256 checksum of content at url

  • Shows TQDM progress monitor with label desc.

  • Caches result

Return type:

str

async get_checksum_from_http(url, desc)[source]

Compute sha256 checksum of content at HTTP url

Shows TQDM progress monitor with label desc.

Return type:

str

async get_file_from_url(fname, url, desc)[source]

Fetch file at url into fname

Shows TQDM progress monitor with label desc.

Return type:

None

async get_ftp_listing(url)[source]

Returns list of files at FTP url

async get_checksum_from_ftp(url, _desc=None)[source]

Compute sha256 checksum of content at FTP url

Does not show a progress monitor at this time (the file size would need to be fetched first)
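
A final sketch (hypothetical, with the same assumption of an open session) combining the FTP helpers: list the entries below an FTP URL and compute the checksum of a file given by its full FTP URL.

    async def summarize_ftp(req, dir_url, file_url):
        files = await req.get_ftp_listing(dir_url)           # entries at dir_url
        digest = await req.get_checksum_from_ftp(file_url)   # no progress bar shown
        return len(files), digest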