[#1310] object: Move target initialization to separate package
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 30s
DCO action / DCO (pull_request) Successful in 58s
Vulncheck / Vulncheck (pull_request) Successful in 1m35s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m46s
Build / Build Components (pull_request) Successful in 1m56s
Tests and linters / gopls check (pull_request) Successful in 2m19s
Tests and linters / Staticcheck (pull_request) Successful in 2m39s
Tests and linters / Tests (pull_request) Successful in 3m17s
Tests and linters / Tests with -race (pull_request) Successful in 3m23s
Tests and linters / Lint (pull_request) Successful in 3m36s

* Split the logic of write target initialization to different packages;
* Refactor patch and put services: since both service initialize the target
  themselves.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
Airat Arifullin 2024-08-30 12:09:14 +03:00
parent a685fcdc96
commit 18220985bf
22 changed files with 599 additions and 585 deletions

View file

@ -7,7 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -305,11 +305,11 @@ type ttlMaxObjectSizeCache struct {
mtx sync.RWMutex mtx sync.RWMutex
lastUpdated time.Time lastUpdated time.Time
lastSize uint64 lastSize uint64
src putsvc.MaxSizeSource src objectwriter.MaxSizeSource
metrics cacheMetrics metrics cacheMetrics
} }
func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource { func newCachedMaxObjectSizeSource(src objectwriter.MaxSizeSource) objectwriter.MaxSizeSource {
return &ttlMaxObjectSizeCache{ return &ttlMaxObjectSizeCache{
src: src, src: src,
metrics: metrics.NewCacheMetrics("max_object_size"), metrics: metrics.NewCacheMetrics("max_object_size"),

View file

@ -24,6 +24,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl"
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2" v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
objectAPE "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/ape" objectAPE "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/ape"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
deletesvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete" deletesvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete"
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2" deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
@ -188,7 +189,7 @@ func initObjectService(c *cfg) {
sDeleteV2 := createDeleteServiceV2(sDelete) sDeleteV2 := createDeleteServiceV2(sDelete)
sPatch := createPatchSvc(sGet, sPut, keyStorage) sPatch := createPatchSvc(sGet, sPut)
// build service pipeline // build service pipeline
// grpc | audit | <metrics> | signature | response | acl | ape | split // grpc | audit | <metrics> | signature | response | acl | ape | split
@ -326,7 +327,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
), ),
replicator.WithLocalStorage(ls), replicator.WithLocalStorage(ls),
replicator.WithRemoteSender( replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, cache), objectwriter.NewRemoteSender(keyStorage, cache),
), ),
replicator.WithRemoteGetter( replicator.WithRemoteGetter(
getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage), getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
@ -338,7 +339,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service { func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage ls := c.cfgObject.cfgLocalStorage.localStorage
var os putsvc.ObjectStorage = engineWithoutNotifications{ var os objectwriter.ObjectStorage = engineWithoutNotifications{
engine: ls, engine: ls,
} }
@ -352,9 +353,9 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
c, c,
c.cfgNetmap.state, c.cfgNetmap.state,
irFetcher, irFetcher,
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
putsvc.WithLogger(c.log), objectwriter.WithLogger(c.log),
putsvc.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification), objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
) )
} }
@ -362,8 +363,8 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2
return putsvcV2.NewService(sPut, keyStorage) return putsvcV2.NewService(sPut, keyStorage)
} }
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service { func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Service {
return patchsvc.NewService(keyStorage, sGet, sPut) return patchsvc.NewService(sPut.Config, sGet)
} }
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service { func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {

View file

@ -1,4 +1,4 @@
package putsvc package target
import ( import (
"context" "context"

View file

@ -1,4 +1,4 @@
package putsvc package target
import ( import (
"sync" "sync"

View file

@ -0,0 +1,170 @@
package target
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
func New(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
// prepare needed put parameters
if err := preparePrm(prm); err != nil {
return nil, fmt.Errorf("could not prepare put parameters: %w", err)
}
if prm.Header.Signature() != nil {
return newUntrustedTarget(prm)
}
return newTrustedTarget(prm)
}
func newUntrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
if maxPayloadSz == 0 {
return nil, errors.New("could not obtain max object size parameter")
}
if prm.SignRequestPrivateKey == nil {
nodeKey, err := prm.Config.KeyStorage.GetKey(nil)
if err != nil {
return nil, err
}
prm.SignRequestPrivateKey = nodeKey
}
// prepare untrusted-Put object target
return &validatingPreparedTarget{
nextTarget: newInMemoryObjectBuilder(objectwriter.New(prm)),
fmt: prm.Config.FormatValidator,
maxPayloadSz: maxPayloadSz,
}, nil
}
func newTrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
if maxPayloadSz == 0 {
return nil, errors.New("could not obtain max object size parameter")
}
sToken := prm.Common.SessionToken()
// prepare trusted-Put object target
// get private token from local storage
var sessionInfo *util.SessionInfo
if sToken != nil {
sessionInfo = &util.SessionInfo{
ID: sToken.ID(),
Owner: sToken.Issuer(),
}
}
key, err := prm.Config.KeyStorage.GetKey(sessionInfo)
if err != nil {
return nil, fmt.Errorf("could not receive session key: %w", err)
}
// In case session token is missing, the line above returns the default key.
// If it isn't owner key, replication attempts will fail, thus this check.
ownerObj := prm.Header.OwnerID()
if ownerObj.IsEmpty() {
return nil, errors.New("missing object owner")
}
if sToken == nil {
var ownerSession user.ID
user.IDFromKey(&ownerSession, key.PublicKey)
if !ownerObj.Equals(ownerSession) {
return nil, errors.New("session token is missing but object owner id is different from the default key")
}
} else {
if !ownerObj.Equals(sessionInfo.Owner) {
return nil, fmt.Errorf("different token issuer and object owner identifiers %s/%s", sessionInfo.Owner, ownerObj)
}
}
if prm.SignRequestPrivateKey == nil {
prm.SignRequestPrivateKey = key
}
return &validatingTarget{
fmt: prm.Config.FormatValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return objectwriter.New(prm) },
NetworkState: prm.Config.NetworkState,
MaxSize: maxPayloadSz,
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.Container),
SessionToken: sToken,
}),
}, nil
}
func preparePrm(prm *objectwriter.Params) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(prm.Config.NetmapSource)
if err != nil {
//return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
return fmt.Errorf("could not get latest network map: %w", err)
}
idCnr, ok := prm.Header.ContainerID()
if !ok {
return errors.New("missing container ID")
}
// get container to store the object
cnrInfo, err := prm.Config.ContainerSource.Get(idCnr)
if err != nil {
//return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
return fmt.Errorf("could not get container by ID: %w", err)
}
prm.Container = cnrInfo.Value
// add common options
prm.TraverseOpts = append(prm.TraverseOpts,
// set processing container
placement.ForContainer(prm.Container),
)
if ech := prm.Header.ECHeader(); ech != nil {
prm.TraverseOpts = append(prm.TraverseOpts,
// set identifier of the processing object
placement.ForObject(ech.Parent()),
)
} else if id, ok := prm.Header.ID(); ok {
prm.TraverseOpts = append(prm.TraverseOpts,
// set identifier of the processing object
placement.ForObject(id),
)
}
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if prm.Common.LocalOnly() {
// restrict success count to 1 stored copy (to local storage)
prm.TraverseOpts = append(prm.TraverseOpts, placement.SuccessAfter(1))
// use local-only placement builder
builder = util.NewLocalPlacement(builder, prm.Config.NetmapKeys)
}
// set placement builder
prm.TraverseOpts = append(prm.TraverseOpts, placement.UseBuilder(builder))
return nil
}

