From f121a730491630980dea0bcf5f63c6b026bbc28e Mon Sep 17 00:00:00 2001 From: ZhangTao1596 Date: Tue, 3 Aug 2021 19:21:04 +0800 Subject: [PATCH] [#737] network/cache: handle request canceled in multiclient Signed-off-by: ZhangTao1596 --- pkg/network/cache/multi.go | 54 ++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index c3d9a94e..86905c58 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "crypto/tls" + "errors" "io" "sync" @@ -57,17 +58,24 @@ func (x *multiClient) createForAddress(addr network.Address) client.Client { return c } -func (x *multiClient) iterateClients(f func(client.Client) error) error { +func (x *multiClient) iterateClients(ctx context.Context, f func(client.Client) error) error { var firstErr error x.addr.IterateAddresses(func(addr network.Address) bool { + select { + case <-ctx.Done(): + firstErr = context.Canceled + return true + default: + } + var err error c := x.client(addr) err = f(c) - success := err == nil + success := err == nil || errors.Is(err, context.Canceled) if success || firstErr == nil { firstErr = err @@ -82,7 +90,7 @@ func (x *multiClient) iterateClients(f func(client.Client) error) error { func (x *multiClient) PutObject(ctx context.Context, p *client.PutObjectParams, opts ...client.CallOption) (*object.ID, error) { var res *object.ID - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.PutObject(ctx, p, opts...) return }) @@ -93,7 +101,7 @@ func (x *multiClient) PutObject(ctx context.Context, p *client.PutObjectParams, func (x *multiClient) GetBalance(ctx context.Context, id *owner.ID, opts ...client.CallOption) (*accounting.Decimal, error) { var res *accounting.Decimal - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.GetBalance(ctx, id, opts...) return }) @@ -104,7 +112,7 @@ func (x *multiClient) GetBalance(ctx context.Context, id *owner.ID, opts ...clie func (x *multiClient) PutContainer(ctx context.Context, cnr *container.Container, opts ...client.CallOption) (*cid.ID, error) { var res *cid.ID - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.PutContainer(ctx, cnr, opts...) return }) @@ -115,7 +123,7 @@ func (x *multiClient) PutContainer(ctx context.Context, cnr *container.Container func (x *multiClient) GetContainer(ctx context.Context, id *cid.ID, opts ...client.CallOption) (*container.Container, error) { var res *container.Container - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.GetContainer(ctx, id, opts...) return }) @@ -126,7 +134,7 @@ func (x *multiClient) GetContainer(ctx context.Context, id *cid.ID, opts ...clie func (x *multiClient) ListContainers(ctx context.Context, id *owner.ID, opts ...client.CallOption) ([]*cid.ID, error) { var res []*cid.ID - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.ListContainers(ctx, id, opts...) return }) @@ -135,7 +143,7 @@ func (x *multiClient) ListContainers(ctx context.Context, id *owner.ID, opts ... } func (x *multiClient) DeleteContainer(ctx context.Context, id *cid.ID, opts ...client.CallOption) error { - return x.iterateClients(func(c client.Client) error { + return x.iterateClients(ctx, func(c client.Client) error { return c.DeleteContainer(ctx, id, opts...) }) } @@ -143,7 +151,7 @@ func (x *multiClient) DeleteContainer(ctx context.Context, id *cid.ID, opts ...c func (x *multiClient) GetEACL(ctx context.Context, id *cid.ID, opts ...client.CallOption) (*client.EACLWithSignature, error) { var res *client.EACLWithSignature - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.GetEACL(ctx, id, opts...) return }) @@ -152,13 +160,13 @@ func (x *multiClient) GetEACL(ctx context.Context, id *cid.ID, opts ...client.Ca } func (x *multiClient) SetEACL(ctx context.Context, t *eacl.Table, opts ...client.CallOption) error { - return x.iterateClients(func(c client.Client) error { + return x.iterateClients(ctx, func(c client.Client) error { return c.SetEACL(ctx, t, opts...) }) } func (x *multiClient) AnnounceContainerUsedSpace(ctx context.Context, as []container.UsedSpaceAnnouncement, opts ...client.CallOption) error { - return x.iterateClients(func(c client.Client) error { + return x.iterateClients(ctx, func(c client.Client) error { return c.AnnounceContainerUsedSpace(ctx, as, opts...) }) } @@ -166,7 +174,7 @@ func (x *multiClient) AnnounceContainerUsedSpace(ctx context.Context, as []conta func (x *multiClient) EndpointInfo(ctx context.Context, opts ...client.CallOption) (*client.EndpointInfo, error) { var res *client.EndpointInfo - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.EndpointInfo(ctx, opts...) return }) @@ -177,7 +185,7 @@ func (x *multiClient) EndpointInfo(ctx context.Context, opts ...client.CallOptio func (x *multiClient) NetworkInfo(ctx context.Context, opts ...client.CallOption) (*netmap.NetworkInfo, error) { var res *netmap.NetworkInfo - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.NetworkInfo(ctx, opts...) return }) @@ -186,7 +194,7 @@ func (x *multiClient) NetworkInfo(ctx context.Context, opts ...client.CallOption } func (x *multiClient) DeleteObject(ctx context.Context, p *client.DeleteObjectParams, opts ...client.CallOption) error { - return x.iterateClients(func(c client.Client) error { + return x.iterateClients(ctx, func(c client.Client) error { return c.DeleteObject(ctx, p, opts...) }) } @@ -194,7 +202,7 @@ func (x *multiClient) DeleteObject(ctx context.Context, p *client.DeleteObjectPa func (x *multiClient) GetObject(ctx context.Context, p *client.GetObjectParams, opts ...client.CallOption) (*object.Object, error) { var res *object.Object - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.GetObject(ctx, p, opts...) return }) @@ -205,7 +213,7 @@ func (x *multiClient) GetObject(ctx context.Context, p *client.GetObjectParams, func (x *multiClient) GetObjectHeader(ctx context.Context, p *client.ObjectHeaderParams, opts ...client.CallOption) (*object.Object, error) { var res *object.Object - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.GetObjectHeader(ctx, p, opts...) return }) @@ -216,7 +224,7 @@ func (x *multiClient) GetObjectHeader(ctx context.Context, p *client.ObjectHeade func (x *multiClient) ObjectPayloadRangeData(ctx context.Context, p *client.RangeDataParams, opts ...client.CallOption) ([]byte, error) { var res []byte - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.ObjectPayloadRangeData(ctx, p, opts...) return }) @@ -227,7 +235,7 @@ func (x *multiClient) ObjectPayloadRangeData(ctx context.Context, p *client.Rang func (x *multiClient) ObjectPayloadRangeSHA256(ctx context.Context, p *client.RangeChecksumParams, opts ...client.CallOption) ([][sha256.Size]byte, error) { var res [][sha256.Size]byte - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.ObjectPayloadRangeSHA256(ctx, p, opts...) return }) @@ -238,7 +246,7 @@ func (x *multiClient) ObjectPayloadRangeSHA256(ctx context.Context, p *client.Ra func (x *multiClient) ObjectPayloadRangeTZ(ctx context.Context, p *client.RangeChecksumParams, opts ...client.CallOption) ([][client.TZSize]byte, error) { var res [][client.TZSize]byte - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.ObjectPayloadRangeTZ(ctx, p, opts...) return }) @@ -249,7 +257,7 @@ func (x *multiClient) ObjectPayloadRangeTZ(ctx context.Context, p *client.RangeC func (x *multiClient) SearchObject(ctx context.Context, p *client.SearchObjectParams, opts ...client.CallOption) ([]*object.ID, error) { var res []*object.ID - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.SearchObject(ctx, p, opts...) return }) @@ -260,7 +268,7 @@ func (x *multiClient) SearchObject(ctx context.Context, p *client.SearchObjectPa func (x *multiClient) CreateSession(ctx context.Context, exp uint64, opts ...client.CallOption) (*session.Token, error) { var res *session.Token - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.CreateSession(ctx, exp, opts...) return }) @@ -271,7 +279,7 @@ func (x *multiClient) CreateSession(ctx context.Context, exp uint64, opts ...cli func (x *multiClient) AnnounceLocalTrust(ctx context.Context, p client.AnnounceLocalTrustPrm, opts ...client.CallOption) (*client.AnnounceLocalTrustRes, error) { var res *client.AnnounceLocalTrustRes - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.AnnounceLocalTrust(ctx, p, opts...) return }) @@ -282,7 +290,7 @@ func (x *multiClient) AnnounceLocalTrust(ctx context.Context, p client.AnnounceL func (x *multiClient) AnnounceIntermediateTrust(ctx context.Context, p client.AnnounceIntermediateTrustPrm, opts ...client.CallOption) (*client.AnnounceIntermediateTrustRes, error) { var res *client.AnnounceIntermediateTrustRes - err := x.iterateClients(func(c client.Client) (err error) { + err := x.iterateClients(ctx, func(c client.Client) (err error) { res, err = c.AnnounceIntermediateTrust(ctx, p, opts...) return })