[#1310] object: Move target initialization to separate package

* Split the logic of write target initialization to different packages;
* Refactor patch and put services: since both service initialize the target
  themselves.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
Airat Arifullin 2024-08-30 12:09:14 +03:00 committed by Evgenii Stratonikov
parent 7768a482b5
commit b3deb893ba
22 changed files with 599 additions and 585 deletions

View file

@ -7,7 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
@ -305,11 +305,11 @@ type ttlMaxObjectSizeCache struct {
mtx sync.RWMutex
lastUpdated time.Time
lastSize uint64
src putsvc.MaxSizeSource
src objectwriter.MaxSizeSource
metrics cacheMetrics
}
func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource {
func newCachedMaxObjectSizeSource(src objectwriter.MaxSizeSource) objectwriter.MaxSizeSource {
return &ttlMaxObjectSizeCache{
src: src,
metrics: metrics.NewCacheMetrics("max_object_size"),

View file

@ -24,6 +24,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl"
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
objectAPE "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/ape"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
deletesvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete"
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
@ -188,7 +189,7 @@ func initObjectService(c *cfg) {
sDeleteV2 := createDeleteServiceV2(sDelete)
sPatch := createPatchSvc(sGet, sPut, keyStorage)
sPatch := createPatchSvc(sGet, sPut)
// build service pipeline
// grpc | audit | <metrics> | signature | response | acl | ape | split
@ -326,7 +327,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
),
replicator.WithLocalStorage(ls),
replicator.WithRemoteSender(
putsvc.NewRemoteSender(keyStorage, cache),
objectwriter.NewRemoteSender(keyStorage, cache),
),
replicator.WithRemoteGetter(
getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
@ -338,7 +339,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
var os putsvc.ObjectStorage = engineWithoutNotifications{
var os objectwriter.ObjectStorage = engineWithoutNotifications{
engine: ls,
}
@ -352,9 +353,9 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
c,
c.cfgNetmap.state,
irFetcher,
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
putsvc.WithLogger(c.log),
putsvc.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
objectwriter.WithLogger(c.log),
objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
)
}
@ -362,8 +363,8 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2
return putsvcV2.NewService(sPut, keyStorage)
}
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service {
return patchsvc.NewService(keyStorage, sGet, sPut)
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Service {
return patchsvc.NewService(sPut.Config, sGet)
}
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {

View file

@ -1,4 +1,4 @@
package putsvc
package target
import (
"context"

View file

@ -1,4 +1,4 @@
package putsvc
package target
import (
"sync"

View file

@ -0,0 +1,170 @@
package target
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
func New(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
// prepare needed put parameters
if err := preparePrm(prm); err != nil {
return nil, fmt.Errorf("could not prepare put parameters: %w", err)
}
if prm.Header.Signature() != nil {
return newUntrustedTarget(prm)
}
return newTrustedTarget(prm)
}
func newUntrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
if maxPayloadSz == 0 {
return nil, errors.New("could not obtain max object size parameter")
}
if prm.SignRequestPrivateKey == nil {
nodeKey, err := prm.Config.KeyStorage.GetKey(nil)
if err != nil {
return nil, err
}
prm.SignRequestPrivateKey = nodeKey
}
// prepare untrusted-Put object target
return &validatingPreparedTarget{
nextTarget: newInMemoryObjectBuilder(objectwriter.New(prm)),
fmt: prm.Config.FormatValidator,
maxPayloadSz: maxPayloadSz,
}, nil
}
func newTrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
if maxPayloadSz == 0 {
return nil, errors.New("could not obtain max object size parameter")
}
sToken := prm.Common.SessionToken()
// prepare trusted-Put object target
// get private token from local storage
var sessionInfo *util.SessionInfo
if sToken != nil {
sessionInfo = &util.SessionInfo{
ID: sToken.ID(),
Owner: sToken.Issuer(),
}
}
key, err := prm.Config.KeyStorage.GetKey(sessionInfo)
if err != nil {
return nil, fmt.Errorf("could not receive session key: %w", err)
}
// In case session token is missing, the line above returns the default key.
// If it isn't owner key, replication attempts will fail, thus this check.
ownerObj := prm.Header.OwnerID()
if ownerObj.IsEmpty() {
return nil, errors.New("missing object owner")
}
if sToken == nil {
var ownerSession user.ID
user.IDFromKey(&ownerSession, key.PublicKey)
if !ownerObj.Equals(ownerSession) {
return nil, errors.New("session token is missing but object owner id is different from the default key")
}
} else {
if !ownerObj.Equals(sessionInfo.Owner) {
return nil, fmt.Errorf("different token issuer and object owner identifiers %s/%s", sessionInfo.Owner, ownerObj)
}
}
if prm.SignRequestPrivateKey == nil {
prm.SignRequestPrivateKey = key
}
return &validatingTarget{
fmt: prm.Config.FormatValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return objectwriter.New(prm) },
NetworkState: prm.Config.NetworkState,
MaxSize: maxPayloadSz,
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.Container),
SessionToken: sToken,
}),
}, nil
}
func preparePrm(prm *objectwriter.Params) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(prm.Config.NetmapSource)
if err != nil {
//return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
return fmt.Errorf("could not get latest network map: %w", err)
}
idCnr, ok := prm.Header.ContainerID()
if !ok {
return errors.New("missing container ID")
}
// get container to store the object
cnrInfo, err := prm.Config.ContainerSource.Get(idCnr)
if err != nil {
//return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
return fmt.Errorf("could not get container by ID: %w", err)
}
prm.Container = cnrInfo.Value
// add common options
prm.TraverseOpts = append(prm.TraverseOpts,
// set processing container
placement.ForContainer(prm.Container),
)
if ech := prm.Header.ECHeader(); ech != nil {
prm.TraverseOpts = append(prm.TraverseOpts,
// set identifier of the processing object
placement.ForObject(ech.Parent()),
)
} else if id, ok := prm.Header.ID(); ok {
prm.TraverseOpts = append(prm.TraverseOpts,
// set identifier of the processing object
placement.ForObject(id),
)
}
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if prm.Common.LocalOnly() {
// restrict success count to 1 stored copy (to local storage)
prm.TraverseOpts = append(prm.TraverseOpts, placement.SuccessAfter(1))
// use local-only placement builder
builder = util.NewLocalPlacement(builder, prm.Config.NetmapKeys)
}
// set placement builder
prm.TraverseOpts = append(prm.TraverseOpts, placement.UseBuilder(builder))
return nil
}

