/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package test

import (
	"context"
	"fmt"
	"io"
	"net"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/golang/protobuf/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/stubserver"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	testpb "google.golang.org/grpc/test/grpc_testing"
)

// TestRetryUnary exercises the retry policy for unary RPCs.
//
// The stub server counts every attempt it receives in i and decides the
// outcome purely from that counter: attempts 0, 2 and 5 succeed, attempts 6, 8
// and 11 fail with the non-retryable code Internal, and every other attempt
// fails with AlreadyExists, which the service config marks as retryable (at
// most 4 attempts per RPC). After each client call the test checks both the
// final status code seen by the client and the index of the last attempt that
// reached the server.
func (s) TestRetryUnary(t *testing.T) {
	// i is the zero-based index of the most recent attempt seen by the server.
	i := -1
	ss := &stubserver.StubServer{
		EmptyCallF: func(context.Context, *testpb.Empty) (r *testpb.Empty, err error) {
			// err is a named result so the deferred log sees the returned value.
			defer func() { t.Logf("server call %v returning err %v", i, err) }()
			i++
			switch i {
			case 0, 2, 5:
				return &testpb.Empty{}, nil
			case 6, 8, 11:
				return nil, status.New(codes.Internal, "non-retryable error").Err()
			}
			return nil, status.New(codes.AlreadyExists, "retryable error").Err()
		},
	}
	if err := ss.Start([]grpc.ServerOption{},
		grpc.WithDefaultServiceConfig(`{
    "methodConfig": [{
      "name": [{"service": "grpc.testing.TestService"}],
      "waitForReady": true,
      "retryPolicy": {
        "MaxAttempts": 4,
        "InitialBackoff": ".01s",
        "MaxBackoff": ".01s",
        "BackoffMultiplier": 1.0,
        "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
      }
    }]}`)); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	// Each case is one client RPC: code is the status the client must observe
	// and count is the value i must hold once the RPC has finished.
	testCases := []struct {
		code  codes.Code
		count int
	}{
		{codes.OK, 0},
		{codes.OK, 2},
		{codes.OK, 5},
		{codes.Internal, 6},
		{codes.Internal, 8},
		{codes.Internal, 11},
		{codes.AlreadyExists, 15}, // all 4 attempts (12..15) fail with the retryable code
	}
	for num, tc := range testCases {
		t.Log("Case", num)
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
		cancel()
		if status.Code(err) != tc.code {
			t.Fatalf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
		}
		if i != tc.count {
			t.Fatalf("i = %v; want %v", i, tc.count)
		}
	}
}

// TestRetryThrottling exercises the "retryThrottling" section of the service
// config (maxTokens 10, tokenRatio 0.5) together with a retry policy that
// retries Unavailable up to 4 attempts.
//
// The stub server counts attempts in i and succeeds only on a fixed set of
// attempt indices; every other attempt fails with Unavailable. For each client
// RPC the test checks the final status code and the index of the last attempt
// that reached the server. The per-case comments track the expected token
// count after the RPC.
func (s) TestRetryThrottling(t *testing.T) {
	// i is the zero-based index of the most recent attempt seen by the server.
	i := -1
	ss := &stubserver.StubServer{
		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
			i++
			switch i {
			case 0, 3, 6, 10, 11, 12, 13, 14, 16, 18:
				return &testpb.Empty{}, nil
			}
			return nil, status.New(codes.Unavailable, "retryable error").Err()
		},
	}
	if err := ss.Start([]grpc.ServerOption{},
		grpc.WithDefaultServiceConfig(`{
    "methodConfig": [{
      "name": [{"service": "grpc.testing.TestService"}],
      "waitForReady": true,
      "retryPolicy": {
        "MaxAttempts": 4,
        "InitialBackoff": ".01s",
        "MaxBackoff": ".01s",
        "BackoffMultiplier": 1.0,
        "RetryableStatusCodes": [ "UNAVAILABLE" ]
      }
    }],
    "retryThrottling": {
      "maxTokens": 10,
      "tokenRatio": 0.5
    }
    }`)); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	// Each case is one client RPC: code is the status the client must observe
	// and count is the value i must hold once the RPC has finished.
	testCases := []struct {
		code  codes.Code
		count int
	}{
		{codes.OK, 0},           // tokens = 10
		{codes.OK, 3},           // tokens = 8.5 (10 - 2 failures + 0.5 success)
		{codes.OK, 6},           // tokens = 6
		{codes.Unavailable, 8},  // tokens = 5 -- first attempt is retried; second aborted.
		{codes.Unavailable, 9},  // tokens = 4
		{codes.OK, 10},          // tokens = 4.5
		{codes.OK, 11},          // tokens = 5
		{codes.OK, 12},          // tokens = 5.5
		{codes.OK, 13},          // tokens = 6
		{codes.OK, 14},          // tokens = 6.5
		{codes.OK, 16},          // tokens = 5.5
		{codes.Unavailable, 17}, // tokens = 4.5
	}
	for _, tc := range testCases {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
		cancel()
		// Errorf (not Fatalf) so that every case is reported in one run.
		if status.Code(err) != tc.code {
			t.Errorf("EmptyCall(_, _) = _, %v; want _, <Code() = %v>", err, tc.code)
		}
		if i != tc.count {
			t.Errorf("i = %v; want %v", i, tc.count)
		}
	}
}

// TestRetryStreaming exercises retries on a bidirectional streaming RPC.
//
// Each test case is a pair of scripts: serverOps is the sequence of steps the
// server handler performs and clientOps is the sequence the client performs.
// The server script is shared across attempts of the same RPC: the handler
// resumes from serverOpIter, so when a step returns an error (ending that
// attempt) a retried attempt continues with the next step. A case passes when
// every client step succeeds and the server has consumed its whole script.
//
// The client is configured to retry Unavailable up to 4 attempts and with a
// retry buffer limit of 200 (grpc.MaxRetryRPCBufferSize).
func (s) TestRetryStreaming(t *testing.T) {
	req := func(b byte) *testpb.StreamingOutputCallRequest {
		return &testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{Body: []byte{b}}}
	}
	res := func(b byte) *testpb.StreamingOutputCallResponse {
		return &testpb.StreamingOutputCallResponse{Payload: &testpb.Payload{Body: []byte{b}}}
	}

	// Payload used by the "Overflowing buffer" case; presumably 500 bytes,
	// which is larger than the 200 retry buffer limit configured below.
	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 500)
	if err != nil {
		t.Fatalf("Error creating large payload: %v", err)
	}

	type serverOp func(stream testpb.TestService_FullDuplexCallServer) error
	type clientOp func(stream testpb.TestService_FullDuplexCallClient) error

	// Server Operations

	// sAttempts checks that the incoming metadata carries exactly one
	// "grpc-previous-rpc-attempts" value equal to n.
	sAttempts := func(n int) serverOp {
		return func(stream testpb.TestService_FullDuplexCallServer) error {
			const key = "grpc-previous-rpc-attempts"
			md, ok := metadata.FromIncomingContext(stream.Context())
			if !ok {
				return status.Errorf(codes.Internal, "server: no header metadata received")
			}
			want := strconv.Itoa(n)
			if got := md[key]; len(got) != 1 || got[0] != want {
				return status.Errorf(codes.Internal, "server: metadata = %v; want <contains %q: %q>", md, key, want)
			}
			return nil
		}
	}
	// sReq receives one message and checks it equals req(b).
	sReq := func(b byte) serverOp {
		return func(stream testpb.TestService_FullDuplexCallServer) error {
			want := req(b)
			if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
				return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want %v, <nil>", got, err, want)
			}
			return nil
		}
	}
	// sReqPayload receives one message and checks it carries payload p.
	sReqPayload := func(p *testpb.Payload) serverOp {
		return func(stream testpb.TestService_FullDuplexCallServer) error {
			want := &testpb.StreamingOutputCallRequest{Payload: p}
			if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
				return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want %v, <nil>", got, err, want)
			}
			return nil
		}
	}
	// sRes sends res(b) to the client.
	sRes := func(b byte) serverOp {
		return func(stream testpb.TestService_FullDuplexCallServer) error {
			msg := res(b)
			if err := stream.Send(msg); err != nil {
				return status.Errorf(codes.Internal, "server: Send(%v) = %v; want <nil>", msg, err)
			}
			return nil
		}
	}
	// sErr ends the current attempt by returning a status with code c.
	sErr := func(c codes.Code) serverOp {
		return func(stream testpb.TestService_FullDuplexCallServer) error {
			return status.New(c, "").Err()
		}
	}
	// sCloseSend checks that the next Recv reports io.EOF with no message.
	sCloseSend := func() serverOp {
		return func(stream testpb.TestService_FullDuplexCallServer) error {
			if msg, err := stream.Recv(); msg != nil || err != io.EOF {
				return status.Errorf(codes.Internal, "server: Recv() = %v, %v; want <nil>, io.EOF", msg, err)
			}
			return nil
		}
	}
	// sPushback sets the "grpc-retry-pushback-ms" trailer to s.
	sPushback := func(s string) serverOp {
		return func(stream testpb.TestService_FullDuplexCallServer) error {
			stream.SetTrailer(metadata.MD{"grpc-retry-pushback-ms": []string{s}})
			return nil
		}
	}

	// Client Operations

	// cReq sends req(b).
	cReq := func(b byte) clientOp {
		return func(stream testpb.TestService_FullDuplexCallClient) error {
			msg := req(b)
			if err := stream.Send(msg); err != nil {
				return fmt.Errorf("client: Send(%v) = %v; want <nil>", msg, err)
			}
			return nil
		}
	}
	// cReqPayload sends a request carrying payload p.
	cReqPayload := func(p *testpb.Payload) clientOp {
		return func(stream testpb.TestService_FullDuplexCallClient) error {
			msg := &testpb.StreamingOutputCallRequest{Payload: p}
			if err := stream.Send(msg); err != nil {
				return fmt.Errorf("client: Send(%v) = %v; want <nil>", msg, err)
			}
			return nil
		}
	}
	// cRes receives one message and checks it equals res(b).
	cRes := func(b byte) clientOp {
		return func(stream testpb.TestService_FullDuplexCallClient) error {
			want := res(b)
			if got, err := stream.Recv(); err != nil || !proto.Equal(got, want) {
				return fmt.Errorf("client: Recv() = %v, %v; want %v, <nil>", got, err, want)
			}
			return nil
		}
	}
	// cErr checks that the next Recv returns no message and an error whose
	// text matches a status with code c; for codes.OK the expected error is
	// io.EOF.
	cErr := func(c codes.Code) clientOp {
		return func(stream testpb.TestService_FullDuplexCallClient) error {
			want := status.New(c, "").Err()
			if c == codes.OK {
				want = io.EOF
			}
			res, err := stream.Recv()
			if res != nil ||
				((err == nil) != (want == nil)) ||
				(want != nil && err.Error() != want.Error()) {
				return fmt.Errorf("client: Recv() = %v, %v; want <nil>, %v", res, err, want)
			}
			return nil
		}
	}
	// cCloseSend half-closes the stream from the client side.
	cCloseSend := func() clientOp {
		return func(stream testpb.TestService_FullDuplexCallClient) error {
			if err := stream.CloseSend(); err != nil {
				return fmt.Errorf("client: CloseSend() = %v; want <nil>", err)
			}
			return nil
		}
	}
	// curTime is written by cGetTime and read by cCheckElapsed.
	var curTime time.Time
	cGetTime := func() clientOp {
		return func(_ testpb.TestService_FullDuplexCallClient) error {
			curTime = time.Now()
			return nil
		}
	}
	// cCheckElapsed fails unless at least d has passed since cGetTime ran.
	cCheckElapsed := func(d time.Duration) clientOp {
		return func(_ testpb.TestService_FullDuplexCallClient) error {
			if elapsed := time.Since(curTime); elapsed < d {
				return fmt.Errorf("elapsed time: %v; want >= %v", elapsed, d)
			}
			return nil
		}
	}
	// cHdr reads the response headers and returns any error from doing so.
	cHdr := func() clientOp {
		return func(stream testpb.TestService_FullDuplexCallClient) error {
			_, err := stream.Header()
			return err
		}
	}
	// cCtx calls Context() only for its effect on the stream; the result is
	// deliberately discarded (see the "No retry after context" case).
	cCtx := func() clientOp {
		return func(stream testpb.TestService_FullDuplexCallClient) error {
			stream.Context()
			return nil
		}
	}

	testCases := []struct {
		desc      string
		serverOps []serverOp
		clientOps []clientOp
	}{{
		desc:      "Non-retryable error code",
		serverOps: []serverOp{sReq(1), sErr(codes.Internal)},
		clientOps: []clientOp{cReq(1), cErr(codes.Internal)},
	}, {
		desc:      "One retry necessary",
		serverOps: []serverOp{sReq(1), sErr(codes.Unavailable), sReq(1), sAttempts(1), sRes(1)},
		clientOps: []clientOp{cReq(1), cRes(1), cErr(codes.OK)},
	}, {
		desc: "Exceed max attempts (4); check attempts header on server",
		serverOps: []serverOp{
			sReq(1), sErr(codes.Unavailable),
			sReq(1), sAttempts(1), sErr(codes.Unavailable),
			sAttempts(2), sReq(1), sErr(codes.Unavailable),
			sAttempts(3), sReq(1), sErr(codes.Unavailable),
		},
		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
	}, {
		desc: "Multiple requests",
		serverOps: []serverOp{
			sReq(1), sReq(2), sErr(codes.Unavailable),
			sReq(1), sReq(2), sRes(5),
		},
		clientOps: []clientOp{cReq(1), cReq(2), cRes(5), cErr(codes.OK)},
	}, {
		desc: "Multiple successive requests",
		serverOps: []serverOp{
			sReq(1), sErr(codes.Unavailable),
			sReq(1), sReq(2), sErr(codes.Unavailable),
			sReq(1), sReq(2), sReq(3), sRes(5),
		},
		clientOps: []clientOp{cReq(1), cReq(2), cReq(3), cRes(5), cErr(codes.OK)},
	}, {
		desc: "No retry after receiving",
		serverOps: []serverOp{
			sReq(1), sErr(codes.Unavailable),
			sReq(1), sRes(3), sErr(codes.Unavailable),
		},
		clientOps: []clientOp{cReq(1), cRes(3), cErr(codes.Unavailable)},
	}, {
		desc:      "No retry after header",
		serverOps: []serverOp{sReq(1), sErr(codes.Unavailable)},
		clientOps: []clientOp{cReq(1), cHdr(), cErr(codes.Unavailable)},
	}, {
		desc:      "No retry after context",
		serverOps: []serverOp{sReq(1), sErr(codes.Unavailable)},
		clientOps: []clientOp{cReq(1), cCtx(), cErr(codes.Unavailable)},
	}, {
		desc: "Replaying close send",
		serverOps: []serverOp{
			sReq(1), sReq(2), sCloseSend(), sErr(codes.Unavailable),
			sReq(1), sReq(2), sCloseSend(), sRes(1), sRes(3), sRes(5),
		},
		clientOps: []clientOp{cReq(1), cReq(2), cCloseSend(), cRes(1), cRes(3), cRes(5), cErr(codes.OK)},
	}, {
		desc:      "Negative server pushback - no retry",
		serverOps: []serverOp{sReq(1), sPushback("-1"), sErr(codes.Unavailable)},
		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
	}, {
		desc:      "Non-numeric server pushback - no retry",
		serverOps: []serverOp{sReq(1), sPushback("xxx"), sErr(codes.Unavailable)},
		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
	}, {
		desc:      "Multiple server pushback values - no retry",
		serverOps: []serverOp{sReq(1), sPushback("100"), sPushback("10"), sErr(codes.Unavailable)},
		clientOps: []clientOp{cReq(1), cErr(codes.Unavailable)},
	}, {
		desc:      "1s server pushback - delayed retry",
		serverOps: []serverOp{sReq(1), sPushback("1000"), sErr(codes.Unavailable), sReq(1), sRes(2)},
		clientOps: []clientOp{cGetTime(), cReq(1), cRes(2), cCheckElapsed(time.Second), cErr(codes.OK)},
	}, {
		desc:      "Overflowing buffer - no retry",
		serverOps: []serverOp{sReqPayload(largePayload), sErr(codes.Unavailable)},
		clientOps: []clientOp{cReqPayload(largePayload), cErr(codes.Unavailable)},
	}}

	// serverOps is the script of the test case currently running and
	// serverOpIter is the index of the next step to run. Both are reset by the
	// test loop below and advanced by the handler.
	// NOTE(review): these appear to be shared between the handler and the test
	// body without synchronization -- confirm this is acceptable under -race.
	var serverOpIter int
	var serverOps []serverOp
	ss := &stubserver.StubServer{
		FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
			for serverOpIter < len(serverOps) {
				op := serverOps[serverOpIter]
				// Advance before running the step so that a failing step is
				// not repeated by the next attempt.
				serverOpIter++
				if err := op(stream); err != nil {
					return err
				}
			}
			return nil
		},
	}
	if err := ss.Start([]grpc.ServerOption{}, grpc.WithDefaultCallOptions(grpc.MaxRetryRPCBufferSize(200)),
		grpc.WithDefaultServiceConfig(`{
    "methodConfig": [{
      "name": [{"service": "grpc.testing.TestService"}],
      "waitForReady": true,
      "retryPolicy": {
          "MaxAttempts": 4,
          "InitialBackoff": ".01s",
          "MaxBackoff": ".01s",
          "BackoffMultiplier": 1.0,
          "RetryableStatusCodes": [ "UNAVAILABLE" ]
      }
    }]}`)); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	// Poll (for at most 2s) until the method config reports a WaitForReady
	// value, i.e. until the default service config above has been applied.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	for {
		if ctx.Err() != nil {
			t.Fatalf("Timed out waiting for service config update")
		}
		if ss.CC.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}
	cancel()

	for _, tc := range testCases {
		// Run each case in its own function so the deferred cancel fires at
		// the end of the case rather than at the end of the test.
		func() {
			serverOpIter = 0
			serverOps = tc.serverOps

			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()
			stream, err := ss.Client.FullDuplexCall(ctx)
			if err != nil {
				t.Fatalf("%v: Error while creating stream: %v", tc.desc, err)
			}
			for _, op := range tc.clientOps {
				if err := op(stream); err != nil {
					t.Errorf("%v: %v", tc.desc, err)
					break
				}
			}
			if serverOpIter != len(serverOps) {
				t.Errorf("%v: serverOpIter = %v; want %v", tc.desc, serverOpIter, len(serverOps))
			}
		}()
	}
}

