Skip to content

Commit 5f848de

Browse files
feat!: Idiomatic serve interface (#126)
* feat!: Idiomatic serve interface This is a breaking change for the serve interface so we can easily add more options without breaking the interface and move errors to compile time instead of runtime. * chore: Fix lint error Co-authored-by: erezrokah <erezrokah@users.noreply.github.com>
1 parent 9575a24 commit 5f848de

5 files changed

Lines changed: 313 additions & 158 deletions

File tree

serve/destination.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package serve
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"os"
7+
"strings"
8+
9+
"github.com/cloudquery/plugin-sdk/internal/pb"
10+
"github.com/cloudquery/plugin-sdk/internal/servers"
11+
"github.com/cloudquery/plugin-sdk/plugins"
12+
"github.com/getsentry/sentry-go"
13+
grpczerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2"
14+
middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2"
15+
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
16+
"github.com/rs/zerolog"
17+
"github.com/rs/zerolog/log"
18+
"github.com/spf13/cobra"
19+
"google.golang.org/grpc"
20+
"google.golang.org/grpc/test/bufconn"
21+
)
22+
23+
type destinationServe struct {
24+
plugin plugins.DestinationPlugin
25+
sentryDSN string
26+
}
27+
28+
type DestinationOption func(*destinationServe)
29+
30+
func WithDestinationSentryDSN(dsn string) DestinationOption {
31+
return func(s *destinationServe) {
32+
s.sentryDSN = dsn
33+
}
34+
}
35+
36+
func Destination(plugin plugins.DestinationPlugin, opts ...DestinationOption) {
37+
s := &destinationServe{
38+
plugin: plugin,
39+
}
40+
for _, opt := range opts {
41+
opt(s)
42+
}
43+
if err := newCmdDestinationRoot(s).Execute(); err != nil {
44+
sentry.CaptureMessage(err.Error())
45+
sentry.Flush(flushTimeout)
46+
fmt.Println(err)
47+
os.Exit(1)
48+
}
49+
sentry.Flush(flushTimeout)
50+
}
51+
52+
// nolint:dupl
53+
func newCmdDestinationServe(destination *destinationServe) *cobra.Command {
54+
var address string
55+
var network string
56+
var noSentry bool
57+
logLevel := newEnum([]string{"trace", "debug", "info", "warn", "error"}, "info")
58+
logFormat := newEnum([]string{"text", "json"}, "text")
59+
cmd := &cobra.Command{
60+
Use: "serve",
61+
Short: serveShort,
62+
Long: serveShort,
63+
Args: cobra.NoArgs,
64+
RunE: func(cmd *cobra.Command, args []string) error {
65+
zerologLevel, err := zerolog.ParseLevel(logLevel.String())
66+
if err != nil {
67+
return err
68+
}
69+
var logger zerolog.Logger
70+
if logFormat.String() == "json" {
71+
logger = zerolog.New(os.Stdout).Level(zerologLevel)
72+
} else {
73+
logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout}).Level(zerologLevel)
74+
}
75+
76+
// opts.Plugin.Logger = logger
77+
var listener net.Listener
78+
if network == "test" {
79+
listener = bufconn.Listen(testBufSize)
80+
testListener = listener.(*bufconn.Listener)
81+
} else {
82+
listener, err = net.Listen(network, address)
83+
if err != nil {
84+
return fmt.Errorf("failed to listen %s:%s: %w", network, address, err)
85+
}
86+
}
87+
// See logging pattern https://github.com/grpc-ecosystem/go-grpc-middleware/blob/v2/providers/zerolog/examples_test.go
88+
s := grpc.NewServer(
89+
middleware.WithUnaryServerChain(
90+
logging.UnaryServerInterceptor(grpczerolog.InterceptorLogger(logger)),
91+
),
92+
middleware.WithStreamServerChain(
93+
logging.StreamServerInterceptor(grpczerolog.InterceptorLogger(logger)),
94+
),
95+
)
96+
97+
destination.plugin.SetLogger(logger)
98+
pb.RegisterDestinationServer(s, &servers.DestinationServer{Plugin: destination.plugin})
99+
version := destination.plugin.Version()
100+
101+
if destination.sentryDSN != "" && version != "development" {
102+
err = sentry.Init(sentry.ClientOptions{
103+
Dsn: destination.sentryDSN,
104+
Debug: false,
105+
AttachStacktrace: true,
106+
Release: destination.plugin.Version(),
107+
// https://docs.sentry.io/platforms/go/configuration/options/#removing-default-integrations
108+
Integrations: func(integrations []sentry.Integration) []sentry.Integration {
109+
var filteredIntegrations []sentry.Integration
110+
for _, integration := range integrations {
111+
if integration.Name() == "Modules" {
112+
continue
113+
}
114+
filteredIntegrations = append(filteredIntegrations, integration)
115+
}
116+
return filteredIntegrations
117+
},
118+
})
119+
if err != nil {
120+
log.Error().Err(err).Msg("Error initializing sentry")
121+
}
122+
}
123+
124+
logger.Info().Str("address", listener.Addr().String()).Msg("server listening")
125+
if err := s.Serve(listener); err != nil {
126+
return fmt.Errorf("failed to serve: %w", err)
127+
}
128+
return nil
129+
},
130+
}
131+
cmd.Flags().StringVar(&address, "address", "localhost:7777", "address to serve on. can be tcp: `localhost:7777` or unix socket: `/tmp/plugin.rpc.sock`")
132+
cmd.Flags().StringVar(&network, "network", "tcp", `the network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket"`)
133+
cmd.Flags().Var(logLevel, "log-level", fmt.Sprintf("log level. one of: %s", strings.Join(logLevel.Allowed, ",")))
134+
cmd.Flags().Var(logFormat, "log-format", fmt.Sprintf("log format. one of: %s", strings.Join(logFormat.Allowed, ",")))
135+
cmd.Flags().BoolVar(&noSentry, "no-sentry", false, "disable sentry")
136+
137+
return cmd
138+
}
139+
140+
func newCmdDestinationRoot(destination *destinationServe) *cobra.Command {
141+
cmd := &cobra.Command{
142+
Use: "destination-plugin <command>",
143+
}
144+
cmd.AddCommand(newCmdDestinationServe(destination))
145+
cmd.CompletionOptions.DisableDefaultCmd = true
146+
return cmd
147+
}

