twat-mp is a Python library designed to simplify parallel processing. It leverages the power of the Pathos library for robust synchronous parallel operations and offers experimental support for asynchronous parallelism using aiomultiprocess. Whether you're dealing with CPU-bound computations or I/O-bound tasks, twat-mp provides convenient tools to help you write efficient, parallelized Python code.
This project is part of the twat collection of Python utilities.
twat-mp is for Python developers who want to:
- Speed up their applications by running tasks in parallel.
- Utilize multiple CPU cores effectively for computationally intensive work.
- Improve responsiveness of applications performing many I/O operations (e.g., network requests, file operations).
- Write cleaner and more manageable parallel code using context managers and decorators.
- Simplified Parallelism: Abstracts away much of the boilerplate associated with Python's native multiprocessing.
- Performance Gains: Enables significant speedups for suitable tasks by distributing work across multiple processes or threads.
- CPU and I/O Optimization: Offers distinct tools (`ProcessPool` for CPU-bound, `ThreadPool` for I/O-bound) to best match your workload.
- Resource Management: Automatic and reliable cleanup of parallel resources using context managers.
- Enhanced Error Handling: Provides clearer error messages from worker processes, making debugging easier.
- Flexible Configuration: Allows customization of worker counts, with sensible defaults based on system CPU cores.
- Modern Python: Built with type hints and modern Python features.
- Synchronous Parallel Processing (Core):
  - `ProcessPool`: Context manager for CPU-intensive tasks, utilizing multiple processes.
  - `ThreadPool`: Context manager for I/O-bound tasks, utilizing multiple threads.
  - Decorators for common parallel mapping operations:
    - `@pmap`: Standard parallel map (eager evaluation).
    - `@imap`: Lazy parallel map, returning an iterator (memory-efficient).
    - `@amap`: Asynchronous-style map that collects results (uses synchronous Pathos pools).
- Experimental Asynchronous Support (via `aiomultiprocess`):
  - `AsyncMultiPool`: Context manager for combining `async`/`await` with multiprocessing.
  - `@apmap`: Decorator for applying `async` functions in parallel.
- Automatic CPU Core Detection: Optimizes default pool sizes.
- Debug Mode: Provides detailed logging for troubleshooting.
- Customizable Worker Count: Flexibly configure the number of parallel workers.
- Enhanced Exception Propagation: Clearer context for errors occurring in worker processes/threads.
You can install twat-mp using pip:

Core Installation (Synchronous Features):

```bash
pip install twat-mp
```

With Experimental Async Support:

To include the experimental asynchronous features (which require `aiomultiprocess`), install the `aio` extra:

```bash
pip install 'twat-mp[aio]'
```

With All Extras and Development Tools:

```bash
pip install 'twat-mp[all,dev]'
```

twat-mp is a library and is used programmatically within your Python scripts.
Ideal for tasks that perform heavy computations.

```python
from twat_mp import ProcessPool

def calculate_square(x):
    # Simulate a CPU-intensive calculation; in a real scenario,
    # this would be actual computation.
    result = 0
    for _ in range(x * 10000):
        result += 1
    return x * x

if __name__ == "__main__":
    with ProcessPool() as pool:
        numbers = range(10)
        results = pool.map(calculate_square, numbers)
        # pool.map preserves input order:
        print(f"Squares: {list(results)}")
        # Squares: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Suitable for tasks that spend time waiting for external operations like network requests or file I/O.
```python
from twat_mp import ThreadPool
import requests  # Requires: pip install requests

def fetch_url_status(url):
    try:
        response = requests.get(url, timeout=5)
        return url, response.status_code
    except requests.RequestException as e:
        return url, str(e)

if __name__ == "__main__":
    urls = [
        "https://www.python.org",
        "https://www.github.com",
        "https://nonexistent.example.com",  # Example of a failing URL
    ]
    # Use one thread per URL for this I/O-bound workload
    with ThreadPool(nodes=len(urls)) as pool:
        results = pool.map(fetch_url_status, urls)
        for url, status in results:
            print(f"Status for {url}: {status}")
```

Decorators provide a concise way to parallelize functions.
```python
from twat_mp import pmap, imap, amap
import time

# @pmap: Eager parallel map
@pmap
def process_data_pmap(item):
    time.sleep(0.01)  # Simulate work
    return item * 2

# @imap: Lazy parallel map (results as an iterator)
@imap
def process_data_imap(item):
    time.sleep(0.01)  # Simulate work
    return item * 3

# @amap: Asynchronous-style map (collects all results)
@amap
def process_data_amap(item):
    time.sleep(0.01)  # Simulate work
    return item * 4

if __name__ == "__main__":
    data = range(5)

    print("pmap results:", list(process_data_pmap(data)))
    # Expected: pmap results: [0, 2, 4, 6, 8]

    print("imap results:")
    for res_imap in process_data_imap(data):
        print(res_imap, end=" ")  # 0 3 6 9 12
    print()

    print("amap results:", list(process_data_amap(data)))
    # Expected: amap results: [0, 4, 8, 12, 16]
```

Note: These features are experimental and require `aiomultiprocess` (`pip install 'twat-mp[aio]'`). Their API and behavior might change in future releases.
Combines `async`/`await` with multiprocessing.

```python
import asyncio

async def async_work(x):
    await asyncio.sleep(0.1)  # Simulate async I/O or CPU work in an async context
    return x * 2

async def main_async_pool():
    # Deferred import so the script fails gracefully without the 'aio' extra.
    from twat_mp import AsyncMultiPool  # Requires the 'aio' extra
    async with AsyncMultiPool() as pool:  # Experimental
        results = await pool.map(async_work, range(5))
        print(f"AsyncMultiPool results: {results}")
        # Expected: AsyncMultiPool results: [0, 2, 4, 6, 8]

if __name__ == "__main__":
    try:
        asyncio.run(main_async_pool())
    except ImportError:
        print("Async features not available. Install with: pip install 'twat-mp[aio]'")
    except RuntimeError as e:
        print(f"Async example runtime error: {e}")
```

```python
import asyncio
from twat_mp import apmap  # Requires the 'aio' extra to import at all

@apmap  # Experimental
async def double_async_apmap(x):
    await asyncio.sleep(0.1)  # Simulate async work
    return x * 2

async def main_apmap_example():
    results = await double_async_apmap(range(5))
    print(f"apmap decorated results: {results}")
    # Expected: apmap decorated results: [0, 2, 4, 6, 8]

if __name__ == "__main__":
    try:
        asyncio.run(main_apmap_example())
    except ImportError:
        print("Async features not available. Install with: pip install 'twat-mp[aio]'")
    except RuntimeError as e:
        print(f"Async example runtime error: {e}")
```
```python
# Conceptual example for parallel image resizing
import os
import time

from twat_mp import ProcessPool
# from PIL import Image  # Requires: pip install Pillow

def resize_image(image_path):
    # Placeholder for actual image processing:
    # with Image.open(image_path) as img:
    #     resized_img = img.resize((img.width // 2, img.height // 2))
    #     # ... save resized_img ...
    time.sleep(0.1)  # Simulate work
    return f"Resized {os.path.basename(image_path)}"

if __name__ == "__main__":
    # Create dummy image files for the example (placeholder):
    # os.makedirs("dummy_images", exist_ok=True)
    # image_files = [f"dummy_images/image_{i}.png" for i in range(5)]

    # Assuming image_files is a list of paths to image files
    image_files = [f"image_{i}.png" for i in range(5)]  # Dummy list
    if image_files:
        with ProcessPool() as pool:
            results = pool.map(resize_image, image_files)
            for r in results:
                print(r)
    else:
        print("No image files to process for example.")
```
```python
# Conceptual example for parallel web scraping
import time

import requests
from twat_mp import ThreadPool
# from bs4 import BeautifulSoup  # Requires: pip install beautifulsoup4

def fetch_page_title(url):
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        # soup = BeautifulSoup(response.text, 'html.parser')
        # title = soup.title.string if soup.title else "No title"
        time.sleep(0.1)  # Simulate network latency and parsing
        return f"Title for {url}: Fetched (simulated)"
    except requests.RequestException as e:
        return f"Error fetching {url}: {e}"

if __name__ == "__main__":
    urls_to_scrape = [
        "https://www.python.org",
        "https://www.github.com",
        "https://www.djangoproject.com",
    ]
    with ThreadPool(nodes=len(urls_to_scrape)) as pool:
        results = pool.map(fetch_page_title, urls_to_scrape)
        for r in results:
            print(r)
```

The synchronous part of twat-mp is built around the Pathos library.
- `MultiPool`: This is the base context manager class.
  - It handles the creation and cleanup of Pathos pools (`PathosProcessPool` or `PathosThreadPool`).
  - If the number of `nodes` (workers) isn't specified, it defaults to the system's CPU count using `pathos.helpers.mp.cpu_count()`.
  - Upon entering the context (`__enter__`), it instantiates the specified Pathos pool.
  - Crucially, it patches the pool's `map` method to use `_worker_wrapper`. This ensures that any exceptions from worker functions are caught and wrapped in a `WorkerError`.
  - On exiting (`__exit__`), it ensures the pool is properly closed, joined, and cleared, handling `KeyboardInterrupt` for quicker termination.
- `ProcessPool` & `ThreadPool`: These are subclasses of `MultiPool` that default to using `PathosProcessPool` and `PathosThreadPool` respectively. They are the primary context managers for users.
- `_worker_wrapper(func, item, worker_id)`:
  - This internal function wraps the user's target function (`func`) that is executed in a worker.
  - If `func(item)` executes successfully, its result is returned.
  - If `func(item)` raises an exception, `_worker_wrapper` catches it, logs the error and traceback (if debug mode is on), and then re-raises it as a `WorkerError`. This custom exception includes the original exception, the input `item` that caused the failure, an optional `worker_id` (typically the item's index), and the traceback string.
- `WorkerError`:
  - A custom exception class inheriting from `Exception`.
  - It's designed to provide more context about errors occurring in worker processes/threads, bundling the original exception, the problematic input, and the traceback.
- `mmap(how, get_result=False, debug=None)`: This is a decorator factory.
  - It's not directly used by end-users but is the engine behind `@pmap`, `@imap`, and `@amap`.
  - The `how` argument specifies the Pathos mapping method to use:
    - `'map'`: Standard eager evaluation (used by `@pmap`).
    - `'imap'`: Lazy evaluation, returns an iterator (used by `@imap`).
    - `'amap'`: Asynchronous-style map; Pathos's `amap` returns a future-like object.
  - The `get_result=True` parameter (used by `@amap`) means the decorator will automatically call `.get()` on the result of Pathos's `amap` to retrieve the computed values.
  - The factory returns a decorator, which in turn wraps the user's function. When the decorated function is called with an iterable, it internally creates a `MultiPool` (defaulting to `ProcessPool`), gets the specified mapping method from the pool, and applies it to the user's function and iterable.
  - It handles `WorkerError` propagation: if a `WorkerError` is caught, it attempts to re-raise the `original_exception` for a more natural error flow for the user.
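The decorator-factory mechanism can be illustrated with the standard library alone. This is a hedged sketch: a thread-backed `multiprocessing.dummy.Pool` stands in for the Pathos pools, and `mmap` here only mirrors the described dispatch-by-method-name behavior, not the real implementation:

```python
import functools
from multiprocessing.dummy import Pool  # thread-backed stand-in for Pathos pools

def mmap(how, get_result=False):
    """Factory: returns a decorator that runs the function via the pool's `how` method."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(iterable):
            with Pool() as pool:
                mapper = getattr(pool, how)  # e.g. 'map', 'imap', 'map_async'
                result = mapper(func, iterable)
                if how == "imap":
                    # Drain before the pool closes (the real @imap keeps the
                    # pool alive while the caller iterates lazily).
                    return list(result)
                if get_result:
                    result = result.get()  # future-like object, as with amap
                return result
        return wrapper
    return decorator

@mmap("map")
def double(x):
    return x * 2

@mmap("map_async", get_result=True)  # analogous to @amap's automatic .get()
def triple(x):
    return x * 3

print(double(range(5)))  # [0, 2, 4, 6, 8]
print(triple(range(5)))  # [0, 3, 6, 9, 12]
```

The key design point, shared with the real factory, is that one parameterized closure covers all three mapping styles instead of three near-duplicate decorators.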
- `set_debug_mode(enabled: bool)`:
  - A global function to enable or disable `DEBUG_MODE`.
  - When enabled, it configures logging to `DEBUG` level, providing detailed output about pool creation, task execution, and cleanup.
These components rely on the `aiomultiprocess` library and are marked as experimental.

- `_check_aiomultiprocess()`: A utility function that raises an `ImportError` if `aiomultiprocess` is not installed, guiding the user to install it via `pip install 'twat-mp[aio]'`.
- `AsyncMultiPool`: An asynchronous context manager.
  - On `__aenter__`, it creates an `aiomultiprocess.Pool` instance. Users can specify the number of `processes`, an `initializer` function for workers, and `initargs`.
  - It provides `async` methods:
    - `map(func, iterable)`: Applies an `async` function `func` to each item in `iterable` in parallel.
    - `starmap(func, iterable)`: Similar to `map`, but unpacks argument tuples from `iterable`.
    - `imap(func, iterable)`: Returns an async iterator that yields results as they complete.
  - On `__aexit__`, it handles the shutdown of the `aiomultiprocess.Pool` gracefully (close, join), with a fallback to terminate if necessary. It also includes robust error handling during cleanup.
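The difference between `map` (ordered results) and `imap` (results as they complete) can be sketched with plain asyncio, no `aiomultiprocess` required. This only illustrates the completion-order semantics, not the multiprocessing machinery:

```python
import asyncio

async def work(x):
    await asyncio.sleep(0.05 * (5 - x))  # later items finish sooner
    return x * 2

async def main():
    items = range(5)
    # map-style: gather preserves input order regardless of finish time
    ordered = await asyncio.gather(*(work(x) for x in items))
    # imap-style: consume results in completion order
    completed = [await fut for fut in asyncio.as_completed([work(x) for x in items])]
    return ordered, completed

ordered, completed = asyncio.run(main())
print(ordered)            # [0, 2, 4, 6, 8]
print(sorted(completed))  # same values; arrival order may differ
```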
- `@apmap` decorator:
  - An `async` decorator for `async` functions.
  - When the decorated `async` function is called with an iterable, `@apmap` internally creates an `AsyncMultiPool` and uses its `map` method to execute the function calls in parallel.
  - It simplifies the pattern of setting up and tearing down an `AsyncMultiPool` for a single parallel map operation.
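The decorator pattern itself can be sketched without multiprocessing. This pure-asyncio stand-in shows the setup/teardown-in-one-call convenience; the real `@apmap` would open an `AsyncMultiPool` and dispatch to worker processes where this sketch uses `asyncio.gather`:

```python
import asyncio
import functools

def apmap_sketch(func):
    """Decorator: calling the wrapped async function with an iterable
    runs func concurrently over every item and returns collected results."""
    @functools.wraps(func)
    async def wrapper(iterable):
        # The real @apmap would enter an AsyncMultiPool here and call pool.map.
        return await asyncio.gather(*(func(x) for x in iterable))
    return wrapper

@apmap_sketch
async def double(x):
    await asyncio.sleep(0.01)
    return x * 2

print(asyncio.run(double(range(5))))  # [0, 2, 4, 6, 8]
```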
- Synchronous: Worker exceptions are caught by `_worker_wrapper` and re-raised as `WorkerError`. The `mmap` decorator factory then catches `WorkerError` and typically re-raises the `original_exception` contained within it, which makes error handling feel more direct to the user.
- Asynchronous (Experimental): Errors within `aiomultiprocess` tasks are generally propagated by `aiomultiprocess` itself. `AsyncMultiPool` and `@apmap` wrap calls to `aiomultiprocess` methods and may raise `RuntimeError` for issues during pool operations, or if `aiomultiprocess` itself raises an error that isn't more specific.
- The primary mechanism for resource management is the use of context managers:
  - `with ProcessPool() as pool:` / `with ThreadPool() as pool:`
  - `async with AsyncMultiPool() as pool:` (Experimental)
- The `__exit__` and `__aexit__` methods of these context managers ensure that the underlying pools (Pathos or aiomultiprocess) are properly closed, worker processes/threads are joined, and resources are released. This happens automatically, even if errors occur within the `with` block.
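The cleanup-even-on-error guarantee can be demonstrated with a minimal stand-in pool (illustrative; twat-mp's `MultiPool` provides this behavior for real Pathos pools):

```python
class SketchPool:
    """Minimal context manager mirroring the close/join-on-exit guarantee."""
    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.closed = True   # cleanup runs even when an error occurred
        return False         # don't swallow the exception

pool_ref = None
try:
    with SketchPool() as pool:
        pool_ref = pool
        raise RuntimeError("task failed")
except RuntimeError:
    pass

print(pool_ref.closed)  # True: resources were released despite the error
```

Returning `False` from `__exit__` lets the original exception propagate to the caller while still guaranteeing the teardown ran first.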
We encourage contributions to twat-mp! Please follow these guidelines:
- Formatting: Code is formatted using Ruff (which includes Black-compatible formatting). Key settings (from `pyproject.toml`): `line-length = 88`, `quote-style = "double"`, `indent-style = "space"`.
- Linting: Ruff is also used for linting, with an extended set of rules (`I` for isort, `N` for pep8-naming, `B` for flake8-bugbear, `RUF` for Ruff-specific rules). See `pyproject.toml` for specific ignored rules.
- Typing: The project uses type hints and is checked with MyPy. Stricter MyPy options are enabled (e.g., `disallow_untyped_defs`, `no_implicit_optional`).
- Tests are written using Pytest.
- Running tests:
  - Core synchronous tests: `hatch run test:test` (runs tests in parallel, ignores async tests).
  - To run specific tests or with coverage: `hatch run test:test-cov`.
  - Asynchronous tests (require the `[aio]` extra) are typically in `tests/test_async_mp.py` and can be run specifically if needed, e.g., `pytest tests/test_async_mp.py`. Project configuration aims to skip these by default if `aiomultiprocess` is not available or if explicitly excluded.
- Benchmarks: Performance benchmarks use `pytest-benchmark`. Run with `hatch run test:bench`.
- New features should include corresponding tests. Bug fixes should ideally include a test that reproduces the bug.
- Core: `pathos` is a core dependency.
- Optional: `aiomultiprocess` is an optional dependency, required for the experimental async features. It's managed via the `[aio]` extra in `pyproject.toml`.
- Development dependencies (linters, testing tools) are listed under `[project.optional-dependencies]` (e.g., `test`, `dev`).
- Versioning is managed by `hatch-vcs`, which derives the project version from Git tags.
- Fork the repository on GitHub.
- Create a new branch for your feature or bug fix: `git checkout -b feature/your-feature-name` or `bugfix/issue-description`.
- Make your changes. Ensure you add tests and update documentation if necessary.
- Run linters and tests locally to ensure everything passes:

  ```bash
  hatch run lint:all
  hatch run test:test
  hatch run test:test-cov
  ```

- Commit your changes with clear and descriptive commit messages.
- Push your branch to your fork: `git push origin feature/your-feature-name`.
- Open a Pull Request (PR) against the `main` branch of the `twardoch/twat-mp` repository.
- Clearly describe your changes in the PR. Link to any relevant issues.
- `src/twat_mp/`: Main source code for the library.
  - `mp.py`: Synchronous multiprocessing components.
  - `async_mp.py`: Experimental asynchronous multiprocessing components.
- `tests/`: Pytest tests.
- `docs/`: Additional documentation (like `architecture.md`).
- `pyproject.toml`: Project metadata, dependencies, and tool configurations.
- `README.md`: This file.
For detailed API documentation of all classes, methods, and functions, please see the API Reference (API_REFERENCE.md).
twat-mp is licensed under the MIT License.