forked from TrueCloudLab/frostfs-node
Evgenii Stratonikov
6e2cc32768
Previously, the check was in place only when the session token was missing. Format validator checks are applied only to a fully prepared object, so this led to the following situation: 1. The object is put locally with a malformed token, because there are no checks. 2. The object cannot be replicated, because the token is malformed. This is now fixed, and the token check is done before any payload is received. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
286 lines
7.3 KiB
Go
286 lines
7.3 KiB
Go
package putsvc
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
)
|
|
|
|
type Streamer struct {
|
|
*cfg
|
|
|
|
sessionKey *ecdsa.PrivateKey
|
|
|
|
target transformer.ChunkedObjectWriter
|
|
|
|
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
|
|
|
maxPayloadSz uint64 // network config
|
|
}
|
|
|
|
// Sentinel errors describing misuse of the Streamer life cycle.
var (
	// errNotInit is returned when the stream is used before a target has
	// been initialized.
	errNotInit = errors.New("stream not initialized")

	// errInitRecall is returned when target initialization is attempted on
	// a stream that already has a target.
	errInitRecall = errors.New("init recall")
)
|
|
|
|
func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
|
|
// initialize destination target
|
|
if err := p.initTarget(prm); err != nil {
|
|
return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
|
|
}
|
|
|
|
if err := p.target.WriteHeader(ctx, prm.hdr); err != nil {
|
|
return fmt.Errorf("(%T) could not write header to target: %w", p, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MaxObjectSize returns maximum payload size for the streaming session.
|
|
//
|
|
// Must be called after the successful Init.
|
|
func (p *Streamer) MaxObjectSize() uint64 {
|
|
return p.maxPayloadSz
|
|
}
|
|
|
|
func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
|
// prevent re-calling
|
|
if p.target != nil {
|
|
return errInitRecall
|
|
}
|
|
|
|
// prepare needed put parameters
|
|
if err := p.preparePrm(prm); err != nil {
|
|
return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err)
|
|
}
|
|
|
|
p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize()
|
|
if p.maxPayloadSz == 0 {
|
|
return fmt.Errorf("(%T) could not obtain max object size parameter", p)
|
|
}
|
|
|
|
if prm.hdr.Signature() != nil {
|
|
return p.initUntrustedTarget(prm)
|
|
}
|
|
return p.initTrustedTarget(prm)
|
|
}
|
|
|
|
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
|
p.relay = prm.relay
|
|
|
|
// prepare untrusted-Put object target
|
|
p.target = &validatingPreparedTarget{
|
|
nextTarget: p.newCommonTarget(prm),
|
|
fmt: p.fmtValidator,
|
|
|
|
maxPayloadSz: p.maxPayloadSz,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
|
sToken := prm.common.SessionToken()
|
|
|
|
// prepare trusted-Put object target
|
|
|
|
// get private token from local storage
|
|
var sessionInfo *util.SessionInfo
|
|
|
|
if sToken != nil {
|
|
sessionInfo = &util.SessionInfo{
|
|
ID: sToken.ID(),
|
|
Owner: sToken.Issuer(),
|
|
}
|
|
}
|
|
|
|
sessionKey, err := p.keyStorage.GetKey(sessionInfo)
|
|
if err != nil {
|
|
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
|
|
}
|
|
|
|
// In case session token is missing, the line above returns the default key.
|
|
// If it isn't owner key, replication attempts will fail, thus this check.
|
|
ownerObj := prm.hdr.OwnerID()
|
|
if ownerObj.IsEmpty() {
|
|
return errors.New("missing object owner")
|
|
}
|
|
|
|
if sToken == nil {
|
|
var ownerSession user.ID
|
|
user.IDFromKey(&ownerSession, sessionKey.PublicKey)
|
|
|
|
if !ownerObj.Equals(ownerSession) {
|
|
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
|
|
}
|
|
} else {
|
|
if !ownerObj.Equals(sessionInfo.Owner) {
|
|
return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", p, sessionInfo.Owner, ownerObj)
|
|
}
|
|
}
|
|
|
|
p.sessionKey = sessionKey
|
|
p.target = &validatingTarget{
|
|
fmt: p.fmtValidator,
|
|
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
|
Key: sessionKey,
|
|
NextTargetInit: func() transformer.ObjectWriter { return p.newCommonTarget(prm) },
|
|
NetworkState: p.networkState,
|
|
MaxSize: p.maxPayloadSz,
|
|
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
|
|
SessionToken: sToken,
|
|
}),
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
|
var err error
|
|
|
|
// get latest network map
|
|
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
|
|
if err != nil {
|
|
return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
|
|
}
|
|
|
|
idCnr, ok := prm.hdr.ContainerID()
|
|
if !ok {
|
|
return errors.New("missing container ID")
|
|
}
|
|
|
|
// get container to store the object
|
|
cnrInfo, err := p.cnrSrc.Get(idCnr)
|
|
if err != nil {
|
|
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
|
|
}
|
|
|
|
prm.cnr = cnrInfo.Value
|
|
|
|
// add common options
|
|
prm.traverseOpts = append(prm.traverseOpts,
|
|
// set processing container
|
|
placement.ForContainer(prm.cnr),
|
|
)
|
|
|
|
if id, ok := prm.hdr.ID(); ok {
|
|
prm.traverseOpts = append(prm.traverseOpts,
|
|
// set identifier of the processing object
|
|
placement.ForObject(id),
|
|
)
|
|
}
|
|
|
|
// create placement builder from network map
|
|
builder := placement.NewNetworkMapBuilder(nm)
|
|
|
|
if prm.common.LocalOnly() {
|
|
// restrict success count to 1 stored copy (to local storage)
|
|
prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))
|
|
|
|
// use local-only placement builder
|
|
builder = util.NewLocalPlacement(builder, p.netmapKeys)
|
|
}
|
|
|
|
// set placement builder
|
|
prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Streamer) newCommonTarget(prm *PutInitPrm) *distributedTarget {
|
|
var relay func(context.Context, nodeDesc) error
|
|
if p.relay != nil {
|
|
relay = func(ctx context.Context, node nodeDesc) error {
|
|
var info client.NodeInfo
|
|
|
|
client.NodeInfoFromNetmapElement(&info, node.info)
|
|
|
|
c, err := p.clientConstructor.Get(info)
|
|
if err != nil {
|
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
|
}
|
|
|
|
return p.relay(ctx, info, c)
|
|
}
|
|
}
|
|
|
|
// enable additional container broadcast on non-local operation
|
|
// if object has TOMBSTONE or LOCK type.
|
|
typ := prm.hdr.Type()
|
|
withBroadcast := !prm.common.LocalOnly() && (typ == objectSDK.TypeTombstone || typ == objectSDK.TypeLock)
|
|
|
|
return &distributedTarget{
|
|
cfg: p.cfg,
|
|
placementOpts: prm.traverseOpts,
|
|
extraBroadcastEnabled: withBroadcast,
|
|
payload: getPayload(),
|
|
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
|
if node.local {
|
|
return localTarget{
|
|
storage: p.localStore,
|
|
}
|
|
}
|
|
|
|
rt := &remoteTarget{
|
|
privateKey: p.sessionKey,
|
|
commonPrm: prm.common,
|
|
clientConstructor: p.clientConstructor,
|
|
}
|
|
|
|
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info)
|
|
|
|
return rt
|
|
},
|
|
relay: relay,
|
|
}
|
|
}
|
|
|
|
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
|
|
if p.target == nil {
|
|
return errNotInit
|
|
}
|
|
|
|
if _, err := p.target.Write(ctx, prm.chunk); err != nil {
|
|
return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) {
|
|
if p.target == nil {
|
|
return nil, errNotInit
|
|
}
|
|
|
|
ids, err := p.target.Close(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("(%T) could not close object target: %w", p, err)
|
|
}
|
|
|
|
id := ids.ParentID
|
|
if id != nil {
|
|
return &PutResponse{
|
|
id: *id,
|
|
}, nil
|
|
}
|
|
|
|
return &PutResponse{
|
|
id: ids.SelfID,
|
|
}, nil
|
|
}
|
|
|
|
func (c *cfg) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
|
|
if c.netmapKeys.IsLocalKey(pub) {
|
|
return c.localPool, true
|
|
}
|
|
return c.remotePool, false
|
|
}
|