package target import ( "errors" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" ) func New(prm objectwriter.Params) (transformer.ChunkedObjectWriter, error) { // prepare needed put parameters if err := preparePrm(&prm); err != nil { return nil, fmt.Errorf("could not prepare put parameters: %w", err) } if prm.Header.Signature() != nil { return newUntrustedTarget(&prm) } return newTrustedTarget(&prm) } func newUntrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) { maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize() if maxPayloadSz == 0 { return nil, errors.New("could not obtain max object size parameter") } if prm.SignRequestPrivateKey == nil { nodeKey, err := prm.Config.KeyStorage.GetKey(nil) if err != nil { return nil, err } prm.SignRequestPrivateKey = nodeKey } // prepare untrusted-Put object target return &validatingPreparedTarget{ nextTarget: newInMemoryObjectBuilder(objectwriter.New(prm)), fmt: prm.Config.FormatValidator, maxPayloadSz: maxPayloadSz, }, nil } func newTrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) { prm.Relay = nil // do not relay request without signature maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize() if maxPayloadSz == 0 { return nil, errors.New("could not obtain max object size parameter") } sToken := prm.Common.SessionToken() // prepare trusted-Put object target // get private token from local storage var sessionInfo *util.SessionInfo if sToken != nil { sessionInfo = &util.SessionInfo{ ID: sToken.ID(), Owner: sToken.Issuer(), } } key, err := prm.Config.KeyStorage.GetKey(sessionInfo) if err != nil { return nil, fmt.Errorf("could not receive session key: %w", err) } // In case session token is missing, the line above returns the default key. // If it isn't owner key, replication attempts will fail, thus this check. ownerObj := prm.Header.OwnerID() if ownerObj.IsEmpty() { return nil, errors.New("missing object owner") } if sToken == nil { var ownerSession user.ID user.IDFromKey(&ownerSession, key.PublicKey) if !ownerObj.Equals(ownerSession) { return nil, fmt.Errorf("session token is missing but object owner id (%s) is different from the default key (%s)", ownerObj, ownerSession) } } else { if !ownerObj.Equals(sessionInfo.Owner) { return nil, fmt.Errorf("different token issuer and object owner identifiers %s/%s", sessionInfo.Owner, ownerObj) } } if prm.SignRequestPrivateKey == nil { prm.SignRequestPrivateKey = key } return &validatingTarget{ fmt: prm.Config.FormatValidator, nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ Key: key, NextTargetInit: func() transformer.ObjectWriter { return objectwriter.New(prm) }, NetworkState: prm.Config.NetworkState, MaxSize: maxPayloadSz, WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.Container), SessionToken: sToken, }), }, nil } func preparePrm(prm *objectwriter.Params) error { var err error // get latest network map nm, err := netmap.GetLatestNetworkMap(prm.Config.NetmapSource) if err != nil { return fmt.Errorf("could not get latest network map: %w", err) } idCnr, ok := prm.Header.ContainerID() if !ok { return errors.New("missing container ID") } // get container to store the object cnrInfo, err := prm.Config.ContainerSource.Get(idCnr) if err != nil { return fmt.Errorf("could not get container by ID: %w", err) } prm.Container = cnrInfo.Value // add common options prm.TraverseOpts = append(prm.TraverseOpts, // set processing container placement.ForContainer(prm.Container), ) if ech := prm.Header.ECHeader(); ech != nil { prm.TraverseOpts = append(prm.TraverseOpts, // set identifier of the processing object placement.ForObject(ech.Parent()), ) } else if id, ok := prm.Header.ID(); ok { prm.TraverseOpts = append(prm.TraverseOpts, // set identifier of the processing object placement.ForObject(id), ) } // create placement builder from network map builder := placement.NewNetworkMapBuilder(nm) if prm.Common.LocalOnly() { // restrict success count to 1 stored copy (to local storage) prm.TraverseOpts = append(prm.TraverseOpts, placement.SuccessAfter(1)) // use local-only placement builder builder = util.NewLocalPlacement(builder, prm.Config.NetmapKeys) } // set placement builder prm.TraverseOpts = append(prm.TraverseOpts, placement.UseBuilder(builder)) return nil }