Skip to content

Commit 9084161

Browse files
committed
big refactor(be): introduce ChatRunService and move retry/billing out of controllers
Add scoped ChatRunService orchestration for paid generation runs; move 429 retry logic into the service layer (preserve pre-first-yield retry semantics); centralize usage persistence, balance/usage updates, and transaction creation in ChatRunService; simplify ChatRunRequest by deriving user and API context from UserModel/ChatRequest.Source; switch WebChat + OpenAI/Anthropic/Image API controllers to consume ChatRunService; link Step/UserApiUsage via UsageId instead of constructing usage in controllers; remove obsolete InChatContext and shared multi-scope UserModelBalanceCalculator; keep non-paid utility flows (count tokens/list models/validate model) outside the new run layer; add minimal fallback in TurnDto model metadata resolution (Usage.Model -> ChatConfig.Model) to prevent navigation-missing errors.
1 parent c49fe38 commit 9084161

12 files changed

Lines changed: 819 additions & 947 deletions

File tree

src/BE/web/Controllers/Api/AnthropicCompatible/AnthropicMessagesController.cs

Lines changed: 46 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -3,42 +3,34 @@
33
using Chats.BE.Controllers.Api.AnthropicCompatible.Dtos;
44
using Chats.BE.Controllers.Chats.Chats;
55
using Chats.BE.Services;
6-
using Chats.BE.Services.FileServices;
76
using Chats.BE.Services.Models;
87
using Chats.BE.Services.Models.ChatServices;
98
using Chats.BE.Services.Models.Dtos;
109
using Chats.BE.Services.OpenAIApiKeySession;
1110
using Microsoft.AspNetCore.Authorization;
1211
using Microsoft.AspNetCore.Mvc;
1312
using Microsoft.EntityFrameworkCore;
14-
using System.Diagnostics;
1513
using System.Text.Json;
1614
using System.Text.Json.Nodes;
17-
using Chats.BE.Services.Options;
18-
using Microsoft.Extensions.Options;
1915

2016
namespace Chats.BE.Controllers.Api.AnthropicCompatible;
2117

