a2asrv

package
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2026 License: Apache-2.0 Imports: 22 Imported by: 22

Documentation

Overview

Package a2asrv provides a configurable A2A protocol server implementation.

The default implementation can be created using NewRequestHandler. The function takes a single required AgentExecutor dependency and a variable number of RequestHandlerOption-s used to customize handler behavior.

AgentExecutor implementation is responsible for invoking the agent, translating its outputs to a2a core types and writing them to the provided eventqueue.Queue. A2A server will be reading data from the queue, processing it and notifying connected clients.

RequestHandler is transport-agnostic and needs to be wrapped in a transport-specific translation layer like github.com/a2aproject/a2a-go/a2agrpc.Handler. JSONRPC transport implementation can be created using NewJSONRPCHandler function and registered with the standard http.Server:

handler := a2asrv.NewHandler(
	agentExecutor,
	a2asrv.WithTaskStore(customDB),
	a2asrv.WithPushNotifications(configStore, sender),
	a2asrv.WithCallInterceptor(customMiddleware),
	...
)

mux := http.NewServeMux()
mux.Handle("/invoke", a2asrv.NewJSONRPCHandler(handler))

The package provides utilities for serving public a2a.AgentCard-s. These return handler implementations which can be registered with a standard http server. Since the card is public, CORS policy allows requests from any domain.

mux.Handle(a2asrv, a2asrv.NewStaticAgentCardHandler(card))

// or for more advanced use cases

mux.Handle(a2asrv, a2asrv.NewAgentCardHandler(producer))

Index

Constants

View Source
const ExtensionsMetaKey = "X-A2A-Extensions"

ExtensionsMetaKey is the default extensions key for extensions metadata passed with a request or in a response.

View Source
const WellKnownAgentCardPath = "/.well-known/agent-card.json"

Variables

This section is empty.

Functions

func NewAgentCardHandler

func NewAgentCardHandler(producer AgentCardProducer) http.Handler

NewAgentCardHandler creates an http.Handler implementation for serving a public a2a.AgentCard.

func NewJSONRPCHandler

func NewJSONRPCHandler(handler RequestHandler) http.Handler

NewJSONRPCHandler creates an http.Handler implementation for serving A2A-protocol over JSONRPC.

func NewStaticAgentCardHandler

func NewStaticAgentCardHandler(card *a2a.AgentCard) http.Handler

NewStaticAgentCardHandler creates an http.Handler implementation for serving a public a2a.AgentCard which is not expected to change while the program is running. The method panics if the argument json marhsaling fails.

Types

type AgentCardProducer

type AgentCardProducer interface {
	// Card returns a self-describing manifest for an agent. It provides essential
	// metadata including the agent's identity, capabilities, skills, supported
	// communication methods, and security requirements.
	Card(ctx context.Context) (*a2a.AgentCard, error)
}

AgentCardProducer creates an AgentCard instances used for agent discovery and capability negotiation.

type AgentCardProducerFn

type AgentCardProducerFn func(ctx context.Context) (*a2a.AgentCard, error)

AgentCardProducerFn is a function type which implements AgentCardProducer.

func (AgentCardProducerFn) Card

type AgentExecutor

type AgentExecutor interface {
	// Execute invokes the agent passing information about the request which triggered execution,
	// translates agent outputs to A2A events and writes them to the event queue.
	// Every invocation runs in a dedicated goroutine.
	//
	// Failures should generally be reported by writing events carrying the cancelation information
	// and task state. An error should be returned in special cases like a failure to write an event.
	Execute(ctx context.Context, reqCtx *RequestContext, queue eventqueue.Queue) error

	// Cancel is called when a client requests the agent to stop working on a task.
	// The simplest implementation can write a cancelation event to the queue and let
	// it be processed by the A2A server. If the events gets applied during an active execution the execution
	// Context gets canceled.
	//
	// An an error should be returned if the cancelation request cannot be processed or a queue write failed.
	Cancel(ctx context.Context, reqCtx *RequestContext, queue eventqueue.Queue) error
}

AgentExecutor implementations translate agent outputs to A2A events. The provided RequestContext should be used as a a2a.TaskInfoProvider argument for a2a.Event-s constructor functions. For streaming responses a2a.TaskArtifactUpdatEvent-s should be used. A2A server stops processing events after one of these events:

The following code can be used as a streaming implementation template with generateOutputs and toParts missing:

