[#1310] object: Move target initialization to separate package
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 2m2s
DCO action / DCO (pull_request) Successful in 2m3s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m1s
Vulncheck / Vulncheck (pull_request) Successful in 2m43s
Tests and linters / Tests (1.23) (pull_request) Successful in 3m17s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m21s
Build / Build Components (1.23) (pull_request) Successful in 3m24s
Tests and linters / Lint (pull_request) Successful in 3m41s
Build / Build Components (1.22) (pull_request) Successful in 3m28s
Tests and linters / Staticcheck (pull_request) Successful in 3m39s
Tests and linters / gopls check (pull_request) Successful in 4m16s
Tests and linters / Tests with -race (pull_request) Successful in 4m26s
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 2m2s
DCO action / DCO (pull_request) Successful in 2m3s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m1s
Vulncheck / Vulncheck (pull_request) Successful in 2m43s
Tests and linters / Tests (1.23) (pull_request) Successful in 3m17s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m21s
Build / Build Components (1.23) (pull_request) Successful in 3m24s
Tests and linters / Lint (pull_request) Successful in 3m41s
Build / Build Components (1.22) (pull_request) Successful in 3m28s
Tests and linters / Staticcheck (pull_request) Successful in 3m39s
Tests and linters / gopls check (pull_request) Successful in 4m16s
Tests and linters / Tests with -race (pull_request) Successful in 4m26s
* Split the logic of write target initialization to different packages; * Refactor patch and put services: since both service initialize the target themselves. Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
98fe24cdb7
commit
a85561bf25
22 changed files with 599 additions and 581 deletions
|
@ -7,7 +7,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -305,11 +305,11 @@ type ttlMaxObjectSizeCache struct {
|
|||
mtx sync.RWMutex
|
||||
lastUpdated time.Time
|
||||
lastSize uint64
|
||||
src putsvc.MaxSizeSource
|
||||
src objectwriter.MaxSizeSource
|
||||
metrics cacheMetrics
|
||||
}
|
||||
|
||||
func newCachedMaxObjectSizeSource(src putsvc.MaxSizeSource) putsvc.MaxSizeSource {
|
||||
func newCachedMaxObjectSizeSource(src objectwriter.MaxSizeSource) objectwriter.MaxSizeSource {
|
||||
return &ttlMaxObjectSizeCache{
|
||||
src: src,
|
||||
metrics: metrics.NewCacheMetrics("max_object_size"),
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl"
|
||||
v2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/acl/v2"
|
||||
objectAPE "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/ape"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
deletesvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete"
|
||||
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
|
@ -188,7 +189,7 @@ func initObjectService(c *cfg) {
|
|||
|
||||
sDeleteV2 := createDeleteServiceV2(sDelete)
|
||||
|
||||
sPatch := createPatchSvc(sGet, sPut, keyStorage)
|
||||
sPatch := createPatchSvc(sGet, sPut)
|
||||
|
||||
// build service pipeline
|
||||
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
||||
|
@ -326,7 +327,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
|
|||
),
|
||||
replicator.WithLocalStorage(ls),
|
||||
replicator.WithRemoteSender(
|
||||
putsvc.NewRemoteSender(keyStorage, cache),
|
||||
objectwriter.NewRemoteSender(keyStorage, cache),
|
||||
),
|
||||
replicator.WithRemoteGetter(
|
||||
getsvc.NewRemoteGetter(c.clientCache, c.netMapSource, keyStorage),
|
||||
|
@ -338,7 +339,7 @@ func createReplicator(c *cfg, keyStorage *util.KeyStorage, cache *cache.ClientCa
|
|||
func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetcher) *putsvc.Service {
|
||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
||||
var os objectwriter.ObjectStorage = engineWithoutNotifications{
|
||||
engine: ls,
|
||||
}
|
||||
|
||||
|
@ -352,9 +353,9 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage, irFetcher *cachedIRFetche
|
|||
c,
|
||||
c.cfgNetmap.state,
|
||||
irFetcher,
|
||||
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
||||
putsvc.WithLogger(c.log),
|
||||
putsvc.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
|
||||
objectwriter.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
||||
objectwriter.WithLogger(c.log),
|
||||
objectwriter.WithVerifySessionTokenIssuer(!c.cfgObject.skipSessionTokenIssuerVerification),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -362,8 +363,8 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2
|
|||
return putsvcV2.NewService(sPut, keyStorage)
|
||||
}
|
||||
|
||||
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service {
|
||||
return patchsvc.NewService(keyStorage, sGet, sPut)
|
||||
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service) *patchsvc.Service {
|
||||
return patchsvc.NewService(sPut.Config, sGet)
|
||||
}
|
||||
|
||||
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writetarget
|
||||
|
||||
import (
|
||||
"context"
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writetarget
|
||||
|
||||
import (
|
||||
"sync"
|
170
pkg/services/object/common/target/target.go
Normal file
170
pkg/services/object/common/target/target.go
Normal file
|
@ -0,0 +1,170 @@
|
|||
package writetarget
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
)
|
||||
|
||||
func New(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
|
||||
// prepare needed put parameters
|
||||
if err := preparePrm(prm); err != nil {
|
||||
return nil, fmt.Errorf("could not prepare put parameters: %w", err)
|
||||
}
|
||||
|
||||
if prm.Header.Signature() != nil {
|
||||
return newUntrustedTarget(prm)
|
||||
}
|
||||
return newTrustedTarget(prm)
|
||||
}
|
||||
|
||||
func newUntrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
|
||||
maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
|
||||
if maxPayloadSz == 0 {
|
||||
return nil, errors.New("could not obtain max object size parameter")
|
||||
}
|
||||
|
||||
if prm.SignRequestPrivateKey == nil {
|
||||
nodeKey, err := prm.Config.KeyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
prm.SignRequestPrivateKey = nodeKey
|
||||
}
|
||||
|
||||
// prepare untrusted-Put object target
|
||||
return &validatingPreparedTarget{
|
||||
nextTarget: newInMemoryObjectBuilder(objectwriter.New(prm)),
|
||||
fmt: prm.Config.FormatValidator,
|
||||
|
||||
maxPayloadSz: maxPayloadSz,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newTrustedTarget(prm *objectwriter.Params) (transformer.ChunkedObjectWriter, error) {
|
||||
maxPayloadSz := prm.Config.MaxSizeSrc.MaxObjectSize()
|
||||
if maxPayloadSz == 0 {
|
||||
return nil, errors.New("could not obtain max object size parameter")
|
||||
}
|
||||
|
||||
sToken := prm.Common.SessionToken()
|
||||
|
||||
// prepare trusted-Put object target
|
||||
|
||||
// get private token from local storage
|
||||
var sessionInfo *util.SessionInfo
|
||||
|
||||
if sToken != nil {
|
||||
sessionInfo = &util.SessionInfo{
|
||||
ID: sToken.ID(),
|
||||
Owner: sToken.Issuer(),
|
||||
}
|
||||
}
|
||||
|
||||
key, err := prm.Config.KeyStorage.GetKey(sessionInfo)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not receive session key: %w", err)
|
||||
}
|
||||
|
||||
// In case session token is missing, the line above returns the default key.
|
||||
// If it isn't owner key, replication attempts will fail, thus this check.
|
||||
ownerObj := prm.Header.OwnerID()
|
||||
if ownerObj.IsEmpty() {
|
||||
return nil, errors.New("missing object owner")
|
||||
}
|
||||
|
||||
if sToken == nil {
|
||||
var ownerSession user.ID
|
||||
user.IDFromKey(&ownerSession, key.PublicKey)
|
||||
|
||||
if !ownerObj.Equals(ownerSession) {
|
||||
return nil, errors.New("session token is missing but object owner id is different from the default key")
|
||||
}
|
||||
} else {
|
||||
if !ownerObj.Equals(sessionInfo.Owner) {
|
||||
return nil, fmt.Errorf("different token issuer and object owner identifiers %s/%s", sessionInfo.Owner, ownerObj)
|
||||
}
|
||||
}
|
||||
|
||||
if prm.SignRequestPrivateKey == nil {
|
||||
prm.SignRequestPrivateKey = key
|
||||
}
|
||||
|
||||
return &validatingTarget{
|
||||
fmt: prm.Config.FormatValidator,
|
||||
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: key,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return objectwriter.New(prm) },
|
||||
NetworkState: prm.Config.NetworkState,
|
||||
MaxSize: maxPayloadSz,
|
||||
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.Container),
|
||||
SessionToken: sToken,
|
||||
}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func preparePrm(prm *objectwriter.Params) error {
|
||||
var err error
|
||||
|
||||
// get latest network map
|
||||
nm, err := netmap.GetLatestNetworkMap(prm.Config.NetmapSource)
|
||||
if err != nil {
|
||||
//return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
|
||||
return fmt.Errorf("could not get latest network map: %w", err)
|
||||
}
|
||||
|
||||
idCnr, ok := prm.Header.ContainerID()
|
||||
if !ok {
|
||||
return errors.New("missing container ID")
|
||||
}
|
||||
|
||||
// get container to store the object
|
||||
cnrInfo, err := prm.Config.ContainerSource.Get(idCnr)
|
||||
if err != nil {
|
||||
//return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
|
||||
return fmt.Errorf("could not get container by ID: %w", err)
|
||||
}
|
||||
|
||||
prm.Container = cnrInfo.Value
|
||||
|
||||
// add common options
|
||||
prm.TraverseOpts = append(prm.TraverseOpts,
|
||||
// set processing container
|
||||
placement.ForContainer(prm.Container),
|
||||
)
|
||||
|
||||
if ech := prm.Header.ECHeader(); ech != nil {
|
||||
prm.TraverseOpts = append(prm.TraverseOpts,
|
||||
// set identifier of the processing object
|
||||
placement.ForObject(ech.Parent()),
|
||||
)
|
||||
} else if id, ok := prm.Header.ID(); ok {
|
||||
prm.TraverseOpts = append(prm.TraverseOpts,
|
||||
// set identifier of the processing object
|
||||
placement.ForObject(id),
|
||||
)
|
||||
}
|
||||
|
||||
// create placement builder from network map
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
if prm.Common.LocalOnly() {
|
||||
// restrict success count to 1 stored copy (to local storage)
|
||||
prm.TraverseOpts = append(prm.TraverseOpts, placement.SuccessAfter(1))
|
||||
|
||||
// use local-only placement builder
|
||||
builder = util.NewLocalPlacement(builder, prm.Config.NetmapKeys)
|
||||
}
|
||||
|
||||
// set placement builder
|
||||
prm.TraverseOpts = append(prm.TraverseOpts, placement.UseBuilder(builder))
|
||||
|
||||
return nil
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writetarget
|
||||
|
||||
import (
|
||||
"bytes"
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -13,23 +13,23 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type nodeIterator struct {
|
||||
traversal
|
||||
cfg *cfg
|
||||
type NodeIterator struct {
|
||||
Traversal
|
||||
cfg *Config
|
||||
}
|
||||
|
||||
func (c *cfg) newNodeIterator(opts []placement.Option) *nodeIterator {
|
||||
return &nodeIterator{
|
||||
traversal: traversal{
|
||||
opts: opts,
|
||||
mExclude: make(map[string]*bool),
|
||||
func (c *Config) NewNodeIterator(opts []placement.Option) *NodeIterator {
|
||||
return &NodeIterator{
|
||||
Traversal: Traversal{
|
||||
Opts: opts,
|
||||
Exclude: make(map[string]*bool),
|
||||
},
|
||||
cfg: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context, nodeDesc) error) error {
|
||||
traverser, err := placement.NewTraverser(n.traversal.opts...)
|
||||
func (n *NodeIterator) ForEachNode(ctx context.Context, f func(context.Context, NodeDescriptor) error) error {
|
||||
traverser, err := placement.NewTraverser(n.Traversal.Opts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create object placement traverser: %w", err)
|
||||
}
|
||||
|
@ -56,10 +56,10 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context,
|
|||
}
|
||||
|
||||
// perform additional container broadcast if needed
|
||||
if n.traversal.submitPrimaryPlacementFinish() {
|
||||
err := n.forEachNode(ctx, f)
|
||||
if n.Traversal.submitPrimaryPlacementFinish() {
|
||||
err := n.ForEachNode(ctx, f)
|
||||
if err != nil {
|
||||
n.cfg.log.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
|
||||
n.cfg.Logger.Error(logs.PutAdditionalContainerBroadcastFailure, zap.Error(err))
|
||||
// we don't fail primary operation because of broadcast failure
|
||||
}
|
||||
}
|
||||
|
@ -67,11 +67,11 @@ func (n *nodeIterator) forEachNode(ctx context.Context, f func(context.Context,
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, nodeDesc) error, resErr *atomic.Value) bool {
|
||||
func (n *NodeIterator) forEachAddress(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, f func(context.Context, NodeDescriptor) error, resErr *atomic.Value) bool {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, addr := range addrs {
|
||||
if ok := n.mExclude[string(addr.PublicKey())]; ok != nil {
|
||||
if ok := n.Exclude[string(addr.PublicKey())]; ok != nil {
|
||||
if *ok {
|
||||
traverser.SubmitSuccess()
|
||||
}
|
||||
|
@ -86,10 +86,10 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
|||
if err := workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
|
||||
err := f(ctx, nodeDesc{local: isLocal, info: addr})
|
||||
err := f(ctx, NodeDescriptor{Local: isLocal, Info: addr})
|
||||
if err != nil {
|
||||
resErr.Store(err)
|
||||
svcutil.LogServiceError(n.cfg.log, "PUT", addr.Addresses(), err)
|
||||
svcutil.LogServiceError(n.cfg.Logger, "PUT", addr.Addresses(), err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -97,7 +97,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
|||
*item = true
|
||||
}); err != nil {
|
||||
wg.Done()
|
||||
svcutil.LogWorkerPoolError(n.cfg.log, "PUT", err)
|
||||
svcutil.LogWorkerPoolError(n.cfg.Logger, "PUT", err)
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -105,7 +105,7 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
|||
// in subsequent container broadcast. Note that we don't
|
||||
// process this node during broadcast if primary placement
|
||||
// on it failed.
|
||||
n.traversal.submitProcessed(addr, item)
|
||||
n.Traversal.submitProcessed(addr, item)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
@ -113,6 +113,6 @@ func (n *nodeIterator) forEachAddress(ctx context.Context, traverser *placement.
|
|||
return false
|
||||
}
|
||||
|
||||
func needAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool {
|
||||
func NeedAdditionalBroadcast(obj *objectSDK.Object, localOnly bool) bool {
|
||||
return len(obj.Children()) > 0 || (!localOnly && (obj.Type() == objectSDK.TypeTombstone || obj.Type() == objectSDK.TypeLock))
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -13,47 +13,47 @@ type preparedObjectTarget interface {
|
|||
WriteObject(context.Context, *objectSDK.Object, object.ContentMeta) error
|
||||
}
|
||||
|
||||
type distributedTarget struct {
|
||||
type distributedWriter struct {
|
||||
cfg *Config
|
||||
|
||||
placementOpts []placement.Option
|
||||
|
||||
obj *objectSDK.Object
|
||||
objMeta object.ContentMeta
|
||||
|
||||
*cfg
|
||||
nodeTargetInitializer func(NodeDescriptor) preparedObjectTarget
|
||||
|
||||
nodeTargetInitializer func(nodeDesc) preparedObjectTarget
|
||||
|
||||
relay func(context.Context, nodeDesc) error
|
||||
relay func(context.Context, NodeDescriptor) error
|
||||
|
||||
resetSuccessAfterOnBroadcast bool
|
||||
}
|
||||
|
||||
// parameters and state of container traversal.
|
||||
type traversal struct {
|
||||
opts []placement.Option
|
||||
// parameters and state of container Traversal.
|
||||
type Traversal struct {
|
||||
Opts []placement.Option
|
||||
|
||||
// need of additional broadcast after the object is saved
|
||||
extraBroadcastEnabled bool
|
||||
ExtraBroadcastEnabled bool
|
||||
|
||||
// container nodes which was processed during the primary object placement
|
||||
mExclude map[string]*bool
|
||||
Exclude map[string]*bool
|
||||
|
||||
resetSuccessAfterOnBroadcast bool
|
||||
ResetSuccessAfterOnBroadcast bool
|
||||
}
|
||||
|
||||
// updates traversal parameters after the primary placement finish and
|
||||
// returns true if additional container broadcast is needed.
|
||||
func (x *traversal) submitPrimaryPlacementFinish() bool {
|
||||
if x.extraBroadcastEnabled {
|
||||
func (x *Traversal) submitPrimaryPlacementFinish() bool {
|
||||
if x.ExtraBroadcastEnabled {
|
||||
// do not track success during container broadcast (best-effort)
|
||||
x.opts = append(x.opts, placement.WithoutSuccessTracking())
|
||||
x.Opts = append(x.Opts, placement.WithoutSuccessTracking())
|
||||
|
||||
if x.resetSuccessAfterOnBroadcast {
|
||||
x.opts = append(x.opts, placement.ResetSuccessAfter())
|
||||
if x.ResetSuccessAfterOnBroadcast {
|
||||
x.Opts = append(x.Opts, placement.ResetSuccessAfter())
|
||||
}
|
||||
|
||||
// avoid 2nd broadcast
|
||||
x.extraBroadcastEnabled = false
|
||||
x.ExtraBroadcastEnabled = false
|
||||
|
||||
return true
|
||||
}
|
||||
|
@ -62,22 +62,22 @@ func (x *traversal) submitPrimaryPlacementFinish() bool {
|
|||
}
|
||||
|
||||
// marks the container node as processed during the primary object placement.
|
||||
func (x *traversal) submitProcessed(n placement.Node, item *bool) {
|
||||
if x.extraBroadcastEnabled {
|
||||
func (x *Traversal) submitProcessed(n placement.Node, item *bool) {
|
||||
if x.ExtraBroadcastEnabled {
|
||||
key := string(n.PublicKey())
|
||||
|
||||
if x.mExclude == nil {
|
||||
x.mExclude = make(map[string]*bool, 1)
|
||||
if x.Exclude == nil {
|
||||
x.Exclude = make(map[string]*bool, 1)
|
||||
}
|
||||
|
||||
x.mExclude[key] = item
|
||||
x.Exclude[key] = item
|
||||
}
|
||||
}
|
||||
|
||||
type nodeDesc struct {
|
||||
local bool
|
||||
type NodeDescriptor struct {
|
||||
Local bool
|
||||
|
||||
info placement.Node
|
||||
Info placement.Node
|
||||
}
|
||||
|
||||
// errIncompletePut is returned if processing on a container fails.
|
||||
|
@ -96,19 +96,19 @@ func (x errIncompletePut) Error() string {
|
|||
}
|
||||
|
||||
// WriteObject implements the transformer.ObjectWriter interface.
|
||||
func (t *distributedTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
func (t *distributedWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
t.obj = obj
|
||||
|
||||
var err error
|
||||
|
||||
if t.objMeta, err = t.fmtValidator.ValidateContent(t.obj); err != nil {
|
||||
if t.objMeta, err = t.cfg.FormatValidator.ValidateContent(t.obj); err != nil {
|
||||
return fmt.Errorf("(%T) could not validate payload content: %w", t, err)
|
||||
}
|
||||
return t.iteratePlacement(ctx)
|
||||
}
|
||||
|
||||
func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
|
||||
if !node.local && t.relay != nil {
|
||||
func (t *distributedWriter) sendObject(ctx context.Context, node NodeDescriptor) error {
|
||||
if !node.Local && t.relay != nil {
|
||||
return t.relay(ctx, node)
|
||||
}
|
||||
|
||||
|
@ -121,11 +121,11 @@ func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *distributedTarget) iteratePlacement(ctx context.Context) error {
|
||||
func (t *distributedWriter) iteratePlacement(ctx context.Context) error {
|
||||
id, _ := t.obj.ID()
|
||||
|
||||
iter := t.cfg.newNodeIterator(append(t.placementOpts, placement.ForObject(id)))
|
||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
|
||||
iter.resetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
|
||||
return iter.forEachNode(ctx, t.sendObject)
|
||||
iter := t.cfg.NewNodeIterator(append(t.placementOpts, placement.ForObject(id)))
|
||||
iter.ExtraBroadcastEnabled = NeedAdditionalBroadcast(t.obj, false /* Distributed target is for cluster-wide PUT */)
|
||||
iter.ResetSuccessAfterOnBroadcast = t.resetSuccessAfterOnBroadcast
|
||||
return iter.ForEachNode(ctx, t.sendObject)
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -23,23 +23,23 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var _ transformer.ObjectWriter = (*ecWriter)(nil)
|
||||
var _ transformer.ObjectWriter = (*ECWriter)(nil)
|
||||
|
||||
var errUnsupportedECObject = errors.New("object is not supported for erasure coding")
|
||||
|
||||
type ecWriter struct {
|
||||
cfg *cfg
|
||||
placementOpts []placement.Option
|
||||
container containerSDK.Container
|
||||
key *ecdsa.PrivateKey
|
||||
commonPrm *svcutil.CommonPrm
|
||||
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||
type ECWriter struct {
|
||||
Config *Config
|
||||
PlacementOpts []placement.Option
|
||||
Container containerSDK.Container
|
||||
Key *ecdsa.PrivateKey
|
||||
CommonPrm *svcutil.CommonPrm
|
||||
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||
|
||||
objMeta object.ContentMeta
|
||||
objMetaValid bool
|
||||
ObjectMeta object.ContentMeta
|
||||
ObjectMetaValid bool
|
||||
}
|
||||
|
||||
func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
func (e *ECWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
relayed, err := e.relayIfNotContainerNode(ctx, obj)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -53,11 +53,11 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
|
|||
return errUnsupportedECObject
|
||||
}
|
||||
|
||||
if !e.objMetaValid {
|
||||
if e.objMeta, err = e.cfg.fmtValidator.ValidateContent(obj); err != nil {
|
||||
if !e.ObjectMetaValid {
|
||||
if e.ObjectMeta, err = e.Config.FormatValidator.ValidateContent(obj); err != nil {
|
||||
return fmt.Errorf("(%T) could not validate payload content: %w", e, err)
|
||||
}
|
||||
e.objMetaValid = true
|
||||
e.ObjectMetaValid = true
|
||||
}
|
||||
|
||||
if obj.ECHeader() != nil {
|
||||
|
@ -66,8 +66,8 @@ func (e *ecWriter) WriteObject(ctx context.Context, obj *objectSDK.Object) error
|
|||
return e.writeRawObject(ctx, obj)
|
||||
}
|
||||
|
||||
func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
|
||||
if e.relay == nil {
|
||||
func (e *ECWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.Object) (bool, error) {
|
||||
if e.Relay == nil {
|
||||
return false, nil
|
||||
}
|
||||
currentNodeIsContainerNode, err := e.currentNodeIsContainerNode()
|
||||
|
@ -90,8 +90,8 @@ func (e *ecWriter) relayIfNotContainerNode(ctx context.Context, obj *objectSDK.O
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
|
||||
t, err := placement.NewTraverser(e.placementOpts...)
|
||||
func (e *ECWriter) currentNodeIsContainerNode() (bool, error) {
|
||||
t, err := placement.NewTraverser(e.PlacementOpts...)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
|
|||
break
|
||||
}
|
||||
for _, node := range nodes {
|
||||
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
@ -109,8 +109,8 @@ func (e *ecWriter) currentNodeIsContainerNode() (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
|
||||
func (e *ECWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index uint32) error {
|
||||
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -126,18 +126,18 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
|
|||
var info client.NodeInfo
|
||||
client.NodeInfoFromNetmapElement(&info, node)
|
||||
|
||||
c, err := e.cfg.clientConstructor.Get(info)
|
||||
c, err := e.Config.ClientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||
}
|
||||
|
||||
completed := make(chan interface{})
|
||||
if poolErr := e.cfg.remotePool.Submit(func() {
|
||||
if poolErr := e.Config.RemotePool.Submit(func() {
|
||||
defer close(completed)
|
||||
err = e.relay(ctx, info, c)
|
||||
err = e.Relay(ctx, info, c)
|
||||
}); poolErr != nil {
|
||||
close(completed)
|
||||
svcutil.LogWorkerPoolError(e.cfg.log, "PUT", poolErr)
|
||||
svcutil.LogWorkerPoolError(e.Config.Logger, "PUT", poolErr)
|
||||
return poolErr
|
||||
}
|
||||
<-completed
|
||||
|
@ -145,7 +145,7 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
|
||||
e.Config.Logger.Logger.Warn(logs.ECFailedToSendToContainerNode, zap.Stringers("address_group", info.AddressGroup()))
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
|
@ -157,12 +157,12 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context, objID oid.ID, index
|
|||
}
|
||||
}
|
||||
|
||||
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
|
||||
if e.commonPrm.LocalOnly() {
|
||||
func (e *ECWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
|
||||
if e.CommonPrm.LocalOnly() {
|
||||
return e.writePartLocal(ctx, obj)
|
||||
}
|
||||
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
|
||||
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -187,18 +187,18 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
func (e *ECWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) error {
|
||||
// now only single EC policy is supported
|
||||
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.container.PlacementPolicy()), policy.ECParityCount(e.container.PlacementPolicy()))
|
||||
c, err := erasurecode.NewConstructor(policy.ECDataCount(e.Container.PlacementPolicy()), policy.ECParityCount(e.Container.PlacementPolicy()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
parts, err := c.Split(obj, e.key)
|
||||
parts, err := c.Split(obj, e.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
objID, _ := obj.ID()
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
|
||||
t, err := placement.NewTraverser(append(e.PlacementOpts, placement.ForObject(objID))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -230,7 +230,7 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
|
||||
func (e *ECWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx int, nodes []placement.Node, visited []atomic.Bool) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
@ -243,7 +243,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||
zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
|
||||
|
||||
|
@ -267,7 +267,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||
zap.String("node", hex.EncodeToString(node.PublicKey())),
|
||||
zap.Error(err))
|
||||
|
@ -291,7 +291,7 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
e.cfg.log.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
e.Config.Logger.Warn(logs.ECFailedToSaveECPart, zap.Stringer("part_address", object.AddressOf(obj)),
|
||||
zap.Stringer("parent_address", obj.ECHeader().Parent()), zap.Int("part_index", partIdx),
|
||||
zap.String("node", hex.EncodeToString(node.PublicKey())),
|
||||
zap.Error(err))
|
||||
|
@ -300,22 +300,22 @@ func (e *ecWriter) writePart(ctx context.Context, obj *objectSDK.Object, partIdx
|
|||
return fmt.Errorf("failed to save EC chunk %s to any node", object.AddressOf(obj))
|
||||
}
|
||||
|
||||
func (e *ecWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||
if e.cfg.netmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
func (e *ECWriter) putECPartToNode(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||
if e.Config.NetmapKeys.IsLocalKey(node.PublicKey()) {
|
||||
return e.writePartLocal(ctx, obj)
|
||||
}
|
||||
return e.writePartRemote(ctx, obj, node)
|
||||
}
|
||||
|
||||
func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||
func (e *ECWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) error {
|
||||
var err error
|
||||
localTarget := localTarget{
|
||||
storage: e.cfg.localStore,
|
||||
localTarget := LocalTarget{
|
||||
Storage: e.Config.LocalStore,
|
||||
}
|
||||
completed := make(chan interface{})
|
||||
if poolErr := e.cfg.localPool.Submit(func() {
|
||||
if poolErr := e.Config.LocalPool.Submit(func() {
|
||||
defer close(completed)
|
||||
err = localTarget.WriteObject(ctx, obj, e.objMeta)
|
||||
err = localTarget.WriteObject(ctx, obj, e.ObjectMeta)
|
||||
}); poolErr != nil {
|
||||
close(completed)
|
||||
return poolErr
|
||||
|
@ -324,22 +324,22 @@ func (e *ecWriter) writePartLocal(ctx context.Context, obj *objectSDK.Object) er
|
|||
return err
|
||||
}
|
||||
|
||||
func (e *ecWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||
func (e *ECWriter) writePartRemote(ctx context.Context, obj *objectSDK.Object, node placement.Node) error {
|
||||
var clientNodeInfo client.NodeInfo
|
||||
client.NodeInfoFromNetmapElement(&clientNodeInfo, node)
|
||||
|
||||
remoteTaget := remoteTarget{
|
||||
privateKey: e.key,
|
||||
clientConstructor: e.cfg.clientConstructor,
|
||||
commonPrm: e.commonPrm,
|
||||
remoteTaget := remoteWriter{
|
||||
privateKey: e.Key,
|
||||
clientConstructor: e.Config.ClientConstructor,
|
||||
commonPrm: e.CommonPrm,
|
||||
nodeInfo: clientNodeInfo,
|
||||
}
|
||||
|
||||
var err error
|
||||
completed := make(chan interface{})
|
||||
if poolErr := e.cfg.remotePool.Submit(func() {
|
||||
if poolErr := e.Config.RemotePool.Submit(func() {
|
||||
defer close(completed)
|
||||
err = remoteTaget.WriteObject(ctx, obj, e.objMeta)
|
||||
err = remoteTaget.WriteObject(ctx, obj, e.ObjectMeta)
|
||||
}); poolErr != nil {
|
||||
close(completed)
|
||||
return poolErr
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -24,19 +24,19 @@ type ObjectStorage interface {
|
|||
IsLocked(context.Context, oid.Address) (bool, error)
|
||||
}
|
||||
|
||||
type localTarget struct {
|
||||
storage ObjectStorage
|
||||
type LocalTarget struct {
|
||||
Storage ObjectStorage
|
||||
}
|
||||
|
||||
func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
|
||||
func (t LocalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, meta objectCore.ContentMeta) error {
|
||||
switch meta.Type() {
|
||||
case objectSDK.TypeTombstone:
|
||||
err := t.storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
|
||||
err := t.Storage.Delete(ctx, objectCore.AddressOf(obj), meta.Objects())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not delete objects from tombstone locally: %w", err)
|
||||
}
|
||||
case objectSDK.TypeLock:
|
||||
err := t.storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects())
|
||||
err := t.Storage.Lock(ctx, objectCore.AddressOf(obj), meta.Objects())
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not lock object from lock objects locally: %w", err)
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ func (t localTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, met
|
|||
// objects that do not change meta storage
|
||||
}
|
||||
|
||||
if err := t.storage.Put(ctx, obj); err != nil {
|
||||
if err := t.Storage.Put(ctx, obj); err != nil {
|
||||
return fmt.Errorf("(%T) could not put object to local storage: %w", t, err)
|
||||
}
|
||||
return nil
|
|
@ -1,4 +1,4 @@
|
|||
package putsvc
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -16,7 +16,7 @@ import (
|
|||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
type remoteTarget struct {
|
||||
type remoteWriter struct {
|
||||
privateKey *ecdsa.PrivateKey
|
||||
|
||||
commonPrm *util.CommonPrm
|
||||
|
@ -41,7 +41,7 @@ type RemotePutPrm struct {
|
|||
obj *objectSDK.Object
|
||||
}
|
||||
|
||||
func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error {
|
||||
func (t *remoteWriter) WriteObject(ctx context.Context, obj *objectSDK.Object, _ objectcore.ContentMeta) error {
|
||||
c, err := t.clientConstructor.Get(t.nodeInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
|
||||
|
@ -64,7 +64,7 @@ func (t *remoteTarget) WriteObject(ctx context.Context, obj *objectSDK.Object, _
|
|||
return t.putStream(ctx, prm)
|
||||
}
|
||||
|
||||
func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error {
|
||||
func (t *remoteWriter) putStream(ctx context.Context, prm internalclient.PutObjectPrm) error {
|
||||
_, err := internalclient.PutObject(ctx, prm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
|
||||
|
@ -72,7 +72,7 @@ func (t *remoteTarget) putStream(ctx context.Context, prm internalclient.PutObje
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *remoteTarget) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error {
|
||||
func (t *remoteWriter) putSingle(ctx context.Context, prm internalclient.PutObjectPrm) error {
|
||||
_, err := internalclient.PutObjectSingle(ctx, prm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not put single object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
|
||||
|
@ -113,7 +113,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
|
|||
return err
|
||||
}
|
||||
|
||||
t := &remoteTarget{
|
||||
t := &remoteWriter{
|
||||
privateKey: key,
|
||||
clientConstructor: s.clientConstructor,
|
||||
}
|
183
pkg/services/object/common/writer/writer.go
Normal file
183
pkg/services/object/common/writer/writer.go
Normal file
|
@ -0,0 +1,183 @@
|
|||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
)
|
||||
|
||||
type MaxSizeSource interface {
|
||||
// MaxObjectSize returns maximum payload size
|
||||
// of physically stored object in system.
|
||||
//
|
||||
// Must return 0 if value can not be obtained.
|
||||
MaxObjectSize() uint64
|
||||
}
|
||||
|
||||
type ClientConstructor interface {
|
||||
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
type InnerRing interface {
|
||||
InnerRingKeys() ([][]byte, error)
|
||||
}
|
||||
|
||||
type FormatValidatorConfig interface {
|
||||
VerifySessionTokenIssuer() bool
|
||||
}
|
||||
|
||||
// Config represents a set of static parameters that are established during
|
||||
// the initialization phase of all services.
|
||||
type Config struct {
|
||||
KeyStorage *objutil.KeyStorage
|
||||
|
||||
MaxSizeSrc MaxSizeSource
|
||||
|
||||
LocalStore ObjectStorage
|
||||
|
||||
ContainerSource container.Source
|
||||
|
||||
NetmapSource netmap.Source
|
||||
|
||||
RemotePool, LocalPool util.WorkerPool
|
||||
|
||||
NetmapKeys netmap.AnnouncedKeys
|
||||
|
||||
FormatValidator *object.FormatValidator
|
||||
|
||||
NetworkState netmap.State
|
||||
|
||||
ClientConstructor ClientConstructor
|
||||
|
||||
Logger *logger.Logger
|
||||
|
||||
VerifySessionTokenIssuer bool
|
||||
}
|
||||
|
||||
type Option func(*Config)
|
||||
|
||||
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
||||
return func(c *Config) {
|
||||
c.RemotePool, c.LocalPool = remote, local
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *Config) {
|
||||
c.Logger = l
|
||||
}
|
||||
}
|
||||
|
||||
func WithVerifySessionTokenIssuer(v bool) Option {
|
||||
return func(c *Config) {
|
||||
c.VerifySessionTokenIssuer = v
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) getWorkerPool(pub []byte) (util.WorkerPool, bool) {
|
||||
if c.NetmapKeys.IsLocalKey(pub) {
|
||||
return c.LocalPool, true
|
||||
}
|
||||
return c.RemotePool, false
|
||||
}
|
||||
|
||||
type Params struct {
|
||||
Config *Config
|
||||
|
||||
Common *objutil.CommonPrm
|
||||
|
||||
Header *objectSDK.Object
|
||||
|
||||
Container containerSDK.Container
|
||||
|
||||
TraverseOpts []placement.Option
|
||||
|
||||
Relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||
|
||||
SignRequestPrivateKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
func New(prm *Params) transformer.ObjectWriter {
|
||||
if container.IsECContainer(prm.Container) && object.IsECSupported(prm.Header) {
|
||||
return newECWriter(prm)
|
||||
}
|
||||
return newDefaultObjectWriter(prm, false)
|
||||
}
|
||||
|
||||
func newDefaultObjectWriter(prm *Params, forECPlacement bool) transformer.ObjectWriter {
|
||||
var relay func(context.Context, NodeDescriptor) error
|
||||
if prm.Relay != nil {
|
||||
relay = func(ctx context.Context, node NodeDescriptor) error {
|
||||
var info client.NodeInfo
|
||||
|
||||
client.NodeInfoFromNetmapElement(&info, node.Info)
|
||||
|
||||
c, err := prm.Config.ClientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||
}
|
||||
|
||||
return prm.Relay(ctx, info, c)
|
||||
}
|
||||
}
|
||||
|
||||
var resetSuccessAfterOnBroadcast bool
|
||||
traverseOpts := prm.TraverseOpts
|
||||
if forECPlacement && !prm.Common.LocalOnly() {
|
||||
// save non-regular and linking object to EC container.
|
||||
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
||||
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.Container.PlacementPolicy())+1)))
|
||||
resetSuccessAfterOnBroadcast = true
|
||||
}
|
||||
|
||||
return &distributedWriter{
|
||||
cfg: prm.Config,
|
||||
placementOpts: traverseOpts,
|
||||
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
|
||||
nodeTargetInitializer: func(node NodeDescriptor) preparedObjectTarget {
|
||||
if node.Local {
|
||||
return LocalTarget{
|
||||
Storage: prm.Config.LocalStore,
|
||||
}
|
||||
}
|
||||
|
||||
rt := &remoteWriter{
|
||||
privateKey: prm.SignRequestPrivateKey,
|
||||
commonPrm: prm.Common,
|
||||
clientConstructor: prm.Config.ClientConstructor,
|
||||
}
|
||||
|
||||
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.Info)
|
||||
|
||||
return rt
|
||||
},
|
||||
relay: relay,
|
||||
}
|
||||
}
|
||||
|
||||
func newECWriter(prm *Params) transformer.ObjectWriter {
|
||||
return &objectWriterDispatcher{
|
||||
ecWriter: &ECWriter{
|
||||
Config: prm.Config,
|
||||
PlacementOpts: append(prm.TraverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||
Container: prm.Container,
|
||||
Key: prm.SignRequestPrivateKey,
|
||||
CommonPrm: prm.Common,
|
||||
Relay: prm.Relay,
|
||||
},
|
||||
repWriter: newDefaultObjectWriter(prm, true),
|
||||
}
|
||||
}
|
|
@ -2,43 +2,40 @@ package patchsvc
|
|||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
// Service implements Put operation of Object service v2.
|
||||
type Service struct {
|
||||
keyStorage *util.KeyStorage
|
||||
*objectwriter.Config
|
||||
|
||||
getSvc *getsvc.Service
|
||||
|
||||
putSvc *putsvc.Service
|
||||
}
|
||||
|
||||
// NewService constructs Service instance from provided options.
|
||||
func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service {
|
||||
//
|
||||
// Patch service can use the same objectwriter.Config initializied by Put service.
|
||||
func NewService(cfg *objectwriter.Config,
|
||||
getSvc *getsvc.Service,
|
||||
) *Service {
|
||||
return &Service{
|
||||
keyStorage: ks,
|
||||
Config: cfg,
|
||||
|
||||
getSvc: getSvc,
|
||||
|
||||
putSvc: putSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// Put calls internal service and returns v2 object streamer.
|
||||
func (s *Service) Patch() (object.PatchObjectStream, error) {
|
||||
nodeKey, err := s.keyStorage.GetKey(nil)
|
||||
nodeKey, err := s.Config.KeyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Streamer{
|
||||
getSvc: s.getSvc,
|
||||
|
||||
putSvc: s.putSvc,
|
||||
|
||||
Config: s.Config,
|
||||
getSvc: s.getSvc,
|
||||
localNodeKey: nodeKey,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -9,8 +9,9 @@ import (
|
|||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
writetarget "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -21,6 +22,8 @@ import (
|
|||
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
|
||||
// and original object payload chunks. The merged result is fed to Put stream target.
|
||||
type Streamer struct {
|
||||
*objectwriter.Config
|
||||
|
||||
// Patcher must be initialized at first Streamer.Send call.
|
||||
patcher patcher.PatchApplier
|
||||
|
||||
|
@ -28,8 +31,6 @@ type Streamer struct {
|
|||
|
||||
getSvc *getsvc.Service
|
||||
|
||||
putSvc *putsvc.Service
|
||||
|
||||
localNodeKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
|
@ -78,11 +79,6 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
|
|||
localNodeKey: s.localNodeKey,
|
||||
}
|
||||
|
||||
putstm, err := s.putSvc.Put()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hdr := hdrWithSig.GetHeader()
|
||||
oV2 := new(objectV2.Object)
|
||||
hV2 := new(objectV2.Header)
|
||||
|
@ -97,14 +93,14 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
|
|||
}
|
||||
oV2.GetHeader().SetOwnerID(ownerID)
|
||||
|
||||
prm, err := s.putInitPrm(req, oV2)
|
||||
target, err := writetarget.New(&objectwriter.Params{
|
||||
Config: s.Config,
|
||||
Common: commonPrm,
|
||||
Header: objectSDK.NewFromV2(oV2),
|
||||
SignRequestPrivateKey: s.localNodeKey,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = putstm.Init(ctx, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("target creation: %w", err)
|
||||
}
|
||||
|
||||
patcherPrm := patcher.Params{
|
||||
|
@ -112,7 +108,7 @@ func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
|
|||
|
||||
RangeProvider: rangeProvider,
|
||||
|
||||
ObjectWriter: putstm.Target(),
|
||||
ObjectWriter: target,
|
||||
}
|
||||
|
||||
s.patcher = patcher.New(patcherPrm)
|
||||
|
|
|
@ -6,31 +6,12 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
)
|
||||
|
||||
// putInitPrm initializes put paramerer for Put stream.
|
||||
func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) {
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prm := new(putsvc.PutInitPrm)
|
||||
prm.WithObject(objectSDK.NewFromV2(obj)).
|
||||
WithCommonPrm(commonPrm).
|
||||
WithPrivateKey(s.localNodeKey)
|
||||
|
||||
return prm, nil
|
||||
}
|
||||
|
||||
func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) {
|
||||
for vh.GetOrigin() != nil {
|
||||
vh = vh.GetOrigin()
|
||||
|
|
|
@ -1,132 +1,66 @@
|
|||
package putsvc
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
objutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type MaxSizeSource interface {
|
||||
// MaxObjectSize returns maximum payload size
|
||||
// of physically stored object in system.
|
||||
//
|
||||
// Must return 0 if value can not be obtained.
|
||||
MaxObjectSize() uint64
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
*cfg
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
type ClientConstructor interface {
|
||||
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
type InnerRing interface {
|
||||
InnerRingKeys() ([][]byte, error)
|
||||
}
|
||||
|
||||
type FormatValidatorConfig interface {
|
||||
VerifySessionTokenIssuer() bool
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
keyStorage *objutil.KeyStorage
|
||||
|
||||
maxSizeSrc MaxSizeSource
|
||||
|
||||
localStore ObjectStorage
|
||||
|
||||
cnrSrc container.Source
|
||||
|
||||
netMapSrc netmap.Source
|
||||
|
||||
remotePool, localPool util.WorkerPool
|
||||
|
||||
netmapKeys netmap.AnnouncedKeys
|
||||
|
||||
fmtValidator *object.FormatValidator
|
||||
|
||||
networkState netmap.State
|
||||
|
||||
clientConstructor ClientConstructor
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
verifySessionTokenIssuer bool
|
||||
*objectwriter.Config
|
||||
}
|
||||
|
||||
func NewService(ks *objutil.KeyStorage,
|
||||
cc ClientConstructor,
|
||||
ms MaxSizeSource,
|
||||
os ObjectStorage,
|
||||
cc objectwriter.ClientConstructor,
|
||||
ms objectwriter.MaxSizeSource,
|
||||
os objectwriter.ObjectStorage,
|
||||
cs container.Source,
|
||||
ns netmap.Source,
|
||||
nk netmap.AnnouncedKeys,
|
||||
nst netmap.State,
|
||||
ir InnerRing,
|
||||
opts ...Option,
|
||||
ir objectwriter.InnerRing,
|
||||
opts ...objectwriter.Option,
|
||||
) *Service {
|
||||
c := &cfg{
|
||||
remotePool: util.NewPseudoWorkerPool(),
|
||||
localPool: util.NewPseudoWorkerPool(),
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
keyStorage: ks,
|
||||
clientConstructor: cc,
|
||||
maxSizeSrc: ms,
|
||||
localStore: os,
|
||||
cnrSrc: cs,
|
||||
netMapSrc: ns,
|
||||
netmapKeys: nk,
|
||||
networkState: nst,
|
||||
c := &objectwriter.Config{
|
||||
RemotePool: util.NewPseudoWorkerPool(),
|
||||
LocalPool: util.NewPseudoWorkerPool(),
|
||||
Logger: &logger.Logger{Logger: zap.L()},
|
||||
KeyStorage: ks,
|
||||
ClientConstructor: cc,
|
||||
MaxSizeSrc: ms,
|
||||
LocalStore: os,
|
||||
ContainerSource: cs,
|
||||
NetmapSource: ns,
|
||||
NetmapKeys: nk,
|
||||
NetworkState: nst,
|
||||
}
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
}
|
||||
|
||||
c.fmtValidator = object.NewFormatValidator(
|
||||
c.FormatValidator = object.NewFormatValidator(
|
||||
object.WithLockSource(os),
|
||||
object.WithNetState(nst),
|
||||
object.WithInnerRing(ir),
|
||||
object.WithNetmapSource(ns),
|
||||
object.WithContainersSource(cs),
|
||||
object.WithVerifySessionTokenIssuer(c.verifySessionTokenIssuer),
|
||||
object.WithLogger(c.log),
|
||||
object.WithVerifySessionTokenIssuer(c.VerifySessionTokenIssuer),
|
||||
object.WithLogger(c.Logger),
|
||||
)
|
||||
|
||||
return &Service{
|
||||
cfg: c,
|
||||
Config: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Service) Put() (*Streamer, error) {
|
||||
return &Streamer{
|
||||
cfg: p.cfg,
|
||||
Config: p.Config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
||||
return func(c *cfg) {
|
||||
c.remotePool, c.localPool = remote, local
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = l
|
||||
}
|
||||
}
|
||||
|
||||
func WithVerifySessionTokenIssuer(v bool) Option {
|
||||
return func(c *cfg) {
|
||||
c.verifySessionTokenIssuer = v
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
||||
svcutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
|
@ -35,7 +36,13 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
||||
var (
|
||||
errInvalidPayloadChecksum = errors.New("incorrect payload checksum")
|
||||
// ErrExceedingMaxSize is returned when payload size is greater than the limit.
|
||||
ErrExceedingMaxSize = errors.New("payload size is greater than the limit")
|
||||
// ErrWrongPayloadSize is returned when chunk payload size is greater than the length declared in header.
|
||||
ErrWrongPayloadSize = errors.New("wrong payload size")
|
||||
)
|
||||
|
||||
type putSingleRequestSigner struct {
|
||||
req *objectAPI.PutSingleRequest
|
||||
|
@ -100,7 +107,7 @@ func (s *Service) validarePutSingleSize(obj *objectSDK.Object) error {
|
|||
return ErrWrongPayloadSize
|
||||
}
|
||||
|
||||
maxAllowedSize := s.maxSizeSrc.MaxObjectSize()
|
||||
maxAllowedSize := s.Config.MaxSizeSrc.MaxObjectSize()
|
||||
if obj.PayloadSize() > maxAllowedSize {
|
||||
return ErrExceedingMaxSize
|
||||
}
|
||||
|
@ -137,11 +144,11 @@ func (s *Service) validatePutSingleChecksum(obj *objectSDK.Object) error {
|
|||
}
|
||||
|
||||
func (s *Service) validatePutSingleObject(ctx context.Context, obj *objectSDK.Object) (object.ContentMeta, error) {
|
||||
if err := s.fmtValidator.Validate(ctx, obj, false); err != nil {
|
||||
if err := s.FormatValidator.Validate(ctx, obj, false); err != nil {
|
||||
return object.ContentMeta{}, fmt.Errorf("coud not validate object format: %w", err)
|
||||
}
|
||||
|
||||
meta, err := s.fmtValidator.ValidateContent(obj)
|
||||
meta, err := s.FormatValidator.ValidateContent(obj)
|
||||
if err != nil {
|
||||
return object.ContentMeta{}, fmt.Errorf("could not validate payload content: %w", err)
|
||||
}
|
||||
|
@ -164,17 +171,17 @@ func (s *Service) saveToNodes(ctx context.Context, obj *objectSDK.Object, req *o
|
|||
}
|
||||
|
||||
func (s *Service) saveToREPReplicas(ctx context.Context, placement putSinglePlacement, obj *objectSDK.Object, localOnly bool, req *objectAPI.PutSingleRequest, meta object.ContentMeta) error {
|
||||
iter := s.cfg.newNodeIterator(placement.placementOptions)
|
||||
iter.extraBroadcastEnabled = needAdditionalBroadcast(obj, localOnly)
|
||||
iter.resetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
|
||||
iter := s.Config.NewNodeIterator(placement.placementOptions)
|
||||
iter.ExtraBroadcastEnabled = objectwriter.NeedAdditionalBroadcast(obj, localOnly)
|
||||
iter.ResetSuccessAfterOnBroadcast = placement.resetSuccessAfterOnBroadcast
|
||||
|
||||
signer := &putSingleRequestSigner{
|
||||
req: req,
|
||||
keyStorage: s.keyStorage,
|
||||
keyStorage: s.Config.KeyStorage,
|
||||
signer: &sync.Once{},
|
||||
}
|
||||
|
||||
return iter.forEachNode(ctx, func(ctx context.Context, nd nodeDesc) error {
|
||||
return iter.ForEachNode(ctx, func(ctx context.Context, nd objectwriter.NodeDescriptor) error {
|
||||
return s.saveToPlacementNode(ctx, &nd, obj, signer, meta)
|
||||
})
|
||||
}
|
||||
|
@ -184,25 +191,25 @@ func (s *Service) saveToECReplicas(ctx context.Context, placement putSinglePlace
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key, err := s.cfg.keyStorage.GetKey(nil)
|
||||
key, err := s.Config.KeyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
signer := &putSingleRequestSigner{
|
||||
req: req,
|
||||
keyStorage: s.keyStorage,
|
||||
keyStorage: s.Config.KeyStorage,
|
||||
signer: &sync.Once{},
|
||||
}
|
||||
|
||||
w := ecWriter{
|
||||
cfg: s.cfg,
|
||||
placementOpts: placement.placementOptions,
|
||||
objMeta: meta,
|
||||
objMetaValid: true,
|
||||
commonPrm: commonPrm,
|
||||
container: placement.container,
|
||||
key: key,
|
||||
relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
|
||||
w := objectwriter.ECWriter{
|
||||
Config: s.Config,
|
||||
PlacementOpts: placement.placementOptions,
|
||||
ObjectMeta: meta,
|
||||
ObjectMetaValid: true,
|
||||
CommonPrm: commonPrm,
|
||||
Container: placement.container,
|
||||
Key: key,
|
||||
Relay: func(ctx context.Context, ni client.NodeInfo, mac client.MultiAddressClient) error {
|
||||
return s.redirectPutSingleRequest(ctx, signer, obj, ni, mac)
|
||||
},
|
||||
}
|
||||
|
@ -223,7 +230,7 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
|||
if !ok {
|
||||
return result, errors.New("missing container ID")
|
||||
}
|
||||
cnrInfo, err := s.cnrSrc.Get(cnrID)
|
||||
cnrInfo, err := s.Config.ContainerSource.Get(cnrID)
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("could not get container by ID: %w", err)
|
||||
}
|
||||
|
@ -247,31 +254,31 @@ func (s *Service) getPutSinglePlacementOptions(obj *objectSDK.Object, copiesNumb
|
|||
}
|
||||
result.placementOptions = append(result.placementOptions, placement.ForObject(objID))
|
||||
|
||||
latestNetmap, err := netmap.GetLatestNetworkMap(s.netMapSrc)
|
||||
latestNetmap, err := netmap.GetLatestNetworkMap(s.Config.NetmapSource)
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("could not get latest network map: %w", err)
|
||||
}
|
||||
builder := placement.NewNetworkMapBuilder(latestNetmap)
|
||||
if localOnly {
|
||||
result.placementOptions = append(result.placementOptions, placement.SuccessAfter(1))
|
||||
builder = svcutil.NewLocalPlacement(builder, s.netmapKeys)
|
||||
builder = svcutil.NewLocalPlacement(builder, s.Config.NetmapKeys)
|
||||
}
|
||||
result.placementOptions = append(result.placementOptions, placement.UseBuilder(builder))
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, obj *objectSDK.Object,
|
||||
func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *objectwriter.NodeDescriptor, obj *objectSDK.Object,
|
||||
signer *putSingleRequestSigner, meta object.ContentMeta,
|
||||
) error {
|
||||
if nodeDesc.local {
|
||||
if nodeDesc.Local {
|
||||
return s.saveLocal(ctx, obj, meta)
|
||||
}
|
||||
|
||||
var info client.NodeInfo
|
||||
|
||||
client.NodeInfoFromNetmapElement(&info, nodeDesc.info)
|
||||
client.NodeInfoFromNetmapElement(&info, nodeDesc.Info)
|
||||
|
||||
c, err := s.clientConstructor.Get(info)
|
||||
c, err := s.Config.ClientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||
}
|
||||
|
@ -280,8 +287,8 @@ func (s *Service) saveToPlacementNode(ctx context.Context, nodeDesc *nodeDesc, o
|
|||
}
|
||||
|
||||
func (s *Service) saveLocal(ctx context.Context, obj *objectSDK.Object, meta object.ContentMeta) error {
|
||||
localTarget := &localTarget{
|
||||
storage: s.localStore,
|
||||
localTarget := &objectwriter.LocalTarget{
|
||||
Storage: s.Config.LocalStore,
|
||||
}
|
||||
return localTarget.WriteObject(ctx, obj, meta)
|
||||
}
|
||||
|
@ -314,7 +321,7 @@ func (s *Service) redirectPutSingleRequest(ctx context.Context,
|
|||
if err != nil {
|
||||
objID, _ := obj.ID()
|
||||
cnrID, _ := obj.ContainerID()
|
||||
s.log.Warn(logs.PutSingleRedirectFailure,
|
||||
s.Config.Logger.Warn(logs.PutSingleRedirectFailure,
|
||||
zap.Error(err),
|
||||
zap.Stringer("address", addr),
|
||||
zap.Stringer("object_id", objID),
|
||||
|
|
|
@ -2,33 +2,21 @@ package putsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
pkgutil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
writetarget "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
)
|
||||
|
||||
type Streamer struct {
|
||||
*cfg
|
||||
|
||||
privateKey *ecdsa.PrivateKey
|
||||
*objectwriter.Config
|
||||
|
||||
target transformer.ChunkedObjectWriter
|
||||
|
||||
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||
|
||||
maxPayloadSz uint64 // network config
|
||||
}
|
||||
|
||||
var errNotInit = errors.New("stream not initialized")
|
||||
|
@ -36,8 +24,23 @@ var errNotInit = errors.New("stream not initialized")
|
|||
var errInitRecall = errors.New("init recall")
|
||||
|
||||
func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
|
||||
if p.target != nil {
|
||||
return errInitRecall
|
||||
}
|
||||
|
||||
// initialize destination target
|
||||
if err := p.initTarget(prm); err != nil {
|
||||
prmTarget := &objectwriter.Params{
|
||||
Config: p.Config,
|
||||
Common: prm.common,
|
||||
Header: prm.hdr,
|
||||
Container: prm.cnr,
|
||||
TraverseOpts: prm.traverseOpts,
|
||||
Relay: p.relay,
|
||||
}
|
||||
|
||||
var err error
|
||||
p.target, err = writetarget.New(prmTarget)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
|
||||
}
|
||||
|
||||
|
@ -47,253 +50,6 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Target accesses underlying target chunked object writer.
|
||||
func (p *Streamer) Target() transformer.ChunkedObjectWriter {
|
||||
return p.target
|
||||
}
|
||||
|
||||
// MaxObjectSize returns maximum payload size for the streaming session.
|
||||
//
|
||||
// Must be called after the successful Init.
|
||||
func (p *Streamer) MaxObjectSize() uint64 {
|
||||
return p.maxPayloadSz
|
||||
}
|
||||
|
||||
func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
||||
// prevent re-calling
|
||||
if p.target != nil {
|
||||
return errInitRecall
|
||||
}
|
||||
|
||||
// prepare needed put parameters
|
||||
if err := p.preparePrm(prm); err != nil {
|
||||
return fmt.Errorf("(%T) could not prepare put parameters: %w", p, err)
|
||||
}
|
||||
|
||||
p.maxPayloadSz = p.maxSizeSrc.MaxObjectSize()
|
||||
if p.maxPayloadSz == 0 {
|
||||
return fmt.Errorf("(%T) could not obtain max object size parameter", p)
|
||||
}
|
||||
|
||||
if prm.hdr.Signature() != nil {
|
||||
return p.initUntrustedTarget(prm)
|
||||
}
|
||||
return p.initTrustedTarget(prm)
|
||||
}
|
||||
|
||||
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
||||
p.relay = prm.relay
|
||||
|
||||
if prm.privateKey != nil {
|
||||
p.privateKey = prm.privateKey
|
||||
} else {
|
||||
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.privateKey = nodeKey
|
||||
}
|
||||
|
||||
// prepare untrusted-Put object target
|
||||
p.target = &validatingPreparedTarget{
|
||||
nextTarget: newInMemoryObjectBuilder(p.newObjectWriter(prm)),
|
||||
fmt: p.fmtValidator,
|
||||
|
||||
maxPayloadSz: p.maxPayloadSz,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||
sToken := prm.common.SessionToken()
|
||||
|
||||
// prepare trusted-Put object target
|
||||
|
||||
// get private token from local storage
|
||||
var sessionInfo *util.SessionInfo
|
||||
|
||||
if sToken != nil {
|
||||
sessionInfo = &util.SessionInfo{
|
||||
ID: sToken.ID(),
|
||||
Owner: sToken.Issuer(),
|
||||
}
|
||||
}
|
||||
|
||||
key, err := p.keyStorage.GetKey(sessionInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not receive session key: %w", p, err)
|
||||
}
|
||||
|
||||
// In case session token is missing, the line above returns the default key.
|
||||
// If it isn't owner key, replication attempts will fail, thus this check.
|
||||
ownerObj := prm.hdr.OwnerID()
|
||||
if ownerObj.IsEmpty() {
|
||||
return errors.New("missing object owner")
|
||||
}
|
||||
|
||||
if sToken == nil {
|
||||
var ownerSession user.ID
|
||||
user.IDFromKey(&ownerSession, key.PublicKey)
|
||||
|
||||
if !ownerObj.Equals(ownerSession) {
|
||||
return fmt.Errorf("(%T) session token is missing but object owner id is different from the default key", p)
|
||||
}
|
||||
} else {
|
||||
if !ownerObj.Equals(sessionInfo.Owner) {
|
||||
return fmt.Errorf("(%T) different token issuer and object owner identifiers %s/%s", p, sessionInfo.Owner, ownerObj)
|
||||
}
|
||||
}
|
||||
|
||||
if prm.privateKey != nil {
|
||||
p.privateKey = prm.privateKey
|
||||
} else {
|
||||
p.privateKey = key
|
||||
}
|
||||
p.target = &validatingTarget{
|
||||
fmt: p.fmtValidator,
|
||||
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: key,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return p.newObjectWriter(prm) },
|
||||
NetworkState: p.networkState,
|
||||
MaxSize: p.maxPayloadSz,
|
||||
WithoutHomomorphicHash: containerSDK.IsHomomorphicHashingDisabled(prm.cnr),
|
||||
SessionToken: sToken,
|
||||
}),
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||
var err error
|
||||
|
||||
// get latest network map
|
||||
nm, err := netmap.GetLatestNetworkMap(p.netMapSrc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not get latest network map: %w", p, err)
|
||||
}
|
||||
|
||||
idCnr, ok := prm.hdr.ContainerID()
|
||||
if !ok {
|
||||
return errors.New("missing container ID")
|
||||
}
|
||||
|
||||
// get container to store the object
|
||||
cnrInfo, err := p.cnrSrc.Get(idCnr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) could not get container by ID: %w", p, err)
|
||||
}
|
||||
|
||||
prm.cnr = cnrInfo.Value
|
||||
|
||||
// add common options
|
||||
prm.traverseOpts = append(prm.traverseOpts,
|
||||
// set processing container
|
||||
placement.ForContainer(prm.cnr),
|
||||
)
|
||||
|
||||
if ech := prm.hdr.ECHeader(); ech != nil {
|
||||
prm.traverseOpts = append(prm.traverseOpts,
|
||||
// set identifier of the processing object
|
||||
placement.ForObject(ech.Parent()),
|
||||
)
|
||||
} else if id, ok := prm.hdr.ID(); ok {
|
||||
prm.traverseOpts = append(prm.traverseOpts,
|
||||
// set identifier of the processing object
|
||||
placement.ForObject(id),
|
||||
)
|
||||
}
|
||||
|
||||
// create placement builder from network map
|
||||
builder := placement.NewNetworkMapBuilder(nm)
|
||||
|
||||
if prm.common.LocalOnly() {
|
||||
// restrict success count to 1 stored copy (to local storage)
|
||||
prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1))
|
||||
|
||||
// use local-only placement builder
|
||||
builder = util.NewLocalPlacement(builder, p.netmapKeys)
|
||||
}
|
||||
|
||||
// set placement builder
|
||||
prm.traverseOpts = append(prm.traverseOpts, placement.UseBuilder(builder))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Streamer) newObjectWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||
if container.IsECContainer(prm.cnr) && object.IsECSupported(prm.hdr) {
|
||||
return p.newECWriter(prm)
|
||||
}
|
||||
return p.newDefaultObjectWriter(prm, false)
|
||||
}
|
||||
|
||||
func (p *Streamer) newDefaultObjectWriter(prm *PutInitPrm, forECPlacement bool) transformer.ObjectWriter {
|
||||
var relay func(context.Context, nodeDesc) error
|
||||
if p.relay != nil {
|
||||
relay = func(ctx context.Context, node nodeDesc) error {
|
||||
var info client.NodeInfo
|
||||
|
||||
client.NodeInfoFromNetmapElement(&info, node.info)
|
||||
|
||||
c, err := p.clientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||
}
|
||||
|
||||
return p.relay(ctx, info, c)
|
||||
}
|
||||
}
|
||||
|
||||
var resetSuccessAfterOnBroadcast bool
|
||||
traverseOpts := prm.traverseOpts
|
||||
if forECPlacement && !prm.common.LocalOnly() {
|
||||
// save non-regular and linking object to EC container.
|
||||
// EC 2.1 -> REP 2, EC 2.2 -> REP 3 etc.
|
||||
traverseOpts = append(traverseOpts, placement.SuccessAfter(uint32(policy.ECParityCount(prm.cnr.PlacementPolicy())+1)))
|
||||
resetSuccessAfterOnBroadcast = true
|
||||
}
|
||||
|
||||
return &distributedTarget{
|
||||
cfg: p.cfg,
|
||||
placementOpts: traverseOpts,
|
||||
resetSuccessAfterOnBroadcast: resetSuccessAfterOnBroadcast,
|
||||
nodeTargetInitializer: func(node nodeDesc) preparedObjectTarget {
|
||||
if node.local {
|
||||
return localTarget{
|
||||
storage: p.localStore,
|
||||
}
|
||||
}
|
||||
|
||||
rt := &remoteTarget{
|
||||
privateKey: p.privateKey,
|
||||
commonPrm: prm.common,
|
||||
clientConstructor: p.clientConstructor,
|
||||
}
|
||||
|
||||
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info)
|
||||
|
||||
return rt
|
||||
},
|
||||
relay: relay,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Streamer) newECWriter(prm *PutInitPrm) transformer.ObjectWriter {
|
||||
return &objectWriterDispatcher{
|
||||
ecWriter: &ecWriter{
|
||||
cfg: p.cfg,
|
||||
placementOpts: append(prm.traverseOpts, placement.WithCopyNumbers(nil)), // copies number ignored for EC
|
||||
container: prm.cnr,
|
||||
key: p.privateKey,
|
||||
commonPrm: prm.common,
|
||||
relay: p.relay,
|
||||
},
|
||||
repWriter: p.newDefaultObjectWriter(prm, true),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
|
||||
if p.target == nil {
|
||||
return errNotInit
|
||||
|
@ -327,10 +83,3 @@ func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) {
|
|||
id: ids.SelfID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *cfg) getWorkerPool(pub []byte) (pkgutil.WorkerPool, bool) {
|
||||
if c.netmapKeys.IsLocalKey(pub) {
|
||||
return c.localPool, true
|
||||
}
|
||||
return c.remotePool, false
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error)
|
|||
|
||||
s.saveChunks = v.GetSignature() != nil
|
||||
if s.saveChunks {
|
||||
maxSz := s.stream.MaxObjectSize()
|
||||
maxSz := s.stream.MaxSizeSrc.MaxObjectSize()
|
||||
|
||||
s.sizes = &sizes{
|
||||
payloadSz: uint64(v.GetHeader().GetPayloadLength()),
|
||||
|
|
|
@ -5,7 +5,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
|
@ -52,7 +52,7 @@ func (p *Replicator) HandleReplicationTask(ctx context.Context, task Task, res T
|
|||
}
|
||||
}
|
||||
|
||||
prm := new(putsvc.RemotePutPrm).
|
||||
prm := new(objectwriter.RemotePutPrm).
|
||||
WithObject(task.Obj)
|
||||
|
||||
for i := 0; task.NumCopies > 0 && i < len(task.Nodes); i++ {
|
||||
|
|
|
@ -4,8 +4,8 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -24,7 +24,7 @@ type cfg struct {
|
|||
|
||||
log *logger.Logger
|
||||
|
||||
remoteSender *putsvc.RemoteSender
|
||||
remoteSender *objectwriter.RemoteSender
|
||||
|
||||
remoteGetter *getsvc.RemoteGetter
|
||||
|
||||
|
@ -67,7 +67,7 @@ func WithLogger(v *logger.Logger) Option {
|
|||
}
|
||||
|
||||
// WithRemoteSender returns option to set remote object sender of Replicator.
|
||||
func WithRemoteSender(v *putsvc.RemoteSender) Option {
|
||||
func WithRemoteSender(v *objectwriter.RemoteSender) Option {
|
||||
return func(c *cfg) {
|
||||
c.remoteSender = v
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue