Skip to content

Commit 1b26380

Browse files
authored
Plug in rate limit service URL into xds cluster (#983)
* Plug in rate limit service URL into xds cluster Signed-off-by: Arko Dasgupta <arko@tetrate.io> * more guardrails Signed-off-by: Arko Dasgupta <arko@tetrate.io> * set to grpcPort and pin image Signed-off-by: Arko Dasgupta <arko@tetrate.io>
1 parent eec43d2 commit 1b26380

10 files changed

Lines changed: 80 additions & 27 deletions

File tree

internal/globalratelimit/runner/runner.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,15 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
4848
var xdsIRs []*ir.Xds
4949
snapshot := <-xdsIRCh
5050
r.Logger.Info("received a notification")
51+
// Skip translation if state is empty
52+
if len(snapshot.State) == 0 {
53+
continue
54+
}
55+
5156
for _, value := range snapshot.State {
5257
xdsIRs = append(xdsIRs, value)
5358
}
59+
5460
// Translate to ratelimit infra IR
5561
result, err := r.translate(xdsIRs)
5662
if err != nil {

internal/infrastructure/kubernetes/deployment.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ const (
5353

5454
// rateLimitInfraName is the name for rate-limit resources.
5555
rateLimitInfraName = "envoy-ratelimit"
56-
// rateLimitInfraHTTPPort is the http port that the rate limit service listens on.
57-
rateLimitInfraHTTPPort = 8080
56+
// rateLimitInfraGRPCPort is the grpc port that the rate limit service listens on.
57+
rateLimitInfraGRPCPort = 8081
5858
// rateLimitInfraImage is the container image for the rate limit service.
59-
rateLimitInfraImage = "envoyproxy/ratelimit:latest"
59+
rateLimitInfraImage = "envoyproxy/ratelimit:f28024e3"
6060
)
6161

6262
//go:embed bootstrap.yaml.tpl
@@ -362,7 +362,7 @@ func expectedRateLimitContainers(infra *ir.RateLimitInfra) []corev1.Container {
362362
ports := []corev1.ContainerPort{
363363
{
364364
Name: "http",
365-
ContainerPort: rateLimitInfraHTTPPort,
365+
ContainerPort: rateLimitInfraGRPCPort,
366366
Protocol: corev1.ProtocolTCP,
367367
},
368368
}

internal/infrastructure/kubernetes/service.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ func (i *Infra) expectedRateLimitService(_ *ir.RateLimitInfra) *corev1.Service {
125125
{
126126
Name: "http",
127127
Protocol: corev1.ProtocolTCP,
128-
Port: rateLimitInfraHTTPPort,
129-
TargetPort: intstr.IntOrString{IntVal: rateLimitInfraHTTPPort},
128+
Port: rateLimitInfraGRPCPort,
129+
TargetPort: intstr.IntOrString{IntVal: rateLimitInfraGRPCPort},
130130
},
131131
}
132132

@@ -151,6 +151,11 @@ func (i *Infra) expectedRateLimitService(_ *ir.RateLimitInfra) *corev1.Service {
151151
return svc
152152
}
153153

154+
// GetRateLimitServiceURL returns the URL for the rate limit service.
155+
func GetRateLimitServiceURL(namespace string) string {
156+
return fmt.Sprintf("grpc://%s.%s.svc.cluster.local:%d", rateLimitInfraName, namespace, rateLimitInfraGRPCPort)
157+
}
158+
154159
// createOrUpdateRateLimitService creates a Service in the kube api server based on the provided infra,
155160
// if it doesn't exist or updates it if it does.
156161
func (i *Infra) createOrUpdateRateLimitService(ctx context.Context, infra *ir.RateLimitInfra) error {

internal/xds/translator/listener.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func buildXdsTCPListener(name, address string, port uint32) *listener.Listener {
5252
}
5353
}
5454

55-
func addXdsHTTPFilterChain(xdsListener *listener.Listener, irListener *ir.HTTPListener) error {
55+
func (t *Translator) addXdsHTTPFilterChain(xdsListener *listener.Listener, irListener *ir.HTTPListener) error {
5656
routerAny, err := anypb.New(&router.Router{})
5757
if err != nil {
5858
return err
@@ -121,7 +121,7 @@ func addXdsHTTPFilterChain(xdsListener *listener.Listener, irListener *ir.HTTPLi
121121

122122
// TODO: Make this a generic interface for all API Gateway features.
123123
// https://github.com/envoyproxy/gateway/issues/882
124-
patchHCMWithRateLimit(mgr, irListener)
124+
t.patchHCMWithRateLimit(mgr, irListener)
125125

126126
// Add the jwt authn filter, if needed.
127127
if err := patchHCMWithJwtAuthnFilter(mgr, irListener); err != nil {

internal/xds/translator/ratelimit.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package translator
77

88
import (
99
"bytes"
10+
"net/url"
1011
"strconv"
1112
"time"
1213

@@ -29,9 +30,9 @@ import (
2930

3031
// patchHCMWithRateLimit builds and appends the Rate Limit Filter to the HTTP connection manager
3132
// if applicable and it does not already exist.
32-
func patchHCMWithRateLimit(mgr *hcm.HttpConnectionManager, irListener *ir.HTTPListener) {
33+
func (t *Translator) patchHCMWithRateLimit(mgr *hcm.HttpConnectionManager, irListener *ir.HTTPListener) {
3334
// Return early if rate limits dont exist
34-
if !isRateLimitPresent(irListener) {
35+
if !t.isRateLimitPresent(irListener) {
3536
return
3637
}
3738

@@ -48,7 +49,11 @@ func patchHCMWithRateLimit(mgr *hcm.HttpConnectionManager, irListener *ir.HTTPLi
4849
}
4950

5051
// isRateLimitPresent returns true if rate limit config exists for the listener.
51-
func isRateLimitPresent(irListener *ir.HTTPListener) bool {
52+
func (t *Translator) isRateLimitPresent(irListener *ir.HTTPListener) bool {
53+
// Return false if global ratelimiting is disabled.
54+
if t.GlobalRateLimit == nil {
55+
return false
56+
}
5257
// Return true if rate limit config exists.
5358
for _, route := range irListener.Routes {
5459
if route.RateLimit != nil && route.RateLimit.Global != nil {
@@ -256,14 +261,14 @@ func buildRateLimitServiceDescriptors(descriptorPrefix string, global *ir.Global
256261
return yamlDescs
257262
}
258263

259-
func buildRateLimitServiceCluster(irListener *ir.HTTPListener) *cluster.Cluster {
264+
func (t *Translator) buildRateLimitServiceCluster(irListener *ir.HTTPListener) *cluster.Cluster {
260265
// Return early if rate limits dont exist.
261-
if !isRateLimitPresent(irListener) {
266+
if !t.isRateLimitPresent(irListener) {
262267
return nil
263268
}
264269

265270
clusterName := getRateLimitServiceClusterName()
266-
host, port := getRateLimitServiceGrpcHostPort()
271+
host, port := t.getRateLimitServiceGrpcHostPort()
267272
rateLimitServerCluster := &cluster.Cluster{
268273
Name: clusterName,
269274
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_STRICT_DNS},
@@ -317,6 +322,14 @@ func getRateLimitDomain(irListener *ir.HTTPListener) string {
317322
return irListener.Name
318323
}
319324

320-
func getRateLimitServiceGrpcHostPort() (string, int) {
321-
return "TODO", 0
325+
func (t *Translator) getRateLimitServiceGrpcHostPort() (string, int) {
326+
u, err := url.Parse(t.GlobalRateLimit.ServiceURL)
327+
if err != nil {
328+
panic(err)
329+
}
330+
p, err := strconv.Atoi(u.Port())
331+
if err != nil {
332+
panic(err)
333+
}
334+
return u.Hostname(), p
322335
}

internal/xds/translator/runner/runner.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"context"
1010

1111
"github.com/envoyproxy/gateway/internal/envoygateway/config"
12+
infra "github.com/envoyproxy/gateway/internal/infrastructure/kubernetes"
1213
"github.com/envoyproxy/gateway/internal/ir"
1314
"github.com/envoyproxy/gateway/internal/message"
1415
"github.com/envoyproxy/gateway/internal/xds/translator"
@@ -52,7 +53,16 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
5253
r.Xds.Delete(key)
5354
} else {
5455
// Translate to xds resources
55-
result, err := translator.Translate(val)
56+
t := &translator.Translator{}
57+
58+
// Set the rate limit service URL if global rate limiting is enabled.
59+
if r.EnvoyGateway.RateLimit != nil {
60+
t.GlobalRateLimit = &translator.GlobalRateLimitSettings{
61+
ServiceURL: infra.GetRateLimitServiceURL(r.Namespace),
62+
}
63+
}
64+
65+
result, err := t.Translate(val)
5666
if err != nil {
5767
r.Logger.Error(err, "failed to translate xds ir")
5868
} else {

internal/xds/translator/testdata/out/xds-ir/authn-ratelimit.clusters.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@
6363
- endpoint:
6464
address:
6565
socketAddress:
66-
address: TODO
67-
portValue: 0
66+
address: envoy-ratelimit.envoy-gateway-system.svc.cluster.local
67+
portValue: 8081
6868
name: ratelimit_cluster
6969
respectDnsTtl: true
7070
type: STRICT_DNS

internal/xds/translator/testdata/out/xds-ir/ratelimit.clusters.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@
6363
- endpoint:
6464
address:
6565
socketAddress:
66-
address: TODO
67-
portValue: 0
66+
address: envoy-ratelimit.envoy-gateway-system.svc.cluster.local
67+
portValue: 8081
6868
name: ratelimit_cluster
6969
respectDnsTtl: true
7070
type: STRICT_DNS

internal/xds/translator/translator.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,28 @@ import (
2020
"github.com/envoyproxy/gateway/internal/xds/types"
2121
)
2222

23+
// Translator translates the xDS IR into xDS resources.
24+
type Translator struct {
25+
// GlobalRateLimit holds the global rate limit settings
26+
// required during xds translation.
27+
GlobalRateLimit *GlobalRateLimitSettings
28+
}
29+
30+
type GlobalRateLimitSettings struct {
31+
// ServiceURL is the URL of the global
32+
// rate limit service.
33+
ServiceURL string
34+
}
35+
2336
// Translate translates the XDS IR into xDS resources
24-
func Translate(ir *ir.Xds) (*types.ResourceVersionTable, error) {
37+
func (t *Translator) Translate(ir *ir.Xds) (*types.ResourceVersionTable, error) {
2538
if ir == nil {
2639
return nil, errors.New("ir is nil")
2740
}
2841

2942
tCtx := new(types.ResourceVersionTable)
3043

31-
if err := processHTTPListenerXdsTranslation(tCtx, ir.HTTP); err != nil {
44+
if err := t.processHTTPListenerXdsTranslation(tCtx, ir.HTTP); err != nil {
3245
return nil, err
3346
}
3447

@@ -43,7 +56,7 @@ func Translate(ir *ir.Xds) (*types.ResourceVersionTable, error) {
4356
return tCtx, nil
4457
}
4558

46-
func processHTTPListenerXdsTranslation(tCtx *types.ResourceVersionTable, httpListeners []*ir.HTTPListener) error {
59+
func (t *Translator) processHTTPListenerXdsTranslation(tCtx *types.ResourceVersionTable, httpListeners []*ir.HTTPListener) error {
4760
for _, httpListener := range httpListeners {
4861
addFilterChain := true
4962
var xdsRouteCfg *route.RouteConfiguration
@@ -70,7 +83,7 @@ func processHTTPListenerXdsTranslation(tCtx *types.ResourceVersionTable, httpLis
7083
}
7184

7285
if addFilterChain {
73-
if err := addXdsHTTPFilterChain(xdsListener, httpListener); err != nil {
86+
if err := t.addXdsHTTPFilterChain(xdsListener, httpListener); err != nil {
7487
return err
7588
}
7689
}
@@ -127,7 +140,7 @@ func processHTTPListenerXdsTranslation(tCtx *types.ResourceVersionTable, httpLis
127140
// This is current O(n) right now, but it also leverages an existing
128141
// object without allocating new memory. Consider improving it in the future.
129142
if rlCluster := findXdsCluster(tCtx, getRateLimitServiceClusterName()); rlCluster == nil {
130-
rlCluster := buildRateLimitServiceCluster(httpListener)
143+
rlCluster := t.buildRateLimitServiceCluster(httpListener)
131144
// Add cluster
132145
if rlCluster != nil {
133146
tCtx.AddXdsResource(resource.ClusterType, rlCluster)

internal/xds/translator/translator_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"google.golang.org/protobuf/proto"
2020
"sigs.k8s.io/yaml"
2121

22+
infra "github.com/envoyproxy/gateway/internal/infrastructure/kubernetes"
2223
"github.com/envoyproxy/gateway/internal/ir"
2324
)
2425

@@ -126,7 +127,12 @@ func TestTranslateXds(t *testing.T) {
126127
tc := tc
127128
t.Run(tc.name, func(t *testing.T) {
128129
ir := requireXdsIRFromInputTestData(t, "xds-ir", tc.name+".yaml")
129-
tCtx, err := Translate(ir)
130+
tr := &Translator{
131+
GlobalRateLimit: &GlobalRateLimitSettings{
132+
ServiceURL: infra.GetRateLimitServiceURL("envoy-gateway-system"),
133+
},
134+
}
135+
tCtx, err := tr.Translate(ir)
130136
require.NoError(t, err)
131137
listeners := tCtx.XdsResources[resource.ListenerType]
132138
routes := tCtx.XdsResources[resource.RouteType]

0 commit comments

Comments
 (0)