Source code for arrlp.modules.debug_array_LP.debug_array

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Date          : 2026-02-12
# Author        : Lancelot PINCET
# GitHub        : https://github.com/LancelotPincet
# Library       : arrLP
# Module        : debug_array

"""
This function allows to debug array functions while comparing various modes.
"""



# %% Libraries
import numpy as np
try :
    import cupy as cp
except ImportError :
    cp = None
from time import perf_counter
import warnings



# %% Function
[docs] def debug_array(func, modes, stacks_list=None, channels_list=None, narrays=1, **kwargs) : f''' This function allows to debug array functions while comparing various modes. Parameters ---------- func : function function to test. modes : dict dict of dicts corresponding to various modes that should give same results. stacks_list : list List of stacks bools to test, must same size as channels_list. channels_list : list List of channels bools to test, must same size as stacks_list. narrays : int Number of input arrays. **kwargs : dict constant keyword arguments. Examples -------- >>> from arrlp import debug_array ... >>> debug_array(func, ndimes, dict("Normal": dict())) ''' # Data size ~2**24 nstacks = int(2**8) nchannels = int(2**2) match func.ndims : case 1: shape = (int(2**14),) case 2: shape = (int(2**7), int(2**7)) case 3: shape = (int(2**4), int(2**5), int(2**5)) case _: raise ValueError(f'number of dimensions cannot be {func.ndims}') # Manage modes modes = {key: value for key, value in modes.items() if cp is not None or not value["cuda"]} # Timeit function def timeit(key, arrays, stacks, channels, parallel, cuda, **kw) : # Init xp = cp if cuda else np arrays = [xp.asarray(array) for array in arrays] # Calculate with warnings.catch_warnings(): warnings.simplefilter("ignore") print('Compile run...', end='') func(*arrays, stacks=stacks, channels=channels, parallel=parallel, cuda=cuda, test=True, **kwargs, **kw) print('\rRun... ', end='') tic = perf_counter() out = func(*arrays, stacks=stacks, channels=channels, parallel=parallel, cuda=cuda, test=True, **kwargs, **kw) if cuda : cp.cuda.Stream.null.synchronize() toc = perf_counter() print(f'\r{key} took {(toc-tic)*1000:.3f}ms ') if cuda : out = xp.asnumpy(out) return out # Loops if channels_list is None : channels_list = [False, False, True, True] if stacks_list is None : stacks_list = [False, True, False, True] for channels, stacks in zip(channels_list, stacks_list) : print(f'\nTesting for stacks={stacks} and channels={channels}') # Array match stacks, channels : case (True, True) : _shape = (nstacks, *shape, nchannels) case(True, False) : _shape = (nstacks, *shape) case(False, True) : _shape = (*shape, nchannels) case(False, False) : _shape = shape arrays = [np.random.random(_shape) for _ in range(narrays)] # Calculate results = {} ref = None for key, value in modes.items(): parallel = value['parallel'] if (not channels and not stacks) and parallel and hasattr(func, "use_joblib") and func.use_joblib : continue results[key] = timeit(key, arrays, stacks, channels, **value) if ref is None : ref = key # Compare for key, value in results.items(): if key == ref : continue print(f"\rChecking correctness vs reference: {key}" + " "*(60-34-len(key)), end='') np.testing.assert_allclose( results[ref], results[key], rtol=1e-4, atol=1e-5, err_msg="Outputs differ between optimizations" ) print('\r'+' '*60+'\n')
# %% Test function run if __name__ == "__main__": from corelp import test test(__file__)