aiopipe

Utilities for Asynchronous Processing

Classes

AsyncFilter(pipeline, *_args, **_kwargs)

Function object type called by Scanner

AsyncPipeline([threads])

Processes items in an asyncio pipeline

AsyncRequests([cache_fn])

Provides helpers for async access to URLs

Exceptions

EndProcessing

Raised by AsyncFilter to tell AsyncPipeline to stop processing

EndProcessingItem(item, *args)

Raised to indicate that an item should not be processed further

Documentation

class bioconda_utils.aiopipe.AsyncFilter(pipeline, *_args, **_kwargs)

Bases: abc.ABC, typing.Generic

Function object type called by Scanner

abstract async apply(recipe)

Process a recipe. Returns False if processing should stop

async async_init()

Called inside loop before processing

Return type

None

finalize()

Called at the end of a run

Return type

None
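
The three hooks above suggest a lifecycle of async_init, then apply once per recipe, then finalize. The following is a minimal sketch of that lifecycle, not the library's implementation: it does not import bioconda_utils, and SketchFilter, CountVersions and drive are hypothetical names invented for illustration. Returning True from apply to mean "continue" is also an assumption; the documentation only states that False stops processing.

```python
import abc
import asyncio


class SketchFilter(abc.ABC):
    """Hypothetical stand-in mirroring the documented AsyncFilter hooks."""

    def __init__(self, pipeline, *_args, **_kwargs):
        self.pipeline = pipeline

    async def async_init(self) -> None:
        """Called inside the loop before processing."""

    @abc.abstractmethod
    async def apply(self, recipe):
        """Process a recipe; False means processing should stop."""

    def finalize(self) -> None:
        """Called at the end of a run."""


class CountVersions(SketchFilter):
    """Hypothetical filter that counts recipes carrying a version."""

    async def async_init(self) -> None:
        self.seen = 0

    async def apply(self, recipe):
        if "version" not in recipe:
            return False  # stop processing this recipe
        self.seen += 1
        return True  # assumption: True means "keep going"

    def finalize(self) -> None:
        self.summary = f"{self.seen} recipe(s) with version"


async def drive(filt, recipes):
    """Hypothetical driver standing in for the pipeline."""
    await filt.async_init()
    results = [await filt.apply(recipe) for recipe in recipes]
    filt.finalize()
    return results


filt = CountVersions(pipeline=None)
results = asyncio.run(drive(filt, [{"version": "1.0"}, {"name": "x"}]))
```

In real use the filter class would be handed to AsyncPipeline.add rather than driven by hand.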

class bioconda_utils.aiopipe.AsyncPipeline(threads=None)

Bases: typing.Generic

Processes items in an asyncio pipeline

add(filt, *args, **kwargs)

Adds a filter to this pipeline

Return type

None

conda_sem = None

semaphore ensuring that no more than one conda process runs at a time (used by PyPi when running skeleton)

filters = None

the filters successively applied to each item

io_sem = None

semaphore to limit io parallelism

loop = None

our asyncio loop

proc_pool_executor = None

executor running things in separate python processes

async process(item)

Applies the filters to an item

Return type

bool

run()

Enters the asyncio loop and manages shutdown.

Return type

bool

async run_io(func, *args)

Run func in thread pool executor using args

async run_sp(func, *args)

Run func in process pool executor using args

threads = None

number of threads to use
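
The run_io and run_sp methods above describe handing a blocking function to an executor from inside the event loop. A minimal sketch of that pattern with the standard library follows. The free function run_io, the helper blocking_square and the pairing of the executor call with a semaphore are assumptions made for illustration; whether AsyncPipeline combines io_sem with run_io in this way is not stated here.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_square(x):
    """Stand-in for blocking work such as file or network I/O."""
    return x * x


async def run_io(io_sem, executor, func, *args):
    """Run func(*args) in the executor, at most N calls at a time."""
    async with io_sem:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, func, *args)


async def main():
    io_sem = asyncio.Semaphore(2)  # assumed limit on parallel I/O
    with ThreadPoolExecutor(max_workers=4) as executor:
        return await asyncio.gather(
            *(run_io(io_sem, executor, blocking_square, n) for n in range(5))
        )


squares = asyncio.run(main())
```

Passing a concurrent.futures.ProcessPoolExecutor instead gives the process-based analogue of run_sp; in that case func and its arguments must be picklable.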

class bioconda_utils.aiopipe.AsyncRequests(cache_fn=None)

Bases: object

Provides helpers for async access to URLs

USER_AGENT = 'bioconda/bioconda-utils'

Used as user agent in http requests and as requester in github API requests

cache = None

cache

async get_checksum_from_ftp(url, _desc=None)

Compute sha256 checksum of content at ftp url

Does not show progress monitor at this time (would need to get file size first)

get_checksum_from_http(url, desc)

Compute sha256 checksum of content at http url

Shows TQDM progress monitor with label desc.

Return type

str

async get_checksum_from_url(url, desc)

Compute sha256 checksum of content at url

  • Shows TQDM progress monitor with label desc.

  • Caches result

Return type

str
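
As an illustration of the "compute sha256, cache the result" behaviour described above, here is a minimal synchronous sketch. It leaves out the network access and the TQDM progress monitor, and the names sha256_of_stream, cached_checksum, open_stream and the in-memory dict cache are hypothetical; the real class may key and store its cache differently.

```python
import hashlib
import io


def sha256_of_stream(stream, chunk_size=65536):
    """Hash a binary stream chunk by chunk to keep memory use flat."""
    digest = hashlib.sha256()
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


_cache = {}  # hypothetical cache: url -> hex digest


def cached_checksum(url, open_stream):
    """Return the cached digest for url, computing it on first use.

    open_stream is a hypothetical callable returning a binary stream
    for the url; a real implementation would fetch over HTTP or FTP.
    """
    if url not in _cache:
        _cache[url] = sha256_of_stream(open_stream(url))
    return _cache[url]


digest = cached_checksum(
    "http://example.invalid/pkg.tar.gz", lambda _url: io.BytesIO(b"abc")
)
```

A second call with the same url returns the stored digest without reading the content again.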

get_file_from_url(fname, url, desc)

Fetch file at url into fname

Shows TQDM progress monitor with label desc.

Return type

None

async get_ftp_listing(url)

Returns list of files at FTP url

get_text_from_url(url)

Fetch content at url and return as text

  • On non-permanent errors (429, 502, 503, 504), the GET is retried up to 10 times, with wait times that increase according to the Fibonacci series.

  • Permanent errors raise a ClientResponseError

Return type

str
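
The retry policy described above can be sketched as follows. This is an illustration under stated assumptions, not the library's code: HttpError, fibonacci_delays, get_with_retry, the fetch callable and the one-second base unit are all hypothetical, and the real method raises aiohttp's ClientResponseError rather than the stand-in exception used here.

```python
import asyncio

RETRYABLE = {429, 502, 503, 504}  # the non-permanent statuses listed above


class HttpError(Exception):
    """Hypothetical stand-in for an HTTP error carrying a status code."""

    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status


def fibonacci_delays(count):
    """Return the first count Fibonacci numbers: 1, 1, 2, 3, 5, ..."""
    a, b = 1, 1
    delays = []
    for _ in range(count):
        delays.append(a)
        a, b = b, a + b
    return delays


async def get_with_retry(fetch, url, retries=10, unit=1.0, sleep=asyncio.sleep):
    """Call fetch(url); retry retryable errors with Fibonacci backoff."""
    for delay in fibonacci_delays(retries):
        try:
            return await fetch(url)
        except HttpError as exc:
            if exc.status not in RETRYABLE:
                raise  # permanent error: give up immediately
        await sleep(delay * unit)
    # Final attempt after the last wait; any error propagates.
    return await fetch(url)
```

The sleep parameter exists only so that a test can substitute a recorder for asyncio.sleep and avoid real waiting.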

session = None

aiohttp session (only exists while running)

exception bioconda_utils.aiopipe.EndProcessing

Bases: BaseException

Raised by AsyncFilter to tell AsyncPipeline to stop processing

exception bioconda_utils.aiopipe.EndProcessingItem(item, *args)

Bases: Exception

Raised to indicate that an item should not be processed further

log(uselogger=<Logger bioconda_utils.aiopipe (WARNING)>, level=None)

Print message using provided logging func

property name

Name of class
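
The difference between the two exceptions can be illustrated with a small sketch. The classes below are local stand-ins that copy only the documented base classes (BaseException and Exception); the filter functions drop_odd and stop_at_six and the driver functions process and run are hypothetical and do not reproduce the pipeline's actual control flow.

```python
import asyncio


class EndProcessing(BaseException):
    """Stand-in: asks the driver to stop the whole run."""


class EndProcessingItem(Exception):
    """Stand-in: asks the driver to stop processing one item."""

    def __init__(self, item, *args):
        super().__init__(item, *args)
        self.item = item


async def drop_odd(item):
    if item % 2:
        raise EndProcessingItem(item, "odd value")


async def stop_at_six(item):
    if item == 6:
        raise EndProcessing()


async def process(item, filters):
    """Apply the filters in order; False if the item was dropped."""
    try:
        for filt in filters:
            await filt(item)
    except EndProcessingItem:
        return False
    return True


async def run(items, filters):
    """Process items one by one until done or EndProcessing is raised."""
    done = []
    try:
        for item in items:
            if await process(item, filters):
                done.append(item)
    except EndProcessing:
        pass  # stop the run, keep what finished so far
    return done


completed = asyncio.run(run(range(10), [drop_odd, stop_at_six]))
```

Because EndProcessing derives from BaseException, a generic `except Exception` handler inside a filter does not swallow it, whereas EndProcessingItem is an ordinary Exception and can be caught per item.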