diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 86dc3c2ca..15296f83f 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -36,7 +36,7 @@ type distributedTarget struct { isLocalKey func([]byte) bool - relay func(nodeDesc) error + relay func(context.Context, nodeDesc) error fmt *object.FormatValidator @@ -153,7 +153,7 @@ func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdent func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error { if !node.local && t.relay != nil { - return t.relay(node) + return t.relay(ctx, node) } target := t.nodeTargetInitializer(node) diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index aea5926f4..27d9c9c7a 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -1,6 +1,8 @@ package putsvc import ( + "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" @@ -17,7 +19,7 @@ type PutInitPrm struct { traverseOpts []placement.Option - relay func(client.NodeInfo, client.MultiAddressClient) error + relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error } type PutChunkPrm struct { @@ -40,7 +42,7 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm { return p } -func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm { +func (p *PutInitPrm) WithRelay(f func(context.Context, client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm { if p != nil { p.relay = f } diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index fed161e03..e355990a3 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -23,7 +23,7 @@ type Streamer struct { target transformer.ObjectTarget - relay func(client.NodeInfo, client.MultiAddressClient) error + relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error maxPayloadSz uint64 // network config } @@ -197,9 +197,9 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { } func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { - var relay func(nodeDesc) error + var relay func(context.Context, nodeDesc) error if p.relay != nil { - relay = func(node nodeDesc) error { + relay = func(ctx context.Context, node nodeDesc) error { var info client.NodeInfo client.NodeInfoFromNetmapElement(&info, node.info) @@ -209,7 +209,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) } - return p.relay(info, c) + return p.relay(ctx, info, c) } } diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index 65846ea9f..3b8d7b88c 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -120,7 +120,7 @@ func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error return fromPutResponse(resp), nil } -func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClient) error { +func (s *streamer) relayRequest(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) error { // open stream resp := new(object.PutResponse) @@ -144,7 +144,7 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien var stream *rpc.PutRequestWriter err = c.RawForAddress(addr, func(cli *rawclient.Client) error { - stream, err = rpc.PutObject(cli, resp) + stream, err = rpc.PutObject(cli, resp, rawclient.WithContext(ctx)) return err }) if err != nil {