I have an application (let's call it client) connecting to another process (let's call it server) on the same machine via gRPC. The communication goes over unix socket.
If server is restarted, my client gets an EOF and does not re-establish the connection, although I expected the clientConn to handle the reconnection automatically.
Why isn't the dialer taking care of the reconnection? I expect it to do so with the backoff params I passed.
Below some pseudo-MWE.
Runestablish the initial connection, then spawnsgoroutineOnegoroutineOnewaits for the connection to be ready and delegates thesendtofooUpdaterfooUpdaterstreams the data, or returns in case of errors- for
waitUntilReadyI used the pseudo-code referenced by this answer to get a new stream.
func main() {
go func() {
if err := Run(ctx); err != nil {
log.Errorf("connection error: %v", err)
}
ctxCancel()
}()
// some wait logic
}
func Run(ctx context.Context) {
backoffConfig := backoff.Config{
BaseDelay: time.Duration(1 * time.Second),
Multiplier: backoff.DefaultConfig.Multiplier,
Jitter: backoff.DefaultConfig.Jitter,
MaxDelay: time.Duration(120 * time.Second),
}
myConn, err := grpc.DialContext(ctx,
"/var/run/foo.bar",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoffConfig, MinConnectTimeout: time.Duration(1 * time.Second)}),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
c, err := d.DialContext(ctx, "unix", addr)
if err != nil {
return nil, fmt.Errorf("connection to unix://%s failed: %w", addr, err)
}
return c, nil
}),
)
if err != nil {
return fmt.Errorf("could not establish socket for foo: %w", err)
}
defer myConn.Close()
return goroutineOne()
}
func goroutineOne() {
reconnect := make(chan struct{})
for {
if ready := waitUntilReady(ctx, myConn, time.Duration(2*time.Minute)); !ready {
return fmt.Errorf("myConn: %w, timeout: %s", ErrWaitReadyTimeout, "2m")
}
go func() {
if err := fooUpdater(ctx, dataBuffer, myConn); err != nil {
log.Errorf("foo updater: %v", err)
}
reconnect <- struct{}{}
}()
select {
case <-ctx.Done():
return nil
case <-reconnect:
}
}
}
func fooUpdater(ctx context.Context, dataBuffer custom.CircularBuffer, myConn *grpc.ClientConn) error {
clientStream, err := myConn.Stream(ctx) // custom pb code, returns grpc.ClientConn.NewStream(...)
if err != nil {
return fmt.Errorf("could not obtain stream: %w", err)
}
for {
select {
case <-ctx.Done():
return nil
case data := <-dataBuffer:
if err := clientStream.Send(data); err != nil {
return fmt.Errorf("could not send data: %w", err)
}
}
}
}
func waitUntilReady(ctx context.Context, conn *grpc.ClientConn, maxTimeout time.Duration) bool {
ctx, cancel := context.WithTimeout(ctx, maxTimeout)
defer cancel()
currentState := conn.GetState()
timeoutValid := true
for currentState != connectivity.Ready && timeoutValid {
timeoutValid = conn.WaitForStateChange(ctx, currentState)
currentState = conn.GetState()
// debug print currentState -> prints IDLE
}
return currentState == connectivity.Ready
}
Debugging hints also welcome :)
EOF?waitUntilReadyis executed last, then it returns and I get theconnection errorprint from mymainafter themaxTimeout