forked from TrueCloudLab/frostfs-node
[#242] put: Pass context to relay function
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
995db117d0
commit
200fc8b882
4 changed files with 12 additions and 10 deletions
|
@ -36,7 +36,7 @@ type distributedTarget struct {
|
||||||
|
|
||||||
isLocalKey func([]byte) bool
|
isLocalKey func([]byte) bool
|
||||||
|
|
||||||
relay func(nodeDesc) error
|
relay func(context.Context, nodeDesc) error
|
||||||
|
|
||||||
fmt *object.FormatValidator
|
fmt *object.FormatValidator
|
||||||
|
|
||||||
|
@ -153,7 +153,7 @@ func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdent
|
||||||
|
|
||||||
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
|
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
|
||||||
if !node.local && t.relay != nil {
|
if !node.local && t.relay != nil {
|
||||||
return t.relay(node)
|
return t.relay(ctx, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
target := t.nodeTargetInitializer(node)
|
target := t.nodeTargetInitializer(node)
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package putsvc
|
package putsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
@ -17,7 +19,7 @@ type PutInitPrm struct {
|
||||||
|
|
||||||
traverseOpts []placement.Option
|
traverseOpts []placement.Option
|
||||||
|
|
||||||
relay func(client.NodeInfo, client.MultiAddressClient) error
|
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type PutChunkPrm struct {
|
type PutChunkPrm struct {
|
||||||
|
@ -40,7 +42,7 @@ func (p *PutInitPrm) WithObject(v *object.Object) *PutInitPrm {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PutInitPrm) WithRelay(f func(client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
|
func (p *PutInitPrm) WithRelay(f func(context.Context, client.NodeInfo, client.MultiAddressClient) error) *PutInitPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.relay = f
|
p.relay = f
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ type Streamer struct {
|
||||||
|
|
||||||
target transformer.ObjectTarget
|
target transformer.ObjectTarget
|
||||||
|
|
||||||
relay func(client.NodeInfo, client.MultiAddressClient) error
|
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||||
|
|
||||||
maxPayloadSz uint64 // network config
|
maxPayloadSz uint64 // network config
|
||||||
}
|
}
|
||||||
|
@ -197,9 +197,9 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
var relay func(nodeDesc) error
|
var relay func(context.Context, nodeDesc) error
|
||||||
if p.relay != nil {
|
if p.relay != nil {
|
||||||
relay = func(node nodeDesc) error {
|
relay = func(ctx context.Context, node nodeDesc) error {
|
||||||
var info client.NodeInfo
|
var info client.NodeInfo
|
||||||
|
|
||||||
client.NodeInfoFromNetmapElement(&info, node.info)
|
client.NodeInfoFromNetmapElement(&info, node.info)
|
||||||
|
@ -209,7 +209,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.relay(info, c)
|
return p.relay(ctx, info, c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -120,7 +120,7 @@ func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error
|
||||||
return fromPutResponse(resp), nil
|
return fromPutResponse(resp), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClient) error {
|
func (s *streamer) relayRequest(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) error {
|
||||||
// open stream
|
// open stream
|
||||||
resp := new(object.PutResponse)
|
resp := new(object.PutResponse)
|
||||||
|
|
||||||
|
@ -144,7 +144,7 @@ func (s *streamer) relayRequest(info client.NodeInfo, c client.MultiAddressClien
|
||||||
var stream *rpc.PutRequestWriter
|
var stream *rpc.PutRequestWriter
|
||||||
|
|
||||||
err = c.RawForAddress(addr, func(cli *rawclient.Client) error {
|
err = c.RawForAddress(addr, func(cli *rawclient.Client) error {
|
||||||
stream, err = rpc.PutObject(cli, resp)
|
stream, err = rpc.PutObject(cli, resp, rawclient.WithContext(ctx))
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue