    Function ● Since v0.2

    get_stream_writer

    Access the LangGraph StreamWriter from inside a graph node or an entrypoint task at runtime.

    It can be called from inside any StateGraph node or functional API task.

    Async with Python < 3.11

    If you are running LangGraph asynchronously on Python < 3.11, get_stream_writer() will not work, because it relies on contextvar propagation that is only available in Python >= 3.11.
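To make the contextvar dependency more concrete, here is a minimal standard-library sketch of how an accessor of this kind could be built. It is an illustration only, not LangGraph's implementation: `get_writer`, `run_node`, and `_current_writer` are hypothetical names, and the "stream" is just a Python list.

```python
import contextvars
from typing import Any, Callable, List

# Hypothetical names for illustration only; this is NOT LangGraph's code.
Writer = Callable[[Any], None]
_current_writer = contextvars.ContextVar("current_writer")


def get_writer() -> Writer:
    """Return the writer bound to the current context, or raise if none is set."""
    try:
        return _current_writer.get()
    except LookupError:
        raise RuntimeError("get_writer() called outside of a running node") from None


def run_node(node: Callable[[dict], dict], state: dict, sink: List[Any]) -> dict:
    """Run `node` inside a copied context whose writer appends to `sink`."""
    ctx = contextvars.copy_context()
    # Bind the writer inside the copied context only; the caller's context is untouched.
    ctx.run(_current_writer.set, sink.append)
    return ctx.run(node, state)


def my_node(state: dict) -> dict:
    # The node never receives the writer as an argument; it looks it up at runtime.
    writer = get_writer()
    writer({"custom_data": "Hello!"})
    return {"foo": state["foo"] + 1}


chunks: List[Any] = []
result = run_node(my_node, {"foo": 1}, chunks)
```

The design point is that the writer reaches the node through the ambient context rather than through the function signature, so the lookup only succeeds when the code runs in a context where the variable was set. That is why the runtime must be able to propagate the context into wherever the node actually executes.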

    get_stream_writer() -> StreamWriter

    Using with StateGraph:

    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, START
    from langgraph.config import get_stream_writer
    
    class State(TypedDict):
        foo: int
    
    def my_node(state: State):
        my_stream_writer = get_stream_writer()
        my_stream_writer({"custom_data": "Hello!"})
        return {"foo": state["foo"] + 1}
    
    graph = (
        StateGraph(State)
        .add_node(my_node)
        .add_edge(START, "my_node")
        .compile()
    )
    
    for chunk in graph.stream({"foo": 1}, stream_mode="custom"):
        print(chunk)
    # Prints: {'custom_data': 'Hello!'}
    

    Using with functional API:

    from langgraph.func import entrypoint, task
    from langgraph.config import get_stream_writer
    
    @task
    def my_task(value: int):
        my_stream_writer = get_stream_writer()
        my_stream_writer({"custom_data": "Hello!"})
        return value + 1
    
    @entrypoint()
    def workflow(value: int):
        return my_task(value).result()
    
    for chunk in workflow.stream(1, stream_mode="custom"):
        print(chunk)
    # Prints: {'custom_data': 'Hello!'}
    