View file

@ -1,4 +1,4 @@
package putsvc
package target
import (
"bytes"

View file

@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"
@ -13,23 +13,23 @@ import (
"go.uber.org/zap"
)
type nodeIterator struct {
traversal
cfg *cfg
type NodeIterator struct {
Traversal
cfg *Config
}
func (c *cfg) newNodeIterator(opts []placement.Option) *nodeIterator {
return &nodeIterator{
traversal: traversal{
opts: opts,
mExclude: make(map[string]*bool),
func (c *Config) NewNodeIterator(opts []placement.Option) *NodeIterator {
return &NodeIterator{
Traversal: Traversal{
Opts: opts,
Exclude: make(map[string]*bool),
},
cfg: c,
}
}
func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, nodeDesc) error) error {
traverser, err := placement.NewTraverser(n.traversal.opts...)
func (n *NodeIterator) ForEachNode(ctx context.Context, f func(context.Context, NodeDescriptor) error) error {
traverser, err := placement.NewTraverser(n.Traversal.Opts...)
if err != nil {
return fmt.Errorf("could not create object placement traverser: %w", err)
}
@ -56,10 +56,10 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context,
}
// perform additional container broadcast if needed
if n.traversal.submitPrimaryPlacementFinish() {
err := n.forEachNode(ctx, f)
if n.Traversal.submitPrimaryPlacementFinish() {
err := n.ForEachNode(ctx, f)
if err != nil {
n.cfg.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
n.cfg.Logger.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
// we don't fail primary operation because of broadcast failure
}
}
@ -67,11 +67,11 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context,
return nil
}
func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, nodeDesc) error, resErr *atomic.Value) bool {
func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, NodeDescriptor) error, resErr *atomic.Value) bool {
var wg sync.WaitGroup
for _, addr := range addrs {
if ok := n.mExclude[string(addr.PublicKey())]; ok != nil {
if ok := n.Exclude[string(addr.PublicKey())]; ok != nil {
if *ok {
traverser.SubmitSuccess()
}
@ -86,10 +86,10 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
if err := workerPool.Submit(func() {
defer wg.Done()
err := f(ctx, nodeDesc{local: isLocal, info: addr})
err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
if err != nil {
resErr.Store(err)
svcutil.LogServiceError(n.cfg.log, "PUT", addr.Addresses(), err)
svcutil.LogServiceError(n.cfg.Logger, "PUT", addr.Addresses(), err)
return
}
@ -97,7 +97,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
*item = true
}); err != nil {
wg.Done()
svcutil.LogWorkerPoolError(n.cfg.log, "PUT", err)
svcutil.LogWorkerPoolError(n.cfg.Logger, "PUT", err)
return true
}
@ -105,7 +105,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
// in subsequent container broadcast. Note that we don't
// process this node during broadcast if primary placement
// on it failed.
n.traversal.submitProcessed(addr, item)
n.Traversal.submitProcessed(addr, item)
}
wg.Wait()
@ -113,6 +113,6 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
return false
}
func needAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool {
func NeedAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool {
return len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock))
}

View file

@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"

View file

@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"
@ -13,47 +13,47 @@ type preparedObjectTarget interface {
WriteObject(context.Context, *objectSDK.Object, object.ContentMeta) error
}
type distributedTarget struct {
type distributedWriter struct {
cfg *Config
placementOpts []placement.Option
obj *objectSDK.Object
objMeta object.ContentMeta
*cfg
nodeTargetInitializer func(NodeDescriptor) preparedObjectTarget
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
relay func(context.Context, nodeDesc) error
relay func(context.Context, NodeDescriptor) error
resetSuccessAfterOnBroadcast bool
}
// parameters and state of container traversal.
type traversal struct {
opts []placement.Option
// parameters and state of container Traversal.
type Traversal struct {
Opts []placement.Option
// need of additional broadcast after the object is saved
extraBroadcastEnabled bool
ExtraBroadcastEnabled bool
// container nodes which was processed during the primary object placement
mExclude map[string]*bool
Exclude map[string]*bool
resetSuccessAfterOnBroadcast bool
ResetSuccessAfterOnBroadcast bool
}
// updates traversal parameters after the primary placement finish and
// returns true if additional container broadcast is needed.
func (x *traversal) submitPrimaryPlacementFinish() bool {
if x.extraBroadcastEnabled {
func (x *Traversal) submitPrimaryPlacementFinish() bool {
if x.ExtraBroadcastEnabled {
// do not track success during container broadcast (best-effort)
x.opts = append(x.opts, placement.WithoutSuccessTracking())
x.Opts = append(x.Opts, placement.WithoutSuccessTracking())
if x.resetSuccessAfterOnBroadcast {
x.opts = append(x.opts, placement.ResetSuccessAfter())
if x.ResetSuccessAfterOnBroadcast {
x.Opts = append(x.Opts, placement.ResetSuccessAfter())
}
// avoid 2nd broadcast
x.extraBroadcastEnabled = false
x.ExtraBroadcastEnabled = false
return true
}
@ -62,22 +62,22 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
}
// marks the container node as processed during the primary object placement.
func (x *traversal) submitProcessed(n placement.Node, item *bool) {
if x.extraBroadcastEnabled {
func (x *Traversal) submitProcessed(n placement.Node, item *bool) {
if x.ExtraBroadcastEnabled {
key := string(n.PublicKey())
if x.mExclude == nil {
x.mExclude = make(map[string]*bool, 1)
if x.Exclude == nil {
x.Exclude = make(map[string]*bool, 1)
}
x.mExclude[key] = item
x.Exclude[key] = item
}
}
type nodeDesc struct {
local bool
type NodeDescriptor struct {
Local bool
info placement.Node
Info placement.Node
}
// errIncompletePut is returned if processing on a container fails.
@ -96,19 +96,19 @@ func (x errIncompletePut) Error() string {
}
// WriteObject implements the transformer.ObjectWriter interface.
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
func (t *distributedWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
t.obj = obj
var err error
if t.objMeta, err = t.fmtValidator.ValidateContent(t.obj); err != nil {
if t.objMeta, err = t.cfg.FormatValidator.ValidateContent(t.obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", t, err)
}
return t.iteratePlacement(ctx)
}
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
if !node.local && t.relay != nil {
func (t *distributedWriter) sendObject(ctx context.Context, node NodeDescriptor) error {
if !node.Local && t.relay != nil {
return t.relay(ctx, node)
}
@ -121,11 +121,11 @@ func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error
return nil
}
func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
func (t *distributedWriter) iteratePlacement(ctx context.Context) error {
id, _ := t.obj.ID()
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
return iter.forEachNode(ctx, t.sendObject)
iter := t.cfg.NewNodeIterator(append(t.placementOpts, placement.ForObject(id)))
iter.ExtraBroadcastEnabled = NeedAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
iter.ResetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
return iter.ForEachNode(ctx, t.sendObject)
}

View file

@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"
@ -23,23 +23,23 @@ import (
"golang.org/x/sync/errgroup"
)
var _ transformer.ObjectWriter = (*ecWriter)(nil)
var _ transformer.ObjectWriter = (*ECWriter)(nil)
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
type ecWriter struct {
cfg *cfg
placementOpts []placement.Option
container containerSDK.Container
key *ecdsa.PrivateKey
commonPrm *svcutil.CommonPrm
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
type ECWriter struct {
Config *Config
PlacementOpts []placement.Option
Container containerSDK.Container
Key *ecdsa.PrivateKey
CommonPrm *svcutil.CommonPrm
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
objMeta object.ContentMeta
objMetaValid bool
ObjectMeta object.ContentMeta
ObjectMetaValid bool
}
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
relayed, err := e.relayIfNotContainerNode(ctx, obj)
if err != nil {
return err
@ -53,11 +53,11 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
return errUnsupportedECObject
}
if !e.objMetaValid {
if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil {
if !e.ObjectMetaValid {
if e.ObjectMeta, err = e.Config.FormatValidator.ValidateContent(obj); err != nil {
return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
}
e.objMetaValid = true
e.ObjectMetaValid = true
}
if obj.ECHeader() != nil {
@ -66,8 +66,8 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
return e.writeRawObject(ctx, obj)
}
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.relay == nil {
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
if e.Relay == nil {
return false, nil
}
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
@ -90,8 +90,8 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
return true, nil
}
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
t, err := placement.NewTraverser(e.placementOpts...)
func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
t, err := placement.NewTraverser(e.PlacementOpts...)
if err != nil {
return false, err
}
@ -101,7 +101,7 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
break
}
for _, node := range nodes {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) {
return true, nil
}
}
@ -109,8 +109,8 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
return false, nil
}
func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
if err != nil {
return err
}
@ -126,18 +126,18 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
c, err := e.cfg.clientConstructor.Get(info)
c, err := e.Config.ClientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() {
if poolErr := e.Config.RemotePool.Submit(func() {
defer close(completed)
err = e.relay(ctx, info, c)
err = e.Relay(ctx, info, c)
}); poolErr != nil {
close(completed)
svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
svcutil.LogWorkerPoolError(e.Config.Logger, "PUT", poolErr)
return poolErr
}
<-completed
@ -145,7 +145,7 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
if err == nil {
return nil
}
e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
e.Config.Logger.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
lastErr = err
}
}
@ -157,12 +157,12 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
}
}
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
if e.commonPrm.LocalOnly() {
func (e *ECWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
if e.CommonPrm.LocalOnly() {
return e.writePartLocal(ctx, obj)
}
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
if err != nil {
return err
}
@ -187,18 +187,18 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error
return nil
}
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
// now only single EC policy is supported
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy()))
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.Container.PlacementPolicy()), policy.ECParityCount(e.Container.PlacementPolicy()))
if err != nil {
return err
}
parts, err := c.Split(obj, e.key)
parts, err := c.Split(obj, e.Key)
if err != nil {
return err
}
objID, _ := obj.ID()
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
if err != nil {
return err
}
@ -230,7 +230,7 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
return nil
}
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
select {
case <-ctx.Done():
return ctx.Err()
@ -243,7 +243,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
@ -267,7 +267,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())),
zap.Error(err))
@ -291,7 +291,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
if err == nil {
return nil
}
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
zap.String("node", hex.EncodeToString(node.PublicKey())),
zap.Error(err))
@ -300,22 +300,22 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj))
}
func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) {
return e.writePartLocal(ctx, obj)
}
return e.writePartRemote(ctx, obj, node)
}
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
var err error
localTarget := localTarget{
storage: e.cfg.localStore,
localTarget := LocalTarget{
Storage: e.Config.LocalStore,
}
completed := make(chan interface{})
if poolErr := e.cfg.localPool.Submit(func() {
if poolErr := e.Config.LocalPool.Submit(func() {
defer close(completed)
err = localTarget.WriteObject(ctx, obj, e.objMeta)
err = localTarget.WriteObject(ctx, obj, e.ObjectMeta)
}); poolErr != nil {
close(completed)
return poolErr
@ -324,22 +324,22 @@ func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) er
return err
}
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
var clientNodeInfo client.NodeInfo
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
remoteTaget := remoteTarget{
privateKey: e.key,
clientConstructor: e.cfg.clientConstructor,
commonPrm: e.commonPrm,
remoteTaget := remoteWriter{
privateKey: e.Key,
clientConstructor: e.Config.ClientConstructor,
commonPrm: e.CommonPrm,
nodeInfo: clientNodeInfo,
}
var err error
completed := make(chan interface{})
if poolErr := e.cfg.remotePool.Submit(func() {
if poolErr := e.Config.RemotePool.Submit(func() {
defer close(completed)
err = remoteTaget.WriteObject(ctx, obj, e.objMeta)
err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
}); poolErr != nil {
close(completed)
return poolErr

View file

@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"
@ -24,19 +24,19 @@ type ObjectStorage interface {
IsLocked(context.Context, oid.Address) (bool, error)
}
type localTarget struct {
storage ObjectStorage
type LocalTarget struct {
Storage ObjectStorage
}
func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
switch meta.Type() {
case objectSDK.TypeTombstone:
err := t.storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
if err != nil {
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
}
case objectSDK.TypeLock:
err := t.storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects())
err := t.Storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects())
if err != nil {
return fmt.Errorf("could not lock object from lock objects locally: %w", err)
}
@ -44,7 +44,7 @@ func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
// objects that do not change meta storage
}
if err := t.storage.Put(ctx, obj); err != nil {
if err := t.Storage.Put(ctx, obj); err != nil {
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
}
return nil

View file

@ -1,4 +1,4 @@
package putsvc
package writer
import (
"context"
@ -16,7 +16,7 @@ import (
"google.golang.org/grpc/status"
)
type remoteTarget struct {
type remoteWriter struct {
privateKey *ecdsa.PrivateKey
commonPrm *util.CommonPrm
@ -41,7 +41,7 @@ type RemotePutPrm struct {
obj *objectSDK.Object
}
func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error {
func (t *remoteWriter) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error {
c, err := t.clientConstructor.Get(t.nodeInfo)
if err != nil {
return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
@ -64,7 +64,7 @@ func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _
return t.putStream(ctx, prm)
}
func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error {
func (t *remoteWriter) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error {
_, err := internalclient.PutObject(ctx, prm)
if err != nil {
return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
@ -72,7 +72,7 @@ func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObje
return nil
}
func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error {
func (t *remoteWriter) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error {
_, err := internalclient.PutObjectSingle(ctx, prm)
if err != nil {
return fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
@ -113,7 +113,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
return err
}
t := &remoteTarget{
t := &remoteWriter{
privateKey: key,
clientConstructor: s.clientConstructor,
}

View file

@ -0,0 +1,183 @@
package writer
import (
"context"
"crypto/ecdsa"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
)
type MaxSizeSource interface {
// MaxObjectSize returns maximum payload size
// of physically stored object in system.
//
// Must return 0 if value can not be obtained.
MaxObjectSize() uint64
}
type ClientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
type InnerRing interface {
InnerRingKeys() ([][]byte, error)
}
type FormatValidatorConfig interface {
VerifySessionTokenIssuer() bool
}
// Config represents a set of static parameters that are established during
// the initialization phase of all services.
type Config struct {
KeyStorage *objutil.KeyStorage
MaxSizeSrc MaxSizeSource
LocalStore ObjectStorage
ContainerSource container.Source
NetmapSource netmap.Source
RemotePool, LocalPool util.WorkerPool
NetmapKeys netmap.AnnouncedKeys
FormatValidator *object.FormatValidator
NetworkState netmap.State
ClientConstructor ClientConstructor
Logger *logger.Logger
VerifySessionTokenIssuer bool
}
type Option func(*Config)
func WithWorkerPools(remote, local util.WorkerPool) Option {
return func(c *Config) {
c.RemotePool, c.LocalPool = remote, local
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *Config) {
c.Logger = l
}
}
func WithVerifySessionTokenIssuer(v bool) Option {
return func(c *Config) {
c.VerifySessionTokenIssuer = v
}
}
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
if c.NetmapKeys.IsLocalKey(pub) {
return c.LocalPool, true
}
return c.RemotePool, false
}
type Params struct {
Config *Config
Common *objutil.CommonPrm
Header *objectSDK.Object
Container containerSDK.Container
TraverseOpts []placement.Option
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
SignRequestPrivateKey *ecdsa.PrivateKey
}
func New(prm *Params) transformer.ObjectWriter {
if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) {
return newECWriter(prm)
}
return newDefaultObjectWriter(prm, false)
}
func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.ObjectWriter {
var relay func(context.Context, NodeDescriptor) error
if prm.Relay != nil {
relay = func(ctx context.Context, node NodeDescriptor) error {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node.Info)
c, err := prm.Config.ClientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
return prm.Relay(ctx, info, c)
}
}
var resetSuccessAfterOnBroadcast bool
traverseOpts := prm.TraverseOpts
if forECPlacement && !prm.Common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.Container.PlacementPolicy())+1)))
resetSuccessAfterOnBroadcast = true
}
return &distributedWriter{
cfg: prm.Config,
placementOpts: traverseOpts,
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
if node.Local {
return LocalTarget{
Storage: prm.Config.LocalStore,
}
}
rt := &remoteWriter{
privateKey: prm.SignRequestPrivateKey,
commonPrm: prm.Common,
clientConstructor: prm.Config.ClientConstructor,
}
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.Info)
return rt
},
relay: relay,
}
}
func newECWriter(prm *Params) transformer.ObjectWriter {
return &objectWriterDispatcher{
ecWriter: &ECWriter{
Config: prm.Config,
PlacementOpts: append(prm.TraverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
Container: prm.Container,
Key: prm.SignRequestPrivateKey,
CommonPrm: prm.Common,
Relay: prm.Relay,
},
repWriter: newDefaultObjectWriter(prm, true),
}
}

View file

@ -2,43 +2,40 @@ package patchsvc
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
)
// Service implements Put operation of Object service v2.
type Service struct {
keyStorage *util.KeyStorage
*objectwriter.Config
getSvc *getsvc.Service
putSvc *putsvc.Service
}
// NewService constructs Service instance from provided options.
func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service {
//
// Patch service can use the same objectwriter.Config initializied by Put service.
func NewService(cfg *objectwriter.Config,
getSvc *getsvc.Service,
) *Service {
return &Service{
keyStorage: ks,
Config: cfg,
getSvc: getSvc,
putSvc: putSvc,
}
}
// Put calls internal service and returns v2 object streamer.
func (s *Service) Patch() (object.PatchObjectStream, error) {
nodeKey, err := s.keyStorage.GetKey(nil)
nodeKey, err := s.Config.KeyStorage.GetKey(nil)
if err != nil {
return nil, err
}
return &Streamer{
getSvc: s.getSvc,
putSvc: s.putSvc,
Config: s.Config,
getSvc: s.getSvc,
localNodeKey: nodeKey,
}, nil
}

View file

@ -9,8 +9,9 @@ import (
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -21,6 +22,8 @@ import (
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
// and original object payload chunks. The merged result is fed to Put stream target.
type Streamer struct {
*objectwriter.Config
// Patcher must be initialized at first Streamer.Send call.
patcher patcher.PatchApplier
@ -28,8 +31,6 @@ type Streamer struct {
getSvc *getsvc.Service
putSvc *putsvc.Service
localNodeKey *ecdsa.PrivateKey
}
@ -78,11 +79,6 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
localNodeKey: s.localNodeKey,
}
putstm, err := s.putSvc.Put()
if err != nil {
return err
}
hdr := hdrWithSig.GetHeader()
oV2 := new(objectV2.Object)
hV2 := new(objectV2.Header)
@ -97,14 +93,14 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
}
oV2.GetHeader().SetOwnerID(ownerID)
prm, err := s.putInitPrm(req, oV2)
target, err := target.New(&objectwriter.Params{
Config: s.Config,
Common: commonPrm,
Header: objectSDK.NewFromV2(oV2),
SignRequestPrivateKey: s.localNodeKey,
})
if err != nil {
return err
}
err = putstm.Init(ctx, prm)
if err != nil {
return err
return fmt.Errorf("target creation: %w", err)
}
patcherPrm := patcher.Params{
@ -112,7 +108,7 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
RangeProvider: rangeProvider,
ObjectWriter: putstm.Target(),
ObjectWriter: target,
}
s.patcher = patcher.New(patcherPrm)

View file

@ -6,31 +6,12 @@ import (
"errors"
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
// putInitPrm initializes put paramerer for Put stream.
func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) {
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
prm := new(putsvc.PutInitPrm)
prm.WithObject(objectSDK.NewFromV2(obj)).
WithCommonPrm(commonPrm).
WithPrivateKey(s.localNodeKey)
return prm, nil
}
func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) {
for vh.GetOrigin() != nil {
vh = vh.GetOrigin()

View file

@ -1,132 +1,66 @@
package putsvc
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
type MaxSizeSource interface {
// MaxObjectSize returns maximum payload size
// of physically stored object in system.
//
// Must return 0 if value can not be obtained.
MaxObjectSize() uint64
}
type Service struct {
*cfg
}
type Option func(*cfg)
type ClientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
type InnerRing interface {
InnerRingKeys() ([][]byte, error)
}
type FormatValidatorConfig interface {
VerifySessionTokenIssuer() bool
}
type cfg struct {
keyStorage *objutil.KeyStorage
maxSizeSrc MaxSizeSource
localStore ObjectStorage
cnrSrc container.Source
netMapSrc netmap.Source
remotePool, localPool util.WorkerPool
netmapKeys netmap.AnnouncedKeys
fmtValidator *object.FormatValidator
networkState netmap.State
clientConstructor ClientConstructor
log *logger.Logger
verifySessionTokenIssuer bool
*objectwriter.Config
}
func NewService(ks *objutil.KeyStorage,
cc ClientConstructor,
ms MaxSizeSource,
os ObjectStorage,
cc objectwriter.ClientConstructor,
ms objectwriter.MaxSizeSource,
os objectwriter.ObjectStorage,
cs container.Source,
ns netmap.Source,
nk netmap.AnnouncedKeys,
nst netmap.State,
ir InnerRing,
opts ...Option,
ir objectwriter.InnerRing,
opts ...objectwriter.Option,
) *Service {
c := &cfg{
remotePool: util.NewPseudoWorkerPool(),
localPool: util.NewPseudoWorkerPool(),
log: &logger.Logger{Logger: zap.L()},
keyStorage: ks,
clientConstructor: cc,
maxSizeSrc: ms,
localStore: os,
cnrSrc: cs,
netMapSrc: ns,
netmapKeys: nk,
networkState: nst,
c := &objectwriter.Config{
RemotePool: util.NewPseudoWorkerPool(),
LocalPool: util.NewPseudoWorkerPool(),
Logger: &logger.Logger{Logger: zap.L()},
KeyStorage: ks,
ClientConstructor: cc,
MaxSizeSrc: ms,
LocalStore: os,
ContainerSource: cs,
NetmapSource: ns,
NetmapKeys: nk,
NetworkState: nst,
}
for i := range opts {
opts[i](c)
}
c.fmtValidator = object.NewFormatValidator(
c.FormatValidator = object.NewFormatValidator(
object.WithLockSource(os),
object.WithNetState(nst),
object.WithInnerRing(ir),
object.WithNetmapSource(ns),
object.WithContainersSource(cs),
object.WithVerifySessionTokenIssuer(c.verifySessionTokenIssuer),
object.WithLogger(c.log),
object.WithVerifySessionTokenIssuer(c.VerifySessionTokenIssuer),
object.WithLogger(c.Logger),
)
return &Service{
cfg: c,
Config: c,
}
}
func (p *Service) Put() (*Streamer, error) {
return &Streamer{
cfg: p.cfg,
Config: p.Config,
}, nil
}
func WithWorkerPools(remote, local util.WorkerPool) Option {
return func(c *cfg) {
c.remotePool, c.localPool = remote, local
}
}
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
func WithVerifySessionTokenIssuer(v bool) Option {
return func(c *cfg) {
c.verifySessionTokenIssuer = v
}
}

View file

@ -21,6 +21,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
@ -97,12 +99,12 @@ func (s *Service) validatePutSingle(ctx context.Context, obj *objectSDK.Object)
func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error {
if uint64(len(obj.Payload())) != obj.PayloadSize() {
return ErrWrongPayloadSize
return target.ErrWrongPayloadSize
}
maxAllowedSize := s.maxSizeSrc.MaxObjectSize()
maxAllowedSize := s.Config.MaxSizeSrc.MaxObjectSize()
if obj.PayloadSize() > maxAllowedSize {
return ErrExceedingMaxSize
return target.ErrExceedingMaxSize
}
return nil
@ -137,11 +139,11 @@ func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
}
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
if err := s.FormatValidator.Validate(ctx, obj, false); err != nil {
return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err)
}
meta, err := s.fmtValidator.ValidateContent(obj)
meta, err := s.FormatValidator.ValidateContent(obj)
if err != nil {
return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err)
}
@ -164,17 +166,17 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
}
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
iter := s.cfg.newNodeIterator(placement.placementOptions)
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
iter := s.Config.NewNodeIterator(placement.placementOptions)
iter.ExtraBroadcastEnabled = objectwriter.NeedAdditionalBroadcast(obj, localOnly)
iter.ResetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
signer := &putSingleRequestSigner{
req: req,
keyStorage: s.keyStorage,
keyStorage: s.Config.KeyStorage,
signer: &sync.Once{},
}
return iter.forEachNode(ctx, func(ctx context.Context, nd nodeDesc) error {
return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error {
return s.saveToPlacementNode(ctx, &nd, obj, signer, meta)
})
}
@ -184,25 +186,25 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace
if err != nil {
return err
}
key, err := s.cfg.keyStorage.GetKey(nil)
key, err := s.Config.KeyStorage.GetKey(nil)
if err != nil {
return err
}
signer := &putSingleRequestSigner{
req: req,
keyStorage: s.keyStorage,
keyStorage: s.Config.KeyStorage,
signer: &sync.Once{},
}
w := ecWriter{
cfg: s.cfg,
placementOpts: placement.placementOptions,
objMeta: meta,
objMetaValid: true,
commonPrm: commonPrm,
container: placement.container,
key: key,
relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
w := objectwriter.ECWriter{
Config: s.Config,
PlacementOpts: placement.placementOptions,
ObjectMeta: meta,
ObjectMetaValid: true,
CommonPrm: commonPrm,
Container: placement.container,
Key: key,
Relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac)
},
}
@ -223,7 +225,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
if !ok {
return result, errors.New("missing container ID")
}
cnrInfo, err := s.cnrSrc.Get(cnrID)
cnrInfo, err := s.Config.ContainerSource.Get(cnrID)
if err != nil {
return result, fmt.Errorf("could not get container by ID: %w", err)
}
@ -247,31 +249,31 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
}
result.placementOptions = append(result.placementOptions, placement.ForObject(objID))
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
latestNetmap, err := netmap.GetLatestNetworkMap(s.Config.NetmapSource)
if err != nil {
return result, fmt.Errorf("could not get latest network map: %w", err)
}
builder := placement.NewNetworkMapBuilder(latestNetmap)
if localOnly {
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1))
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
builder = svcutil.NewLocalPlacement(builder, s.Config.NetmapKeys)
}
result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder))
return result, nil
}
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object,
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object,
signer *putSingleRequestSigner, meta object.ContentMeta,
) error {
if nodeDesc.local {
if nodeDesc.Local {
return s.saveLocal(ctx, obj, meta)
}
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, nodeDesc.info)
client.NodeInfoFromNetmapElement(&info, nodeDesc.Info)
c, err := s.clientConstructor.Get(info)
c, err := s.Config.ClientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
@ -280,8 +282,8 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, o
}
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
localTarget := &localTarget{
storage: s.localStore,
localTarget := &objectwriter.LocalTarget{
Storage: s.Config.LocalStore,
}
return localTarget.WriteObject(ctx, obj, meta)
}
@ -314,7 +316,7 @@ func (s *Service) redirectPutSingleRequest(ctx context.Context,
if err != nil {
objID, _ := obj.ID()
cnrID, _ := obj.ContainerID()
s.log.Warn(logs.PutSingleRedirectFailure,
s.Config.Logger.Warn(logs.PutSingleRedirectFailure,
zap.Error(err),
zap.Stringer("address", addr),
zap.Stringer("object_id", objID),

View file

@ -2,33 +2,21 @@ package putsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
type Streamer struct {
*cfg
privateKey *ecdsa.PrivateKey
*objectwriter.Config
target transformer.ChunkedObjectWriter
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
maxPayloadSz uint64 // network config
}
var errNotInit = errors.New("stream not initialized")
@ -36,8 +24,23 @@ var errNotInit = errors.New("stream not initialized")
var errInitRecall = errors.New("init recall")
func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
if p.target != nil {
return errInitRecall
}
// initialize destination target
if err := p.initTarget(prm); err != nil {
prmTarget := &objectwriter.Params{
Config: p.Config,
Common: prm.common,
Header: prm.hdr,
Container: prm.cnr,
TraverseOpts: prm.traverseOpts,
Relay: p.relay,
}
var err error
p.target, err = target.New(prmTarget)
if err != nil {
return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
}
@ -47,253 +50,6 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
return nil
}
// Target accesses underlying target chunked object writer.
func (p *Streamer) Target() transformer.ChunkedObjectWriter {
return p.target
}
// MaxObjectSize returns maximum payload size for the streaming session.
//
// Must be called after the successful Init.
func (p *Streamer) MaxObjectSize() uint64 {
return p.maxPayloadSz
}
func (p *Streamer) initTarget(prm *PutInitPrm) error {
// prevent re-calling
if p.target != nil {
return errInitRecall
}
// prepare needed put parameters
if err := p.preparePrm(prm); err != nil {
return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err)
}
p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize()
if p.maxPayloadSz == 0 {
return fmt.Errorf("(%T) could not obtain max object size parameter", p)
}
if prm.hdr.Signature() != nil {
return p.initUntrustedTarget(prm)
}
return p.initTrustedTarget(prm)
}
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
p.relay = prm.relay
if prm.privateKey != nil {
p.privateKey = prm.privateKey
} else {
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
if err != nil {
return err
}
p.privateKey = nodeKey
}
// prepare untrusted-Put object target
p.target = &validatingPreparedTarget{
nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
fmt: p.fmtValidator,
maxPayloadSz: p.maxPayloadSz,
}
return nil
}
func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
sToken := prm.common.SessionToken()
// prepare trusted-Put object target
// get private token from local storage
var sessionInfo *util.SessionInfo
if sToken != nil {
sessionInfo = &util.SessionInfo{
ID: sToken.ID(),
Owner: sToken.Issuer(),
}
}
key, err := p.keyStorage.GetKey(sessionInfo)
if err != nil {
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
}
// In case session token is missing, the line above returns the default key.
// If it isn't owner key, replication attempts will fail, thus this check.
ownerObj := prm.hdr.OwnerID()
if ownerObj.IsEmpty() {
return errors.New("missing object owner")
}
if sToken == nil {
var ownerSession user.ID
user.IDFromKey(&ownerSession, key.PublicKey)
if !ownerObj.Equals(ownerSession) {
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
}
} else {
if !ownerObj.Equals(sessionInfo.Owner) {
return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", p, sessionInfo.Owner, ownerObj)
}
}
if prm.privateKey != nil {
p.privateKey = prm.privateKey
} else {
p.privateKey = key
}
p.target = &validatingTarget{
fmt: p.fmtValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
Key: key,
NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
NetworkState: p.networkState,
MaxSize: p.maxPayloadSz,
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
SessionToken: sToken,
}),
}
return nil
}
func (p *Streamer) preparePrm(prm *PutInitPrm) error {
var err error
// get latest network map
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
if err != nil {
return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
}
idCnr, ok := prm.hdr.ContainerID()
if !ok {
return errors.New("missing container ID")
}
// get container to store the object
cnrInfo, err := p.cnrSrc.Get(idCnr)
if err != nil {
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
}
prm.cnr = cnrInfo.Value
// add common options
prm.traverseOpts = append(prm.traverseOpts,
// set processing container
placement.ForContainer(prm.cnr),
)
if ech := prm.hdr.ECHeader(); ech != nil {
prm.traverseOpts = append(prm.traverseOpts,
// set identifier of the processing object
placement.ForObject(ech.Parent()),
)
} else if id, ok := prm.hdr.ID(); ok {
prm.traverseOpts = append(prm.traverseOpts,
// set identifier of the processing object
placement.ForObject(id),
)
}
// create placement builder from network map
builder := placement.NewNetworkMapBuilder(nm)
if prm.common.LocalOnly() {
// restrict success count to 1 stored copy (to local storage)
prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))
// use local-only placement builder
builder = util.NewLocalPlacement(builder, p.netmapKeys)
}
// set placement builder
prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))
return nil
}
func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
return p.newECWriter(prm)
}
return p.newDefaultObjectWriter(prm, false)
}
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter {
var relay func(context.Context, nodeDesc) error
if p.relay != nil {
relay = func(ctx context.Context, node nodeDesc) error {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node.info)
c, err := p.clientConstructor.Get(info)
if err != nil {
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
}
return p.relay(ctx, info, c)
}
}
var resetSuccessAfterOnBroadcast bool
traverseOpts := prm.traverseOpts
if forECPlacement && !prm.common.LocalOnly() {
// save non-regular and linking object to EC container.
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
resetSuccessAfterOnBroadcast = true
}
return &distributedTarget{
cfg: p.cfg,
placementOpts: traverseOpts,
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
if node.local {
return localTarget{
storage: p.localStore,
}
}
rt := &remoteTarget{
privateKey: p.privateKey,
commonPrm: prm.common,
clientConstructor: p.clientConstructor,
}
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info)
return rt
},
relay: relay,
}
}
func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
return &objectWriterDispatcher{
ecWriter: &ecWriter{
cfg: p.cfg,
placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
container: prm.cnr,
key: p.privateKey,
commonPrm: prm.common,
relay: p.relay,
},
repWriter: p.newDefaultObjectWriter(prm, true),
}
}
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
if p.target == nil {
return errNotInit
@ -327,10 +83,3 @@ func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) {
id: ids.SelfID,
}, nil
}
func (c *cfg) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
if c.netmapKeys.IsLocalKey(pub) {
return c.localPool, true
}
return c.remotePool, false
}

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
@ -55,7 +56,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
s.saveChunks = v.GetSignature() != nil
if s.saveChunks {
maxSz := s.stream.MaxObjectSize()
maxSz := s.stream.MaxSizeSrc.MaxObjectSize()
s.sizes = &sizes{
payloadSz: uint64(v.GetHeader().GetPayloadLength()),
@ -63,7 +64,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
// check payload size limit overflow
if s.payloadSz > maxSz {
return putsvc.ErrExceedingMaxSize
return target.ErrExceedingMaxSize
}
s.init = req
@ -74,7 +75,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
// check payload size overflow
if s.writtenPayload > s.payloadSz {
return putsvc.ErrWrongPayloadSize
return target.ErrWrongPayloadSize
}
}
@ -117,7 +118,7 @@ func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error
if s.saveChunks {
// check payload size correctness
if s.writtenPayload != s.payloadSz {
return nil, putsvc.ErrWrongPayloadSize
return nil, target.ErrWrongPayloadSize
}
}

View file

@ -5,7 +5,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -52,7 +52,7 @@ func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res T
}
}
prm := new(putsvc.RemotePutPrm).
prm := new(objectwriter.RemotePutPrm).
WithObject(task.Obj)
for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ {

View file

@ -4,8 +4,8 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
@ -24,7 +24,7 @@ type cfg struct {
log *logger.Logger
remoteSender *putsvc.RemoteSender
remoteSender *objectwriter.RemoteSender
remoteGetter *getsvc.RemoteGetter
@ -67,7 +67,7 @@ func WithLogger(v *logger.Logger) Option {
}
// WithRemoteSender returns option to set remote object sender of Replicator.
func WithRemoteSender(v *putsvc.RemoteSender) Option {
func WithRemoteSender(v *objectwriter.RemoteSender) Option {
return func(c *cfg) {
c.remoteSender = v
}