package putsvc import ( "context" "crypto/ecdsa" "errors" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" ) type Streamer struct { *cfg privateKey *ecdsa.PrivateKey target transformer.ChunkedObjectWriter relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error maxPayloadSz uint64 // network config } var errNotInit = errors.New("stream not initialized") var errInitRecall = errors.New("init recall") func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error { // initialize destination target if err := p.initTarget(prm); err != nil { return fmt.Errorf("(%T) could not initialize object target: %w", p, err) } if err := p.target.WriteHeader(ctx, prm.hdr); err != nil { return fmt.Errorf("(%T) could not write header to target: %w", p, err) } return nil } // MaxObjectSize returns maximum payload size for the streaming session. // // Must be called after the successful Init. func (p *Streamer) MaxObjectSize() uint64 { return p.maxPayloadSz } func (p *Streamer) initTarget(prm *PutInitPrm) error { // prevent re-calling if p.target != nil { return errInitRecall } // prepare needed put parameters if err := p.preparePrm(prm); err != nil { return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err) } p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize() if p.maxPayloadSz == 0 { return fmt.Errorf("(%T) could not obtain max object size parameter", p) } if prm.hdr.Signature() != nil { return p.initUntrustedTarget(prm) } return p.initTrustedTarget(prm) } func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error { p.relay = prm.relay nodeKey, err := p.cfg.keyStorage.GetKey(nil) if err != nil { return err } p.privateKey = nodeKey // prepare untrusted-Put object target p.target = &validatingPreparedTarget{ nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)), fmt: p.fmtValidator, maxPayloadSz: p.maxPayloadSz, } return nil } func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { sToken := prm.common.SessionToken() // prepare trusted-Put object target // get private token from local storage var sessionInfo *util.SessionInfo if sToken != nil { sessionInfo = &util.SessionInfo{ ID: sToken.ID(), Owner: sToken.Issuer(), } } key, err := p.keyStorage.GetKey(sessionInfo) if err != nil { return fmt.Errorf("(%T) could not receive session key: %w", p, err) } // In case session token is missing, the line above returns the default key. // If it isn't owner key, replication attempts will fail, thus this check. ownerObj := prm.hdr.OwnerID() if ownerObj.IsEmpty() { return errors.New("missing object owner") } if sToken == nil { var ownerSession user.ID user.IDFromKey(&ownerSession, key.PublicKey) if !ownerObj.Equals(ownerSession) { return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p) } } else { if !ownerObj.Equals(sessionInfo.Owner) { return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", p, sessionInfo.Owner, ownerObj) } } p.privateKey = key p.target = &validatingTarget{ fmt: p.fmtValidator, nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ Key: key, NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) }, NetworkState: p.networkState, MaxSize: p.maxPayloadSz, WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr), SessionToken: sToken, }), } return nil } func (p *Streamer) preparePrm(prm *PutInitPrm) error { var err error // get latest network map nm, err := netmap.GetLatestNetworkMap(p.netMapSrc) if err != nil { return fmt.Errorf("(%T) could not get latest network map: %w", p, err) } idCnr, ok := prm.hdr.ContainerID() if !ok { return errors.New("missing container ID") } // get container to store the object cnrInfo, err := p.cnrSrc.Get(idCnr) if err != nil { return fmt.Errorf("(%T) could not get container by ID: %w", p, err) } prm.cnr = cnrInfo.Value // add common options prm.traverseOpts = append(prm.traverseOpts, // set processing container placement.ForContainer(prm.cnr), ) if ech := prm.hdr.ECHeader(); ech != nil { prm.traverseOpts = append(prm.traverseOpts, // set identifier of the processing object placement.ForObject(ech.Parent()), ) } else if id, ok := prm.hdr.ID(); ok { prm.traverseOpts = append(prm.traverseOpts, // set identifier of the processing object placement.ForObject(id), ) } // create placement builder from network map builder := placement.NewNetworkMapBuilder(nm) if prm.common.LocalOnly() { // restrict success count to 1 stored copy (to local storage) prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1)) // use local-only placement builder builder = util.NewLocalPlacement(builder, p.netmapKeys) } // set placement builder prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder)) return nil } func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter { if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) { return p.newECWriter(prm) } return p.newDefaultObjectWriter(prm, false) } func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter { var relay func(context.Context, nodeDesc) error if p.relay != nil { relay = func(ctx context.Context, node nodeDesc) error { var info client.NodeInfo client.NodeInfoFromNetmapElement(&info, node.info) c, err := p.clientConstructor.Get(info) if err != nil { return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) } return p.relay(ctx, info, c) } } var resetSuccessAfterOnBroadcast bool traverseOpts := prm.traverseOpts if forECPlacement && !prm.common.LocalOnly() { // save non-regular and linking object to EC container. // EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc. traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1))) resetSuccessAfterOnBroadcast = true } return &distributedTarget{ cfg: p.cfg, placementOpts: traverseOpts, resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast, nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { if node.local { return localTarget{ storage: p.localStore, } } rt := &remoteTarget{ privateKey: p.privateKey, commonPrm: prm.common, clientConstructor: p.clientConstructor, } client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info) return rt }, relay: relay, } } func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter { return &objectWriterDispatcher{ ecWriter: &ecWriter{ cfg: p.cfg, placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC container: prm.cnr, key: p.privateKey, commonPrm: prm.common, relay: p.relay, }, repWriter: p.newDefaultObjectWriter(prm, true), } } func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error { if p.target == nil { return errNotInit } if _, err := p.target.Write(ctx, prm.chunk); err != nil { return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err) } return nil } func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) { if p.target == nil { return nil, errNotInit } ids, err := p.target.Close(ctx) if err != nil { return nil, fmt.Errorf("(%T) could not close object target: %w", p, err) } id := ids.ParentID if id != nil { return &PutResponse{ id: *id, }, nil } return &PutResponse{ id: ids.SelfID, }, nil } func (c *cfg) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) { if c.netmapKeys.IsLocalKey(pub) { return c.localPool, true } return c.remotePool, false }