View file

@ -1,4 +1,4 @@
package putsvc package target
import ( import (
"bytes" "bytes"

View file

@ -1,4 +1,4 @@
package putsvc package writer
import ( import (
"context" "context"
@ -13,23 +13,23 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
type nodeIterator struct { type NodeIterator struct {
traversal Traversal
cfg *cfg cfg *Config
} }
func (c *cfg) newNodeIterator(opts []placement.Option) *nodeIterator { func (c *Config) NewNodeIterator(opts []placement.Option) *NodeIterator {
return &nodeIterator{ return &NodeIterator{
traversal: traversal{ Traversal: Traversal{
opts: opts, Opts: opts,
mExclude: make(map[string]*bool), Exclude: make(map[string]*bool),
}, },
cfg: c, cfg: c,
} }
} }
func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, nodeDesc) error) error { func (n *NodeIterator) ForEachNode(ctx context.Context, f func(context.Context, NodeDescriptor) error) error {
traverser, err := placement.NewTraverser(n.traversal.opts...) traverser, err := placement.NewTraverser(n.Traversal.Opts...)
if err != nil { if err != nil {
return fmt.Errorf("could not create object placement traverser: %w", err) return fmt.Errorf("could not create object placement traverser: %w", err)
} }
@ -56,10 +56,10 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context,
} }
// perform additional container broadcast if needed // perform additional container broadcast if needed
if n.traversal.submitPrimaryPlacementFinish() { if n.Traversal.submitPrimaryPlacementFinish() {
err := n.forEachNode(ctx, f) err := n.ForEachNode(ctx, f)
if err != nil { if err != nil {
n.cfg.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err)) n.cfg.Logger.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
// we don't fail primary operation because of broadcast failure // we don't fail primary operation because of broadcast failure
} }
} }
@ -67,11 +67,11 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context,
return nil return nil
} }
func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, nodeDesc) error, resErr *atomic.Value) bool { func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, NodeDescriptor) error, resErr *atomic.Value) bool {
var wg sync.WaitGroup var wg sync.WaitGroup
for _, addr := range addrs { for _, addr := range addrs {
if ok := n.mExclude[string(addr.PublicKey())]; ok != nil { if ok := n.Exclude[string(addr.PublicKey())]; ok != nil {
if *ok { if *ok {
traverser.SubmitSuccess() traverser.SubmitSuccess()
} }
@ -86,10 +86,10 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
if err := workerPool.Submit(func() { if err := workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
err := f(ctx, nodeDesc{local: isLocal, info: addr}) err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
if err != nil { if err != nil {
resErr.Store(err) resErr.Store(err)
svcutil.LogServiceError(n.cfg.log, "PUT", addr.Addresses(), err) svcutil.LogServiceError(n.cfg.Logger, "PUT", addr.Addresses(), err)
return return
} }
@ -97,7 +97,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
*item = true *item = true
}); err != nil { }); err != nil {
wg.Done() wg.Done()
svcutil.LogWorkerPoolError(n.cfg.log, "PUT", err) svcutil.LogWorkerPoolError(n.cfg.Logger, "PUT", err)
return true return true
} }
@ -105,7 +105,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
// in subsequent container broadcast. Note that we don't // in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement // process this node during broadcast if primary placement
// on it failed. // on it failed.
n.traversal.submitProcessed(addr, item) n.Traversal.submitProcessed(addr, item)
} }
wg.Wait() wg.Wait()
@ -113,6 +113,6 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
return false return false
} }
func needAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool { func NeedAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool {
return len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)) return len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock))
} }

View file

