Skip to content

Commit 2f3c918

Browse files
authored
feat(bigtable): add traffic diverter and table shim for bigtable (#14633)
We will read the ClientConfig and call Diverter for how much load we will use for session. We will use TableShim for diverting traffic between classic and session impl.
1 parent 2b4ac42 commit 2f3c918

4 files changed

Lines changed: 594 additions & 0 deletions

File tree

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package internal
16+
17+
import (
18+
"math"
19+
"math/rand/v2"
20+
"sync/atomic"
21+
)
22+
23+
// Diverter decides whether to use the session-based protocol or classic protocol.
24+
type Diverter struct {
25+
sessionLoadBits atomic.Uint64
26+
}
27+
28+
// NewDiverter creates a new Diverter with the given session load ratio (0.0 to 1.0).
29+
func NewDiverter(sessionLoad float64) *Diverter {
30+
d := &Diverter{}
31+
d.sessionLoadBits.Store(math.Float64bits(sessionLoad))
32+
return d
33+
}
34+
35+
// SetSessionLoad updates the session load ratio.
36+
func (d *Diverter) SetSessionLoad(load float64) {
37+
d.sessionLoadBits.Store(math.Float64bits(load))
38+
}
39+
40+
// UseSession returns true if the next call should use the session protocol.
41+
func (d *Diverter) UseSession() bool {
42+
load := math.Float64frombits(d.sessionLoadBits.Load())
43+
if load <= 0 {
44+
return false
45+
}
46+
if load >= 1 {
47+
return true
48+
}
49+
return rand.Float64() <= load
50+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package internal
16+
17+
import (
18+
"sync"
19+
"testing"
20+
)
21+
22+
func TestDiverterZeroAndOne(t *testing.T) {
23+
tests := []struct {
24+
name string
25+
sessionLoad float64
26+
expectedRes bool
27+
}{
28+
{
29+
name: "Load is zero",
30+
sessionLoad: 0.0,
31+
expectedRes: false,
32+
},
33+
{
34+
name: "Load is negative",
35+
sessionLoad: -0.5,
36+
expectedRes: false,
37+
},
38+
{
39+
name: "Load is one",
40+
sessionLoad: 1.0,
41+
expectedRes: true,
42+
},
43+
{
44+
name: "Load is greater than one",
45+
sessionLoad: 1.5,
46+
expectedRes: true,
47+
},
48+
}
49+
50+
for _, tc := range tests {
51+
t.Run(tc.name, func(t *testing.T) {
52+
d := NewDiverter(tc.sessionLoad)
53+
for i := 0; i < 1000; i++ {
54+
if got := d.UseSession(); got != tc.expectedRes {
55+
t.Errorf("UseSession() = %v, want %v for load %f", got, tc.expectedRes, tc.sessionLoad)
56+
}
57+
}
58+
})
59+
}
60+
}
61+
62+
func TestDiverterSetSessionLoad(t *testing.T) {
63+
d := NewDiverter(0.0)
64+
if got := d.UseSession(); got != false {
65+
t.Errorf("Expected initial UseSession() to be false, got %v", got)
66+
}
67+
68+
d.SetSessionLoad(1.0)
69+
if got := d.UseSession(); got != true {
70+
t.Errorf("Expected UseSession() to be true after SetSessionLoad(1.0), got %v", got)
71+
}
72+
}
73+
74+
func TestDiverterProbabilistic(t *testing.T) {
75+
load := 0.4
76+
d := NewDiverter(load)
77+
iterations := 10000
78+
trueCount := 0
79+
80+
for i := 0; i < iterations; i++ {
81+
if d.UseSession() {
82+
trueCount++
83+
}
84+
}
85+
86+
expected := int(float64(iterations) * load)
87+
tolerance := 300 // Allow a variance range of +/- 3% of total iterations
88+
89+
if trueCount < expected-tolerance || trueCount > expected+tolerance {
90+
t.Errorf("Expected approximately %d true results (with tolerance %d), got %d", expected, tolerance, trueCount)
91+
}
92+
}
93+
94+
func TestDiverterConcurrentAccess(t *testing.T) {
95+
d := NewDiverter(0.5)
96+
var wg sync.WaitGroup
97+
numWorkers := 20
98+
iterations := 1000
99+
100+
for i := 0; i < numWorkers; i++ {
101+
wg.Add(1)
102+
go func(workerID int) {
103+
defer wg.Done()
104+
for j := 0; j < iterations; j++ {
105+
if j%10 == 0 {
106+
d.SetSessionLoad(float64(workerID) / float64(numWorkers))
107+
}
108+
_ = d.UseSession()
109+
}
110+
}(i)
111+
}
112+
113+
wg.Wait()
114+
}

bigtable/table_shim.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bigtable
16+
17+
import (
18+
"context"
19+
20+
internal "cloud.google.com/go/bigtable/internal/transport"
21+
)
22+
23+
// TableShim wraps a classic and a session-based TableAPI and diverts traffic between them.
24+
type TableShim struct {
25+
classic TableAPI
26+
session TableAPI
27+
diverter *internal.Diverter
28+
}
29+
30+
// NewTableShim creates a new TableShim.
31+
func NewTableShim(classic, session TableAPI, diverter *internal.Diverter) TableAPI {
32+
return &TableShim{
33+
classic: classic,
34+
session: session,
35+
diverter: diverter,
36+
}
37+
}
38+
39+
// ReadRow implements TableAPI.
40+
func (t *TableShim) ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error) {
41+
if t.diverter.UseSession() {
42+
return t.session.ReadRow(ctx, row, opts...)
43+
}
44+
return t.classic.ReadRow(ctx, row, opts...)
45+
}
46+
47+
// Apply implements TableAPI.
48+
func (t *TableShim) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) error {
49+
if t.diverter.UseSession() {
50+
return t.session.Apply(ctx, row, m, opts...)
51+
}
52+
return t.classic.Apply(ctx, row, m, opts...)
53+
}
54+
55+
// ReadRows implements TableAPI. It delegates to classic as session support is not yet implemented.
56+
func (t *TableShim) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) error {
57+
return t.classic.ReadRows(ctx, arg, f, opts...)
58+
}
59+
60+
// SampleRowKeys implements TableAPI. It delegates to classic.
61+
func (t *TableShim) SampleRowKeys(ctx context.Context) ([]string, error) {
62+
return t.classic.SampleRowKeys(ctx)
63+
}
64+
65+
// ApplyBulk implements TableAPI. It delegates to classic.
66+
func (t *TableShim) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) ([]error, error) {
67+
return t.classic.ApplyBulk(ctx, rowKeys, muts, opts...)
68+
}
69+
70+
// ApplyReadModifyWrite implements TableAPI. It delegates to classic.
71+
func (t *TableShim) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
72+
return t.classic.ApplyReadModifyWrite(ctx, row, m)
73+
}

0 commit comments

Comments
 (0)