Skip to content

Commit 1e1b126

Browse files
committed
fix: preserve Anthropic usage fields across partial stream deltas
Keep previously reported input and cache token counts when message_delta only includes output_tokens. Add regression tests for DeepSeek Anthropic streaming usage inheritance.
1 parent 95f077f commit 1e1b126

File tree

2 files changed

+149
-46
lines changed

2 files changed

+149
-46
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
using System.Net;
2+
using Chats.BE.Controllers.Users.Usages.Dtos;
3+
using Chats.BE.Services.Models;
4+
using Chats.BE.Services.Models.ChatServices.Anthropic;
5+
using Chats.BE.Services.Models.Dtos;
6+
using Chats.BE.Services.Models.Neutral;
7+
using Chats.BE.UnitTest.ChatServices.Http;
8+
using Chats.DB;
9+
using Chats.DB.Enums;
10+
11+
namespace Chats.BE.UnitTest.ChatServices.Anthropic;
12+
13+
public class DeepSeekAnthropicServiceTests
{
    // Builds a factory whose HttpClient replays the supplied SSE chunks with a 200 OK response.
    private static IHttpClientFactory CreateMockHttpClientFactory(params string[] chunks) =>
        new FiddlerDumpHttpClientFactory([.. chunks], HttpStatusCode.OK);

    // Minimal streamed ChatRequest pointed at DeepSeek's Anthropic-compatible endpoint.
    private static ChatRequest CreateRequest()
    {
        ModelKey key = new()
        {
            Id = 1,
            Name = "TestKey",
            Secret = "test-api-key",
            Host = "https://api.deepseek.com/anthropic",
            ModelProviderId = (int)DBModelProvider.DeepSeek,
        };

        Model testModel = new()
        {
            Id = 1,
            Name = "Test Model",
            DeploymentName = "deepseek-reasoner",
            ModelKeyId = 1,
            ModelKey = key,
            AllowStreaming = true,
            MaxResponseTokens = 2048,
            ApiTypeId = (byte)DBApiType.AnthropicMessages,
        };

        ChatConfig config = new()
        {
            Id = 1,
            ModelId = 1,
            Model = testModel,
        };

        return new ChatRequest
        {
            Messages = [NeutralMessage.FromUserText("hello")],
            ChatConfig = config,
            Source = UsageSource.Api,
            Streamed = true,
            EndUserId = "8",
        };
    }

    [Fact]
    public async Task ChatStreamed_MessageDeltaWithoutInputTokens_PreservesPreviousInputTokens()
    {
        // message_start reports input_tokens=36; the final message_delta carries only output_tokens.
        // The second usage segment must still show the original input token count.
        IHttpClientFactory httpClientFactory = CreateMockHttpClientFactory(
            "data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\",\"type\":\"message\",\"role\":\"assistant\",\"model\":\"deepseek-reasoner\",\"content\":[],\"stop_reason\":null,\"stop_sequence\":null,\"usage\":{\"input_tokens\":36,\"output_tokens\":0}}}\n\n",
            "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n",
            "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hello\"}}\n\n",
            "data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\",\"stop_sequence\":null},\"usage\":{\"output_tokens\":151}}\n\n",
            "data: {\"type\":\"message_stop\"}\n\n"
        );
        DeepSeekAnthropicService service = new(httpClientFactory);
        ChatRequest request = CreateRequest();

        List<UsageChatSegment> usageReports = [];
        FinishReasonChatSegment? lastFinish = null;
        await foreach (ChatSegment segment in service.ChatStreamed(request, CancellationToken.None))
        {
            if (segment is UsageChatSegment usage)
            {
                usageReports.Add(usage);
            }
            if (segment is FinishReasonChatSegment finish)
            {
                lastFinish = finish;
            }
        }

        Assert.Equal(2, usageReports.Count);
        Assert.Equal(36, usageReports[0].Usage.InputTokens);
        Assert.Equal(0, usageReports[0].Usage.OutputTokens);
        Assert.Equal(36, usageReports[1].Usage.InputTokens);
        Assert.Equal(151, usageReports[1].Usage.OutputTokens);

        Assert.NotNull(lastFinish);
        Assert.Equal(DBFinishReason.Success, lastFinish.FinishReason);
    }