@ -1,4 +1,4 @@
package putsvc package writer
import ( import (
"context" "context"

View file

@ -1,4 +1,4 @@
package putsvc package writer
import ( import (
"context" "context"
@ -13,47 +13,47 @@ type preparedObjectTarget interface {
WriteObject(context.Context, *objectSDK.Object, object.ContentMeta) error WriteObject(context.Context, *objectSDK.Object, object.ContentMeta) error
} }
type distributedTarget struct { type distributedWriter struct {
cfg *Config
placementOpts []placement.Option placementOpts []placement.Option
obj *objectSDK.Object obj *objectSDK.Object
objMeta object.ContentMeta objMeta object.ContentMeta
*cfg nodeTargetInitializer func(NodeDescriptor) preparedObjectTarget
nodeTargetInitializer func(nodeDesc) preparedObjectTarget relay func(context.Context, NodeDescriptor) error
relay func(context.Context, nodeDesc) error
resetSuccessAfterOnBroadcast bool resetSuccessAfterOnBroadcast bool
} }
// parameters and state of container traversal. // parameters and state of container Traversal.
type traversal struct { type Traversal struct {
opts []placement.Option Opts []placement.Option
// need of additional broadcast after the object is saved // need of additional broadcast after the object is saved
extraBroadcastEnabled bool ExtraBroadcastEnabled bool
// container nodes which was processed during the primary object placement // container nodes which was processed during the primary object placement
mExclude map[string]*bool Exclude map[string]*bool
resetSuccessAfterOnBroadcast bool ResetSuccessAfterOnBroadcast bool
} }
// updates traversal parameters after the primary placement finish and // updates traversal parameters after the primary placement finish and
// returns true if additional container broadcast is needed. // returns true if additional container broadcast is needed.
func (x *traversal) submitPrimaryPlacementFinish() bool { func (x *Traversal) submitPrimaryPlacementFinish() bool {
if x.extraBroadcastEnabled { if x.ExtraBroadcastEnabled {
// do not track success during container broadcast (best-effort) // do not track success during container broadcast (best-effort)
x.opts = append(x.opts, placement.WithoutSuccessTracking()) x.Opts = append(x.Opts, placement.WithoutSuccessTracking())
if x.resetSuccessAfterOnBroadcast { if x.ResetSuccessAfterOnBroadcast {
x.opts = append(x.opts, placement.ResetSuccessAfter()) x.Opts = append(x.Opts, placement.ResetSuccessAfter())
} }
// avoid 2nd broadcast // avoid 2nd broadcast
x.extraBroadcastEnabled = false x.ExtraBroadcastEnabled = false
return true return true
} }
@ -62,22 +62,22 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
} }
// marks the container node as processed during the primary object placement. // marks the container node as processed during the primary object placement.
func (x *traversal) submitProcessed(n placement.Node, item *bool) { func (x *Traversal) submitProcessed(n placement.Node, item *bool) {
if x.extraBroadcastEnabled { if x.ExtraBroadcastEnabled {
key := string(n.PublicKey()) key := string(n.PublicKey())
if x.mExclude == nil { if x.Exclude == nil {
x.mExclude = make(map[string]*bool, 1) x.Exclude = make(map[string]*bool, 1)
} }
x.mExclude[key] = item x.Exclude[key] = item
} }
} }
type nodeDesc struct { type NodeDescriptor struct {
local bool Local bool
info placement.Node Info placement.Node
} }
// errIncompletePut is returned if processing on a container fails. // errIncompletePut is returned if processing on a container fails.
@ -96,19 +96,19 @@ func (x errIncompletePut) Error() string {
} }
// WriteObject implements the transformer.ObjectWriter interface. // WriteObject implements the transformer.ObjectWriter interface.
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error { func (t *distributedWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
t.obj = obj t.obj = obj
var err error var err error
if t.objMeta, err = t.fmtValidator.ValidateContent(t.obj); err != nil { if t.objMeta, err = t.cfg.FormatValidator.ValidateContent(t.obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", t, err) return fmt.Errorf("(%T) could not validate payload content: %w", t, err)
} }
return t.iteratePlacement(ctx) return t.iteratePlacement(ctx)
} }
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error { func (t *distributedWriter) sendObject(ctx context.Context, node NodeDescriptor) error {
if !node.local && t.relay != nil { if !node.Local && t.relay != nil {
return t.relay(ctx, node) return t.relay(ctx, node)
} }
@ -121,11 +121,11 @@ func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error
return nil return nil
} }
func (t *distributedTarget) iteratePlacement(ctx context.Context) error { func (t *distributedWriter) iteratePlacement(ctx context.Context) error {
id, _ := t.obj.ID() id, _ := t.obj.ID()
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id))) iter := t.cfg.NewNodeIterator(append(t.placementOpts, placement.ForObject(id)))
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */) iter.ExtraBroadcastEnabled = NeedAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast iter.ResetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
return iter.forEachNode(ctx, t.sendObject) return iter.ForEachNode(ctx, t.sendObject)
} }

View file

