In Flutter, heavy computational tasks run in Isolates. What is the equivalent in Python? How do you run CPU-heavy tasks without blocking the main thread?
#python #multiprocessing #concurrency #parallelism #process-pool #asyncio #cpu-bound #shared-memory #flutter-comparison
Answer
Python's Answer to Flutter Isolates: Multiprocessing
In Flutter, Isolates are separate memory spaces with their own event loops: they don't share memory and communicate via message passing. Python's closest equivalent is the `multiprocessing` module, which spawns separate OS processes, each with its own Python interpreter and GIL.
Flutter Isolates vs Python Options
| Feature | Flutter Isolate | Python `multiprocessing` | Python `threading` | Python `asyncio` |
|---|---|---|---|---|
| Separate memory | Yes | Yes | No (shared) | No (shared) |
| Bypasses GIL | N/A (no GIL in Dart) | Yes | No | No |
| CPU-bound tasks | Yes | Yes | No (GIL blocks) | No (single-threaded) |
| I/O-bound tasks | Yes | Overkill | Yes | Yes (best) |
| Communication | Message passing (`SendPort`/`ReceivePort`) | `Queue`, `Pipe` | Shared variables + locks | Shared state, `asyncio.Queue` |
| Overhead | Medium | High (process creation) | Low | Very low |
| True parallelism | Yes | Yes | No | No |
Key Insight: Flutter's Isolates and Python's `multiprocessing` both achieve true parallelism by running code in separate memory spaces. Python needs this because of the GIL (Global Interpreter Lock): in standard CPython builds, threads cannot execute Python bytecode in parallel, so CPU-bound work gains nothing from extra threads.
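One way to observe the GIL's effect is to run the same CPU-bound function through a thread pool and a process pool and compare wall-clock time. This is a rough sketch under assumptions: `sum_of_squares` and `timed_map` are illustrative helpers, the workload size is arbitrary, and the timings depend on the machine and Python build, so no expected numbers are given.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def sum_of_squares(n: int) -> int:
    """Pure-Python CPU-bound work; it holds the GIL while it runs."""
    return sum(i * i for i in range(n))


def timed_map(executor_cls, jobs, workers=4):
    """Run sum_of_squares over jobs with the given executor class.

    Returns (results, elapsed_seconds).
    """
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as executor:
        results = list(executor.map(sum_of_squares, jobs))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    jobs = [3_000_000] * 4  # illustrative workload size
    thread_results, thread_time = timed_map(ThreadPoolExecutor, jobs)
    process_results, process_time = timed_map(ProcessPoolExecutor, jobs)

    print(f"Same results: {thread_results == process_results}")
    print(f"Threads:   {thread_time:.2f}s")
    print(f"Processes: {process_time:.2f}s")
```

On a multi-core machine with a standard CPython build, the process pool would typically finish faster, because the threads take turns holding the GIL.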
Approach 1: `multiprocessing.Process` (Direct Isolate Equivalent)
`multiprocessing.Process` is the closest match to Flutter's `Isolate.spawn()`.

```python
import multiprocessing
import os
import time


def heavy_computation(numbers: list, result_queue: multiprocessing.Queue) -> None:
    """Runs in a separate process (like a Flutter Isolate)."""
    print(f"Worker process PID: {os.getpid()}")
    total = sum(n * n for n in numbers)  # CPU-heavy task
    result_queue.put(total)  # Send result back (like SendPort)


if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    # Create a queue for communication (like SendPort/ReceivePort)
    result_queue = multiprocessing.Queue()

    # Spawn a process (like Isolate.spawn)
    numbers = list(range(10_000_000))
    process = multiprocessing.Process(
        target=heavy_computation,
        args=(numbers, result_queue),
    )

    start = time.time()
    process.start()  # Start the process

    # Main process is NOT blocked: it can do other work here
    print("Main process continues working...")

    result = result_queue.get()  # Wait for result (like await)
    process.join()  # Wait for process to finish

    print(f"Result: {result}")
    print(f"Time: {time.time() - start:.2f}s")
```
Approach 2: `ProcessPoolExecutor` (Recommended for Most Cases)
The most Pythonic way: a pool of worker processes, similar to managing a pool of Isolates.
```python
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


def process_chunk(chunk: list[int]) -> int:
    """CPU-heavy task that runs in a separate process."""
    print(f"Processing chunk in PID: {os.getpid()}")
    return sum(n * n for n in chunk)


def split_into_chunks(data: list, num_chunks: int) -> list[list]:
    """Split data into at most num_chunks roughly equal chunks."""
    # Ceiling division avoids a zero step for short inputs and a stray
    # extra chunk when len(data) is not divisible by num_chunks.
    chunk_size = max(1, -(-len(data) // num_chunks))
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


if __name__ == "__main__":
    numbers = list(range(20_000_000))
    num_workers = 4  # Like spawning 4 Isolates
    chunks = split_into_chunks(numbers, num_workers)

    # --- Sequential (single process) ---
    start = time.time()
    sequential_result = sum(n * n for n in numbers)
    print(f"Sequential: {time.time() - start:.2f}s")

    # --- Parallel (multiple processes) ---
    start = time.time()
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(process_chunk, chunk) for chunk in chunks]
        parallel_result = sum(f.result() for f in as_completed(futures))
    print(f"Parallel ({num_workers} workers): {time.time() - start:.2f}s")

    print(f"Results match: {sequential_result == parallel_result}")
```
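The example above pickles every chunk and sends it to a worker, which adds to the process overhead listed in the comparison table. When the input can be described by bounds, a possible variant is to send only `(start, stop)` pairs and let each worker generate its own numbers. This is a sketch under that assumption; `make_ranges` and `sum_squares_in_range` are hypothetical helpers, not part of any library.

```python
import math
from concurrent.futures import ProcessPoolExecutor


def make_ranges(total: int, parts: int) -> list[tuple[int, int]]:
    """Split range(total) into at most `parts` half-open (start, stop) pairs."""
    step = max(1, math.ceil(total / parts))
    return [(start, min(start + step, total)) for start in range(0, total, step)]


def sum_squares_in_range(bounds: tuple[int, int]) -> int:
    """Worker: builds its own numbers from the bounds, so little data is pickled."""
    start, stop = bounds
    return sum(n * n for n in range(start, stop))


if __name__ == "__main__":
    total = 20_000_000
    with ProcessPoolExecutor(max_workers=4) as executor:
        partials = executor.map(sum_squares_in_range, make_ranges(total, 4))
        result = sum(partials)
    print(f"Sum of squares below {total}: {result}")
```

Only two integers cross the process boundary per task, so the cost of starting the work no longer grows with the size of the data.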
Approach 3: `multiprocessing.Pool.map` (Simplest API)
```python
import math
from multiprocessing import Pool


def is_prime(n: int) -> bool:
    """CPU-intensive prime check by trial division."""
    if n < 2:
        return False
    # math.isqrt gives an exact integer square root (no float rounding)
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return False
    return True


if __name__ == "__main__":
    numbers = list(range(100_000, 200_000))

    # map() distributes work across processes automatically
    with Pool(processes=4) as pool:
        results = pool.map(is_prime, numbers)

    prime_count = sum(results)
    print(f"Found {prime_count} primes")
```
Approach 4: Async + ProcessPool (Best of Both Worlds)
This suits Gen AI apps that need both async I/O (API calls) and CPU parallelism (data processing).
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

import numpy as np


def compute_embeddings_batch(texts: list[str]) -> list[list[float]]:
    """CPU-heavy: simulate embedding generation in a separate process."""
    # In real code: use a local model like sentence-transformers
    return [np.random.rand(384).tolist() for _ in texts]


async def process_documents(documents: list[str]) -> list[list[float]]:
    """Async function that offloads CPU work to a process pool."""
    # get_running_loop() is the preferred call inside a coroutine
    loop = asyncio.get_running_loop()

    # Split documents into batches
    batch_size = 100
    batches = [
        documents[i:i + batch_size]
        for i in range(0, len(documents), batch_size)
    ]

    # Run CPU-heavy embedding in separate processes
    # while keeping the event loop free for I/O
    with ProcessPoolExecutor(max_workers=4) as pool:
        tasks = [
            loop.run_in_executor(pool, compute_embeddings_batch, batch)
            for batch in batches
        ]
        results = await asyncio.gather(*tasks)

    # Flatten results
    return [emb for batch_result in results for emb in batch_result]


async def main():
    documents = [f"Document {i}" for i in range(1000)]

    # CPU-heavy work runs in processes and doesn't block async I/O
    embeddings = await process_documents(documents)
    print(f"Generated {len(embeddings)} embeddings")

    # Can simultaneously do async I/O work here
    # e.g., await fetch_from_api(), await save_to_db()


if __name__ == "__main__":
    asyncio.run(main())
```
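To check that the event loop really stays free while a worker process is busy, a small heartbeat coroutine can tick alongside the offloaded job. The sketch below is illustrative only: `cpu_job`, `heartbeat`, and `run_with_heartbeat` are hypothetical names, and the tick count depends on how long the job takes on a given machine.

```python
import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor


def cpu_job(n: int) -> int:
    """CPU-bound work meant to run outside the event loop's process."""
    return sum(i * i for i in range(n))


async def heartbeat(stop: asyncio.Event, interval: float = 0.05) -> int:
    """Count how many times the event loop got to run until `stop` is set."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(interval)
        ticks += 1
    return ticks


async def run_with_heartbeat(executor: Executor, n: int) -> tuple[int, int]:
    """Offload cpu_job to the executor while a heartbeat keeps ticking."""
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))
    result = await loop.run_in_executor(executor, cpu_job, n)
    stop.set()
    ticks = await beat
    return result, ticks


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as pool:
        result, ticks = asyncio.run(run_with_heartbeat(pool, 5_000_000))
    print(f"Result: {result}, heartbeat ticks while waiting: {ticks}")
```

If `cpu_job` were called directly inside the coroutine instead of through `run_in_executor`, the heartbeat could not tick until the computation finished.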
Approach 5: Shared Memory (Like Flutter's `TransferableTypedData`)
For large data, avoid copying between processes by using shared memory.
```python
from multiprocessing import Process, shared_memory

import numpy as np


def worker_with_shared_memory(shm_name: str, shape: tuple, dtype: str) -> None:
    """Access shared memory without copying data (zero-copy)."""
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

    # Modify in place: changes are visible to the main process
    array *= 2
    print(f"Worker doubled the array. Sum: {array.sum()}")

    existing_shm.close()


if __name__ == "__main__":
    # Small demo array; the same pattern applies to large arrays
    data = np.array([1, 2, 3, 4, 5], dtype=np.float64)
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared_array[:] = data  # Copy data into shared memory once

    # Worker attaches to the same memory: no copy
    p = Process(
        target=worker_with_shared_memory,
        args=(shm.name, data.shape, str(data.dtype)),
    )
    p.start()
    p.join()

    print(f"Main sees modified data: {shared_array}")  # values are now 2, 4, 6, 8, 10
    shm.close()
    shm.unlink()  # Clean up shared memory
```
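A shared memory block outlives the process that created it until `unlink()` is called, so cleanup is worth guarding with `try`/`finally` in case the worker or the parent raises. Here is a standard-library-only sketch of that pattern, assuming a non-empty payload; `double_in_place` and `run_doubling` are hypothetical helper names, and the `in_subprocess` flag exists only so the logic can also be exercised without spawning a process.

```python
from multiprocessing import Process, shared_memory


def double_in_place(shm_name: str, length: int) -> None:
    """Attach to an existing block and double each byte (modulo 256)."""
    shm = shared_memory.SharedMemory(name=shm_name)
    try:
        for i in range(length):
            shm.buf[i] = (shm.buf[i] * 2) % 256
    finally:
        shm.close()  # detach this process's view; the block itself remains


def run_doubling(values: bytes, in_subprocess: bool = True) -> bytes:
    """Copy values into shared memory, double them, and always clean up."""
    shm = shared_memory.SharedMemory(create=True, size=len(values))
    try:
        shm.buf[:len(values)] = values
        if in_subprocess:
            worker = Process(target=double_in_place, args=(shm.name, len(values)))
            worker.start()
            worker.join()
        else:
            double_in_place(shm.name, len(values))
        return bytes(shm.buf[:len(values)])
    finally:
        shm.close()
        shm.unlink()  # release the block even if something above raised


if __name__ == "__main__":
    print(list(run_doubling(bytes([1, 2, 3, 4, 5]))))
```

The creating process is the one that calls `unlink()`; workers only `close()` their own handle.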
Side-by-Side: Flutter Isolate vs Python Multiprocessing
```python
# --- Flutter (Dart) ---
# import 'dart:isolate';
#
# Future<int> heavyTask(int n) async {
#   return await Isolate.run(() {
#     int sum = 0;
#     for (var i = 0; i < n; i++) { sum += i * i; }
#     return sum;
#   });
# }

# --- Python Equivalent ---
from concurrent.futures import ProcessPoolExecutor


def heavy_task(n: int) -> int:
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    # Rough equivalent of Isolate.run(). The __main__ guard is required
    # where worker processes are started with "spawn" (Windows, macOS).
    with ProcessPoolExecutor() as pool:
        future = pool.submit(heavy_task, 10_000_000)
        result = future.result()  # Blocks until done (like await in Dart)
    print(f"Result: {result}")
```
Quick Decision Guide
| Task Type | Python Solution | Flutter Equivalent |
|---|---|---|
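| CPU-heavy (one-off) | `multiprocessing.Process` | `Isolate.spawn()` |
| CPU-heavy (multiple) | `ProcessPoolExecutor` | `Isolate.run()` |
| CPU-heavy (simple map) | `multiprocessing.Pool.map` | `compute()` |
| I/O-bound (network) | `asyncio` (`async`/`await`) | `Future` with `async`/`await` |
| Mixed CPU + I/O | `asyncio` + `ProcessPoolExecutor` | Isolate + Stream |
| Large data sharing | `shared_memory` | `TransferableTypedData` |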
Best Practice for Gen AI: Use `ProcessPoolExecutor` with `loop.run_in_executor()` in your AI pipelines. This lets you handle API calls asynchronously while offloading CPU-heavy preprocessing (tokenization, chunking, embedding) to separate processes, maximizing throughput without blocking the event loop.