From 527fc95160946402e4784b041f30320f9f7a4d24 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 20 Apr 2023 13:57:45 +0300 Subject: [PATCH] [#277] getsvc: Fix service deps Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/object.go | 17 +- pkg/services/object/get/exec.go | 16 +- pkg/services/object/get/get_test.go | 97 ++++++------ pkg/services/object/get/local.go | 12 +- pkg/services/object/get/remote.go | 2 +- pkg/services/object/get/service.go | 136 ++++------------ pkg/services/object/get/types.go | 230 ++++++++++++++++++++++++++++ pkg/services/object/get/util.go | 188 ----------------------- 8 files changed, 339 insertions(+), 359 deletions(-) create mode 100644 pkg/services/object/get/types.go diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 83025a44c..4ff9b8522 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -336,17 +336,14 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra ls := c.cfgObject.cfgLocalStorage.localStorage return getsvc.New( - getsvc.WithLogger(c.log), - getsvc.WithLocalStorageEngine(ls), - getsvc.WithClientConstructor(coreConstructor), - getsvc.WithTraverserGenerator( - traverseGen.WithTraverseOptions( - placement.SuccessAfter(1), - ), + keyStorage, + c.netMapSource, + ls, + traverseGen.WithTraverseOptions( + placement.SuccessAfter(1), ), - getsvc.WithNetMapSource(c.netMapSource), - getsvc.WithKeyStorage(keyStorage), - ) + coreConstructor, + getsvc.WithLogger(c.log)) } func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service { diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 1bd5aa7f8..e57decb67 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -143,7 +143,7 @@ func (exec *execCtx) initEpoch() bool { return true } - e, err := exec.svc.currentEpochReceiver.currentEpoch() + e, err := exec.svc.epochSource.Epoch() switch { default: @@ -181,20 +181,18 @@ func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, } } -func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) { - c, err := exec.svc.clientCache.get(info) - - switch { - default: +func (exec execCtx) remoteClient(info clientcore.NodeInfo) (remoteStorage, bool) { + rs, err := exec.svc.remoteStorageConstructor.Get(info) + if err != nil { exec.status = statusUndefined exec.err = err exec.log.Debug(logs.GetCouldNotConstructRemoteNodeClient) - case err == nil: - return c, true + + return nil, false } - return nil, false + return rs, true } func mergeSplitInfo(dst, src *objectSDK.SplitInfo) { diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 319bc6b58..15a14ac18 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -56,7 +56,7 @@ type testClient struct { type testEpochReceiver uint64 -func (e testEpochReceiver) currentEpoch() (uint64, error) { +func (e testEpochReceiver) Epoch() (uint64, error) { return uint64(e), nil } @@ -99,7 +99,7 @@ func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap. return vs, nil } -func (c *testClientCache) get(info client.NodeInfo) (getClient, error) { +func (c *testClientCache) Get(info client.NodeInfo) (remoteStorage, error) { v, ok := c.clients[network.StringifyGroup(info.AddressGroup())] if !ok { return nil, errors.New("could not construct client") @@ -117,7 +117,7 @@ func newTestClient() *testClient { } } -func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { +func (c *testClient) GetObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) { v, ok := c.results[exec.address().EncodeToString()] if !ok { var errNotFound apistatus.ObjectNotFound @@ -139,11 +139,19 @@ func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err erro }{obj: obj, err: err} } -func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object, error) { +func (s *testStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) { + return s.Range(ctx, address, nil) +} + +func (s *testStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) { + return s.Range(ctx, address, nil) +} + +func (s *testStorage) Range(_ context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) { var ( ok bool obj *objectSDK.Object - sAddr = exec.address().EncodeToString() + sAddr = address.EncodeToString() ) if _, ok = s.inhumed[sAddr]; ok { @@ -157,7 +165,7 @@ func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object, } if obj, ok = s.phy[sAddr]; ok { - return cutToRange(obj, exec.ctxRange()), nil + return cutToRange(obj, rng), nil } var errNotFound apistatus.ObjectNotFound @@ -245,11 +253,10 @@ func TestGetLocalOnly(t *testing.T) { ctx := context.Background() newSvc := func(storage *testStorage) *Service { - svc := &Service{cfg: new(cfg)} - svc.log = test.NewLogger(t, false) - svc.localStorage = storage - - return svc + return &Service{ + log: test.NewLogger(t, false), + localStorage: storage, + } } newPrm := func(raw bool, w ObjectWriter) Prm { @@ -506,22 +513,20 @@ func TestGetRemoteSmall(t *testing.T) { container.CalculateID(&idCnr, cnr) newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service { - svc := &Service{cfg: new(cfg)} - svc.log = test.NewLogger(t, false) - svc.localStorage = newTestStorage() - const curEpoch = 13 - svc.traverserGenerator = &testTraverserGenerator{ - c: cnr, - b: map[uint64]placement.Builder{ - curEpoch: b, + return &Service{ + log: test.NewLogger(t, false), + localStorage: newTestStorage(), + traverserGenerator: &testTraverserGenerator{ + c: cnr, + b: map[uint64]placement.Builder{ + curEpoch: b, + }, }, + epochSource: testEpochReceiver(curEpoch), + remoteStorageConstructor: c, } - svc.clientCache = c - svc.currentEpochReceiver = testEpochReceiver(curEpoch) - - return svc } newPrm := func(raw bool, w ObjectWriter) Prm { @@ -1639,39 +1644,37 @@ func TestGetFromPastEpoch(t *testing.T) { c22 := newTestClient() c22.addResult(addr, obj, nil) - svc := &Service{cfg: new(cfg)} - svc.log = test.NewLogger(t, false) - svc.localStorage = newTestStorage() - const curEpoch = 13 - svc.traverserGenerator = &testTraverserGenerator{ - c: cnr, - b: map[uint64]placement.Builder{ - curEpoch: &testPlacementBuilder{ - vectors: map[string][][]netmap.NodeInfo{ - addr.EncodeToString(): ns[:1], + svc := &Service{ + log: test.NewLogger(t, false), + localStorage: newTestStorage(), + epochSource: testEpochReceiver(curEpoch), + traverserGenerator: &testTraverserGenerator{ + c: cnr, + b: map[uint64]placement.Builder{ + curEpoch: &testPlacementBuilder{ + vectors: map[string][][]netmap.NodeInfo{ + addr.EncodeToString(): ns[:1], + }, }, - }, - curEpoch - 1: &testPlacementBuilder{ - vectors: map[string][][]netmap.NodeInfo{ - addr.EncodeToString(): ns[1:], + curEpoch - 1: &testPlacementBuilder{ + vectors: map[string][][]netmap.NodeInfo{ + addr.EncodeToString(): ns[1:], + }, }, }, }, - } - - svc.clientCache = &testClientCache{ - clients: map[string]*testClient{ - as[0][0]: c11, - as[0][1]: c12, - as[1][0]: c21, - as[1][1]: c22, + remoteStorageConstructor: &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c11, + as[0][1]: c12, + as[1][0]: c21, + as[1][1]: c22, + }, }, } - svc.currentEpochReceiver = testEpochReceiver(curEpoch) - w := NewSimpleObjectWriter() commonPrm := new(util.CommonPrm) diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index 8ac83d97a..97fda6ce7 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -19,7 +19,7 @@ func (exec *execCtx) executeLocal(ctx context.Context) { var err error - exec.collectedObject, err = exec.svc.localStorage.get(ctx, exec) + exec.collectedObject, err = exec.get(ctx) var errSplitInfo *objectSDK.SplitInfoError var errRemoved apistatus.ObjectAlreadyRemoved @@ -49,3 +49,13 @@ func (exec *execCtx) executeLocal(ctx context.Context) { exec.err = errOutOfRange } } + +func (exec *execCtx) get(ctx context.Context) (*objectSDK.Object, error) { + if exec.headOnly() { + return exec.svc.localStorage.Head(ctx, exec.address(), exec.isRaw()) + } + if rng := exec.ctxRange(); rng != nil { + return exec.svc.localStorage.Range(ctx, exec.address(), rng) + } + return exec.svc.localStorage.Get(ctx, exec.address()) +} diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index ac8ec5105..56be476f2 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -23,7 +23,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool return true } - obj, err := client.getObject(ctx, exec, info) + obj, err := client.GetObject(ctx, exec, info) var errSplitInfo *objectSDK.SplitInfoError var errRemoved *apistatus.ObjectAlreadyRemoved diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index a9391d016..bdf01a977 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,124 +1,54 @@ package getsvc import ( - "context" - - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) +// Option is a Service's constructor option. +type Option func(*Service) + // Service utility serving requests of Object.Get service. type Service struct { - *cfg -} - -// Option is a Service's constructor option. -type Option func(*cfg) - -type getClient interface { - getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error) -} - -type cfg struct { - log *logger.Logger - - localStorage interface { - get(context.Context, *execCtx) (*object.Object, error) - } - - clientCache interface { - get(client.NodeInfo) (getClient, error) - } - - traverserGenerator interface { - GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) - } - - currentEpochReceiver interface { - currentEpoch() (uint64, error) - } - - keyStore *util.KeyStorage -} - -func defaultCfg() *cfg { - return &cfg{ - log: &logger.Logger{Logger: zap.L()}, - localStorage: new(storageEngineWrapper), - clientCache: new(clientCacheWrapper), - } + log *logger.Logger + localStorage localStorage + traverserGenerator traverserGenerator + epochSource epochSource + keyStore keyStorage + remoteStorageConstructor remoteStorageConstructor } // New creates, initializes and returns utility serving // Object.Get service requests. -func New(opts ...Option) *Service { - c := defaultCfg() - - for i := range opts { - opts[i](c) +func New( + ks keyStorage, + es epochSource, + e localStorageEngine, + tg traverserGenerator, + cc clientConstructor, + opts ...Option, +) *Service { + result := &Service{ + keyStore: ks, + epochSource: es, + log: &logger.Logger{Logger: zap.L()}, + localStorage: &engineLocalStorage{ + engine: e, + }, + traverserGenerator: tg, + remoteStorageConstructor: &multiclientRemoteStorageConstructor{ + clientConstructor: cc, + }, } - - return &Service{ - cfg: c, + for _, option := range opts { + option(result) } + return result } // WithLogger returns option to specify Get service's logger. func WithLogger(l *logger.Logger) Option { - return func(c *cfg) { - c.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))} - } -} - -// WithLocalStorageEngine returns option to set local storage -// instance. -func WithLocalStorageEngine(e *engine.StorageEngine) Option { - return func(c *cfg) { - c.localStorage.(*storageEngineWrapper).engine = e - } -} - -type ClientConstructor interface { - Get(client.NodeInfo) (client.MultiAddressClient, error) -} - -// WithClientConstructor returns option to set constructor of remote node clients. -func WithClientConstructor(v ClientConstructor) Option { - return func(c *cfg) { - c.clientCache.(*clientCacheWrapper).cache = v - } -} - -// WithTraverserGenerator returns option to set generator of -// placement traverser to get the objects from containers. -func WithTraverserGenerator(t *util.TraverserGenerator) Option { - return func(c *cfg) { - c.traverserGenerator = t - } -} - -// WithNetMapSource returns option to set network -// map storage to receive current network state. -func WithNetMapSource(nmSrc netmap.Source) Option { - return func(c *cfg) { - c.currentEpochReceiver = &nmSrcWrapper{ - nmSrc: nmSrc, - } - } -} - -// WithKeyStorage returns option to set private -// key storage for session tokens and node key. -func WithKeyStorage(store *util.KeyStorage) Option { - return func(c *cfg) { - c.keyStore = store + return func(s *Service) { + s.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))} } } diff --git a/pkg/services/object/get/types.go b/pkg/services/object/get/types.go new file mode 100644 index 000000000..47f27c0f0 --- /dev/null +++ b/pkg/services/object/get/types.go @@ -0,0 +1,230 @@ +package getsvc + +import ( + "context" + "crypto/ecdsa" + "errors" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type epochSource interface { + Epoch() (uint64, error) +} + +type traverserGenerator interface { + GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) +} + +type keyStorage interface { + GetKey(info *util.SessionInfo) (*ecdsa.PrivateKey, error) +} + +type localStorageEngine interface { + Head(ctx context.Context, p engine.HeadPrm) (engine.HeadRes, error) + GetRange(ctx context.Context, p engine.RngPrm) (engine.RngRes, error) + Get(ctx context.Context, p engine.GetPrm) (engine.GetRes, error) +} + +type clientConstructor interface { + Get(client.NodeInfo) (client.MultiAddressClient, error) +} + +type remoteStorageConstructor interface { + Get(client.NodeInfo) (remoteStorage, error) +} + +type multiclientRemoteStorageConstructor struct { + clientConstructor clientConstructor +} + +func (c *multiclientRemoteStorageConstructor) Get(info client.NodeInfo) (remoteStorage, error) { + clt, err := c.clientConstructor.Get(info) + if err != nil { + return nil, err + } + + return &multiaddressRemoteStorage{ + client: clt, + }, nil +} + +type remoteStorage interface { + GetObject(context.Context, *execCtx, client.NodeInfo) (*objectSDK.Object, error) +} + +type localStorage interface { + Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) + Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) + Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) +} + +type engineLocalStorage struct { + engine localStorageEngine +} + +func (s *engineLocalStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) { + var headPrm engine.HeadPrm + headPrm.WithAddress(address) + headPrm.WithRaw(isRaw) + + r, err := s.engine.Head(ctx, headPrm) + if err != nil { + return nil, err + } + + return r.Header(), nil +} + +func (s *engineLocalStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) { + var getRange engine.RngPrm + getRange.WithAddress(address) + getRange.WithPayloadRange(rng) + + r, err := s.engine.GetRange(ctx, getRange) + if err != nil { + return nil, err + } + + return r.Object(), nil +} + +func (s *engineLocalStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) { + var getPrm engine.GetPrm + getPrm.WithAddress(address) + + r, err := s.engine.Get(ctx, getPrm) + if err != nil { + return nil, err + } + + return r.Object(), nil +} + +type multiaddressRemoteStorage struct { + client coreclient.MultiAddressClient +} + +func (s *multiaddressRemoteStorage) GetObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*objectSDK.Object, error) { + if exec.isForwardingEnabled() { + return exec.prm.forwarder(ctx, info, s.client) + } + + key, err := exec.key() + if err != nil { + return nil, err + } + + if exec.headOnly() { + return s.getHeadOnly(ctx, exec, key) + } + // we don't specify payload writer because we accumulate + // the object locally (even huge). + if rng := exec.ctxRange(); rng != nil { + // Current spec allows other storage node to deny access, + // fallback to GET here. + return s.getRange(ctx, exec, key, rng) + } + + return s.get(ctx, exec, key) +} + +func (s *multiaddressRemoteStorage) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) { + var prm internalclient.PayloadRangePrm + + prm.SetClient(s.client) + prm.SetTTL(exec.prm.common.TTL()) + prm.SetNetmapEpoch(exec.curProcEpoch) + prm.SetAddress(exec.address()) + prm.SetPrivateKey(key) + prm.SetSessionToken(exec.prm.common.SessionToken()) + prm.SetBearerToken(exec.prm.common.BearerToken()) + prm.SetXHeaders(exec.prm.common.XHeaders()) + prm.SetRange(rng) + + if exec.isRaw() { + prm.SetRawFlag() + } + + res, err := internalclient.PayloadRange(ctx, prm) + if err != nil { + var errAccessDenied *apistatus.ObjectAccessDenied + if errors.As(err, &errAccessDenied) { + obj, err := s.get(ctx, exec, key) + if err != nil { + return nil, err + } + + payload := obj.Payload() + from := rng.GetOffset() + to := from + rng.GetLength() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return nil, new(apistatus.ObjectOutOfRange) + } + + return payloadOnlyObject(payload[from:to]), nil + } + return nil, err + } + + return payloadOnlyObject(res.PayloadRange()), nil +} + +func (s *multiaddressRemoteStorage) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { + var prm internalclient.HeadObjectPrm + + prm.SetClient(s.client) + prm.SetTTL(exec.prm.common.TTL()) + prm.SetNetmapEpoch(exec.curProcEpoch) + prm.SetAddress(exec.address()) + prm.SetPrivateKey(key) + prm.SetSessionToken(exec.prm.common.SessionToken()) + prm.SetBearerToken(exec.prm.common.BearerToken()) + prm.SetXHeaders(exec.prm.common.XHeaders()) + + if exec.isRaw() { + prm.SetRawFlag() + } + + res, err := internalclient.HeadObject(ctx, prm) + if err != nil { + return nil, err + } + + return res.Header(), nil +} + +func (s *multiaddressRemoteStorage) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { + var prm internalclient.GetObjectPrm + + prm.SetClient(s.client) + prm.SetTTL(exec.prm.common.TTL()) + prm.SetNetmapEpoch(exec.curProcEpoch) + prm.SetAddress(exec.address()) + prm.SetPrivateKey(key) + prm.SetSessionToken(exec.prm.common.SessionToken()) + prm.SetBearerToken(exec.prm.common.BearerToken()) + prm.SetXHeaders(exec.prm.common.XHeaders()) + + if exec.isRaw() { + prm.SetRawFlag() + } + + res, err := internalclient.GetObject(ctx, prm) + if err != nil { + return nil, err + } + + return res.Object(), nil +} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index dd4ace407..3038d0d5e 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -2,15 +2,8 @@ package getsvc import ( "context" - "crypto/ecdsa" - "errors" "io" - coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" - internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" - apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" ) @@ -20,18 +13,6 @@ type SimpleObjectWriter struct { pld []byte } -type clientCacheWrapper struct { - cache ClientConstructor -} - -type clientWrapper struct { - client coreclient.MultiAddressClient -} - -type storageEngineWrapper struct { - engine *engine.StorageEngine -} - type partWriter struct { ObjectWriter @@ -44,10 +25,6 @@ type hasherWrapper struct { hash io.Writer } -type nmSrcWrapper struct { - nmSrc netmap.Source -} - func NewSimpleObjectWriter() *SimpleObjectWriter { return &SimpleObjectWriter{ obj: object.New(), @@ -75,167 +52,6 @@ func (s *SimpleObjectWriter) Object() *object.Object { return s.obj } -func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) { - clt, err := c.cache.Get(info) - if err != nil { - return nil, err - } - - return &clientWrapper{ - client: clt, - }, nil -} - -func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) { - if exec.isForwardingEnabled() { - return exec.prm.forwarder(ctx, info, c.client) - } - - key, err := exec.key() - if err != nil { - return nil, err - } - - if exec.headOnly() { - return c.getHeadOnly(ctx, exec, key) - } - // we don't specify payload writer because we accumulate - // the object locally (even huge). - if rng := exec.ctxRange(); rng != nil { - // Current spec allows other storage node to deny access, - // fallback to GET here. - return c.getRange(ctx, exec, key, rng) - } - - return c.get(ctx, exec, key) -} - -func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) { - var prm internalclient.PayloadRangePrm - - prm.SetClient(c.client) - prm.SetTTL(exec.prm.common.TTL()) - prm.SetNetmapEpoch(exec.curProcEpoch) - prm.SetAddress(exec.address()) - prm.SetPrivateKey(key) - prm.SetSessionToken(exec.prm.common.SessionToken()) - prm.SetBearerToken(exec.prm.common.BearerToken()) - prm.SetXHeaders(exec.prm.common.XHeaders()) - prm.SetRange(rng) - - if exec.isRaw() { - prm.SetRawFlag() - } - - res, err := internalclient.PayloadRange(ctx, prm) - if err != nil { - var errAccessDenied *apistatus.ObjectAccessDenied - if errors.As(err, &errAccessDenied) { - obj, err := c.get(ctx, exec, key) - if err != nil { - return nil, err - } - - payload := obj.Payload() - from := rng.GetOffset() - to := from + rng.GetLength() - - if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { - return nil, new(apistatus.ObjectOutOfRange) - } - - return payloadOnlyObject(payload[from:to]), nil - } - return nil, err - } - - return payloadOnlyObject(res.PayloadRange()), nil -} - -func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { - var prm internalclient.HeadObjectPrm - - prm.SetClient(c.client) - prm.SetTTL(exec.prm.common.TTL()) - prm.SetNetmapEpoch(exec.curProcEpoch) - prm.SetAddress(exec.address()) - prm.SetPrivateKey(key) - prm.SetSessionToken(exec.prm.common.SessionToken()) - prm.SetBearerToken(exec.prm.common.BearerToken()) - prm.SetXHeaders(exec.prm.common.XHeaders()) - - if exec.isRaw() { - prm.SetRawFlag() - } - - res, err := internalclient.HeadObject(ctx, prm) - if err != nil { - return nil, err - } - - return res.Header(), nil -} - -func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { - var prm internalclient.GetObjectPrm - - prm.SetClient(c.client) - prm.SetTTL(exec.prm.common.TTL()) - prm.SetNetmapEpoch(exec.curProcEpoch) - prm.SetAddress(exec.address()) - prm.SetPrivateKey(key) - prm.SetSessionToken(exec.prm.common.SessionToken()) - prm.SetBearerToken(exec.prm.common.BearerToken()) - prm.SetXHeaders(exec.prm.common.XHeaders()) - - if exec.isRaw() { - prm.SetRawFlag() - } - - res, err := internalclient.GetObject(ctx, prm) - if err != nil { - return nil, err - } - - return res.Object(), nil -} - -func (e *storageEngineWrapper) get(ctx context.Context, exec *execCtx) (*object.Object, error) { - if exec.headOnly() { - var headPrm engine.HeadPrm - headPrm.WithAddress(exec.address()) - headPrm.WithRaw(exec.isRaw()) - - r, err := e.engine.Head(ctx, headPrm) - if err != nil { - return nil, err - } - - return r.Header(), nil - } else if rng := exec.ctxRange(); rng != nil { - var getRange engine.RngPrm - getRange.WithAddress(exec.address()) - getRange.WithPayloadRange(rng) - - r, err := e.engine.GetRange(ctx, getRange) - if err != nil { - return nil, err - } - - return r.Object(), nil - } else { - var getPrm engine.GetPrm - getPrm.WithAddress(exec.address()) - - r, err := e.engine.Get(ctx, getPrm) - if err != nil { - return nil, err - } - - return r.Object(), nil - } -} - func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error { return w.chunkWriter.WriteChunk(ctx, p) } @@ -255,7 +71,3 @@ func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error { _, err := h.hash.Write(p) return err } - -func (n *nmSrcWrapper) currentEpoch() (uint64, error) { - return n.nmSrc.Epoch() -}