    [Fact]
    public async Task ChatStreamed_MessageDeltaWithoutCacheTokens_PreservesPreviousCacheTokens()
    {
        // message_start includes cache read/creation counts; the final message_delta omits them.
        // The last usage segment must still carry the cache counts from message_start.
        IHttpClientFactory httpClientFactory = CreateMockHttpClientFactory(
            "data: {\"type\":\"message_start\",\"message\":{\"id\":\"msg_1\",\"type\":\"message\",\"role\":\"assistant\",\"model\":\"deepseek-reasoner\",\"content\":[],\"stop_reason\":null,\"stop_sequence\":null,\"usage\":{\"input_tokens\":36,\"cache_creation_input_tokens\":9,\"cache_read_input_tokens\":7,\"output_tokens\":0}}}\n\n",
            "data: {\"type\":\"content_block_start\",\"index\":0,\"content_block\":{\"type\":\"text\",\"text\":\"\"}}\n\n",
            "data: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hello\"}}\n\n",
            "data: {\"type\":\"message_delta\",\"delta\":{\"stop_reason\":\"end_turn\",\"stop_sequence\":null},\"usage\":{\"output_tokens\":151}}\n\n",
            "data: {\"type\":\"message_stop\"}\n\n"
        );
        DeepSeekAnthropicService service = new(httpClientFactory);
        ChatRequest request = CreateRequest();

        List<ChatSegment> allSegments = [];
        await foreach (ChatSegment segment in service.ChatStreamed(request, CancellationToken.None))
        {
            allSegments.Add(segment);
        }

        List<UsageChatSegment> usageReports = allSegments.OfType<UsageChatSegment>().ToList();
        Assert.Equal(2, usageReports.Count);
        UsageChatSegment finalUsage = usageReports[1];
        Assert.Equal(36, finalUsage.Usage.InputTokens);
        Assert.Equal(151, finalUsage.Usage.OutputTokens);
        Assert.Equal(7, finalUsage.Usage.CacheTokens);
        Assert.Equal(9, finalUsage.Usage.CacheCreationTokens);
    }
}

src/BE/web/Services/Models/ChatServices/Anthropic/AnthropicChatService.cs