func Execute(ctx context.Context, reqCtx *RequestContext, queue eventqueue.Queue) error {
	if reqCtx.StoredTask == nil {
		event := a2a.NewStatusUpdateEvent(reqCtx, a2a.TaskStateSubmitted, nil)
		if err := queue.Write(ctx, event); err != nil {
			return fmt.Errorf("failed to write state submitted: %w", err)
		}
	}

	// perform setup

	event := a2a.NewStatusUpdateEvent(reqCtx, a2a.TaskStateWorking, nil)
	if err := queue.Write(ctx, event); err != nil {
		return fmt.Errorf("failed to write state working: %w", err)
	}

	var artifactID a2a.ArtifactID
	for output, err := range generateOutputs() {
		if err != nil {
			event := a2a.NewStatusUpdateEvent(reqCtx, a2a.TaskStateFailed, toErrorMessage(err))
			if err := queue.Write(ctx, event); err != nil {
				return fmt.Errorf("failed to write state failed: %w", err)
			}
		}

		parts := toParts(output)
		var event *a2a.TaskArtifactUpdateEvent
		if artifactID == "" {
			event = a2a.NewArtifactEvent(reqCtx, parts...)
			artifactID = event.Artifact.ID
		} else {
			event = a2a.NewArtifactUpdateEvent(reqCtx, artifactID, parts...)
		}

		if err := queue.Write(ctx, event); err != nil {
			return fmt.Errorf("failed to write artifact update: %w", err)
		}
	}

	event = a2a.NewStatusUpdateEvent(reqCtx, a2a.TaskStateCompleted, nil)
	event.Final = true
	if err := queue.Write(ctx, event); err != nil {
		return fmt.Errorf("failed to write state working: %w", err)
	}

	return nil
}

type AuthenticatedUser

type AuthenticatedUser struct {
	UserName string
}

AuthenticatedUser is a simple implementation of User interface configurable with a username.

func (*AuthenticatedUser) Authenticated

func (u *AuthenticatedUser) Authenticated() bool

func (*AuthenticatedUser) Name

func (u *AuthenticatedUser) Name() string

type CallContext

type CallContext struct {

	// User can be set by authentication middleware to provide information about
	// the user who initiated the request.
	User User
	// contains filtered or unexported fields
}

CallContext holds information about the current server call scope.

func CallContextFrom

func CallContextFrom(ctx context.Context) (*CallContext, bool)

CallContextFrom allows to get a CallContext struct which holds additional information about the current execution scope.

func WithCallContext

func WithCallContext(ctx context.Context, meta *RequestMeta) (context.Context, *CallContext)

WithCallContext can be called by a transport implementation to provide request metadata to RequestHandler or to have access to the list of activated extensions after the call ends. If context already had a CallContext attached, the old context will be shadowed.

func (*CallContext) Extensions

func (cc *CallContext) Extensions() *Extensions

Extensions returns a struct which provides an API for working with extensions in the current call context.

func (*CallContext) Method

func (cc *CallContext) Method() string

Method returns the name of the RequestHandler method which is being executed.

func (*CallContext) RequestMeta

func (cc *CallContext) RequestMeta() *RequestMeta

RequestMeta returns metadata of the request which created the call context.

type CallInterceptor

type CallInterceptor interface {
	// Before allows to observe, modify or reject a Request.
	// A new context.Context can be returned to pass information to one of the extension points.
	Before(ctx context.Context, callCtx *CallContext, req *Request) (context.Context, error)

	// After allows to observe, modify or reject a Response.
	After(ctx context.Context, callCtx *CallContext, resp *Response) error
}

CallInterceptor can be attached to an RequestHandler. If multiple interceptors are added:

  • Before will be executed in the order of attachment sequentially.
  • After will be executed in the reverse order sequentially.

type ClusterConfig added in v0.3.4

type ClusterConfig struct {
	QueueManager eventqueue.Manager
	WorkQueue    workqueue.Queue
	TaskStore    TaskStore
}

ClusterConfig groups the necessary dependencies for A2A cluster mode operation.

type Extensions

type Extensions struct {
	// contains filtered or unexported fields
}

Extensions provides utility methods for accessing extensions requested by the client and keeping track of extensions activated during request processing.

func ExtensionsFrom

func ExtensionsFrom(ctx context.Context) (*Extensions, bool)

ExtensionsFrom is a helper function for quick access to Extensions in the current CallContext.

func (*Extensions) Activate

func (e *Extensions) Activate(extension *a2a.AgentExtension)

Activate marks extension as activated in the current CallContext. A list of activated extensions might be attached as response metadata by a transport implementation.

func (*Extensions) ActivatedURIs

func (e *Extensions) ActivatedURIs() []string

ActivatedURIs returns URIs of all extensions activated during call processing.