@ -1,4 +1,4 @@
package putsvc package writer
import ( import (
"context" "context"
@ -23,23 +23,23 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
var _ transformer.ObjectWriter = (*ecWriter)(nil) var _ transformer.ObjectWriter = (*ECWriter)(nil)
var errUnsupportedECObject = errors.New("object is not supported for erasure coding") var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
type ecWriter struct { type ECWriter struct {
cfg *cfg Config *Config
placementOpts []placement.Option PlacementOpts []placement.Option
container containerSDK.Container Container containerSDK.Container
key *ecdsa.PrivateKey Key *ecdsa.PrivateKey
commonPrm *svcutil.CommonPrm CommonPrm *svcutil.CommonPrm
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
objMeta object.ContentMeta ObjectMeta object.ContentMeta
objMetaValid bool ObjectMetaValid bool
} }
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error { func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx, obj) relayed, err := e.relayIfNotContainerNode(ctx, obj)
if err != nil { if err != nil {
return err return err
@ -53,11 +53,11 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
return errUnsupportedECObject return errUnsupportedECObject
} }
if !e.objMetaValid { if !e.ObjectMetaValid {
if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil { if e.ObjectMeta, err = e.Config.FormatValidator.ValidateContent(obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", e, err) return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
} }
e.objMetaValid = true e.ObjectMetaValid = true
} }
if obj.ECHeader() != nil { if obj.ECHeader() != nil {
@ -66,8 +66,8 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
return e.writeRawObject(ctx, obj) return e.writeRawObject(ctx, obj)
} }
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) { func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.relay == nil { if e.Relay == nil {
return false, nil return false, nil
} }
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode() currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
@ -90,8 +90,8 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
return true, nil return true, nil
} }
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) { func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
t, err := placement.NewTraverser(e.placementOpts...) t, err := placement.NewTraverser(e.PlacementOpts...)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -101,7 +101,7 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
break break
} }
for _, node := range nodes { for _, node := range nodes {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) {
return true, nil return true, nil
} }
} }
@ -109,8 +109,8 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
return false, nil return false, nil
} }
func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error { func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...) t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
if err != nil { if err != nil {
return err return err
} }
@ -126,18 +126,18 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
var info client.NodeInfo var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node) client.NodeInfoFromNetmapElement(&info, node)
c, err := e.cfg.clientConstructor.Get(info) c, err := e.Config.ClientConstructor.Get(info)
if err != nil { if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
} }
completed := make(chan interface{}) completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() { if poolErr := e.Config.RemotePool.Submit(func() {
defer close(completed) defer close(completed)
err = e.relay(ctx, info, c) err = e.Relay(ctx, info, c)
}); poolErr != nil { }); poolErr != nil {
close(completed) close(completed)
svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr) svcutil.LogWorkerPoolError(e.Config.Logger, "PUT", poolErr)
return poolErr return poolErr
} }
<-completed <-completed
@ -145,7 +145,7 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
if err == nil { if err == nil {
return nil return nil
} }
e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup())) e.Config.Logger.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
lastErr = err lastErr = err
} }
} }
@ -157,12 +157,12 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
} }
} }
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error { func (e *ECWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
if e.commonPrm.LocalOnly() { if e.CommonPrm.LocalOnly() {
return e.writePartLocal(ctx, obj) return e.writePartLocal(ctx, obj)
} }
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...) t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
if err != nil { if err != nil {
return err return err
} }
@ -187,18 +187,18 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error
return nil return nil
} }
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error { func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
// now only single EC policy is supported // now only single EC policy is supported
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy())) c, err := erasurecode.NewConstructor(policy.ECDataCount(e.Container.PlacementPolicy()), policy.ECParityCount(e.Container.PlacementPolicy()))
if err != nil { if err != nil {
return err return err
} }
parts, err := c.Split(obj, e.key) parts, err := c.Split(obj, e.Key)
if err != nil { if err != nil {
return err return err
} }
objID, _ := obj.ID() objID, _ := obj.ID()
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...) t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
if err != nil { if err != nil {
return err return err
} }
@ -230,7 +230,7 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
return nil return nil
} }
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error { func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
@ -243,7 +243,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
if err == nil { if err == nil {
return nil return nil
} }
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err)) zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
@ -267,7 +267,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
if err == nil { if err == nil {
return nil return nil
} }
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.String("node", hex.EncodeToString(node.PublicKey())),
zap.Error(err)) zap.Error(err))
@ -291,7 +291,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
if err == nil { if err == nil {
return nil return nil
} }
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.String("node", hex.EncodeToString(node.PublicKey())),
zap.Error(err)) zap.Error(err))
@ -300,22 +300,22 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj)) return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj))
} }
func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) {
return e.writePartLocal(ctx, obj) return e.writePartLocal(ctx, obj)
} }
return e.writePartRemote(ctx, obj, node) return e.writePartRemote(ctx, obj, node)
} }
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
var err error var err error
localTarget := localTarget{ localTarget := LocalTarget{
storage: e.cfg.localStore, Storage: e.Config.LocalStore,
} }
completed := make(chan interface{}) completed := make(chan interface{})
if poolErr := e.cfg.localPool.Submit(func() { if poolErr := e.Config.LocalPool.Submit(func() {
defer close(completed) defer close(completed)
err = localTarget.WriteObject(ctx, obj, e.objMeta) err = localTarget.WriteObject(ctx, obj, e.ObjectMeta)
}); poolErr != nil { }); poolErr != nil {
close(completed) close(completed)
return poolErr return poolErr
@ -324,22 +324,22 @@ func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) er
return err return err
} }
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
var clientNodeInfo client.NodeInfo var clientNodeInfo client.NodeInfo
client.NodeInfoFromNetmapElement(&clientNodeInfo, node) client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
remoteTaget := remoteTarget{ remoteTaget := remoteWriter{
privateKey: e.key, privateKey: e.Key,
clientConstructor: e.cfg.clientConstructor, clientConstructor: e.Config.ClientConstructor,
commonPrm: e.commonPrm, commonPrm: e.CommonPrm,
nodeInfo: clientNodeInfo, nodeInfo: clientNodeInfo,
} }
var err error var err error
completed := make(chan interface{}) completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() { if poolErr := e.Config.RemotePool.Submit(func() {
defer close(completed) defer close(completed)
err = remoteTaget.WriteObject(ctx, obj, e.objMeta) err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
}); poolErr != nil { }); poolErr != nil {
close(completed) close(completed)
return poolErr return poolErr

View file

@ -1,4 +1,4 @@
package putsvc package writer
import ( import (
"context" "context"
@ -24,19 +24,19 @@ type ObjectStorage interface {
IsLocked(context.Context, oid.Address) (bool, error) IsLocked(context.Context, oid.Address) (bool, error)
} }
type localTarget struct { type LocalTarget struct {
storage ObjectStorage Storage ObjectStorage
} }
func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error { func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
switch meta.Type() { switch meta.Type() {
case objectSDK.TypeTombstone: case objectSDK.TypeTombstone:
err := t.storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects()) err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
if err != nil { if err != nil {
return fmt.Errorf("could not delete objects from tombstone locally: %w", err) return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
} }
case objectSDK.TypeLock: case objectSDK.TypeLock:
err := t.storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects()) err := t.Storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects())
if err != nil { if err != nil {
return fmt.Errorf("could not lock object from lock objects locally: %w", err) return fmt.Errorf("could not lock object from lock objects locally: %w", err)
} }
@ -44,7 +44,7 @@ func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
// objects that do not change meta storage // objects that do not change meta storage
} }
if err := t.storage.Put(ctx, obj); err != nil { if err := t.Storage.Put(ctx, obj); err != nil {
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err) return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
} }
return nil return nil

View file

