
FEA shelving#1779

Open
Nanored4498 wants to merge 11 commits into joblib:main from Nanored4498:FEA-Shelving

Conversation

@Nanored4498
Contributor

@Nanored4498 Nanored4498 commented Feb 12, 2026

This PR is an attempt to address #593.

It builds on the existing PR #619.

  • The PR [WIP] introduce object shelving #619 implemented shelving for the threading backend only, not for loky or multiprocessing. This version is compatible with loky and multiprocessing as well.
  • I did not implement any reference counting to automatically delete shelved objects; the user has to manage their lifetime by hand. A shelved object is deleted when its clear method is called or when the owning Shelf is closed. A Shelf is closed when the program ends or when its close method is called. A Shelf can also be used as a context manager with with: the __exit__ method of Shelf closes the shelf and hence removes all files containing the shelved objects.
  • I did not implement shelve_mmap. Users can create their own memmap, and it does not seem that closely related to "basic" shelving.
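For readers unfamiliar with the idea, the mechanism behind shelving can be sketched with the standard library alone: the object is pickled once to a file, and only a small handle crosses process boundaries; workers reload the object on demand. This is an illustrative sketch, not joblib's actual implementation; the names `ShelvedObject` and `shelve` below mimic the API shown in this PR but are hypothetical.

```python
import os
import pickle
import tempfile


class ShelvedObject:
    """A lightweight handle to an object pickled on disk."""

    def __init__(self, path):
        # Only this small string needs to be sent to worker processes.
        self.path = path

    def result(self):
        # Load the shelved object back into the current process.
        with open(self.path, "rb") as f:
            return pickle.load(f)

    def clear(self):
        # Delete the backing file; the handle becomes invalid afterwards.
        os.unlink(self.path)


def shelve(obj, dir=None):
    # Pickle the object once into a temporary file and return a handle.
    fd, path = tempfile.mkstemp(suffix=".pkl", dir=dir)
    with os.fdopen(fd, "wb") as f:
        pickle.dump(obj, f)
    return ShelvedObject(path)


if __name__ == "__main__":
    handle = shelve(list(range(10)))
    print(handle.result()[3])  # → 3
    handle.clear()
```

The point of the design is that dispatching N tasks ships N copies of the tiny handle instead of N copies of the (possibly huge) object, which is pickled exactly once.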

Example:

import joblib

def fun(a, i):
    return a.result()[i]

a = [i for i in range(10)]

shelved_a = joblib.shelve(a)
out = joblib.Parallel(n_jobs=10)(
    joblib.delayed(fun)(shelved_a, i) for i in range(10)
)
print(out)
shelved_a.clear()

with joblib.Shelf("tmp_dir") as shelf:
    shelved_a = shelf.shelve(a)
    out = joblib.Parallel(n_jobs=10)(
        joblib.delayed(fun)(shelved_a, i) for i in range(10)
    )
    print(out)

Closes: #593
Closes: #996
Closes: #1733

@codecov

codecov bot commented Feb 12, 2026

Codecov Report

❌ Patch coverage is 98.03922% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.27%. Comparing base (1670e76) to head (7e7d97a).

Files with missing lines Patch % Lines
joblib/test/test_shelf.py 97.75% 2 Missing ⚠️
joblib/shelf.py 98.38% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1779      +/-   ##
==========================================
+ Coverage   94.21%   94.27%   +0.05%     
==========================================
  Files          46       48       +2     
  Lines        7922     8070     +148     
==========================================
+ Hits         7464     7608     +144     
- Misses        458      462       +4     

@Nanored4498
Contributor Author

For a performance comparison, one can use the example from #1733:

import concurrent.futures
import itertools
import time

import joblib
import pyvista as pv


def process(mesh: pv.PolyData, idx: int) -> float:
    """A simple function that accesses data from the mesh."""
    return mesh.points[idx, 0]

def process2(mesh, idx):
    return process(mesh.result(), idx)

def main() -> None:
    # Create a large mesh object to demonstrate the issue.
    # A smaller `level` will reduce the mesh size and the performance gap.
    mesh: pv.PolyData = pv.Box(level=200)
    N_JOBS: int = 1000
    print("Mesh details:")
    print(mesh)
    print("-" * 20)

    # Benchmark joblib with automatic batching
    with joblib.parallel_config(n_jobs=8, verbose=1, prefer="processes"):
        time_start: float = time.perf_counter()
        shelved_mesh = joblib.shelve(mesh)
        parallel = joblib.Parallel()
        _ = parallel(joblib.delayed(process2)(shelved_mesh, i) for i in range(N_JOBS))
        shelved_mesh.clear()
        time_end: float = time.perf_counter()
        rate: float = N_JOBS / (time_end - time_start)
        print(f"joblib (with shelving): {rate:.2f} it/sec")

        time_start: float = time.perf_counter()
        parallel = joblib.Parallel()
        _ = parallel(joblib.delayed(process)(mesh, i) for i in range(N_JOBS))
        time_end: float = time.perf_counter()
        rate: float = N_JOBS / (time_end - time_start)
        print(f"joblib: {rate:.2f} it/sec")

    # Benchmark concurrent.futures.ProcessPoolExecutor
    time_start = time.perf_counter()
    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
        # Consume the iterator so all results are actually retrieved.
        _ = list(executor.map(
            process,
            itertools.repeat(mesh),
            range(N_JOBS),
            chunksize=N_JOBS // 16,
        ))
    time_end = time.perf_counter()
    rate: float = N_JOBS / (time_end - time_start)
    print(f"concurrent.futures: {rate:.2f} it/sec")


if __name__ == "__main__":
    main()