func (*Extensions) Active

func (e *Extensions) Active(extension *a2a.AgentExtension) bool

Active returns true if an extension has already been activated in the current CallContext using ExtensionContext.Activate.

func (*Extensions) Requested

func (e *Extensions) Requested(extension *a2a.AgentExtension) bool

Requested returns true if the provided extension was requested by the client.

func (*Extensions) RequestedURIs

func (e *Extensions) RequestedURIs() []string

RequestedURIs returns URIs all of all extensions requested by the client.

type InterceptedHandler

type InterceptedHandler struct {
	// Handler is responsible for the actual processing of every call.
	Handler RequestHandler
	// Interceptors is a list of call interceptors which will be applied before and after each call.
	Interceptors []CallInterceptor
	// Logger is the logger which will be accessible from request scope context using log package
	// methods. Defaults to slog.Default() if not set.
	Logger *slog.Logger
}

InterceptedHandler implements RequestHandler. It can be used to attach call interceptors and initialize call context for every method of the wrapped handler.

func (*InterceptedHandler) OnCancelTask

func (h *InterceptedHandler) OnCancelTask(ctx context.Context, params *a2a.TaskIDParams) (*a2a.Task, error)

func (*InterceptedHandler) OnDeleteTaskPushConfig

func (h *InterceptedHandler) OnDeleteTaskPushConfig(ctx context.Context, params *a2a.DeleteTaskPushConfigParams) error

func (*InterceptedHandler) OnGetExtendedAgentCard

func (h *InterceptedHandler) OnGetExtendedAgentCard(ctx context.Context) (*a2a.AgentCard, error)

func (*InterceptedHandler) OnGetTask

func (h *InterceptedHandler) OnGetTask(ctx context.Context, query *a2a.TaskQueryParams) (*a2a.Task, error)

func (*InterceptedHandler) OnGetTaskPushConfig

func (h *InterceptedHandler) OnGetTaskPushConfig(ctx context.Context, params *a2a.GetTaskPushConfigParams) (*a2a.TaskPushConfig, error)

func (*InterceptedHandler) OnListTaskPushConfig

func (h *InterceptedHandler) OnListTaskPushConfig(ctx context.Context, params *a2a.ListTaskPushConfigParams) ([]*a2a.TaskPushConfig, error)

func (*InterceptedHandler) OnResubscribeToTask

func (h *InterceptedHandler) OnResubscribeToTask(ctx context.Context, params *a2a.TaskIDParams) iter.Seq2[a2a.Event, error]

func (*InterceptedHandler) OnSendMessage

func (*InterceptedHandler) OnSendMessageStream

func (h *InterceptedHandler) OnSendMessageStream(ctx context.Context, params *a2a.MessageSendParams) iter.Seq2[a2a.Event, error]

func (*InterceptedHandler) OnSetTaskPushConfig

func (h *InterceptedHandler) OnSetTaskPushConfig(ctx context.Context, params *a2a.TaskPushConfig) (*a2a.TaskPushConfig, error)

type PassthroughCallInterceptor

type PassthroughCallInterceptor struct{}

PassthroughInterceptor can be used by CallInterceptor implementers who don't need all methods. The struct can be embedded for providing a no-op implementation.

func (PassthroughCallInterceptor) After

func (PassthroughCallInterceptor) Before

type PushConfigStore

type PushConfigStore interface {
	// Save creates or updates a push notification configuration for a task. If no ID is set
	// on the provided config, it will have a store-generated ID for the returned config.
	// PushConfig has an ID and a Task can have multiple associated configurations.
	Save(ctx context.Context, taskID a2a.TaskID, config *a2a.PushConfig) (*a2a.PushConfig, error)

	// Get retrieves a push configuration registered for a Task with the given configID.
	Get(ctx context.Context, taskID a2a.TaskID, configID string) (*a2a.PushConfig, error)

	// List retrieves all registered push configurations for a Task. Returning an error stops the execution.
	List(ctx context.Context, taskID a2a.TaskID) ([]*a2a.PushConfig, error)

	// Delete removes a push configuration registered for a Task with the given configID.
	Delete(ctx context.Context, taskID a2a.TaskID, configID string) error

	// DeleteAll removes all registered push configurations of a Task.
	DeleteAll(ctx context.Context, taskID a2a.TaskID) error
}

PushConfigStore manages push notification configurations for tasks.

type PushSender

type PushSender interface {
	// SendPush sends a push notification containing the latest task state. If an error is returned execution is stopped.
	SendPush(ctx context.Context, config *a2a.PushConfig, task *a2a.Task) error
}

