Skip to content

Commit a4e1af5

Browse files
committed
tenantcostclient: plumb SQL instance and session ID
This commit plumbs the SQL Instance ID and corresponding sqlliveness.SessionID to the tenant cost controller. Release note: None Release justification: Necessary fix for the distributed rate limiting functionality, which is vital for the upcoming Serverless MVP release. It allows CRDB to throttle clusters that have run out of free or paid request units (which measure CPU and I/O usage). This functionality is only enabled in multi-tenant scenarios and should have no impact on our dedicated customers.
1 parent 2aa0e1e commit a4e1af5

11 files changed

Lines changed: 77 additions & 26 deletions

File tree

pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ go_library(
1111
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostclient",
1212
visibility = ["//visibility:public"],
1313
deps = [
14+
"//pkg/base",
1415
"//pkg/kv/kvclient/kvtenant",
1516
"//pkg/multitenant",
1617
"//pkg/multitenant/tenantcostmodel",
1718
"//pkg/roachpb:with-mocks",
1819
"//pkg/server",
1920
"//pkg/settings",
2021
"//pkg/settings/cluster",
22+
"//pkg/sql/sqlliveness",
2123
"//pkg/util/log",
2224
"//pkg/util/quotapool",
2325
"//pkg/util/stop",
@@ -50,6 +52,7 @@ go_test(
5052
"//pkg/server",
5153
"//pkg/settings/cluster",
5254
"//pkg/sql",
55+
"//pkg/sql/sqlliveness",
5356
"//pkg/testutils",
5457
"//pkg/testutils/serverutils",
5558
"//pkg/testutils/sqlutils",

pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ import (
1212
"context"
1313
"time"
1414

15+
"github.com/cockroachdb/cockroach/pkg/base"
1516
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
1617
"github.com/cockroachdb/cockroach/pkg/multitenant"
1718
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
1819
"github.com/cockroachdb/cockroach/pkg/roachpb"
1920
"github.com/cockroachdb/cockroach/pkg/server"
2021
"github.com/cockroachdb/cockroach/pkg/settings"
2122
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
23+
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
2224
"github.com/cockroachdb/cockroach/pkg/util/log"
2325
"github.com/cockroachdb/cockroach/pkg/util/stop"
2426
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -154,6 +156,8 @@ type tenantSideCostController struct {
154156
provider kvtenant.TokenBucketProvider
155157
limiter limiter
156158
stopper *stop.Stopper
159+
instanceID base.SQLInstanceID
160+
sessionID sqlliveness.SessionID
157161
cpuSecsFn multitenant.CPUSecsFn
158162

159163
mu struct {
@@ -228,9 +232,21 @@ var _ multitenant.TenantSideCostController = (*tenantSideCostController)(nil)
228232

229233
// Start is part of multitenant.TenantSideCostController.
230234
func (c *tenantSideCostController) Start(
231-
ctx context.Context, stopper *stop.Stopper, cpuSecsFn multitenant.CPUSecsFn,
235+
ctx context.Context,
236+
stopper *stop.Stopper,
237+
instanceID base.SQLInstanceID,
238+
sessionID sqlliveness.SessionID,
239+
cpuSecsFn multitenant.CPUSecsFn,
232240
) error {
241+
if instanceID == 0 {
242+
return errors.New("invalid SQLInstanceID")
243+
}
244+
if sessionID == "" {
245+
return errors.New("invalid sqlliveness.SessionID")
246+
}
233247
c.stopper = stopper
248+
c.instanceID = instanceID
249+
c.sessionID = sessionID
234250
c.cpuSecsFn = cpuSecsFn
235251
return stopper.RunAsyncTask(ctx, "cost-controller", func(ctx context.Context) {
236252
c.mainLoop(ctx)
@@ -335,9 +351,9 @@ func (c *tenantSideCostController) sendTokenBucketRequest(ctx context.Context) {
335351
}
336352

337353
req := roachpb.TokenBucketRequest{
338-
TenantID: c.tenantID.ToUint64(),
339-
// TODO(radu): populate instance ID.
340-
InstanceID: 1,
354+
TenantID: c.tenantID.ToUint64(),
355+
InstanceID: uint32(c.instanceID),
356+
InstanceLease: []byte(c.sessionID),
341357
ConsumptionSinceLastRequest: deltaConsumption,
342358
RequestedRU: requested,
343359
TargetRequestPeriod: c.run.targetPeriod,

pkg/ccl/multitenantccl/tenantcostclient/tenant_side_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/cockroachdb/cockroach/pkg/roachpb"
2727
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2828
"github.com/cockroachdb/cockroach/pkg/sql"
29+
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
2930
"github.com/cockroachdb/cockroach/pkg/testutils"
3031
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
3132
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -142,7 +143,9 @@ func (ts *testState) start(t *testing.T) {
142143
usage := time.Duration(atomic.LoadInt64((*int64)(&ts.cpuUsage)))
143144
return usage.Seconds()
144145
}
145-
if err := ts.controller.Start(ctx, ts.stopper, cpuUsageFn); err != nil {
146+
instanceID := base.SQLInstanceID(1)
147+
sessionID := sqlliveness.SessionID("foo")
148+
if err := ts.controller.Start(ctx, ts.stopper, instanceID, sessionID, cpuUsageFn); err != nil {
146149
t.Fatal(err)
147150
}
148151
}

pkg/multitenant/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ go_library(
1111
importpath = "github.com/cockroachdb/cockroach/pkg/multitenant",
1212
visibility = ["//visibility:public"],
1313
deps = [
14+
"//pkg/base",
1415
"//pkg/kv",
1516
"//pkg/multitenant/tenantcostmodel",
1617
"//pkg/roachpb:with-mocks",
18+
"//pkg/sql/sqlliveness",
1719
"//pkg/util/metric",
1820
"//pkg/util/stop",
1921
],

pkg/multitenant/cost_controller.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,23 @@ package multitenant
1313
import (
1414
"context"
1515

16+
"github.com/cockroachdb/cockroach/pkg/base"
1617
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
18+
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
1719
"github.com/cockroachdb/cockroach/pkg/util/stop"
1820
)
1921

2022
// TenantSideCostController is an interface through which tenant code reports
2123
// and throttles resource usage. Its implementation lives in the
2224
// tenantcostclient CCL package.
2325
type TenantSideCostController interface {
24-
Start(ctx context.Context, stopper *stop.Stopper, cpuSecsFn CPUSecsFn) error
26+
Start(
27+
ctx context.Context,
28+
stopper *stop.Stopper,
29+
instanceID base.SQLInstanceID,
30+
sessionID sqlliveness.SessionID,
31+
cpuSecsFn CPUSecsFn,
32+
) error
2533

2634
TenantSideKVInterceptor
2735
}

pkg/server/server_sql.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,14 @@ type SQLServer struct {
130130
// sqlMemMetrics are used to track memory usage of sql sessions.
131131
sqlMemMetrics sql.MemoryMetrics
132132
stmtDiagnosticsRegistry *stmtdiagnostics.Registry
133-
sqlLivenessProvider sqlliveness.Provider
134-
sqlInstanceProvider sqlinstance.Provider
135-
metricsRegistry *metric.Registry
136-
diagnosticsReporter *diagnostics.Reporter
137-
spanconfigMgr *spanconfigmanager.Manager
133+
// sqlLivenessSessionID will be populated with a non-zero value for non-system
134+
// tenants.
135+
sqlLivenessSessionID sqlliveness.SessionID
136+
sqlLivenessProvider sqlliveness.Provider
137+
sqlInstanceProvider sqlinstance.Provider
138+
metricsRegistry *metric.Registry
139+
diagnosticsReporter *diagnostics.Reporter
140+
spanconfigMgr *spanconfigmanager.Manager
138141

139142
// settingsWatcher is utilized by secondary tenants to watch for settings
140143
// changes. It is nil on the system tenant.
@@ -956,14 +959,15 @@ func (s *SQLServer) initInstanceID(ctx context.Context) error {
956959
// as this is not a SQL pod server.
957960
return nil
958961
}
959-
instanceID, err := s.sqlInstanceProvider.Instance(ctx)
962+
instanceID, sessionID, err := s.sqlInstanceProvider.Instance(ctx)
960963
if err != nil {
961964
return err
962965
}
963966
err = s.sqlIDContainer.SetSQLInstanceID(instanceID)
964967
if err != nil {
965968
return err
966969
}
970+
s.sqlLivenessSessionID = sessionID
967971
s.execCfg.DistSQLPlanner.SetNodeInfo(roachpb.NodeDescriptor{NodeID: roachpb.NodeID(instanceID)})
968972
return nil
969973
}

pkg/server/tenant.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/cockroachdb/cockroach/pkg/sql/contention"
4242
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
4343
"github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness"
44+
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
4445
"github.com/cockroachdb/cockroach/pkg/util/hlc"
4546
"github.com/cockroachdb/cockroach/pkg/util/log"
4647
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -272,7 +273,9 @@ func StartTenant(
272273
log.SetNodeIDs(clusterID, 0 /* nodeID is not known for a SQL-only server. */)
273274
log.SetTenantIDs(args.TenantID.String(), int32(s.SQLInstanceID()))
274275

275-
if err := args.costController.Start(ctx, args.stopper, status.GetUserCPUSeconds); err != nil {
276+
if err := args.costController.Start(
277+
ctx, args.stopper, s.SQLInstanceID(), s.sqlLivenessSessionID, status.GetUserCPUSeconds,
278+
); err != nil {
276279
return nil, "", "", err
277280
}
278281

@@ -496,7 +499,11 @@ type noopTenantSideCostController struct{}
496499
var _ multitenant.TenantSideCostController = noopTenantSideCostController{}
497500

498501
func (noopTenantSideCostController) Start(
499-
ctx context.Context, stopper *stop.Stopper, cpuSecsFn multitenant.CPUSecsFn,
502+
ctx context.Context,
503+
stopper *stop.Stopper,
504+
instanceID base.SQLInstanceID,
505+
sessionID sqlliveness.SessionID,
506+
cpuSecsFn multitenant.CPUSecsFn,
500507
) error {
501508
return nil
502509
}

pkg/sql/sqlinstance/instanceprovider/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ go_test(
3131
"//pkg/base",
3232
"//pkg/clusterversion",
3333
"//pkg/settings/cluster",
34+
"//pkg/sql/sqlliveness",
3435
"//pkg/sql/sqlliveness/slinstance",
3536
"//pkg/sql/sqlliveness/slstorage",
3637
"//pkg/testutils",

pkg/sql/sqlinstance/instanceprovider/instanceprovider.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type provider struct {
4343
initOnce sync.Once
4444
initialized chan struct{}
4545
instanceID base.SQLInstanceID
46+
sessionID sqlliveness.SessionID
4647
initError error
4748
}
4849

@@ -68,20 +69,22 @@ func New(
6869
}
6970

7071
// Instance returns the instance ID for the current SQL instance.
71-
func (p *provider) Instance(ctx context.Context) (_ base.SQLInstanceID, err error) {
72+
func (p *provider) Instance(
73+
ctx context.Context,
74+
) (_ base.SQLInstanceID, _ sqlliveness.SessionID, err error) {
7275
p.maybeInitialize()
7376
select {
7477
case <-ctx.Done():
75-
return base.SQLInstanceID(0), ctx.Err()
78+
return base.SQLInstanceID(0), "", ctx.Err()
7679
case <-p.stopper.ShouldQuiesce():
77-
return base.SQLInstanceID(0), stop.ErrUnavailable
80+
return base.SQLInstanceID(0), "", stop.ErrUnavailable
7881
case <-p.initialized:
7982
if p.initError == nil {
8083
log.Ops.Infof(ctx, "created SQL instance %d", p.instanceID)
8184
} else {
8285
log.Ops.Warningf(ctx, "error creating SQL instance: %s", p.initError)
8386
}
84-
return p.instanceID, p.initError
87+
return p.instanceID, p.sessionID, p.initError
8588
}
8689
}
8790

@@ -108,6 +111,7 @@ func (p *provider) initialize(ctx context.Context) error {
108111
if err != nil {
109112
return err
110113
}
114+
p.sessionID = session.ID()
111115
p.instanceID = instanceID
112116
session.RegisterCallbackForSessionExpiry(p.shutdownSQLInstance)
113117
return nil

pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/clusterversion"
1919
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2020
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instanceprovider"
21+
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
2122
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
2223
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage"
2324
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -61,18 +62,20 @@ func TestInstanceProvider(t *testing.T) {
6162
defer stopper.Stop(ctx)
6263
instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, addr)
6364
slInstance.Start(ctx)
64-
instanceID, err := instanceProvider.Instance(ctx)
65+
instanceID, sessionID, err := instanceProvider.Instance(ctx)
6566
require.NoError(t, err)
6667
require.Equal(t, expectedInstanceID, instanceID)
68+
require.NotEqual(t, sqlliveness.SessionID(""), sessionID)
6769

6870
// Verify an additional call to Instance(), returns the same instance
69-
instanceID, err = instanceProvider.Instance(ctx)
71+
instanceID, sessionID2, err := instanceProvider.Instance(ctx)
7072
require.NoError(t, err)
7173
require.Equal(t, expectedInstanceID, instanceID)
74+
require.Equal(t, sessionID, sessionID2)
75+
7276
session, err := slInstance.Session(ctx)
73-
if err != nil {
74-
t.Fatal(err)
75-
}
77+
require.NoError(t, err)
78+
require.Equal(t, session.ID(), sessionID)
7679

7780
// Update clock time to move ahead of session expiry to ensure session expiry callback is invoked.
7881
newTime := session.Expiration().Add(1, 0).UnsafeToClockTimestamp()
@@ -81,7 +84,7 @@ func TestInstanceProvider(t *testing.T) {
8184
slInstance.ClearSessionForTest(ctx)
8285
// Verify that the SQL instance is shutdown on session expiry.
8386
testutils.SucceedsSoon(t, func() error {
84-
if _, err = instanceProvider.Instance(ctx); !errors.Is(err, stop.ErrUnavailable) {
87+
if _, _, err = instanceProvider.Instance(ctx); !errors.Is(err, stop.ErrUnavailable) {
8588
return errors.Errorf("sql instance is not shutdown on session expiry")
8689
}
8790
return nil
@@ -94,7 +97,7 @@ func TestInstanceProvider(t *testing.T) {
9497
instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, "addr")
9598
slInstance.Start(ctx)
9699
instanceProvider.ShutdownSQLInstanceForTest(ctx)
97-
_, err := instanceProvider.Instance(ctx)
100+
_, _, err := instanceProvider.Instance(ctx)
98101
require.Error(t, err)
99102
require.Equal(t, "instance never initialized", err.Error())
100103
})

0 commit comments

Comments
 (0)