Eino Human-in-the-Loop Framework: Architecture Guide
Overview
This document provides technical details of Eino’s human-in-the-loop (Human-in-the-Loop, HITL) framework architecture, focusing on the interrupt/resume mechanism and the underlying addressing system.
Human-in-the-Loop Requirements
The following diagram illustrates the key questions each component must answer during the interrupt/resume process. Understanding these requirements is key to grasping the rationale behind the architecture design.
graph TD
subgraph P1 [Interrupt Phase]
direction LR
subgraph Dev1 [Developer]
direction TB
D1[Should I interrupt now?<br/>Was I interrupted before?]
D2[What information should users<br/>see about this interrupt?]
D3[What state should I persist<br/>for later resume?]
D1 --> D2 --> D3
end
subgraph Fw1 [Framework]
direction TB
F1[Where in the execution hierarchy<br/>did the interrupt occur?]
F2[How to associate state with<br/>the interrupt location?]
F3[How to persist interrupt<br/>context and state?]
F4[What information does the user<br/>need to understand the interrupt?]
F1 --> F2 --> F3 --> F4
end
Dev1 --> Fw1
end
subgraph P2 [User Decision Phase]
direction TB
subgraph "End User"
direction TB
U1[Where in the flow did<br/>the interrupt occur?]
U2[What information did the<br/>developer provide?]
U3[Should I resume this<br/>interrupt?]
U4[Should I provide data<br/>for resume?]
U5[What type of resume<br/>data should I provide?]
U1 --> U2 --> U3 --> U4 --> U5
end
end
subgraph P3 [Resume Phase]
direction LR
subgraph Fw2 [Framework]
direction TB
FR1[Which entity is interrupted<br/>and how to re-run it?]
FR2[How to restore context<br/>for the interrupted entity?]
FR3[How to route user data<br/>to the interrupted entity?]
FR1 --> FR2 --> FR3
end
subgraph Dev2 [Developer]
direction TB
DR1[Am I the explicit<br/>resume target?]
DR2[If not target, should I<br/>re-interrupt to continue?]
DR3[What state did I persist<br/>on interrupt?]
DR4[If resume data is provided,<br/>how should I process it?]
DR1 --> DR2 --> DR3 --> DR4
end
Fw2 --> Dev2
end
P1 --> P2 --> P3
classDef dev fill:#e1f5fe
classDef fw fill:#f3e5f5
classDef user fill:#e8f5e8
class D1,D2,D3,DR1,DR2,DR3,DR4 dev
class F1,F2,F3,F4,FR1,FR2,FR3 fw
class U1,U2,U3,U4,U5 user
Therefore, our goals are:
- Help developers answer the above questions as easily as possible.
- Help end users answer the above questions as easily as possible.
- Enable the framework to answer the above questions automatically and out-of-the-box.
Quick Start
We demonstrate the functionality with a simple ticket booking Agent that asks the user for “confirmation” before actually completing the booking. The user can “approve” or “reject” the booking operation. The complete code for this example is at: https://github.com/cloudwego/eino-examples/tree/main/adk/human-in-the-loop/1_approval
- Create a ChatModelAgent and configure a Tool for booking tickets.
import (
"context"
"fmt"
"log"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/components/tool/utils"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino-examples/adk/common/model"
tool2 "github.com/cloudwego/eino-examples/adk/common/tool"
)
func NewTicketBookingAgent() adk.Agent {
ctx := context.Background()
type bookInput struct {
Location string `json:"location"`
PassengerName string `json:"passenger_name"`
PassengerPhoneNumber string `json:"passenger_phone_number"`
}
getWeather, err := utils.InferTool(
"BookTicket",
"this tool can book ticket of the specific location",
func(ctx context.Context, input bookInput) (output string, err error) {
return "success", nil
})
if err != nil {
log.Fatal(err)
}
a, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
Name: "TicketBooker",
Description: "An agent that can book tickets",
Instruction: `You are an expert ticket booker.
Based on the user's request, use the "BookTicket" tool to book tickets.`,
Model: model.NewChatModel(),
ToolsConfig: adk.ToolsConfig{
ToolsNodeConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{
// InvokableApprovableTool is a tool decorator provided by eino-examples
// that adds "approval interrupt" functionality to any InvokableTool
&tool2.InvokableApprovableTool{InvokableTool: getWeather},
},
},
},
})
if err != nil {
log.Fatal(fmt.Errorf("failed to create chatmodel: %w", err))
}
return a
}
- Create a Runner, configure CheckPointStore, and run with a CheckPointID. Eino uses CheckPointStore to save the Agent’s running state when interrupted. Here we use InMemoryStore, which saves in memory. In actual use, a distributed store like Redis is recommended. Additionally, Eino uses CheckPointID to uniquely identify and link the “before interrupt” and “after interrupt” runs (or multiple runs).
a := NewTicketBookingAgent()
runner := adk.NewRunner(ctx, adk.RunnerConfig{
EnableStreaming: true, // you can disable streaming here
Agent: a,
// provide a CheckPointStore for eino to persist the execution state of the agent for later resumption.
// Here we use an in-memory store for simplicity.
// In the real world, you can use a distributed store like Redis to persist the checkpoints.
CheckPointStore: store.NewInMemoryStore(),
})
iter := runner.Query(ctx, "book a ticket for Martin, to Beijing, on 2025-12-01, the phone number is 1234567. directly call tool.", adk.WithCheckPointID("1"))
- Get the interrupt information from AgentEvent
event.Action.Interrupted.InterruptContexts[0].Info, which here is “preparing to book which train for whom, do you approve”. You will also get an InterruptID (event.Action.Interrupted.InterruptContexts[0].ID), which the Eino framework uses to identify “where the interrupt occurred”. Here it’s printed directly to the terminal; in actual use, it may need to be returned as an HTTP response to the frontend.
var lastEvent *adk.AgentEvent
for {
event, ok := iter.Next()
if !ok {
break
}
if event.Err != nil {
log.Fatal(event.Err)
}
prints.Event(event)
lastEvent = event
}
// this interruptID is crucial 'locator' for Eino to know where the interrupt happens,
// so when resuming later, you have to provide this same `interruptID` along with the approval result back to Eino
interruptID := lastEvent.Action.Interrupted.InterruptContexts[0].ID
- Show the interrupt information to the user and receive the user’s response, such as “approve”. In this example, everything is displayed to and received from the user on the local terminal. In actual applications, a ChatBot might be used for input/output.
var apResult *tool.ApprovalResult
for {
scanner := bufio.NewScanner(os.Stdin)
fmt.Print("your input here: ")
scanner.Scan()
fmt.Println()
nInput := scanner.Text()
if strings.ToUpper(nInput) == "Y" {
apResult = &tool.ApprovalResult{Approved: true}
break
} else if strings.ToUpper(nInput) == "N" {
// Prompt for reason when denying
fmt.Print("Please provide a reason for denial: ")
scanner.Scan()
reason := scanner.Text()
fmt.Println()
apResult = &tool.ApprovalResult{Approved: false, DisapproveReason: &reason}
break
}
fmt.Println("invalid input, please input Y or N")
}
Sample output:
name: TicketBooker
path: [{TicketBooker}]
tool name: BookTicket
arguments: {"location":"Beijing","passenger_name":"Martin","passenger_phone_number":"1234567"}
name: TicketBooker
path: [{TicketBooker}]
tool 'BookTicket' interrupted with arguments '{"location":"Beijing","passenger_name":"Martin","passenger_phone_number":"1234567"}', waiting for your approval, please answer with Y/N
your input here: Y
- Call Runner.ResumeWithParams, passing the same InterruptID and the data for resumption, which here is “approve”. In this example, the initial
Runner.Queryand subsequentRunner.ResumeWithParamsare in the same instance. In real scenarios, it might be two requests from a ChatBot frontend hitting two different server instances. As long as the CheckPointID is the same both times and the CheckPointStore configured for the Runner is distributed storage, Eino can achieve cross-instance interrupt resumption.
// here we directly resumes right in the same instance where the original `Runner.Query` happened.
// In the real world, the original `Runner.Run/Query` and the subsequent `Runner.ResumeWithParams`
// can happen in different processes or machines, as long as you use the same `CheckPointID`,
// and you provided a distributed `CheckPointStore` when creating the `Runner` instance.
iter, err := runner.ResumeWithParams(ctx, "1", &adk.ResumeParams{
Targets: map[string]any{
interruptID: apResult,
},
})
if err != nil {
log.Fatal(err)
}
for {
event, ok := iter.Next()
if !ok {
break
}
if event.Err != nil {
log.Fatal(event.Err)
}
prints.Event(event)
}
Complete sample output:
name: TicketBooker
path: [{TicketBooker}]
tool name: BookTicket
arguments: {"location":"Beijing","passenger_name":"Martin","passenger_phone_number":"1234567"}
name: TicketBooker
path: [{TicketBooker}]
tool 'BookTicket' interrupted with arguments '{"location":"Beijing","passenger_name":"Martin","passenger_phone_number":"1234567"}', waiting for your approval, please answer with Y/N
your input here: Y
name: TicketBooker
path: [{TicketBooker}]
tool response: success
name: TicketBooker
path: [{TicketBooker}]
answer: The ticket for Martin to Beijing on 2025-12-01 has been successfully booked. If you need any more assistance, feel free
to ask!
More Examples
- Review and Edit Mode: Allows human review and in-place editing of tool call parameters before execution. https://github.com/cloudwego/eino-examples/tree/main/adk/human-in-the-loop/2_review-and-edit
- Feedback Loop Mode: Iterative improvement mode where the agent generates content and humans provide qualitative feedback for improvement. https://github.com/cloudwego/eino-examples/tree/main/adk/human-in-the-loop/3_feedback-loop
- Follow-up Mode: Proactive mode where the agent identifies insufficient tool output and requests clarification or next steps. https://github.com/cloudwego/eino-examples/tree/main/adk/human-in-the-loop/4_follow-up
- Interrupt within supervisor architecture: https://github.com/cloudwego/eino-examples/tree/main/adk/human-in-the-loop/5_supervisor
- Interrupt within plan-execute-replan architecture: https://github.com/cloudwego/eino-examples/tree/main/adk/human-in-the-loop/6_plan-execute-replan
- Interrupt within deep-agents architecture: https://github.com/cloudwego/eino-examples/tree/main/adk/human-in-the-loop/7_deep-agents
- Interrupt when a sub-agent of supervisor is plan-execute-replan: https://github.com/cloudwego/eino-examples/tree/main/adk/human-in-the-loop/8_supervisor-plan-execute
Architecture Overview
The following flowchart illustrates the high-level interrupt/resume flow:
flowchart TD
U[End User]
subgraph R [Runner]
Run
Resume
end
U -->|Initial Input| Run
U -->|Resume Data| Resume
subgraph E [(Arbitrarily Nested) Entities]
Agent
Tool
...
end
subgraph C [Run Context]
Address
InterruptState
ResumeData
end
Run -->|any number of transfer / call| E
R <-->|persist/restore| C
Resume -->|replay transfer / call| E
C -->|auto assigned to| E
The following sequence diagram shows the time-ordered interaction flow between the three main participants:
sequenceDiagram
participant D as Developer
participant F as Framework
participant U as End User
Note over D,F: 1. Interrupt Phase
D->>F: call StatefulInterrupt()<br>specify info and state
F->>F: persist InterruptID->{address, state}
Note over F,U: 2. User Decision Phase
F->>U: emit InterruptID->{address, info}
U->>U: decide InterruptID->{resume data}
U->>F: call TargetedResume()<br>provide InterruptID->{resume data}
Note over D,F: 3. Resume Phase
F->>F: route to interrupted entity
F->>D: provide state and resume data
D->>D: process resume
ADK Package APIs
The ADK package provides high-level abstractions for building interruptible agents with human-in-the-loop capabilities.
1. APIs for Interrupting
Interrupt
Creates a basic interrupt action. Use when an agent needs to pause execution to request external input or intervention, but does not need to save any internal state for resumption.
func Interrupt(ctx context.Context, info any) *AgentEvent
Parameters:
ctx: The context of the running component.info: User-facing data describing the reason for the interrupt.
Returns: *AgentEvent with an interrupt action.
Example:
// Inside an agent's Run method:
// Create a simple interrupt to request clarification.
return adk.Interrupt(ctx, "User query is unclear, please clarify.")
StatefulInterrupt
Creates an interrupt action while saving the agent’s internal state. Use when an agent has internal state that must be restored to continue correctly.
func StatefulInterrupt(ctx context.Context, info any, state any) *AgentEvent
Parameters:
ctx: The context of the running component.info: User-facing data describing the interrupt.state: The agent’s internal state object, which will be serialized and stored.
Returns: *AgentEvent with an interrupt action.
Example:
// Inside an agent's Run method:
// Define the state to save.
type MyAgentState struct {
ProcessedItems int
CurrentTopic string
}
currentState := &MyAgentState{
ProcessedItems: 42,
CurrentTopic: "HITL",
}
// Interrupt and save current state.
return adk.StatefulInterrupt(ctx, "Need user feedback before continuing", currentState)
CompositeInterrupt
Creates an interrupt action for components that coordinate multiple sub-components. It combines interrupts from one or more child agents into a single, cohesive interrupt. Any agent containing child agents (e.g., custom Sequential or Parallel agents) uses this to propagate their children’s interrupts.
func CompositeInterrupt(ctx context.Context, info any, state any,
subInterruptSignals ...*InterruptSignal) *AgentEvent
Parameters:
ctx: The context of the running component.info: User-facing data describing the coordinator’s own interrupt reason.state: The coordinator agent’s own state (e.g., the index of the interrupted child agent).subInterruptSignals: A variadic list ofInterruptSignalobjects from interrupted child agents.
Returns: *AgentEvent with an interrupt action.
Example:
// In a custom sequential agent running two child agents...
subAgent1 := &myInterruptingAgent{}
subAgent2 := &myOtherAgent{}
// If subAgent1 returns an interrupt event...
subInterruptEvent := subAgent1.Run(ctx, input)
// The parent agent must capture it and wrap it in CompositeInterrupt.
if subInterruptEvent.Action.Interrupted != nil {
// The parent agent can add its own state, like which child agent was interrupted.
parentState := map[string]int{"interrupted_child_index": 0}
// Bubble up the interrupt.
return adk.CompositeInterrupt(ctx,
"A child agent needs attention",
parentState,
subInterruptEvent.Action.Interrupted.internalInterrupted,
)
}
2. APIs for Getting Interrupt Information
InterruptInfo and InterruptCtx
When agent execution is interrupted, the AgentEvent contains structured interrupt information. The InterruptInfo struct contains a list of InterruptCtx objects, each representing an interrupt point in the hierarchy.
InterruptCtx provides a complete, user-facing context for a single resumable interrupt point.
type InterruptCtx struct {
// ID is the unique, fully qualified address of the interrupt point, used for targeted resume.
// For example: "agent:A;node:graph_a;tool:tool_call_123"
ID string
// Address is a structured sequence of AddressSegment segments leading to the interrupt point.
Address Address
// Info is the user-facing information associated with the interrupt, provided by the component that triggered it.
Info any
// IsRootCause indicates whether the interrupt point is the exact root cause of the interrupt.
IsRootCause bool
// Parent points to the context of the parent component in the interrupt chain (nil for top-level interrupts).
Parent *InterruptCtx
}
The following example shows how to access this information:
// At the application layer, after an interrupt:
if event.Action != nil && event.Action.Interrupted != nil {
interruptInfo := event.Action.Interrupted
// Get a flat list of all interrupt points
interruptPoints := interruptInfo.InterruptContexts
for _, point := range interruptPoints {
// Each point contains a unique ID, user-facing info, and its hierarchical address
fmt.Printf("Interrupt ID: %s, Address: %s, Info: %v\n", point.ID, point.Address.String(), point.Info)
}
}
3. APIs for End User Resume
(*Runner).ResumeWithParams
Continues interrupted execution from a checkpoint using the “explicit targeted resume” strategy. This is the most common and powerful way to resume, allowing you to target specific interrupt points and provide data for them.
When using this method:
- Components whose addresses are in the
ResumeParams.Targetsmap will be explicit targets. - Interrupted components whose addresses are not in the
ResumeParams.Targetsmap must re-interrupt themselves to preserve their state.
func (r *Runner) ResumeWithParams(ctx context.Context, checkPointID string,
params *ResumeParams, opts ...AgentRunOption) (*AsyncIterator[*AgentEvent], error)
Parameters:
ctx: The context for resumption.checkPointID: The identifier of the checkpoint to resume from.params: Interrupt parameters containing a mapping from interrupt IDs to resume data. These IDs can point to any interruptible component throughout the execution graph.opts: Additional run options.
Returns: An async iterator of agent events.
Example:
// After receiving an interrupt event...
interruptID := interruptEvent.Action.Interrupted.InterruptContexts[0].ID
// Prepare data for the specific interrupt point.
resumeData := map[string]any{
interruptID: "This is the clarification you requested.",
}
// Resume execution with targeted data.
resumeIterator, err := runner.ResumeWithParams(ctx, "my-checkpoint-id", &ResumeParams{Targets: resumeData})
if err != nil {
// Handle error
}
// Process events from the resume iterator
for event := range resumeIterator.Events() {
if event.Err != nil {
// Handle event error
break
}
// Process agent event
fmt.Printf("Event: %+v\n", event)
}
4. APIs for Developer Resume
ResumeInfo Struct
ResumeInfo holds all the information needed to resume an interrupted agent execution. It is created by the framework and passed to the agent’s Resume method.
type ResumeInfo struct {
// WasInterrupted indicates whether this agent had an interrupt in the previous Runner run.
WasInterrupted bool
// InterruptState holds the state saved via StatefulInterrupt or CompositeInterrupt.
InterruptState any
// IsResumeTarget indicates whether this agent is a specific target of ResumeWithParams.
IsResumeTarget bool
// ResumeData holds the data provided by the user for this agent.
ResumeData any
// ... other fields
}
Example:
import (
"context"
"errors"
"fmt"
"github.com/cloudwego/eino/adk"
)
// Inside an agent's Resume method:
func (a *myAgent) Resume(ctx context.Context, info *adk.ResumeInfo, opts ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] {
if !info.WasInterrupted {
// Already entered the Resume method, must have WasInterrupted = true
return adk.NewAsyncIterator([]*adk.AgentEvent{{Err: errors.New("not an interrupt")}}, nil)
}
if !info.IsResumeTarget {
// This agent is not a specific target, so it must re-interrupt to preserve its state.
return adk.StatefulInterrupt(ctx, "Waiting for another part of the workflow to be resumed", info.InterruptState)
}
// This agent is the target. Process the resume data.
if info.ResumeData != nil {
userInput, ok := info.ResumeData.(string)
if ok {
// Process user input and continue execution
fmt.Printf("Received user input: %s\n", userInput)
// Update agent state based on user input
a.currentState.LastUserInput = userInput
}
}
// Continue normal execution logic
return a.Run(ctx, &adk.AgentInput{Input: "resumed execution"})
}
Compose Package APIs
The compose package provides low-level building blocks for creating complex, interruptible workflows.
1. APIs for Interrupting
Interrupt
Creates a special error that signals the execution engine to interrupt the current run at the component’s specific address and save a checkpoint. This is the standard way for a single, non-composite component to signal a resumable interrupt.
func Interrupt(ctx context.Context, info any) error
Parameters:
ctx: The context of the running component, used to retrieve the current execution address.info: User-facing information about the interrupt. This information is not persisted but is exposed to the calling application viaInterruptCtx.
StatefulInterrupt
Similar to Interrupt, but also saves the component’s internal state. The state is saved in the checkpoint and provided back to the component via GetInterruptState on resume.
func StatefulInterrupt(ctx context.Context, info any, state any) error
Parameters:
ctx: The context of the running component.info: User-facing information about the interrupt.state: The internal state that the interrupted component needs to persist.
CompositeInterrupt
Creates a special error representing a composite interrupt. It is designed for “composite” nodes (like ToolsNode) or any component that coordinates multiple independent, interruptible sub-processes. It bundles multiple child interrupt errors into a single error that the engine can deconstruct into a flat list of resumable points.
func CompositeInterrupt(ctx context.Context, info any, state any, errs ...error) error
Parameters:
ctx: The context of the running composite node.info: User-facing information for the composite node itself (can benil).state: The state of the composite node itself (can benil).errs: A list of errors from sub-processes. These can beInterrupt,StatefulInterrupt, or nestedCompositeInterrupterrors.
Example:
// A node that runs multiple processes in parallel.
var errs []error
for _, process := range processes {
subCtx := compose.AppendAddressSegment(ctx, "process", process.ID)
_, err := process.Run(subCtx)
if err != nil {
errs = append(errs, err)
}
}
// If any sub-process interrupts, bundle them together.
if len(errs) > 0 {
// The composite node can save its own state, e.g., which processes have completed.
return compose.CompositeInterrupt(ctx, "Parallel execution needs input", parentState, errs...)
}
2. APIs for Getting Interrupt Information
ExtractInterruptInfo
Extracts a structured InterruptInfo object from an error returned by a Runnable’s Invoke or Stream method. This is the primary way for applications to get a list of all interrupt points after execution pauses.
composeInfo, ok := compose.ExtractInterruptInfo(err)
if ok {
// Access interrupt contexts
interruptContexts := composeInfo.InterruptContexts
}
Example:
// After calling a graph that interrupts...
_, err := graph.Invoke(ctx, "initial input")
if err != nil {
interruptInfo, isInterrupt := compose.ExtractInterruptInfo(err)
if isInterrupt {
fmt.Printf("Execution was interrupted by %d interrupt points.\n", len(interruptInfo.InterruptContexts))
// Now you can inspect interruptInfo.InterruptContexts to decide how to resume.
}
}
3. APIs for End User Resume
Resume
Prepares the context for an “explicit targeted resume” operation by targeting one or more components without providing data. This is useful when the resume action itself is the signal.
func Resume(ctx context.Context, interruptIDs ...string) context.Context
Example:
// After an interrupt, we get two interrupt IDs: id1 and id2.
// We want to resume both without providing specific data.
resumeCtx := compose.Resume(context.Background(), id1, id2)
// Pass this context to the next Invoke/Stream call.
// In the components corresponding to id1 and id2, GetResumeContext will return isResumeFlow = true.
ResumeWithData
Prepares a context to resume a single specific component with data. It is a convenience wrapper around BatchResumeWithData.
func ResumeWithData(ctx context.Context, interruptID string, data any) context.Context
Example:
// Resume a single interrupt point with specific data.
resumeCtx := compose.ResumeWithData(context.Background(), interruptID, "This is the specific data you requested.")
// Pass this context to the next Invoke/Stream call.
BatchResumeWithData
This is the core function for preparing a resume context. It injects a mapping of resume targets (interrupt IDs) and their corresponding data into the context. Components whose interrupt IDs exist as keys will receive isResumeFlow = true when calling GetResumeContext.
func BatchResumeWithData(ctx context.Context, resumeData map[string]any) context.Context
Example:
// Resume multiple interrupt points at once, each with different data.
resumeData := map[string]any{
"interrupt-id-1": "Data for the first point.",
"interrupt-id-2": 42, // Data can be any type.
"interrupt-id-3": nil, // Equivalent to using Resume() for this ID.
}
resumeCtx := compose.BatchResumeWithData(context.Background(), resumeData)
// Pass this context to the next Invoke/Stream call.
4. APIs for Developer Resume
GetInterruptState
Provides a type-safe way to check and retrieve the persisted state from a previous interrupt. This is the primary function for components to learn about their past state.
func GetInterruptState[T any](ctx context.Context) (wasInterrupted bool, hasState bool, state T)
Return values:
wasInterrupted:trueif the node was part of a previous interrupt.hasState:trueif state was provided and successfully converted to typeT.state: The typed state object.
Example:
// Inside a lambda or tool's execution logic:
wasInterrupted, hasState, state := compose.GetInterruptState[*MyState](ctx)
if wasInterrupted {
fmt.Println("This component was interrupted in a previous run.")
if hasState {
fmt.Printf("Restored state: %+v\n", state)
}
} else {
// This is the first time this component is running in this execution.
}
GetResumeContext
Checks if the current component is a target of a resume operation and retrieves any user-provided data. This is typically called after GetInterruptState confirms the component was interrupted.
func GetResumeContext[T any](ctx context.Context) (isResumeFlow bool, hasData bool, data T)
Return values:
isResumeFlow:trueif the component was explicitly targeted by the resume call. Iffalse, the component must re-interrupt to preserve its state.hasData:trueif data was provided for this component.data: The typed data provided by the user.
Example:
// Inside a lambda or tool's execution logic, after checking GetInterruptState:
wasInterrupted, _, oldState := compose.GetInterruptState[*MyState](ctx)
if wasInterrupted {
isTarget, hasData, resumeData := compose.GetResumeContext[string](ctx)
if isTarget {
// This component is the target, continue execution logic.
if hasData {
fmt.Printf("Resuming with user data: %s\n", resumeData)
}
// Complete work using restored state and resume data
result := processWithStateAndData(state, resumeData)
return result, nil
} else {
// This component is not the target, so it must re-interrupt.
return compose.StatefulInterrupt(ctx, "Waiting for another component to be resumed", oldState)
}
}
Underlying Architecture: Addressing System
The Need for Addresses
The addressing system is designed to solve three fundamental needs in effective human-in-the-loop interaction:
- State Attachment: To attach local state to interrupt points, we need a stable, unique locator for each interrupt point.
- Targeted Resume: To provide targeted resume data for specific interrupt points, we need a way to precisely identify each point.
- Interrupt Localization: To tell end users exactly where in the execution hierarchy the interrupt occurred.
How Addresses Meet These Needs
The address system meets these needs through three key properties:
- Stability: Addresses remain consistent across multiple executions, ensuring the same interrupt point can be reliably identified.
- Uniqueness: Each interrupt point has a unique address, enabling precise targeting during resume.
- Hierarchical Structure: Addresses provide a clear hierarchical path showing exactly where in the execution flow the interrupt occurred.
Address Structure and Segment Types
Address Structure
type Address struct {
Segments []AddressSegment
}
type AddressSegment struct {
Type AddressSegmentType
ID string
SubID string
}
Address Structure Diagram
The following diagrams illustrate the hierarchical structure of Address and its AddressSegment from both ADK and Compose perspectives:
ADK Layer Perspective (Simplified, Agent-centric view):
graph TD
A[Address] --> B[AddressSegment 1]
A --> C[AddressSegment 2]
A --> D[AddressSegment 3]
B --> B1[Type: Agent]
B --> B2[ID: A]
C --> C1[Type: Agent]
C --> C2[ID: B]
D --> D1[Type: Tool]
D --> D2[ID: search_tool]
D --> D3[SubID: 1]
style A fill:#e1f5fe
style B fill:#f3e5f5
style C fill:#f3e5f5
style D fill:#f3e5f5
style B1 fill:#e8f5e8
style B2 fill:#e8f5e8
style C1 fill:#e8f5e8
style C2 fill:#e8f5e8
style D1 fill:#e8f5e8
style D2 fill:#e8f5e8
style D3 fill:#e8f5e8
Compose Layer Perspective (Detailed, full hierarchy view):
graph TD
A[Address] --> B[AddressSegment 1]
A --> C[AddressSegment 2]
A --> D[AddressSegment 3]
A --> E[AddressSegment 4]
B --> B1[Type: Runnable]
B --> B2[ID: my_graph]
C --> C1[Type: Node]
C --> C2[ID: sub_graph]
D --> D1[Type: Node]
D --> D2[ID: tools_node]
E --> E1[Type: Tool]
E --> E2[ID: mcp_tool]
E --> E3[SubID: 1]
style A fill:#e1f5fe
style B fill:#f3e5f5
style C fill:#f3e5f5
style D fill:#f3e5f5
style E fill:#f3e5f5
style B1 fill:#e8f5e8
style B2 fill:#e8f5e8
style C1 fill:#e8f5e8
style C2 fill:#e8f5e8
style D1 fill:#e8f5e8
style D2 fill:#e8f5e8
style E1 fill:#e8f5e8
style E2 fill:#e8f5e8
style E3 fill:#e8f5e8
Layer-Specific Address Segment Types
ADK Layer Segment Types
The ADK layer provides a simplified, agent-centric abstraction of the execution hierarchy:
type AddressSegmentType = core.AddressSegmentType
const (
AddressSegmentAgent AddressSegmentType = "agent"
AddressSegmentTool AddressSegmentType = "tool"
)
Key Features:
- Agent Segment: Represents agent-level execution segments (typically omits
SubID). - Tool Segment: Represents tool-level execution segments (
SubIDis used to ensure uniqueness). - Simplified View: Abstracts away underlying complexity for agent developers.
- Example Path:
Agent:A → Agent:B → Tool:search_tool:1
Compose Layer Segment Types
The compose layer provides fine-grained control and visibility over the entire execution hierarchy:
type AddressSegmentType = core.AddressSegmentType
const (
AddressSegmentRunnable AddressSegmentType = "runnable" // Graph, Workflow, or Chain
AddressSegmentNode AddressSegmentType = "node" // Individual graph nodes
AddressSegmentTool AddressSegmentType = "tool" // Specific tool calls
)
Key Features:
- Runnable Segment: Represents top-level executables (Graph, Workflow, Chain).
- Node Segment: Represents individual nodes in the execution graph.
- Tool Segment: Represents specific tool calls within a
ToolsNode. - Detailed View: Provides full visibility into the execution hierarchy.
- Example Path:
Runnable:my_graph → Node:sub_graph → Node:tools_node → Tool:mcp_tool:1
Extensibility and Design Principles
The address segment type system is designed to be extensible. Framework developers can add new segment types to support additional execution modes or custom components while maintaining backward compatibility.
Key Design Principle: The ADK layer provides simplified, agent-centric abstractions, while the compose layer handles the full complexity of the execution hierarchy. This layered approach allows developers to work at the abstraction level that suits their needs.
Backward Compatibility
The human-in-the-loop framework maintains full backward compatibility with existing code. All previous interrupt and resume patterns will continue to work as before, while providing enhanced functionality through the new addressing system.
1. Graph Interrupt Compatibility
Previous graph interrupt flows using the deprecated NewInterruptAndRerunErr or InterruptAndRerun in nodes/tools will continue to be supported, but require one critical additional step: error wrapping.
Since these functions are unaware of the new addressing system, it is the responsibility of the component calling them to capture the error and wrap the address information into it using the WrapInterruptAndRerunIfNeeded helper function. This is typically done inside the composite node (like the official ToolsNode) that calls the legacy component.
Note: If you choose not to use
WrapInterruptAndRerunIfNeeded, the original behavior of these functions will be preserved. End users can still useExtractInterruptInfoto get information from the error as before. However, since the resulting interrupt context will lack the correct address, the new targeted resume APIs will not be available for that specific interrupt point. To fully enable the new address-aware functionality, wrapping is required.
// 1. A legacy tool using deprecated interrupt
func myLegacyTool(ctx context.Context, input string) (string, error) {
// ... tool logic
// This error is not address-aware.
return "", compose.NewInterruptAndRerunErr("Requires user approval")
}
// 2. A composite node calling the legacy tool
var legacyToolNode = compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
out, err := myLegacyTool(ctx, input)
if err != nil {
// Critical: The caller must wrap the error to add the address.
// The "tool:legacy_tool" segment will be appended to the current address.
segment := compose.AddressSegment{Type: "tool", ID: "legacy_tool"}
return "", compose.WrapInterruptAndRerunIfNeeded(ctx, segment, err)
}
return out, nil
})
// 3. End user code can now see the full address.
_, err := graph.Invoke(ctx, input)
if err != nil {
interruptInfo, exists := compose.ExtractInterruptInfo(err)
if exists {
// The interrupt context will now have a correct, fully qualified address.
fmt.Printf("Interrupt Address: %s\n", interruptInfo.InterruptContexts[0].Address.String())
}
}
2. Compatibility with Static Interrupts Added at Compile Time
Static interrupts added via WithInterruptBeforeNodes or WithInterruptAfterNodes continue to work, but the way state is handled has been improved.
When a static interrupt is triggered, an InterruptCtx is generated with an address pointing to the graph (or subgraph) where the interrupt was defined. The key is that the InterruptCtx.Info field now directly exposes the graph’s state.
This enables a more direct and intuitive workflow:
- The end user receives the
InterruptCtxand can inspect the graph’s live state via the.Infofield. - They can directly modify this state object.
- They can then pass the modified graph state object back via
ResumeWithDataandInterruptCtx.IDto resume execution.
This new pattern often eliminates the need for the old WithStateModifier option, although it remains available for full backward compatibility.
// 1. Define a graph with its own local state
type MyGraphState struct {
SomeValue string
}
g := compose.NewGraph[string, string](compose.WithGenLocalState(func(ctx context.Context) *MyGraphState {
return &MyGraphState{SomeValue: "initial"}
}))
// ... add nodes 1 and 2 to the graph ...
// 2. Compile the graph with a static interrupt point
// This will interrupt the graph itself after the "node_1" node completes.
graph, err := g.Compile(ctx, compose.WithInterruptAfterNodes([]string{"node_1"}))
// 3. Run the graph, which will trigger the static interrupt
_, err = graph.Invoke(ctx, "start")
// 4. Extract the interrupt context and the graph's state
interruptInfo, isInterrupt := compose.ExtractInterruptInfo(err)
if isInterrupt {
interruptCtx := interruptInfo.InterruptContexts[0]
// The .Info field exposes the graph's current state
graphState, ok := interruptCtx.Info.(*MyGraphState)
if ok {
// 5. Directly modify the state
fmt.Printf("Original state value: %s\n", graphState.SomeValue) // Prints "initial"
graphState.SomeValue = "a-new-value-from-user"
// 6. Resume by passing back the modified state object
resumeCtx := compose.ResumeWithData(context.Background(), interruptCtx.ID, graphState)
result, err := graph.Invoke(resumeCtx, "start")
// ... execution will continue, and node_2 will now see the modified state.
}
}
3. Agent Interrupt Compatibility
Compatibility with legacy agents is maintained at the data structure level, ensuring that old agent implementations continue to work within the new framework. The key lies in how the adk.InterruptInfo and adk.ResumeInfo structs are populated.
For End Users (Application Layer):
When receiving an interrupt from an agent, the adk.InterruptInfo struct will have both populated:
- The new, structured
InterruptContextsfield. - The legacy
Datafield, which will contain the original interrupt information (e.g.,ChatModelAgentInterruptInfoorWorkflowInterruptInfo).
This allows end users to gradually migrate their application logic to use the richer InterruptContexts while still having access to the old Data field when needed.
For Agent Developers:
When a legacy agent’s Resume method is called, the adk.ResumeInfo struct it receives still contains the now-deprecated embedded InterruptInfo field. This field is populated with the same legacy data structures, allowing agent developers to maintain their existing resume logic without immediately updating to the new address-aware APIs.
// --- End User Perspective ---
// After an agent run, you receive an interrupt event.
if event.Action != nil && event.Action.Interrupted != nil {
interruptInfo := event.Action.Interrupted
// 1. New way: Access structured interrupt contexts
if len(interruptInfo.InterruptContexts) > 0 {
fmt.Printf("New structured context available: %+v\n", interruptInfo.InterruptContexts[0])
}
// 2. Old way (still works): Access the legacy Data field
if chatInterrupt, ok := interruptInfo.Data.(*adk.ChatModelAgentInterruptInfo); ok {
fmt.Printf("Legacy ChatModelAgentInterruptInfo still accessible.\n")
// ... logic using the old struct
}
}
// --- Agent Developer Perspective ---
// Inside a legacy agent's Resume method:
func (a *myLegacyAgent) Resume(ctx context.Context, info *adk.ResumeInfo) *adk.AsyncIterator[*adk.AgentEvent] {
// The deprecated embedded InterruptInfo field will still be populated.
// This allows old resume logic to continue working.
if info.InterruptInfo != nil {
if chatInterrupt, ok := info.InterruptInfo.Data.(*adk.ChatModelAgentInterruptInfo); ok {
// ... existing resume logic relying on the old ChatModelAgentInterruptInfo struct
fmt.Println("Resuming based on legacy InterruptInfo.Data field.")
}
}
// ... continue execution
}
Migration Advantages
- Preserve Legacy Behavior: Existing code will continue to work as it did. Old interrupt patterns will not cause crashes, but they also won’t automatically gain new address-aware capabilities without modification.
- Gradual Adoption: Teams can selectively enable new features on a case-by-case basis. For example, you can wrap legacy interrupts with
WrapInterruptAndRerunIfNeededonly in workflows where you need targeted resume. - Enhanced Functionality: The new addressing system provides richer structured context (
InterruptCtx) for all interrupts, while the old data fields are still populated for full compatibility. - Flexible State Management: For static graph interrupts, you can choose modern, direct state modification via the
.Infofield, or continue using the oldWithStateModifieroption.
This backward compatibility model ensures a smooth transition for existing users while providing a clear path to adopting the new human-in-the-loop features.