package putsvc import ( "context" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" "github.com/pkg/errors" ) type Streamer struct { *cfg ctx context.Context target transformer.ObjectTarget } var errNotInit = errors.New("stream not initialized") var errInitRecall = errors.New("init recall") func (p *Streamer) Init(prm *PutInitPrm) error { // initialize destination target if err := p.initTarget(prm); err != nil { return errors.Wrapf(err, "(%T) could not initialize object target", p) } return errors.Wrapf( p.target.WriteHeader(prm.hdr), "(%T) could not write header to target", p, ) } func (p *Streamer) initTarget(prm *PutInitPrm) error { // prevent re-calling if p.target != nil { return errInitRecall } // prepare needed put parameters if err := p.preparePrm(prm); err != nil { return errors.Wrapf(err, "(%T) could not prepare put parameters", p) } sToken := prm.common.SessionToken() if sToken == nil { // prepare untrusted-Put object target p.target = &validatingTarget{ nextTarget: p.newCommonTarget(prm), fmt: p.fmtValidator, } return nil } // prepare trusted-Put object target // get private token from local storage sessionKey, err := p.keyStorage.GetKey(sToken) if err != nil { return errors.Wrapf(err, "(%T) could not receive session key", p) } p.target = transformer.NewPayloadSizeLimiter( p.maxSizeSrc.MaxObjectSize(), func() transformer.ObjectTarget { return transformer.NewFormatTarget(sessionKey, p.newCommonTarget(prm), sToken) }, ) return nil } func (p *Streamer) preparePrm(prm *PutInitPrm) error { var err error // get latest network map nm, err := netmap.GetLatestNetworkMap(p.netMapSrc) if err != nil { return errors.Wrapf(err, "(%T) could not get latest network map", p) } // get container to store the object cnr, err := p.cnrSrc.Get(prm.hdr.GetContainerID()) if err != nil { return errors.Wrapf(err, "(%T) could not get container by ID", p) } // allocate placement traverser options prm.traverseOpts = make([]placement.Option, 0, 4) // add common options prm.traverseOpts = append(prm.traverseOpts, // set processing container placement.ForContainer(cnr), // set identifier of the processing object placement.ForObject(prm.hdr.GetID()), ) // create placement builder from network map builder := placement.NewNetworkMapBuilder(nm) if prm.common.LocalOnly() { // restrict success count to 1 stored copy (to local storage) prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1)) // use local-only placement builder builder = util.NewLocalPlacement(builder, p.localAddrSrc) } // set placement builder prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder)) return nil } func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { return &distributedTarget{ traverseOpts: prm.traverseOpts, workerPool: p.workerPool, nodeTargetInitializer: func(addr *network.Address) transformer.ObjectTarget { if network.IsLocalAddress(p.localAddrSrc, addr) { return &localTarget{ storage: p.localStore, } } else { return &remoteTarget{ ctx: p.ctx, keyStorage: p.keyStorage, token: prm.common.SessionToken(), addr: addr, } } }, } } func (p *Streamer) SendChunk(prm *PutChunkPrm) error { if p.target == nil { return errNotInit } _, err := p.target.Write(prm.chunk) return errors.Wrapf(err, "(%T) could not write payload chunk to target", p) } func (p *Streamer) Close() (*PutResponse, error) { if p.target == nil { return nil, errNotInit } ids, err := p.target.Close() if err != nil { return nil, errors.Wrapf(err, "(%T) could not close object target", p) } id := ids.ParentID() if id == nil { id = ids.SelfID() } return &PutResponse{ id: id, }, nil }