frostfs-node/pkg/services/object/common/target/target.go
Dmitrii Stepanov a5e1aa22c9 [#1394] putSvc: Fix relay
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-09-25 17:28:03 +03:00

169 lines
4.9 KiB
Go

package target
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
// New prepares the put parameters held in prm and returns the chunked object
// writer that matches the state of prm.Header.
//
// A header that already carries a signature is handed to newUntrustedTarget;
// a header without one is handed to newTrustedTarget. A failure of parameter
// preparation is wrapped and returned with a nil writer.
func New(prm objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
	// resolve container and placement options before choosing a target
	err := preparePrm(&prm)
	if err != nil {
		return nil, fmt.Errorf("could not prepare put parameters: %w", err)
	}

	// no signature on the header: take the trusted path
	if prm.Header.Signature() == nil {
		return newTrustedTarget(&prm)
	}

	return newUntrustedTarget(&prm)
}
// newUntrustedTarget builds the writer used when the object header already
// carries a signature (the branch New selects for signed headers).
//
// If prm.SignRequestPrivateKey is nil it is filled in with the key returned by
// prm.Config.KeyStorage.GetKey(nil). The result is a validatingPreparedTarget
// configured with the format validator and the maximum payload size, whose
// next target is an in-memory object builder over objectwriter.New(prm).
//
// An error is returned when the reported maximum object size is zero, or when
// the key cannot be obtained; in the latter case the storage error is wrapped
// so it remains reachable through errors.Is / errors.As.
func newUntrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
	maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
	if maxPayloadSz == 0 {
		return nil, errors.New("could not obtain max object size parameter")
	}

	if prm.SignRequestPrivateKey == nil {
		// nil session info: ask the key storage for its default key
		nodeKey, err := prm.Config.KeyStorage.GetKey(nil)
		if err != nil {
			// add context the same way the trusted path does for its key lookup
			return nil, fmt.Errorf("could not receive node key: %w", err)
		}

		prm.SignRequestPrivateKey = nodeKey
	}

	// prepare untrusted-Put object target
	return &validatingPreparedTarget{
		nextTarget:   newInMemoryObjectBuilder(objectwriter.New(prm)),
		fmt:          prm.Config.FormatValidator,
		maxPayloadSz: maxPayloadSz,
	}, nil
}
// newTrustedTarget builds the writer used when the object header has no
// signature (the branch New selects for unsigned headers).
//
// It clears prm.Relay, fetches a private key from prm.Config.KeyStorage (keyed
// by the session token's ID and issuer when a token is present, by nil
// otherwise) and verifies that the object owner matches either the token
// issuer or the ID derived from the fetched key. prm.SignRequestPrivateKey is
// set to the fetched key if it was nil. The result is a validatingTarget whose
// next target is a payload size limiter feeding objectwriter.New(prm).
//
// Errors: zero maximum object size, key lookup failure (wrapped), empty object
// owner, or an owner mismatch.
func newTrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
	// do not relay request without signature
	prm.Relay = nil

	sizeLimit := prm.Config.MaxSizeSrc.MaxObjectSize()
	if sizeLimit == 0 {
		return nil, errors.New("could not obtain max object size parameter")
	}

	token := prm.Common.SessionToken()
	hasToken := token != nil

	// session stays nil without a token; the key lookup below then yields
	// the default key
	var session *util.SessionInfo
	if hasToken {
		session = new(util.SessionInfo)
		session.ID = token.ID()
		session.Owner = token.Issuer()
	}

	// get private key from local storage
	signKey, err := prm.Config.KeyStorage.GetKey(session)
	if err != nil {
		return nil, fmt.Errorf("could not receive session key: %w", err)
	}

	owner := prm.Header.OwnerID()
	if owner.IsEmpty() {
		return nil, errors.New("missing object owner")
	}

	// The key must belong to the object owner, otherwise replication
	// attempts will fail: compare against the token issuer when a token was
	// supplied, and against the ID derived from the default key when not.
	if hasToken {
		if !owner.Equals(session.Owner) {
			return nil, fmt.Errorf("different token issuer and object owner identifiers %s/%s", session.Owner, owner)
		}
	} else {
		var keyOwner user.ID
		user.IDFromKey(&keyOwner, signKey.PublicKey)

		if !owner.Equals(keyOwner) {
			return nil, errors.New("session token is missing but object owner id is different from the default key")
		}
	}

	if prm.SignRequestPrivateKey == nil {
		prm.SignRequestPrivateKey = signKey
	}

	// prepare trusted-Put object target
	limiterPrm := transformer.Params{
		Key:                    signKey,
		NextTargetInit:         func() transformer.ObjectWriter { return objectwriter.New(prm) },
		NetworkState:           prm.Config.NetworkState,
		MaxSize:                sizeLimit,
		WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.Container),
		SessionToken:           token,
	}

	target := &validatingTarget{
		fmt:        prm.Config.FormatValidator,
		nextTarget: transformer.NewPayloadSizeLimiter(limiterPrm),
	}

	return target, nil
}
// preparePrm fills in the container and placement-related fields of prm.
//
// It loads the latest network map, resolves the container named in
// prm.Header and stores it in prm.Container, then appends traverse options to
// prm.TraverseOpts in this order: the container, the object ID (the EC
// header's parent when an EC header is present, otherwise the header's own ID
// if set), a success-after-one restriction for local-only requests, and the
// placement builder (wrapped in a local placement for local-only requests).
//
// Errors: network map lookup failure (wrapped), missing container ID in the
// header, or container lookup failure (wrapped).
func preparePrm(prm *objectwriter.Params) error {
	// get latest network map
	netMap, err := netmap.GetLatestNetworkMap(prm.Config.NetmapSource)
	if err != nil {
		return fmt.Errorf("could not get latest network map: %w", err)
	}

	cnrID, found := prm.Header.ContainerID()
	if !found {
		return errors.New("missing container ID")
	}

	// get container to store the object
	cnr, err := prm.Config.ContainerSource.Get(cnrID)
	if err != nil {
		return fmt.Errorf("could not get container by ID: %w", err)
	}
	prm.Container = cnr.Value

	// common option: set processing container
	prm.TraverseOpts = append(prm.TraverseOpts, placement.ForContainer(prm.Container))

	// set identifier of the processing object, if one can be determined
	if ecHeader := prm.Header.ECHeader(); ecHeader != nil {
		prm.TraverseOpts = append(prm.TraverseOpts, placement.ForObject(ecHeader.Parent()))
	} else if objID, hasID := prm.Header.ID(); hasID {
		prm.TraverseOpts = append(prm.TraverseOpts, placement.ForObject(objID))
	}

	// create placement builder from network map
	placementBuilder := placement.NewNetworkMapBuilder(netMap)
	if prm.Common.LocalOnly() {
		// use local-only placement builder
		placementBuilder = util.NewLocalPlacement(placementBuilder, prm.Config.NetmapKeys)
		// restrict success count to 1 stored copy (to local storage)
		prm.TraverseOpts = append(prm.TraverseOpts, placement.SuccessAfter(1))
	}

	// set placement builder
	prm.TraverseOpts = append(prm.TraverseOpts, placement.UseBuilder(placementBuilder))

	return nil
}