From b3deb893ba26aa3ec9ce93213cef16243cc0f58d Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Fri, 30 Aug 2024 12:09:14 +0300 Subject: [PATCH] [#1310] object: Move target initialization to separate package * Split the logic of write target initialization to different packages; * Refactor patch and put services: since both service initialize the target themselves. Signed-off-by: Airat Arifullin --- cmd/frostfs-node/cache.go | 6 +- cmd/frostfs-node/object.go | 17 +- .../object/{put => common/target}/builder.go | 2 +- .../object/{put => common/target}/pool.go | 2 +- pkg/services/object/common/target/target.go | 170 +++++++++++ .../{put => common/target}/validation.go | 2 +- .../object/{put => common/writer}/common.go | 42 +-- .../writer.go => common/writer/dispatcher.go} | 2 +- .../{put => common/writer}/distributed.go | 70 ++--- .../object/{put => common/writer}/ec.go | 104 +++---- .../object/{put => common/writer}/local.go | 14 +- .../object/{put => common/writer}/remote.go | 12 +- pkg/services/object/common/writer/writer.go | 183 +++++++++++ pkg/services/object/patch/service.go | 25 +- pkg/services/object/patch/streamer.go | 28 +- pkg/services/object/patch/util.go | 19 -- pkg/services/object/put/service.go | 114 ++----- pkg/services/object/put/single.go | 64 ++-- pkg/services/object/put/streamer.go | 289 ++---------------- pkg/services/object/put/v2/streamer.go | 9 +- pkg/services/replicator/process.go | 4 +- pkg/services/replicator/replicator.go | 6 +- 22 files changed, 599 insertions(+), 585 deletions(-) rename pkg/services/object/{put => common/target}/builder.go (98%) rename pkg/services/object/{put => common/target}/pool.go (96%) create mode 100644 pkg/services/object/common/target/target.go rename pkg/services/object/{put => common/target}/validation.go (99%) rename pkg/services/object/{put => common/writer}/common.go (65%) rename pkg/services/object/{put/writer.go => common/writer/dispatcher.go} (97%) rename pkg/services/object/{put => common/writer}/distributed.go (57%) rename pkg/services/object/{put => common/writer}/ec.go (69%) rename pkg/services/object/{put => common/writer}/local.go (81%) rename pkg/services/object/{put => common/writer}/remote.go (92%) create mode 100644 pkg/services/object/common/writer/writer.go diff --git a/cmd/frostfs-node/cache.go b/cmd/frostfs-node/cache.go index 81d55272..57f65d87 100644 --- a/cmd/frostfs-node/cache.go +++ b/cmd/frostfs-node/cache.go @@ -7,7 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" + objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" @@ -305,11 +305,11 @@ type ttlMaxObjectSizeCache struct { mtx sync.RWMutex lastUpdated time.Time lastSize uint64 - src putsvc.MaxSizeSource + src objectwriter.MaxSizeSource metrics cacheMetrics } -func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource { +func newCachedMaxObjectSizeSource(src objectwriter.MaxSizeSource) objectwriter.MaxSizeSource { return &ttlMaxObjectSizeCache{ src: src, metrics: metrics.NewCacheMetrics("max_object_size"), diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 467c5901..610e2c36 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -24,6 +24,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl" v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2" objectAPE "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/ape" + objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" deletesvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete" deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" @@ -188,7 +189,7 @@ func initObjectService(c *cfg) { sDeleteV2 := createDeleteServiceV2(sDelete) - sPatch := createPatchSvc(sGet, sPut, keyStorage) + sPatch := createPatchSvc(sGet, sPut) // build service pipeline // grpc | audit | | signature | response | acl | ape | split @@ -326,7 +327,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa ), replicator.WithLocalStorage(ls), replicator.WithRemoteSender( - putsvc.NewRemoteSender(keyStorage, cache), + objectwriter.NewRemoteSender(keyStorage, cache), ), replicator.WithRemoteGetter( getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage), @@ -338,7 +339,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service { ls := c.cfgObject.cfgLocalStorage.localStorage - var os putsvc.ObjectStorage = engineWithoutNotifications{ + var os objectwriter.ObjectStorage = engineWithoutNotifications{ engine: ls, } @@ -352,9 +353,9 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche c, c.cfgNetmap.state, irFetcher, - putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), - putsvc.WithLogger(c.log), - putsvc.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification), + objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), + objectwriter.WithLogger(c.log), + objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification), ) } @@ -362,8 +363,8 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2 return putsvcV2.NewService(sPut, keyStorage) } -func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service { - return patchsvc.NewService(keyStorage, sGet, sPut) +func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Service { + return patchsvc.NewService(sPut.Config, sGet) } func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service { diff --git a/pkg/services/object/put/builder.go b/pkg/services/object/common/target/builder.go similarity index 98% rename from pkg/services/object/put/builder.go rename to pkg/services/object/common/target/builder.go index 64baf4e0..ea68365a 100644 --- a/pkg/services/object/put/builder.go +++ b/pkg/services/object/common/target/builder.go @@ -1,4 +1,4 @@ -package putsvc +package target import ( "context" diff --git a/pkg/services/object/put/pool.go b/pkg/services/object/common/target/pool.go similarity index 96% rename from pkg/services/object/put/pool.go rename to pkg/services/object/common/target/pool.go index ebe214ca..71da305a 100644 --- a/pkg/services/object/put/pool.go +++ b/pkg/services/object/common/target/pool.go @@ -1,4 +1,4 @@ -package putsvc +package target import ( "sync" diff --git a/pkg/services/object/common/target/target.go b/pkg/services/object/common/target/target.go new file mode 100644 index 00000000..00080ace --- /dev/null +++ b/pkg/services/object/common/target/target.go @@ -0,0 +1,170 @@ +package target + +import ( + "errors" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" + objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" +) + +func New(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) { + // prepare needed put parameters + if err := preparePrm(prm); err != nil { + return nil, fmt.Errorf("could not prepare put parameters: %w", err) + } + + if prm.Header.Signature() != nil { + return newUntrustedTarget(prm) + } + return newTrustedTarget(prm) +} + +func newUntrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) { + maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize() + if maxPayloadSz == 0 { + return nil, errors.New("could not obtain max object size parameter") + } + + if prm.SignRequestPrivateKey == nil { + nodeKey, err := prm.Config.KeyStorage.GetKey(nil) + if err != nil { + return nil, err + } + prm.SignRequestPrivateKey = nodeKey + } + + // prepare untrusted-Put object target + return &validatingPreparedTarget{ + nextTarget: newInMemoryObjectBuilder(objectwriter.New(prm)), + fmt: prm.Config.FormatValidator, + + maxPayloadSz: maxPayloadSz, + }, nil +} + +func newTrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) { + maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize() + if maxPayloadSz == 0 { + return nil, errors.New("could not obtain max object size parameter") + } + + sToken := prm.Common.SessionToken() + + // prepare trusted-Put object target + + // get private token from local storage + var sessionInfo *util.SessionInfo + + if sToken != nil { + sessionInfo = &util.SessionInfo{ + ID: sToken.ID(), + Owner: sToken.Issuer(), + } + } + + key, err := prm.Config.KeyStorage.GetKey(sessionInfo) + if err != nil { + return nil, fmt.Errorf("could not receive session key: %w", err) + } + + // In case session token is missing, the line above returns the default key. + // If it isn't owner key, replication attempts will fail, thus this check. + ownerObj := prm.Header.OwnerID() + if ownerObj.IsEmpty() { + return nil, errors.New("missing object owner") + } + + if sToken == nil { + var ownerSession user.ID + user.IDFromKey(&ownerSession, key.PublicKey) + + if !ownerObj.Equals(ownerSession) { + return nil, errors.New("session token is missing but object owner id is different from the default key") + } + } else { + if !ownerObj.Equals(sessionInfo.Owner) { + return nil, fmt.Errorf("different token issuer and object owner identifiers %s/%s", sessionInfo.Owner, ownerObj) + } + } + + if prm.SignRequestPrivateKey == nil { + prm.SignRequestPrivateKey = key + } + + return &validatingTarget{ + fmt: prm.Config.FormatValidator, + nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ + Key: key, + NextTargetInit: func() transformer.ObjectWriter { return objectwriter.New(prm) }, + NetworkState: prm.Config.NetworkState, + MaxSize: maxPayloadSz, + WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.Container), + SessionToken: sToken, + }), + }, nil +} + +func preparePrm(prm *objectwriter.Params) error { + var err error + + // get latest network map + nm, err := netmap.GetLatestNetworkMap(prm.Config.NetmapSource) + if err != nil { + //return fmt.Errorf("(%T) could not get latest network map: %w", p, err) + return fmt.Errorf("could not get latest network map: %w", err) + } + + idCnr, ok := prm.Header.ContainerID() + if !ok { + return errors.New("missing container ID") + } + + // get container to store the object + cnrInfo, err := prm.Config.ContainerSource.Get(idCnr) + if err != nil { + //return fmt.Errorf("(%T) could not get container by ID: %w", p, err) + return fmt.Errorf("could not get container by ID: %w", err) + } + + prm.Container = cnrInfo.Value + + // add common options + prm.TraverseOpts = append(prm.TraverseOpts, + // set processing container + placement.ForContainer(prm.Container), + ) + + if ech := prm.Header.ECHeader(); ech != nil { + prm.TraverseOpts = append(prm.TraverseOpts, + // set identifier of the processing object + placement.ForObject(ech.Parent()), + ) + } else if id, ok := prm.Header.ID(); ok { + prm.TraverseOpts = append(prm.TraverseOpts, + // set identifier of the processing object + placement.ForObject(id), + ) + } + + // create placement builder from network map + builder := placement.NewNetworkMapBuilder(nm) + + if prm.Common.LocalOnly() { + // restrict success count to 1 stored copy (to local storage) + prm.TraverseOpts = append(prm.TraverseOpts, placement.SuccessAfter(1)) + + // use local-only placement builder + builder = util.NewLocalPlacement(builder, prm.Config.NetmapKeys) + } + + // set placement builder + prm.TraverseOpts = append(prm.TraverseOpts, placement.UseBuilder(builder)) + + return nil +} diff --git a/pkg/services/object/put/validation.go b/pkg/services/object/common/target/validation.go similarity index 99% rename from pkg/services/object/put/validation.go rename to pkg/services/object/common/target/validation.go index c2b078ef..b29721d0 100644 --- a/pkg/services/object/put/validation.go +++ b/pkg/services/object/common/target/validation.go @@ -1,4 +1,4 @@ -package putsvc +package target import ( "bytes" diff --git a/pkg/services/object/put/common.go b/pkg/services/object/common/writer/common.go similarity index 65% rename from pkg/services/object/put/common.go rename to pkg/services/object/common/writer/common.go index cbb7f5f3..6689557e 100644 --- a/pkg/services/object/put/common.go +++ b/pkg/services/object/common/writer/common.go @@ -1,4 +1,4 @@ -package putsvc +package writer import ( "context" @@ -13,23 +13,23 @@ import ( "go.uber.org/zap" ) -type nodeIterator struct { - traversal - cfg *cfg +type NodeIterator struct { + Traversal + cfg *Config } -func (c *cfg) newNodeIterator(opts []placement.Option) *nodeIterator { - return &nodeIterator{ - traversal: traversal{ - opts: opts, - mExclude: make(map[string]*bool), +func (c *Config) NewNodeIterator(opts []placement.Option) *NodeIterator { + return &NodeIterator{ + Traversal: Traversal{ + Opts: opts, + Exclude: make(map[string]*bool), }, cfg: c, } } -func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, nodeDesc) error) error { - traverser, err := placement.NewTraverser(n.traversal.opts...) +func (n *NodeIterator) ForEachNode(ctx context.Context, f func(context.Context, NodeDescriptor) error) error { + traverser, err := placement.NewTraverser(n.Traversal.Opts...) if err != nil { return fmt.Errorf("could not create object placement traverser: %w", err) } @@ -56,10 +56,10 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, } // perform additional container broadcast if needed - if n.traversal.submitPrimaryPlacementFinish() { - err := n.forEachNode(ctx, f) + if n.Traversal.submitPrimaryPlacementFinish() { + err := n.ForEachNode(ctx, f) if err != nil { - n.cfg.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err)) + n.cfg.Logger.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err)) // we don't fail primary operation because of broadcast failure } } @@ -67,11 +67,11 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, return nil } -func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, nodeDesc) error, resErr *atomic.Value) bool { +func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, NodeDescriptor) error, resErr *atomic.Value) bool { var wg sync.WaitGroup for _, addr := range addrs { - if ok := n.mExclude[string(addr.PublicKey())]; ok != nil { + if ok := n.Exclude[string(addr.PublicKey())]; ok != nil { if *ok { traverser.SubmitSuccess() } @@ -86,10 +86,10 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement. if err := workerPool.Submit(func() { defer wg.Done() - err := f(ctx, nodeDesc{local: isLocal, info: addr}) + err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr}) if err != nil { resErr.Store(err) - svcutil.LogServiceError(n.cfg.log, "PUT", addr.Addresses(), err) + svcutil.LogServiceError(n.cfg.Logger, "PUT", addr.Addresses(), err) return } @@ -97,7 +97,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement. *item = true }); err != nil { wg.Done() - svcutil.LogWorkerPoolError(n.cfg.log, "PUT", err) + svcutil.LogWorkerPoolError(n.cfg.Logger, "PUT", err) return true } @@ -105,7 +105,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement. // in subsequent container broadcast. Note that we don't // process this node during broadcast if primary placement // on it failed. - n.traversal.submitProcessed(addr, item) + n.Traversal.submitProcessed(addr, item) } wg.Wait() @@ -113,6 +113,6 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement. return false } -func needAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool { +func NeedAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool { return len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock)) } diff --git a/pkg/services/object/put/writer.go b/pkg/services/object/common/writer/dispatcher.go similarity index 97% rename from pkg/services/object/put/writer.go rename to pkg/services/object/common/writer/dispatcher.go index 53eee600..bb9a54ce 100644 --- a/pkg/services/object/put/writer.go +++ b/pkg/services/object/common/writer/dispatcher.go @@ -1,4 +1,4 @@ -package putsvc +package writer import ( "context" diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/common/writer/distributed.go similarity index 57% rename from pkg/services/object/put/distributed.go rename to pkg/services/object/common/writer/distributed.go index 5176f7a5..f62934be 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/common/writer/distributed.go @@ -1,4 +1,4 @@ -package putsvc +package writer import ( "context" @@ -13,47 +13,47 @@ type preparedObjectTarget interface { WriteObject(context.Context, *objectSDK.Object, object.ContentMeta) error } -type distributedTarget struct { +type distributedWriter struct { + cfg *Config + placementOpts []placement.Option obj *objectSDK.Object objMeta object.ContentMeta - *cfg + nodeTargetInitializer func(NodeDescriptor) preparedObjectTarget - nodeTargetInitializer func(nodeDesc) preparedObjectTarget - - relay func(context.Context, nodeDesc) error + relay func(context.Context, NodeDescriptor) error resetSuccessAfterOnBroadcast bool } -// parameters and state of container traversal. -type traversal struct { - opts []placement.Option +// parameters and state of container Traversal. +type Traversal struct { + Opts []placement.Option // need of additional broadcast after the object is saved - extraBroadcastEnabled bool + ExtraBroadcastEnabled bool // container nodes which was processed during the primary object placement - mExclude map[string]*bool + Exclude map[string]*bool - resetSuccessAfterOnBroadcast bool + ResetSuccessAfterOnBroadcast bool } // updates traversal parameters after the primary placement finish and // returns true if additional container broadcast is needed. -func (x *traversal) submitPrimaryPlacementFinish() bool { - if x.extraBroadcastEnabled { +func (x *Traversal) submitPrimaryPlacementFinish() bool { + if x.ExtraBroadcastEnabled { // do not track success during container broadcast (best-effort) - x.opts = append(x.opts, placement.WithoutSuccessTracking()) + x.Opts = append(x.Opts, placement.WithoutSuccessTracking()) - if x.resetSuccessAfterOnBroadcast { - x.opts = append(x.opts, placement.ResetSuccessAfter()) + if x.ResetSuccessAfterOnBroadcast { + x.Opts = append(x.Opts, placement.ResetSuccessAfter()) } // avoid 2nd broadcast - x.extraBroadcastEnabled = false + x.ExtraBroadcastEnabled = false return true } @@ -62,22 +62,22 @@ func (x *traversal) submitPrimaryPlacementFinish() bool { } // marks the container node as processed during the primary object placement. -func (x *traversal) submitProcessed(n placement.Node, item *bool) { - if x.extraBroadcastEnabled { +func (x *Traversal) submitProcessed(n placement.Node, item *bool) { + if x.ExtraBroadcastEnabled { key := string(n.PublicKey()) - if x.mExclude == nil { - x.mExclude = make(map[string]*bool, 1) + if x.Exclude == nil { + x.Exclude = make(map[string]*bool, 1) } - x.mExclude[key] = item + x.Exclude[key] = item } } -type nodeDesc struct { - local bool +type NodeDescriptor struct { + Local bool - info placement.Node + Info placement.Node } // errIncompletePut is returned if processing on a container fails. @@ -96,19 +96,19 @@ func (x errIncompletePut) Error() string { } // WriteObject implements the transformer.ObjectWriter interface. -func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error { +func (t *distributedWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error { t.obj = obj var err error - if t.objMeta, err = t.fmtValidator.ValidateContent(t.obj); err != nil { + if t.objMeta, err = t.cfg.FormatValidator.ValidateContent(t.obj); err != nil { return fmt.Errorf("(%T) could not validate payload content: %w", t, err) } return t.iteratePlacement(ctx) } -func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error { - if !node.local && t.relay != nil { +func (t *distributedWriter) sendObject(ctx context.Context, node NodeDescriptor) error { + if !node.Local && t.relay != nil { return t.relay(ctx, node) } @@ -121,11 +121,11 @@ func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error return nil } -func (t *distributedTarget) iteratePlacement(ctx context.Context) error { +func (t *distributedWriter) iteratePlacement(ctx context.Context) error { id, _ := t.obj.ID() - iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id))) - iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */) - iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast - return iter.forEachNode(ctx, t.sendObject) + iter := t.cfg.NewNodeIterator(append(t.placementOpts, placement.ForObject(id))) + iter.ExtraBroadcastEnabled = NeedAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */) + iter.ResetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast + return iter.ForEachNode(ctx, t.sendObject) } diff --git a/pkg/services/object/put/ec.go b/pkg/services/object/common/writer/ec.go similarity index 69% rename from pkg/services/object/put/ec.go rename to pkg/services/object/common/writer/ec.go index 9980f6d6..fb0a8e4e 100644 --- a/pkg/services/object/put/ec.go +++ b/pkg/services/object/common/writer/ec.go @@ -1,4 +1,4 @@ -package putsvc +package writer import ( "context" @@ -23,23 +23,23 @@ import ( "golang.org/x/sync/errgroup" ) -var _ transformer.ObjectWriter = (*ecWriter)(nil) +var _ transformer.ObjectWriter = (*ECWriter)(nil) var errUnsupportedECObject = errors.New("object is not supported for erasure coding") -type ecWriter struct { - cfg *cfg - placementOpts []placement.Option - container containerSDK.Container - key *ecdsa.PrivateKey - commonPrm *svcutil.CommonPrm - relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error +type ECWriter struct { + Config *Config + PlacementOpts []placement.Option + Container containerSDK.Container + Key *ecdsa.PrivateKey + CommonPrm *svcutil.CommonPrm + Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error - objMeta object.ContentMeta - objMetaValid bool + ObjectMeta object.ContentMeta + ObjectMetaValid bool } -func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error { +func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error { relayed, err := e.relayIfNotContainerNode(ctx, obj) if err != nil { return err @@ -53,11 +53,11 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error return errUnsupportedECObject } - if !e.objMetaValid { - if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil { + if !e.ObjectMetaValid { + if e.ObjectMeta, err = e.Config.FormatValidator.ValidateContent(obj); err != nil { return fmt.Errorf("(%T) could not validate payload content: %w", e, err) } - e.objMetaValid = true + e.ObjectMetaValid = true } if obj.ECHeader() != nil { @@ -66,8 +66,8 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error return e.writeRawObject(ctx, obj) } -func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) { - if e.relay == nil { +func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) { + if e.Relay == nil { return false, nil } currentNodeIsContainerNode, err := e.currentNodeIsContainerNode() @@ -90,8 +90,8 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O return true, nil } -func (e *ecWriter) currentNodeIsContainerNode() (bool, error) { - t, err := placement.NewTraverser(e.placementOpts...) +func (e *ECWriter) currentNodeIsContainerNode() (bool, error) { + t, err := placement.NewTraverser(e.PlacementOpts...) if err != nil { return false, err } @@ -101,7 +101,7 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) { break } for _, node := range nodes { - if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { + if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) { return true, nil } } @@ -109,8 +109,8 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) { return false, nil } -func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error { - t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...) +func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error { + t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...) if err != nil { return err } @@ -126,18 +126,18 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index var info client.NodeInfo client.NodeInfoFromNetmapElement(&info, node) - c, err := e.cfg.clientConstructor.Get(info) + c, err := e.Config.ClientConstructor.Get(info) if err != nil { return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) } completed := make(chan interface{}) - if poolErr := e.cfg.remotePool.Submit(func() { + if poolErr := e.Config.RemotePool.Submit(func() { defer close(completed) - err = e.relay(ctx, info, c) + err = e.Relay(ctx, info, c) }); poolErr != nil { close(completed) - svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr) + svcutil.LogWorkerPoolError(e.Config.Logger, "PUT", poolErr) return poolErr } <-completed @@ -145,7 +145,7 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index if err == nil { return nil } - e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup())) + e.Config.Logger.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup())) lastErr = err } } @@ -157,12 +157,12 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index } } -func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error { - if e.commonPrm.LocalOnly() { +func (e *ECWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error { + if e.CommonPrm.LocalOnly() { return e.writePartLocal(ctx, obj) } - t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...) + t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(obj.ECHeader().Parent()))...) if err != nil { return err } @@ -187,18 +187,18 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error return nil } -func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error { +func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error { // now only single EC policy is supported - c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy())) + c, err := erasurecode.NewConstructor(policy.ECDataCount(e.Container.PlacementPolicy()), policy.ECParityCount(e.Container.PlacementPolicy())) if err != nil { return err } - parts, err := c.Split(obj, e.key) + parts, err := c.Split(obj, e.Key) if err != nil { return err } objID, _ := obj.ID() - t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...) + t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...) if err != nil { return err } @@ -230,7 +230,7 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er return nil } -func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error { +func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error { select { case <-ctx.Done(): return ctx.Err() @@ -243,7 +243,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx if err == nil { return nil } - e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), + e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err)) @@ -267,7 +267,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx if err == nil { return nil } - e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), + e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err)) @@ -291,7 +291,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx if err == nil { return nil } - e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), + e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)), zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx), zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err)) @@ -300,22 +300,22 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj)) } -func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { - if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) { +func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { + if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) { return e.writePartLocal(ctx, obj) } return e.writePartRemote(ctx, obj, node) } -func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { +func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error { var err error - localTarget := localTarget{ - storage: e.cfg.localStore, + localTarget := LocalTarget{ + Storage: e.Config.LocalStore, } completed := make(chan interface{}) - if poolErr := e.cfg.localPool.Submit(func() { + if poolErr := e.Config.LocalPool.Submit(func() { defer close(completed) - err = localTarget.WriteObject(ctx, obj, e.objMeta) + err = localTarget.WriteObject(ctx, obj, e.ObjectMeta) }); poolErr != nil { close(completed) return poolErr @@ -324,22 +324,22 @@ func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) er return err } -func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { +func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error { var clientNodeInfo client.NodeInfo client.NodeInfoFromNetmapElement(&clientNodeInfo, node) - remoteTaget := remoteTarget{ - privateKey: e.key, - clientConstructor: e.cfg.clientConstructor, - commonPrm: e.commonPrm, + remoteTaget := remoteWriter{ + privateKey: e.Key, + clientConstructor: e.Config.ClientConstructor, + commonPrm: e.CommonPrm, nodeInfo: clientNodeInfo, } var err error completed := make(chan interface{}) - if poolErr := e.cfg.remotePool.Submit(func() { + if poolErr := e.Config.RemotePool.Submit(func() { defer close(completed) - err = remoteTaget.WriteObject(ctx, obj, e.objMeta) + err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta) }); poolErr != nil { close(completed) return poolErr diff --git a/pkg/services/object/put/local.go b/pkg/services/object/common/writer/local.go similarity index 81% rename from pkg/services/object/put/local.go rename to pkg/services/object/common/writer/local.go index 54649adc..02fd25b9 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/common/writer/local.go @@ -1,4 +1,4 @@ -package putsvc +package writer import ( "context" @@ -24,19 +24,19 @@ type ObjectStorage interface { IsLocked(context.Context, oid.Address) (bool, error) } -type localTarget struct { - storage ObjectStorage +type LocalTarget struct { + Storage ObjectStorage } -func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error { +func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error { switch meta.Type() { case objectSDK.TypeTombstone: - err := t.storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects()) + err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects()) if err != nil { return fmt.Errorf("could not delete objects from tombstone locally: %w", err) } case objectSDK.TypeLock: - err := t.storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects()) + err := t.Storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects()) if err != nil { return fmt.Errorf("could not lock object from lock objects locally: %w", err) } @@ -44,7 +44,7 @@ func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met // objects that do not change meta storage } - if err := t.storage.Put(ctx, obj); err != nil { + if err := t.Storage.Put(ctx, obj); err != nil { return fmt.Errorf("(%T) could not put object to local storage: %w", t, err) } return nil diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/common/writer/remote.go similarity index 92% rename from pkg/services/object/put/remote.go rename to pkg/services/object/common/writer/remote.go index ee8d64e7..697613ff 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/common/writer/remote.go @@ -1,4 +1,4 @@ -package putsvc +package writer import ( "context" @@ -16,7 +16,7 @@ import ( "google.golang.org/grpc/status" ) -type remoteTarget struct { +type remoteWriter struct { privateKey *ecdsa.PrivateKey commonPrm *util.CommonPrm @@ -41,7 +41,7 @@ type RemotePutPrm struct { obj *objectSDK.Object } -func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error { +func (t *remoteWriter) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error { c, err := t.clientConstructor.Get(t.nodeInfo) if err != nil { return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err) @@ -64,7 +64,7 @@ func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _ return t.putStream(ctx, prm) } -func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error { +func (t *remoteWriter) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error { _, err := internalclient.PutObject(ctx, prm) if err != nil { return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err) @@ -72,7 +72,7 @@ func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObje return nil } -func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error { +func (t *remoteWriter) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error { _, err := internalclient.PutObjectSingle(ctx, prm) if err != nil { return fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err) @@ -113,7 +113,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { return err } - t := &remoteTarget{ + t := &remoteWriter{ privateKey: key, clientConstructor: s.clientConstructor, } diff --git a/pkg/services/object/common/writer/writer.go b/pkg/services/object/common/writer/writer.go new file mode 100644 index 00000000..3d50da98 --- /dev/null +++ b/pkg/services/object/common/writer/writer.go @@ -0,0 +1,183 @@ +package writer + +import ( + "context" + "crypto/ecdsa" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" + objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" +) + +type MaxSizeSource interface { + // MaxObjectSize returns maximum payload size + // of physically stored object in system. + // + // Must return 0 if value can not be obtained. + MaxObjectSize() uint64 +} + +type ClientConstructor interface { + Get(client.NodeInfo) (client.MultiAddressClient, error) +} + +type InnerRing interface { + InnerRingKeys() ([][]byte, error) +} + +type FormatValidatorConfig interface { + VerifySessionTokenIssuer() bool +} + +// Config represents a set of static parameters that are established during +// the initialization phase of all services. +type Config struct { + KeyStorage *objutil.KeyStorage + + MaxSizeSrc MaxSizeSource + + LocalStore ObjectStorage + + ContainerSource container.Source + + NetmapSource netmap.Source + + RemotePool, LocalPool util.WorkerPool + + NetmapKeys netmap.AnnouncedKeys + + FormatValidator *object.FormatValidator + + NetworkState netmap.State + + ClientConstructor ClientConstructor + + Logger *logger.Logger + + VerifySessionTokenIssuer bool +} + +type Option func(*Config) + +func WithWorkerPools(remote, local util.WorkerPool) Option { + return func(c *Config) { + c.RemotePool, c.LocalPool = remote, local + } +} + +func WithLogger(l *logger.Logger) Option { + return func(c *Config) { + c.Logger = l + } +} + +func WithVerifySessionTokenIssuer(v bool) Option { + return func(c *Config) { + c.VerifySessionTokenIssuer = v + } +} + +func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) { + if c.NetmapKeys.IsLocalKey(pub) { + return c.LocalPool, true + } + return c.RemotePool, false +} + +type Params struct { + Config *Config + + Common *objutil.CommonPrm + + Header *objectSDK.Object + + Container containerSDK.Container + + TraverseOpts []placement.Option + + Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error + + SignRequestPrivateKey *ecdsa.PrivateKey +} + +func New(prm *Params) transformer.ObjectWriter { + if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) { + return newECWriter(prm) + } + return newDefaultObjectWriter(prm, false) +} + +func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.ObjectWriter { + var relay func(context.Context, NodeDescriptor) error + if prm.Relay != nil { + relay = func(ctx context.Context, node NodeDescriptor) error { + var info client.NodeInfo + + client.NodeInfoFromNetmapElement(&info, node.Info) + + c, err := prm.Config.ClientConstructor.Get(info) + if err != nil { + return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) + } + + return prm.Relay(ctx, info, c) + } + } + + var resetSuccessAfterOnBroadcast bool + traverseOpts := prm.TraverseOpts + if forECPlacement && !prm.Common.LocalOnly() { + // save non-regular and linking object to EC container. + // EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc. + traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.Container.PlacementPolicy())+1))) + resetSuccessAfterOnBroadcast = true + } + + return &distributedWriter{ + cfg: prm.Config, + placementOpts: traverseOpts, + resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast, + nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget { + if node.Local { + return LocalTarget{ + Storage: prm.Config.LocalStore, + } + } + + rt := &remoteWriter{ + privateKey: prm.SignRequestPrivateKey, + commonPrm: prm.Common, + clientConstructor: prm.Config.ClientConstructor, + } + + client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.Info) + + return rt + }, + relay: relay, + } +} + +func newECWriter(prm *Params) transformer.ObjectWriter { + return &objectWriterDispatcher{ + ecWriter: &ECWriter{ + Config: prm.Config, + PlacementOpts: append(prm.TraverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC + Container: prm.Container, + Key: prm.SignRequestPrivateKey, + CommonPrm: prm.Common, + Relay: prm.Relay, + }, + repWriter: newDefaultObjectWriter(prm, true), + } +} diff --git a/pkg/services/object/patch/service.go b/pkg/services/object/patch/service.go index c4ab15ab..f1082dff 100644 --- a/pkg/services/object/patch/service.go +++ b/pkg/services/object/patch/service.go @@ -2,43 +2,40 @@ package patchsvc import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" + objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" - putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" ) // Service implements Put operation of Object service v2. type Service struct { - keyStorage *util.KeyStorage + *objectwriter.Config getSvc *getsvc.Service - - putSvc *putsvc.Service } // NewService constructs Service instance from provided options. -func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service { +// +// Patch service can use the same objectwriter.Config initializied by Put service. +func NewService(cfg *objectwriter.Config, + getSvc *getsvc.Service, +) *Service { return &Service{ - keyStorage: ks, + Config: cfg, getSvc: getSvc, - - putSvc: putSvc, } } // Put calls internal service and returns v2 object streamer. func (s *Service) Patch() (object.PatchObjectStream, error) { - nodeKey, err := s.keyStorage.GetKey(nil) + nodeKey, err := s.Config.KeyStorage.GetKey(nil) if err != nil { return nil, err } return &Streamer{ - getSvc: s.getSvc, - - putSvc: s.putSvc, - + Config: s.Config, + getSvc: s.getSvc, localNodeKey: nodeKey, }, nil } diff --git a/pkg/services/object/patch/streamer.go b/pkg/services/object/patch/streamer.go index 84363530..85c28cda 100644 --- a/pkg/services/object/patch/streamer.go +++ b/pkg/services/object/patch/streamer.go @@ -9,8 +9,9 @@ import ( objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target" + objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" - putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -21,6 +22,8 @@ import ( // Streamer for the patch handler is a pipeline that merges two incoming streams of patches // and original object payload chunks. The merged result is fed to Put stream target. type Streamer struct { + *objectwriter.Config + // Patcher must be initialized at first Streamer.Send call. patcher patcher.PatchApplier @@ -28,8 +31,6 @@ type Streamer struct { getSvc *getsvc.Service - putSvc *putsvc.Service - localNodeKey *ecdsa.PrivateKey } @@ -78,11 +79,6 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error { localNodeKey: s.localNodeKey, } - putstm, err := s.putSvc.Put() - if err != nil { - return err - } - hdr := hdrWithSig.GetHeader() oV2 := new(objectV2.Object) hV2 := new(objectV2.Header) @@ -97,14 +93,14 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error { } oV2.GetHeader().SetOwnerID(ownerID) - prm, err := s.putInitPrm(req, oV2) + target, err := target.New(&objectwriter.Params{ + Config: s.Config, + Common: commonPrm, + Header: objectSDK.NewFromV2(oV2), + SignRequestPrivateKey: s.localNodeKey, + }) if err != nil { - return err - } - - err = putstm.Init(ctx, prm) - if err != nil { - return err + return fmt.Errorf("target creation: %w", err) } patcherPrm := patcher.Params{ @@ -112,7 +108,7 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error { RangeProvider: rangeProvider, - ObjectWriter: putstm.Target(), + ObjectWriter: target, } s.patcher = patcher.New(patcherPrm) diff --git a/pkg/services/object/patch/util.go b/pkg/services/object/patch/util.go index 1218d669..4f3c3ef1 100644 --- a/pkg/services/object/patch/util.go +++ b/pkg/services/object/patch/util.go @@ -6,31 +6,12 @@ import ( "errors" "fmt" - objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" - putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" ) -// putInitPrm initializes put paramerer for Put stream. -func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) { - commonPrm, err := util.CommonPrmFromV2(req) - if err != nil { - return nil, err - } - - prm := new(putsvc.PutInitPrm) - prm.WithObject(objectSDK.NewFromV2(obj)). - WithCommonPrm(commonPrm). - WithPrivateKey(s.localNodeKey) - - return prm, nil -} - func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) { for vh.GetOrigin() != nil { vh = vh.GetOrigin() diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index a9387373..8cf4f0d6 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -1,132 +1,66 @@ package putsvc import ( - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) -type MaxSizeSource interface { - // MaxObjectSize returns maximum payload size - // of physically stored object in system. - // - // Must return 0 if value can not be obtained. - MaxObjectSize() uint64 -} - type Service struct { - *cfg -} - -type Option func(*cfg) - -type ClientConstructor interface { - Get(client.NodeInfo) (client.MultiAddressClient, error) -} - -type InnerRing interface { - InnerRingKeys() ([][]byte, error) -} - -type FormatValidatorConfig interface { - VerifySessionTokenIssuer() bool -} - -type cfg struct { - keyStorage *objutil.KeyStorage - - maxSizeSrc MaxSizeSource - - localStore ObjectStorage - - cnrSrc container.Source - - netMapSrc netmap.Source - - remotePool, localPool util.WorkerPool - - netmapKeys netmap.AnnouncedKeys - - fmtValidator *object.FormatValidator - - networkState netmap.State - - clientConstructor ClientConstructor - - log *logger.Logger - - verifySessionTokenIssuer bool + *objectwriter.Config } func NewService(ks *objutil.KeyStorage, - cc ClientConstructor, - ms MaxSizeSource, - os ObjectStorage, + cc objectwriter.ClientConstructor, + ms objectwriter.MaxSizeSource, + os objectwriter.ObjectStorage, cs container.Source, ns netmap.Source, nk netmap.AnnouncedKeys, nst netmap.State, - ir InnerRing, - opts ...Option, + ir objectwriter.InnerRing, + opts ...objectwriter.Option, ) *Service { - c := &cfg{ - remotePool: util.NewPseudoWorkerPool(), - localPool: util.NewPseudoWorkerPool(), - log: &logger.Logger{Logger: zap.L()}, - keyStorage: ks, - clientConstructor: cc, - maxSizeSrc: ms, - localStore: os, - cnrSrc: cs, - netMapSrc: ns, - netmapKeys: nk, - networkState: nst, + c := &objectwriter.Config{ + RemotePool: util.NewPseudoWorkerPool(), + LocalPool: util.NewPseudoWorkerPool(), + Logger: &logger.Logger{Logger: zap.L()}, + KeyStorage: ks, + ClientConstructor: cc, + MaxSizeSrc: ms, + LocalStore: os, + ContainerSource: cs, + NetmapSource: ns, + NetmapKeys: nk, + NetworkState: nst, } for i := range opts { opts[i](c) } - c.fmtValidator = object.NewFormatValidator( + c.FormatValidator = object.NewFormatValidator( object.WithLockSource(os), object.WithNetState(nst), object.WithInnerRing(ir), object.WithNetmapSource(ns), object.WithContainersSource(cs), - object.WithVerifySessionTokenIssuer(c.verifySessionTokenIssuer), - object.WithLogger(c.log), + object.WithVerifySessionTokenIssuer(c.VerifySessionTokenIssuer), + object.WithLogger(c.Logger), ) return &Service{ - cfg: c, + Config: c, } } func (p *Service) Put() (*Streamer, error) { return &Streamer{ - cfg: p.cfg, + Config: p.Config, }, nil } - -func WithWorkerPools(remote, local util.WorkerPool) Option { - return func(c *cfg) { - c.remotePool, c.localPool = remote, local - } -} - -func WithLogger(l *logger.Logger) Option { - return func(c *cfg) { - c.log = l - } -} - -func WithVerifySessionTokenIssuer(v bool) Option { - return func(c *cfg) { - c.verifySessionTokenIssuer = v - } -} diff --git a/pkg/services/object/put/single.go b/pkg/services/object/put/single.go index 3cc8518f..9b416326 100644 --- a/pkg/services/object/put/single.go +++ b/pkg/services/object/put/single.go @@ -21,6 +21,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target" + objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" @@ -97,12 +99,12 @@ func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object) func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error { if uint64(len(obj.Payload())) != obj.PayloadSize() { - return ErrWrongPayloadSize + return target.ErrWrongPayloadSize } - maxAllowedSize := s.maxSizeSrc.MaxObjectSize() + maxAllowedSize := s.Config.MaxSizeSrc.MaxObjectSize() if obj.PayloadSize() > maxAllowedSize { - return ErrExceedingMaxSize + return target.ErrExceedingMaxSize } return nil @@ -137,11 +139,11 @@ func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error { } func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) { - if err := s.fmtValidator.Validate(ctx, obj, false); err != nil { + if err := s.FormatValidator.Validate(ctx, obj, false); err != nil { return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err) } - meta, err := s.fmtValidator.ValidateContent(obj) + meta, err := s.FormatValidator.ValidateContent(obj) if err != nil { return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err) } @@ -164,17 +166,17 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o } func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error { - iter := s.cfg.newNodeIterator(placement.placementOptions) - iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly) - iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast + iter := s.Config.NewNodeIterator(placement.placementOptions) + iter.ExtraBroadcastEnabled = objectwriter.NeedAdditionalBroadcast(obj, localOnly) + iter.ResetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast signer := &putSingleRequestSigner{ req: req, - keyStorage: s.keyStorage, + keyStorage: s.Config.KeyStorage, signer: &sync.Once{}, } - return iter.forEachNode(ctx, func(ctx context.Context, nd nodeDesc) error { + return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error { return s.saveToPlacementNode(ctx, &nd, obj, signer, meta) }) } @@ -184,25 +186,25 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace if err != nil { return err } - key, err := s.cfg.keyStorage.GetKey(nil) + key, err := s.Config.KeyStorage.GetKey(nil) if err != nil { return err } signer := &putSingleRequestSigner{ req: req, - keyStorage: s.keyStorage, + keyStorage: s.Config.KeyStorage, signer: &sync.Once{}, } - w := ecWriter{ - cfg: s.cfg, - placementOpts: placement.placementOptions, - objMeta: meta, - objMetaValid: true, - commonPrm: commonPrm, - container: placement.container, - key: key, - relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error { + w := objectwriter.ECWriter{ + Config: s.Config, + PlacementOpts: placement.placementOptions, + ObjectMeta: meta, + ObjectMetaValid: true, + CommonPrm: commonPrm, + Container: placement.container, + Key: key, + Relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error { return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac) }, } @@ -223,7 +225,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb if !ok { return result, errors.New("missing container ID") } - cnrInfo, err := s.cnrSrc.Get(cnrID) + cnrInfo, err := s.Config.ContainerSource.Get(cnrID) if err != nil { return result, fmt.Errorf("could not get container by ID: %w", err) } @@ -247,31 +249,31 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb } result.placementOptions = append(result.placementOptions, placement.ForObject(objID)) - latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc) + latestNetmap, err := netmap.GetLatestNetworkMap(s.Config.NetmapSource) if err != nil { return result, fmt.Errorf("could not get latest network map: %w", err) } builder := placement.NewNetworkMapBuilder(latestNetmap) if localOnly { result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1)) - builder = svcutil.NewLocalPlacement(builder, s.netmapKeys) + builder = svcutil.NewLocalPlacement(builder, s.Config.NetmapKeys) } result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder)) return result, nil } -func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object, +func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object, signer *putSingleRequestSigner, meta object.ContentMeta, ) error { - if nodeDesc.local { + if nodeDesc.Local { return s.saveLocal(ctx, obj, meta) } var info client.NodeInfo - client.NodeInfoFromNetmapElement(&info, nodeDesc.info) + client.NodeInfoFromNetmapElement(&info, nodeDesc.Info) - c, err := s.clientConstructor.Get(info) + c, err := s.Config.ClientConstructor.Get(info) if err != nil { return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) } @@ -280,8 +282,8 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, o } func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error { - localTarget := &localTarget{ - storage: s.localStore, + localTarget := &objectwriter.LocalTarget{ + Storage: s.Config.LocalStore, } return localTarget.WriteObject(ctx, obj, meta) } @@ -314,7 +316,7 @@ func (s *Service) redirectPutSingleRequest(ctx context.Context, if err != nil { objID, _ := obj.ID() cnrID, _ := obj.ContainerID() - s.log.Warn(logs.PutSingleRedirectFailure, + s.Config.Logger.Warn(logs.PutSingleRedirectFailure, zap.Error(err), zap.Stringer("address", addr), zap.Stringer("object_id", objID), diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 6b396ec9..f3803d43 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -2,33 +2,21 @@ package putsvc import ( "context" - "crypto/ecdsa" "errors" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" - pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target" + objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" ) type Streamer struct { - *cfg - - privateKey *ecdsa.PrivateKey + *objectwriter.Config target transformer.ChunkedObjectWriter relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error - - maxPayloadSz uint64 // network config } var errNotInit = errors.New("stream not initialized") @@ -36,8 +24,23 @@ var errNotInit = errors.New("stream not initialized") var errInitRecall = errors.New("init recall") func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error { + if p.target != nil { + return errInitRecall + } + // initialize destination target - if err := p.initTarget(prm); err != nil { + prmTarget := &objectwriter.Params{ + Config: p.Config, + Common: prm.common, + Header: prm.hdr, + Container: prm.cnr, + TraverseOpts: prm.traverseOpts, + Relay: p.relay, + } + + var err error + p.target, err = target.New(prmTarget) + if err != nil { return fmt.Errorf("(%T) could not initialize object target: %w", p, err) } @@ -47,253 +50,6 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error { return nil } -// Target accesses underlying target chunked object writer. -func (p *Streamer) Target() transformer.ChunkedObjectWriter { - return p.target -} - -// MaxObjectSize returns maximum payload size for the streaming session. -// -// Must be called after the successful Init. -func (p *Streamer) MaxObjectSize() uint64 { - return p.maxPayloadSz -} - -func (p *Streamer) initTarget(prm *PutInitPrm) error { - // prevent re-calling - if p.target != nil { - return errInitRecall - } - - // prepare needed put parameters - if err := p.preparePrm(prm); err != nil { - return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err) - } - - p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize() - if p.maxPayloadSz == 0 { - return fmt.Errorf("(%T) could not obtain max object size parameter", p) - } - - if prm.hdr.Signature() != nil { - return p.initUntrustedTarget(prm) - } - return p.initTrustedTarget(prm) -} - -func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error { - p.relay = prm.relay - - if prm.privateKey != nil { - p.privateKey = prm.privateKey - } else { - nodeKey, err := p.cfg.keyStorage.GetKey(nil) - if err != nil { - return err - } - p.privateKey = nodeKey - } - - // prepare untrusted-Put object target - p.target = &validatingPreparedTarget{ - nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)), - fmt: p.fmtValidator, - - maxPayloadSz: p.maxPayloadSz, - } - - return nil -} - -func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { - sToken := prm.common.SessionToken() - - // prepare trusted-Put object target - - // get private token from local storage - var sessionInfo *util.SessionInfo - - if sToken != nil { - sessionInfo = &util.SessionInfo{ - ID: sToken.ID(), - Owner: sToken.Issuer(), - } - } - - key, err := p.keyStorage.GetKey(sessionInfo) - if err != nil { - return fmt.Errorf("(%T) could not receive session key: %w", p, err) - } - - // In case session token is missing, the line above returns the default key. - // If it isn't owner key, replication attempts will fail, thus this check. - ownerObj := prm.hdr.OwnerID() - if ownerObj.IsEmpty() { - return errors.New("missing object owner") - } - - if sToken == nil { - var ownerSession user.ID - user.IDFromKey(&ownerSession, key.PublicKey) - - if !ownerObj.Equals(ownerSession) { - return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p) - } - } else { - if !ownerObj.Equals(sessionInfo.Owner) { - return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", p, sessionInfo.Owner, ownerObj) - } - } - - if prm.privateKey != nil { - p.privateKey = prm.privateKey - } else { - p.privateKey = key - } - p.target = &validatingTarget{ - fmt: p.fmtValidator, - nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ - Key: key, - NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) }, - NetworkState: p.networkState, - MaxSize: p.maxPayloadSz, - WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr), - SessionToken: sToken, - }), - } - - return nil -} - -func (p *Streamer) preparePrm(prm *PutInitPrm) error { - var err error - - // get latest network map - nm, err := netmap.GetLatestNetworkMap(p.netMapSrc) - if err != nil { - return fmt.Errorf("(%T) could not get latest network map: %w", p, err) - } - - idCnr, ok := prm.hdr.ContainerID() - if !ok { - return errors.New("missing container ID") - } - - // get container to store the object - cnrInfo, err := p.cnrSrc.Get(idCnr) - if err != nil { - return fmt.Errorf("(%T) could not get container by ID: %w", p, err) - } - - prm.cnr = cnrInfo.Value - - // add common options - prm.traverseOpts = append(prm.traverseOpts, - // set processing container - placement.ForContainer(prm.cnr), - ) - - if ech := prm.hdr.ECHeader(); ech != nil { - prm.traverseOpts = append(prm.traverseOpts, - // set identifier of the processing object - placement.ForObject(ech.Parent()), - ) - } else if id, ok := prm.hdr.ID(); ok { - prm.traverseOpts = append(prm.traverseOpts, - // set identifier of the processing object - placement.ForObject(id), - ) - } - - // create placement builder from network map - builder := placement.NewNetworkMapBuilder(nm) - - if prm.common.LocalOnly() { - // restrict success count to 1 stored copy (to local storage) - prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1)) - - // use local-only placement builder - builder = util.NewLocalPlacement(builder, p.netmapKeys) - } - - // set placement builder - prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder)) - - return nil -} - -func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter { - if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) { - return p.newECWriter(prm) - } - return p.newDefaultObjectWriter(prm, false) -} - -func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter { - var relay func(context.Context, nodeDesc) error - if p.relay != nil { - relay = func(ctx context.Context, node nodeDesc) error { - var info client.NodeInfo - - client.NodeInfoFromNetmapElement(&info, node.info) - - c, err := p.clientConstructor.Get(info) - if err != nil { - return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) - } - - return p.relay(ctx, info, c) - } - } - - var resetSuccessAfterOnBroadcast bool - traverseOpts := prm.traverseOpts - if forECPlacement && !prm.common.LocalOnly() { - // save non-regular and linking object to EC container. - // EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc. - traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1))) - resetSuccessAfterOnBroadcast = true - } - - return &distributedTarget{ - cfg: p.cfg, - placementOpts: traverseOpts, - resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast, - nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget { - if node.local { - return localTarget{ - storage: p.localStore, - } - } - - rt := &remoteTarget{ - privateKey: p.privateKey, - commonPrm: prm.common, - clientConstructor: p.clientConstructor, - } - - client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info) - - return rt - }, - relay: relay, - } -} - -func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter { - return &objectWriterDispatcher{ - ecWriter: &ecWriter{ - cfg: p.cfg, - placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC - container: prm.cnr, - key: p.privateKey, - commonPrm: prm.common, - relay: p.relay, - }, - repWriter: p.newDefaultObjectWriter(prm, true), - } -} - func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error { if p.target == nil { return errNotInit @@ -327,10 +83,3 @@ func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) { id: ids.SelfID, }, nil } - -func (c *cfg) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) { - if c.netmapKeys.IsLocalKey(pub) { - return c.localPool, true - } - return c.remotePool, false -} diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index 9c6de4ca..5bf15b4c 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" @@ -55,7 +56,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error) s.saveChunks = v.GetSignature() != nil if s.saveChunks { - maxSz := s.stream.MaxObjectSize() + maxSz := s.stream.MaxSizeSrc.MaxObjectSize() s.sizes = &sizes{ payloadSz: uint64(v.GetHeader().GetPayloadLength()), @@ -63,7 +64,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error) // check payload size limit overflow if s.payloadSz > maxSz { - return putsvc.ErrExceedingMaxSize + return target.ErrExceedingMaxSize } s.init = req @@ -74,7 +75,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error) // check payload size overflow if s.writtenPayload > s.payloadSz { - return putsvc.ErrWrongPayloadSize + return target.ErrWrongPayloadSize } } @@ -117,7 +118,7 @@ func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error if s.saveChunks { // check payload size correctness if s.writtenPayload != s.payloadSz { - return nil, putsvc.ErrWrongPayloadSize + return nil, target.ErrWrongPayloadSize } } diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 3d04b708..7e5c6e09 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -5,7 +5,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" - putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" + objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" @@ -52,7 +52,7 @@ func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res T } } - prm := new(putsvc.RemotePutPrm). + prm := new(objectwriter.RemotePutPrm). WithObject(task.Obj) for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ { diff --git a/pkg/services/replicator/replicator.go b/pkg/services/replicator/replicator.go index a67f2e76..f2f86daf 100644 --- a/pkg/services/replicator/replicator.go +++ b/pkg/services/replicator/replicator.go @@ -4,8 +4,8 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" - putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -24,7 +24,7 @@ type cfg struct { log *logger.Logger - remoteSender *putsvc.RemoteSender + remoteSender *objectwriter.RemoteSender remoteGetter *getsvc.RemoteGetter @@ -67,7 +67,7 @@ func WithLogger(v *logger.Logger) Option { } // WithRemoteSender returns option to set remote object sender of Replicator. -func WithRemoteSender(v *putsvc.RemoteSender) Option { +func WithRemoteSender(v *objectwriter.RemoteSender) Option { return func(c *cfg) { c.remoteSender = v }