frostfs-node/pkg/services/object/put/streamer.go

283 lines
7.2 KiB
Go
Raw Permalink Normal View History

package putsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
// Streamer holds the state of a single object Put streaming session.
//
// The methods in this file expect the call order Init, then SendChunk any
// number of times, then Close: SendChunk and Close fail with errNotInit while
// no target is set, and target initialization fails with errInitRecall once a
// target already exists.
type Streamer struct {
// Embedded service configuration; the methods in this file read their
// dependencies (key storage, network map source, client constructor, ...)
// through it.
*cfg
// sessionKey is stored by initTrustedTarget and handed to remote targets as
// their private key. initUntrustedTarget does not set it.
sessionKey *ecdsa.PrivateKey
// target receives the object header and payload chunks. It is nil until
// target initialization succeeds.
target transformer.ChunkedObjectWriter
// relay is copied from the init parameters by initUntrustedTarget. When it is
// non-nil, newCommonTarget wraps it into the per-node relay callback.
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
maxPayloadSz uint64 // max object size read from cfg.maxSizeSrc during target initialization
}
// Sentinel errors reported when Streamer methods are called out of order.
var (
	// errNotInit is returned by SendChunk and Close while no target is set.
	errNotInit = errors.New("stream not initialized")

	// errInitRecall is returned when target initialization is attempted on a
	// streamer that already has a target.
	errInitRecall = errors.New("init recall")
)
// Init prepares the streamer for a new object: it sets up the destination
// target from prm and then writes the object header to that target.
//
// An error is returned when the target cannot be initialized (which includes
// a repeated call on a streamer that already has a target) or when the target
// rejects the header.
func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
	err := p.initTarget(prm)
	if err != nil {
		return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
	}

	// the target is set at this point, so the header can be forwarded
	err = p.target.WriteHeader(ctx, prm.hdr)
	if err != nil {
		return fmt.Errorf("(%T) could not write header to target: %w", p, err)
	}

	return nil
}
// MaxObjectSize reports the payload size limit stored for this streaming
// session.
//
// The value is only meaningful after a successful Init; before that the
// field still holds its zero value.
func (p *Streamer) MaxObjectSize() uint64 {
	return p.maxPayloadSz
}
// initTarget builds the destination target for the session.
//
// It refuses to run twice on the same streamer, prepares the placement
// parameters in prm, reads the maximum object size from the configured source
// and then picks the target flavour: a header that already carries a
// signature goes down the untrusted path, an unsigned one down the trusted
// path.
func (p *Streamer) initTarget(prm *PutInitPrm) error {
	if p.target != nil {
		// a target already exists, so this is a repeated initialization
		return errInitRecall
	}

	err := p.preparePrm(prm)
	if err != nil {
		return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err)
	}

	// zero is treated as "limit unavailable" and aborts the initialization
	p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize()
	if p.maxPayloadSz == 0 {
		return fmt.Errorf("(%T) could not obtain max object size parameter", p)
	}

	if prm.hdr.Signature() == nil {
		return p.initTrustedTarget(prm)
	}

	return p.initUntrustedTarget(prm)
}
// initUntrustedTarget installs the target used when the incoming header is
// already signed: a validating wrapper around the common distributed target.
// It always returns nil.
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
	// the relay has to be stored first because newCommonTarget reads p.relay
	p.relay = prm.relay

	next := p.newCommonTarget(prm)

	p.target = &validatingPreparedTarget{
		nextTarget:   next,
		fmt:          p.fmtValidator,
		maxPayloadSz: p.maxPayloadSz,
	}

	return nil
}
// initTrustedTarget installs the target used when the incoming header carries
// no signature: a validating wrapper around a payload size limiter that is
// configured with the session key and creates common targets on demand.
//
// The private key is looked up in the key storage by the session token's ID
// and issuer. When the request has no session token the lookup is done with
// nil session info, and the object's owner must then match the owner ID
// derived from the returned key.
func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
	tok := prm.common.SessionToken()

	// info stays nil when the request carries no session token
	var info *util.SessionInfo
	if tok != nil {
		info = &util.SessionInfo{
			ID:    tok.ID(),
			Owner: tok.Issuer(),
		}
	}

	key, err := p.keyStorage.GetKey(info)
	if err != nil {
		return fmt.Errorf("(%T) could not receive session key: %w", p, err)
	}

	if tok == nil {
		// Without a session token the key storage hands back the default key.
		// If that key does not belong to the object owner, replication
		// attempts would fail later, so the mismatch is rejected right here.
		objOwner := prm.hdr.OwnerID()
		if objOwner.IsEmpty() {
			return errors.New("missing object owner")
		}

		var keyOwner user.ID
		user.IDFromKey(&keyOwner, key.PublicKey)

		if !objOwner.Equals(keyOwner) {
			return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
		}
	}

	p.sessionKey = key

	limiter := transformer.NewPayloadSizeLimiter(transformer.Params{
		Key:                    key,
		NextTargetInit:         func() transformer.ObjectWriter { return p.newCommonTarget(prm) },
		NetworkState:           p.networkState,
		MaxSize:                p.maxPayloadSz,
		WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
		SessionToken:           tok,
	})

	p.target = &validatingTarget{
		fmt:        p.fmtValidator,
		nextTarget: limiter,
	}

	return nil
}
// preparePrm fills in the parts of prm that depend on network state: it
// resolves the container named in the header, stores it in prm.cnr and
// appends the placement traversal options (container, optional object ID,
// local-only restrictions and the placement builder) to prm.traverseOpts.
//
// It fails when the latest network map cannot be obtained, when the header
// has no container ID, or when the container cannot be fetched.
func (p *Streamer) preparePrm(prm *PutInitPrm) error {
	netMap, err := netmap.GetLatestNetworkMap(p.netMapSrc)
	if err != nil {
		return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
	}

	cnrID, hasCnr := prm.hdr.ContainerID()
	if !hasCnr {
		return errors.New("missing container ID")
	}

	// the container the object is going to be stored in
	cnrInfo, err := p.cnrSrc.Get(cnrID)
	if err != nil {
		return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
	}

	prm.cnr = cnrInfo.Value

	// options are appended in a fixed order: container, object, local-only
	// success count, builder
	prm.traverseOpts = append(prm.traverseOpts, placement.ForContainer(prm.cnr))

	if objID, hasID := prm.hdr.ID(); hasID {
		// the header already names the object being processed
		prm.traverseOpts = append(prm.traverseOpts, placement.ForObject(objID))
	}

	// start from a builder backed by the network map
	builder := placement.NewNetworkMapBuilder(netMap)

	if prm.common.LocalOnly() {
		// one stored copy (in the local storage) is enough for success
		prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))

		// swap in the local-only placement builder
		builder = util.NewLocalPlacement(builder, p.netmapKeys)
	}

	prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))

	return nil
}
// newCommonTarget assembles the distributed target shared by the trusted and
// untrusted paths.
//
// The result carries the placement options from prm, a per-node target
// initializer (local storage for local nodes, a remote target otherwise) and,
// when the streamer has a relay callback, a relay wrapper that creates a
// client for the node before delegating to that callback.
func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
	// relayFn stays nil unless a relay callback is set on the streamer
	var relayFn func(context.Context, nodeDesc) error
	if p.relay != nil {
		relayFn = func(ctx context.Context, nd nodeDesc) error {
			var ni client.NodeInfo
			client.NodeInfoFromNetmapElement(&ni, nd.info)

			cli, err := p.clientConstructor.Get(ni)
			if err != nil {
				return fmt.Errorf("could not create SDK client %s: %w", ni.AddressGroup(), err)
			}

			return p.relay(ctx, ni, cli)
		}
	}

	// An additional container broadcast is enabled for non-local operations
	// on objects of TOMBSTONE or LOCK type.
	objType := prm.hdr.Type()
	broadcast := !prm.common.LocalOnly() &&
		(objType == objectSDK.TypeTombstone || objType == objectSDK.TypeLock)

	// newNodeTarget selects the per-node target implementation
	newNodeTarget := func(nd nodeDesc) preparedObjectTarget {
		if !nd.local {
			remote := &remoteTarget{
				privateKey:        p.sessionKey,
				commonPrm:         prm.common,
				clientConstructor: p.clientConstructor,
			}
			client.NodeInfoFromNetmapElement(&remote.nodeInfo, nd.info)

			return remote
		}

		return localTarget{
			storage: p.localStore,
		}
	}

	return &distributedTarget{
		cfg:                   p.cfg,
		placementOpts:         prm.traverseOpts,
		extraBroadcastEnabled: broadcast,
		payload:               getPayload(),
		nodeTargetInitializer: newNodeTarget,
		relay:                 relayFn,
	}
}
// SendChunk forwards one payload chunk from prm to the session target.
//
// It returns errNotInit when no target has been set up, and a wrapped error
// when the target fails to accept the chunk.
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
	if p.target == nil {
		return errNotInit
	}

	// the written byte count is not needed here, only the outcome
	_, err := p.target.Write(ctx, prm.chunk)
	if err != nil {
		return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err)
	}

	return nil
}
// Close finalizes the session target and builds the Put response.
//
// The response carries the parent object ID when the target reports one and
// the object's own ID otherwise. It returns errNotInit when no target has
// been set up, and a wrapped error when closing the target fails.
func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) {
	if p.target == nil {
		return nil, errNotInit
	}

	ids, err := p.target.Close(ctx)
	if err != nil {
		return nil, fmt.Errorf("(%T) could not close object target: %w", p, err)
	}

	// default to the object's own ID; a reported parent ID takes precedence
	resp := &PutResponse{
		id: ids.SelfID,
	}
	if parent := ids.ParentID; parent != nil {
		resp.id = *parent
	}

	return resp, nil
}
// getWorkerPool selects the worker pool for the node identified by pub
// (presumably the node's binary public key; confirm against callers).
//
// The boolean result is true when the key is recognized as local, in which
// case the local pool is returned; otherwise the remote pool is returned
// together with false.
func (c *cfg) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
	if !c.netmapKeys.IsLocalKey(pub) {
		return c.remotePool, false
	}

	return c.localPool, true
}