Lines changed: 29 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public override async IAsyncEnumerable<ChatSegment> ChatStreamed(ChatRequest req
4141
using Stream stream = await response.Content.ReadAsStreamAsync(cancellationToken);
4242

4343
int toolCallIndex = -1;
44+
ChatTokenUsage? lastKnownUsage = null;
4445
await foreach (SseItem<string> sseItem in SseParser.Create(stream, (_, bytes) => Encoding.UTF8.GetString(bytes)).EnumerateAsync(cancellationToken))
4546
{
4647
if (string.IsNullOrEmpty(sseItem.Data) || sseItem.Data == "[DONE]")
@@ -67,16 +68,8 @@ public override async IAsyncEnumerable<ChatSegment> ChatStreamed(ChatRequest req
6768
if (json.TryGetProperty("message", out JsonElement message) &&
6869
message.TryGetProperty("usage", out JsonElement usage))
6970
{
70-
int inputTokens = usage.TryGetProperty("input_tokens", out JsonElement it) ? it.GetInt32() : 0;
71-
int outputTokens = usage.TryGetProperty("output_tokens", out JsonElement ot) ? ot.GetInt32() : 0;
72-
yield return ChatSegment.FromUsage(new ChatTokenUsage
73-
{
74-
InputTokens = inputTokens,
75-
OutputTokens = outputTokens,
76-
CacheTokens = GetCacheReadTokens(usage),
77-
CacheCreationTokens = GetCacheCreationTokens(usage),
78-
ReasoningTokens = 0
79-
});
71+
lastKnownUsage = MergeUsage(lastKnownUsage, usage);
72+
yield return ChatSegment.FromUsage(lastKnownUsage);
8073
}
8174
break;
8275
}
@@ -195,27 +188,12 @@ public override async IAsyncEnumerable<ChatSegment> ChatStreamed(ChatRequest req
195188
};
196189
}
197190

198-
int inputTokens = 0;
199-
int outputTokens = 0;
200191
JsonElement usageElement = default;
201-
bool hasUsage = json.TryGetProperty("usage", out usageElement);
202-
if (hasUsage)
203-
{
204-
inputTokens = usageElement.TryGetProperty("input_tokens", out JsonElement it) ? it.GetInt32() : 0;
205-
outputTokens = usageElement.TryGetProperty("output_tokens", out JsonElement ot) ? ot.GetInt32() : 0;
206-
}
207-
208-
ChatTokenUsage usage = new()
209-
{
210-
InputTokens = inputTokens,
211-
OutputTokens = outputTokens,
212-
CacheTokens = hasUsage ? GetCacheReadTokens(usageElement) : 0,
213-
CacheCreationTokens = hasUsage ? GetCacheCreationTokens(usageElement) : 0,
214-
};
215-
192+
bool hasUsage = json.TryGetProperty("usage", out usageElement) && usageElement.ValueKind == JsonValueKind.Object;
216193
if (hasUsage)
217194
{
218-
yield return ChatSegment.FromUsage(usage);
195+
lastKnownUsage = MergeUsage(lastKnownUsage, usageElement);
196+
yield return ChatSegment.FromUsage(lastKnownUsage);
219197
}
220198
if (finishReason != null)
221199
{
@@ -240,6 +218,29 @@ public override async IAsyncEnumerable<ChatSegment> ChatStreamed(ChatRequest req
240218
}
241219
}
242220

221+
/// <summary>
/// Folds a (possibly partial) Anthropic usage JSON object into the previously
/// reported usage. Any field absent from <paramref name="usage"/> keeps its
/// prior value, so a message_delta that only carries output_tokens does not
/// wipe out the input/cache counts reported earlier by message_start.
/// </summary>
/// <param name="previousUsage">Usage from the last emitted segment, or null if none yet.</param>
/// <param name="usage">The "usage" JSON object from the current stream event.</param>
/// <returns>A new merged <see cref="ChatTokenUsage"/>; inputs are not mutated.</returns>
private static ChatTokenUsage MergeUsage(ChatTokenUsage? previousUsage, JsonElement usage)
{
    // Nothing reported yet → fall back to all-zero counters.
    ChatTokenUsage carried = previousUsage ?? ChatTokenUsage.Zero;

    int input = GetUsageValueOrFallback(usage, "input_tokens", carried.InputTokens);
    int output = GetUsageValueOrFallback(usage, "output_tokens", carried.OutputTokens);
    int cacheRead = GetUsageValueOrFallback(usage, "cache_read_input_tokens", carried.CacheTokens);
    int cacheCreated = GetUsageValueOrFallback(usage, "cache_creation_input_tokens", carried.CacheCreationTokens);

    return new ChatTokenUsage
    {
        InputTokens = input,
        OutputTokens = output,
        CacheTokens = cacheRead,
        CacheCreationTokens = cacheCreated,
        // The JSON payload is never consulted for this field; always carry it forward.
        ReasoningTokens = carried.ReasoningTokens,
    };
}
234+
235+
/// <summary>
/// Reads an integer usage field from an Anthropic usage JSON object, returning
/// <paramref name="fallback"/> when the property is absent, is not a JSON
/// number, or does not fit in an <see cref="int"/>.
/// </summary>
private static int GetUsageValueOrFallback(JsonElement usage, string propertyName, int fallback)
{
    // Guard clauses instead of one compound condition: each failure mode keeps the prior value.
    if (!usage.TryGetProperty(propertyName, out JsonElement valueElement))
    {
        return fallback;
    }
    if (valueElement.ValueKind != JsonValueKind.Number)
    {
        return fallback;
    }
    return valueElement.TryGetInt32(out int parsed) ? parsed : fallback;
}
243+
243244
/// <summary>
244245
/// Removes the encrypted_content field from web search results as it's very long,
245246
/// cannot be understood by the model, and wastes storage/bandwidth.
@@ -273,24 +274,6 @@ private static string RemoveEncryptedContent(JsonElement json)
273274
return json.ToString();
274275
}
275276

276-
private static int GetCacheReadTokens(JsonElement usage)
277-
{
278-
if (usage.TryGetProperty("cache_read_input_tokens", out JsonElement cacheRead))
279-
{
280-
return cacheRead.GetInt32();
281-
}
282-
return 0;
283-
}
284-
285-
private static int GetCacheCreationTokens(JsonElement usage)
286-
{
287-
if (usage.TryGetProperty("cache_creation_input_tokens", out JsonElement cacheCreation))
288-
{
289-
return cacheCreation.GetInt32();
290-
}
291-
return 0;
292-
}
293-
294277
protected virtual (string url, string apiKey) GetEndpointAndKey(ModelKey modelKey)
295278
{
296279
string url = (modelKey.Host ?? "https://api.anthropic.com").TrimEnd('/');

0 commit comments

Comments
 (0)