Single Node Executor#

The SingleNodeExecutor in executorlib is primarily used to enable rapid prototyping on a workstation computer, so you can test your parallel Python program with executorlib before transferring it to a high performance computer (HPC). Due to its added capabilities, executorlib is typically about 10% slower than the ProcessPoolExecutor from the Python standard library on a single node, even when all acceleration features are enabled. This overhead is primarily related to the creation of new tasks, so the relative performance of executorlib improves when the individual Python function calls require extensive computation.

An advantage that executorlib has over the ProcessPoolExecutor and the ThreadPoolExecutor from the Python standard library is its use of cloudpickle as the serialization backend to transfer Python functions between processes. This enables the use of dynamically defined Python functions, for example in the case of a Jupyter notebook.

Basic Functionality#

The general functionality of executorlib follows the Executor interface of the Python standard library. You can import the SingleNodeExecutor class directly from executorlib and then just replace the ProcessPoolExecutor or ThreadPoolExecutor with the SingleNodeExecutor class to start using executorlib.

from executorlib import SingleNodeExecutor

It is recommended to use the SingleNodeExecutor class in combination with a with-statement. This guarantees that the processes created by the SingleNodeExecutor class to evaluate the Python functions are closed afterwards and do not remain as ghost processes. A function is then submitted using the submit(fn, /, *args, **kwargs) function, which executes the given function fn as fn(*args, **kwargs). The submit() function returns a concurrent.futures.Future object, as defined by the Python standard library. As a first example we submit the function sum() to calculate the sum of the list [1, 1]:

%%time
with SingleNodeExecutor() as exe:
    future = exe.submit(sum, [1, 1])
    print(future.result())
2
CPU times: user 48.7 ms, sys: 92 ms, total: 141 ms
Wall time: 292 ms

As expected, the result of the summation sum([1, 1]) is 2. The same result is retrieved from the concurrent.futures.Future object returned by the submission of sum() and printed with print(future.result()). For most Python functions, and especially the sum() function, it is computationally inefficient to initialize the SingleNodeExecutor class for the execution of a single function call only. It is more efficient to initialize the SingleNodeExecutor class once and then submit a number of functions, which can be achieved with a loop. For example, the sums of the pairs [2, 2], [3, 3] and [4, 4] can be calculated with a for-loop inside the context of the SingleNodeExecutor class as provided by the with-statement:

%%time
with SingleNodeExecutor() as exe:
    future_lst = [exe.submit(sum, [i, i]) for i in range(2, 5)]
    print([f.result() for f in future_lst])
[4, 6, 8]
CPU times: user 29.8 ms, sys: 11.3 ms, total: 41.1 ms
Wall time: 693 ms

If only the parameters change while the function applied to them remains the same, as in the case above where the sum() function is applied to three pairs of parameters, then the map(fn, *iterables, timeout=None, chunksize=1) function, as defined in the Python standard library, can be used to map the function onto the different sets of parameters:

%%time
with SingleNodeExecutor() as exe:
    results = exe.map(sum, [[5, 5], [6, 6], [7, 7]])
    print(list(results))
[10, 12, 14]
CPU times: user 27.3 ms, sys: 15.2 ms, total: 42.5 ms
Wall time: 683 ms

These three examples cover the general functionality of the SingleNodeExecutor class, following the Executor interface as it is defined in the Python standard library.
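Because executorlib transfers functions with cloudpickle, as mentioned above, even functions defined dynamically in the running session can be submitted. A minimal sketch; the scale() function is a hypothetical example defined directly in a notebook cell:

def scale(x, factor=2):
    # defined interactively; no module-level import is required
    return x * factor

with SingleNodeExecutor() as exe:
    future = exe.submit(scale, 3)
    print(future.result())  # 3 * 2 = 6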

Parallel Functions#

Writing parallel software is not trivial. So rather than writing the whole Python program in a parallel way, executorlib allows developers to implement parallel execution on a function-by-function level. In this way individual functions can be replaced by parallel functions as needed, without modifying the rest of the program. The SingleNodeExecutor supports two levels of parallel execution: parallel execution based on the Message Passing Interface (MPI) using the mpi4py package, and thread-based parallel execution. Both levels of parallelism are defined inside the function and do not require any modifications to the rest of the Python program.

MPI Parallel Functions#

MPI is the standard approach for developing parallel programs for HPCs. Still, it can be challenging to refactor a previously serial program to use MPI efficiently, even with libraries like mpi4py. To simplify the up-scaling of Python programs, executorlib provides the option to use MPI parallel Python code inside a given Python function and then submit this parallel Python function to a SingleNodeExecutor for evaluation.

The following calc_mpi() function imports the mpi4py package and then uses the internal functionality of MPI to get the total number of parallel CPU cores in the current MPI group MPI.COMM_WORLD.Get_size() and the index of the current processor in the MPI group MPI.COMM_WORLD.Get_rank().

The mpi4py package is an optional dependency of executorlib. The installation of the mpi4py package is covered in the installation section.

def calc_mpi(i):
    from mpi4py import MPI

    size = MPI.COMM_WORLD.Get_size()
    rank = MPI.COMM_WORLD.Get_rank()
    return i, size, rank

The computational resources for the execution of the calc_mpi() Python function are defined using the resource dictionary parameter resource_dict. The resource dictionary can be provided as an additional parameter to the submit() function. The parameter name resource_dict is reserved for the submit() function and cannot be used as a parameter of the function which is submitted, like the calc_mpi() function in this example:

with SingleNodeExecutor() as exe:
    fs = exe.submit(calc_mpi, 3, resource_dict={"cores": 2})
    print(fs.result())
[(3, 2, 0), (3, 2, 1)]

Another option is to set the resource dictionary parameter resource_dict during the initialization of the Executor. In this case it is internally set for every call of the submit() function, without the need to specify it again.

with SingleNodeExecutor(resource_dict={"cores": 2}) as exe:
    fs = exe.submit(calc_mpi, 3)
    print(fs.result())
[(3, 2, 0), (3, 2, 1)]

In addition to the number of compute cores cores, the resource dictionary parameter resource_dict can also define the threads per core as threads_per_core, the GPUs per core as gpus_per_core, the working directory with cwd, the option to use the OpenMPI oversubscribe feature with openmpi_oversubscribe and, for the Simple Linux Utility for Resource Management (SLURM) queuing system, the option to provide additional command line arguments with the slurm_cmd_args parameter; see the resource dictionary documentation for details.
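As a sketch, several of these options can be combined in a single resource dictionary; the values below are placeholders and which keys are meaningful depends on the executor and the available hardware:

with SingleNodeExecutor(
    resource_dict={
        "cores": 2,                      # number of MPI ranks per function call
        "threads_per_core": 1,           # threads available to each rank
        "cwd": "/tmp",                   # working directory for the execution
        "openmpi_oversubscribe": False,  # do not oversubscribe physical cores
    }
) as exe:
    fs = exe.submit(calc_mpi, 3)
    print(fs.result())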

Thread Parallel Functions#

An alternative form of parallelism is thread-based parallelism. executorlib supports thread-based parallelism with the threads_per_core parameter in the resource dictionary resource_dict. Given the global interpreter lock in the CPython implementation, a common application of thread-based parallelism in Python is the use of additional threads in linked libraries. The number of threads is commonly controlled with environment variables like OMP_NUM_THREADS, OPENBLAS_NUM_THREADS, MKL_NUM_THREADS, VECLIB_MAXIMUM_THREADS and NUMEXPR_NUM_THREADS; specific libraries might require other environment variables. These environment variables can be set via the environment interface of the Python standard library, os.environ.

def calc_with_threads(i):
    import os

    # the thread limits have to be set before the linked library is imported
    os.environ["OMP_NUM_THREADS"] = "2"
    os.environ["OPENBLAS_NUM_THREADS"] = "2"
    os.environ["MKL_NUM_THREADS"] = "2"
    os.environ["VECLIB_MAXIMUM_THREADS"] = "2"
    os.environ["NUMEXPR_NUM_THREADS"] = "2"
    import numpy as np  # imported after the thread limits are set

    return i

Again the resource dictionary parameter resource_dict can be set either in the submit() function:

with SingleNodeExecutor() as exe:
    fs = exe.submit(calc_with_threads, 3, resource_dict={"threads_per_core": 2})
    print(fs.result())
3

Or alternatively, the resource dictionary parameter resource_dict can also be set during the initialization of the Executor class:

with SingleNodeExecutor(resource_dict={"threads_per_core": 2}) as exe:
    fs = exe.submit(calc_with_threads, 3)
    print(fs.result())
3

In most cases MPI-based parallelism leads to higher computational efficiency than thread-based parallelism; still, the choice of parallelism depends on the specific Python function which should be executed in parallel. Careful benchmarks are required to achieve optimal performance for a given computational architecture.
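As a minimal benchmarking sketch, assuming mpi4py is installed, the wall time of the same submission can be compared for different resource dictionary settings; heavy_function() is a hypothetical placeholder for a compute-intensive workload. Note that with cores=2 the function is executed on every MPI rank, so a realistic benchmark should distribute the work with mpi4py:

import time

def heavy_function(n):
    # hypothetical placeholder for a compute-intensive workload
    return sum(i * i for i in range(n))

for resource_dict in [{"cores": 2}, {"threads_per_core": 2}]:
    start = time.perf_counter()
    with SingleNodeExecutor(resource_dict=resource_dict) as exe:
        exe.submit(heavy_function, 10_000_000).result()
    print(resource_dict, "wall time:", time.perf_counter() - start)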

Beyond MPI-based and thread-based parallelism, the HPC Cluster Executors and the HPC Job Executors also provide the option to assign GPUs to the execution of individual Python functions.

Performance Optimization#

The default settings of executorlib are chosen to favour stability over performance. Consequently, the performance of executorlib can be improved by setting additional parameters. It is commonly recommended to start with an initial implementation based on executorlib and then improve the performance by enabling specialized features.

Block Allocation#

By default each submitted Python function is executed in a dedicated process. This guarantees that the execution of the submitted Python function starts with a fresh process. Still, the initialization of a Python process takes time. Especially when a Python function call requires only limited computational resources, it makes sense to reuse an existing Python process for the execution of multiple Python functions. In executorlib this functionality is enabled by setting the block_allocation parameter to True. To limit the number of parallel Python processes when using block allocation, it is recommended to set the max_workers parameter to restrict the available computing resources.

%%time
with SingleNodeExecutor(max_workers=2, block_allocation=True) as exe:
    future = exe.submit(sum, [1, 1])
    print(future.result())
2
CPU times: user 26.7 ms, sys: 8.56 ms, total: 35.2 ms
Wall time: 468 ms

The same functionality also applies to MPI parallel Python functions. The important difference is that while it is still possible to assign more than one Python process to the execution of a Python function in block allocation mode, it is no longer possible to assign resources during the submission of the function with the submit() function. Starting again with the calc_mpi() function:

def calc_mpi(i):
    from mpi4py import MPI

    size = MPI.COMM_WORLD.Get_size()
    rank = MPI.COMM_WORLD.Get_rank()
    return i, size, rank

Instead, the resource dictionary parameter has to be set during the initialization of the SingleNodeExecutor class. Internally, this groups the created Python processes into fixed allocations and afterwards submits the Python functions to these allocations.

with SingleNodeExecutor(max_workers=2, resource_dict={"cores": 2}, block_allocation=True) as exe:
    fs = exe.submit(calc_mpi, 3)
    print(fs.result())
[(3, 2, 0), (3, 2, 1)]

The weakness of block allocation, that memory from a previous Python function call remains in the Python process, can at the same time be an advantage when working with large datasets. In executorlib this is achieved with the init_function parameter. The init_function returns a dictionary of parameters which can afterwards be reused as keyword arguments **kwargs in the functions submitted to the Executor. When block allocation block_allocation is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded.

def init_function():
    return {"j": 4, "k": 3, "l": 2}

def calc_with_preload(i, j, k):
    return i + j + k

The function calc_with_preload() requires three inputs: i, j and k. But when the function is submitted to the Executor, only two inputs are provided: fs = exe.submit(calc_with_preload, 2, j=5). The first input parameter is mapped to i=2 and the second input parameter is specified explicitly as j=5, but the third input parameter k is not provided. So the SingleNodeExecutor automatically checks the keys returned by the init_function() function. In this case the returned dictionary {"j": 4, "k": 3, "l": 2} defines j=4, k=3 and l=2. For this specific call of the calc_with_preload() function, i and j are already provided, so j=4 from the init_function() is not required; k=3 is taken from the init_function(), and as the calc_with_preload() function does not define an l parameter, l=2 is ignored.

with SingleNodeExecutor(max_workers=2, init_function=init_function, block_allocation=True) as exe:
    fs = exe.submit(calc_with_preload, 2, j=5)
    print(fs.result())
10

The result is 2+5+3=10 as i=2 and j=5 are provided during the submission and k=3 is defined in the init_function() function.

Cache#

The development of scientific workflows is commonly an interactive process, extending the functionality step by step. This led to the development of interactive environments like Jupyter, which are fully supported by executorlib. Still, many computationally intensive Python functions take on the order of minutes to hours or even longer to execute, so recomputing them after a restart of the Python process is not feasible. To address this challenge executorlib provides a file-based cache to store the results of previously computed concurrent.futures.Future objects. The results are serialized using cloudpickle and stored in a user-defined cache directory cache_directory to be reloaded later on. Internally, the hierarchical data format (HDF5) is used via the h5py package.

The h5py package is an optional dependency of executorlib. The installation of the h5py package is covered in the installation section.

%%time
with SingleNodeExecutor(cache_directory="./file") as exe:
    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]
    print([f.result() for f in future_lst])
[2, 4, 6]
CPU times: user 509 ms, sys: 44.4 ms, total: 553 ms
Wall time: 1.19 s

When the same code is executed again, executorlib finds the existing results in the cache directory specified by the cache_directory parameter and reloads the result, accelerating the computation especially during the prototyping phase when similar computations are repeated frequently for testing.

Still, it is important to mention that this cache is not designed to identify the submission of the same parameters within the context of one with-statement; it is the task of the user to minimize duplicate computations, as sketched after the following example. The cache is only designed to restore previous calculation results after the Python process managing executorlib was stopped following a successful execution.

%%time
with SingleNodeExecutor(cache_directory="./file") as exe:
    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]
    print([f.result() for f in future_lst])
[2, 4, 6]
CPU times: user 43.2 ms, sys: 22.8 ms, total: 66 ms
Wall time: 691 ms
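A sketch of such user-side de-duplication within a single with-statement, keeping the future object of each already submitted parameter set in a dictionary so that duplicates are submitted only once:

with SingleNodeExecutor() as exe:
    submitted = {}  # maps parameters to their future objects
    for pair in [(1, 1), (2, 2), (1, 1)]:  # (1, 1) appears twice
        if pair not in submitted:
            submitted[pair] = exe.submit(sum, list(pair))
    print({pair: f.result() for pair, f in submitted.items()})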

An additional advantage of the cache is the option to gather the results of previously submitted functions. Using the get_cache_data() function, the result of each Python function is converted to a dictionary. This list of dictionaries can be converted to a pandas.DataFrame for further processing:

import pandas
from executorlib import get_cache_data

df = pandas.DataFrame(get_cache_data(cache_directory="./file"))
df
function input_args input_kwargs output runtime filename
0 <built-in function sum> ([2, 2],) {} 4 0.098344 /home/jovyan/file/sum89afbdf9da5eb1794f6976a3f...
1 <built-in function sum> ([3, 3],) {} 6 0.096226 /home/jovyan/file/sum0f7710227cda6456e5d071877...
2 <built-in function sum> ([1, 1],) {} 2 0.085434 /home/jovyan/file/sumf5ad27b855231a293ddd735a8...

After the development phase is concluded, it is the task of the user to remove the cache directory defined with the cache_directory parameter. The cache directory is never removed by executorlib, to prevent the repetition of expensive computations. Still, as disk space on shared file systems in HPC environments is commonly limited, it is recommended to remove the cache directory once the development process has concluded.

import os
import shutil

cache_dir = "./file"
if os.path.exists(cache_dir):
    print(os.listdir(cache_dir))
    try:
        shutil.rmtree(cache_dir)
    except OSError:
        pass
['sum89afbdf9da5eb1794f6976a3f01697c2_o.h5', 'sum0f7710227cda6456e5d07187702313f3_o.h5', 'sumf5ad27b855231a293ddd735a8554c9ea_o.h5']

Typically the use of the cache is recommended for development only; for production workflows the user should implement their own long-term storage solution. The binary format used by executorlib is based on cloudpickle and might change in the future without further notice, rendering existing data in the cache unusable. Consequently, using the cache beyond the development process is not recommended. In addition, writing the results to files might cause additional overhead from accessing the shared file system.
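For example, the results gathered with get_cache_data() could be exported to a format of the user's choice before the cache directory is removed (i.e. executed before the cleanup shown above); a sketch using JSON from the Python standard library, which only works while the outputs are JSON-serializable:

import json
from executorlib import get_cache_data

# sketch: export the cached outputs to a user-controlled JSON file
# before the cache directory is removed
results = [
    {"input_args": str(entry["input_args"]), "output": entry["output"]}
    for entry in get_cache_data(cache_directory="./file")
]
with open("results.json", "w") as f:
    json.dump(results, f)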

Dependencies#

Many scientific Python programs consist of a series of Python function calls with varying levels of parallelism, or map-reduce patterns where the same function is first mapped to a number of parameters and afterwards the results are reduced with a single function. To support this programming pattern, the SingleNodeExecutor class from executorlib extends the Executor interface of the Python standard library by accepting concurrent.futures.Future objects as inputs, which are resolved before execution. The SingleNodeExecutor internally waits until all Future objects given as inputs are successfully executed before it triggers the execution of the submitted Python function.

def calc_add(a, b):
    return a + b

For example, the calc_add() function, which adds two numbers, is used in a loop which adds a counter to the previous sum. In the first iteration the future parameter is set to 0, but already in the second iteration it is the concurrent.futures.Future object of the first iteration, and so on.

The important part is that the user does not have to wait until the first function is executed but instead the waiting happens internally in the SingleNodeExecutor.

with SingleNodeExecutor() as exe:
    future = 0
    for i in range(1, 4):
        future = exe.submit(calc_add, i, future)
    print(future.result())
6
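The same mechanism covers the map-reduce pattern mentioned above. A sketch, under the assumption that future objects nested inside a list argument are resolved in the same way as directly submitted future objects (as also used with batched() further below):

with SingleNodeExecutor() as exe:
    # map step: compute the sums of three pairs in parallel
    map_futures = [exe.submit(sum, [i, i]) for i in range(1, 4)]
    # reduce step: the list of future objects is resolved before sum() runs
    reduce_future = exe.submit(sum, map_futures)
    print(reduce_future.result())  # 2 + 4 + 6 = 12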

As reusing existing concurrent.futures.Future objects can lead to rather complex dependencies, executorlib provides the option to plot the dependency graph by setting plot_dependency_graph=True during the initialization of the SingleNodeExecutor class.

No computation is executed when plot_dependency_graph=True is set. This parameter is intended for debugging only.

Internally, the pygraphviz package is used for the visualisation of these dependency graphs. It is an optional dependency of executorlib. The installation of the pygraphviz package is covered in the installation section.

with SingleNodeExecutor(plot_dependency_graph=True) as exe:
    future = 0
    for i in range(1, 4):
        future = exe.submit(calc_add, i, future)
    print(future.result())
None
[figure: dependency graph of the three chained calc_add() submissions]

Advanced Scheduling#

Going beyond just directed acyclic graphs (DAG) with one-to-many and many-to-one relationships, executorlib provides a number of advanced scheduling patterns. These are briefly introduced in the following.

Runtime-dependent Batching#

To maximize the throughput of dependent calculation tasks it is important to identify all tasks which can be executed at a given moment. Unfortunately, some of these dependencies can only be determined at run time, which is challenging for most schedulers. To demonstrate runtime-dependent batching in executorlib, consider the following example: a group of ten tasks is submitted and then grouped into batches of three for processing. Still, the order of the tasks, meaning which tasks belong to which group, is only determined at run time.

For simplicity, we just use a simple function which directly returns the input.

def reply(i):
    return i

After the group of ten tasks is submitted, their future objects are stored in a list named future_individual_lst. This list is then provided to the batched() function of the SingleNodeExecutor to generate batches of tasks, which are then provided to the sum() function for further processing. The resulting future objects of this second step are stored in future_group_lst. Finally, the results of these future objects are evaluated in a third step.

with SingleNodeExecutor() as exe:
    future_individual_lst = [
        exe.submit(reply, i) for i in range(10)
    ]
    future_group_lst = [
        exe.submit(sum, f) for f in exe.batched(future_individual_lst, n=3)
    ]
    print(sum([f.result() for f in future_group_lst]))
45

Split Future Objects#

In analogy to the batched() function, which combines multiple future objects in a batch for further processing, it is sometimes necessary to split a future object even before its evaluation is completed. executorlib provides two utility functions for this purpose, namely split_future() for tuples and lists and get_item_from_future() for dictionaries.

Starting with a function which returns a tuple, named get_a_tuple():

def get_a_tuple(i):
    return "a", "b", i

This function is submitted to the SingleNodeExecutor, and while in this case it directly returns the tuple, the evaluation would commonly take much longer. By splitting the output of the future object using the split_future() function, a number of future objects is generated, one for each element of the tuple. This is enabled by providing the number of elements in the tuple as an additional parameter n=3.

from executorlib import split_future

with SingleNodeExecutor() as exe:
    future = exe.submit(get_a_tuple, 15)
    f1, f2, f3 = split_future(future=future, n=3)
    print(f1.result(), f2.result(), f3.result())
a b 15

In analogy to the split_future() function for lists and tuples, the get_item_from_future() function returns one item of a dictionary which is returned by a function submitted to an Executor. In this example the get_a_dict() function returns a dictionary; again, the dictionary is returned directly here, while commonly the evaluation would take much longer.

def get_a_dict(i):
    return {"a": 1, "b": 2, "c": i}

The get_a_dict() function is submitted to the SingleNodeExecutor and returns a future object named future_dict. As we know that the result() of this future object returns a dictionary, we can already access the items of this dictionary with the get_item_from_future() function. The get_item_from_future() function takes a future object as input, in addition to the key of the dictionary which should be accessed, and returns a future object for the value related to the key. In this example these future objects are named f1, f2 and f3; they are evaluated afterwards.

from executorlib import get_item_from_future

with SingleNodeExecutor() as exe:
    future_dict = exe.submit(get_a_dict, 15)
    f1 = get_item_from_future(future=future_dict, key="a")
    f2 = get_item_from_future(future=future_dict, key="b")
    f3 = get_item_from_future(future=future_dict, key="c")
    print(f1.result(), f2.result(), f3.result())
1 2 15

Recursion#

In addition to the map() function for parallel loops and the option to integrate while-loops directly in the submission, it is sometimes helpful to use recursive algorithms to accelerate scientific simulation and analysis. To demonstrate the use of executorlib in combination with recursive algorithms, the quicksort algorithm is used as an example.

The quicksort algorithm is implemented in two parts: a quick_sort() function, which splits the input sequence into two sets, one with elements smaller and one with elements larger than the pivot element of the sequence, and a recursive submission function introduced below. After the initial execution, the quick_sort() function is applied again to both resulting sets until at most a single element remains in a sequence; in this case the single element is returned. The pivot is defined as the last element of the sequence.

To simplify the submission of the quick_sort() function to the SingleNodeExecutor, dictionaries are used as return types. These dictionaries either contain only a single item with the key "result", when at most a single element remains in the list, or three items: "left" for the elements smaller than the pivot, "right" for the elements larger than the pivot and "result" for the pivot itself.

def quick_sort(sequence):
    length = len(sequence)
    if length <= 1:
        return {"result": sequence}
    else:
        pivot = sequence.pop()  # the pivot is the last element of the sequence

    greater_items = []
    lesser_items = []

    for item in sequence:
        if item > pivot:
            greater_items.append(item)
        else:
            lesser_items.append(item)

    return {"left": lesser_items, "right": greater_items, "result": [pivot]}

To enable the recursive submission to the SingleNodeExecutor, a recursive_submit() function is defined using the asynchronous IO module of the Python standard library, indicated by the async keyword ahead of the function definition. It takes a function, the sequence and the executor as inputs. As a first step, the function is submitted to the executor and the coroutine is suspended until the first result is available, by wrapping the future object using the wrap_future() function of the asyncio package and applying the await keyword. Then the corresponding result dictionary result_dict is evaluated: if result_dict contains more than one key, then recursive_submit() is evaluated for both the left side, with values lower than the pivot element, and the right side, with values higher than the pivot element, and the results of both sides are combined with the pivot element. In the case that the dictionary only contains a single item, its value is returned directly.

import asyncio

async def recursive_submit(function, sequence, executor):
    # suspend the coroutine until the submitted function call is finished
    result_dict = await asyncio.wrap_future(executor.submit(function, sequence))
    if len(result_dict) > 1:
        left = await recursive_submit(function=function, sequence=result_dict["left"], executor=executor)
        right = await recursive_submit(function=function, sequence=result_dict["right"], executor=executor)
        return left + result_dict["result"] + right
    else:
        return result_dict["result"]

with SingleNodeExecutor() as exe:
    loop = asyncio.get_event_loop()
    task = loop.create_task(recursive_submit(function=quick_sort, sequence=[0, 9, 3, 8, 2, 7, 5], executor=exe))
    print(await task)
[0, 2, 3, 5, 7, 8, 9]
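Note that in recursive_submit() the left branch is awaited before the right branch is submitted, so the two branches are evaluated sequentially. To evaluate both branches concurrently, asyncio.gather() from the Python standard library could be used; a sketch:

async def recursive_submit_concurrent(function, sequence, executor):
    # identical to recursive_submit(), but both branches are scheduled
    # at the same time with asyncio.gather()
    result_dict = await asyncio.wrap_future(executor.submit(function, sequence))
    if len(result_dict) > 1:
        left, right = await asyncio.gather(
            recursive_submit_concurrent(function=function, sequence=result_dict["left"], executor=executor),
            recursive_submit_concurrent(function=function, sequence=result_dict["right"], executor=executor),
        )
        return left + result_dict["result"] + right
    else:
        return result_dict["result"]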

Testing and Debugging#

The up-scaling of Python functions from a single workstation to a high performance computer (HPC) can be challenging, so executorlib provides a number of debugging utilities to help you optimize your functions for execution with executorlib.

Measure Data Transferred#

Transferring a large amount of data between two processes requires additional resources, so it is helpful to measure the data transferred between the frontend and backend processes. This is achieved by setting the log_obj_size parameter to True:

from executorlib import SingleNodeExecutor

with SingleNodeExecutor(log_obj_size=True) as exe:
    future = exe.submit(sum, [1, 1])
    print(future.result())
Send dictionary of size: 101
Received dictionary of size: 59
2
Send dictionary of size: 69
Received dictionary of size: 58

Write Log Files#

Libraries like executorlib are commonly used to sample a large parameter space; in this case it is possible that out of a large number of parameter combinations a single one throws an error. This error can be logged in a file, which also contains the function and input parameters, using the "error_log_file" parameter in the resource_dict. This allows changing the log file on a per-function basis.

from executorlib import SingleNodeExecutor

def my_funct(i, j):
    if i == 2 and j == 2:
        raise ValueError()
    else:
        return i * j + i + j

A try-except statement is added to prevent the Jupyter notebook from crashing:

with SingleNodeExecutor(resource_dict={"error_log_file": "error.out"}) as exe:
    future_lst = []
    for i in range(4):
        for j in range(4):
            future_lst.append(exe.submit(my_funct, i=i, j=j))
    try:
        print([f.result() for f in future_lst])
    except ValueError:
        pass

The log file is a basic text file, so it can be read with any file utility. The important part is that the log file contains not only the error message but also the function name and the input parameters, in this case kwargs: {'i': 2, 'j': 2}, which helps with future debugging of the sampling function.

with open("error.out") as f:
    content = f.readlines()
content
['function: <function my_funct at 0x776b70255c60>\n',
 'args: ()\n',
 "kwargs: {'i': 2, 'j': 2}\n",
 'Traceback (most recent call last):\n',
 '  File "/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/backend/interactive_serial.py", line 56, in main\n',
 '    output = call_funct(input_dict=input_dict, funct=None, memory=memory)\n',
 '  File "/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/standalone/interactive/backend.py", line 33, in call_funct\n',
 '    return funct(input_dict["fn"], *input_dict["args"], **input_dict["kwargs"])\n',
 '  File "/srv/conda/envs/notebook/lib/python3.13/site-packages/executorlib/standalone/interactive/backend.py", line 22, in funct\n',
 '    return args[0].__call__(*args[1:], **kwargs)\n',
 '           ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^\n',
 '  File "/tmp/ipykernel_100/3167739528.py", line 3, in my_funct\n',
 'ValueError\n']
import os

if os.path.exists("error.out"):
    os.remove("error.out")

TestClusterExecutor#

While the SingleNodeExecutor internally behaves very similarly to the FluxJobExecutor and the SlurmJobExecutor, the FluxClusterExecutor and the SlurmClusterExecutor behave very differently, as they use the file system to exchange information rather than socket-based communication. This can lead to complications when it comes to debugging. To address this challenge, executorlib provides the TestClusterExecutor, which can be executed on a local workstation just like the SingleNodeExecutor but in the background uses the same file-based communication as the SlurmClusterExecutor and the FluxClusterExecutor:

from executorlib.api import TestClusterExecutor

with TestClusterExecutor(cache_directory="test") as exe:
    future = exe.submit(sum, [1, 1])
    print(future.result())
2
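As with the cache example above, the cache directory created here is not removed automatically, so it can be cleaned up once the test is finished:

import shutil

# remove the cache directory created by the TestClusterExecutor above
shutil.rmtree("test", ignore_errors=True)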