object: Move target initialization to separate package #1344

Merged
fyrchik merged 1 commit from aarifullin/frostfs-node:feat/patch/refactor/1 into master 2024-09-05 13:04:00 +00:00
22 changed files with 599 additions and 585 deletions

View file

@@ -7,7 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@@ -305,11 +305,11 @@ type ttlMaxObjectSizeCache struct {
mtx sync.RWMutex
lastUpdated time.Time
lastSize uint64
src putsvc.MaxSizeSource
src objectwriter.MaxSizeSource
metrics cacheMetrics
}
func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource {
func newCachedMaxObjectSizeSource(src objectwriter.MaxSizeSource) objectwriter.MaxSizeSource {
return &ttlMaxObjectSizeCache{
src: src,
metrics: metrics.NewCacheMetrics("max_object_size"),
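The hunk cuts the cache off before its read path. As a reading aid, here is a minimal sketch of how a TTL cache of this shape typically answers `MaxObjectSize`; the `ttl` constant and the method body are assumptions for illustration, not code from this PR:

```go
func (c *ttlMaxObjectSizeCache) MaxObjectSize() uint64 {
	const ttl = 30 * time.Second // hypothetical refresh interval

	c.mtx.RLock()
	fresh := time.Since(c.lastUpdated) < ttl
	size := c.lastSize
	c.mtx.RUnlock()
	if fresh {
		return size // fast path: serve the cached value
	}

	c.mtx.Lock()
	defer c.mtx.Unlock()
	if time.Since(c.lastUpdated) >= ttl { // re-check under the write lock
		c.lastSize = c.src.MaxObjectSize() // fall through to the wrapped source
		c.lastUpdated = time.Now()
	}
	return c.lastSize
}
```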

View file

@@ -24,6 +24,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl"
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
objectAPE "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/ape"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
deletesvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete"
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
@@ -188,7 +189,7 @@ func initObjectService(c *cfg) {
sDeleteV2 := createDeleteServiceV2(sDelete)
sPatch := createPatchSvc(sGet, sPut, keyStorage)
sPatch := createPatchSvc(sGet, sPut)
// build service pipeline
// grpc | audit | <metrics> | signature | response | acl | ape | split
@@ -326,7 +327,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
),
replicator.WithLocalStorage(ls),
replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, cache),
objectwriter.NewRemoteSender(keyStorage, cache),
),
replicator.WithRemoteGetter(
getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
@@ -338,7 +339,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
var os putsvc.ObjectStorage = engineWithoutNotifications{
var os objectwriter.ObjectStorage = engineWithoutNotifications{
engine: ls,
}
@@ -352,9 +353,9 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
c,
c.cfgNetmap.state,
irFetcher,
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
putsvc.WithLogger(c.log),
putsvc.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
objectwriter.WithLogger(c.log),
objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
)
}
@@ -362,8 +363,8 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2
return putsvcV2.NewService(sPut, keyStorage)
}
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service {
return patchsvc.NewService(keyStorage, sGet, sPut)
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Service {
return patchsvc.NewService(sPut.Config, sGet)
}
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {

View file

@@ -1,4 +1,4 @@
package putsvc
package target
import (
"context"

View file

@@ -1,4 +1,4 @@
package putsvc
package target
import (
"sync"

View file

@@ -0,0 +1,170 @@
package target
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
func New(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
// prepare needed put parameters
if err := preparePrm(prm); err != nil {
return nil, fmt.Errorf("could not prepare put parameters: %w", err)
}
fyrchik marked this conversation as resolved Outdated

Why is this field embedded, while the others are not? IMO it is better to use explicit names in `Params`.

`Config` is a field in `Params`. Please check this out.
if prm.Header.Signature() != nil {
return newUntrustedTarget(prm)
}
return newTrustedTarget(prm)
}
func newUntrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
if maxPayloadSz == 0 {
return nil, errors.New("could not obtain max object size parameter")
}
if prm.SignRequestPrivateKey == nil {
nodeKey, err := prm.Config.KeyStorage.GetKey(nil)
fyrchik marked this conversation as resolved Outdated

Why do we need a separate struct here, just to be able to convert it to the struct of the same shape?

Here are some points that aren't related specifically to this question, but which help to understand why the packages were refactored in this way:

1. The `Target` initialization logic should be moved into a common package, since both `patch` and `put` can reuse it.
2. I assumed that `writer` and `target` should be distinguished:
   - the `writer` package's purpose is to create `transformer.ObjectWriter`
   - the `writer` package is used in a few packages
   - the `target` package's purpose is to create `transformer.ChunkedObjectWriter`
   - `target` is used only within the `put` and `patch` streams

It looked ambiguous to me to use `objectwriter.Params` as an input parameter for target initialization (`New`). I admit that such a conversion looks ugly, so I'll remove it.

Since `target.New` uses `objectwriter.Params`. Please check this out.
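For readers following this thread, the split comes down to which SDK interface each package produces. An abridged paraphrase of the two interfaces from `frostfs-sdk-go/object/transformer` (illustrative, not authoritative):

```go
// writer.New builds a transformer.ObjectWriter: it takes fully formed
// objects and decides where they go (replicated, EC, local or remote).
type ObjectWriter interface {
	WriteObject(ctx context.Context, obj *object.Object) error
}

// target.New builds a transformer.ChunkedObjectWriter: it accepts a header
// plus a payload stream, validates and slices them, and feeds the result
// into an ObjectWriter. Only the put and patch streams need this layer.
type ChunkedObjectWriter interface {
	WriteHeader(ctx context.Context, hdr *object.Object) error
	Write(ctx context.Context, payload []byte) (int, error)
	Close(ctx context.Context) (*AccessIdentifiers, error)
}
```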
if err != nil {
return nil, err
}
prm.SignRequestPrivateKey = nodeKey
}
// prepare untrusted-Put object target
return &validatingPreparedTarget{
nextTarget: newInMemoryObjectBuilder(objectwriter.New(prm)),
fmt: prm.Config.FormatValidator,
maxPayloadSz: maxPayloadSz,
}, nil
}
func newTrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
if maxPayloadSz == 0 {
return nil, errors.New("could not obtain max object size parameter")
}
sToken := prm.Common.SessionToken()
// prepare trusted-Put object target
// get private token from local storage
var sessionInfo *util.SessionInfo
if sToken != nil {
sessionInfo = &util.SessionInfo{
ID: sToken.ID(),
Owner: sToken.Issuer(),
}
}
key, err := prm.Config.KeyStorage.GetKey(sessionInfo)
if err != nil {
return nil, fmt.Errorf("could not receive session key: %w", err)
}
// In case session token is missing, the line above returns the default key.
// If it isn't owner key, replication attempts will fail, thus this check.
ownerObj := prm.Header.OwnerID()
if ownerObj.IsEmpty() {
return nil, errors.New("missing object owner")
}
if sToken == nil {
var ownerSession user.ID
user.IDFromKey(&ownerSession, key.PublicKey)
if !ownerObj.Equals(ownerSession) {
return nil, errors.New("session token is missing but object owner id is different from the default key")
}
} else {
if !ownerObj.Equals(sessionInfo.Owner) {
return nil, fmt.Errorf("different token issuer and object owner identifiers %s/%s", sessionInfo.Owner, ownerObj)
}
}
if prm.SignRequestPrivateKey == nil {
prm.SignRequestPrivateKey = key
}
return &validatingTarget{
fmt: prm.Config.FormatValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return objectwriter.New(prm) },
NetworkState: prm.Config.NetworkState,
MaxSize: maxPayloadSz,
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.Container),
SessionToken: sToken,
}),
}, nil
}
func preparePrm(prm *objectwriter.Params) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(prm.Config.NetmapSource)
if err != nil {
//return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
return fmt.Errorf("could not get latest network map: %w", err)
}
idCnr, ok := prm.Header.ContainerID()
if !ok {
return errors.New("missing container ID")
}
// get container to store the object
cnrInfo, err := prm.Config.ContainerSource.Get(idCnr)
if err != nil {
//return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
return fmt.Errorf("could not get container by ID: %w", err)
}
prm.Container = cnrInfo.Value
// add common options
prm.TraverseOpts = append(prm.TraverseOpts,
// set processing container
placement.ForContainer(prm.Container),
)
if ech := prm.Header.ECHeader(); ech != nil {
prm.TraverseOpts = append(prm.TraverseOpts,
// set identifier of the processing object
placement.ForObject(ech.Parent()),
)
} else if id, ok := prm.Header.ID(); ok {
prm.TraverseOpts = append(prm.TraverseOpts,
// set identifier of the processing object
placement.ForObject(id),
)
}
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if prm.Common.LocalOnly() {
// restrict success count to 1 stored copy (to local storage)
prm.TraverseOpts = append(prm.TraverseOpts, placement.SuccessAfter(1))
// use local-only placement builder
builder = util.NewLocalPlacement(builder, prm.Config.NetmapKeys)
}
// set placement builder
prm.TraverseOpts = append(prm.TraverseOpts, placement.UseBuilder(builder))
return nil
}
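Putting the new file together: a caller obtains the chunked writer in a single call. A sketch assembled from the `Params` fields used above; the helper name and variables are placeholders:

```go
func newChunkedTarget(cfg *objectwriter.Config, commonPrm *util.CommonPrm, hdr *objectSDK.Object) (transformer.ChunkedObjectWriter, error) {
	tgt, err := target.New(&objectwriter.Params{
		Config: cfg,       // service-wide config shared with the put service
		Common: commonPrm, // per-request options (session token, local-only)
		Header: hdr,       // a non-nil Signature() selects the untrusted path,
		// otherwise the trusted one is taken
	})
	if err != nil {
		return nil, fmt.Errorf("target creation: %w", err)
	}
	return tgt, nil // ready for WriteHeader / Write / Close
}
```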

View file

@@ -1,4 +1,4 @@
package putsvc
package target
import (
"bytes"

View file

@@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"
@@ -13,23 +13,23 @@ import (
"go.uber.org/zap"
)
type nodeIterator struct {
traversal
cfg *cfg
fyrchik marked this conversation as resolved Outdated

Why is it public now?

The file has been moved to another package (now it's within `common/writer`) and we need to access some fields.
type NodeIterator struct {
Traversal
cfg *Config
}
func (c *cfg) newNodeIterator(opts []placement.Option) *nodeIterator {
return &nodeIterator{
traversal: traversal{
opts: opts,
mExclude: make(map[string]*bool),
func (c *Config) NewNodeIterator(opts []placement.Option) *NodeIterator {
return &NodeIterator{
Traversal: Traversal{
Opts: opts,
Exclude: make(map[string]*bool),
},
cfg: c,
}
}
func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, nodeDesc) error) error {
traverser, err := placement.NewTraverser(n.traversal.opts...)
func (n *NodeIterator) ForEachNode(ctx context.Context, f func(context.Context, NodeDescriptor) error) error {
traverser, err := placement.NewTraverser(n.Traversal.Opts...)
if err != nil {
return fmt.Errorf("could not create object placement traverser: %w", err)
}
@@ -56,10 +56,10 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context,
}
// perform additional container broadcast if needed
if n.traversal.submitPrimaryPlacementFinish() {
err := n.forEachNode(ctx, f)
if n.Traversal.submitPrimaryPlacementFinish() {
err := n.ForEachNode(ctx, f)
if err != nil {
n.cfg.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
n.cfg.Logger.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
// we don't fail primary operation because of broadcast failure
}
}
@@ -67,11 +67,11 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context,
return nil
}
func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, nodeDesc) error, resErr *atomic.Value) bool {
func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, NodeDescriptor) error, resErr *atomic.Value) bool {
var wg sync.WaitGroup
for _, addr := range addrs {
if ok := n.mExclude[string(addr.PublicKey())]; ok != nil {
if ok := n.Exclude[string(addr.PublicKey())]; ok != nil {
if *ok {
traverser.SubmitSuccess()
}
@@ -86,10 +86,10 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
if err := workerPool.Submit(func() {
defer wg.Done()
err := f(ctx, nodeDesc{local: isLocal, info: addr})
err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
if err != nil {
resErr.Store(err)
svcutil.LogServiceError(n.cfg.log, "PUT", addr.Addresses(), err)
svcutil.LogServiceError(n.cfg.Logger, "PUT", addr.Addresses(), err)
return
}
@@ -97,7 +97,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
*item = true
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(n.cfg.log, "PUT", err)
svcutil.LogWorkerPoolError(n.cfg.Logger, "PUT", err)
return true
}
@@ -105,7 +105,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
n.traversal.submitProcessed(addr, item)
n.Traversal.submitProcessed(addr, item)
}
wg.Wait()
@@ -113,6 +113,6 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
return false
}
func needAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool {
func NeedAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool {
return len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock))
}
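The predicate above is dense; a hypothetical example (not part of the PR) spelling out its cases:

```go
func ExampleNeedAdditionalBroadcast() {
	var tombstone objectSDK.Object
	tombstone.SetType(objectSDK.TypeTombstone)

	fmt.Println(NeedAdditionalBroadcast(&tombstone, false)) // true: delete markers reach every container node
	fmt.Println(NeedAdditionalBroadcast(&tombstone, true))  // false: local-only requests never broadcast

	// A linking object (len(obj.Children()) > 0) is broadcast regardless of
	// localOnly; a regular object is never broadcast.
}
```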

View file

@@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"

View file

@@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"
@@ -13,47 +13,47 @@ type preparedObjectTarget interface {
WriteObject(context.Context, *objectSDK.Object, object.ContentMeta) error
}
type distributedTarget struct {
type distributedWriter struct {
cfg *Config
placementOpts []placement.Option
obj *objectSDK.Object
objMeta object.ContentMeta
*cfg
nodeTargetInitializer func(NodeDescriptor) preparedObjectTarget
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
relay func(context.Context, nodeDesc) error
relay func(context.Context, NodeDescriptor) error
resetSuccessAfterOnBroadcast bool
}
// parameters and state of container traversal.
type traversal struct {
opts []placement.Option
// parameters and state of container Traversal.
type Traversal struct {
Opts []placement.Option
// need of additional broadcast after the object is saved
extraBroadcastEnabled bool
ExtraBroadcastEnabled bool
// container nodes which was processed during the primary object placement
mExclude map[string]*bool
Exclude map[string]*bool
resetSuccessAfterOnBroadcast bool
ResetSuccessAfterOnBroadcast bool
}
// updates traversal parameters after the primary placement finish and
// returns true if additional container broadcast is needed.
func (x *traversal) submitPrimaryPlacementFinish() bool {
if x.extraBroadcastEnabled {
func (x *Traversal) submitPrimaryPlacementFinish() bool {
if x.ExtraBroadcastEnabled {
// do not track success during container broadcast (best-effort)
x.opts = append(x.opts, placement.WithoutSuccessTracking())
x.Opts = append(x.Opts, placement.WithoutSuccessTracking())
if x.resetSuccessAfterOnBroadcast {
x.opts = append(x.opts, placement.ResetSuccessAfter())
if x.ResetSuccessAfterOnBroadcast {
x.Opts = append(x.Opts, placement.ResetSuccessAfter())
}
// avoid 2nd broadcast
x.extraBroadcastEnabled = false
x.ExtraBroadcastEnabled = false
return true
}
@@ -62,22 +62,22 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
}
// marks the container node as processed during the primary object placement.
func (x *traversal) submitProcessed(n placement.Node, item *bool) {
if x.extraBroadcastEnabled {
func (x *Traversal) submitProcessed(n placement.Node, item *bool) {
if x.ExtraBroadcastEnabled {
key := string(n.PublicKey())
if x.mExclude == nil {
x.mExclude = make(map[string]*bool, 1)
if x.Exclude == nil {
x.Exclude = make(map[string]*bool, 1)
}
x.mExclude[key] = item
x.Exclude[key] = item
}
}
type nodeDesc struct {
local bool
type NodeDescriptor struct {
Local bool
info placement.Node
Info placement.Node
}
// errIncompletePut is returned if processing on a container fails.
@@ -96,19 +96,19 @@ func (x errIncompletePut) Error() string {
}
// WriteObject implements the transformer.ObjectWriter interface.
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
func (t *distributedWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
t.obj = obj
var err error
if t.objMeta, err = t.fmtValidator.ValidateContent(t.obj); err != nil {
if t.objMeta, err = t.cfg.FormatValidator.ValidateContent(t.obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", t, err)
}
return t.iteratePlacement(ctx)
}
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
if !node.local && t.relay != nil {
func (t *distributedWriter) sendObject(ctx context.Context, node NodeDescriptor) error {
if !node.Local && t.relay != nil {
return t.relay(ctx, node)
}
@@ -121,11 +121,11 @@ func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error
return nil
}
func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
func (t *distributedWriter) iteratePlacement(ctx context.Context) error {
id, _ := t.obj.ID()
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
return iter.forEachNode(ctx, t.sendObject)
iter := t.cfg.NewNodeIterator(append(t.placementOpts, placement.ForObject(id)))
iter.ExtraBroadcastEnabled = NeedAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
iter.ResetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
return iter.ForEachNode(ctx, t.sendObject)
}
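How `distributedWriter`, `Traversal`, and `NodeIterator` cooperate is easiest to see in one place. A hedged sketch of the two-pass flow; `putWithBroadcast` and `writeToNode` are placeholder names:

```go
func putWithBroadcast(ctx context.Context, cfg *Config, obj *objectSDK.Object,
	opts []placement.Option, writeToNode func(context.Context, NodeDescriptor) error,
) error {
	iter := cfg.NewNodeIterator(opts)
	iter.ExtraBroadcastEnabled = NeedAdditionalBroadcast(obj, false)

	// Pass 1: primary placement with success tracking; each visited node is
	// recorded in iter.Exclude. Afterwards submitPrimaryPlacementFinish
	// appends WithoutSuccessTracking() (plus ResetSuccessAfter() if set),
	// clears the flag so a third pass cannot happen, and ForEachNode re-runs
	// itself once as a best-effort container broadcast whose errors are only
	// logged and never fail the primary put.
	return iter.ForEachNode(ctx, writeToNode)
}
```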

View file

@@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"
@@ -23,23 +23,23 @@ import (
"golang.org/x/sync/errgroup"
)
var _ transformer.ObjectWriter = (*ecWriter)(nil)
var _ transformer.ObjectWriter = (*ECWriter)(nil)
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
type ecWriter struct {
cfg *cfg
placementOpts []placement.Option
container containerSDK.Container
key *ecdsa.PrivateKey
commonPrm *svcutil.CommonPrm
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
type ECWriter struct {
Config *Config
PlacementOpts []placement.Option
Container containerSDK.Container
Key *ecdsa.PrivateKey
CommonPrm *svcutil.CommonPrm
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
objMeta object.ContentMeta
objMetaValid bool
ObjectMeta object.ContentMeta
ObjectMetaValid bool
}
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx, obj)
if err != nil {
return err
@@ -53,11 +53,11 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
return errUnsupportedECObject
}
if !e.objMetaValid {
if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil {
if !e.ObjectMetaValid {
if e.ObjectMeta, err = e.Config.FormatValidator.ValidateContent(obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
}
e.objMetaValid = true
e.ObjectMetaValid = true
}
if obj.ECHeader() != nil {
@@ -66,8 +66,8 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
return e.writeRawObject(ctx, obj)
}
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.relay == nil {
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.Relay == nil {
return false, nil
}
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
@@ -90,8 +90,8 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
return true, nil
}
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
t, err := placement.NewTraverser(e.placementOpts...)
func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
t, err := placement.NewTraverser(e.PlacementOpts...)
if err != nil {
return false, err
}
@@ -101,7 +101,7 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
break
}
for _, node := range nodes {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) {
return true, nil
}
}
@@ -109,8 +109,8 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
return false, nil
}
func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
if err != nil {
return err
}
@@ -126,18 +126,18 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
c, err := e.cfg.clientConstructor.Get(info)
c, err := e.Config.ClientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() {
if poolErr := e.Config.RemotePool.Submit(func() {
defer close(completed)
err = e.relay(ctx, info, c)
err = e.Relay(ctx, info, c)
}); poolErr != nil {
close(completed)
svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
svcutil.LogWorkerPoolError(e.Config.Logger, "PUT", poolErr)
return poolErr
}
<-completed
@@ -145,7 +145,7 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
if err == nil {
return nil
}
e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
e.Config.Logger.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
lastErr = err
}
}
@@ -157,12 +157,12 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
}
}
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
if e.commonPrm.LocalOnly() {
func (e *ECWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
if e.CommonPrm.LocalOnly() {
return e.writePartLocal(ctx, obj)
}
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
if err != nil {
return err
}
@@ -187,18 +187,18 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error
return nil
}
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
// now only single EC policy is supported
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy()))
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.Container.PlacementPolicy()), policy.ECParityCount(e.Container.PlacementPolicy()))
if err != nil {
return err
}
parts, err := c.Split(obj, e.key)
parts, err := c.Split(obj, e.Key)
if err != nil {
return err
}
objID, _ := obj.ID()
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
if err != nil {
return err
}
@@ -230,7 +230,7 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
return nil
}
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
select {
case <-ctx.Done():
return ctx.Err()
@@ -243,7 +243,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
@@ -267,7 +267,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())),
zap.Error(err))
@@ -291,7 +291,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())),
zap.Error(err))
@@ -300,22 +300,22 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj))
}
func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) {
return e.writePartLocal(ctx, obj)
}
return e.writePartRemote(ctx, obj, node)
}
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
var err error
localTarget := localTarget{
storage: e.cfg.localStore,
localTarget := LocalTarget{
Storage: e.Config.LocalStore,
}
completed := make(chan interface{})
if poolErr := e.cfg.localPool.Submit(func() {
if poolErr := e.Config.LocalPool.Submit(func() {
defer close(completed)
err = localTarget.WriteObject(ctx, obj, e.objMeta)
err = localTarget.WriteObject(ctx, obj, e.ObjectMeta)
}); poolErr != nil {
close(completed)
return poolErr
@@ -324,22 +324,22 @@ func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) er
return err
}
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
var clientNodeInfo client.NodeInfo
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
remoteTaget := remoteTarget{
privateKey: e.key,
clientConstructor: e.cfg.clientConstructor,
commonPrm: e.commonPrm,
remoteTaget := remoteWriter{
privateKey: e.Key,
clientConstructor: e.Config.ClientConstructor,
commonPrm: e.CommonPrm,
nodeInfo: clientNodeInfo,
}
var err error
completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() {
if poolErr := e.Config.RemotePool.Submit(func() {
defer close(completed)
err = remoteTaget.WriteObject(ctx, obj, e.objMeta)
err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
}); poolErr != nil {
close(completed)
return poolErr
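One idiom recurs throughout this file: submit a closure to a bounded worker pool and block until it finishes, so the pool caps concurrency while the caller stays synchronous. A distilled sketch; the helper name is hypothetical, the `Submit` signature is the one used above:

```go
func runOnPool(pool util.WorkerPool, job func() error) error {
	var err error
	completed := make(chan struct{})
	if poolErr := pool.Submit(func() {
		defer close(completed) // always unblock the waiting caller
		err = job()
	}); poolErr != nil {
		return poolErr // the pool rejected the task; nothing was started
	}
	<-completed // block until the submitted closure finishes
	return err
}
```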

View file

@@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"
@@ -24,19 +24,19 @@ type ObjectStorage interface {
IsLocked(context.Context, oid.Address) (bool, error)
}
type localTarget struct {
storage ObjectStorage
type LocalTarget struct {
Storage ObjectStorage
}
func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
switch meta.Type() {
case objectSDK.TypeTombstone:
err := t.storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
if err != nil {
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
}
case objectSDK.TypeLock:
err := t.storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects())
err := t.Storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects())
if err != nil {
return fmt.Errorf("could not lock object from lock objects locally: %w", err)
}
@@ -44,7 +44,7 @@ func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
// objects that do not change meta storage
}
if err := t.storage.Put(ctx, obj); err != nil {
if err := t.Storage.Put(ctx, obj); err != nil {
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
}
return nil
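Note the control flow: the type switch only maintains meta storage, and every object still falls through to `Storage.Put`. A usage sketch mirroring `saveLocal` in the putsingle diff below; `saveLocally` is a placeholder name:

```go
func saveLocally(ctx context.Context, store ObjectStorage, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
	lt := LocalTarget{Storage: store}
	// Tombstone: Delete(meta.Objects()) first, then Put(obj) itself.
	// Lock:      Lock(meta.Objects()) first, then Put(obj) itself.
	// Otherwise: just Put(obj).
	return lt.WriteObject(ctx, obj, meta)
}
```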

View file

@@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"
@@ -16,7 +16,7 @@ import (
"google.golang.org/grpc/status"
)
type remoteTarget struct {
type remoteWriter struct {
privateKey *ecdsa.PrivateKey
commonPrm *util.CommonPrm
@@ -41,7 +41,7 @@ type RemotePutPrm struct {
obj *objectSDK.Object
}
func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error {
func (t *remoteWriter) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error {
c, err := t.clientConstructor.Get(t.nodeInfo)
if err != nil {
return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
@@ -64,7 +64,7 @@ func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _
return t.putStream(ctx, prm)
}
func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error {
func (t *remoteWriter) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error {
_, err := internalclient.PutObject(ctx, prm)
if err != nil {
return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
@@ -72,7 +72,7 @@ func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObje
return nil
}
func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error {
func (t *remoteWriter) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error {
_, err := internalclient.PutObjectSingle(ctx, prm)
if err != nil {
return fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
@@ -113,7 +113,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
return err
}
t := &remoteTarget{
t := &remoteWriter{
privateKey: key,
clientConstructor: s.clientConstructor,
}

View file

@@ -0,0 +1,183 @@
package writer
import (
"context"
"crypto/ecdsa"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
)
type MaxSizeSource interface {
// MaxObjectSize returns maximum payload size
// of physically stored object in system.
//
// Must return 0 if value can not be obtained.
MaxObjectSize() uint64
}
type ClientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
type InnerRing interface {
InnerRingKeys() ([][]byte, error)
}
type FormatValidatorConfig interface {
VerifySessionTokenIssuer() bool
}
fyrchik marked this conversation as resolved Outdated

Why don't you use `Config` like in other code? "Static" is not even a noun (or doesn't have the meaning we are trying to convey here).

Ok, I'll rename it back.

BTW, I named it `Static` because I was not sure that the word `Config` is appropriate for a non-service package, as it previously was in the `put` service package. By `Static` I meant that these parameters are set during initialization only, unlike the fields in `Params`.
// Config represents a set of static parameters that are established during
// the initialization phase of all services.
type Config struct {
KeyStorage *objutil.KeyStorage
MaxSizeSrc MaxSizeSource
LocalStore ObjectStorage
ContainerSource container.Source
NetmapSource netmap.Source
RemotePool, LocalPool util.WorkerPool
NetmapKeys netmap.AnnouncedKeys
FormatValidator *object.FormatValidator
NetworkState netmap.State
ClientConstructor ClientConstructor
Logger *logger.Logger
VerifySessionTokenIssuer bool
}
type Option func(*Config)
func WithWorkerPools(remote, local util.WorkerPool) Option {
return func(c *Config) {
c.RemotePool, c.LocalPool = remote, local
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *Config) {
c.Logger = l
}
}
func WithVerifySessionTokenIssuer(v bool) Option {
return func(c *Config) {
c.VerifySessionTokenIssuer = v
}
}
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
if c.NetmapKeys.IsLocalKey(pub) {
return c.LocalPool, true
}
return c.RemotePool, false
}
type Params struct {
Config *Config
Common *objutil.CommonPrm
Header *objectSDK.Object
Container containerSDK.Container
TraverseOpts []placement.Option
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
SignRequestPrivateKey *ecdsa.PrivateKey
}
func New(prm *Params) transformer.ObjectWriter {
if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) {
return newECWriter(prm)
}
return newDefaultObjectWriter(prm, false)
}
func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.ObjectWriter {
var relay func(context.Context, NodeDescriptor) error
if prm.Relay != nil {
relay = func(ctx context.Context, node NodeDescriptor) error {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node.Info)
c, err := prm.Config.ClientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
return prm.Relay(ctx, info, c)
}
}
var resetSuccessAfterOnBroadcast bool
traverseOpts := prm.TraverseOpts
if forECPlacement && !prm.Common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.Container.PlacementPolicy())+1)))
resetSuccessAfterOnBroadcast = true
}
return &distributedWriter{
cfg: prm.Config,
placementOpts: traverseOpts,
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
if node.Local {
return LocalTarget{
Storage: prm.Config.LocalStore,
}
}
rt := &remoteWriter{
privateKey: prm.SignRequestPrivateKey,
commonPrm: prm.Common,
clientConstructor: prm.Config.ClientConstructor,
}
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.Info)
return rt
},
relay: relay,
}
}
func newECWriter(prm *Params) transformer.ObjectWriter {
return &objectWriterDispatcher{
ecWriter: &ECWriter{
Config: prm.Config,
PlacementOpts: append(prm.TraverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
Container: prm.Container,
Key: prm.SignRequestPrivateKey,
CommonPrm: prm.Common,
Relay: prm.Relay,
},
repWriter: newDefaultObjectWriter(prm, true),
}
}
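The `parity+1` mapping in `newDefaultObjectWriter` deserves a worked example (policy values are illustrative):

```go
// EC 2.1: ECParityCount = 1 -> SuccessAfter(1+1) = 2, i.e. behaves like REP 2
// EC 2.2: ECParityCount = 2 -> SuccessAfter(2+1) = 3, i.e. behaves like REP 3
//
// Non-regular and linking objects stored in an EC container are therefore
// placed with replication-like success semantics instead of being
// erasure-coded themselves.
```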

View file

@@ -2,43 +2,40 @@ package patchsvc
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
)
// Service implements Patch operation of Object service v2.
type Service struct {
keyStorage *util.KeyStorage
*objectwriter.Config
getSvc *getsvc.Service
putSvc *putsvc.Service
}
// NewService constructs Service instance from provided options.
func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service {
//
// Patch service can use the same objectwriter.Config initialized by Put service.
func NewService(cfg *objectwriter.Config,
getSvc *getsvc.Service,
) *Service {
return &Service{
keyStorage: ks,
Config: cfg,
getSvc: getSvc,
putSvc: putSvc,
}
}
// Patch calls internal service and returns v2 object streamer.
func (s *Service) Patch() (object.PatchObjectStream, error) {
nodeKey, err := s.keyStorage.GetKey(nil)
nodeKey, err := s.Config.KeyStorage.GetKey(nil)
if err != nil {
return nil, err
}
return &Streamer{
getSvc: s.getSvc,
putSvc: s.putSvc,
Config: s.Config,
getSvc: s.getSvc,
localNodeKey: nodeKey,
}, nil
}

View file

@@ -9,8 +9,9 @@ import (
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@@ -21,6 +22,8 @@ import (
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
// and original object payload chunks. The merged result is fed to Put stream target.
type Streamer struct {
*objectwriter.Config
// Patcher must be initialized at first Streamer.Send call.
patcher patcher.PatchApplier
@@ -28,8 +31,6 @@ type Streamer struct {
getSvc *getsvc.Service
putSvc *putsvc.Service
localNodeKey *ecdsa.PrivateKey
}
@@ -78,11 +79,6 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
localNodeKey: s.localNodeKey,
}
putstm, err := s.putSvc.Put()
if err != nil {
return err
}
hdr := hdrWithSig.GetHeader()
oV2 := new(objectV2.Object)
hV2 := new(objectV2.Header)
@@ -97,14 +93,14 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
}
oV2.GetHeader().SetOwnerID(ownerID)
prm, err := s.putInitPrm(req, oV2)
target, err := target.New(&objectwriter.Params{
Config: s.Config,
Common: commonPrm,
Header: objectSDK.NewFromV2(oV2),
SignRequestPrivateKey: s.localNodeKey,
})
if err != nil {
return err
}
err = putstm.Init(ctx, prm)
if err != nil {
return err
return fmt.Errorf("target creation: %w", err)
}
patcherPrm := patcher.Params{
@@ -112,7 +108,7 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
RangeProvider: rangeProvider,
ObjectWriter: putstm.Target(),
ObjectWriter: target,
}
s.patcher = patcher.New(patcherPrm)
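The net effect on the patch pipeline, restated as a data-flow comment (no new behavior):

```go
// getSvc (RangeProvider) --> patcher --> target.New(...) --> objectwriter.New(...)
//
// The patcher reads original payload ranges through the get service, applies
// the incoming patches, and writes the merged object into the same chunked
// target the put stream uses; the former detour through putSvc.Put()
// (putstm.Init / putstm.Target()) is gone.
```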

View file

@@ -6,31 +6,12 @@ import (
"errors"
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
// putInitPrm initializes put parameters for Put stream.
func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) {
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
prm := new(putsvc.PutInitPrm)
prm.WithObject(objectSDK.NewFromV2(obj)).
WithCommonPrm(commonPrm).
WithPrivateKey(s.localNodeKey)
return prm, nil
}
func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) {
for vh.GetOrigin() != nil {
vh = vh.GetOrigin()

View file

@@ -1,132 +1,66 @@
package putsvc
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
type MaxSizeSource interface {
// MaxObjectSize returns maximum payload size
// of physically stored object in system.
//
// Must return 0 if value can not be obtained.
MaxObjectSize() uint64
}
type Service struct {
*cfg
}
type Option func(*cfg)
type ClientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
type InnerRing interface {
InnerRingKeys() ([][]byte, error)
}
type FormatValidatorConfig interface {
VerifySessionTokenIssuer() bool
}
type cfg struct {
keyStorage *objutil.KeyStorage
maxSizeSrc MaxSizeSource
localStore ObjectStorage
cnrSrc container.Source
netMapSrc netmap.Source
remotePool, localPool util.WorkerPool
netmapKeys netmap.AnnouncedKeys
fmtValidator *object.FormatValidator
networkState netmap.State
clientConstructor ClientConstructor
log *logger.Logger
verifySessionTokenIssuer bool
*objectwriter.Config
}
func NewService(ks *objutil.KeyStorage,
cc ClientConstructor,
ms MaxSizeSource,
os ObjectStorage,
cc objectwriter.ClientConstructor,
ms objectwriter.MaxSizeSource,
os objectwriter.ObjectStorage,
cs container.Source,
ns netmap.Source,
nk netmap.AnnouncedKeys,
nst netmap.State,
ir InnerRing,
opts ...Option,
ir objectwriter.InnerRing,
opts ...objectwriter.Option,
) *Service {
c := &cfg{
remotePool: util.NewPseudoWorkerPool(),
localPool: util.NewPseudoWorkerPool(),
log: &logger.Logger{Logger: zap.L()},
keyStorage: ks,
clientConstructor: cc,
maxSizeSrc: ms,
localStore: os,
cnrSrc: cs,
netMapSrc: ns,
netmapKeys: nk,
networkState: nst,
c := &objectwriter.Config{
RemotePool: util.NewPseudoWorkerPool(),
LocalPool: util.NewPseudoWorkerPool(),
Logger: &logger.Logger{Logger: zap.L()},
KeyStorage: ks,
ClientConstructor: cc,
MaxSizeSrc: ms,
LocalStore: os,
ContainerSource: cs,
NetmapSource: ns,
NetmapKeys: nk,
NetworkState: nst,
}
for i := range opts {
opts[i](c)
}
c.fmtValidator = object.NewFormatValidator(
c.FormatValidator = object.NewFormatValidator(
object.WithLockSource(os),
object.WithNetState(nst),
object.WithInnerRing(ir),
object.WithNetmapSource(ns),
object.WithContainersSource(cs),
object.WithVerifySessionTokenIssuer(c.verifySessionTokenIssuer),
object.WithLogger(c.log),
object.WithVerifySessionTokenIssuer(c.VerifySessionTokenIssuer),
object.WithLogger(c.Logger),
)
return &Service{
cfg: c,
Config: c,
}
}
func (p *Service) Put() (*Streamer, error) {
return &Streamer{
cfg: p.cfg,
Config: p.Config,
}, nil
}
func WithWorkerPools(remote, local util.WorkerPool) Option {
return func(c *cfg) {
c.remotePool, c.localPool = remote, local
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
func WithVerifySessionTokenIssuer(v bool) Option {
return func(c *cfg) {
c.verifySessionTokenIssuer = v
}
}

View file

@@ -21,6 +21,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
fyrchik marked this conversation as resolved Outdated

PutSingle seems to differ heavily from the regular PUT, as it uses multiple exported entities from our new packages, without using more coarse chunks. Why is it so? It seems we send the same object to some target. Optimizations?

Ah... That's the hardest thing in this refactoring. The refactoring for `single` is not for optimization purposes. I believe that `PutSingle` is related neither to `writer` nor to `target`. It still uses the config (currently it's `Static`, which is going to be renamed) and `NodeDescriptor`.

Moving `Static` (`Config`) and common things like `NodeDescriptor` to one more separate package is not a big deal.

If we go one step further and move `NodeIterator` to a separate package (together with its config), will it help somehow? Or is the issue deeper?
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"

Why use a different alias here?

Oh, sorry. That was left over after multiple renamings. I have removed all aliases for the `common/target` package because this package's name seems unambiguous.
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
@@ -97,12 +99,12 @@ func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object)
func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error {
if uint64(len(obj.Payload())) != obj.PayloadSize() {
return ErrWrongPayloadSize
return target.ErrWrongPayloadSize
}
maxAllowedSize := s.maxSizeSrc.MaxObjectSize()
maxAllowedSize := s.Config.MaxSizeSrc.MaxObjectSize()
if obj.PayloadSize() > maxAllowedSize {
return ErrExceedingMaxSize
return target.ErrExceedingMaxSize
}
return nil
@@ -137,11 +139,11 @@ func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
}
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
if err := s.FormatValidator.Validate(ctx, obj, false); err != nil {
return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err)
}
meta, err := s.fmtValidator.ValidateContent(obj)
meta, err := s.FormatValidator.ValidateContent(obj)
if err != nil {
return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err)
}
@@ -164,17 +166,17 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
}
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
iter := s.cfg.newNodeIterator(placement.placementOptions)
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
iter := s.Config.NewNodeIterator(placement.placementOptions)
iter.ExtraBroadcastEnabled = objectwriter.NeedAdditionalBroadcast(obj, localOnly)
iter.ResetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
signer := &putSingleRequestSigner{
req: req,
keyStorage: s.keyStorage,
keyStorage: s.Config.KeyStorage,
signer: &sync.Once{},
}
return iter.forEachNode(ctx, func(ctx context.Context, nd nodeDesc) error {
return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error {
return s.saveToPlacementNode(ctx, &nd, obj, signer, meta)
})
}
@@ -184,25 +186,25 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace
if err != nil {
return err
}
key, err := s.cfg.keyStorage.GetKey(nil)
key, err := s.Config.KeyStorage.GetKey(nil)
if err != nil {
return err
}
signer := &putSingleRequestSigner{
req: req,
keyStorage: s.keyStorage,
keyStorage: s.Config.KeyStorage,
signer: &sync.Once{},
}
w := ecWriter{
cfg: s.cfg,
placementOpts: placement.placementOptions,
objMeta: meta,
objMetaValid: true,
commonPrm: commonPrm,
container: placement.container,
key: key,
relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
w := objectwriter.ECWriter{
Config: s.Config,
PlacementOpts: placement.placementOptions,
ObjectMeta: meta,
ObjectMetaValid: true,
CommonPrm: commonPrm,
Container: placement.container,
Key: key,
Relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac)
},
}
@@ -223,7 +225,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
if !ok {
return result, errors.New("missing container ID")
}
cnrInfo, err := s.cnrSrc.Get(cnrID)
cnrInfo, err := s.Config.ContainerSource.Get(cnrID)
if err != nil {
return result, fmt.Errorf("could not get container by ID: %w", err)
}
@@ -247,31 +249,31 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
}
result.placementOptions = append(result.placementOptions, placement.ForObject(objID))
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
latestNetmap, err := netmap.GetLatestNetworkMap(s.Config.NetmapSource)
if err != nil {
return result, fmt.Errorf("could not get latest network map: %w", err)
}
builder := placement.NewNetworkMapBuilder(latestNetmap)
if localOnly {
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1))
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
builder = svcutil.NewLocalPlacement(builder, s.Config.NetmapKeys)
}
result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder))
return result, nil
}
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object,
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object,
signer *putSingleRequestSigner, meta object.ContentMeta,
) error {
if nodeDesc.local {
if nodeDesc.Local {
return s.saveLocal(ctx, obj, meta)
}
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, nodeDesc.info)
client.NodeInfoFromNetmapElement(&info, nodeDesc.Info)
c, err := s.clientConstructor.Get(info)
c, err := s.Config.ClientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
@@ -280,8 +282,8 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, o
}
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
localTarget := &localTarget{
storage: s.localStore,
localTarget := &objectwriter.LocalTarget{
Storage: s.Config.LocalStore,
}
return localTarget.WriteObject(ctx, obj, meta)
}
@@ -314,7 +316,7 @@ func (s *Service) redirectPutSingleRequest(ctx context.Context,
if err != nil {
objID, _ := obj.ID()
cnrID, _ := obj.ContainerID()
s.log.Warn(logs.PutSingleRedirectFailure,
s.Config.Logger.Warn(logs.PutSingleRedirectFailure,
zap.Error(err),
zap.Stringer("address", addr),
zap.Stringer("object_id", objID),

View file

@@ -2,33 +2,21 @@ package putsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
type Streamer struct {
*cfg
privateKey *ecdsa.PrivateKey
*objectwriter.Config
target transformer.ChunkedObjectWriter
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
maxPayloadSz uint64 // network config
}
var errNotInit = errors.New("stream not initialized")
@ -36,8 +24,23 @@ var errNotInit = errors.New("stream not initialized")
var errInitRecall = errors.New("init recall")
func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
if p.target != nil {
return errInitRecall
}
// initialize destination target
if err := p.initTarget(prm); err != nil {
prmTarget := &objectwriter.Params{
Config: p.Config,
Common: prm.common,
Header: prm.hdr,
Container: prm.cnr,
TraverseOpts: prm.traverseOpts,
Relay: p.relay,
}
var err error
p.target, err = target.New(prmTarget)
if err != nil {
return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
}
@@ -47,253 +50,6 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
return nil
}
// Target accesses underlying target chunked object writer.
func (p *Streamer) Target() transformer.ChunkedObjectWriter {
return p.target
}
// MaxObjectSize returns maximum payload size for the streaming session.
//
// Must be called after the successful Init.
func (p *Streamer) MaxObjectSize() uint64 {
fyrchik marked this conversation as resolved (Outdated)

Previously this method exported private field, what do we need this method for now?

Removed
return p.maxPayloadSz
}
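With the accessor removed, callers that need the limit read it from the shared config's size source, as the v2 streamer hunk further below now does; a minimal sketch:

maxSz := stream.MaxSizeSrc.MaxObjectSize() // network-wide payload limit
if payloadSz > maxSz {
	return target.ErrExceedingMaxSize
}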
func (p *Streamer) initTarget(prm *PutInitPrm) error {
// prevent re-calling
if p.target != nil {
return errInitRecall
}
// prepare needed put parameters
if err := p.preparePrm(prm); err != nil {
return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err)
}
p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize()
if p.maxPayloadSz == 0 {
return fmt.Errorf("(%T) could not obtain max object size parameter", p)
}
if prm.hdr.Signature() != nil {
return p.initUntrustedTarget(prm)
}
return p.initTrustedTarget(prm)
}
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
p.relay = prm.relay
if prm.privateKey != nil {
p.privateKey = prm.privateKey
} else {
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
if err != nil {
return err
}
p.privateKey = nodeKey
}
// prepare untrusted-Put object target
p.target = &validatingPreparedTarget{
nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
fmt: p.fmtValidator,
maxPayloadSz: p.maxPayloadSz,
}
return nil
}
func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
sToken := prm.common.SessionToken()
// prepare trusted-Put object target
// get private token from local storage
var sessionInfo *util.SessionInfo
if sToken != nil {
sessionInfo = &util.SessionInfo{
ID: sToken.ID(),
Owner: sToken.Issuer(),
}
}
key, err := p.keyStorage.GetKey(sessionInfo)
if err != nil {
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
}
// In case session token is missing, the line above returns the default key.
// If it isn't owner key, replication attempts will fail, thus this check.
ownerObj := prm.hdr.OwnerID()
if ownerObj.IsEmpty() {
return errors.New("missing object owner")
}
if sToken == nil {
var ownerSession user.ID
user.IDFromKey(&ownerSession, key.PublicKey)
if !ownerObj.Equals(ownerSession) {
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
}
} else {
if !ownerObj.Equals(sessionInfo.Owner) {
return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", p, sessionInfo.Owner, ownerObj)
}
}
if prm.privateKey != nil {
p.privateKey = prm.privateKey
} else {
p.privateKey = key
}
p.target = &validatingTarget{
fmt: p.fmtValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
NetworkState: p.networkState,
MaxSize: p.maxPayloadSz,
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
SessionToken: sToken,
}),
}
return nil
}
func (p *Streamer) preparePrm(prm *PutInitPrm) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
if err != nil {
return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
}
idCnr, ok := prm.hdr.ContainerID()
if !ok {
return errors.New("missing container ID")
}
// get container to store the object
cnrInfo, err := p.cnrSrc.Get(idCnr)
if err != nil {
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
}
prm.cnr = cnrInfo.Value
// add common options
prm.traverseOpts = append(prm.traverseOpts,
// set processing container
placement.ForContainer(prm.cnr),
)
if ech := prm.hdr.ECHeader(); ech != nil {
prm.traverseOpts = append(prm.traverseOpts,
// set identifier of the processing object
placement.ForObject(ech.Parent()),
)
} else if id, ok := prm.hdr.ID(); ok {
prm.traverseOpts = append(prm.traverseOpts,
// set identifier of the processing object
placement.ForObject(id),
)
}
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if prm.common.LocalOnly() {
// restrict success count to 1 stored copy (to local storage)
prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))
// use local-only placement builder
builder = util.NewLocalPlacement(builder, p.netmapKeys)
}
// set placement builder
prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))
return nil
}
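preparePrm moves out of the Streamer along with the rest of target construction; the moved version presumably assembles the same option set, so a condensed recap is useful here. Identifiers are taken from the removed code above; the slice layout is illustrative:

opts := []placement.Option{
	placement.ForContainer(cnr),   // traverse this container's placement policy
	placement.ForObject(objID),    // pin traversal to the object (EC parent if present)
	placement.UseBuilder(builder), // netmap-backed placement builder
}
if localOnly {
	// one successfully stored local copy is enough
	opts = append(opts, placement.SuccessAfter(1))
}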
func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
return p.newECWriter(prm)
}
return p.newDefaultObjectWriter(prm, false)
}
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter {
var relay func(context.Context, nodeDesc) error
if p.relay != nil {
relay = func(ctx context.Context, node nodeDesc) error {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node.info)
c, err := p.clientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
return p.relay(ctx, info, c)
}
}
var resetSuccessAfterOnBroadcast bool
traverseOpts := prm.traverseOpts
if forECPlacement && !prm.common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
resetSuccessAfterOnBroadcast = true
}
return &distributedTarget{
cfg: p.cfg,
placementOpts: traverseOpts,
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local {
return localTarget{
storage: p.localStore,
}
}
rt := &remoteTarget{
privateKey: p.privateKey,
commonPrm: prm.common,
clientConstructor: p.clientConstructor,
}
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info)
return rt
},
relay: relay,
}
}
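The SuccessAfter bump above encodes a simple rule worth spelling out: for an EC n.k policy, k is the parity count, and non-regular or linking objects need k+1 successful copies, which reproduces the mapping in the comment (EC 2.1 -> REP 2, EC 2.2 -> REP 3):

// parity count k of an EC n.k policy; the success threshold is k+1 copies
successAfter := uint32(policy.ECParityCount(cnr.PlacementPolicy()) + 1)
traverseOpts = append(traverseOpts, placement.SuccessAfter(successAfter))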
func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
return &objectWriterDispatcher{
ecWriter: &ecWriter{
cfg: p.cfg,
placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
container: prm.cnr,
key: p.privateKey,
commonPrm: prm.common,
relay: p.relay,
},
repWriter: p.newDefaultObjectWriter(prm, true),
}
}
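The dispatcher pairing means a single stream can mix erasure-coded and replicated writes: payload objects go through the EC writer, while objects EC cannot represent (linking, non-regular) fall back to the replication writer. An illustrative sketch of the per-object routing this implies; the exact dispatch inside objectWriterDispatcher is an assumption:

// hypothetical routing, mirroring the check in newObjectWriter above
if container.IsECContainer(cnr) && object.IsECSupported(hdr) {
	return ecWriter.WriteObject(ctx, obj) // erasure-code the payload
}
return repWriter.WriteObject(ctx, obj) // replicate as usual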
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
if p.target == nil {
return errNotInit
@@ -327,10 +83,3 @@ func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) {
id: ids.SelfID,
}, nil
}
func (c *cfg) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
if c.netmapKeys.IsLocalKey(pub) {
return c.localPool, true
}
return c.remotePool, false
}
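getWorkerPool leaves with the rest of the cfg; presumably an equivalent now lives alongside objectwriter.Config. A sketch of the same selection logic under assumed exported field names (NetmapKeys, LocalPool, RemotePool are guesses in line with the other renames in this diff):

func getWorkerPool(c *objectwriter.Config, pub []byte) (pkgutil.WorkerPool, bool) {
	// tasks addressed to this node run on the bounded local pool
	if c.NetmapKeys.IsLocalKey(pub) {
		return c.LocalPool, true
	}
	return c.RemotePool, false
}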

View file

@@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
@@ -55,7 +56,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
s.saveChunks = v.GetSignature() != nil
if s.saveChunks {
maxSz := s.stream.MaxObjectSize()
maxSz := s.stream.MaxSizeSrc.MaxObjectSize()
s.sizes = &sizes{
payloadSz: uint64(v.GetHeader().GetPayloadLength()),
@@ -63,7 +64,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
// check payload size limit overflow
if s.payloadSz > maxSz {
return putsvc.ErrExceedingMaxSize
return target.ErrExceedingMaxSize
}
s.init = req
@@ -74,7 +75,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
// check payload size overflow
if s.writtenPayload > s.payloadSz {
return putsvc.ErrWrongPayloadSize
return target.ErrWrongPayloadSize
}
}
@@ -117,7 +118,7 @@ func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error
if s.saveChunks {
// check payload size correctness
if s.writtenPayload != s.payloadSz {
return nil, putsvc.ErrWrongPayloadSize
return nil, target.ErrWrongPayloadSize
}
}
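Since the size sentinels now live in the shared target package, callers that matched on them only need the new import path; an illustrative errors.Is check (the surrounding handler is hypothetical, the sentinel names come from the hunks above):

if err := s.Send(ctx, req); err != nil {
	switch {
	case errors.Is(err, target.ErrExceedingMaxSize):
		// payload larger than the network's max object size
	case errors.Is(err, target.ErrWrongPayloadSize):
		// written payload does not match the declared length
	}
}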

View file

@@ -5,7 +5,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@@ -52,7 +52,7 @@ func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res T
}
}
prm := new(putsvc.RemotePutPrm).
prm := new(objectwriter.RemotePutPrm).
WithObject(task.Obj)
for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ {

View file

@@ -4,8 +4,8 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
@@ -24,7 +24,7 @@ type cfg struct {
log *logger.Logger
remoteSender *putsvc.RemoteSender
remoteSender *objectwriter.RemoteSender
remoteGetter *getsvc.RemoteGetter
@@ -67,7 +67,7 @@
}
// WithRemoteSender returns option to set remote object sender of Replicator.
func WithRemoteSender(v *putsvc.RemoteSender) Option {
func WithRemoteSender(v *objectwriter.RemoteSender) Option {
return func(c *cfg) {
c.remoteSender = v
}