Skip to content

Commit 069cd58

Browse files
feat(streaming): Add comprehensive Task ID and summary display at end of streaming (#7)
* feat(streaming): Add comprehensive Task ID and summary display at end of streaming - Display Task ID, Context ID, and final status after streaming completes - Show streaming duration and detailed event counts (status updates, artifact updates) - Include final message parts count for comprehensive overview - Works in both normal and raw streaming modes - Add comprehensive test suite with mock A2A client testing both modes - Update documentation in README and example README with streaming summary features Addresses issue #6 by providing essential task information that was previously difficult to access after streaming completion, eliminating the need for additional task list queries. Co-authored-by: Eden Reich <edenreich@users.noreply.github.com> * chore: Nit-picking and cleanups * chore: Cleanups --------- Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com> Co-authored-by: Eden Reich <edenreich@users.noreply.github.com>
1 parent bca713f commit 069cd58

4 files changed

Lines changed: 370 additions & 25 deletions

File tree

README.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ A powerful command-line utility for debugging, monitoring, and inspecting A2A se
2525

2626
- **Server Connectivity**: Test connections to A2A servers and retrieve agent information
2727
- **Task Management**: List, filter, and inspect tasks with detailed status information
28+
- **Real-time Streaming**: Submit streaming tasks and monitor real-time agent responses
29+
- **Streaming Summaries**: Summaries with Task IDs, durations, and event counts
2830
- **Conversation History**: View detailed conversation histories and message flows
2931
- **Agent Information**: Retrieve and display agent cards with capabilities
3032
- **Configuration Management**: Set, get, and list configuration values with namespace commands
@@ -123,9 +125,11 @@ a2a config list # List all configuration values
123125
#### Task Commands
124126

125127
```bash
126-
a2a tasks list # List available tasks
127-
a2a tasks get <task-id> # Get detailed task information
128-
a2a tasks history <context-id> # Get conversation history for a context
128+
a2a tasks list # List available tasks
129+
a2a tasks get <task-id> # Get detailed task information
130+
a2a tasks history <context-id> # Get conversation history for a context
131+
a2a tasks submit <message> # Submit a task and get response
132+
a2a tasks submit-streaming <msg> # Submit streaming task with real-time responses and summary
129133
```
130134

131135
#### Server Commands

cli/cli.go

Lines changed: 88 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,7 @@ var submitStreamingTaskCmd = &cobra.Command{
696696
showRaw, _ := cmd.Flags().GetBool("raw")
697697

698698
messageID := fmt.Sprintf("msg-%d", time.Now().Unix())
699+
startTime := time.Now()
699700

700701
params := adk.MessageSendParams{
701702
Message: adk.Message{
@@ -734,28 +735,83 @@ var submitStreamingTaskCmd = &cobra.Command{
734735
fmt.Printf("Message ID: %s\n", messageID)
735736
fmt.Printf("\n🔄 Streaming responses:\n\n")
736737

738+
var streamingSummary struct {
739+
TaskID string
740+
ContextID string
741+
FinalStatus string
742+
StatusUpdates int
743+
ArtifactUpdates int
744+
TotalEvents int
745+
FinalMessage *adk.Message
746+
}
747+
737748
for event := range eventChan {
749+
streamingSummary.TotalEvents++
750+
751+
eventJSON, err := json.Marshal(event)
752+
if err != nil {
753+
logger.Error("Failed to marshal event", zap.Error(err))
754+
continue
755+
}
756+
757+
var genericEvent map[string]interface{}
758+
if err := json.Unmarshal(eventJSON, &genericEvent); err != nil {
759+
logger.Error("Failed to unmarshal generic event", zap.Error(err))
760+
continue
761+
}
762+
763+
kind, ok := genericEvent["kind"].(string)
764+
if ok {
765+
switch kind {
766+
case "status-update":
767+
streamingSummary.StatusUpdates++
768+
var statusEvent a2a.TaskStatusUpdateEvent
769+
if err := json.Unmarshal(eventJSON, &statusEvent); err == nil {
770+
if streamingSummary.TaskID == "" {
771+
streamingSummary.TaskID = statusEvent.TaskID
772+
}
773+
if streamingSummary.ContextID == "" {
774+
streamingSummary.ContextID = statusEvent.ContextID
775+
}
776+
streamingSummary.FinalStatus = string(statusEvent.Status.State)
777+
if statusEvent.Status.Message != nil {
778+
adkParts := make([]adk.Part, len(statusEvent.Status.Message.Parts))
779+
for i, part := range statusEvent.Status.Message.Parts {
780+
adkParts[i] = adk.Part(part)
781+
}
782+
783+
adkMessage := &adk.Message{
784+
Kind: statusEvent.Status.Message.Kind,
785+
MessageID: statusEvent.Status.Message.MessageID,
786+
Role: statusEvent.Status.Message.Role,
787+
Parts: adkParts,
788+
ContextID: statusEvent.Status.Message.ContextID,
789+
}
790+
streamingSummary.FinalMessage = adkMessage
791+
}
792+
}
793+
case "artifact-update":
794+
streamingSummary.ArtifactUpdates++
795+
var artifactEvent a2a.TaskArtifactUpdateEvent
796+
if err := json.Unmarshal(eventJSON, &artifactEvent); err == nil {
797+
if streamingSummary.TaskID == "" {
798+
streamingSummary.TaskID = artifactEvent.TaskID
799+
}
800+
if streamingSummary.ContextID == "" {
801+
streamingSummary.ContextID = artifactEvent.ContextID
802+
}
803+
}
804+
}
805+
}
806+
738807
if showRaw {
739-
eventJSON, err := json.MarshalIndent(event, "", " ")
808+
eventJSONFormatted, err := json.MarshalIndent(event, "", " ")
740809
if err != nil {
741810
logger.Error("Failed to marshal event", zap.Error(err))
742811
continue
743812
}
744-
fmt.Printf("📡 Raw Event:\n%s\n\n", eventJSON)
813+
fmt.Printf("📡 Raw Event:\n%s\n\n", eventJSONFormatted)
745814
} else {
746-
eventJSON, err := json.Marshal(event)
747-
if err != nil {
748-
logger.Error("Failed to marshal event", zap.Error(err))
749-
continue
750-
}
751-
752-
var genericEvent map[string]interface{}
753-
if err := json.Unmarshal(eventJSON, &genericEvent); err != nil {
754-
logger.Error("Failed to unmarshal generic event", zap.Error(err))
755-
continue
756-
}
757-
758-
kind, ok := genericEvent["kind"].(string)
759815
if !ok {
760816
fmt.Printf("🔔 Unknown Event (no kind field)\n")
761817
continue
@@ -827,18 +883,28 @@ var submitStreamingTaskCmd = &cobra.Command{
827883

828884
default:
829885
fmt.Printf("🔔 Unknown Event Type: %s\n", kind)
830-
if showRaw {
831-
eventJSON, err := json.MarshalIndent(event, "", " ")
832-
if err == nil {
833-
fmt.Printf("%s\n", eventJSON)
834-
}
835-
}
836886
}
837887
fmt.Printf("\n")
838888
}
839889
}
840890

841-
fmt.Printf("✅ Streaming completed!\n")
891+
duration := time.Since(startTime)
892+
893+
fmt.Printf("✅ Streaming completed!\n\n")
894+
fmt.Printf("📋 Streaming Summary:\n")
895+
fmt.Printf(" Task ID: %s\n", streamingSummary.TaskID)
896+
fmt.Printf(" Context ID: %s\n", streamingSummary.ContextID)
897+
fmt.Printf(" Final Status: %s\n", streamingSummary.FinalStatus)
898+
fmt.Printf(" Duration: %s\n", duration.Round(time.Millisecond))
899+
fmt.Printf(" Total Events: %d\n", streamingSummary.TotalEvents)
900+
fmt.Printf(" Status Updates: %d\n", streamingSummary.StatusUpdates)
901+
fmt.Printf(" Artifact Updates: %d\n", streamingSummary.ArtifactUpdates)
902+
903+
if streamingSummary.FinalMessage != nil {
904+
fmt.Printf(" Final Message Parts: %d\n", len(streamingSummary.FinalMessage.Parts))
905+
}
906+
907+
fmt.Printf("\n")
842908
return nil
843909
},
844910
}

0 commit comments

Comments
 (0)