Skip to content

Commit 59ea0de

Browse files
authored
feat(bigtable): NewServerWithListener (#12408)
1 parent feb078b commit 59ea0de

File tree

2 files changed

+134
-7
lines changed

2 files changed

+134
-7
lines changed

bigtable/bttest/inmem.go

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,19 @@ To use a Server, create it, and then connect to it with no security:
2929
client, err := bigtable.NewClient(ctx, proj, instance,
3030
option.WithGRPCConn(conn))
3131
...
32+
33+
To use a Server with an in-memory connection, provide a bufconn listener:
34+
35+
l := bufconn.Listen(1024 * 1024)
36+
srv, err := bttest.NewServerWithListener(l)
37+
...
38+
conn, err := grpc.Dial(
39+
"bufnet",
40+
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
41+
return l.Dial()
42+
}),
43+
grpc.WithTransportCredentials(insecure.NewCredentials()))
44+
...
3245
*/
3346
package bttest // import "cloud.google.com/go/bigtable/bttest"
3447

@@ -85,9 +98,10 @@ var validLabelTransformer = regexp.MustCompile(`[a-z0-9\-]{1,15}`)
8598
type Server struct {
8699
Addr string
87100

88-
l net.Listener
89-
srv *grpc.Server
90-
s *server
101+
l net.Listener
102+
srv *grpc.Server
103+
s *server
104+
ownsListener bool
91105
}
92106

93107
// server is the real implementation of the fake.
@@ -105,6 +119,15 @@ type server struct {
105119
btpb.BigtableServer
106120
}
107121

122+
// noopCloserListener wraps a net.Listener, but its Close method is a no-op.
123+
// This is used to prevent the gRPC server from closing a listener that was
124+
// provided by a user.
125+
type noopCloserListener struct {
126+
net.Listener
127+
}
128+
129+
func (n *noopCloserListener) Close() error { return nil }
130+
108131
// NewServer creates a new Server.
109132
// The Server will be listening for gRPC connections, without TLS,
110133
// on the provided address. The resolved address is named by the Addr field.
@@ -121,7 +144,10 @@ func NewServer(laddr string, opt ...grpc.ServerOption) (*Server, error) {
121144
if err != nil {
122145
return nil, err
123146
}
147+
return newServer(l, true, opt...), nil
148+
}
124149

150+
func newServer(l net.Listener, ownsListener bool, opt ...grpc.ServerOption) *Server {
125151
s := &Server{
126152
Addr: l.Addr().String(),
127153
l: l,
@@ -130,14 +156,29 @@ func NewServer(laddr string, opt ...grpc.ServerOption) (*Server, error) {
130156
tables: make(map[string]*table),
131157
instances: make(map[string]*btapb.Instance),
132158
},
159+
ownsListener: ownsListener,
133160
}
134161
btapb.RegisterBigtableInstanceAdminServer(s.srv, s.s)
135162
btapb.RegisterBigtableTableAdminServer(s.srv, s.s)
136163
btpb.RegisterBigtableServer(s.srv, s.s)
137164

138-
go s.srv.Serve(s.l)
165+
listenerForGRPC := l
166+
if !ownsListener {
167+
// If the user owns the listener, wrap it so srv.Stop() doesn't close it.
168+
listenerForGRPC = &noopCloserListener{Listener: l}
169+
}
170+
go s.srv.Serve(listenerForGRPC)
171+
return s
172+
}
139173

140-
return s, nil
174+
// NewServerWithListener creates a new Server using the provided listener.
175+
// The Addr field of the returned Server will be the listener's address.
176+
//
177+
// The caller is responsible for closing the listener. The server's Close method
178+
// will not close the provided listener, nor will it clean up any underlying
179+
// resources like unix socket files.
180+
func NewServerWithListener(l net.Listener, opt ...grpc.ServerOption) (*Server, error) {
181+
return newServer(l, false, opt...), nil
141182
}
142183