2218
[Authorize(AuthenticationSchemes = "OpenAIApiKey")]
2319
public class AnthropicMessagesController(
2420
ChatsDB db,
2521
CurrentApiKey currentApiKey,
26-
ChatFactory cf,
22+
ChatRunService chatRunService,
2723
UserModelManager userModelManager,
2824
ILogger<AnthropicMessagesController> logger,
29-
BalanceService balanceService,
30-
FileUrlProvider fup,
3125
ClientInfoManager clientInfoManager) : ControllerBase
3226
{
3327
private static readonly DBApiType[] AllowedApiTypes = [DBApiType.OpenAIChatCompletion, DBApiType.OpenAIResponse, DBApiType.AnthropicMessages];
3428

3529
[HttpPost("v1/messages")]
3630
public async Task<ActionResult> CreateMessage(
3731
[FromBody] JsonObject json,
38-
[FromServices] IOptions<ChatOptions> chatOptions,
3932
CancellationToken cancellationToken)
4033
{
41-
InChatContext icc = new(Stopwatch.GetTimestamp());
4234
AnthropicRequestWrapper request = new(json);
4335

4436
if (!request.SeemsValid())
@@ -63,24 +55,14 @@ public async Task<ActionResult> CreateMessage(
6355
return ErrorMessage(AnthropicErrorTypes.InvalidRequestError, $"The model `{request.Model}` does not support messages API.");
6456
}
6557

66-
int? retry429Times = chatOptions.Value.Retry429Times;
67-
return await ProcessMessage(request, userModel, icc, retry429Times, cancellationToken);
58+
return await ProcessMessage(request, userModel, cancellationToken);
6859
}
6960

7061
private async Task<ActionResult> ProcessMessage(
7162
AnthropicRequestWrapper request,
7263
UserModel userModel,
73-
InChatContext icc,
74-
int? retry429Times,
7564
CancellationToken cancellationToken)
7665
{
77-
Model cm = userModel.Model;
78-
ChatService s = cf.CreateChatService(cm);
79-
UserBalance userBalance = await db.UserBalances
80-
.Where(x => x.UserId == currentApiKey.User.Id)
81-
.FirstOrDefaultAsync(cancellationToken) ?? throw new InvalidOperationException("User balance not found.");
82-
UserModelBalanceCalculator calc = new(BalanceInitialInfo.FromDB([userModel], userBalance.Balance), []);
83-
ScopedBalanceCalculator scopedCalc = calc.WithScoped("0");
8466
ActionResult? errorToReturn = null;
8567
bool hasSuccessYield = false;
8668
string messageId = $"msg_{Guid.NewGuid():N}";
@@ -89,12 +71,16 @@ private async Task<ActionResult> ProcessMessage(
8971
StreamingState streamingState = new();
9072
bool messageStarted = false;
9173

92-
try
93-
{
94-
ChatRequest csr = request.ToChatRequest(currentApiKey.User.Id.ToString(), cm);
95-
96-
await foreach (ChatSegment segment in icc.Run(scopedCalc, userModel, s, csr, fup, retry429Times, cancellationToken))
74+
ChatRequest csr = request.ToChatRequest(currentApiKey.User.Id.ToString(), userModel.Model);
75+
ChatRunResult runResult = await chatRunService.RunAsync(
76+
new ChatRunRequest
9777
{
78+
UserModel = userModel,
79+
ChatRequest = csr,
80+
},
81+
async (segmentContext, ct) =>
82+
{
83+
ChatSegment segment = segmentContext.Segment;
9884
if (request.Streamed)
9985
{
10086
if (!hasSuccessYield)
@@ -109,89 +95,64 @@ private async Task<ActionResult> ProcessMessage(
10995
{
11096
if (!messageStarted)
11197
{
112-
await YieldEvent("message_start", CreateMessageStartEvent(request.Model!, messageId, usageSegment.Usage.InputTokens), cancellationToken);
113-
await YieldEvent("ping", new PingEvent(), cancellationToken);
98+
await YieldEvent("message_start", CreateMessageStartEvent(request.Model!, messageId, usageSegment.Usage.InputTokens), ct);
99+
await YieldEvent("ping", new PingEvent(), ct);
114100
messageStarted = true;
115101
hasSuccessYield = true;
116102
}
117-
continue;
103+
return;
118104
}
119105

120106
if (!messageStarted)
121107
{
122-
await YieldEvent("message_start", CreateMessageStartEvent(request.Model!, messageId, 0), cancellationToken);
123-
await YieldEvent("ping", new PingEvent(), cancellationToken);
108+
await YieldEvent("message_start", CreateMessageStartEvent(request.Model!, messageId, 0), ct);
109+
await YieldEvent("ping", new PingEvent(), ct);
124110
messageStarted = true;
125111
hasSuccessYield = true;
126112
}
127113

128-
await ProcessStreamingItem(segment, streamingState, cancellationToken);
114+
await ProcessStreamingItem(segment, streamingState, ct);
129115
}
130116

131-
if (cancellationToken.IsCancellationRequested)
117+
if (ct.IsCancellationRequested)
132118
{
133119
throw new TaskCanceledException();
134120
}
135-
}
136-
137-
// Send final events for streaming
138-
if (request.Streamed && hasSuccessYield && icc.FinishReason != DBFinishReason.Cancelled)
139-
{
140-
// Close any open content block
141-
if (streamingState.CurrentBlockIndex >= 0)
142-
{
143-
await YieldEvent("content_block_stop", new ContentBlockStopEvent { Index = streamingState.CurrentBlockIndex }, cancellationToken);
144-
}
121+
},
122+
cancellationToken);
145123

146-
// Send message_delta with stop_reason
147-
await YieldEvent("message_delta", icc.FullResponse!.ToMessageDeltaEvent(), cancellationToken);
148-
149-
// Send message_stop
150-
await YieldEvent("message_stop", new MessageStopEvent(), cancellationToken);
151-
}
152-
}
153-
catch (RawChatServiceException rawEx)
154-
{
155-
icc.FinishReason = rawEx.ErrorCode;
156-
logger.LogError(rawEx, "Upstream error: {StatusCode}", rawEx.StatusCode);
157-
errorToReturn = await YieldRawError(hasSuccessYield && request.Streamed, rawEx.StatusCode, rawEx.Body, cancellationToken);
158-
}
159-
catch (ChatServiceException cse)
160-
{
161-
icc.FinishReason = cse.ErrorCode;
162-
errorToReturn = await YieldError(hasSuccessYield && request.Streamed, MapFinishReasonToErrorType(cse.ErrorCode), cse.Message, cancellationToken);
163-
}
164-
catch (TaskCanceledException)
124+
switch (runResult.Exception)
165125
{
166-
icc.FinishReason = DBFinishReason.Cancelled;
167-
}
168-
catch (Exception e)
169-
{
170-
icc.FinishReason = DBFinishReason.UnknownError;
171-
logger.LogError(e, "Unknown error");
172-
errorToReturn = await YieldError(hasSuccessYield && request.Streamed, AnthropicErrorTypes.ApiError, "Internal server error", cancellationToken);
126+
case RawChatServiceException rawEx:
127+
logger.LogError(rawEx, "Upstream error: {StatusCode}", rawEx.StatusCode);
128+
errorToReturn = await YieldRawError(hasSuccessYield && request.Streamed, rawEx.StatusCode, rawEx.Body, cancellationToken);
129+
break;
130+
case ChatServiceException cse:
131+
errorToReturn = await YieldError(hasSuccessYield && request.Streamed, MapFinishReasonToErrorType(cse.ErrorCode), cse.Message, cancellationToken);
132+
break;
133+
case Exception e when e is not TaskCanceledException:
134+
logger.LogError(e, "Unknown error");
135+
errorToReturn = await YieldError(hasSuccessYield && request.Streamed, AnthropicErrorTypes.ApiError, "Internal server error", cancellationToken);
136+
break;
173137
}
174-
finally
138+
139+
if (request.Streamed && hasSuccessYield && runResult.FinishReason != DBFinishReason.Cancelled)
175140
{
176-
cancellationToken = CancellationToken.None;
141+
if (streamingState.CurrentBlockIndex >= 0)
142+
{
143+
await YieldEvent("content_block_stop", new ContentBlockStopEvent { Index = streamingState.CurrentBlockIndex }, cancellationToken);
144+
}
145+
146+
await YieldEvent("message_delta", runResult.FullResponse.ToMessageDeltaEvent(), cancellationToken);
147+
await YieldEvent("message_stop", new MessageStopEvent(), cancellationToken);
177148
}
178149

179-
// Save usage
180-
UserApiUsage usage = new()
150+
db.UserApiUsages.Add(new UserApiUsage
181151
{
182152
ApiKeyId = currentApiKey.ApiKeyId,
183-
Usage = icc.ToUserModelUsage(currentApiKey.User.Id, scopedCalc, userModel, await clientInfoManager.GetClientInfoId(), isApi: true),
184-
};
185-
db.UserApiUsages.Add(usage);
186-
await db.SaveChangesAsync(cancellationToken);
187-
if (calc.BalanceCost > 0)
188-
{
189-
_ = balanceService.AsyncUpdateBalance(currentApiKey.User.Id, CancellationToken.None);
190-
}
191-
if (calc.UsageCosts.Any())
192-
{
193-
_ = balanceService.AsyncUpdateUsage([userModel!.Id], CancellationToken.None);
194-
}
153+
UsageId = runResult.UserModelUsageId,
154+
});
155+
await db.SaveChangesAsync(CancellationToken.None);
195156

196157
if (hasSuccessYield && request.Streamed)
197158
{
@@ -203,8 +164,7 @@ private async Task<ActionResult> ProcessMessage(
203164
}
204165
else
205166
{
206-
// Non-streamed success response
207-
AnthropicResponse response = icc.FullResponse!.ToAnthropicResponse(request.Model!, messageId);
167+
AnthropicResponse response = runResult.FullResponse.ToAnthropicResponse(request.Model!, messageId);
208168
return Ok(response);
209169
}
210170
}

0 commit comments

Comments
 (0)