33using Chats . BE . Controllers . Api . AnthropicCompatible . Dtos ;
44using Chats . BE . Controllers . Chats . Chats ;
55using Chats . BE . Services ;
6- using Chats . BE . Services . FileServices ;
76using Chats . BE . Services . Models ;
87using Chats . BE . Services . Models . ChatServices ;
98using Chats . BE . Services . Models . Dtos ;
109using Chats . BE . Services . OpenAIApiKeySession ;
1110using Microsoft . AspNetCore . Authorization ;
1211using Microsoft . AspNetCore . Mvc ;
1312using Microsoft . EntityFrameworkCore ;
14- using System . Diagnostics ;
1513using System . Text . Json ;
1614using System . Text . Json . Nodes ;
17- using Chats . BE . Services . Options ;
18- using Microsoft . Extensions . Options ;
1915
2016namespace Chats . BE . Controllers . Api . AnthropicCompatible ;
2117
2218[ Authorize ( AuthenticationSchemes = "OpenAIApiKey" ) ]
2319public class AnthropicMessagesController (
2420 ChatsDB db ,
2521 CurrentApiKey currentApiKey ,
26- ChatFactory cf ,
22+ ChatRunService chatRunService ,
2723 UserModelManager userModelManager ,
2824 ILogger < AnthropicMessagesController > logger ,
29- BalanceService balanceService ,
30- FileUrlProvider fup ,
3125 ClientInfoManager clientInfoManager ) : ControllerBase
3226{
3327 private static readonly DBApiType [ ] AllowedApiTypes = [ DBApiType . OpenAIChatCompletion , DBApiType . OpenAIResponse , DBApiType . AnthropicMessages ] ;
3428
3529 [ HttpPost ( "v1/messages" ) ]
3630 public async Task < ActionResult > CreateMessage (
3731 [ FromBody ] JsonObject json ,
38- [ FromServices ] IOptions < ChatOptions > chatOptions ,
3932 CancellationToken cancellationToken )
4033 {
41- InChatContext icc = new ( Stopwatch . GetTimestamp ( ) ) ;
4234 AnthropicRequestWrapper request = new ( json ) ;
4335
4436 if ( ! request . SeemsValid ( ) )
@@ -63,24 +55,14 @@ public async Task<ActionResult> CreateMessage(
6355 return ErrorMessage ( AnthropicErrorTypes . InvalidRequestError , $ "The model `{ request . Model } ` does not support messages API.") ;
6456 }
6557
66- int ? retry429Times = chatOptions . Value . Retry429Times ;
67- return await ProcessMessage ( request , userModel , icc , retry429Times , cancellationToken ) ;
58+ return await ProcessMessage ( request , userModel , cancellationToken ) ;
6859 }
6960
7061 private async Task < ActionResult > ProcessMessage (
7162 AnthropicRequestWrapper request ,
7263 UserModel userModel ,
73- InChatContext icc ,
74- int ? retry429Times ,
7564 CancellationToken cancellationToken )
7665 {
77- Model cm = userModel . Model ;
78- ChatService s = cf . CreateChatService ( cm ) ;
79- UserBalance userBalance = await db . UserBalances
80- . Where ( x => x . UserId == currentApiKey . User . Id )
81- . FirstOrDefaultAsync ( cancellationToken ) ?? throw new InvalidOperationException ( "User balance not found." ) ;
82- UserModelBalanceCalculator calc = new ( BalanceInitialInfo . FromDB ( [ userModel ] , userBalance . Balance ) , [ ] ) ;
83- ScopedBalanceCalculator scopedCalc = calc . WithScoped ( "0" ) ;
8466 ActionResult ? errorToReturn = null ;
8567 bool hasSuccessYield = false ;
8668 string messageId = $ "msg_{ Guid . NewGuid ( ) : N} ";
@@ -89,12 +71,16 @@ private async Task<ActionResult> ProcessMessage(
8971 StreamingState streamingState = new ( ) ;
9072 bool messageStarted = false ;
9173
92- try
93- {
94- ChatRequest csr = request . ToChatRequest ( currentApiKey . User . Id . ToString ( ) , cm ) ;
95-
96- await foreach ( ChatSegment segment in icc . Run ( scopedCalc , userModel , s , csr , fup , retry429Times , cancellationToken ) )
74+ ChatRequest csr = request . ToChatRequest ( currentApiKey . User . Id . ToString ( ) , userModel . Model ) ;
75+ ChatRunResult runResult = await chatRunService . RunAsync (
76+ new ChatRunRequest
9777 {
78+ UserModel = userModel ,
79+ ChatRequest = csr ,
80+ } ,
81+ async ( segmentContext , ct ) =>
82+ {
83+ ChatSegment segment = segmentContext . Segment ;
9884 if ( request . Streamed )
9985 {
10086 if ( ! hasSuccessYield )
@@ -109,89 +95,64 @@ private async Task<ActionResult> ProcessMessage(
10995 {
11096 if ( ! messageStarted )
11197 {
112- await YieldEvent ( "message_start" , CreateMessageStartEvent ( request . Model ! , messageId , usageSegment . Usage . InputTokens ) , cancellationToken ) ;
113- await YieldEvent ( "ping" , new PingEvent ( ) , cancellationToken ) ;
98+ await YieldEvent ( "message_start" , CreateMessageStartEvent ( request . Model ! , messageId , usageSegment . Usage . InputTokens ) , ct ) ;
99+ await YieldEvent ( "ping" , new PingEvent ( ) , ct ) ;
114100 messageStarted = true ;
115101 hasSuccessYield = true ;
116102 }
117- continue ;
103+ return ;
118104 }
119105
120106 if ( ! messageStarted )
121107 {
122- await YieldEvent ( "message_start" , CreateMessageStartEvent ( request . Model ! , messageId , 0 ) , cancellationToken ) ;
123- await YieldEvent ( "ping" , new PingEvent ( ) , cancellationToken ) ;
108+ await YieldEvent ( "message_start" , CreateMessageStartEvent ( request . Model ! , messageId , 0 ) , ct ) ;
109+ await YieldEvent ( "ping" , new PingEvent ( ) , ct ) ;
124110 messageStarted = true ;
125111 hasSuccessYield = true ;
126112 }
127113
128- await ProcessStreamingItem ( segment , streamingState , cancellationToken ) ;
114+ await ProcessStreamingItem ( segment , streamingState , ct ) ;
129115 }
130116
131- if ( cancellationToken . IsCancellationRequested )
117+ if ( ct . IsCancellationRequested )
132118 {
133119 throw new TaskCanceledException ( ) ;
134120 }
135- }
136-
137- // Send final events for streaming
138- if ( request . Streamed && hasSuccessYield && icc . FinishReason != DBFinishReason . Cancelled )
139- {
140- // Close any open content block
141- if ( streamingState . CurrentBlockIndex >= 0 )
142- {
143- await YieldEvent ( "content_block_stop" , new ContentBlockStopEvent { Index = streamingState . CurrentBlockIndex } , cancellationToken ) ;
144- }
121+ } ,
122+ cancellationToken ) ;
145123
146- // Send message_delta with stop_reason
147- await YieldEvent ( "message_delta" , icc . FullResponse ! . ToMessageDeltaEvent ( ) , cancellationToken ) ;
148-
149- // Send message_stop
150- await YieldEvent ( "message_stop" , new MessageStopEvent ( ) , cancellationToken ) ;
151- }
152- }
153- catch ( RawChatServiceException rawEx )
154- {
155- icc . FinishReason = rawEx . ErrorCode ;
156- logger . LogError ( rawEx , "Upstream error: {StatusCode}" , rawEx . StatusCode ) ;
157- errorToReturn = await YieldRawError ( hasSuccessYield && request . Streamed , rawEx . StatusCode , rawEx . Body , cancellationToken ) ;
158- }
159- catch ( ChatServiceException cse )
160- {
161- icc . FinishReason = cse . ErrorCode ;
162- errorToReturn = await YieldError ( hasSuccessYield && request . Streamed , MapFinishReasonToErrorType ( cse . ErrorCode ) , cse . Message , cancellationToken ) ;
163- }
164- catch ( TaskCanceledException )
124+ switch ( runResult . Exception )
165125 {
166- icc . FinishReason = DBFinishReason . Cancelled ;
167- }
168- catch ( Exception e )
169- {
170- icc . FinishReason = DBFinishReason . UnknownError ;
171- logger . LogError ( e , "Unknown error" ) ;
172- errorToReturn = await YieldError ( hasSuccessYield && request . Streamed , AnthropicErrorTypes . ApiError , "Internal server error" , cancellationToken ) ;
126+ case RawChatServiceException rawEx :
127+ logger . LogError ( rawEx , "Upstream error: {StatusCode}" , rawEx . StatusCode ) ;
128+ errorToReturn = await YieldRawError ( hasSuccessYield && request . Streamed , rawEx . StatusCode , rawEx . Body , cancellationToken ) ;
129+ break ;
130+ case ChatServiceException cse :
131+ errorToReturn = await YieldError ( hasSuccessYield && request . Streamed , MapFinishReasonToErrorType ( cse . ErrorCode ) , cse . Message , cancellationToken ) ;
132+ break ;
133+ case Exception e when e is not TaskCanceledException :
134+ logger . LogError ( e , "Unknown error" ) ;
135+ errorToReturn = await YieldError ( hasSuccessYield && request . Streamed , AnthropicErrorTypes . ApiError , "Internal server error" , cancellationToken ) ;
136+ break ;
173137 }
174- finally
138+
139+ if ( request . Streamed && hasSuccessYield && runResult . FinishReason != DBFinishReason . Cancelled )
175140 {
176- cancellationToken = CancellationToken . None ;
141+ if ( streamingState . CurrentBlockIndex >= 0 )
142+ {
143+ await YieldEvent ( "content_block_stop" , new ContentBlockStopEvent { Index = streamingState . CurrentBlockIndex } , cancellationToken ) ;
144+ }
145+
146+ await YieldEvent ( "message_delta" , runResult . FullResponse . ToMessageDeltaEvent ( ) , cancellationToken ) ;
147+ await YieldEvent ( "message_stop" , new MessageStopEvent ( ) , cancellationToken ) ;
177148 }
178149
179- // Save usage
180- UserApiUsage usage = new ( )
150+ db . UserApiUsages . Add ( new UserApiUsage
181151 {
182152 ApiKeyId = currentApiKey . ApiKeyId ,
183- Usage = icc . ToUserModelUsage ( currentApiKey . User . Id , scopedCalc , userModel , await clientInfoManager . GetClientInfoId ( ) , isApi : true ) ,
184- } ;
185- db . UserApiUsages . Add ( usage ) ;
186- await db . SaveChangesAsync ( cancellationToken ) ;
187- if ( calc . BalanceCost > 0 )
188- {
189- _ = balanceService . AsyncUpdateBalance ( currentApiKey . User . Id , CancellationToken . None ) ;
190- }
191- if ( calc . UsageCosts . Any ( ) )
192- {
193- _ = balanceService . AsyncUpdateUsage ( [ userModel ! . Id ] , CancellationToken . None ) ;
194- }
153+ UsageId = runResult . UserModelUsageId ,
154+ } ) ;
155+ await db . SaveChangesAsync ( CancellationToken . None ) ;
195156
196157 if ( hasSuccessYield && request . Streamed )
197158 {
@@ -203,8 +164,7 @@ private async Task<ActionResult> ProcessMessage(
203164 }
204165 else
205166 {
206- // Non-streamed success response
207- AnthropicResponse response = icc . FullResponse ! . ToAnthropicResponse ( request . Model ! , messageId ) ;
167+ AnthropicResponse response = runResult . FullResponse . ToAnthropicResponse ( request . Model ! , messageId ) ;
208168 return Ok ( response ) ;
209169 }
210170 }
0 commit comments