143184
// Close shuts down the server.
@@ -149,10 +190,12 @@ func (s *Server) Close() {
149190
s.s.mu.Unlock()
150191

151192
s.srv.Stop()
152-
s.l.Close()
193+
if s.ownsListener {
194+
s.l.Close()
195+
}
153196

154197
// clean up unix socket
155-
if strings.Contains(s.Addr, "/") {
198+
if s.ownsListener && strings.Contains(s.Addr, "/") {
156199
_ = os.Remove(s.Addr)
157200
}
158201
}

bigtable/bttest/inmem_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@ import (
1919
"context"
2020
"encoding/base64"
2121
"encoding/binary"
22+
"errors"
2223
"fmt"
2324
"math"
2425
"math/rand"
26+
"net"
27+
"os"
2528
"sort"
2629
"strconv"
2730
"sync"
@@ -46,6 +49,87 @@ import (
4649
"google.golang.org/protobuf/types/known/wrapperspb"
4750
)
4851

52+
type closeSpyListener struct {
53+
net.Listener
54+
closed bool
55+
}
56+
57+
func (s *closeSpyListener) Close() error {
58+
s.closed = true
59+
return s.Listener.Close()
60+
}
61+
62+
func TestNewServer(t *testing.T) {
63+
t.Run("TCP", func(t *testing.T) {
64+
srv, err := NewServer("localhost:0")
65+
if err != nil {
66+
t.Fatalf("NewServer() error = %v", err)
67+
}
68+
if srv == nil {
69+
t.Fatal("NewServer() returned nil server")
70+
}
71+
defer srv.Close()
72+
if srv.Addr == "" {
73+
t.Error("NewServer() returned server with empty Addr")
74+
}
75+
})
76+
77+
t.Run("Unix", func(t *testing.T) {
78+
tmpDir := t.TempDir()
79+
sockPath := tmpDir + "/bttest.sock"
80+
srv, err := NewServer(sockPath)
81+
if err != nil {
82+
t.Fatalf("NewServer() error = %v", err)
83+
}
84+
if srv == nil {
85+
t.Fatal("NewServer() returned nil server")
86+
}
87+
88+
if srv.Addr != sockPath {
89+
t.Errorf("srv.Addr got %q, want %q", srv.Addr, sockPath)
90+
}
91+
92+
srv.Close()
93+
94+
// Check that the unix socket file was removed.
95+
if _, err := os.Stat(sockPath); !os.IsNotExist(err) {
96+
t.Errorf("socket file %q was not removed after Close()", sockPath)
97+
}
98+
})
99+
100+
t.Run("WithListener", func(t *testing.T) {
101+
l, err := net.Listen("tcp", "localhost:0")
102+
if err != nil {
103+
t.Fatalf("net.Listen() error = %v", err)
104+
}
105+
106+
// Create a spy that can tell us if its Close method was called.
107+
spy := &closeSpyListener{Listener: l}
108+
srv, err := NewServerWithListener(spy) // This should not call spy.Close().
109+
if err != nil {
110+
l.Close()
111+
t.Fatalf("NewServerWithListener() error = %v", err)
112+
}
113+
114+
srv.Close() // This should NOT call the Close method on our spy.
115+
if spy.closed {
116+
t.Error("Listener was closed by the server, but it should not have been.")
117+
}
118+
119+
// Clean up the real listener now that the test is done.
120+
l.Close()
121+
122+
// Validate that the listener is now actually closed.
123+
_, err = l.Accept()
124+
if err == nil {
125+
t.Fatal("l.Accept() should have failed for a closed listener, but it did not")
126+
}
127+
if !errors.Is(err, net.ErrClosed) {
128+
t.Errorf("Expected net.ErrClosed, but got a different error: %v", err)
129+
}
130+
})
131+
}
132+
49133
func TestConcurrentMutationsReadModifyAndGC(t *testing.T) {
50134
s := &server{
51135
tables: make(map[string]*table),

0 commit comments

Comments
 (0)