forked from TrueCloudLab/frostfs-node
358e3ed8c4
Some software components adjust how they work with placement arrays when the local node is present in them. In the previous implementation, the locality criterion was a match between the announced network address (group) and the address with which the node was configured. However, by design, network addresses are not unique identifiers of storage nodes in the system. Change comparisons by network addresses to comparisons by keys in all packages with the logic described above. Implement the `netmap.AnnouncedKeys` interface on the `cfg` type in the storage node application. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
222 lines
5.2 KiB
Go
222 lines
5.2 KiB
Go
package putsvc
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
|
)
|
|
|
|
type Streamer struct {
|
|
*cfg
|
|
|
|
ctx context.Context
|
|
|
|
target transformer.ObjectTarget
|
|
|
|
relay func(network.AddressGroup, client.Client) error
|
|
|
|
maxPayloadSz uint64 // network config
|
|
}
|
|
|
|
// errNotInit is returned by SendChunk and Close when the streamer's target
// has not been set yet.
var errNotInit = errors.New("stream not initialized")
|
|
|
|
// errInitRecall is returned by initTarget when the streamer already has
// a target.
var errInitRecall = errors.New("init recall")
|
|
|
|
func (p *Streamer) Init(prm *PutInitPrm) error {
|
|
// initialize destination target
|
|
if err := p.initTarget(prm); err != nil {
|
|
return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
|
|
}
|
|
|
|
if err := p.target.WriteHeader(prm.hdr); err != nil {
|
|
return fmt.Errorf("(%T) could not write header to target: %w", p, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MaxObjectSize returns maximum payload size for the streaming session.
|
|
//
|
|
// Must be called after the successful Init.
|
|
func (p *Streamer) MaxObjectSize() uint64 {
|
|
return p.maxPayloadSz
|
|
}
|
|
|
|
func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
|
// prevent re-calling
|
|
if p.target != nil {
|
|
return errInitRecall
|
|
}
|
|
|
|
// prepare needed put parameters
|
|
if err := p.preparePrm(prm); err != nil {
|
|
return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err)
|
|
}
|
|
|
|
p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize()
|
|
if p.maxPayloadSz == 0 {
|
|
return fmt.Errorf("(%T) could not obtain max object size parameter", p)
|
|
}
|
|
|
|
if prm.hdr.Signature() != nil {
|
|
p.relay = prm.relay
|
|
|
|
// prepare untrusted-Put object target
|
|
p.target = &validatingTarget{
|
|
nextTarget: p.newCommonTarget(prm),
|
|
fmt: p.fmtValidator,
|
|
|
|
maxPayloadSz: p.maxPayloadSz,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
sToken := prm.common.SessionToken()
|
|
|
|
// prepare trusted-Put object target
|
|
|
|
// get private token from local storage
|
|
sessionKey, err := p.keyStorage.GetKey(sToken)
|
|
if err != nil {
|
|
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
|
|
}
|
|
|
|
p.target = transformer.NewPayloadSizeLimiter(
|
|
p.maxPayloadSz,
|
|
func() transformer.ObjectTarget {
|
|
return transformer.NewFormatTarget(&transformer.FormatterParams{
|
|
Key: sessionKey,
|
|
NextTarget: p.newCommonTarget(prm),
|
|
SessionToken: sToken,
|
|
NetworkState: p.networkState,
|
|
})
|
|
},
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
|
var err error
|
|
|
|
// get latest network map
|
|
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
|
|
if err != nil {
|
|
return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
|
|
}
|
|
|
|
// get container to store the object
|
|
cnr, err := p.cnrSrc.Get(prm.hdr.ContainerID())
|
|
if err != nil {
|
|
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
|
|
}
|
|
|
|
// add common options
|
|
prm.traverseOpts = append(prm.traverseOpts,
|
|
// set processing container
|
|
placement.ForContainer(cnr),
|
|
|
|
// set identifier of the processing object
|
|
placement.ForObject(prm.hdr.ID()),
|
|
)
|
|
|
|
// create placement builder from network map
|
|
builder := placement.NewNetworkMapBuilder(nm)
|
|
|
|
if prm.common.LocalOnly() {
|
|
// restrict success count to 1 stored copy (to local storage)
|
|
prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))
|
|
|
|
// use local-only placement builder
|
|
builder = util.NewLocalPlacement(builder, p.netmapKeys)
|
|
}
|
|
|
|
// set placement builder
|
|
prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))
|
|
|
|
return nil
|
|
}
|
|
|
|
// errLocalAddress is returned by the relay callback built in newCommonTarget
// when the node chosen for relaying is identified as the local one by its key.
var errLocalAddress = errors.New("can't relay to local address")
|
|
|
|
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
|
var relay func(placement.Node) error
|
|
if p.relay != nil {
|
|
relay = func(node placement.Node) error {
|
|
addr := node.Addresses()
|
|
|
|
if p.netmapKeys.IsLocalKey(node.Key()) {
|
|
return errLocalAddress
|
|
}
|
|
|
|
c, err := p.clientConstructor.Get(addr)
|
|
if err != nil {
|
|
return fmt.Errorf("could not create SDK client %s: %w", addr, err)
|
|
}
|
|
|
|
return p.relay(addr, c)
|
|
}
|
|
}
|
|
|
|
return &distributedTarget{
|
|
traverseOpts: prm.traverseOpts,
|
|
workerPool: p.workerPool,
|
|
nodeTargetInitializer: func(node placement.Node) transformer.ObjectTarget {
|
|
if p.netmapKeys.IsLocalKey(node.Key()) {
|
|
return &localTarget{
|
|
storage: p.localStore,
|
|
}
|
|
}
|
|
|
|
return &remoteTarget{
|
|
ctx: p.ctx,
|
|
keyStorage: p.keyStorage,
|
|
commonPrm: prm.common,
|
|
addr: node.Addresses(),
|
|
clientConstructor: p.clientConstructor,
|
|
}
|
|
},
|
|
relay: relay,
|
|
fmt: p.fmtValidator,
|
|
log: p.log,
|
|
}
|
|
}
|
|
|
|
func (p *Streamer) SendChunk(prm *PutChunkPrm) error {
|
|
if p.target == nil {
|
|
return errNotInit
|
|
}
|
|
|
|
if _, err := p.target.Write(prm.chunk); err != nil {
|
|
return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Streamer) Close() (*PutResponse, error) {
|
|
if p.target == nil {
|
|
return nil, errNotInit
|
|
}
|
|
|
|
ids, err := p.target.Close()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("(%T) could not close object target: %w", p, err)
|
|
}
|
|
|
|
id := ids.ParentID()
|
|
if id == nil {
|
|
id = ids.SelfID()
|
|
}
|
|
|
|
return &PutResponse{
|
|
id: id,
|
|
}, nil
|
|
}
|