pool: Consider waiting for in-flight requests to finish before closing connection during rebalance switch #254

Closed
opened 2024-08-15 12:32:17 +00:00 by dkirillov · 0 comments
Member

Is your feature request related to a problem? Please describe.

Currently, when we mark a client as unhealthy, we close it during the rebalance switch. This causes all in-flight requests on the "unhealthy" client (sometimes we mark a node by mistake, see #253) to fail with the error:

init writing on API client s01.frostfs.devenv:8080: client failure: rpc error: code = Canceled desc = grpc: the client connection is closing

See the test below for an example.


package pool

// NOTE: the package clause and imports were not in the original snippet; the
// import paths below are assumed from the frostfs-sdk-go and neo-go layout.
import (
	"bytes"
	"context"
	"fmt"
	"testing"
	"time"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap/zaptest"
)

// sleepReader delays every Read by two seconds so that the PutObject payload
// streaming stays in flight long enough to overlap with a rebalance switch.
type sleepReader struct {
	r *bytes.Reader
}

func (s *sleepReader) Read(p []byte) (int, error) {
	fmt.Println("reader sleep")
	time.Sleep(2 * time.Second)

	return s.r.Read(p)
}

func TestPool(t *testing.T) {
	ctx := context.Background()
	devenvKey := "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb"
	key, err := keys.NewPrivateKeyFromHex(devenvKey)
	require.NoError(t, err)
	var devenvOwner user.ID
	user.IDFromKey(&devenvOwner, key.PrivateKey.PublicKey)

	var prm InitParameters
	prm.SetKey(&key.PrivateKey)
	prm.SetLogger(zaptest.NewLogger(t))
	prm.SetErrorThreshold(5)
	prm.SetNodeDialTimeout(5 * time.Second)
	prm.SetClientRebalanceInterval(5 * time.Second)
	prm.AddNode(NewNodeParam(1, "s01.frostfs.devenv:8080", 1))
	prm.AddNode(NewNodeParam(2, "s02.frostfs.devenv:8080", 1))
	clientPool, err := NewPool(prm)
	require.NoError(t, err)
	err = clientPool.Dial(ctx)
	require.NoError(t, err)

	var cnrID cid.ID
	err = cnrID.DecodeString("DRYh5v5retURwgDnafoEECA2eThBSBLnAazQH62etxfH")
	require.NoError(t, err)

	var objID oid.ID
	err = objID.DecodeString("DRYh5v5retURwgDnafoEECA2eThBSBLnAazQH62etxfH")
	require.NoError(t, err)

	// Start a slow PutObject in the background; sleepReader keeps the request
	// in flight while the rebalance loop may switch clients.
	go func() {
		obj := object.New()
		obj.SetContainerID(cnrID)
		obj.SetOwnerID(devenvOwner)

		sr := &sleepReader{r: bytes.NewReader(make([]byte, 1044))}

		var prmPut PrmObjectPut
		prmPut.SetHeader(*obj)
		prmPut.SetPayload(sr)
		prmPut.SetBufferMaxSize(1024)

		fmt.Println(clientPool.PutObject(ctx, prmPut))
	}()

	var addr oid.Address
	addr.SetContainer(cnrID)
	addr.SetObject(objID)

	// HeadObject an object that is not expected to exist; with the diff below
	// applied, these errors push the node over the error threshold so it is
	// marked unhealthy and the rebalance switch closes its connection.
	for i := 0; i < 10; i++ {
		time.Sleep(300 * time.Millisecond)
		var prm PrmObjectHead
		prm.SetAddress(addr)
		_, err = clientPool.HeadObject(ctx, prm)
		fmt.Println(err)
	}

	time.Sleep(10 * time.Second)
}

To make the test reproduce the mentioned error, you can apply the following diff (it makes ObjectNotFound count toward the error threshold, so the repeated HeadObject errors push the node over the threshold and trigger a rebalance switch):

diff --git a/pool/pool.go b/pool/pool.go
index f9e3dae..4d24a88 100644
--- a/pool/pool.go
+++ b/pool/pool.go
@@ -1232,6 +1232,7 @@ func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Stat
 		case *apistatus.ServerInternal,
 			*apistatus.WrongMagicNumber,
 			*apistatus.SignatureVerification,
+			*apistatus.ObjectNotFound,
 			*apistatus.NodeUnderMaintenance:
 			c.incErrorRate()
 		}

Describe the solution you'd like

We can delay closing the old client in restartIfUnhealthy (https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/203bba65a0b33cf1ba3751b6b676b20cd1046119/pool/pool.go#L365) to give in-flight requests some time to finish gracefully.
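
A minimal sketch of this idea, assuming a hypothetical closeLater helper and graceTimeout value (neither exists in the pool today):

package pool

import (
	"fmt"
	"io"
	"time"
)

// closeLater is a hypothetical helper: instead of closing the old client
// immediately in restartIfUnhealthy, hand it to a timer and close it only
// after a grace period so that in-flight requests can complete. The name
// closeLater and the graceTimeout parameter are assumptions, not pool API.
func closeLater(oldClient io.Closer, graceTimeout time.Duration) {
	time.AfterFunc(graceTimeout, func() {
		if err := oldClient.Close(); err != nil {
			fmt.Printf("closing old client after grace period: %v\n", err)
		}
	})
}

A fixed grace period only shrinks the window rather than removing it, so the timeout would have to be long enough for typical uploads.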

Describe alternatives you've considered

  • Ignore in-flight requests during the rebalance switch
  • Don't close (and redial) a connection once it has been dialed
  • Somehow count/monitor the current in-flight requests and don't redial until all of them have finished (a sketch of this is given after the list)
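
For the last alternative, a rough sketch of tracking in-flight requests, assuming a hypothetical inflightTracker type (nothing like it exists in the pool today):

package pool

import (
	"sync"
	"time"
)

// inflightTracker is a hypothetical counter for the last alternative above:
// each request calls start before it begins and finish when it is done; the
// rebalance loop calls waitDrained and redials only after it returns.
type inflightTracker struct {
	wg sync.WaitGroup
}

func (t *inflightTracker) start()  { t.wg.Add(1) }
func (t *inflightTracker) finish() { t.wg.Done() }

// waitDrained blocks until all in-flight requests finish or the timeout
// expires; it returns false if the timeout was hit.
func (t *inflightTracker) waitDrained(timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		t.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}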

Additional context

No

dkirillov self-assigned this 2024-08-19 13:08:24 +00:00