// retryStatsHandler is a stats handler for tests (it is passed to
// grpc.WithStatsHandler) that records every RPC stats event it is given, in
// the order received, and ignores connection-level events.
type retryStatsHandler struct {
	mu sync.Mutex
	s  []stats.RPCStats // recorded RPC events; HandleRPC appends while holding mu
}

// TagRPC attaches nothing to the context and returns it unchanged.
func (*retryStatsHandler) TagRPC(parent context.Context, _ *stats.RPCTagInfo) context.Context {
	return parent
}

// HandleRPC appends the event to the recorded list under the mutex.
func (h *retryStatsHandler) HandleRPC(_ context.Context, event stats.RPCStats) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.s = append(h.s, event)
}

// TagConn attaches nothing to the context and returns it unchanged.
func (*retryStatsHandler) TagConn(parent context.Context, _ *stats.ConnTagInfo) context.Context {
	return parent
}

// HandleConn discards connection-level events.
func (*retryStatsHandler) HandleConn(context.Context, stats.ConnStats) {}

// TestRetryStats verifies the sequence of stats events reported to a client
// stats handler for one unary RPC that takes three attempts.
//
// The test HTTP server is set up to refuse stream 1, to answer the next
// stream with a trailers-only UNAVAILABLE response carrying a 10ms retry
// pushback, and to answer the one after that with an empty OK response. The
// recorded events are compared against the expected list by type and by every
// field that is set to a non-zero value in the expectation. Finally, the gap
// between the End of the second attempt and the Begin of the third is checked
// against the pushback delay.
func (s) TestRetryStats(t *testing.T) {
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Failed to listen. Err: %v", err)
	}
	defer lis.Close()
	server := &httpServer{
		waitForEndStream: true,
		responses: []httpServerResponse{{
			trailers: [][]string{{
				":status", "200",
				"content-type", "application/grpc",
				"grpc-status", "14", // UNAVAILABLE
				"grpc-message", "unavailable retry",
				"grpc-retry-pushback-ms", "10",
			}},
		}, {
			headers: [][]string{{
				":status", "200",
				"content-type", "application/grpc",
			}},
			payload: []byte{0, 0, 0, 0, 0}, // header for 0-byte response message.
			trailers: [][]string{{
				"grpc-status", "0", // OK
			}},
		}},
		refuseStream: func(i uint32) bool {
			return i == 1
		},
	}
	server.start(t, lis)
	handler := &retryStatsHandler{}
	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(handler),
		grpc.WithDefaultServiceConfig(`{
    "methodConfig": [{
      "name": [{"service": "grpc.testing.TestService"}],
      "retryPolicy": {
          "MaxAttempts": 4,
          "InitialBackoff": ".01s",
          "MaxBackoff": ".01s",
          "BackoffMultiplier": 1.0,
          "RetryableStatusCodes": [ "UNAVAILABLE" ]
      }
    }]}`))
	if err != nil {
		t.Fatalf("failed to dial due to err: %v", err)
	}
	defer cc.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	client := testpb.NewTestServiceClient(cc)

	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("unexpected EmptyCall error: %v", err)
	}

	// Hold the handler's lock while inspecting (and possibly reordering) the
	// recorded events. It is released on return; deferred calls run in reverse
	// order, so the unlock happens before cc.Close() above and a late
	// HandleRPC call can never block on a lock that is never released.
	handler.mu.Lock()
	defer handler.mu.Unlock()

	want := []stats.RPCStats{
		// Attempt 1: refused by the server.
		&stats.Begin{},
		&stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
		&stats.OutPayload{WireLength: 5},
		&stats.End{},

		// Attempt 2: transparent retry; fails with UNAVAILABLE plus pushback.
		&stats.Begin{IsTransparentRetryAttempt: true},
		&stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
		&stats.OutPayload{WireLength: 5},
		&stats.InTrailer{Trailer: metadata.Pairs("content-type", "application/grpc", "grpc-retry-pushback-ms", "10")},
		&stats.End{},

		// Attempt 3: succeeds.
		&stats.Begin{},
		&stats.OutHeader{FullMethod: "/grpc.testing.TestService/EmptyCall"},
		&stats.OutPayload{WireLength: 5},
		&stats.InHeader{},
		&stats.InPayload{WireLength: 5},
		&stats.InTrailer{},
		&stats.End{},
	}

	toString := func(ss []stats.RPCStats) (ret []string) {
		for _, s := range ss {
			ret = append(ret, fmt.Sprintf("%T - %v", s, s))
		}
		return ret
	}
	t.Logf("Handler received frames:\n%v\n---\nwant:\n%v\n",
		strings.Join(toString(handler.s), "\n"),
		strings.Join(toString(want), "\n"))

	if len(handler.s) != len(want) {
		t.Fatalf("received unexpected number of RPCStats: got %v; want %v", len(handler.s), len(want))
	}

	// There is a race between receiving the payload (triggered by the
	// application / gRPC library) and receiving the trailer (triggered at the
	// transport layer).  Adjust the received stats accordingly if necessary.
	const tIdx, pIdx = 13, 14
	_, okT := handler.s[tIdx].(*stats.InTrailer)
	_, okP := handler.s[pIdx].(*stats.InPayload)
	if okT && okP {
		handler.s[pIdx], handler.s[tIdx] = handler.s[tIdx], handler.s[pIdx]
	}

	for i := range handler.s {
		w, s := want[i], handler.s[i]

		// Validate the event type
		if reflect.TypeOf(w) != reflect.TypeOf(s) {
			t.Fatalf("at position %v: got %T; want %T", i, s, w)
		}
		wv, sv := reflect.ValueOf(w).Elem(), reflect.ValueOf(s).Elem()

		// Validate that Client is always true
		if !sv.FieldByName("Client").Bool() {
			t.Fatalf("at position %v: got Client=false; want true", i)
		}

		// Validate any set fields in want. The field index uses its own
		// variable so the failure message reports the event position i.
		for j := 0; j < wv.NumField(); j++ {
			if wv.Field(j).IsZero() {
				continue
			}
			gotField, wantField := sv.Field(j).Interface(), wv.Field(j).Interface()
			if !reflect.DeepEqual(gotField, wantField) {
				name := reflect.TypeOf(w).Elem().Field(j).Name
				t.Fatalf("at position %v, field %v: got %v; want %v", i, name, gotField, wantField)
			}
		}

		// Since the above only tests non-zero-value fields, test
		// IsTransparentRetryAttempt=false explicitly when needed.
		if wb, ok := w.(*stats.Begin); ok && !wb.IsTransparentRetryAttempt {
			if s.(*stats.Begin).IsTransparentRetryAttempt {
				t.Fatalf("at position %v: got IsTransparentRetryAttempt=true; want false", i)
			}
		}
	}

	// Validate timings between last Begin and preceding End.
	end := handler.s[8].(*stats.End)
	begin := handler.s[9].(*stats.Begin)
	diff := begin.BeginTime.Sub(end.EndTime)
	if diff < 10*time.Millisecond || diff > 50*time.Millisecond {
		t.Fatalf("pushback time before final attempt = %v; want ~10ms", diff)
	}
}

