frostfs-node/pkg/services/object/put/streamer.go

173 lines
4 KiB
Go
Raw Normal View History

package putsvc
import (
"context"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/pkg/errors"
)
// Streamer drives a single object put operation: Init prepares the
// destination target and writes the header, SendChunk feeds payload chunks,
// and Close finalizes the target and reports the resulting object ID.
type Streamer struct {
// embedded service configuration; its fields (key storage, worker pool,
// network map / container sources, etc.) are accessed directly through p
*cfg
// context handed to remote targets created for non-local nodes
ctx context.Context
// destination the header and payload are written to; nil until
// initTarget has assigned it, which is how "not initialized" is detected
target transformer.ObjectTarget
}
// Sentinel errors describing misuse of the Streamer life cycle.
var (
	// errNotInit is returned when the stream is used before its target is set.
	errNotInit = errors.New("stream not initialized")

	// errInitRecall is returned when target initialization is attempted twice.
	errInitRecall = errors.New("init recall")
)
// Init sets up the destination target for the stream and writes the object
// header from prm to it.
//
// An error is returned if the target cannot be initialized (including a
// repeated Init call) or if the target rejects the header.
func (p *Streamer) Init(prm *PutInitPrm) error {
	// build the destination target first
	err := p.initTarget(prm)
	if err != nil {
		return errors.Wrapf(err, "(%T) could not initialize object target", p)
	}

	err = p.target.WriteHeader(prm.hdr)

	// Wrapf from github.com/pkg/errors returns nil for a nil error, so a
	// successful header write yields a nil result here.
	return errors.Wrapf(err, "(%T) could not write header to target", p)
}
// initTarget assigns p.target according to whether the request carries a
// session token.
//
// With a session token (trusted Put) the session key is fetched from the key
// storage and the target becomes a payload size limiter that produces format
// targets on demand. Without a token (untrusted Put) the target is a
// validating wrapper around the common target.
//
// It returns errInitRecall if a target has already been assigned.
func (p *Streamer) initTarget(prm *PutInitPrm) error {
	// a target that is already set means initialization was called before
	if p.target != nil {
		return errInitRecall
	}

	// fill in the placement traverse options of prm
	if err := p.preparePrm(prm); err != nil {
		return errors.Wrapf(err, "(%T) could not prepare put parameters", p)
	}

	token := prm.common.SessionToken()
	if token != nil {
		// trusted Put: obtain the private session key from local storage
		key, err := p.keyStorage.GetKey(token)
		if err != nil {
			return errors.Wrapf(err, "(%T) could not receive session key", p)
		}

		// the common target is created lazily, each time the limiter asks
		// for a new underlying target
		nextTarget := func() transformer.ObjectTarget {
			return transformer.NewFormatTarget(key, p.newCommonTarget(prm), token)
		}

		p.target = transformer.NewPayloadSizeLimiter(p.maxSizeSrc.MaxObjectSize(), nextTarget)

		return nil
	}

	// untrusted Put: the object passes through the format validator
	p.target = &validatingTarget{
		nextTarget: p.newCommonTarget(prm),
		fmt:        p.fmtValidator,
	}

	return nil
}
// preparePrm fills prm.traverseOpts with the placement options needed to
// distribute the object described by prm.hdr.
//
// It reads the latest network map and the object's container, then records
// the container, the object ID and the placement builder as traverse
// options. For local-only requests the success count is limited to one
// stored copy and the builder is wrapped into a local placement builder.
//
// An error is returned if the network map or the container cannot be
// obtained.
func (p *Streamer) preparePrm(prm *PutInitPrm) error {
	// get latest network map
	nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
	if err != nil {
		return errors.Wrapf(err, "(%T) could not get latest network map", p)
	}

	// get container to store the object
	cnr, err := p.cnrSrc.Get(prm.hdr.GetContainerID())
	if err != nil {
		return errors.Wrapf(err, "(%T) could not get container by ID", p)
	}

	// capacity 4 covers every option this function can add:
	// container, object, success count and builder
	prm.traverseOpts = make([]placement.Option, 0, 4)

	// options common to all requests
	prm.traverseOpts = append(prm.traverseOpts,
		// set processing container
		placement.ForContainer(cnr),
		// set identifier of the processing object
		placement.ForObject(prm.hdr.GetID()),
	)

	// create placement builder from network map
	builder := placement.NewNetworkMapBuilder(nm)

	if prm.common.LocalOnly() {
		// restrict success count to 1 stored copy (to local storage)
		prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))

		// use local-only placement builder
		builder = util.NewLocalPlacement(builder, p.localAddrSrc)
	}

	// set placement builder
	prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))

	return nil
}
// newCommonTarget builds the distributed target that writes the object to
// the nodes selected by prm.traverseOpts, using the streamer's worker pool.
//
// For each node address the initializer returns a local target backed by the
// local store when the address is local, and a remote target otherwise.
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
	return &distributedTarget{
		traverseOpts: prm.traverseOpts,
		workerPool:   p.workerPool,
		nodeTargetInitializer: func(addr *network.Address) transformer.ObjectTarget {
			if network.IsLocalAddress(p.localAddrSrc, addr) {
				return &localTarget{
					storage: p.localStore,
				}
			}

			// not a local address: the object goes to the remote node
			return &remoteTarget{
				ctx:        p.ctx,
				keyStorage: p.keyStorage,
				token:      prm.common.SessionToken(),
				addr:       addr,
			}
		},
	}
}
// SendChunk writes the payload chunk carried by prm to the stream's target.
//
// It returns errNotInit if no target has been assigned yet, or a wrapped
// error if the target fails to accept the chunk.
func (p *Streamer) SendChunk(prm *PutChunkPrm) error {
	if p.target == nil {
		return errNotInit
	}

	// the number of written bytes is not used by the stream
	if _, err := p.target.Write(prm.chunk); err != nil {
		return errors.Wrapf(err, "(%T) could not write payload chunk to target", p)
	}

	return nil
}
// Close finalizes the stream's target and returns a response carrying the
// identifier of the stored object.
//
// The parent ID reported by the target is preferred; the target's own (self)
// ID is used only when no parent ID is present. It returns errNotInit if no
// target has been assigned yet.
func (p *Streamer) Close() (*PutResponse, error) {
	if p.target == nil {
		return nil, errNotInit
	}

	res, err := p.target.Close()
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not close object target", p)
	}

	// fall back to the self ID when the target reports no parent
	objID := res.ParentID()
	if objID == nil {
		objID = res.SelfID()
	}

	return &PutResponse{id: objID}, nil
}