In Flutter we have state-management to manage the state of the application similarly do we have anything in Python if yes explain me along with the flow diagram?
Answer
State Management in Python
Yes — Python has multiple ways to manage application state, ranging from simple in-memory patterns to distributed stores and Gen AI-specific memory systems. Unlike Flutter's widget-tree reactive model, Python state management is pattern-driven and chosen based on scope, lifecycle, and concurrency needs.
Flutter vs Python State Management
| Flutter Concept | Python Equivalent |
|---|---|
text | Mutate class attributes / dataclass fields |
text | Dependency injection (FastAPI text text |
text | text text |
text | State machine ( text |
text | text |
| Global state | Module-level singletons / Redis |
| Ephemeral state | Local variables / instance attributes |
| Persistent state | Database / file / Redis |
Overview of Python State Management Options
1. Class-Based State (Equivalent to textsetState
)
setStateThe most direct equivalent — hold state in instance attributes and mutate them via methods.
pythonfrom dataclasses import dataclass, field from typing import Any @dataclass class AppState: user_query: str = "" retrieved_docs: list[str] = field(default_factory=list) llm_response: str = "" is_loading: bool = False error: str | None = None def reset(self) -> None: self.user_query = "" self.retrieved_docs = [] self.llm_response = "" self.is_loading = False self.error = None class RAGStateManager: def __init__(self): self._state = AppState() self._listeners: list = [] @property def state(self) -> AppState: return self._state def subscribe(self, callback) -> None: self._listeners.append(callback) def _notify(self) -> None: for cb in self._listeners: cb(self._state) def set_query(self, query: str) -> None: self._state.user_query = query self._state.is_loading = True self._notify() def set_docs(self, docs: list[str]) -> None: self._state.retrieved_docs = docs self._notify() def set_response(self, response: str) -> None: self._state.llm_response = response self._state.is_loading = False self._notify() def set_error(self, error: str) -> None: self._state.error = error self._state.is_loading = False self._notify() # Usage manager = RAGStateManager() manager.subscribe(lambda s: print(f"State updated: loading={s.is_loading}")) manager.set_query("What is RAG?") manager.set_docs(["doc1", "doc2"]) manager.set_response("RAG combines retrieval with generation.")
2. Pydantic State Model (Equivalent to textRiverpod
)
RiverpodImmutable, validated state containers — ideal for Gen AI pipelines.
pythonfrom pydantic import BaseModel from typing import Literal from datetime import datetime class PipelineState(BaseModel): session_id: str query: str status: Literal["idle", "retrieving", "generating", "done", "error"] = "idle" docs: list[str] = [] response: str = "" tokens_used: int = 0 created_at: datetime = datetime.now() class Config: frozen = True # immutable — forces explicit state transitions # State transitions via .model_copy() state = PipelineState(session_id="abc-123", query="Explain LoRA") # Each step produces a new state (immutable) state = state.model_copy(update={"status": "retrieving"}) state = state.model_copy(update={"docs": ["doc1", "doc2"], "status": "generating"}) state = state.model_copy(update={"response": "LoRA is...", "status": "done", "tokens_used": 420}) print(state.status) # done print(state.tokens_used) # 420
3. Module-Level Singleton (Global State)
A simple pattern for app-wide shared state — similar to a global
Providerpython# state.py — module-level singleton from threading import Lock class AppRegistry: _instance = None _lock = Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._store: dict = {} return cls._instance def set(self, key: str, value) -> None: self._store[key] = value def get(self, key: str, default=None): return self._store.get(key, default) def delete(self, key: str) -> None: self._store.pop(key, None) # Use from anywhere in the app registry = AppRegistry() registry.set("active_model", "gpt-4o") registry.set("session_count", 0) # Same instance returned everywhere registry_b = AppRegistry() print(registry is registry_b) # True print(registry_b.get("active_model")) # gpt-4o
4. Context Variables (textcontextvars
) — Request-Scoped State
contextvarsThe Python equivalent of Flutter's
InheritedWidgetpythonimport asyncio from contextvars import ContextVar # Define context variables current_user: ContextVar[str] = ContextVar("current_user", default="anonymous") request_id: ContextVar[str] = ContextVar("request_id", default="") session_state: ContextVar[dict] = ContextVar("session_state", default={}) async def handle_request(user: str, query: str) -> str: # Set state for THIS coroutine's context only current_user.set(user) request_id.set(f"req-{user}-001") session_state.set({"query": query, "tokens": 0}) result = await process_query(query) return result async def process_query(query: str) -> str: # Access context state from anywhere in the call stack user = current_user.get() rid = request_id.get() print(f"[{rid}] Processing for {user}: {query}") return f"Answer for {user}" async def main(): # Two concurrent requests — each has isolated state await asyncio.gather( handle_request("alice", "What is RAG?"), handle_request("bob", "Explain LoRA"), ) asyncio.run(main()) # [req-alice-001] Processing for alice: What is RAG? # [req-bob-001] Processing for bob: Explain LoRA
Use case: Per-request user ID, auth tokens, trace IDs in async FastAPI apps.
5. State Machine (texttransitions
) — BLoC Equivalent
transitionsModel complex workflows as explicit states and transitions — equivalent to BLoC's event → state pattern.
pythonfrom transitions import Machine class RAGWorkflow: states = ["idle", "retrieving", "generating", "done", "error"] transitions = [ {"trigger": "query_received", "source": "idle", "dest": "retrieving"}, {"trigger": "docs_fetched", "source": "retrieving", "dest": "generating"}, {"trigger": "retrieval_failed", "source": "retrieving", "dest": "error"}, {"trigger": "response_ready", "source": "generating", "dest": "done"}, {"trigger": "generation_failed", "source": "generating", "dest": "error"}, {"trigger": "reset", "source": ["done", "error"], "dest": "idle"}, ] def __init__(self): self.query = "" self.docs: list[str] = [] self.response = "" self.machine = Machine( model=self, states=self.states, transitions=self.transitions, initial="idle" ) def on_enter_retrieving(self): print(f"[→ RETRIEVING] Fetching docs for: {self.query}") def on_enter_generating(self): print(f"[→ GENERATING] Building response from {len(self.docs)} docs") def on_enter_done(self): print(f"[→ DONE] Response ready") workflow = RAGWorkflow() workflow.query = "What is attention?" workflow.query_received() # idle → retrieving workflow.docs = ["doc1", "doc2"] workflow.docs_fetched() # retrieving → generating workflow.response = "Attention is..." workflow.response_ready() # generating → done print(workflow.state) # done
Install:
pip install transitions6. Redis — Distributed Shared State
For multi-process or multi-pod Gen AI apps where state must be shared across services.
pythonimport redis import json from datetime import timedelta class SessionStateStore: def __init__(self, host: str = "localhost", port: int = 6379): self._r = redis.Redis(host=host, port=port, decode_responses=True) def set_session(self, session_id: str, state: dict, ttl: int = 3600) -> None: self._r.setex( name=f"session:{session_id}", time=timedelta(seconds=ttl), value=json.dumps(state) ) def get_session(self, session_id: str) -> dict | None: data = self._r.get(f"session:{session_id}") return json.loads(data) if data else None def update_session(self, session_id: str, updates: dict) -> None: state = self.get_session(session_id) or {} state.update(updates) self.set_session(session_id, state) def delete_session(self, session_id: str) -> None: self._r.delete(f"session:{session_id}") # Usage store = SessionStateStore() store.set_session("user-123", {"model": "gpt-4o", "history": [], "tokens_used": 0}) store.update_session("user-123", {"tokens_used": 512}) state = store.get_session("user-123") print(state["tokens_used"]) # 512
7. Gen AI Specific — LangChain Memory
LangChain provides built-in state management for conversational AI.
pythonfrom langchain.memory import ConversationBufferWindowMemory from langchain_openai import ChatOpenAI from langchain.chains import ConversationChain # Buffer memory — keeps last k exchanges memory = ConversationBufferWindowMemory(k=5, return_messages=True) chain = ConversationChain( llm=ChatOpenAI(model="gpt-4o"), memory=memory, verbose=False ) chain.predict(input="My name is Alex and I am a Gen AI engineer.") chain.predict(input="What is RAG?") response = chain.predict(input="What did I tell you my name was?") print(response) # "Your name is Alex." # Inspect stored state print(memory.chat_memory.messages)
8. LangGraph State — Stateful Agent Workflows
LangGraph manages state as a typed dictionary flowing through a graph of nodes.
pythonfrom langgraph.graph import StateGraph, END from typing import TypedDict class AgentState(TypedDict): query: str docs: list[str] response: str is_relevant: bool def retrieve_node(state: AgentState) -> AgentState: state["docs"] = ["doc1", "doc2"] # mock retrieval return state def grade_node(state: AgentState) -> AgentState: state["is_relevant"] = len(state["docs"]) > 0 return state def generate_node(state: AgentState) -> AgentState: state["response"] = f"Answer: {state['query']}" return state def rewrite_node(state: AgentState) -> AgentState: state["query"] = f"Refined: {state['query']}" return state def route(state: AgentState) -> str: return "generate_node" if state["is_relevant"] else "rewrite_node" graph = StateGraph(AgentState) graph.add_node("retrieve_node", retrieve_node) graph.add_node("grade_node", grade_node) graph.add_node("generate_node", generate_node) graph.add_node("rewrite_node", rewrite_node) graph.set_entry_point("retrieve_node") graph.add_edge("retrieve_node", "grade_node") graph.add_conditional_edges("grade_node", route) graph.add_edge("generate_node", END) graph.add_edge("rewrite_node", "retrieve_node") app = graph.compile() result = app.invoke({"query": "What is attention?", "docs": [], "response": "", "is_relevant": False}) print(result["response"])
Choosing the Right State Management
Summary Table
| Approach | Scope | Best For | Flutter Equivalent |
|---|---|---|---|
| Class attributes | Object | Simple local state | text |
| Pydantic model | Object | Validated, immutable state | text |
| Module singleton | App-wide | Global shared state | text |
text | Request/coroutine | Per-request isolation | text |
| State machine | Workflow | Complex transitions | text |
| Redis | Distributed | Multi-process/pod | External text |
| LangChain Memory | Conversation | Chat history | text |
| LangGraph State | Agent graph | Multi-step agent state | text |
Pro tip: In production Gen AI systems, combine approaches —
for per-request isolation, Redis for cross-pod session state, and LangGraph for complex agent workflows.textcontextvars
Learn more at Python contextvars, LangChain Memory, and LangGraph.