// TestRetryTransparentWhenCommitted is a regression test for operations on a
// stream whose retry attempt is still blocked when the RPC is canceled.
//
// The server allows a single concurrent stream. The scenario is:
//
//  1. Stream 1 is created; its first attempt fails with a retriable code.
//  2. A dummy stream 2 is created and blocks indefinitely on the server.
//  3. Stream 1 retries and blocks until stream 2 finishes.
//  4. Stream 1 is canceled manually.
//
// Without a bug, stream 1 is done and its error is CANCELED. With a bug,
// stream 1 has a nil stream (attempt.s) and operations like CloseSend panic.
func (s) TestRetryTransparentWhenCommitted(t *testing.T) {
	// firstAttemptSeen fires the first time the handler runs, so only that
	// invocation returns the retriable error.
	firstAttemptSeen := grpcsync.NewEvent()
	ss := &stubserver.StubServer{
		FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
			if firstAttemptSeen.HasFired() {
				// Every later invocation parks until its stream is torn down.
				t.Log("blocking")
				<-stream.Context().Done()
				return stream.Context().Err()
			}
			firstAttemptSeen.Fire()
			t.Log("returned first error")
			return status.Error(codes.AlreadyExists, "first attempt fails and is retriable")
		},
	}

	serverOpts := []grpc.ServerOption{grpc.MaxConcurrentStreams(1)}
	if err := ss.Start(serverOpts,
		grpc.WithDefaultServiceConfig(`{
    "methodConfig": [{
      "name": [{"service": "grpc.testing.TestService"}],
      "waitForReady": true,
      "retryPolicy": {
        "MaxAttempts": 2,
        "InitialBackoff": ".1s",
        "MaxBackoff": ".1s",
        "BackoffMultiplier": 1.0,
        "RetryableStatusCodes": [ "ALREADY_EXISTS" ]
      }
    }]}`)); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	ctx1, cancel1 := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel1()
	ctx2, cancel2 := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel2()

	stream1, err := ss.Client.FullDuplexCall(ctx1)
	if err != nil {
		t.Fatalf("Error creating stream 1: %v", err)
	}

	// The second stream exists only to occupy the server; it is never used.
	if _, err := ss.Client.FullDuplexCall(ctx2); err != nil {
		t.Errorf("Error creating stream 2: %v", err)
	}

	recvReturned := grpcsync.NewEvent()
	go func() {
		// Recv observes the ALREADY_EXISTS error, which triggers the retry;
		// it is expected to return only once stream 1 has been canceled.
		_, recvErr := stream1.Recv()
		if status.Code(recvErr) != codes.Canceled {
			t.Errorf("Expected stream1 to be canceled; got error: %v", recvErr)
		}
		recvReturned.Fire()
	}()

	// Sleep past the 0.1s retry backoff before canceling stream 1.
	time.Sleep(200 * time.Millisecond)
	cancel1()

	// Once Recv has returned, further operations on the stream must not
	// panic; their results are irrelevant and deliberately ignored.
	<-recvReturned.Done()
	stream1.CloseSend()
	stream1.Recv()
	stream1.Send(&testpb.StreamingOutputCallRequest{})
}

Related articles

grpc-go route_guide.pb

// Copyright 2015 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/

grpc-go bufconn_test

/* * * Copyright 2017 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/lic

grpc-go resolver_update_test

/* * * Copyright 2022 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/lic

grpc-go xds_security_config_nack_test

/* * * Copyright 2021 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/lic

grpc-go test

// Copyright 2017 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/

grpc-go config_selector_test

/* * * Copyright 2020 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/lic

grpc-go rawConnWrapper

/* * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licens

grpc-go data

/* * Copyright 2020 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licens

grpc-go compressor_test

/* * * Copyright 2023 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/lic

grpc-go client_ca_cert

-----BEGIN CERTIFICATE----- MIIF6jCCA9KgAwIBAgIJAOhoXtjjP6JdMA0GCSqGSIb3DQEBCwUAMFAxCzAJBgNV BAYTAlVTMQswCQYDVQQIDAJDQTEMMAoGA1UEBwwDU1ZMMQ0wCwYDVQQKDARnUlBD MRcwFQYDVQQDDA50ZXN0LWNsaWVudF9jYTAeFw0yMjAzMTgyMTQ0NThaFw0zMjAz MTUyMTQ0NThaMFAxCzAJBgNVBAYTAlV