@ -1,4 +1,4 @@
package putsvc package writer
import ( import (
"context" "context"
@ -16,7 +16,7 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
type remoteTarget struct { type remoteWriter struct {
privateKey *ecdsa.PrivateKey privateKey *ecdsa.PrivateKey
commonPrm *util.CommonPrm commonPrm *util.CommonPrm
@ -41,7 +41,7 @@ type RemotePutPrm struct {
obj *objectSDK.Object obj *objectSDK.Object
} }
func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error { func (t *remoteWriter) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error {
c, err := t.clientConstructor.Get(t.nodeInfo) c, err := t.clientConstructor.Get(t.nodeInfo)
if err != nil { if err != nil {
return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err) return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
@ -64,7 +64,7 @@ func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _
return t.putStream(ctx, prm) return t.putStream(ctx, prm)
} }
func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error { func (t *remoteWriter) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error {
_, err := internalclient.PutObject(ctx, prm) _, err := internalclient.PutObject(ctx, prm)
if err != nil { if err != nil {
return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err) return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
@ -72,7 +72,7 @@ func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObje
return nil return nil
} }
func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error { func (t *remoteWriter) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error {
_, err := internalclient.PutObjectSingle(ctx, prm) _, err := internalclient.PutObjectSingle(ctx, prm)
if err != nil { if err != nil {
return fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err) return fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
@ -113,7 +113,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
return err return err
} }
t := &remoteTarget{ t := &remoteWriter{
privateKey: key, privateKey: key,
clientConstructor: s.clientConstructor, clientConstructor: s.clientConstructor,
} }

View file

@ -0,0 +1,183 @@
package writer
import (
"context"
"crypto/ecdsa"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
)
type MaxSizeSource interface {
// MaxObjectSize returns maximum payload size
// of physically stored object in system.
//
// Must return 0 if value can not be obtained.
MaxObjectSize() uint64
}
type ClientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
type InnerRing interface {
InnerRingKeys() ([][]byte, error)
}
type FormatValidatorConfig interface {
VerifySessionTokenIssuer() bool
}
// Config represents a set of static parameters that are established during
// the initialization phase of all services.
type Config struct {
KeyStorage *objutil.KeyStorage
MaxSizeSrc MaxSizeSource
LocalStore ObjectStorage
ContainerSource container.Source
NetmapSource netmap.Source
RemotePool, LocalPool util.WorkerPool
NetmapKeys netmap.AnnouncedKeys
FormatValidator *object.FormatValidator
NetworkState netmap.State
ClientConstructor ClientConstructor
Logger *logger.Logger
VerifySessionTokenIssuer bool
}
type Option func(*Config)
func WithWorkerPools(remote, local util.WorkerPool) Option {
return func(c *Config) {
c.RemotePool, c.LocalPool = remote, local
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *Config) {
c.Logger = l
}
}
func WithVerifySessionTokenIssuer(v bool) Option {
return func(c *Config) {
c.VerifySessionTokenIssuer = v
}
}
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
if c.NetmapKeys.IsLocalKey(pub) {
return c.LocalPool, true
}
return c.RemotePool, false
}
type Params struct {
Config *Config
Common *objutil.CommonPrm
Header *objectSDK.Object
Container containerSDK.Container
TraverseOpts []placement.Option
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
SignRequestPrivateKey *ecdsa.PrivateKey
}
func New(prm *Params) transformer.ObjectWriter {
if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) {
return newECWriter(prm)
}
return newDefaultObjectWriter(prm, false)
}
func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.ObjectWriter {
var relay func(context.Context, NodeDescriptor) error
if prm.Relay != nil {
relay = func(ctx context.Context, node NodeDescriptor) error {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node.Info)
c, err := prm.Config.ClientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
return prm.Relay(ctx, info, c)
}
}
var resetSuccessAfterOnBroadcast bool
traverseOpts := prm.TraverseOpts
if forECPlacement && !prm.Common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.Container.PlacementPolicy())+1)))
resetSuccessAfterOnBroadcast = true
}
return &distributedWriter{
cfg: prm.Config,
placementOpts: traverseOpts,
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
if node.Local {
return LocalTarget{
Storage: prm.Config.LocalStore,
}
}
rt := &remoteWriter{
privateKey: prm.SignRequestPrivateKey,
commonPrm: prm.Common,
clientConstructor: prm.Config.ClientConstructor,
}
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.Info)
return rt
},
relay: relay,
}
}
func newECWriter(prm *Params) transformer.ObjectWriter {
return &objectWriterDispatcher{
ecWriter: &ECWriter{
Config: prm.Config,
PlacementOpts: append(prm.TraverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
Container: prm.Container,
Key: prm.SignRequestPrivateKey,
CommonPrm: prm.Common,
Relay: prm.Relay,
},
repWriter: newDefaultObjectWriter(prm, true),
}
}

View file

@ -2,43 +2,40 @@ package patchsvc
import ( import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
) )
// Service implements Put operation of Object service v2. // Service implements Put operation of Object service v2.
type Service struct { type Service struct {
keyStorage *util.KeyStorage *objectwriter.Config
getSvc *getsvc.Service getSvc *getsvc.Service
putSvc *putsvc.Service
} }
// NewService constructs Service instance from provided options. // NewService constructs Service instance from provided options.
func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service { //
// Patch service can use the same objectwriter.Config initializied by Put service.
func NewService(cfg *objectwriter.Config,
getSvc *getsvc.Service,
) *Service {
return &Service{ return &Service{
keyStorage: ks, Config: cfg,
getSvc: getSvc, getSvc: getSvc,
putSvc: putSvc,
} }
} }
// Put calls internal service and returns v2 object streamer. // Put calls internal service and returns v2 object streamer.
func (s *Service) Patch() (object.PatchObjectStream, error) { func (s *Service) Patch() (object.PatchObjectStream, error) {
nodeKey, err := s.keyStorage.GetKey(nil) nodeKey, err := s.Config.KeyStorage.GetKey(nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Streamer{ return &Streamer{
Config: s.Config,
getSvc: s.getSvc, getSvc: s.getSvc,
putSvc: s.putSvc,
localNodeKey: nodeKey, localNodeKey: nodeKey,
}, nil }, nil
} }

View file

@ -9,8 +9,9 @@ import (
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -21,6 +22,8 @@ import (
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches // Streamer for the patch handler is a pipeline that merges two incoming streams of patches
// and original object payload chunks. The merged result is fed to Put stream target. // and original object payload chunks. The merged result is fed to Put stream target.
type Streamer struct { type Streamer struct {
*objectwriter.Config
// Patcher must be initialized at first Streamer.Send call. // Patcher must be initialized at first Streamer.Send call.
patcher patcher.PatchApplier patcher patcher.PatchApplier
@ -28,8 +31,6 @@ type Streamer struct {
getSvc *getsvc.Service getSvc *getsvc.Service
putSvc *putsvc.Service
localNodeKey *ecdsa.PrivateKey localNodeKey *ecdsa.PrivateKey
} }
@ -78,11 +79,6 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
localNodeKey: s.localNodeKey, localNodeKey: s.localNodeKey,
} }
putstm, err := s.putSvc.Put()
if err != nil {
return err
}
hdr := hdrWithSig.GetHeader() hdr := hdrWithSig.GetHeader()
oV2 := new(objectV2.Object) oV2 := new(objectV2.Object)
hV2 := new(objectV2.Header) hV2 := new(objectV2.Header)
@ -97,14 +93,14 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
} }
oV2.GetHeader().SetOwnerID(ownerID) oV2.GetHeader().SetOwnerID(ownerID)
prm, err := s.putInitPrm(req, oV2) target, err := target.New(&objectwriter.Params{
Config: s.Config,
Common: commonPrm,
Header: objectSDK.NewFromV2(oV2),
SignRequestPrivateKey: s.localNodeKey,
})
if err != nil { if err != nil {
return err return fmt.Errorf("target creation: %w", err)
}
err = putstm.Init(ctx, prm)
if err != nil {
return err
} }
patcherPrm := patcher.Params{ patcherPrm := patcher.Params{
@ -112,7 +108,7 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
RangeProvider: rangeProvider, RangeProvider: rangeProvider,
ObjectWriter: putstm.Target(), ObjectWriter: target,
} }
s.patcher = patcher.New(patcherPrm) s.patcher = patcher.New(patcherPrm)

View file

@ -6,31 +6,12 @@ import (
"errors" "errors"
"fmt" "fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
) )
// putInitPrm initializes put paramerer for Put stream.
func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) {
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
prm := new(putsvc.PutInitPrm)
prm.WithObject(objectSDK.NewFromV2(obj)).
WithCommonPrm(commonPrm).
WithPrivateKey(s.localNodeKey)
return prm, nil
}
func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) { func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) {
for vh.GetOrigin() != nil { for vh.GetOrigin() != nil {
vh = vh.GetOrigin() vh = vh.GetOrigin()

View file

@ -1,132 +1,66 @@
package putsvc package putsvc
import ( import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap" "go.uber.org/zap"
) )
type MaxSizeSource interface {
// MaxObjectSize returns maximum payload size
// of physically stored object in system.
//
// Must return 0 if value can not be obtained.
MaxObjectSize() uint64
}
type Service struct { type Service struct {
*cfg *objectwriter.Config
}
type Option func(*cfg)
type ClientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
type InnerRing interface {
InnerRingKeys() ([][]byte, error)
}
type FormatValidatorConfig interface {
VerifySessionTokenIssuer() bool
}
type cfg struct {
keyStorage *objutil.KeyStorage
maxSizeSrc MaxSizeSource
localStore ObjectStorage
cnrSrc container.Source
netMapSrc netmap.Source
remotePool, localPool util.WorkerPool
netmapKeys netmap.AnnouncedKeys
fmtValidator *object.FormatValidator
networkState netmap.State
clientConstructor ClientConstructor
log *logger.Logger
verifySessionTokenIssuer bool
} }
func NewService(ks *objutil.KeyStorage, func NewService(ks *objutil.KeyStorage,
cc ClientConstructor, cc objectwriter.ClientConstructor,
ms MaxSizeSource, ms objectwriter.MaxSizeSource,
os ObjectStorage, os objectwriter.ObjectStorage,
cs container.Source, cs container.Source,
ns netmap.Source, ns netmap.Source,
nk netmap.AnnouncedKeys, nk netmap.AnnouncedKeys,
nst netmap.State, nst netmap.State,
ir InnerRing, ir objectwriter.InnerRing,
opts ...Option, opts ...objectwriter.Option,
) *Service { ) *Service {
c := &cfg{ c := &objectwriter.Config{
remotePool: util.NewPseudoWorkerPool(), RemotePool: util.NewPseudoWorkerPool(),
localPool: util.NewPseudoWorkerPool(), LocalPool: util.NewPseudoWorkerPool(),
log: &logger.Logger{Logger: zap.L()}, Logger: &logger.Logger{Logger: zap.L()},
keyStorage: ks, KeyStorage: ks,
clientConstructor: cc, ClientConstructor: cc,
maxSizeSrc: ms, MaxSizeSrc: ms,
localStore: os, LocalStore: os,
cnrSrc: cs, ContainerSource: cs,
netMapSrc: ns, NetmapSource: ns,
netmapKeys: nk, NetmapKeys: nk,
networkState: nst, NetworkState: nst,
} }
for i := range opts { for i := range opts {
opts[i](c) opts[i](c)
} }
c.fmtValidator = object.NewFormatValidator( c.FormatValidator = object.NewFormatValidator(
object.WithLockSource(os), object.WithLockSource(os),
object.WithNetState(nst), object.WithNetState(nst),
object.WithInnerRing(ir), object.WithInnerRing(ir),
object.WithNetmapSource(ns), object.WithNetmapSource(ns),
object.WithContainersSource(cs), object.WithContainersSource(cs),
object.WithVerifySessionTokenIssuer(c.verifySessionTokenIssuer), object.WithVerifySessionTokenIssuer(c.VerifySessionTokenIssuer),
object.WithLogger(c.log), object.WithLogger(c.Logger),
) )
return &Service{ return &Service{
cfg: c, Config: c,
} }
} }
func (p *Service) Put() (*Streamer, error) { func (p *Service) Put() (*Streamer, error) {
return &Streamer{ return &Streamer{
cfg: p.cfg, Config: p.Config,
}, nil }, nil
} }
func WithWorkerPools(remote, local util.WorkerPool) Option {
return func(c *cfg) {
c.remotePool, c.localPool = remote, local
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
func WithVerifySessionTokenIssuer(v bool) Option {
return func(c *cfg) {
c.verifySessionTokenIssuer = v
}
}

View file

@ -21,6 +21,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
@ -97,12 +99,12 @@ func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object)
func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error { func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error {
if uint64(len(obj.Payload())) != obj.PayloadSize() { if uint64(len(obj.Payload())) != obj.PayloadSize() {
return ErrWrongPayloadSize return target.ErrWrongPayloadSize
} }
maxAllowedSize := s.maxSizeSrc.MaxObjectSize() maxAllowedSize := s.Config.MaxSizeSrc.MaxObjectSize()
if obj.PayloadSize() > maxAllowedSize { if obj.PayloadSize() > maxAllowedSize {
return ErrExceedingMaxSize return target.ErrExceedingMaxSize
} }
return nil return nil
@ -137,11 +139,11 @@ func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
} }
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) { func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
if err := s.fmtValidator.Validate(ctx, obj, false); err != nil { if err := s.FormatValidator.Validate(ctx, obj, false); err != nil {
return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err) return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err)
} }
meta, err := s.fmtValidator.ValidateContent(obj) meta, err := s.FormatValidator.ValidateContent(obj)
if err != nil { if err != nil {
return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err) return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err)
} }
@ -164,17 +166,17 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
} }
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
iter := s.cfg.newNodeIterator(placement.placementOptions) iter := s.Config.NewNodeIterator(placement.placementOptions)
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly) iter.ExtraBroadcastEnabled = objectwriter.NeedAdditionalBroadcast(obj, localOnly)
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast iter.ResetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
signer := &putSingleRequestSigner{ signer := &putSingleRequestSigner{
req: req, req: req,
keyStorage: s.keyStorage, keyStorage: s.Config.KeyStorage,
signer: &sync.Once{}, signer: &sync.Once{},
} }
return iter.forEachNode(ctx, func(ctx context.Context, nd nodeDesc) error { return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error {
return s.saveToPlacementNode(ctx, &nd, obj, signer, meta) return s.saveToPlacementNode(ctx, &nd, obj, signer, meta)
}) })
} }
@ -184,25 +186,25 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace
if err != nil { if err != nil {
return err return err
} }
key, err := s.cfg.keyStorage.GetKey(nil) key, err := s.Config.KeyStorage.GetKey(nil)
if err != nil { if err != nil {
return err return err
} }
signer := &putSingleRequestSigner{ signer := &putSingleRequestSigner{
req: req, req: req,
keyStorage: s.keyStorage, keyStorage: s.Config.KeyStorage,
signer: &sync.Once{}, signer: &sync.Once{},
} }
w := ecWriter{ w := objectwriter.ECWriter{
cfg: s.cfg, Config: s.Config,
placementOpts: placement.placementOptions, PlacementOpts: placement.placementOptions,
objMeta: meta, ObjectMeta: meta,
objMetaValid: true, ObjectMetaValid: true,
commonPrm: commonPrm, CommonPrm: commonPrm,
container: placement.container, Container: placement.container,
key: key, Key: key,
relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error { Relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac) return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac)
}, },
} }
@ -223,7 +225,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
if !ok { if !ok {
return result, errors.New("missing container ID") return result, errors.New("missing container ID")
} }
cnrInfo, err := s.cnrSrc.Get(cnrID) cnrInfo, err := s.Config.ContainerSource.Get(cnrID)
if err != nil { if err != nil {
return result, fmt.Errorf("could not get container by ID: %w", err) return result, fmt.Errorf("could not get container by ID: %w", err)
} }
@ -247,31 +249,31 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
} }
result.placementOptions = append(result.placementOptions, placement.ForObject(objID)) result.placementOptions = append(result.placementOptions, placement.ForObject(objID))
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc) latestNetmap, err := netmap.GetLatestNetworkMap(s.Config.NetmapSource)
if err != nil { if err != nil {
return result, fmt.Errorf("could not get latest network map: %w", err) return result, fmt.Errorf("could not get latest network map: %w", err)
} }
builder := placement.NewNetworkMapBuilder(latestNetmap) builder := placement.NewNetworkMapBuilder(latestNetmap)
if localOnly { if localOnly {
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1)) result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1))
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys) builder = svcutil.NewLocalPlacement(builder, s.Config.NetmapKeys)
} }
result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder)) result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder))
return result, nil return result, nil
} }
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object, func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object,
signer *putSingleRequestSigner, meta object.ContentMeta, signer *putSingleRequestSigner, meta object.ContentMeta,
) error { ) error {
if nodeDesc.local { if nodeDesc.Local {
return s.saveLocal(ctx, obj, meta) return s.saveLocal(ctx, obj, meta)
} }
var info client.NodeInfo var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, nodeDesc.info) client.NodeInfoFromNetmapElement(&info, nodeDesc.Info)
c, err := s.clientConstructor.Get(info) c, err := s.Config.ClientConstructor.Get(info)
if err != nil { if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
} }
@ -280,8 +282,8 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, o
} }
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error { func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
localTarget := &localTarget{ localTarget := &objectwriter.LocalTarget{
storage: s.localStore, Storage: s.Config.LocalStore,
} }
return localTarget.WriteObject(ctx, obj, meta) return localTarget.WriteObject(ctx, obj, meta)
} }
@ -314,7 +316,7 @@ func (s *Service) redirectPutSingleRequest(ctx context.Context,
if err != nil { if err != nil {
objID, _ := obj.ID() objID, _ := obj.ID()
cnrID, _ := obj.ContainerID() cnrID, _ := obj.ContainerID()
s.log.Warn(logs.PutSingleRedirectFailure, s.Config.Logger.Warn(logs.PutSingleRedirectFailure,
zap.Error(err), zap.Error(err),
zap.Stringer("address", addr), zap.Stringer("address", addr),
zap.Stringer("object_id", objID), zap.Stringer("object_id", objID),

View file

@ -2,33 +2,21 @@ package putsvc
import ( import (
"context" "context"
"crypto/ecdsa"
"errors" "errors"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
) )
type Streamer struct { type Streamer struct {
*cfg *objectwriter.Config
privateKey *ecdsa.PrivateKey
target transformer.ChunkedObjectWriter target transformer.ChunkedObjectWriter
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
maxPayloadSz uint64 // network config
} }
var errNotInit = errors.New("stream not initialized") var errNotInit = errors.New("stream not initialized")
@ -36,8 +24,23 @@ var errNotInit = errors.New("stream not initialized")
var errInitRecall = errors.New("init recall") var errInitRecall = errors.New("init recall")
func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error { func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
if p.target != nil {
return errInitRecall
}
// initialize destination target // initialize destination target
if err := p.initTarget(prm); err != nil { prmTarget := &objectwriter.Params{
Config: p.Config,
Common: prm.common,
Header: prm.hdr,
Container: prm.cnr,
TraverseOpts: prm.traverseOpts,
Relay: p.relay,
}
var err error
p.target, err = target.New(prmTarget)
if err != nil {
return fmt.Errorf("(%T) could not initialize object target: %w", p, err) return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
} }
@ -47,253 +50,6 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
return nil return nil
} }
// Target accesses underlying target chunked object writer.
func (p *Streamer) Target() transformer.ChunkedObjectWriter {
return p.target
}
// MaxObjectSize returns maximum payload size for the streaming session.
//
// Must be called after the successful Init.
func (p *Streamer) MaxObjectSize() uint64 {
return p.maxPayloadSz
}
func (p *Streamer) initTarget(prm *PutInitPrm) error {
// prevent re-calling
if p.target != nil {
return errInitRecall
}
// prepare needed put parameters
if err := p.preparePrm(prm); err != nil {
return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err)
}
p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize()
if p.maxPayloadSz == 0 {
return fmt.Errorf("(%T) could not obtain max object size parameter", p)
}
if prm.hdr.Signature() != nil {
return p.initUntrustedTarget(prm)
}
return p.initTrustedTarget(prm)
}
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
p.relay = prm.relay
if prm.privateKey != nil {
p.privateKey = prm.privateKey
} else {
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
if err != nil {
return err
}
p.privateKey = nodeKey
}
// prepare untrusted-Put object target
p.target = &validatingPreparedTarget{
nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
fmt: p.fmtValidator,
maxPayloadSz: p.maxPayloadSz,
}
return nil
}
func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
sToken := prm.common.SessionToken()
// prepare trusted-Put object target
// get private token from local storage
var sessionInfo *util.SessionInfo
if sToken != nil {
sessionInfo = &util.SessionInfo{
ID: sToken.ID(),
Owner: sToken.Issuer(),
}
}
key, err := p.keyStorage.GetKey(sessionInfo)
if err != nil {
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
}
// In case session token is missing, the line above returns the default key.
// If it isn't owner key, replication attempts will fail, thus this check.
ownerObj := prm.hdr.OwnerID()
if ownerObj.IsEmpty() {
return errors.New("missing object owner")
}
if sToken == nil {
var ownerSession user.ID
user.IDFromKey(&ownerSession, key.PublicKey)
if !ownerObj.Equals(ownerSession) {
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
}
} else {
if !ownerObj.Equals(sessionInfo.Owner) {
return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", p, sessionInfo.Owner, ownerObj)
}
}
if prm.privateKey != nil {
p.privateKey = prm.privateKey
} else {
p.privateKey = key
}
p.target = &validatingTarget{
fmt: p.fmtValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
NetworkState: p.networkState,
MaxSize: p.maxPayloadSz,
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
SessionToken: sToken,
}),
}
return nil
}
func (p *Streamer) preparePrm(prm *PutInitPrm) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
if err != nil {
return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
}
idCnr, ok := prm.hdr.ContainerID()
if !ok {
return errors.New("missing container ID")
}
// get container to store the object
cnrInfo, err := p.cnrSrc.Get(idCnr)
if err != nil {
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
}
prm.cnr = cnrInfo.Value
// add common options
prm.traverseOpts = append(prm.traverseOpts,
// set processing container
placement.ForContainer(prm.cnr),
)
if ech := prm.hdr.ECHeader(); ech != nil {
prm.traverseOpts = append(prm.traverseOpts,
// set identifier of the processing object
placement.ForObject(ech.Parent()),
)
} else if id, ok := prm.hdr.ID(); ok {
prm.traverseOpts = append(prm.traverseOpts,
// set identifier of the processing object
placement.ForObject(id),
)
}
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if prm.common.LocalOnly() {
// restrict success count to 1 stored copy (to local storage)
prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))
// use local-only placement builder
builder = util.NewLocalPlacement(builder, p.netmapKeys)
}
// set placement builder
prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))
return nil
}
func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
return p.newECWriter(prm)
}
return p.newDefaultObjectWriter(prm, false)
}
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter {
var relay func(context.Context, nodeDesc) error
if p.relay != nil {
relay = func(ctx context.Context, node nodeDesc) error {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node.info)
c, err := p.clientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
return p.relay(ctx, info, c)
}
}
var resetSuccessAfterOnBroadcast bool
traverseOpts := prm.traverseOpts
if forECPlacement && !prm.common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
resetSuccessAfterOnBroadcast = true
}
return &distributedTarget{
cfg: p.cfg,
placementOpts: traverseOpts,
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local {
return localTarget{
storage: p.localStore,
}
}
rt := &remoteTarget{
privateKey: p.privateKey,
commonPrm: prm.common,
clientConstructor: p.clientConstructor,
}
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info)
return rt
},
relay: relay,
}
}
func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
return &objectWriterDispatcher{
ecWriter: &ecWriter{
cfg: p.cfg,
placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
container: prm.cnr,
key: p.privateKey,
commonPrm: prm.common,
relay: p.relay,
},
repWriter: p.newDefaultObjectWriter(prm, true),
}
}
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error { func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
if p.target == nil { if p.target == nil {
return errNotInit return errNotInit
@ -327,10 +83,3 @@ func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) {
id: ids.SelfID, id: ids.SelfID,
}, nil }, nil
} }
func (c *cfg) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
if c.netmapKeys.IsLocalKey(pub) {
return c.localPool, true
}
return c.remotePool, false
}

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
@ -55,7 +56,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
s.saveChunks = v.GetSignature() != nil s.saveChunks = v.GetSignature() != nil
if s.saveChunks { if s.saveChunks {
maxSz := s.stream.MaxObjectSize() maxSz := s.stream.MaxSizeSrc.MaxObjectSize()
s.sizes = &sizes{ s.sizes = &sizes{
payloadSz: uint64(v.GetHeader().GetPayloadLength()), payloadSz: uint64(v.GetHeader().GetPayloadLength()),
@ -63,7 +64,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
// check payload size limit overflow // check payload size limit overflow
if s.payloadSz > maxSz { if s.payloadSz > maxSz {
return putsvc.ErrExceedingMaxSize return target.ErrExceedingMaxSize
} }
s.init = req s.init = req
@ -74,7 +75,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
// check payload size overflow // check payload size overflow
if s.writtenPayload > s.payloadSz { if s.writtenPayload > s.payloadSz {
return putsvc.ErrWrongPayloadSize return target.ErrWrongPayloadSize
} }
} }
@ -117,7 +118,7 @@ func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error
if s.saveChunks { if s.saveChunks {
// check payload size correctness // check payload size correctness
if s.writtenPayload != s.payloadSz { if s.writtenPayload != s.payloadSz {
return nil, putsvc.ErrWrongPayloadSize return nil, target.ErrWrongPayloadSize
} }
} }

View file

@ -5,7 +5,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -52,7 +52,7 @@ func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res T
} }
} }
prm := new(putsvc.RemotePutPrm). prm := new(objectwriter.RemotePutPrm).
WithObject(task.Obj) WithObject(task.Obj)
for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ { for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ {

View file

@ -4,8 +4,8 @@ import (
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -24,7 +24,7 @@ type cfg struct {
log *logger.Logger log *logger.Logger
remoteSender *putsvc.RemoteSender remoteSender *objectwriter.RemoteSender
remoteGetter *getsvc.RemoteGetter remoteGetter *getsvc.RemoteGetter
@ -67,7 +67,7 @@ func WithLogger(v *logger.Logger) Option {
} }
// WithRemoteSender returns option to set remote object sender of Replicator. // WithRemoteSender returns option to set remote object sender of Replicator.
func WithRemoteSender(v *putsvc.RemoteSender) Option { func WithRemoteSender(v *objectwriter.RemoteSender) Option {
return func(c *cfg) { return func(c *cfg) {
c.remoteSender = v c.remoteSender = v
} }