feat(spanner): add support of using multiplexed session with ReadOnlyTransactions #10269
Conversation
Force-pushed 09cb87e to cf1ac68
Force-pushed cf1ac68 to cab67e9
Force-pushed 09d5513 to 89c64c2
Force-pushed b381f6c to 703ffae
olavloite left a comment
(First pass, continuing review tomorrow.)
spanner/integration_test.go
Outdated
if os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true" {
	return "true"
}
Not very important, but this seems a bit strange. Why are we returning a standardized string from this method, instead of a bool?
spanner/kokoro/presubmit.sh
Outdated
# See the License for the specific language governing permissions and
# limitations under the License..

# TODO(deklerk): Add integration tests when it's secure to do so. b/64723143
I think we can remove this TODO now ;-)
spanner/session.go
Outdated
if sh.client != nil {
	sh.client = nil
}
nit: just remove the nil check, it is safe to always set it to nil
spanner/session.go
Outdated
if sh.session == nil {
	return nil
}
if sh.session.isMultiplexed {
I think it would be safer/easier to read if we just check whether sh.client != nil. That also allows us to use the same field for regular sessions if that would ever be handy, and it is clear to anyone reading the code.
spanner/session.go
Outdated
if sh.client != nil {
	sh.client = nil
}
Here also: Just remove the if check and always set to nil
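To make the two suggestions above concrete, here is a hedged sketch of a handle whose recycle path keys off the client field and whose cleanup sets it to nil unconditionally; session and sessionHandle are simplified stand-ins for the PR's actual types:

```go
package main

import "fmt"

type session struct{ isMultiplexed bool }

// sessionHandle is a simplified stand-in for the PR's type.
type sessionHandle struct {
	session *session
	client  *struct{} // non-nil only for multiplexed sessions
}

// recycle checks sh.client != nil rather than sh.session.isMultiplexed,
// per the review suggestion, and clears fields without nil checks.
func (sh *sessionHandle) recycle() {
	if sh.session == nil {
		return
	}
	if sh.client != nil {
		// Multiplexed sessions are never returned to the pool.
		sh.client = nil // always set to nil; no nil check needed
		sh.session = nil
		return
	}
	// ... a regular session would be returned to the pool here ...
	sh.session = nil
}

func main() {
	sh := &sessionHandle{session: &session{isMultiplexed: true}, client: &struct{}{}}
	sh.recycle()
	fmt.Println(sh.session == nil && sh.client == nil) // true
}
```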
spanner/session.go
Outdated
// idleList caches idle session IDs. Session IDs in this list can be
// allocated for use.
idleList list.List
// multiplexedSessions contains the multiplexed sessions
nit:
- // multiplexedSessions contains the multiplexed sessions
+ // multiplexedSession contains the multiplexed session
spanner/session.go
Outdated
// multiplexedSessionCreationError is the last error that occurred during multiplexed session
// creation and is propagated to any waiters waiting for a session.
multiplexedSessionCreationError error
(Adding a comment for now, maybe it turns out to be void after reading the rest of the code)
We should only propagate this error to any waiters for the first creation of a multiplexed session. If refreshing a multiplexed session fails, then we should not propagate that error to the application, as the multiplexed session is likely to be usable for many days still.
Yes, if refreshing fails it will keep using the existing multiplexed session. Added tests in TestMultiplexSessionWorker.
Force-pushed 0da6099 to 2e24f7e
attrs := p.otConfig.attributeMap
for _, attr := range attributes {
	attrs = append(attrs, attr)
}
m.Add(ctx, val, metric.WithAttributes(attrs...))
Instead of creating a new attribute map here every time, can we just have two attribute maps set on otConfig? One with multiplexed=true and one with multiplexed=false. This method is called for every transaction that is executed, and it feels a bit wasteful to create this map over and over again, when there are only two possible values.
Added separate maps, and updated tests
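The fix discussed above amounts to building both possible attribute sets once at client construction and picking one per call. A minimal sketch, with a local attribute type standing in for OpenTelemetry's attribute.KeyValue and otConfig simplified from the PR's struct:

```go
package main

import "fmt"

// attribute stands in for go.opentelemetry.io/otel/attribute.KeyValue.
type attribute struct{ key, value string }

// otConfig precomputes the two attribute slices instead of appending a
// "multiplexed" attribute on every recorded transaction.
type otConfig struct {
	attrsMultiplexed []attribute // base attrs + multiplexed=true
	attrsRegular     []attribute // base attrs + multiplexed=false
}

func newOTConfig(base []attribute) *otConfig {
	withFlag := func(v string) []attribute {
		out := make([]attribute, 0, len(base)+1)
		out = append(out, base...)
		return append(out, attribute{"multiplexed", v})
	}
	return &otConfig{
		attrsMultiplexed: withFlag("true"),
		attrsRegular:     withFlag("false"),
	}
}

// attrsFor returns the precomputed slice; no allocation per call.
func (c *otConfig) attrsFor(isMultiplexed bool) []attribute {
	if isMultiplexed {
		return c.attrsMultiplexed
	}
	return c.attrsRegular
}

func main() {
	cfg := newOTConfig([]attribute{{"client_id", "client-1"}})
	fmt.Println(len(cfg.attrsFor(true)), cfg.attrsFor(true)[1].value) // 2 true
}
```

The hot path then only selects a slice, which is why the reviewer's concern about per-transaction map construction goes away.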
spanner/session.go
Outdated
return p.sc.batchCreateSessions(int32(numSessions), distributeOverChannels, p)
}

func (p *sessionPool) getMultiplexedSession(ctx context.Context) error {
The name of this method seems confusing to me. It is named get..., but it calls a method called executeCreateMultiplexedSessions(...)
spanner/session.go
Outdated
if err != nil {
	return err
}
go p.sc.executeCreateMultiplexedSessions(ctx, client, p.sc.sessionLabels, p.sc.md, p)
Rename this method:
- go p.sc.executeCreateMultiplexedSessions(ctx, client, p.sc.sessionLabels, p.sc.md, p)
+ go p.sc.executeCreateMultiplexedSession(ctx, client, p.sc.sessionLabels, p.sc.md, p)
spanner/session.go
Outdated
// Clear any session creation error.
if s.isMultiplexed {
	s.pool = p
	p.hc.register(s)
Note for self: This seems to register the multiplexed session with the normal health checker. That can be OK, as long as we treat it as a multiplexed session (e.g. no pings)
Not registering now
spanner/session.go
Outdated
p.multiplexedSessionCreationError = nil
p.recordStat(context.Background(), SessionsCount, 1, tagNumSessions, tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"})
close(p.mayGetMultiplexedSession)
p.mayGetMultiplexedSession = make(chan struct{})
Why are we re-creating the channel here?
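The pattern in the quoted code looks like the common Go close-and-recreate broadcast idiom: closing a channel wakes every goroutine blocked on it at once, and a fresh channel is installed so that later waiters block until the next broadcast. A self-contained sketch of that idiom, assuming this is indeed why mayGetMultiplexedSession is recreated:

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast wakes all current waiters on signal and resets for future ones.
type broadcast struct {
	mu sync.Mutex
	ch chan struct{}
}

func newBroadcast() *broadcast {
	return &broadcast{ch: make(chan struct{})}
}

// wait returns a channel that is closed on the next signal.
func (b *broadcast) wait() <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.ch
}

// signal releases every goroutine blocked on the current channel, then
// recreates it so new waiters block until the next signal.
func (b *broadcast) signal() {
	b.mu.Lock()
	defer b.mu.Unlock()
	close(b.ch)
	b.ch = make(chan struct{})
}

func main() {
	b := newBroadcast()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		ch := b.wait() // grab the current generation before signaling
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ch // all three unblock on one signal
		}()
	}
	b.signal()
	wg.Wait()
	fmt.Println("all waiters released")
}
```

Without the recreation, a goroutine that starts waiting after the close would return immediately instead of blocking for the next session event.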
spanner/sessionclient.go
Outdated
sc.mu.Unlock()
if closed {
	err := spannerErrorf(codes.Canceled, "Session client closed")
	trace.TracePrintf(ctx, nil, "Session client closed while creating multiplexed sessions: %v", err)
- trace.TracePrintf(ctx, nil, "Session client closed while creating multiplexed sessions: %v", err)
+ trace.TracePrintf(ctx, nil, "Session client closed while creating a multiplexed session: %v", err)
client.Single().ReadRow(context.Background(), "Users", spanner.Key{"alice"}, []string{"email"})

attributesNumInUseSessions := append(getAttributes(client.ClientID()), attribute.Key("type").String("num_in_use_sessions"))
//attributesNumInUseSessions := append(getAttributes(client.ClientID()), attribute.Key("type").String("num_in_use_sessions"))
nit: remove commented code
spanner/session.go
Outdated
func (hc *healthChecker) healthCheck(s *session) {
	defer hc.markDone(s)
	// If the session is multiplexed and has been idle for more than 7 days,
	if s.isMultiplexed && s.createTime.Add(multiplexSessionIdleTime).Before(time.Now()) {
This should not be part of the standard health-check feature for regular sessions. This method is called by a worker that looks for sessions that need a ping. That logic is not the right logic that should determine whether we need to check whether the multiplexed session should be refreshed. It is probably better to put it in a separate method that only takes care of refreshing multiplexed sessions.
Also, we should ensure that:
- The method (by default) does not need to be called very often. We are checking whether the session is more than 7 days old. That is something that can be checked at an interval of 10 minutes or something like that.
- We should only have one createMultiplexedSession call in-flight at any time.
- Created a separate worker multiplexSessionWorker to refresh the session at a 10-minute interval.
- Added test TestClient_MultiplexedSession to validate that only one createMultiplexedSession request is in flight at any time.
Force-pushed 8a0e84c to fcb133a
spanner/client_test.go
Outdated
},
validate: func(server InMemSpannerServer) {
	// Validate the multiplexed session is used
	expected := map[string]interface{}{
Here and in a couple of the other validations: Can we replace the expected map with just a simple uint variable, as that is the only thing that is put into the map?
spanner/client_test.go
Outdated
if !isMultiplexEnabled {
	expected["SessionsCount"] = uint(25) // BatchCreateSession request from regular session pool
}
if !testEqual(expected["SessionsCount"], server.TotalSessionsCreated()) {
This check is going to be flaky in its current form (at least, I hope it will be), because:
- We always initiate the creation of both regular and multiplexed sessions if multiplexed sessions are enabled directly at startup.
- We by default create 4 * 25 regular sessions. This happens in the background.
- If multiplexed sessions are enabled, then we also create one multiplexed session. This happens in the background.
- Depending on what request is handled when, and whether mux sessions are enabled, then the number of sessions created will be 1, 25, 26, 50, 51, 75, 76, 100 or 101.
- We always initiate the creation of both regular and multiplexed sessions if multiplexed sessions are enabled directly at startup.
Not true; regular sessions are only created when the number of requests waiting for a regular session exceeds minOpened, and minOpened is 0 for setupMockedTestServer. So as long as the test only uses read-only transactions, regular session creation is never triggered.
We by default create 4 * 25 regular sessions. This happens in the background.
Not true; given the MinOpened=0 config used here, we only create regular sessions when there are pending requests for one.
Depending on what request is handled when, and whether mux sessions are enabled, then the number of sessions created will be 1, 25, 26, 50, 51, 75, 76, 100 or 101.
For these test cases it will be either 1 (when the test makes read-only transactions only) or 26 (when it also runs a read/write transaction).
spanner/client_test.go
Outdated
},
validate: func(server InMemSpannerServer) {
	// Validate the regular session is used
	if !testEqual(uint(25), server.TotalSessionsCreated()) {
Note that also here the number of sessions is not deterministic, as we have 4 BatchCreateSessions requests running in parallel.
Note that also here the number of sessions is not deterministic, as we have 4 BatchCreateSessions requests running in parallel.
No, it only creates as many sessions as are needed: 25 when mux is disabled, 1 when it is enabled (because MinOpened=0 here).
spanner/integration_test.go
Outdated
}

func getMultiplexEnableFlag() bool {
	if os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true" {
nit: You can just do return os.Getenv("...") == "true"
spanner/oc_test.go
Outdated
"open_session_count": "25",
},
"true": {
	"open_session_count": "1",
This here also surprises me a bit. Are we not creating any regular sessions if multiplexed sessions are enabled? I would have expected us to create both in parallel, as we need regular sessions for read/write transactions.
Since MinOpened in the test is 0, no, we don't create regular sessions at startup; we wait for requests that need one.
spanner/session.go
Outdated
// multiplexedSessionInUse is the number of transactions using multiplexed sessions.
multiplexedSessionInUse uint64
nit:
- // multiplexedSessionInUse is the number of transactions using multiplexed sessions.
- multiplexedSessionInUse uint64
+ // numTransactionsUsingMultiplexedSession is the number of transactions using the multiplexed session.
+ numTransactionsUsingMultiplexedSession uint64
spanner/session.go
Outdated
// maxMultiplexedSessionInUse is the maximum number of multiplexed sessions in use concurrently in the
// current 10 minute interval.
maxMultiplexedSessionInUse uint64
Same here as above regarding naming.
But: Do we need these two numbers? What do they tell us and/or the user?
Removed this, since it adds no real benefit to the customer other than a drop in the InUse and maxInUse metrics when using a multiplexed session.
spanner/session.go
Outdated
if isMultiplexed {
	// Ignore the error if multiplexed session already present
	if p.multiplexedSession != nil {
		p.multiplexedSession.checkingHealth = false
spanner/session.go
Outdated
if p.multiplexedSession != nil {
	p.multiplexedSession.checkingHealth = false
	p.multiplexedSessionCreationError = nil
	p.mayGetMultiplexedSession <- true
Is this needed? If there already was a multiplexed session, then there should be no waiters, meaning that we should also not need to send this signal, right?
Yes, this is to unblock the maintainer so it can continue its checks; if we don't send here, the maintainer will keep waiting and remain blocked.
spanner/session.go
Outdated
}

func (p *sessionPool) createMultiplexedSession(ctx context.Context) error {
	for c := range p.multiplexedSessionsReq {
It took some time for me to understand what this does, but I think I now understand it:
- During session pool initialization, we call
go createMultiplexedSession(..). - That goroutine is then blocked on this
forstatement until there is a request for a multiplexed session. - The request can come from a read-only transaction (if multiplexed sessions are enabled), or from the background worker keeping the multiplexed session fresh.
That means that:
- The multiplexed session creation is always on the critical path of the first read-only transaction.
- The goroutine is stuck here forever if there is never a request for a multiplexed session.
I don't think that is what we would want. Instead, we should try to:
- Always start the creation of a multiplexed session as soon as possible if multiplexed sessions are enabled. This means that the creation will not be on the critical path of the first read-only transaction.
- Not call this method at all if multiplexed sessions are disabled.
I think that many of my above comments are also a direct consequence of this behavior.
Always start the creation of a multiplexed session as soon as possible if multiplexed sessions are enabled. This means that the creation will not be on the critical path of the first read-only transaction.
I updated it to trigger createMultiplexedSession upon session pool init.
Not call this method at all if multiplexed sessions are disabled.
Updated; we now spawn this background goroutine only when mux is enabled.
Force-pushed 90fff40 to c58eff5
spanner/client_test.go
Outdated
expectedSessionCount = uint(25) // BatchCreateSession request from regular session pool
}
if !testEqual(expectedSessionCount, server.TotalSessionsCreated()) {
	t.Errorf("TestClient_MultiplexedSession expected session creation with multiplexed=%s should be=%v, got: %v", strconv.FormatBool(isMultiplexEnabled), 25, server.TotalSessionsCreated())
nit:
- t.Errorf("TestClient_MultiplexedSession expected session creation with multiplexed=%s should be=%v, got: %v", strconv.FormatBool(isMultiplexEnabled), 25, server.TotalSessionsCreated())
+ t.Errorf("TestClient_MultiplexedSession expected session creation with multiplexed=%s should be=%v, got: %v", strconv.FormatBool(isMultiplexEnabled), expectedSessionCount, server.TotalSessionsCreated())
spanner/client_test.go
Outdated
switch s.(type) {
case *sppb.ReadRequest:
	req, _ := s.(*sppb.ReadRequest)
	// Validate the session is not multiplexed
- // Validate the session is not multiplexed
+ // Verify that a multiplexed session is used when that is enabled.
client.Single().ReadRow(context.Background(), "Users", Key{"alice"}, []string{"email"})

expectedSpans := 2
if isMultiplexEnabled {
nit: Would you mind adding a comment here that explains which span will be skipped when multiplexed sessions are enabled?
spanner/oc_test.go
Outdated
}

func TestOCStats_SessionPool_GetSessionTimeoutsCount(t *testing.T) {
	if isMultiplexEnabled {
I don't think we should skip this test when multiplexed sessions are enabled (or otherwise, we should add a separate test for multiplexed sessions). This test verifies the behavior of the client if for whatever reason the creation of session(s) is taking longer than expected. That could also happen with multiplexed sessions.
spanner/session.go
Outdated
// Defaults to 50m.
HealthCheckInterval time.Duration

// MultiplexSessionCheckInterval is the interval at which the multiplexed session is refreshed.
- // MultiplexSessionCheckInterval is the interval at which the multiplexed session is refreshed.
+ // MultiplexSessionCheckInterval is the interval at which the multiplexed session is checked whether it needs to be refreshed.
spanner/session.go
Outdated
// healthCheck checks the health of the session and pings it if needed.
func (hc *healthChecker) healthCheck(s *session) {
	defer hc.markDone(s)
	// If the session is multiplexed and has been idle for more than 7 days,
spanner/session.go
Outdated
createTime = hc.pool.multiplexedSession.createTime
}
hc.pool.mu.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
Do I understand it correctly that this is the deadline that we will use for creating a new multiplexed session? If so, then we should increase the timeout significantly. Maybe 30 seconds?
SessionPoolConfig: SessionPoolConfig{
	MaxIdle: 10,
	MaxOpened: 10,
	healthCheckSampleInterval: 10 * time.Millisecond,
Yes, because of this the unit tests were taking several minutes to complete; the change is unrelated to the feature but is an improvement.
spanner/session_test.go
Outdated
// Will cause session creation RPC to be fail.
server.TestSpanner.PutExecutionTime(MethodCreateSession,
	SimulatedExecutionTime{
		Errors: []error{status.Errorf(codes.Unavailable, "try later")},
Unavailable will be retried by Gax, so it won't bubble up to the client library. Instead, it will just seem like it takes a longer time. So I think that the error code that is used by this test should be something else.
spanner/sessionclient_test.go
Outdated
if isMultiplexEnabled {
	// The multiplexed session creation will use one of the connections
	if c != 2 {
		t.Fatalf("connection %q used an unexpected number of times\ngot: %v\nwant %v", a, c, 1)
- t.Fatalf("connection %q used an unexpected number of times\ngot: %v\nwant %v", a, c, 1)
+ t.Fatalf("connection %q used an unexpected number of times\ngot: %v\nwant %v", a, c, 2)
🤖 I have created a release *beep* *boop*

## [1.66.0](https://togithub.com/googleapis/google-cloud-go/compare/spanner/v1.65.0...spanner/v1.66.0) (2024-08-07)

### Features

* **spanner/admin/database:** Add support for Cloud Spanner Incremental Backups ([d949cc0](https://togithub.com/googleapis/google-cloud-go/commit/d949cc0e5d44af62154d9d5fd393f25a852f93ed))
* **spanner:** Add support of multiplexed session support in writeAtleastOnce mutations ([#10646](https://togithub.com/googleapis/google-cloud-go/issues/10646)) ([54009ea](https://togithub.com/googleapis/google-cloud-go/commit/54009eab1c3b11a28531ad9e621917d01c9e5339))
* **spanner:** Add support of using multiplexed session with ReadOnlyTransactions ([#10269](https://togithub.com/googleapis/google-cloud-go/issues/10269)) ([7797022](https://togithub.com/googleapis/google-cloud-go/commit/7797022e51d1ac07b8d919c421a8bfdf34a1d53c))

This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).