PushSender defines the interface for sending push notifications about task state changes to external endpoints.

type ReferencedTasksLoader

type ReferencedTasksLoader struct {
	Store TaskStore
}

ReferencedTasksLoader implements RequestContextInterceptor. It populates RelatedTasks field of RequestContext with Tasks referenced in the ReferenceTasks field of the Message which triggered the agent execution.

func (*ReferencedTasksLoader) Intercept

func (ri *ReferencedTasksLoader) Intercept(ctx context.Context, reqCtx *RequestContext) (context.Context, error)

type Request

type Request struct {
	// Payload is one of a2a package core types. It is nil when a request does not have any parameters.
	Payload any
}

Request represents a transport-agnostic request received by the A2A server.

type RequestContext

type RequestContext struct {
	// A message which triggered the execution. nil for cancelation request.
	Message *a2a.Message
	// TaskID is an ID of the task or a newly generated UUIDv4 in case Message did not reference any Task.
	TaskID a2a.TaskID
	// StoredTask is present if request message specified a TaskID.
	StoredTask *a2a.Task
	// RelatedTasks can be present when Message includes Task references and RequestContextBuilder is configured to load them.
	RelatedTasks []*a2a.Task
	// ContextID is a server-generated identifier for maintaining context across multiple related tasks or interactions. Matches the Task ContextID.
	ContextID string
	// Metadata of the request which triggered the call.
	Metadata map[string]any
}

RequestContext provides information about an incoming A2A request to AgentExecutor.

func (*RequestContext) TaskInfo

func (rc *RequestContext) TaskInfo() a2a.TaskInfo

type RequestContextInterceptor

type RequestContextInterceptor interface {
	// Intercept has a chance to modify a RequestContext before it gets passed to AgentExecutor.
	Intercept(ctx context.Context, reqCtx *RequestContext) (context.Context, error)
}

RequestContextInterceptor defines an extension point for modifying request contexts that contain the information needed by AgentExecutor implementations to process incoming requests.

type RequestHandler

type RequestHandler interface {
	// OnGetTask handles the 'tasks/get' protocol method.
	OnGetTask(ctx context.Context, query *a2a.TaskQueryParams) (*a2a.Task, error)

	// OnCancelTask handles the 'tasks/cancel' protocol method.
	OnCancelTask(ctx context.Context, id *a2a.TaskIDParams) (*a2a.Task, error)

	// OnSendMessage handles the 'message/send' protocol method (non-streaming).
	OnSendMessage(ctx context.Context, message *a2a.MessageSendParams) (a2a.SendMessageResult, error)

	// OnResubscribeToTask handles the `tasks/resubscribe` protocol method.
	OnResubscribeToTask(ctx context.Context, id *a2a.TaskIDParams) iter.Seq2[a2a.Event, error]

	// OnSendMessageStream handles the 'message/stream' protocol method (streaming).
	OnSendMessageStream(ctx context.Context, message *a2a.MessageSendParams) iter.Seq2[a2a.Event, error]

	// OnGetTaskPushConfig handles the `tasks/pushNotificationConfig/get` protocol method.
	OnGetTaskPushConfig(ctx context.Context, params *a2a.GetTaskPushConfigParams) (*a2a.TaskPushConfig, error)

	// OnListTaskPushConfig handles the `tasks/pushNotificationConfig/list` protocol method.
	OnListTaskPushConfig(ctx context.Context, params *a2a.ListTaskPushConfigParams) ([]*a2a.TaskPushConfig, error)

	// OnSetTaskPushConfig handles the `tasks/pushNotificationConfig/set` protocol method.
	OnSetTaskPushConfig(ctx context.Context, params *a2a.TaskPushConfig) (*a2a.TaskPushConfig, error)

	// OnDeleteTaskPushConfig handles the `tasks/pushNotificationConfig/delete` protocol method.
	OnDeleteTaskPushConfig(ctx context.Context, params *a2a.DeleteTaskPushConfigParams) error

	// GetAgentCard returns an extended a2a.AgentCard if configured.
	OnGetExtendedAgentCard(ctx context.Context) (*a2a.AgentCard, error)
}

RequestHandler defines a transport-agnostic interface for handling incoming A2A requests.

func NewHandler

func NewHandler(executor AgentExecutor, options ...RequestHandlerOption) RequestHandler

NewHandler creates a new request handler.

type RequestHandlerOption

type RequestHandlerOption func(*InterceptedHandler, *defaultRequestHandler)

RequestHandlerOption can be used to customize the default RequestHandler implementation behavior.

func WithCallInterceptor

func WithCallInterceptor(interceptor CallInterceptor) RequestHandlerOption

WithCallInterceptor adds a CallInterceptor which will be applied to all requests and responses.

func WithClusterMode added in v0.3.4

func WithClusterMode(config ClusterConfig) RequestHandlerOption

WithClusterMode is an experimental feature where work queue is used to distribute tasks across multiple instances.

func WithConcurrencyConfig added in v0.3.3

func WithConcurrencyConfig(config limiter.ConcurrencyConfig) RequestHandlerOption

WithConcurrencyConfig allows to set limits on the number of concurrent executions.

func WithEventQueueManager

func WithEventQueueManager(manager eventqueue.Manager) RequestHandlerOption

WithEventQueueManager overrides eventqueue.Manager with custom implementation

func WithExtendedAgentCard

func WithExtendedAgentCard(card *a2a.AgentCard) RequestHandlerOption

WithExtendedAgentCard sets a static extended authenticated agent card.

func WithExtendedAgentCardProducer

func WithExtendedAgentCardProducer(cardProducer AgentCardProducer) RequestHandlerOption

WithExtendedAgentCardProducer sets a dynamic extended authenticated agent card producer.

func WithLogger

func WithLogger(logger *slog.Logger) RequestHandlerOption

WithLogger sets a custom logger. Request scoped parameters will be attached to this logger on method invocations. Any injected dependency will be able to access the logger using github.com/a2aproject/a2a-go/log package-level functions. If not provided, defaults to slog.Default().

func WithPushNotifications

func WithPushNotifications(store PushConfigStore, sender PushSender) RequestHandlerOption

WithPushNotifications adds support for push notifications. If dependencies are not provided push-related methods will be returning a2a.ErrPushNotificationNotSupported,

func WithRequestContextInterceptor

func WithRequestContextInterceptor(interceptor RequestContextInterceptor) RequestHandlerOption

WithRequestContextInterceptor overrides the default RequestContextInterceptor with a custom implementation.

func WithTaskStore

func WithTaskStore(store TaskStore) RequestHandlerOption

WithTaskStore overrides TaskStore with a custom implementation. If not provided, default to an in-memory implementation.

type RequestMeta

type RequestMeta struct {
	// contains filtered or unexported fields
}

RequestMeta holds the metadata associated with a request, like auth headers and signatures. Custom transport implementations can call WithCallContext to make it accessible during request processing.

func NewRequestMeta

func NewRequestMeta(src map[string][]string) *RequestMeta

NewRequestMeta is a RequestMeta constructor function.

func (*RequestMeta) Get

func (rm *RequestMeta) Get(key string) ([]string, bool)

Get performs a case-insensitive lookup of values for the given key.

func (*RequestMeta) List added in v0.3.2

func (rm *RequestMeta) List() iter.Seq2[string, []string]

List allows to inspect all request meta values.

func (*RequestMeta) With added in v0.3.2

func (rm *RequestMeta) With(additional map[string][]string) *RequestMeta

With allows to create a RequestMeta instance holding the extended set of values.

type Response

type Response struct {
	// Payload is one of a2a package core types. It is nil when Err is set or when a request does not return any value.
	Payload any
	// Err is set to indicate that request processing failed.
	Err error
}

Response represents a transport-agnostic response generated by the A2A server. Payload is one of a2a package core types.

type TaskStore

type TaskStore interface {
	// Save stores a task. Implementations might choose to store event and use the previous known TaskVersion
	// for optimistic concurrency control during updates.
	Save(ctx context.Context, task *a2a.Task, event a2a.Event, prev a2a.TaskVersion) (a2a.TaskVersion, error)

	// Get retrieves a task by ID. If a Task doesn't exist the method should return [a2a.ErrTaskNotFound].
	Get(ctx context.Context, taskID a2a.TaskID) (*a2a.Task, a2a.TaskVersion, error)

	// List retrieves a list of tasks based on the provided request.
	List(ctx context.Context, req *a2a.ListTasksRequest) (*a2a.ListTasksResponse, error)
}

type User

type User interface {
	// Name returns a username.
	Name() string
	// Authenticated returns true if the request was authenticated.
	Authenticated() bool
}

User can be attached to CallContext by authentication middleware.

Directories

Path Synopsis
Package eventqueue provides implementation for in-memory queue management and event processing.
Package eventqueue provides implementation for in-memory queue management and event processing.
Package limiter provides configurations for controlling concurrency limit.
Package limiter provides configurations for controlling concurrency limit.
Package push provides a basic implementation of push notification functionality.
Package push provides a basic implementation of push notification functionality.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL