Source code for dohlee.thread

import multiprocessing as mp
from tqdm import tqdm, tqdm_notebook


def imap_helper(args):
    """
    Call a function on a payload, both delivered as a single pair.

    The built-in multiprocessing library has no `istarmap`, so `imap` can
    only hand a worker one object per task. This adapter takes that object
    as a `(func, payload)` pair and performs the actual call.

    Args:
        args: Two-element sequence `(func, payload)`. When `payload` is a
            list or a tuple it is unpacked into positional arguments;
            any other payload is passed to `func` as its only argument.

    Returns:
        The value returned by `func`.
    """
    assert len(args) == 2

    func, payload = args[0], args[1]

    # A non-sequence payload is a single argument: pass it through as-is.
    if not isinstance(payload, (list, tuple)):
        return func(payload)

    # Packed arguments: spread them over the function's parameters.
    return func(*payload)
def threaded(func, params, processes, progress=False, progress_type='tqdm'):
    """
    Lazily yield the results of applying a function to each parameter,
    using a pool of worker processes.

    Despite the name, the work is distributed over a `multiprocessing.Pool`
    (processes, not threads), so `func` and every parameter are shipped to
    the workers and therefore need to be picklable. Results are yielded in
    the same order as `params`.

    Args:
        func (function): Function to be executed.
        params (iterable): Parameters to run `func` on. Any iterable is
            accepted, including one-shot iterators such as generators.
            Each list/tuple element is unpacked into positional arguments
            by `imap_helper`; any other element is passed as one argument.
        processes (int): Number of worker processes in the pool.
        progress (bool): If True, show a progress bar.
        progress_type (str): 'tqdm' or 'tqdm_notebook'. Any other value
            falls back to 'tqdm'.

    Yields:
        The return value of `func` for each element of `params`.
    """
    # Materialize the tasks exactly once. Counting the parameters with a
    # separate `len(list(params))` would exhaust a generator before any
    # task is built from it, silently producing no results.
    tasks = [(func, param) for param in params]

    with mp.Pool(processes=processes) as pool:
        results = pool.imap(imap_helper, tasks)

        if progress:
            # Unsupported progress types fall back to tqdm.tqdm.
            if progress_type == 'tqdm_notebook':
                progress_bar = tqdm_notebook
            else:
                progress_bar = tqdm
            results = progress_bar(results, total=len(tasks))

        for result in results:
            yield result