serve/doc.go

Lines changed: 0 additions & 27 deletions
This file was deleted.

serve/serve.go

Lines changed: 0 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,9 @@
11
package serve
22

33
import (
4-
"fmt"
5-
"net"
6-
"os"
7-
"strings"
84
"time"
95

10-
"github.com/cloudquery/plugin-sdk/internal/pb"
11-
"github.com/cloudquery/plugin-sdk/internal/servers"
126
"github.com/cloudquery/plugin-sdk/plugins"
13-
"github.com/getsentry/sentry-go"
14-
grpczerolog "github.com/grpc-ecosystem/go-grpc-middleware/providers/zerolog/v2"
15-
middleware "github.com/grpc-ecosystem/go-grpc-middleware/v2"
16-
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
17-
"github.com/rs/zerolog"
18-
"github.com/rs/zerolog/log"
19-
"github.com/spf13/cobra"
20-
"google.golang.org/grpc"
217
"google.golang.org/grpc/test/bufconn"
228
)
239

@@ -37,118 +23,3 @@ const (
3723

3824
// lis used for unit testing grpc server and client
3925
var testListener *bufconn.Listener
40-
41-
func newCmdServe(opts Options) *cobra.Command {
42-
var address string
43-
var network string
44-
var noSentry bool
45-
logLevel := newEnum([]string{"trace", "debug", "info", "warn", "error"}, "info")
46-
logFormat := newEnum([]string{"text", "json"}, "text")
47-
cmd := &cobra.Command{
48-
Use: "serve",
49-
Short: serveShort,
50-
Long: serveShort,
51-
Args: cobra.NoArgs,
52-
RunE: func(cmd *cobra.Command, args []string) error {
53-
zerologLevel, err := zerolog.ParseLevel(logLevel.String())
54-
if err != nil {
55-
return err
56-
}
57-
var logger zerolog.Logger
58-
if logFormat.String() == "json" {
59-
logger = zerolog.New(os.Stdout).Level(zerologLevel)
60-
} else {
61-
logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout}).Level(zerologLevel)
62-
}
63-
64-
// opts.Plugin.Logger = logger
65-
var listener net.Listener
66-
if network == "test" {
67-
listener = bufconn.Listen(testBufSize)
68-
testListener = listener.(*bufconn.Listener)
69-
} else {
70-
listener, err = net.Listen(network, address)
71-
if err != nil {
72-
return fmt.Errorf("failed to listen %s:%s: %w", network, address, err)
73-
}
74-
}
75-
// See logging pattern https://github.com/grpc-ecosystem/go-grpc-middleware/blob/v2/providers/zerolog/examples_test.go
76-
s := grpc.NewServer(
77-
middleware.WithUnaryServerChain(
78-
logging.UnaryServerInterceptor(grpczerolog.InterceptorLogger(logger)),
79-
),
80-
middleware.WithStreamServerChain(
81-
logging.StreamServerInterceptor(grpczerolog.InterceptorLogger(logger)),
82-
),
83-
)
84-
85-
version := "development"
86-
if opts.SourcePlugin != nil {
87-
opts.SourcePlugin.SetLogger(logger)
88-
pb.RegisterSourceServer(s, &servers.SourceServer{Plugin: opts.SourcePlugin})
89-
version = opts.SourcePlugin.Version()
90-
}
91-
if opts.DestinationPlugin != nil {
92-
// opts.DestinationPlugin.Logger = logger
93-
pb.RegisterDestinationServer(s, &servers.DestinationServer{Plugin: opts.DestinationPlugin})
94-
version = opts.DestinationPlugin.Version()
95-
}
96-
97-
if !noSentry && version != "development" {
98-
err = sentry.Init(sentry.ClientOptions{
99-
Dsn: opts.SentryDsn,
100-
Debug: false,
101-
AttachStacktrace: true,
102-
Release: opts.SourcePlugin.Version(),
103-
// https://docs.sentry.io/platforms/go/configuration/options/#removing-default-integrations
104-
Integrations: func(integrations []sentry.Integration) []sentry.Integration {
105-
var filteredIntegrations []sentry.Integration
106-
for _, integration := range integrations {
107-
if integration.Name() == "Modules" {
108-
continue
109-
}
110-
filteredIntegrations = append(filteredIntegrations, integration)
111-
}
112-
return filteredIntegrations
113-
},
114-
})
115-
if err != nil {
116-
log.Error().Err(err).Msg("Error initializing sentry")
117-
}
118-
}
119-
120-
logger.Info().Str("address", listener.Addr().String()).Msg("server listening")
121-
if err := s.Serve(listener); err != nil {
122-
return fmt.Errorf("failed to serve: %w", err)
123-
}
124-
return nil
125-
},
126-
}
127-
cmd.Flags().StringVar(&address, "address", "localhost:7777", "address to serve on. can be tcp: `localhost:7777` or unix socket: `/tmp/plugin.rpc.sock`")
128-
cmd.Flags().StringVar(&network, "network", "tcp", `the network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket"`)
129-
cmd.Flags().Var(logLevel, "log-level", fmt.Sprintf("log level. one of: %s", strings.Join(logLevel.Allowed, ",")))
130-
cmd.Flags().Var(logFormat, "log-format", fmt.Sprintf("log format. one of: %s", strings.Join(logFormat.Allowed, ",")))
131-
cmd.Flags().BoolVar(&noSentry, "no-sentry", false, "disable sentry")
132-
133-
return cmd
134-
}
135-
136-
func newCmdRoot(opts Options) *cobra.Command {
137-
cmd := &cobra.Command{
138-
Use: "plugin <command>",
139-
}
140-
cmd.AddCommand(newCmdServe(opts))
141-
cmd.AddCommand(newCmdDoc(opts))
142-
cmd.CompletionOptions.DisableDefaultCmd = true
143-
return cmd
144-
}
145-
146-
func Serve(opts Options) {
147-
if err := newCmdRoot(opts).Execute(); err != nil {
148-
sentry.CaptureMessage(err.Error())
149-
sentry.Flush(flushTimeout)
150-
fmt.Println(err)
151-
os.Exit(1)
152-
}
153-
sentry.Flush(flushTimeout)
154-
}

0 commit comments

Comments
 (0)