Skip to content

Commit 0874d58

Browse files
authored
fix: Skip very large gRPC messages, log when it happens (#421)
Context: cloudquery/cloudquery#4834 This change: - bumps the gRPC max message size from 50MiB to 100MiB - logs the name of the table to Sentry when we see a message over 50MiB - logs to the user if we drop a message because it exceeds 100MiB
1 parent aadbde9 commit 0874d58

6 files changed

Lines changed: 37 additions & 8 deletions

File tree

clients/constants.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package clients
22

33
const (
4-
maxMsgSize = 50 * 1024 * 1024 // 50 MiB
4+
maxMsgSize = 100 * 1024 * 1024 // 100 MiB
55
)

internal/servers/constants.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package servers
2+
3+
const MaxMsgSize = 100 * 1024 * 1024 // 100 MiB

internal/servers/source.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,18 @@ import (
44
"bytes"
55
"context"
66
"encoding/json"
7+
"errors"
78
"fmt"
89

910
"github.com/cloudquery/plugin-sdk/internal/pb"
1011
"github.com/cloudquery/plugin-sdk/plugins"
1112
"github.com/cloudquery/plugin-sdk/schema"
1213
"github.com/cloudquery/plugin-sdk/specs"
14+
"github.com/getsentry/sentry-go"
1315
"github.com/rs/zerolog"
1416
"google.golang.org/grpc/codes"
1517
"google.golang.org/grpc/status"
18+
"google.golang.org/protobuf/proto"
1619
)
1720

1821
type SourceServer struct {
@@ -84,9 +87,17 @@ func (s *SourceServer) Sync2(req *pb.Sync2_Request, stream pb.Source_Sync2Server
8487
return status.Errorf(codes.Internal, "failed to marshal resource: %v", err)
8588
}
8689

87-
if err := stream.Send(&pb.Sync2_Response{
90+
msg := &pb.Sync2_Response{
8891
Resource: b,
89-
}); err != nil {
92+
}
93+
err = checkMessageSize(msg, resource)
94+
if err != nil {
95+
s.Logger.Warn().Str("table", resource.Table.Name).
96+
Int("bytes", len(msg.String())).
97+
Msg("Row exceeding max bytes ignored")
98+
continue
99+
}
100+
if err := stream.Send(msg); err != nil {
90101
return status.Errorf(codes.Internal, "failed to send resource: %v", err)
91102
}
92103
}
@@ -116,3 +127,19 @@ func (s *SourceServer) GetMetrics(context.Context, *pb.GetSourceMetrics_Request)
116127
Metrics: b,
117128
}, nil
118129
}
130+
131+
func checkMessageSize(msg proto.Message, resource *schema.Resource) error {
132+
size := proto.Size(msg)
133+
// log error to Sentry if row exceeds half of the max size
134+
if size > MaxMsgSize/2 {
135+
sentry.WithScope(func(scope *sentry.Scope) {
136+
scope.SetTag("table", resource.Table.Name)
137+
scope.SetExtra("bytes", size)
138+
sentry.CurrentHub().CaptureMessage("Large message detected")
139+
})
140+
}
141+
if size > MaxMsgSize {
142+
return errors.New("message exceeds max size")
143+
}
144+
return nil
145+
}

serve/constants.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,4 @@ const (
88
// bufSize used for unit testing grpc server and client
99
testBufSize = 1024 * 1024
1010
flushTimeout = 5 * time.Second
11-
maxMsgSize = 50 * 1024 * 1024 // 50 MiB
1211
)

serve/destination.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ func newCmdDestinationServe(destination *destinationServe) *cobra.Command {
107107
grpc.ChainStreamInterceptor(
108108
logging.StreamServerInterceptor(grpczerolog.InterceptorLogger(logger)),
109109
),
110-
grpc.MaxRecvMsgSize(maxMsgSize),
111-
grpc.MaxSendMsgSize(maxMsgSize),
110+
grpc.MaxRecvMsgSize(servers.MaxMsgSize),
111+
grpc.MaxSendMsgSize(servers.MaxMsgSize),
112112
)
113113
pb.RegisterDestinationServer(s, &servers.DestinationServer{
114114
Plugin: destination.plugin,

serve/source.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ func newCmdSourceServe(source *sourceServe) *cobra.Command {
113113
grpc.ChainStreamInterceptor(
114114
logging.StreamServerInterceptor(grpczerolog.InterceptorLogger(logger)),
115115
),
116-
grpc.MaxRecvMsgSize(maxMsgSize),
117-
grpc.MaxSendMsgSize(maxMsgSize),
116+
grpc.MaxRecvMsgSize(servers.MaxMsgSize),
117+
grpc.MaxSendMsgSize(servers.MaxMsgSize),
118118
)
119119
source.plugin.SetLogger(logger)
120120
pb.RegisterSourceServer(s, &servers.SourceServer{

0 commit comments

Comments
 (0)