Feat: implement processor client#4265
Conversation
|
This PR exceeds the recommended size of 1000 lines. Please make sure you are NOT addressing multiple issues with one PR. Note this PR might be rejected due to its size. |
|
Build Succeeded 🥳 Build Id: b2be2e3c-5487-4a3d-b417-ac9ac7fc84bd The following development artifacts have been built, and will exist for the next 30 days:
A preview of the website (the last 30 builds are retained): To install this version: |
|
This PR exceeds the recommended size of 1000 lines. Please make sure you are NOT addressing multiple issues with one PR. Note this PR might be rejected due to its size. |
|
Build Succeeded 🥳 Build Id: b1aa0ed2-82b5-4cfc-b0fd-026c9e495870 The following development artifacts have been built, and will exist for the next 30 days:
A preview of the website (the last 30 builds are retained): To install this version: |
|
This PR exceeds the recommended size of 1000 lines. Please make sure you are NOT addressing multiple issues with one PR. Note this PR might be rejected due to its size. |
|
Build Succeeded 🥳 Build Id: 7d7be6c9-1e78-467c-972d-c8e940aa18f8 The following development artifacts have been built, and will exist for the next 30 days:
A preview of the website (the last 30 builds are retained): To install this version: |
|
Once implemented on the agones-allocator and agones-extensions, I'll add some metrics around the batch size and some others that could be helpful, this will be on an other PR |
|
Build Failed 😭 Build Id: c3fd6020-5921-4aeb-be2b-e2451fc3c073 Status: FAILURE To get permission to view the Cloud Build view, join the agones-discuss Google Group. |
|
/gcbrun |
|
Build Failed 😭 Build Id: abcb98d1-a146-474c-84a7-8012732f944c Status: FAILURE To get permission to view the Cloud Build view, join the agones-discuss Google Group. |
markmandel
left a comment
There was a problem hiding this comment.
Some nits, but nothing major stands out to me. Nice work!
pkg/processor/doc.go
Outdated
| @@ -0,0 +1,16 @@ | |||
| // Copyright 2022 Google LLC All Rights Reserved. | |||
There was a problem hiding this comment.
| // Copyright 2022 Google LLC All Rights Reserved. | |
| // Copyright 2025 Google LLC All Rights Reserved. |
pkg/processor/pending_request.go
Outdated
| ) | ||
|
|
||
| // pendingRequest represents a request waiting for processing. | ||
| type pendingRequest struct { |
There was a problem hiding this comment.
This could probably be pulled into processor.go ? not sure it needs it's own file.
There was a problem hiding this comment.
Will move that back to processor.go, over seperated it, was not needed 😄
|
Build Failed 😭 Build Id: 73373fbf-8127-46e8-9013-7500f53e34c5 Status: FAILURE To get permission to view the Cloud Build view, join the agones-discuss Google Group. |
|
Just checking before I run through it again - you happy for this to be merged when approved? |
Yep all good 👌🏼 I'll create 2 other PRs to use this processor client on the agones allocator and agones extensions |
|
The last error build was caused by an e2e test that timed-out /gcbrun |
|
Build Succeeded 🥳 Build Id: 38cebd9e-c112-4ffb-bd66-01f2168f563f The following development artifacts have been built, and will exist for the next 30 days:
A preview of the website (the last 30 builds are retained): To install this version: |
|
Build Succeeded 🥳 Build Id: a7a8efcf-0d83-4a4b-92f1-d7dcb23aedc7 The following development artifacts have been built, and will exist for the next 30 days:
A preview of the website (the last 30 builds are retained): To install this version: |
markmandel
left a comment
There was a problem hiding this comment.
Doing another quick runthrough - I'm just picking on naming and library usage nits at this point. Looking good!
pkg/processor/doc.go
Outdated
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| // Package processor provides utilities for managing game server client processes |
There was a problem hiding this comment.
This comment feels a little too ambiguous to me.
I figure it should explain the allocation processor system here? at least at a high level? (Although I'm tempted to say, but the whole ascii diagram here with the flow? WDYT?)
There was a problem hiding this comment.
I fully agree ! (It's always frustrating having a doc.go without information haha) 💯
Will also add a diagram 👌🏼
pkg/processor/connection.go
Outdated
| // connectAndRun handles the full connection lifecycle to the processor service | ||
| // It establishes a connection, creates a stream, registers the client, and then | ||
| // delegates to handleStream to process messages until an error or cancellation | ||
| func (p *processorClient) connectAndRun(ctx context.Context) error { |
There was a problem hiding this comment.
Nit:
🤔 I'd argue everything that is *processorClient should be in the same file. Otherwise you have to look across multiple files for the one struct and it's methods.
pkg/processor/config.go
Outdated
| ) | ||
|
|
||
| // Config holds the processor client configuration | ||
| type Config struct { |
There was a problem hiding this comment.
Is the config ever used by anything other than the processorClient? If not, probably makes sense to put it all in the same file.
pkg/processor/processor.go
Outdated
| // processorClient implements ProcessorClient interface | ||
| // | ||
| //nolint:govet // fieldalignment: struct alignment is not critical for our use case | ||
| type processorClient struct { |
There was a problem hiding this comment.
| type processorClient struct { | |
| type client struct { |
Since the package is processor the processor part of the name is redundant.
File name could then also just be client.go.
pkg/processor/stream.go
Outdated
|
|
||
| // handleStream processes incoming messages from the processor stream | ||
| // It listens for pull requests and batch responses, dispatching them to appropriate handlers | ||
| func (p *processorClient) handleStream(ctx context.Context, stream allocationpb.Processor_StreamBatchesClient) error { |
There was a problem hiding this comment.
Nit: same here, since this is processorClient, I'd put this all in client.go
pkg/processor/connection.go
Outdated
| return nil, err | ||
| } | ||
|
|
||
| p.logger.Info("successfully connected to processor") |
There was a problem hiding this comment.
| p.logger.Info("successfully connected to processor") | |
| p.logger.Info("Successfully connected to processor") |
(I know we're not consistent on this across the codebase) - but upper case on log messages to start plz . (Same for others too)
pkg/processor/connection.go
Outdated
| // Connect to the processor | ||
| conn, err := p.connect(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to connect: %w", err) |
There was a problem hiding this comment.
| return fmt.Errorf("failed to connect: %w", err) | |
| return errors.Wrap(err, "failed to connect") |
Use the errors library plz. Legacy libraries ftw 😄 (same for the others below)
Had a quick look at your feedbacks, totally makes sense, will update that tomorrow after work, thanks for the review ! (Will ping you here once updated) 👌🏼 |
|
@markmandel Updated it following your feedbacks |
|
Build Failed 😭 Build Id: 3e37b15e-7fba-4144-b0d6-6030e1553903 Status: FAILURE To get permission to view the Cloud Build view, join the agones-discuss Google Group. |
|
/gcbrun |
|
Test is failing because this site We should update the link to https://github.com/EmbarkStudios/quilkin/blob/release-0.9.0/docs/src/introduction.md |
Created an issue for it 😄 #4287 Will create a PR soon, will assign you as reviewer |
|
Build Failed 😭 Build Id: acd5f3c9-7093-45ea-9eee-fcae4ca35ae4 Status: FAILURE To get permission to view the Cloud Build view, join the agones-discuss Google Group. |
|
Build Succeeded 🥳 Build Id: afd0caff-1ed6-45d9-915c-de672fa1c19b The following development artifacts have been built, and will exist for the next 30 days:
A preview of the website (the last 30 builds are retained): To install this version: |
markmandel
left a comment
There was a problem hiding this comment.
Close!
Sorry for the delay -- holidays!
| // | ||
| // Flow diagram: | ||
| // | ||
| // ┌─────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ |
pkg/processor/client.go
Outdated
|
|
||
| batchMutex sync.RWMutex | ||
| // requestIDMapping is a map to correlate request IDs to pendingRequest objects for response handling | ||
| requestIDMapping sync.Map |
There was a problem hiding this comment.
Reading the godoc for sync.Map
The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.
The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.
🤔 does the usage here hit either of those common use cases? 1, seems like a no, 2. is a maybe?
Would also remove the need for runtime casting if we leaned on a mutex.
There was a problem hiding this comment.
Oooh interesting, I usually use the sync.Map without thinking too much, I'll update that !
|
Build Succeeded 🥳 Build Id: 3d46e746-f658-4616-a38f-3fab4cdbd3d2 The following development artifacts have been built, and will exist for the next 30 days:
A preview of the website (the last 30 builds are retained): To install this version: |
markmandel
left a comment
There was a problem hiding this comment.
Pretty sure this is my final little thing. Just consistency around unique identifiers.
pkg/processor/client.go
Outdated
| func generateRequestID() string { | ||
| return fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Int63()) | ||
| } |
There was a problem hiding this comment.
Rather than creating our own uid format, we could reuse the uuid library that K8s uses (and we use too).
There was a problem hiding this comment.
Shame on me, I used that for debugging (had previously some strings within it) and totally forgot to get rid of it, I'll review my PR more deeply for the next ones 😅
There was a problem hiding this comment.
All good - that's why we do reviews! 😄
There was a problem hiding this comment.
@markmandel I've updated it to use the uuid library you mentioned, but I was checking, we could directly use the google one to avoid a cast to string https://github.com/kubernetes/apimachinery/blob/v0.34.1/pkg/util/uuid/uuid.go#L26
Depends if we want to stick to this one for consistency with the rest of the code - I don't have a strong opinion, but I like consistency though 😄
|
Build Succeeded 🥳 Build Id: ce7eefb4-6fd5-412e-9aca-e09d8f545258 The following development artifacts have been built, and will exist for the next 30 days:
A preview of the website (the last 30 builds are retained): To install this version: |
* feat: implement processor client * feat: add unit tests * feat: fix conflicts * feat: fix conflicts * feat: fix conflicts * feat: fix conflicts * feat: remove max retries and change logs * feat: move pending request struct to processor file * feat: update following feedbacks * feat: update locks / unused code and sync.map * feat: use k8s uuid instead of custom one
What type of PR is this?
/kind feature
What this PR does / Why we need it:
Implementation of the processor client which will be used on the allocator and extensions
This contains the connection with retry, checking gRPC healthCheck, handling the batch requests on the client (the pending one that have channels for response / error and the hotBatch that are the batching requests already converted and ready to be sent on pull request)
Which issue(s) this PR fixes:
Part of #4190
Special notes for your reviewer: