Concept #195 · Medium · python-for-gen-ai · important

In Flutter, heavy computational tasks run in Isolates. What is the equivalent in Python? How do you run CPU-heavy tasks without blocking the main thread?

#python #multiprocessing #concurrency #parallelism #process-pool #asyncio #cpu-bound #shared-memory #flutter-comparison

Answer

Python's Answer to Flutter Isolates: Multiprocessing

In Flutter, Isolates are separate memory spaces with their own event loops: they don't share memory and communicate via message passing. Python's direct equivalent is the `multiprocessing` module, which spawns separate OS processes, each with its own Python interpreter and GIL.
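You can see the "no shared memory" property directly in the minimal sketch below. The names `increment` and `demo` are illustrative only. A child process mutates a module-level global, and the parent's copy stays unchanged. The value comes back only through an explicit message, as with an Isolate.

```python
import multiprocessing as mp

counter = 0  # module-level global: each process has its own copy

def increment(times: int, out) -> None:
    """Runs in the child and changes only the child's copy of `counter`."""
    global counter
    for _ in range(times):
        counter += 1
    out.put(counter)  # the parent learns the value only via this message

def demo() -> tuple[int, int]:
    out = mp.Queue()
    p = mp.Process(target=increment, args=(1000, out))
    p.start()
    child_value = out.get()  # read before join() so the child can flush the queue
    p.join()
    return child_value, counter  # the parent's `counter` is untouched

if __name__ == "__main__":
    child_value, parent_value = demo()
    print(f"child saw {child_value}, parent still sees {parent_value}")
    # child saw 1000, parent still sees 0
```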


Flutter Isolates vs Python Options

| Feature | Flutter Isolate | Python `multiprocessing` | Python `threading` | Python `asyncio` |
|---|---|---|---|---|
| Separate memory | Yes | Yes | No (shared) | No (shared) |
| Bypasses GIL | N/A (no GIL in Dart) | Yes | No | No |
| CPU-bound tasks | Yes | Yes | No (GIL blocks) | No (single-threaded) |
| I/O-bound tasks | Yes | Overkill | Yes | Yes (best) |
| Communication | Message passing (`SendPort`) | `Queue`, `Pipe`, shared memory | Shared variables + locks | `asyncio.Queue` |
| Overhead | Medium | High (process creation) | Low | Very low |
| True parallelism | Yes | Yes | No | No |

Key Insight: Flutter's Isolates and Python's `multiprocessing` both achieve true parallelism by running code in separate memory spaces. Python needs this because of the GIL (Global Interpreter Lock). In standard CPython builds, threads cannot execute Python bytecode in parallel, so CPU-bound pure-Python code gains little or nothing from extra threads.
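A rough way to observe the GIL is to run the same pure-Python loop on a thread pool and then on a process pool. This is only a sketch. Absolute timings depend on the machine, and on a free-threaded CPython build (3.13+ with the GIL disabled) threads may scale as well. The helper names are illustrative.

```python
import time
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Callable

def count_squares(n: int) -> int:
    """Pure-Python CPU-bound loop: the running thread holds the GIL throughout."""
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(executor_cls: Callable[..., Executor], jobs: list[int]) -> tuple[float, list[int]]:
    """Run all jobs on a fresh executor and return (elapsed seconds, results)."""
    start = time.perf_counter()
    with executor_cls(max_workers=len(jobs)) as ex:
        results = list(ex.map(count_squares, jobs))
    return time.perf_counter() - start, results

if __name__ == "__main__":
    jobs = [3_000_000] * 4
    t_threads, r_threads = timed(ThreadPoolExecutor, jobs)
    t_procs, r_procs = timed(ProcessPoolExecutor, jobs)
    # On a standard (GIL) build, threads run about as fast as a plain loop;
    # processes can approach a 4x speedup on a machine with 4+ free cores.
    print(f"threads:   {t_threads:.2f}s")
    print(f"processes: {t_procs:.2f}s")
    assert r_threads == r_procs
```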


Approach 1: `multiprocessing.Process` (Direct Isolate Equivalent)

The closest match to Flutter's `Isolate.spawn()`: a separate process with message passing.

python
import multiprocessing
import time
import os

def heavy_computation(numbers: list, result_queue: multiprocessing.Queue) -> None:
    """Runs in a separate process (like a Flutter Isolate)."""
    print(f"Worker process PID: {os.getpid()}")
    total = sum(n * n for n in numbers)  # CPU-heavy task
    result_queue.put(total)  # Send result back (like SendPort)

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")
    
    # Create a queue for communication (like SendPort/ReceivePort)
    result_queue = multiprocessing.Queue()
    
    # The child gets its own copy of `numbers` (pickled under the spawn start method)
    numbers = list(range(10_000_000))

    # Spawn a process (like Isolate.spawn)
    process = multiprocessing.Process(
        target=heavy_computation,
        args=(numbers, result_queue)
    )
    
    start = time.time()
    process.start()        # Start the process
    
    # Main process is NOT blocked — can do other work here
    print("Main process continues working...")
    
    result = result_queue.get()  # Wait for result (like await)
    process.join()               # Wait for process to finish
    
    print(f"Result: {result}")
    print(f"Time: {time.time() - start:.2f}s")
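The queue in the example above carries only one result back. A long-lived worker that you message repeatedly needs two-way communication, roughly what a `SendPort`/`ReceivePort` pair in each direction provides in Dart. A duplex `multiprocessing.Pipe` is a common fit for this. In the minimal sketch below, `squarer` and `ask_worker` are illustrative names, and `None` is an assumed shutdown sentinel.

```python
from multiprocessing import Pipe, Process
from multiprocessing.connection import Connection

def squarer(conn: Connection) -> None:
    """Worker loop: receive a number, reply with its square, stop on None."""
    while True:
        msg = conn.recv()  # blocks until the parent sends a message
        if msg is None:    # assumed shutdown sentinel
            break
        conn.send(msg * msg)
    conn.close()

def ask_worker(values: list[int]) -> list[int]:
    parent_conn, child_conn = Pipe()  # duplex by default: both ends can send and recv
    worker = Process(target=squarer, args=(child_conn,))
    worker.start()
    replies = []
    for v in values:
        parent_conn.send(v)                 # request ...
        replies.append(parent_conn.recv())  # ... then wait for the reply
    parent_conn.send(None)  # ask the worker to exit
    worker.join()
    parent_conn.close()
    return replies

if __name__ == "__main__":
    print(ask_worker([2, 3, 4]))  # [4, 9, 16]
```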

Approach 2: `ProcessPoolExecutor` (Recommended for Most Cases)

The most Pythonic option: a pool of reusable worker processes, similar to managing a pool of Isolates.

python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import os

def process_chunk(chunk: list[int]) -> int:
    """CPU-heavy task — runs in a separate process."""
    print(f"Processing chunk in PID: {os.getpid()}")
    return sum(n * n for n in chunk)

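def split_into_chunks(data: list, num_chunks: int) -> list[list]:
    """Split data into at most num_chunks roughly equal chunks."""
    # Ceiling division avoids an extra leftover chunk and a zero step for tiny inputs
    chunk_size = max(1, (len(data) + num_chunks - 1) // num_chunks)
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]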

if __name__ == "__main__":
    numbers = list(range(20_000_000))
    num_workers = 4  # Like spawning 4 Isolates
    chunks = split_into_chunks(numbers, num_workers)
    
    # --- Sequential (single process) ---
    start = time.time()
    sequential_result = sum(n * n for n in numbers)
    print(f"Sequential: {time.time() - start:.2f}s")
    
    # --- Parallel (multiple processes) ---
    start = time.time()
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(process_chunk, chunk) for chunk in chunks]
        parallel_result = sum(f.result() for f in as_completed(futures))
    print(f"Parallel ({num_workers} workers): {time.time() - start:.2f}s")
    
    print(f"Results match: {sequential_result == parallel_result}")
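When you want results in submission order, `executor.map` is a shorter alternative to `submit` plus `as_completed`. The sketch below uses hypothetical helper names. It also sends `range` objects instead of lists. A `range` pickles as just its start, stop and step, so the parent avoids serializing millions of integers to the workers.

```python
from concurrent.futures import ProcessPoolExecutor

def square_sum(chunk: range) -> int:
    """CPU-heavy work on one chunk; runs in a worker process."""
    return sum(n * n for n in chunk)

def parallel_square_sums(n: int, workers: int = 4) -> list[int]:
    """Return per-chunk sums of squares for 0..n-1, in chunk order."""
    step = max(1, -(-n // workers))  # ceiling division, at least 1
    chunks = [range(i, min(i + step, n)) for i in range(0, n, step)]
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # map() yields results in the same order as `chunks`, unlike as_completed()
        return list(executor.map(square_sum, chunks))

if __name__ == "__main__":
    parts = parallel_square_sums(20_000_000)
    print(f"{len(parts)} chunks, total = {sum(parts)}")
```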

Approach 3: `multiprocessing.Pool.map` (Simplest API)

python
from multiprocessing import Pool
import math

def is_prime(n: int) -> bool:
    """CPU-intensive prime check."""
    if n < 2:
        return False
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":
    numbers = list(range(100_000, 200_000))
    
    # map() distributes work across processes automatically
    with Pool(processes=4) as pool:
        results = pool.map(is_prime, numbers)
    
    prime_count = sum(results)
    print(f"Found {prime_count} primes")
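`Pool.map` passes exactly one argument per call. For a function that takes several arguments, `Pool.starmap` unpacks each tuple into positional arguments. The sketch below uses an illustrative `count_primes_in` helper. It splits the same 100_000 to 200_000 range into four `(lo, hi)` sub-ranges.

```python
import math
from multiprocessing import Pool

def count_primes_in(lo: int, hi: int) -> int:
    """Count primes in [lo, hi) by trial division (CPU-bound)."""
    count = 0
    for n in range(max(lo, 2), hi):
        # n is prime if no d in 2..isqrt(n) divides it evenly
        if all(n % d for d in range(2, math.isqrt(n) + 1)):
            count += 1
    return count

if __name__ == "__main__":
    bounds = [(100_000, 125_000), (125_000, 150_000),
              (150_000, 175_000), (175_000, 200_000)]
    with Pool(processes=4) as pool:
        # starmap calls count_primes_in(lo, hi) for each tuple
        counts = pool.starmap(count_primes_in, bounds)
    print(f"Found {sum(counts)} primes")
```

The sub-ranges partition the same interval, so the total should match the `Pool.map` version.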

Approach 4: Async + ProcessPool (Best of Both Worlds)

For Gen AI apps that need both async I/O (API calls) and CPU parallelism (data processing).

python
import asyncio
from concurrent.futures import ProcessPoolExecutor
import numpy as np

def compute_embeddings_batch(texts: list[str]) -> list[list[float]]:
    """CPU-heavy: simulate embedding generation in separate process."""
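    # In real code: use a local model like sentence-transformers.
    # A fresh Generator per call avoids forked workers sharing one random stream.
    rng = np.random.default_rng()
    return [rng.random(384).tolist() for _ in texts]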

async def process_documents(documents: list[str]) -> list[list[float]]:
    """Async function that offloads CPU work to a process pool."""
    loop = asyncio.get_running_loop()
    
    # Split documents into batches
    batch_size = 100
    batches = [
        documents[i:i + batch_size]
        for i in range(0, len(documents), batch_size)
    ]
    
    # Run CPU-heavy embedding in separate processes
    # while keeping the event loop free for I/O
    with ProcessPoolExecutor(max_workers=4) as pool:
        tasks = [
            loop.run_in_executor(pool, compute_embeddings_batch, batch)
            for batch in batches
        ]
        results = await asyncio.gather(*tasks)
    
    # Flatten results
    return [emb for batch_result in results for emb in batch_result]

async def main():
    documents = [f"Document {i}" for i in range(1000)]
    
    # CPU-heavy work runs in processes, doesn't block async I/O
    embeddings = await process_documents(documents)
    print(f"Generated {len(embeddings)} embeddings")
    
    # Because the event loop stays free, I/O coroutines can run at the same time
    # if scheduled together, e.g. asyncio.gather(process_documents(docs), fetch_from_api())

if __name__ == "__main__":
    asyncio.run(main())
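The `main()` in the example above awaits the CPU work before doing anything else, so nothing actually overlaps. To run I/O while the pool computes, schedule both with `asyncio.gather`. In this sketch, `fake_api_call` is a hypothetical stand-in that uses `asyncio.sleep` in place of a real network request.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n: int) -> int:
    """CPU-bound work that runs in a worker process."""
    return sum(i * i for i in range(n))

async def fake_api_call(delay: float) -> str:
    """Hypothetical I/O stand-in: asyncio.sleep yields control to the event loop."""
    await asyncio.sleep(delay)
    return "api-ok"

async def run_both(n: int, delay: float) -> tuple[int, str, float]:
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=1) as pool:
        # Both awaitables are in flight at once; the loop is free while the worker computes
        cpu_result, api_result = await asyncio.gather(
            loop.run_in_executor(pool, cpu_task, n),
            fake_api_call(delay),
        )
    return cpu_result, api_result, time.perf_counter() - start

if __name__ == "__main__":
    cpu, api, elapsed = asyncio.run(run_both(5_000_000, 1.0))
    # Elapsed is roughly the slower of the two tasks (plus pool startup), not their sum
    print(f"cpu={cpu}, api={api}, elapsed={elapsed:.2f}s")
```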

Approach 5: Shared Memory (Like Flutter's `TransferableTypedData`)

For large data, use shared memory to avoid copying it between processes.

python
from multiprocessing import shared_memory, Process
import numpy as np

def worker_with_shared_memory(shm_name: str, shape: tuple, dtype: str) -> None:
    """Access shared memory without copying data (zero-copy)."""
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    array = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    
    # Modify in-place — changes visible to main process
    array *= 2
    print(f"Worker doubled the array. Sum: {array.sum()}")
    
    del array  # Drop the NumPy view before closing the buffer it points into
    existing_shm.close()

if __name__ == "__main__":
    # Create large array in shared memory
    data = np.array([1, 2, 3, 4, 5], dtype=np.float64)
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared_array[:] = data  # Copy data into shared memory once
    
    # Worker accesses same memory — no copy!
    p = Process(
        target=worker_with_shared_memory,
        args=(shm.name, data.shape, str(data.dtype))
    )
    p.start()
    p.join()
    
    print(f"Main sees modified data: {shared_array}")  # [ 2.  4.  6.  8. 10.]

    del shared_array  # Release the NumPy view before closing the segment
    shm.close()
    shm.unlink()  # Clean up shared memory
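Manual `close()`/`unlink()` calls are easy to skip when an exception is raised halfway through. The standard library's `multiprocessing.managers.SharedMemoryManager` (Python 3.8+) unlinks every block it created when its `with` block exits. The minimal sketch below does not use NumPy, and `fill` and `demo` are illustrative names.

```python
from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import SharedMemory

def fill(shm_name: str, length: int, value: int) -> None:
    """Child: attach to the block by name and overwrite its first `length` bytes."""
    shm = SharedMemory(name=shm_name)
    shm.buf[:length] = bytes([value]) * length
    shm.close()  # detach only; the manager is responsible for unlinking

def demo(length: int = 8, value: int = 7) -> bytes:
    with SharedMemoryManager() as smm:  # starts a small manager process
        shm = smm.SharedMemory(size=length)
        p = Process(target=fill, args=(shm.name, length, value))
        p.start()
        p.join()
        result = bytes(shm.buf[:length])  # copy the bytes out before cleanup
        shm.close()
    # Exiting the `with` block shut down the manager and unlinked the block
    return result

if __name__ == "__main__":
    print(demo())
```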

Side-by-Side: Flutter Isolate vs Python Multiprocessing

python
# ─── Flutter (Dart) ────────────────────────────────────
# import 'dart:isolate';
#
# Future<int> heavyTask(int n) async {
#   return await Isolate.run(() {
#     int sum = 0;
#     for (var i = 0; i < n; i++) { sum += i * i; }
#     return sum;
#   });
# }

# ─── Python Equivalent ────────────────────────────────
from concurrent.futures import ProcessPoolExecutor

def heavy_task(n: int) -> int:
    return sum(i * i for i in range(n))

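# Closest equivalent of Isolate.run(): submit one task and wait for its result.
# The __main__ guard is required with the "spawn" start method (Windows, macOS).
if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        future = pool.submit(heavy_task, 10_000_000)
        result = future.result()  # Blocks until done, like `await` in Dart
        print(f"Result: {result}")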
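For something that reads like `await Isolate.run(...)` inside async code, a small wrapper over `loop.run_in_executor` is one option. `run_in_process` below is a hypothetical helper, not a library function. One real difference from Dart is that the callable must be picklable, i.e. a module-level function. A lambda or closure like the one passed to `Isolate.run()` raises a pickling error here.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Callable, TypeVar

T = TypeVar("T")

async def run_in_process(pool: ProcessPoolExecutor, fn: Callable[..., T], *args: Any) -> T:
    """Hypothetical helper: await fn(*args) running in a worker process."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, fn, *args)

def heavy_task(n: int) -> int:
    return sum(i * i for i in range(n))

async def main() -> None:
    # Reuse one pool across calls; a pool per call adds process startup cost
    with ProcessPoolExecutor() as pool:
        result = await run_in_process(pool, heavy_task, 10_000_000)
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```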

Quick Decision Guide

| Task Type | Python Solution | Flutter Equivalent |
|---|---|---|
| CPU-heavy (one-off) | `multiprocessing.Process` | `Isolate.run()` |
| CPU-heavy (multiple) | `ProcessPoolExecutor` | `Isolate` pool |
| CPU-heavy (simple map) | `Pool.map()` | `Isolate.run()` in loop |
| I/O-bound (network) | `asyncio` / `threading` | `Future` / `async`-`await` |
| Mixed CPU + I/O | `asyncio` + `ProcessPoolExecutor` | Isolate + Stream |
| Large data sharing | `shared_memory` | `TransferableTypedData` |
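Threads are enough for the I/O-bound row because a thread blocked on I/O releases the GIL. The sketch below uses `concurrent.futures.ThreadPoolExecutor`. Here `fetch` is a hypothetical placeholder that sleeps instead of making a real HTTP request, and the URLs are illustrative.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(url: str) -> str:
    """Hypothetical blocking call; time.sleep releases the GIL like real socket I/O."""
    time.sleep(0.2)
    return f"response from {url}"

def fetch_all(urls: list[str], workers: int = 8) -> list[str]:
    with ThreadPoolExecutor(max_workers=workers) as ex:
        return list(ex.map(fetch, urls))  # results keep the order of `urls`

if __name__ == "__main__":
    urls = [f"https://example.com/{i}" for i in range(8)]
    start = time.perf_counter()
    responses = fetch_all(urls)
    # The 8 waits overlap, so this takes roughly 0.2s rather than about 1.6s
    print(f"{len(responses)} responses in {time.perf_counter() - start:.2f}s")
```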

Best Practice for Gen AI: Use `ProcessPoolExecutor` with `loop.run_in_executor()` in your AI pipelines. This lets you handle API calls asynchronously while offloading CPU-heavy preprocessing (tokenization, chunking, embedding) to separate processes, maximizing throughput without blocking the event loop.

Resources: