object: Move target initialization to separate package #1344
|
@ -7,7 +7,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -305,11 +305,11 @@ type ttlMaxObjectSizeCache struct {
|
|||
mtx sync.RWMutex
|
||||
lastUpdated time.Time
|
||||
lastSize uint64
|
||||
src putsvc.MaxSizeSource
|
||||
src objectwriter.MaxSizeSource
|
||||
metrics cacheMetrics
|
||||
}
|
||||
|
||||
func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource {
|
||||
func newCachedMaxObjectSizeSource(src objectwriter.MaxSizeSource) objectwriter.MaxSizeSource {
|
||||
return &ttlMaxObjectSizeCache{
|
||||
src: src,
|
||||
metrics: metrics.NewCacheMetrics("max_object_size"),
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl"
|
||||
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
|
||||
objectAPE "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/ape"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
deletesvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete"
|
||||
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
|
@ -188,7 +189,7 @@ func initObjectService(c *cfg) {
|
|||
|
||||
sDeleteV2 := createDeleteServiceV2(sDelete)
|
||||
|
||||
sPatch := createPatchSvc(sGet, sPut, keyStorage)
|
||||
sPatch := createPatchSvc(sGet, sPut)
|
||||
|
||||
// build service pipeline
|
||||
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
||||
|
@ -326,7 +327,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
|
|||
),
|
||||
replicator.WithLocalStorage(ls),
|
||||
replicator.WithRemoteSender(
|
||||
putsvc.NewRemoteSender(keyStorage, cache),
|
||||
objectwriter.NewRemoteSender(keyStorage, cache),
|
||||
),
|
||||
replicator.WithRemoteGetter(
|
||||
getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
|
||||
|
@ -338,7 +339,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
|
|||
func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service {
|
||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
||||
var os objectwriter.ObjectStorage = engineWithoutNotifications{
|
||||
engine: ls,
|
||||
}
|
||||
|
||||
|
@ -352,9 +353,9 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
|
|||
c,
|
||||
c.cfgNetmap.state,
|
||||
irFetcher,
|
||||
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
||||
putsvc.WithLogger(c.log),
|
||||
putsvc.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
|
||||
objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
||||
objectwriter.WithLogger(c.log),
|
||||
objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -362,8 +363,8 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2
|
|||
return putsvcV2.NewService(sPut, keyStorage)
|
||||
}
|
||||
|
||||
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service {
|
||||
return patchsvc.NewService(keyStorage, sGet, sPut)
|
||||
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Service {
|
||||
return patchsvc.NewService(sPut.Config, sGet)
|
||||
}
|
||||
|
||||
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package target
|
||||
|
||||
import (
|
||||
"context"
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package target
|
||||
|
||||
import (
|
||||
"sync"
|
170
pkg/services/object/common/target/target.go
Normal file
|
@ -0,0 +1,170 @@
|
|||
package target
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
)
|
||||
|
||||
func New(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
|
||||
// prepare needed put parameters
|
||||
if err := preparePrm(prm); err != nil {
|
||||
return nil, fmt.Errorf("could not prepare put parameters: %w", err)
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
|
||||
if prm.Header.Signature() != nil {
|
||||
return newUntrustedTarget(prm)
|
||||
}
|
||||
return newTrustedTarget(prm)
|
||||
}
|
||||
|
||||
func newUntrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
|
||||
maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
|
||||
if maxPayloadSz == 0 {
|
||||
return nil, errors.New("could not obtain max object size parameter")
|
||||
}
|
||||
|
||||
if prm.SignRequestPrivateKey == nil {
|
||||
nodeKey, err := prm.Config.KeyStorage.GetKey(nil)
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why do we need a separate struct here, just to be able to convert it to the struct of the same shape? Why do we need a separate struct here, just to be able to convert it to the struct of the same shape?
aarifullin
commented
Here are highlighted points that aren't related to specifically to this question, but which help to understand why packages were refactored in this way:
It looked ambiguous with me to use Here are highlighted points that aren't related to specifically to this question, but which help to understand why packages were refactored in this way:
1. The `Target` initialization logic should be moved in a common package. Since both `patch` and `put` can reuse this
2. I assumed that `writer` and `target` should be distingushed:
- `writer` package's purpose is to create `transformer.ObjectWriter`
- `writer` package is used in a few packages
- `target` package's purpose is to create `transformer.ChunkedObjectWriter`
- `target` is used only within `put` and `patch` streams
It looked ambiguous with me to use `objectwriter.Params` as an input parameter for target initialization (`New`). I admit that such convertation looks ugly - so, I'll remove it
aarifullin
commented
Since Since `target.New` uses `objectwriter.Params`. Please, check this out
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
prm.SignRequestPrivateKey = nodeKey
|
||||
}
|
||||
|
||||
// prepare untrusted-Put object target
|
||||
return &validatingPreparedTarget{
|
||||
nextTarget: newInMemoryObjectBuilder(objectwriter.New(prm)),
|
||||
fmt: prm.Config.FormatValidator,
|
||||
|
||||
maxPayloadSz: maxPayloadSz,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newTrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
|
||||
maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
|
||||
if maxPayloadSz == 0 {
|
||||
return nil, errors.New("could not obtain max object size parameter")
|
||||
}
|
||||
|
||||
sToken := prm.Common.SessionToken()
|
||||
|
||||
// prepare trusted-Put object target
|
||||
|
||||
// get private token from local storage
|
||||
var sessionInfo *util.SessionInfo
|
||||
|
||||
if sToken != nil {
|
||||
sessionInfo = &util.SessionInfo{
|
||||
ID: sToken.ID(),
|
||||
Owner: sToken.Issuer(),
|
||||
}
|
||||
}
|
||||
|
||||
key, err := prm.Config.KeyStorage.GetKey(sessionInfo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not receive session key: %w", err)
|
||||
}
|
||||
|
||||
// In case session token is missing, the line above returns the default key.
|
||||
// If it isn't owner key, replication attempts will fail, thus this check.
|
||||
ownerObj := prm.Header.OwnerID()
|
||||
if ownerObj.IsEmpty() {
|
||||
return nil, errors.New("missing object owner")
|
||||
}
|
||||
|
||||
if sToken == nil {
|
||||
var ownerSession user.ID
|
||||
user.IDFromKey(&ownerSession, key.PublicKey)
|
||||
|
||||
if !ownerObj.Equals(ownerSession) {
|
||||
return nil, errors.New("session token is missing but object owner id is different from the default key")
|
||||
}
|
||||
} else {
|
||||
if !ownerObj.Equals(sessionInfo.Owner) {
|
||||
return nil, fmt.Errorf("different token issuer and object owner identifiers %s/%s", sessionInfo.Owner, ownerObj)
|
||||
}
|
||||
}
|
||||
|
||||
if prm.SignRequestPrivateKey == nil {
|
||||
prm.SignRequestPrivateKey = key
|
||||
}
|
||||
|
||||
return &validatingTarget{
|
||||
fmt: prm.Config.FormatValidator,
|
||||
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: key,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return objectwriter.New(prm) },
|
||||
NetworkState: prm.Config.NetworkState,
|
||||
MaxSize: maxPayloadSz,
|
||||
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.Container),
|
||||
SessionToken: sToken,
|
||||
}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func preparePrm(prm *objectwriter.Params) error {
|
||||
var err error
|
||||
|
||||
// get latest network map
|
||||
nm, err := netmap.GetLatestNetworkMap(prm.Config.NetmapSource)
|
||||
if err != nil {
|
||||
//return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
|
||||
return fmt.Errorf("could not get latest network map: %w", err)
|
||||
}
|
||||
|
||||
idCnr, ok := prm.Header.ContainerID()
|
||||
if !ok {
|
||||
return errors.New("missing container ID")
|
||||
}
|
||||
|
||||
// get container to store the object
|
||||
cnrInfo, err := prm.Config.ContainerSource.Get(idCnr)
|
||||
if err != nil {
|
||||
//return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
|
||||
return fmt.Errorf("could not get container by ID: %w", err)
|
||||
}
|
||||
|
||||
prm.Container = cnrInfo.Value
|
||||
|
||||
// add common options
|
||||
prm.TraverseOpts = append(prm.TraverseOpts,
|
||||
// set processing container
|
||||
placement.ForContainer(prm.Container),
|
||||
)
|
||||
|
||||
if ech := prm.Header.ECHeader(); ech != nil {
|
||||
prm.TraverseOpts = append(prm.TraverseOpts,
|
||||
// set identifier of the processing object
|
||||
placement.ForObject(ech.Parent()),
|
||||
)
|
||||
} else if id, ok := prm.Header.ID(); ok {
|
||||
prm.TraverseOpts = append(prm.TraverseOpts,
|
||||
// set identifier of the processing object
|
||||
placement.ForObject(id),
|
||||
)
|
||||
}
|
||||
|
||||
// create placement builder from network map
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
if prm.Common.LocalOnly() {
|
||||
// restrict success count to 1 stored copy (to local storage)
|
||||
prm.TraverseOpts = append(prm.TraverseOpts, placement.SuccessAfter(1))
|
||||
|
||||
// use local-only placement builder
|
||||
builder = util.NewLocalPlacement(builder, prm.Config.NetmapKeys)
|
||||
}
|
||||
|
||||
// set placement builder
|
||||
prm.TraverseOpts = append(prm.TraverseOpts, placement.UseBuilder(builder))
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package target
|
||||
|
||||
import (
|
||||
"bytes"
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -13,23 +13,23 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type nodeIterator struct {
|
||||
traversal
|
||||
cfg *cfg
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why is it public now? Why is it public now?
aarifullin
commented
The file has been moved to another package (now it's within The file has been moved to another package (now it's within `common/writer`) and we need to access some fields
|
||||
type NodeIterator struct {
|
||||
Traversal
|
||||
cfg *Config
|
||||
}
|
||||
|
||||
func (c *cfg) newNodeIterator(opts []placement.Option) *nodeIterator {
|
||||
return &nodeIterator{
|
||||
traversal: traversal{
|
||||
opts: opts,
|
||||
mExclude: make(map[string]*bool),
|
||||
func (c *Config) NewNodeIterator(opts []placement.Option) *NodeIterator {
|
||||
return &NodeIterator{
|
||||
Traversal: Traversal{
|
||||
Opts: opts,
|
||||
Exclude: make(map[string]*bool),
|
||||
},
|
||||
cfg: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, nodeDesc) error) error {
|
||||
traverser, err := placement.NewTraverser(n.traversal.opts...)
|
||||
func (n *NodeIterator) ForEachNode(ctx context.Context, f func(context.Context, NodeDescriptor) error) error {
|
||||
traverser, err := placement.NewTraverser(n.Traversal.Opts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create object placement traverser: %w", err)
|
||||
}
|
||||
|
@ -56,10 +56,10 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context,
|
|||
}
|
||||
|
||||
// perform additional container broadcast if needed
|
||||
if n.traversal.submitPrimaryPlacementFinish() {
|
||||
err := n.forEachNode(ctx, f)
|
||||
if n.Traversal.submitPrimaryPlacementFinish() {
|
||||
err := n.ForEachNode(ctx, f)
|
||||
if err != nil {
|
||||
n.cfg.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
|
||||
n.cfg.Logger.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
|
||||
// we don't fail primary operation because of broadcast failure
|
||||
}
|
||||
}
|
||||
|
@ -67,11 +67,11 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context,
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, nodeDesc) error, resErr *atomic.Value) bool {
|
||||
func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, NodeDescriptor) error, resErr *atomic.Value) bool {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, addr := range addrs {
|
||||
if ok := n.mExclude[string(addr.PublicKey())]; ok != nil {
|
||||
if ok := n.Exclude[string(addr.PublicKey())]; ok != nil {
|
||||
if *ok {
|
||||
traverser.SubmitSuccess()
|
||||
}
|
||||
|
@ -86,10 +86,10 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
|||
if err := workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
|
||||
err := f(ctx, nodeDesc{local: isLocal, info: addr})
|
||||
err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
|
||||
if err != nil {
|
||||
resErr.Store(err)
|
||||
svcutil.LogServiceError(n.cfg.log, "PUT", addr.Addresses(), err)
|
||||
svcutil.LogServiceError(n.cfg.Logger, "PUT", addr.Addresses(), err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -97,7 +97,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
|||
*item = true
|
||||
}); err != nil {
|
||||
wg.Done()
|
||||
svcutil.LogWorkerPoolError(n.cfg.log, "PUT", err)
|
||||
svcutil.LogWorkerPoolError(n.cfg.Logger, "PUT", err)
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
|||
// in subsequent container broadcast. Note that we don't
|
||||
// process this node during broadcast if primary placement
|
||||
// on it failed.
|
||||
n.traversal.submitProcessed(addr, item)
|
||||
n.Traversal.submitProcessed(addr, item)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
@ -113,6 +113,6 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
|||
return false
|
||||
}
|
||||
|
||||
func needAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool {
|
||||
func NeedAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool {
|
||||
return len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock))
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -13,47 +13,47 @@ type preparedObjectTarget interface {
|
|||
WriteObject(context.Context, *objectSDK.Object, object.ContentMeta) error
|
||||
}
|
||||
|
||||
type distributedTarget struct {
|
||||
type distributedWriter struct {
|
||||
cfg *Config
|
||||
|
||||
placementOpts []placement.Option
|
||||
|
||||
obj *objectSDK.Object
|
||||
objMeta object.ContentMeta
|
||||
|
||||
*cfg
|
||||
nodeTargetInitializer func(NodeDescriptor) preparedObjectTarget
|
||||
|
||||
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
||||
|
||||
relay func(context.Context, nodeDesc) error
|
||||
relay func(context.Context, NodeDescriptor) error
|
||||
|
||||
resetSuccessAfterOnBroadcast bool
|
||||
}
|
||||
|
||||
// parameters and state of container traversal.
|
||||
type traversal struct {
|
||||
opts []placement.Option
|
||||
// parameters and state of container Traversal.
|
||||
type Traversal struct {
|
||||
Opts []placement.Option
|
||||
|
||||
// need of additional broadcast after the object is saved
|
||||
extraBroadcastEnabled bool
|
||||
ExtraBroadcastEnabled bool
|
||||
|
||||
// container nodes which was processed during the primary object placement
|
||||
mExclude map[string]*bool
|
||||
Exclude map[string]*bool
|
||||
|
||||
resetSuccessAfterOnBroadcast bool
|
||||
ResetSuccessAfterOnBroadcast bool
|
||||
}
|
||||
|
||||
// updates traversal parameters after the primary placement finish and
|
||||
// returns true if additional container broadcast is needed.
|
||||
func (x *traversal) submitPrimaryPlacementFinish() bool {
|
||||
if x.extraBroadcastEnabled {
|
||||
func (x *Traversal) submitPrimaryPlacementFinish() bool {
|
||||
if x.ExtraBroadcastEnabled {
|
||||
// do not track success during container broadcast (best-effort)
|
||||
x.opts = append(x.opts, placement.WithoutSuccessTracking())
|
||||
x.Opts = append(x.Opts, placement.WithoutSuccessTracking())
|
||||
|
||||
if x.resetSuccessAfterOnBroadcast {
|
||||
x.opts = append(x.opts, placement.ResetSuccessAfter())
|
||||
if x.ResetSuccessAfterOnBroadcast {
|
||||
x.Opts = append(x.Opts, placement.ResetSuccessAfter())
|
||||
}
|
||||
|
||||
// avoid 2nd broadcast
|
||||
x.extraBroadcastEnabled = false
|
||||
x.ExtraBroadcastEnabled = false
|
||||
|
||||
return true
|
||||
}
|
||||
|
@ -62,22 +62,22 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
|
|||
}
|
||||
|
||||
// marks the container node as processed during the primary object placement.
|
||||
func (x *traversal) submitProcessed(n placement.Node, item *bool) {
|
||||
if x.extraBroadcastEnabled {
|
||||
func (x *Traversal) submitProcessed(n placement.Node, item *bool) {
|
||||
if x.ExtraBroadcastEnabled {
|
||||
key := string(n.PublicKey())
|
||||
|
||||
if x.mExclude == nil {
|
||||
x.mExclude = make(map[string]*bool, 1)
|
||||
if x.Exclude == nil {
|
||||
x.Exclude = make(map[string]*bool, 1)
|
||||
}
|
||||
|
||||
x.mExclude[key] = item
|
||||
x.Exclude[key] = item
|
||||
}
|
||||
}
|
||||
|
||||
type nodeDesc struct {
|
||||
local bool
|
||||
type NodeDescriptor struct {
|
||||
Local bool
|
||||
|
||||
info placement.Node
|
||||
Info placement.Node
|
||||
}
|
||||
|
||||
// errIncompletePut is returned if processing on a container fails.
|
||||
|
@ -96,19 +96,19 @@ func (x errIncompletePut) Error() string {
|
|||
}
|
||||
|
||||
// WriteObject implements the transformer.ObjectWriter interface.
|
||||
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
func (t *distributedWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
t.obj = obj
|
||||
|
||||
var err error
|
||||
|
||||
if t.objMeta, err = t.fmtValidator.ValidateContent(t.obj); err != nil {
|
||||
if t.objMeta, err = t.cfg.FormatValidator.ValidateContent(t.obj); err != nil {
|
||||
return fmt.Errorf("(%T) could not validate payload content: %w", t, err)
|
||||
}
|
||||
return t.iteratePlacement(ctx)
|
||||
}
|
||||
|
||||
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
|
||||
if !node.local && t.relay != nil {
|
||||
func (t *distributedWriter) sendObject(ctx context.Context, node NodeDescriptor) error {
|
||||
if !node.Local && t.relay != nil {
|
||||
return t.relay(ctx, node)
|
||||
}
|
||||
|
||||
|
@ -121,11 +121,11 @@ func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
|
||||
func (t *distributedWriter) iteratePlacement(ctx context.Context) error {
|
||||
id, _ := t.obj.ID()
|
||||
|
||||
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
|
||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
|
||||
iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
|
||||
return iter.forEachNode(ctx, t.sendObject)
|
||||
iter := t.cfg.NewNodeIterator(append(t.placementOpts, placement.ForObject(id)))
|
||||
iter.ExtraBroadcastEnabled = NeedAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
|
||||
iter.ResetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
|
||||
return iter.ForEachNode(ctx, t.sendObject)
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -23,23 +23,23 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var _ transformer.ObjectWriter = (*ecWriter)(nil)
|
||||
var _ transformer.ObjectWriter = (*ECWriter)(nil)
|
||||
|
||||
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
|
||||
|
||||
type ecWriter struct {
|
||||
cfg *cfg
|
||||
placementOpts []placement.Option
|
||||
container containerSDK.Container
|
||||
key *ecdsa.PrivateKey
|
||||
commonPrm *svcutil.CommonPrm
|
||||
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||
type ECWriter struct {
|
||||
Config *Config
|
||||
PlacementOpts []placement.Option
|
||||
Container containerSDK.Container
|
||||
Key *ecdsa.PrivateKey
|
||||
CommonPrm *svcutil.CommonPrm
|
||||
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||
|
||||
objMeta object.ContentMeta
|
||||
objMetaValid bool
|
||||
ObjectMeta object.ContentMeta
|
||||
ObjectMetaValid bool
|
||||
}
|
||||
|
||||
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
relayed, err := e.relayIfNotContainerNode(ctx, obj)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -53,11 +53,11 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
|
|||
return errUnsupportedECObject
|
||||
}
|
||||
|
||||
if !e.objMetaValid {
|
||||
if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil {
|
||||
if !e.ObjectMetaValid {
|
||||
if e.ObjectMeta, err = e.Config.FormatValidator.ValidateContent(obj); err != nil {
|
||||
return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
|
||||
}
|
||||
e.objMetaValid = true
|
||||
e.ObjectMetaValid = true
|
||||
}
|
||||
|
||||
if obj.ECHeader() != nil {
|
||||
|
@ -66,8 +66,8 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
|
|||
return e.writeRawObject(ctx, obj)
|
||||
}
|
||||
|
||||
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
|
||||
if e.relay == nil {
|
||||
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
|
||||
if e.Relay == nil {
|
||||
return false, nil
|
||||
}
|
||||
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
|
||||
|
@ -90,8 +90,8 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
|
||||
t, err := placement.NewTraverser(e.placementOpts...)
|
||||
func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
|
||||
t, err := placement.NewTraverser(e.PlacementOpts...)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
|
|||
break
|
||||
}
|
||||
for _, node := range nodes {
|
||||
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
@ -109,8 +109,8 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
|
||||
func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
|
||||
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -126,18 +126,18 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
|
|||
var info client.NodeInfo
|
||||
client.NodeInfoFromNetmapElement(&info, node)
|
||||
|
||||
c, err := e.cfg.clientConstructor.Get(info)
|
||||
c, err := e.Config.ClientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||
}
|
||||
|
||||
completed := make(chan interface{})
|
||||
if poolErr := e.cfg.remotePool.Submit(func() {
|
||||
if poolErr := e.Config.RemotePool.Submit(func() {
|
||||
defer close(completed)
|
||||
err = e.relay(ctx, info, c)
|
||||
err = e.Relay(ctx, info, c)
|
||||
}); poolErr != nil {
|
||||
close(completed)
|
||||
svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
|
||||
svcutil.LogWorkerPoolError(e.Config.Logger, "PUT", poolErr)
|
||||
return poolErr
|
||||
}
|
||||
<-completed
|
||||
|
@ -145,7 +145,7 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
|
||||
e.Config.Logger.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
|
@ -157,12 +157,12 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
|
|||
}
|
||||
}
|
||||
|
||||
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
|
||||
if e.commonPrm.LocalOnly() {
|
||||
func (e *ECWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
|
||||
if e.CommonPrm.LocalOnly() {
|
||||
return e.writePartLocal(ctx, obj)
|
||||
}
|
||||
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
|
||||
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -187,18 +187,18 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
// now only single EC policy is supported
|
||||
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy()))
|
||||
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.Container.PlacementPolicy()), policy.ECParityCount(e.Container.PlacementPolicy()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
parts, err := c.Split(obj, e.key)
|
||||
parts, err := c.Split(obj, e.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
objID, _ := obj.ID()
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
|
||||
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -230,7 +230,7 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
|
||||
func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
@ -243,7 +243,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
|
||||
|
||||
|
@ -267,7 +267,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||
zap.String("node", hex.EncodeToString(node.PublicKey())),
|
||||
zap.Error(err))
|
||||
|
@ -291,7 +291,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||
zap.String("node", hex.EncodeToString(node.PublicKey())),
|
||||
zap.Error(err))
|
||||
|
@ -300,22 +300,22 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
|
|||
return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj))
|
||||
}
|
||||
|
||||
func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||
if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
return e.writePartLocal(ctx, obj)
|
||||
}
|
||||
return e.writePartRemote(ctx, obj, node)
|
||||
}
|
||||
|
||||
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||
var err error
|
||||
localTarget := localTarget{
|
||||
storage: e.cfg.localStore,
|
||||
localTarget := LocalTarget{
|
||||
Storage: e.Config.LocalStore,
|
||||
}
|
||||
completed := make(chan interface{})
|
||||
if poolErr := e.cfg.localPool.Submit(func() {
|
||||
if poolErr := e.Config.LocalPool.Submit(func() {
|
||||
defer close(completed)
|
||||
err = localTarget.WriteObject(ctx, obj, e.objMeta)
|
||||
err = localTarget.WriteObject(ctx, obj, e.ObjectMeta)
|
||||
}); poolErr != nil {
|
||||
close(completed)
|
||||
return poolErr
|
||||
|
@ -324,22 +324,22 @@ func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) er
|
|||
return err
|
||||
}
|
||||
|
||||
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||
func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||
var clientNodeInfo client.NodeInfo
|
||||
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
|
||||
|
||||
remoteTaget := remoteTarget{
|
||||
privateKey: e.key,
|
||||
clientConstructor: e.cfg.clientConstructor,
|
||||
commonPrm: e.commonPrm,
|
||||
remoteTaget := remoteWriter{
|
||||
privateKey: e.Key,
|
||||
clientConstructor: e.Config.ClientConstructor,
|
||||
commonPrm: e.CommonPrm,
|
||||
nodeInfo: clientNodeInfo,
|
||||
}
|
||||
|
||||
var err error
|
||||
completed := make(chan interface{})
|
||||
if poolErr := e.cfg.remotePool.Submit(func() {
|
||||
if poolErr := e.Config.RemotePool.Submit(func() {
|
||||
defer close(completed)
|
||||
err = remoteTaget.WriteObject(ctx, obj, e.objMeta)
|
||||
err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
|
||||
}); poolErr != nil {
|
||||
close(completed)
|
||||
return poolErr
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -24,19 +24,19 @@ type ObjectStorage interface {
|
|||
IsLocked(context.Context, oid.Address) (bool, error)
|
||||
}
|
||||
|
||||
type localTarget struct {
|
||||
storage ObjectStorage
|
||||
type LocalTarget struct {
|
||||
Storage ObjectStorage
|
||||
}
|
||||
|
||||
func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
|
||||
func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
|
||||
switch meta.Type() {
|
||||
case objectSDK.TypeTombstone:
|
||||
err := t.storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
|
||||
err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
|
||||
}
|
||||
case objectSDK.TypeLock:
|
||||
err := t.storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects())
|
||||
err := t.Storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not lock object from lock objects locally: %w", err)
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
|
|||
// objects that do not change meta storage
|
||||
}
|
||||
|
||||
if err := t.storage.Put(ctx, obj); err != nil {
|
||||
if err := t.Storage.Put(ctx, obj); err != nil {
|
||||
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
|
||||
}
|
||||
return nil
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -16,7 +16,7 @@ import (
|
|||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
type remoteTarget struct {
|
||||
type remoteWriter struct {
|
||||
privateKey *ecdsa.PrivateKey
|
||||
|
||||
commonPrm *util.CommonPrm
|
||||
|
@ -41,7 +41,7 @@ type RemotePutPrm struct {
|
|||
obj *objectSDK.Object
|
||||
}
|
||||
|
||||
func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error {
|
||||
func (t *remoteWriter) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error {
|
||||
c, err := t.clientConstructor.Get(t.nodeInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
|
||||
|
@ -64,7 +64,7 @@ func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _
|
|||
return t.putStream(ctx, prm)
|
||||
}
|
||||
|
||||
func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error {
|
||||
func (t *remoteWriter) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error {
|
||||
_, err := internalclient.PutObject(ctx, prm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
|
||||
|
@ -72,7 +72,7 @@ func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObje
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error {
|
||||
func (t *remoteWriter) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error {
|
||||
_, err := internalclient.PutObjectSingle(ctx, prm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
|
||||
|
@ -113,7 +113,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
|
|||
return err
|
||||
}
|
||||
|
||||
t := &remoteTarget{
|
||||
t := &remoteWriter{
|
||||
privateKey: key,
|
||||
clientConstructor: s.clientConstructor,
|
||||
}
|
183
pkg/services/object/common/writer/writer.go
Normal file
|
@ -0,0 +1,183 @@
|
|||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
)
|
||||
|
||||
type MaxSizeSource interface {
|
||||
// MaxObjectSize returns maximum payload size
|
||||
// of physically stored object in system.
|
||||
//
|
||||
// Must return 0 if value can not be obtained.
|
||||
MaxObjectSize() uint64
|
||||
}
|
||||
|
||||
type ClientConstructor interface {
|
||||
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
type InnerRing interface {
|
||||
InnerRingKeys() ([][]byte, error)
|
||||
}
|
||||
|
||||
type FormatValidatorConfig interface {
|
||||
VerifySessionTokenIssuer() bool
|
||||
}
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why don't you use Why don't you use `Config` like in other code? "Static" is not even a noun (or doesn't have the meaning we are trying to convey here).
aarifullin
commented
Ok, I'll rename it back Ok, I'll rename it back
aarifullin
commented
BTW, I have named it BTW, I have named it `Static` because I was not sure that using the word `Config` is appropriate for non-service package - as it was previosly in `put` service package.
`Static` - I meant that there parameters are set during initialzation only unlike field in `Params`
|
||||
// Config represents a set of static parameters that are established during
|
||||
// the initialization phase of all services.
|
||||
type Config struct {
|
||||
KeyStorage *objutil.KeyStorage
|
||||
|
||||
MaxSizeSrc MaxSizeSource
|
||||
|
||||
LocalStore ObjectStorage
|
||||
|
||||
ContainerSource container.Source
|
||||
|
||||
NetmapSource netmap.Source
|
||||
|
||||
RemotePool, LocalPool util.WorkerPool
|
||||
|
||||
NetmapKeys netmap.AnnouncedKeys
|
||||
|
||||
FormatValidator *object.FormatValidator
|
||||
|
||||
NetworkState netmap.State
|
||||
|
||||
ClientConstructor ClientConstructor
|
||||
|
||||
Logger *logger.Logger
|
||||
|
||||
VerifySessionTokenIssuer bool
|
||||
}
|
||||
|
||||
type Option func(*Config)
|
||||
|
||||
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
||||
return func(c *Config) {
|
||||
c.RemotePool, c.LocalPool = remote, local
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *Config) {
|
||||
c.Logger = l
|
||||
}
|
||||
}
|
||||
|
||||
func WithVerifySessionTokenIssuer(v bool) Option {
|
||||
return func(c *Config) {
|
||||
c.VerifySessionTokenIssuer = v
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
|
||||
if c.NetmapKeys.IsLocalKey(pub) {
|
||||
return c.LocalPool, true
|
||||
}
|
||||
return c.RemotePool, false
|
||||
}
|
||||
|
||||
type Params struct {
|
||||
Config *Config
|
||||
|
||||
Common *objutil.CommonPrm
|
||||
|
||||
Header *objectSDK.Object
|
||||
|
||||
Container containerSDK.Container
|
||||
|
||||
TraverseOpts []placement.Option
|
||||
|
||||
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||
|
||||
SignRequestPrivateKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
func New(prm *Params) transformer.ObjectWriter {
|
||||
if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) {
|
||||
return newECWriter(prm)
|
||||
}
|
||||
return newDefaultObjectWriter(prm, false)
|
||||
}
|
||||
|
||||
func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.ObjectWriter {
|
||||
var relay func(context.Context, NodeDescriptor) error
|
||||
if prm.Relay != nil {
|
||||
relay = func(ctx context.Context, node NodeDescriptor) error {
|
||||
var info client.NodeInfo
|
||||
|
||||
client.NodeInfoFromNetmapElement(&info, node.Info)
|
||||
|
||||
c, err := prm.Config.ClientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||
}
|
||||
|
||||
return prm.Relay(ctx, info, c)
|
||||
}
|
||||
}
|
||||
|
||||
var resetSuccessAfterOnBroadcast bool
|
||||
traverseOpts := prm.TraverseOpts
|
||||
if forECPlacement && !prm.Common.LocalOnly() {
|
||||
// save non-regular and linking object to EC container.
|
||||
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
||||
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.Container.PlacementPolicy())+1)))
|
||||
resetSuccessAfterOnBroadcast = true
|
||||
}
|
||||
|
||||
return &distributedWriter{
|
||||
cfg: prm.Config,
|
||||
placementOpts: traverseOpts,
|
||||
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
|
||||
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
|
||||
if node.Local {
|
||||
return LocalTarget{
|
||||
Storage: prm.Config.LocalStore,
|
||||
}
|
||||
}
|
||||
|
||||
rt := &remoteWriter{
|
||||
privateKey: prm.SignRequestPrivateKey,
|
||||
commonPrm: prm.Common,
|
||||
clientConstructor: prm.Config.ClientConstructor,
|
||||
}
|
||||
|
||||
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.Info)
|
||||
|
||||
return rt
|
||||
},
|
||||
relay: relay,
|
||||
}
|
||||
}
|
||||
|
||||
func newECWriter(prm *Params) transformer.ObjectWriter {
|
||||
return &objectWriterDispatcher{
|
||||
ecWriter: &ECWriter{
|
||||
Config: prm.Config,
|
||||
PlacementOpts: append(prm.TraverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||
Container: prm.Container,
|
||||
Key: prm.SignRequestPrivateKey,
|
||||
CommonPrm: prm.Common,
|
||||
Relay: prm.Relay,
|
||||
},
|
||||
repWriter: newDefaultObjectWriter(prm, true),
|
||||
}
|
||||
}
|
|
@ -2,43 +2,40 @@ package patchsvc
|
|||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
// Service implements Put operation of Object service v2.
|
||||
type Service struct {
|
||||
keyStorage *util.KeyStorage
|
||||
*objectwriter.Config
|
||||
|
||||
getSvc *getsvc.Service
|
||||
|
||||
putSvc *putsvc.Service
|
||||
}
|
||||
|
||||
// NewService constructs Service instance from provided options.
|
||||
func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service {
|
||||
//
|
||||
// Patch service can use the same objectwriter.Config initializied by Put service.
|
||||
func NewService(cfg *objectwriter.Config,
|
||||
getSvc *getsvc.Service,
|
||||
) *Service {
|
||||
return &Service{
|
||||
keyStorage: ks,
|
||||
Config: cfg,
|
||||
|
||||
getSvc: getSvc,
|
||||
|
||||
putSvc: putSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// Put calls internal service and returns v2 object streamer.
|
||||
func (s *Service) Patch() (object.PatchObjectStream, error) {
|
||||
nodeKey, err := s.keyStorage.GetKey(nil)
|
||||
nodeKey, err := s.Config.KeyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Streamer{
|
||||
Config: s.Config,
|
||||
getSvc: s.getSvc,
|
||||
|
||||
putSvc: s.putSvc,
|
||||
|
||||
localNodeKey: nodeKey,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -9,8 +9,9 @@ import (
|
|||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -21,6 +22,8 @@ import (
|
|||
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
|
||||
// and original object payload chunks. The merged result is fed to Put stream target.
|
||||
type Streamer struct {
|
||||
*objectwriter.Config
|
||||
|
||||
// Patcher must be initialized at first Streamer.Send call.
|
||||
patcher patcher.PatchApplier
|
||||
|
||||
|
@ -28,8 +31,6 @@ type Streamer struct {
|
|||
|
||||
getSvc *getsvc.Service
|
||||
|
||||
putSvc *putsvc.Service
|
||||
|
||||
localNodeKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
|
@ -78,11 +79,6 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
|
|||
localNodeKey: s.localNodeKey,
|
||||
}
|
||||
|
||||
putstm, err := s.putSvc.Put()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hdr := hdrWithSig.GetHeader()
|
||||
oV2 := new(objectV2.Object)
|
||||
hV2 := new(objectV2.Header)
|
||||
|
@ -97,14 +93,14 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
|
|||
}
|
||||
oV2.GetHeader().SetOwnerID(ownerID)
|
||||
|
||||
prm, err := s.putInitPrm(req, oV2)
|
||||
target, err := target.New(&objectwriter.Params{
|
||||
Config: s.Config,
|
||||
Common: commonPrm,
|
||||
Header: objectSDK.NewFromV2(oV2),
|
||||
SignRequestPrivateKey: s.localNodeKey,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = putstm.Init(ctx, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("target creation: %w", err)
|
||||
}
|
||||
|
||||
patcherPrm := patcher.Params{
|
||||
|
@ -112,7 +108,7 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
|
|||
|
||||
RangeProvider: rangeProvider,
|
||||
|
||||
ObjectWriter: putstm.Target(),
|
||||
ObjectWriter: target,
|
||||
}
|
||||
|
||||
s.patcher = patcher.New(patcherPrm)
|
||||
|
|
|
@ -6,31 +6,12 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
)
|
||||
|
||||
// putInitPrm initializes put paramerer for Put stream.
|
||||
func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) {
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prm := new(putsvc.PutInitPrm)
|
||||
prm.WithObject(objectSDK.NewFromV2(obj)).
|
||||
WithCommonPrm(commonPrm).
|
||||
WithPrivateKey(s.localNodeKey)
|
||||
|
||||
return prm, nil
|
||||
}
|
||||
|
||||
func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) {
|
||||
for vh.GetOrigin() != nil {
|
||||
vh = vh.GetOrigin()
|
||||
|
|
|
@ -1,132 +1,66 @@
|
|||
package putsvc
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type MaxSizeSource interface {
|
||||
// MaxObjectSize returns maximum payload size
|
||||
// of physically stored object in system.
|
||||
//
|
||||
// Must return 0 if value can not be obtained.
|
||||
MaxObjectSize() uint64
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
*cfg
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
type ClientConstructor interface {
|
||||
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
type InnerRing interface {
|
||||
InnerRingKeys() ([][]byte, error)
|
||||
}
|
||||
|
||||
type FormatValidatorConfig interface {
|
||||
VerifySessionTokenIssuer() bool
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
keyStorage *objutil.KeyStorage
|
||||
|
||||
maxSizeSrc MaxSizeSource
|
||||
|
||||
localStore ObjectStorage
|
||||
|
||||
cnrSrc container.Source
|
||||
|
||||
netMapSrc netmap.Source
|
||||
|
||||
remotePool, localPool util.WorkerPool
|
||||
|
||||
netmapKeys netmap.AnnouncedKeys
|
||||
|
||||
fmtValidator *object.FormatValidator
|
||||
|
||||
networkState netmap.State
|
||||
|
||||
clientConstructor ClientConstructor
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
verifySessionTokenIssuer bool
|
||||
*objectwriter.Config
|
||||
}
|
||||
|
||||
func NewService(ks *objutil.KeyStorage,
|
||||
cc ClientConstructor,
|
||||
ms MaxSizeSource,
|
||||
os ObjectStorage,
|
||||
cc objectwriter.ClientConstructor,
|
||||
ms objectwriter.MaxSizeSource,
|
||||
os objectwriter.ObjectStorage,
|
||||
cs container.Source,
|
||||
ns netmap.Source,
|
||||
nk netmap.AnnouncedKeys,
|
||||
nst netmap.State,
|
||||
ir InnerRing,
|
||||
opts ...Option,
|
||||
ir objectwriter.InnerRing,
|
||||
opts ...objectwriter.Option,
|
||||
) *Service {
|
||||
c := &cfg{
|
||||
remotePool: util.NewPseudoWorkerPool(),
|
||||
localPool: util.NewPseudoWorkerPool(),
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
keyStorage: ks,
|
||||
clientConstructor: cc,
|
||||
maxSizeSrc: ms,
|
||||
localStore: os,
|
||||
cnrSrc: cs,
|
||||
netMapSrc: ns,
|
||||
netmapKeys: nk,
|
||||
networkState: nst,
|
||||
c := &objectwriter.Config{
|
||||
RemotePool: util.NewPseudoWorkerPool(),
|
||||
LocalPool: util.NewPseudoWorkerPool(),
|
||||
Logger: &logger.Logger{Logger: zap.L()},
|
||||
KeyStorage: ks,
|
||||
ClientConstructor: cc,
|
||||
MaxSizeSrc: ms,
|
||||
LocalStore: os,
|
||||
ContainerSource: cs,
|
||||
NetmapSource: ns,
|
||||
NetmapKeys: nk,
|
||||
NetworkState: nst,
|
||||
}
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
}
|
||||
|
||||
c.fmtValidator = object.NewFormatValidator(
|
||||
c.FormatValidator = object.NewFormatValidator(
|
||||
object.WithLockSource(os),
|
||||
object.WithNetState(nst),
|
||||
object.WithInnerRing(ir),
|
||||
object.WithNetmapSource(ns),
|
||||
object.WithContainersSource(cs),
|
||||
object.WithVerifySessionTokenIssuer(c.verifySessionTokenIssuer),
|
||||
object.WithLogger(c.log),
|
||||
object.WithVerifySessionTokenIssuer(c.VerifySessionTokenIssuer),
|
||||
object.WithLogger(c.Logger),
|
||||
)
|
||||
|
||||
return &Service{
|
||||
cfg: c,
|
||||
Config: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Service) Put() (*Streamer, error) {
|
||||
return &Streamer{
|
||||
cfg: p.cfg,
|
||||
Config: p.Config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
||||
return func(c *cfg) {
|
||||
c.remotePool, c.localPool = remote, local
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = l
|
||||
}
|
||||
}
|
||||
|
||||
func WithVerifySessionTokenIssuer(v bool) Option {
|
||||
return func(c *cfg) {
|
||||
c.verifySessionTokenIssuer = v
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
PutSingle seems to differ heavily from the regular PUT, as it uses multiple exported entities from our new packages, without using more coarse chunks. PutSingle seems to differ heavily from the regular PUT, as it uses multiple exported entities from our new packages, without using more coarse chunks.
Why is it so? It seems we send the same object to some target. Optimizations?
aarifullin
commented
Ah... That's the hardest thing in this refactoring. The refactoring for Moving Ah... That's the hardest thing in this refactoring. The refactoring for `single` is not for optimization purpose. I believe that `PutSingle` is not related neither to `writer` nor to `target`. It still uses the config (currently it's `Static` that's going to be renamed) and `NodeDescriptor`.
Moving `Static` (`Config`) and common things like `NodeDescriptor` to one more separate package is not the big deal
fyrchik
commented
If we go one step further and move If we go one step further and move `NodeIterator` to a separate package (together with it's config), will it help somehow? Or is the issue deeper?
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
|
||||
fyrchik
commented
Why use different alias here? Why use different alias here?
aarifullin
commented
Oh, sorry. That was left after multiple renamings. Oh, sorry. That was left after multiple renamings.
I have removed all aliases for `common/target` package because this package's name seems unambiguous
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
|
@ -97,12 +99,12 @@ func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object)
|
|||
|
||||
func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error {
|
||||
if uint64(len(obj.Payload())) != obj.PayloadSize() {
|
||||
return ErrWrongPayloadSize
|
||||
return target.ErrWrongPayloadSize
|
||||
}
|
||||
|
||||
maxAllowedSize := s.maxSizeSrc.MaxObjectSize()
|
||||
maxAllowedSize := s.Config.MaxSizeSrc.MaxObjectSize()
|
||||
if obj.PayloadSize() > maxAllowedSize {
|
||||
return ErrExceedingMaxSize
|
||||
return target.ErrExceedingMaxSize
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -137,11 +139,11 @@ func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
|
|||
}
|
||||
|
||||
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
|
||||
if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
|
||||
if err := s.FormatValidator.Validate(ctx, obj, false); err != nil {
|
||||
return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err)
|
||||
}
|
||||
|
||||
meta, err := s.fmtValidator.ValidateContent(obj)
|
||||
meta, err := s.FormatValidator.ValidateContent(obj)
|
||||
if err != nil {
|
||||
return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err)
|
||||
}
|
||||
|
@ -164,17 +166,17 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
|
|||
}
|
||||
|
||||
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||
iter := s.cfg.newNodeIterator(placement.placementOptions)
|
||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
||||
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
|
||||
iter := s.Config.NewNodeIterator(placement.placementOptions)
|
||||
iter.ExtraBroadcastEnabled = objectwriter.NeedAdditionalBroadcast(obj, localOnly)
|
||||
iter.ResetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
|
||||
|
||||
signer := &putSingleRequestSigner{
|
||||
req: req,
|
||||
keyStorage: s.keyStorage,
|
||||
keyStorage: s.Config.KeyStorage,
|
||||
signer: &sync.Once{},
|
||||
}
|
||||
|
||||
return iter.forEachNode(ctx, func(ctx context.Context, nd nodeDesc) error {
|
||||
return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error {
|
||||
return s.saveToPlacementNode(ctx, &nd, obj, signer, meta)
|
||||
})
|
||||
}
|
||||
|
@ -184,25 +186,25 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key, err := s.cfg.keyStorage.GetKey(nil)
|
||||
key, err := s.Config.KeyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
signer := &putSingleRequestSigner{
|
||||
req: req,
|
||||
keyStorage: s.keyStorage,
|
||||
keyStorage: s.Config.KeyStorage,
|
||||
signer: &sync.Once{},
|
||||
}
|
||||
|
||||
w := ecWriter{
|
||||
cfg: s.cfg,
|
||||
placementOpts: placement.placementOptions,
|
||||
objMeta: meta,
|
||||
objMetaValid: true,
|
||||
commonPrm: commonPrm,
|
||||
container: placement.container,
|
||||
key: key,
|
||||
relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
|
||||
w := objectwriter.ECWriter{
|
||||
Config: s.Config,
|
||||
PlacementOpts: placement.placementOptions,
|
||||
ObjectMeta: meta,
|
||||
ObjectMetaValid: true,
|
||||
CommonPrm: commonPrm,
|
||||
Container: placement.container,
|
||||
Key: key,
|
||||
Relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
|
||||
return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac)
|
||||
},
|
||||
}
|
||||
|
@ -223,7 +225,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
|||
if !ok {
|
||||
return result, errors.New("missing container ID")
|
||||
}
|
||||
cnrInfo, err := s.cnrSrc.Get(cnrID)
|
||||
cnrInfo, err := s.Config.ContainerSource.Get(cnrID)
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("could not get container by ID: %w", err)
|
||||
}
|
||||
|
@ -247,31 +249,31 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
|||
}
|
||||
result.placementOptions = append(result.placementOptions, placement.ForObject(objID))
|
||||
|
||||
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
|
||||
latestNetmap, err := netmap.GetLatestNetworkMap(s.Config.NetmapSource)
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("could not get latest network map: %w", err)
|
||||
}
|
||||
builder := placement.NewNetworkMapBuilder(latestNetmap)
|
||||
if localOnly {
|
||||
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1))
|
||||
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
|
||||
builder = svcutil.NewLocalPlacement(builder, s.Config.NetmapKeys)
|
||||
}
|
||||
result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder))
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object,
|
||||
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object,
|
||||
signer *putSingleRequestSigner, meta object.ContentMeta,
|
||||
) error {
|
||||
if nodeDesc.local {
|
||||
if nodeDesc.Local {
|
||||
return s.saveLocal(ctx, obj, meta)
|
||||
}
|
||||
|
||||
var info client.NodeInfo
|
||||
|
||||
client.NodeInfoFromNetmapElement(&info, nodeDesc.info)
|
||||
client.NodeInfoFromNetmapElement(&info, nodeDesc.Info)
|
||||
|
||||
c, err := s.clientConstructor.Get(info)
|
||||
c, err := s.Config.ClientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||
}
|
||||
|
@ -280,8 +282,8 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, o
|
|||
}
|
||||
|
||||
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
|
||||
localTarget := &localTarget{
|
||||
storage: s.localStore,
|
||||
localTarget := &objectwriter.LocalTarget{
|
||||
Storage: s.Config.LocalStore,
|
||||
}
|
||||
return localTarget.WriteObject(ctx, obj, meta)
|
||||
}
|
||||
|
@ -314,7 +316,7 @@ func (s *Service) redirectPutSingleRequest(ctx context.Context,
|
|||
if err != nil {
|
||||
objID, _ := obj.ID()
|
||||
cnrID, _ := obj.ContainerID()
|
||||
s.log.Warn(logs.PutSingleRedirectFailure,
|
||||
s.Config.Logger.Warn(logs.PutSingleRedirectFailure,
|
||||
zap.Error(err),
|
||||
zap.Stringer("address", addr),
|
||||
zap.Stringer("object_id", objID),
|
||||
|
|
|
@ -2,33 +2,21 @@ package putsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
)
|
||||
|
||||
type Streamer struct {
|
||||
*cfg
|
||||
|
||||
privateKey *ecdsa.PrivateKey
|
||||
*objectwriter.Config
|
||||
|
||||
target transformer.ChunkedObjectWriter
|
||||
|
||||
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||
|
||||
maxPayloadSz uint64 // network config
|
||||
}
|
||||
|
||||
var errNotInit = errors.New("stream not initialized")
|
||||
|
@ -36,8 +24,23 @@ var errNotInit = errors.New("stream not initialized")
|
|||
var errInitRecall = errors.New("init recall")
|
||||
|
||||
func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
|
||||
if p.target != nil {
|
||||
return errInitRecall
|
||||
}
|
||||
|
||||
// initialize destination target
|
||||
if err := p.initTarget(prm); err != nil {
|
||||
prmTarget := &objectwriter.Params{
|
||||
Config: p.Config,
|
||||
Common: prm.common,
|
||||
Header: prm.hdr,
|
||||
Container: prm.cnr,
|
||||
TraverseOpts: prm.traverseOpts,
|
||||
Relay: p.relay,
|
||||
}
|
||||
|
||||
var err error
|
||||
p.target, err = target.New(prmTarget)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
|
||||
}
|
||||
|
||||
|
@ -47,253 +50,6 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Target accesses underlying target chunked object writer.
|
||||
func (p *Streamer) Target() transformer.ChunkedObjectWriter {
|
||||
return p.target
|
||||
}
|
||||
|
||||
// MaxObjectSize returns maximum payload size for the streaming session.
|
||||
//
|
||||
// Must be called after the successful Init.
|
||||
func (p *Streamer) MaxObjectSize() uint64 {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Previously this method exported private field, what do we need this method for now? Previously this method exported private field, what do we need this method for now?
aarifullin
commented
Removed Removed
|
||||
return p.maxPayloadSz
|
||||
}
|
||||
|
||||
func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
||||
// prevent re-calling
|
||||
if p.target != nil {
|
||||
return errInitRecall
|
||||
}
|
||||
|
||||
// prepare needed put parameters
|
||||
if err := p.preparePrm(prm); err != nil {
|
||||
return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err)
|
||||
}
|
||||
|
||||
p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize()
|
||||
if p.maxPayloadSz == 0 {
|
||||
return fmt.Errorf("(%T) could not obtain max object size parameter", p)
|
||||
}
|
||||
|
||||
if prm.hdr.Signature() != nil {
|
||||
return p.initUntrustedTarget(prm)
|
||||
}
|
||||
return p.initTrustedTarget(prm)
|
||||
}
|
||||
|
||||
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
||||
p.relay = prm.relay
|
||||
|
||||
if prm.privateKey != nil {
|
||||
p.privateKey = prm.privateKey
|
||||
} else {
|
||||
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.privateKey = nodeKey
|
||||
}
|
||||
|
||||
// prepare untrusted-Put object target
|
||||
p.target = &validatingPreparedTarget{
|
||||
nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
|
||||
fmt: p.fmtValidator,
|
||||
|
||||
maxPayloadSz: p.maxPayloadSz,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||
sToken := prm.common.SessionToken()
|
||||
|
||||
// prepare trusted-Put object target
|
||||
|
||||
// get private token from local storage
|
||||
var sessionInfo *util.SessionInfo
|
||||
|
||||
if sToken != nil {
|
||||
sessionInfo = &util.SessionInfo{
|
||||
ID: sToken.ID(),
|
||||
Owner: sToken.Issuer(),
|
||||
}
|
||||
}
|
||||
|
||||
key, err := p.keyStorage.GetKey(sessionInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
|
||||
}
|
||||
|
||||
// In case session token is missing, the line above returns the default key.
|
||||
// If it isn't owner key, replication attempts will fail, thus this check.
|
||||
ownerObj := prm.hdr.OwnerID()
|
||||
if ownerObj.IsEmpty() {
|
||||
return errors.New("missing object owner")
|
||||
}
|
||||
|
||||
if sToken == nil {
|
||||
var ownerSession user.ID
|
||||
user.IDFromKey(&ownerSession, key.PublicKey)
|
||||
|
||||
if !ownerObj.Equals(ownerSession) {
|
||||
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
|
||||
}
|
||||
} else {
|
||||
if !ownerObj.Equals(sessionInfo.Owner) {
|
||||
return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", p, sessionInfo.Owner, ownerObj)
|
||||
}
|
||||
}
|
||||
|
||||
if prm.privateKey != nil {
|
||||
p.privateKey = prm.privateKey
|
||||
} else {
|
||||
p.privateKey = key
|
||||
}
|
||||
p.target = &validatingTarget{
|
||||
fmt: p.fmtValidator,
|
||||
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: key,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
|
||||
NetworkState: p.networkState,
|
||||
MaxSize: p.maxPayloadSz,
|
||||
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
|
||||
SessionToken: sToken,
|
||||
}),
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||
var err error
|
||||
|
||||
// get latest network map
|
||||
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
|
||||
}
|
||||
|
||||
idCnr, ok := prm.hdr.ContainerID()
|
||||
if !ok {
|
||||
return errors.New("missing container ID")
|
||||
}
|
||||
|
||||
// get container to store the object
|
||||
cnrInfo, err := p.cnrSrc.Get(idCnr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
|
||||
}
|
||||
|
||||
prm.cnr = cnrInfo.Value
|
||||
|
||||
// add common options
|
||||
prm.traverseOpts = append(prm.traverseOpts,
|
||||
// set processing container
|
||||
placement.ForContainer(prm.cnr),
|
||||
)
|
||||
|
||||
if ech := prm.hdr.ECHeader(); ech != nil {
|
||||
prm.traverseOpts = append(prm.traverseOpts,
|
||||
// set identifier of the processing object
|
||||
placement.ForObject(ech.Parent()),
|
||||
)
|
||||
} else if id, ok := prm.hdr.ID(); ok {
|
||||
prm.traverseOpts = append(prm.traverseOpts,
|
||||
// set identifier of the processing object
|
||||
placement.ForObject(id),
|
||||
)
|
||||
}
|
||||
|
||||
// create placement builder from network map
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
if prm.common.LocalOnly() {
|
||||
// restrict success count to 1 stored copy (to local storage)
|
||||
prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))
|
||||
|
||||
// use local-only placement builder
|
||||
builder = util.NewLocalPlacement(builder, p.netmapKeys)
|
||||
}
|
||||
|
||||
// set placement builder
|
||||
prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
|
||||
return p.newECWriter(prm)
|
||||
}
|
||||
return p.newDefaultObjectWriter(prm, false)
|
||||
}
|
||||
|
||||
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter {
|
||||
var relay func(context.Context, nodeDesc) error
|
||||
if p.relay != nil {
|
||||
relay = func(ctx context.Context, node nodeDesc) error {
|
||||
var info client.NodeInfo
|
||||
|
||||
client.NodeInfoFromNetmapElement(&info, node.info)
|
||||
|
||||
c, err := p.clientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||
}
|
||||
|
||||
return p.relay(ctx, info, c)
|
||||
}
|
||||
}
|
||||
|
||||
var resetSuccessAfterOnBroadcast bool
|
||||
traverseOpts := prm.traverseOpts
|
||||
if forECPlacement && !prm.common.LocalOnly() {
|
||||
// save non-regular and linking object to EC container.
|
||||
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
||||
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
|
||||
resetSuccessAfterOnBroadcast = true
|
||||
}
|
||||
|
||||
return &distributedTarget{
|
||||
cfg: p.cfg,
|
||||
placementOpts: traverseOpts,
|
||||
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
|
||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||
if node.local {
|
||||
return localTarget{
|
||||
storage: p.localStore,
|
||||
}
|
||||
}
|
||||
|
||||
rt := &remoteTarget{
|
||||
privateKey: p.privateKey,
|
||||
commonPrm: prm.common,
|
||||
clientConstructor: p.clientConstructor,
|
||||
}
|
||||
|
||||
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info)
|
||||
|
||||
return rt
|
||||
},
|
||||
relay: relay,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||
return &objectWriterDispatcher{
|
||||
ecWriter: &ecWriter{
|
||||
cfg: p.cfg,
|
||||
placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||
container: prm.cnr,
|
||||
key: p.privateKey,
|
||||
commonPrm: prm.common,
|
||||
relay: p.relay,
|
||||
},
|
||||
repWriter: p.newDefaultObjectWriter(prm, true),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
|
||||
if p.target == nil {
|
||||
return errNotInit
|
||||
|
@ -327,10 +83,3 @@ func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) {
|
|||
id: ids.SelfID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *cfg) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
|
||||
if c.netmapKeys.IsLocalKey(pub) {
|
||||
return c.localPool, true
|
||||
}
|
||||
return c.remotePool, false
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
|
@ -55,7 +56,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
|
|||
|
||||
s.saveChunks = v.GetSignature() != nil
|
||||
if s.saveChunks {
|
||||
maxSz := s.stream.MaxObjectSize()
|
||||
maxSz := s.stream.MaxSizeSrc.MaxObjectSize()
|
||||
|
||||
s.sizes = &sizes{
|
||||
payloadSz: uint64(v.GetHeader().GetPayloadLength()),
|
||||
|
@ -63,7 +64,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
|
|||
|
||||
// check payload size limit overflow
|
||||
if s.payloadSz > maxSz {
|
||||
return putsvc.ErrExceedingMaxSize
|
||||
return target.ErrExceedingMaxSize
|
||||
}
|
||||
|
||||
s.init = req
|
||||
|
@ -74,7 +75,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
|
|||
|
||||
// check payload size overflow
|
||||
if s.writtenPayload > s.payloadSz {
|
||||
return putsvc.ErrWrongPayloadSize
|
||||
return target.ErrWrongPayloadSize
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -117,7 +118,7 @@ func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error
|
|||
if s.saveChunks {
|
||||
// check payload size correctness
|
||||
if s.writtenPayload != s.payloadSz {
|
||||
return nil, putsvc.ErrWrongPayloadSize
|
||||
return nil, target.ErrWrongPayloadSize
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
|
@ -52,7 +52,7 @@ func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res T
|
|||
}
|
||||
}
|
||||
|
||||
prm := new(putsvc.RemotePutPrm).
|
||||
prm := new(objectwriter.RemotePutPrm).
|
||||
WithObject(task.Obj)
|
||||
|
||||
for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ {
|
||||
|
|
|
@ -4,8 +4,8 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -24,7 +24,7 @@ type cfg struct {
|
|||
|
||||
log *logger.Logger
|
||||
|
||||
remoteSender *putsvc.RemoteSender
|
||||
remoteSender *objectwriter.RemoteSender
|
||||
|
||||
remoteGetter *getsvc.RemoteGetter
|
||||
|
||||
|
@ -67,7 +67,7 @@ func WithLogger(v *logger.Logger) Option {
|
|||
}
|
||||
|
||||
// WithRemoteSender returns option to set remote object sender of Replicator.
|
||||
func WithRemoteSender(v *putsvc.RemoteSender) Option {
|
||||
func WithRemoteSender(v *objectwriter.RemoteSender) Option {
|
||||
return func(c *cfg) {
|
||||
c.remoteSender = v
|
||||
}
|
||||
|
|
Why is this field embedded, while the others are not?
IMO it is better to use explicit names in
Params
.Config
is the field inParams
. Please, check this out