Source code for arrlp.modules.FunctionArray_LP.FunctionArray

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Date          : 2026-02-13
# Author        : Lancelot PINCET
# GitHub        : https://github.com/LancelotPincet
# Library       : arrLP
# Module        : FunctionArray

"""
This class defines a function for various array configurations.
"""



# %% Libraries
import types
from dataclasses import dataclass, field
import numpy as np
import scipy
from joblib import Parallel, delayed
from numba import njit, prange

# Check cupy
try :
    import cupy as cp
    import cupyx.scipy as cupyx_scipy
    from cupyx.scipy import ndimage as cupyx_ndimage
except ImportError :
    cp = None



# %% Class
[docs] @dataclass(slots=True, kw_only=True) class FunctionArray() : ''' This class defines a function for various array configurations. Parameters ---------- ndims : int Number of dimensions of the base array. cpu_function : types.FunctionType Cpu function. par_function : types.FunctionType Par function. gpu_function : types.FunctionType Gpu function. out_function : types.FunctionType Function defining output array. ini_function : types.FunctionType Function defining initialization kwargs. cpu_loop : bool True if cpu_function applyies default loop. par_loop : bool True if par_function applyies default loop. gpu_loop : bool True if gpu_function applyies default loop. use_joblib : bool True to use joblib for parallel processes in parallel loop. remove_parallel : bool True if the parallel implementation is slower than normal python. remove_cuda : bool True if the cuda implementation is slower than normal python. Examples -------- >>> from arrlp import FunctionArray ... >>> instance = FunctionArray(TODO) ''' # Mandatory ndims : int cpu_function : types.FunctionType = field(repr=False) par_function : types.FunctionType = field(repr=False) gpu_function : types.FunctionType = field(repr=False) out_function : types.FunctionType = field(repr=False) ini_function : types.FunctionType = field(repr=False) # Loops cpu_loop : types.FunctionType = field(default=False, repr=False) par_loop : types.FunctionType = field(default=False, repr=False) gpu_loop : types.FunctionType = field(default=False, repr=False) use_joblib : bool = field(default=True, repr=False) # Performances remove_parallel : bool = field(default=False, repr=False) remove_cuda : bool = field(default=False, repr=False) # Methods stacks : bool = field(init=False, repr=False) channels : bool = field(init=False, repr=False) parallel : bool = field(init=False, repr=False) cuda : bool = field(init=False, repr=False) def __call__(self, array, *args, out=None, # Arrays stacks=False, channels=False, parallel=False, cuda=False, test=False, iterator=range, # Modes **kwargs) : # Arguments of the function ''' This is the main function call Parameters ---------- array : np.array At least one array from which we derive stacks and channels. *args : tuple(np.array) Tuple of array corresponding to inputs that follow the same stacks and channels dimensions. out : np.array Output array where to put result, might not be always available stacks : bool True to consider stacks dimensions channels : bool True to consider channels dimensions parallel : bool True to optimize in parallel cpu cores cuda : bool True to optimize on gpu test : bool [dev only], True when testing the speed to avoid raising errors on function that were defined as not optimal iterator : iterator Iterator defining how to loop on dimensions, must take an int as only input, allow to put progress bars. **kwargs : dict Other constant inputs to apply on each array Returns ------- out : np.array Output array. ''' # checks if iterator is None : iterator = range if cuda : parallel = False if parallel is True : parallel = -1 if parallel == 1 : parallel = False self.checks(out, stacks, channels, parallel, cuda, test) self.stacks, self.channels, self.parallel, self.cuda = stacks, channels, parallel, cuda array = self.xp.asarray(array) args = [self.xp.asarray(arg) for arg in args] if self.ini_function is not None : kwargs.update(self.ini_function(self, array, *args, **kwargs)) out = out if out is not None else self.out_function(self, array, *args, **kwargs) if self.out_function is not None else None # Cuda if cuda : if self.gpu_loop : return self.loop(self.gpu_function, iterator, out, array, *args, **kwargs) return self.gpu_function(self, out, array, *args, **kwargs) # Parallel elif parallel : if self.par_loop : if self.use_joblib : return self.parallel_loop(self.par_function, out, array, *args, **kwargs) else : return self.loop(self.par_function, iterator, out, array, *args, **kwargs) return self.par_function(self, out, array, *args, **kwargs) # Python else : if self.cpu_loop : return self.loop(self.cpu_function, iterator, out, array, *args, **kwargs) return self.cpu_function(self, out, array, *args, **kwargs) # Properties @property def axes(self) : start = int(self.stacks) return tuple(range(start, start + self.ndims)) @property def xp(self) : return cp if self.cuda else np @property def scipyx(self) : return cupyx_scipy if self.cuda else scipy @property def ndimage(self) : return cupyx_ndimage if self.cuda else scipy.ndimage
[docs] def shape(self, array) : # function of array start = int(self.stacks) return array.shape[start:start + self.ndims]
[docs] def checks(self, out, stacks, channels, parallel, cuda, test) : ''' Make checks on asked mode ''' # One optimization if parallel and cuda : raise ValueError('Cuda and Parallel cannot be True at the same time') # No parallel if parallel and self.remove_parallel and not test : raise ValueError('Parallel optimization is not effective in this function') # Cuda not available if cuda and cp is None : raise ValueError('Cuda was asked but is not available in this environment') # No cuda if cuda and self.remove_cuda and not test : raise ValueError('Cuda optimization is not effective in this function') # Joblib when no additional channels if parallel and not stacks and not channels and self.use_joblib : raise ValueError('Normal array (no stack of channel) cannot be calculated in parallel') # Inplace out not possible if out is not None and self.out_function is None and not stacks and not channels : raise ValueError('Output cannot be defined in this function for normal single array')
[docs] def loop(self, func, iterator, out, array, *args, **kwargs) : nstacks, nchannels = array.shape[0], array.shape[-1] match (self.stacks, self.channels) : case (False, False) : return func(self, out, array, *args, **kwargs) case (True, False) : for i in iterator(nstacks) : stack_out = None if out is None else out[i] _stack_out = func(self, stack_out, array[i], *(arg[i] for arg in args), **kwargs) if out is None : if i == 0 : _out = self.xp.empty_like(_stack_out, shape=(nstacks, *_stack_out.shape)) _out[i] = _stack_out return _out if out is None else out case (False, True) : for j in iterator(nchannels) : channel_out = None if out is None else out[..., j] _channel_out = func(self, channel_out, array[..., j], *(arg[..., j] for arg in args), **kwargs) if out is None : if j == 0 : _out = self.xp.empty_like(_channel_out, shape=(*_channel_out.shape, nchannels)) _out[..., j] = _channel_out return _out if out is None else out case (True, True) : for i in iterator(nstacks) : stack_out, stack_array, stack_args = None if out is None else out[i], array[i], [arg[i] for arg in args] if out is None and i > 0 : _stack_out = _out[i] for j in range(nchannels) : channel_out = None if stack_out is None else stack_out[..., j] _channel_out = func(self, channel_out, stack_array[..., j], *(arg[..., j] for arg in stack_args), **kwargs) if out is None : if i == 0 and j == 0 : _out = self.xp.empty_like(_channel_out, shape=(nstacks, *_channel_out.shape, nchannels)) _stack_out = _out[i] _stack_out[..., j] = _channel_out return _out if out is None else out case _ : raise SyntaxError(f'Cannot use (stacks, channels, out_name)={(self.stacks, self.channels, out_name)}')
[docs] def parallel_loop(self, func, out, array, *args, **kwargs): nstacks, nchannels = array.shape[0], array.shape[-1] match (self.stacks, self.channels): case (False, False): raise SyntaxError( "This parallel scenario with no stack nor channel should not exist. " "[should be corrected in checks]" ) case (True, False): if out is None: first = func(self, None, array[0], *(arg[0] for arg in args), **kwargs) out = np.empty_like(first, shape=(nstacks, *first.shape)) out[0] = first start = 1 else: start = 0 Parallel(n_jobs=self.parallel, prefer="threads", require="sharedmem")( delayed(func)( self, out[i], array[i], *(arg[i] for arg in args), **kwargs, ) for i in range(start, nstacks) ) return out case (False, True): if out is None: first = func(self, None, array[..., 0], *(arg[..., 0] for arg in args), **kwargs) out = np.empty_like(first, shape=(*first.shape, nchannels)) out[..., 0] = first start = 1 else: start = 0 Parallel(n_jobs=self.parallel, prefer="threads", require="sharedmem")( delayed(func)( self, out[..., j], array[..., j], *(arg[..., j] for arg in args), **kwargs, ) for j in range(start, nchannels) ) return out case (True, True): if out is None: first = func( self, None, array[0, ..., 0], *(arg[0, ..., 0] for arg in args), **kwargs, ) out = np.empty_like(first, shape=(nstacks, *first.shape, nchannels)) out[0, ..., 0] = first start = 1 else: start = 0 Parallel(n_jobs=self.parallel, prefer="threads", require="sharedmem")( delayed(func)( self, out[i, ..., j], array[i, ..., j], *(arg[i, ..., j] for arg in args), **kwargs, ) for k in range(start, nstacks * nchannels) for i, j in [(k // nchannels, k % nchannels)] ) return out case _: raise SyntaxError( f"Cannot use (stacks, channels)={(self.stacks, self.channels)}" )
# %% Test function run if __name__ == "__main__": from corelp import test test(__file__)