[#277] getsvc: Fix service deps
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
8fc082b688
commit
30e1b62b67
8 changed files with 339 additions and 359 deletions
|
@ -336,17 +336,14 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
|||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
return getsvc.New(
|
||||
getsvc.WithLogger(c.log),
|
||||
getsvc.WithLocalStorageEngine(ls),
|
||||
getsvc.WithClientConstructor(coreConstructor),
|
||||
getsvc.WithTraverserGenerator(
|
||||
keyStorage,
|
||||
c.netMapSource,
|
||||
ls,
|
||||
traverseGen.WithTraverseOptions(
|
||||
placement.SuccessAfter(1),
|
||||
),
|
||||
),
|
||||
getsvc.WithNetMapSource(c.netMapSource),
|
||||
getsvc.WithKeyStorage(keyStorage),
|
||||
)
|
||||
coreConstructor,
|
||||
getsvc.WithLogger(c.log))
|
||||
}
|
||||
|
||||
func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {
|
||||
|
|
|
@ -143,7 +143,7 @@ func (exec *execCtx) initEpoch() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
e, err := exec.svc.currentEpochReceiver.currentEpoch()
|
||||
e, err := exec.svc.epochSource.Epoch()
|
||||
|
||||
switch {
|
||||
default:
|
||||
|
@ -181,20 +181,18 @@ func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser,
|
|||
}
|
||||
}
|
||||
|
||||
func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
|
||||
c, err := exec.svc.clientCache.get(info)
|
||||
|
||||
switch {
|
||||
default:
|
||||
func (exec execCtx) remoteClient(info clientcore.NodeInfo) (remoteStorage, bool) {
|
||||
rs, err := exec.svc.remoteStorageConstructor.Get(info)
|
||||
if err != nil {
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
|
||||
case err == nil:
|
||||
return c, true
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return rs, true
|
||||
}
|
||||
|
||||
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
|
||||
|
|
|
@ -56,7 +56,7 @@ type testClient struct {
|
|||
|
||||
type testEpochReceiver uint64
|
||||
|
||||
func (e testEpochReceiver) currentEpoch() (uint64, error) {
|
||||
func (e testEpochReceiver) Epoch() (uint64, error) {
|
||||
return uint64(e), nil
|
||||
}
|
||||
|
||||
|
@ -99,7 +99,7 @@ func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.
|
|||
return vs, nil
|
||||
}
|
||||
|
||||
func (c *testClientCache) get(info client.NodeInfo) (getClient, error) {
|
||||
func (c *testClientCache) Get(info client.NodeInfo) (remoteStorage, error) {
|
||||
v, ok := c.clients[network.StringifyGroup(info.AddressGroup())]
|
||||
if !ok {
|
||||
return nil, errors.New("could not construct client")
|
||||
|
@ -117,7 +117,7 @@ func newTestClient() *testClient {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
|
||||
func (c *testClient) GetObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
|
||||
v, ok := c.results[exec.address().EncodeToString()]
|
||||
if !ok {
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
@ -139,11 +139,19 @@ func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err erro
|
|||
}{obj: obj, err: err}
|
||||
}
|
||||
|
||||
func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object, error) {
|
||||
func (s *testStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
|
||||
return s.Range(ctx, address, nil)
|
||||
}
|
||||
|
||||
func (s *testStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
|
||||
return s.Range(ctx, address, nil)
|
||||
}
|
||||
|
||||
func (s *testStorage) Range(_ context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
|
||||
var (
|
||||
ok bool
|
||||
obj *objectSDK.Object
|
||||
sAddr = exec.address().EncodeToString()
|
||||
sAddr = address.EncodeToString()
|
||||
)
|
||||
|
||||
if _, ok = s.inhumed[sAddr]; ok {
|
||||
|
@ -157,7 +165,7 @@ func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object,
|
|||
}
|
||||
|
||||
if obj, ok = s.phy[sAddr]; ok {
|
||||
return cutToRange(obj, exec.ctxRange()), nil
|
||||
return cutToRange(obj, rng), nil
|
||||
}
|
||||
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
@ -245,11 +253,10 @@ func TestGetLocalOnly(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
|
||||
newSvc := func(storage *testStorage) *Service {
|
||||
svc := &Service{cfg: new(cfg)}
|
||||
svc.log = test.NewLogger(t, false)
|
||||
svc.localStorage = storage
|
||||
|
||||
return svc
|
||||
return &Service{
|
||||
log: test.NewLogger(t, false),
|
||||
localStorage: storage,
|
||||
}
|
||||
}
|
||||
|
||||
newPrm := func(raw bool, w ObjectWriter) Prm {
|
||||
|
@ -506,22 +513,20 @@ func TestGetRemoteSmall(t *testing.T) {
|
|||
container.CalculateID(&idCnr, cnr)
|
||||
|
||||
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
||||
svc := &Service{cfg: new(cfg)}
|
||||
svc.log = test.NewLogger(t, false)
|
||||
svc.localStorage = newTestStorage()
|
||||
|
||||
const curEpoch = 13
|
||||
|
||||
svc.traverserGenerator = &testTraverserGenerator{
|
||||
return &Service{
|
||||
log: test.NewLogger(t, false),
|
||||
localStorage: newTestStorage(),
|
||||
traverserGenerator: &testTraverserGenerator{
|
||||
c: cnr,
|
||||
b: map[uint64]placement.Builder{
|
||||
curEpoch: b,
|
||||
},
|
||||
},
|
||||
epochSource: testEpochReceiver(curEpoch),
|
||||
remoteStorageConstructor: c,
|
||||
}
|
||||
svc.clientCache = c
|
||||
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
|
||||
|
||||
return svc
|
||||
}
|
||||
|
||||
newPrm := func(raw bool, w ObjectWriter) Prm {
|
||||
|
@ -1639,13 +1644,13 @@ func TestGetFromPastEpoch(t *testing.T) {
|
|||
c22 := newTestClient()
|
||||
c22.addResult(addr, obj, nil)
|
||||
|
||||
svc := &Service{cfg: new(cfg)}
|
||||
svc.log = test.NewLogger(t, false)
|
||||
svc.localStorage = newTestStorage()
|
||||
|
||||
const curEpoch = 13
|
||||
|
||||
svc.traverserGenerator = &testTraverserGenerator{
|
||||
svc := &Service{
|
||||
log: test.NewLogger(t, false),
|
||||
localStorage: newTestStorage(),
|
||||
epochSource: testEpochReceiver(curEpoch),
|
||||
traverserGenerator: &testTraverserGenerator{
|
||||
c: cnr,
|
||||
b: map[uint64]placement.Builder{
|
||||
curEpoch: &testPlacementBuilder{
|
||||
|
@ -1659,19 +1664,17 @@ func TestGetFromPastEpoch(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
svc.clientCache = &testClientCache{
|
||||
},
|
||||
remoteStorageConstructor: &testClientCache{
|
||||
clients: map[string]*testClient{
|
||||
as[0][0]: c11,
|
||||
as[0][1]: c12,
|
||||
as[1][0]: c21,
|
||||
as[1][1]: c22,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
|
||||
|
||||
w := NewSimpleObjectWriter()
|
||||
|
||||
commonPrm := new(util.CommonPrm)
|
||||
|
|
|
@ -19,7 +19,7 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
|
|||
|
||||
var err error
|
||||
|
||||
exec.collectedObject, err = exec.svc.localStorage.get(ctx, exec)
|
||||
exec.collectedObject, err = exec.get(ctx)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errRemoved apistatus.ObjectAlreadyRemoved
|
||||
|
@ -49,3 +49,13 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
|
|||
exec.err = errOutOfRange
|
||||
}
|
||||
}
|
||||
|
||||
func (exec *execCtx) get(ctx context.Context) (*objectSDK.Object, error) {
|
||||
if exec.headOnly() {
|
||||
return exec.svc.localStorage.Head(ctx, exec.address(), exec.isRaw())
|
||||
}
|
||||
if rng := exec.ctxRange(); rng != nil {
|
||||
return exec.svc.localStorage.Range(ctx, exec.address(), rng)
|
||||
}
|
||||
return exec.svc.localStorage.Get(ctx, exec.address())
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
|
|||
return true
|
||||
}
|
||||
|
||||
obj, err := client.getObject(ctx, exec, info)
|
||||
obj, err := client.GetObject(ctx, exec, info)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
|
|
|
@ -1,124 +1,54 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Option is a Service's constructor option.
|
||||
type Option func(*Service)
|
||||
|
||||
// Service utility serving requests of Object.Get service.
|
||||
type Service struct {
|
||||
*cfg
|
||||
}
|
||||
|
||||
// Option is a Service's constructor option.
|
||||
type Option func(*cfg)
|
||||
|
||||
type getClient interface {
|
||||
getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
log *logger.Logger
|
||||
|
||||
localStorage interface {
|
||||
get(context.Context, *execCtx) (*object.Object, error)
|
||||
}
|
||||
|
||||
clientCache interface {
|
||||
get(client.NodeInfo) (getClient, error)
|
||||
}
|
||||
|
||||
traverserGenerator interface {
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||
}
|
||||
|
||||
currentEpochReceiver interface {
|
||||
currentEpoch() (uint64, error)
|
||||
}
|
||||
|
||||
keyStore *util.KeyStorage
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
localStorage: new(storageEngineWrapper),
|
||||
clientCache: new(clientCacheWrapper),
|
||||
}
|
||||
localStorage localStorage
|
||||
traverserGenerator traverserGenerator
|
||||
epochSource epochSource
|
||||
keyStore keyStorage
|
||||
remoteStorageConstructor remoteStorageConstructor
|
||||
}
|
||||
|
||||
// New creates, initializes and returns utility serving
|
||||
// Object.Get service requests.
|
||||
func New(opts ...Option) *Service {
|
||||
c := defaultCfg()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
func New(
|
||||
ks keyStorage,
|
||||
es epochSource,
|
||||
e localStorageEngine,
|
||||
tg traverserGenerator,
|
||||
cc clientConstructor,
|
||||
opts ...Option,
|
||||
) *Service {
|
||||
result := &Service{
|
||||
keyStore: ks,
|
||||
epochSource: es,
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
localStorage: &engineLocalStorage{
|
||||
engine: e,
|
||||
},
|
||||
traverserGenerator: tg,
|
||||
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
|
||||
clientConstructor: cc,
|
||||
},
|
||||
}
|
||||
|
||||
return &Service{
|
||||
cfg: c,
|
||||
for _, option := range opts {
|
||||
option(result)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// WithLogger returns option to specify Get service's logger.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))}
|
||||
}
|
||||
}
|
||||
|
||||
// WithLocalStorageEngine returns option to set local storage
|
||||
// instance.
|
||||
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
||||
return func(c *cfg) {
|
||||
c.localStorage.(*storageEngineWrapper).engine = e
|
||||
}
|
||||
}
|
||||
|
||||
type ClientConstructor interface {
|
||||
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
// WithClientConstructor returns option to set constructor of remote node clients.
|
||||
func WithClientConstructor(v ClientConstructor) Option {
|
||||
return func(c *cfg) {
|
||||
c.clientCache.(*clientCacheWrapper).cache = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithTraverserGenerator returns option to set generator of
|
||||
// placement traverser to get the objects from containers.
|
||||
func WithTraverserGenerator(t *util.TraverserGenerator) Option {
|
||||
return func(c *cfg) {
|
||||
c.traverserGenerator = t
|
||||
}
|
||||
}
|
||||
|
||||
// WithNetMapSource returns option to set network
|
||||
// map storage to receive current network state.
|
||||
func WithNetMapSource(nmSrc netmap.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.currentEpochReceiver = &nmSrcWrapper{
|
||||
nmSrc: nmSrc,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithKeyStorage returns option to set private
|
||||
// key storage for session tokens and node key.
|
||||
func WithKeyStorage(store *util.KeyStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.keyStore = store
|
||||
return func(s *Service) {
|
||||
s.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))}
|
||||
}
|
||||
}
|
||||
|
|
230
pkg/services/object/get/types.go
Normal file
230
pkg/services/object/get/types.go
Normal file
|
@ -0,0 +1,230 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type epochSource interface {
|
||||
Epoch() (uint64, error)
|
||||
}
|
||||
|
||||
type traverserGenerator interface {
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||
}
|
||||
|
||||
type keyStorage interface {
|
||||
GetKey(info *util.SessionInfo) (*ecdsa.PrivateKey, error)
|
||||
}
|
||||
|
||||
type localStorageEngine interface {
|
||||
Head(ctx context.Context, p engine.HeadPrm) (engine.HeadRes, error)
|
||||
GetRange(ctx context.Context, p engine.RngPrm) (engine.RngRes, error)
|
||||
Get(ctx context.Context, p engine.GetPrm) (engine.GetRes, error)
|
||||
}
|
||||
|
||||
type clientConstructor interface {
|
||||
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
type remoteStorageConstructor interface {
|
||||
Get(client.NodeInfo) (remoteStorage, error)
|
||||
}
|
||||
|
||||
type multiclientRemoteStorageConstructor struct {
|
||||
clientConstructor clientConstructor
|
||||
}
|
||||
|
||||
func (c *multiclientRemoteStorageConstructor) Get(info client.NodeInfo) (remoteStorage, error) {
|
||||
clt, err := c.clientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &multiaddressRemoteStorage{
|
||||
client: clt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type remoteStorage interface {
|
||||
GetObject(context.Context, *execCtx, client.NodeInfo) (*objectSDK.Object, error)
|
||||
}
|
||||
|
||||
type localStorage interface {
|
||||
Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error)
|
||||
Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error)
|
||||
Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error)
|
||||
}
|
||||
|
||||
type engineLocalStorage struct {
|
||||
engine localStorageEngine
|
||||
}
|
||||
|
||||
func (s *engineLocalStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
|
||||
var headPrm engine.HeadPrm
|
||||
headPrm.WithAddress(address)
|
||||
headPrm.WithRaw(isRaw)
|
||||
|
||||
r, err := s.engine.Head(ctx, headPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Header(), nil
|
||||
}
|
||||
|
||||
func (s *engineLocalStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
|
||||
var getRange engine.RngPrm
|
||||
getRange.WithAddress(address)
|
||||
getRange.WithPayloadRange(rng)
|
||||
|
||||
r, err := s.engine.GetRange(ctx, getRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Object(), nil
|
||||
}
|
||||
|
||||
func (s *engineLocalStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
|
||||
var getPrm engine.GetPrm
|
||||
getPrm.WithAddress(address)
|
||||
|
||||
r, err := s.engine.Get(ctx, getPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Object(), nil
|
||||
}
|
||||
|
||||
type multiaddressRemoteStorage struct {
|
||||
client coreclient.MultiAddressClient
|
||||
}
|
||||
|
||||
func (s *multiaddressRemoteStorage) GetObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*objectSDK.Object, error) {
|
||||
if exec.isForwardingEnabled() {
|
||||
return exec.prm.forwarder(ctx, info, s.client)
|
||||
}
|
||||
|
||||
key, err := exec.key()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if exec.headOnly() {
|
||||
return s.getHeadOnly(ctx, exec, key)
|
||||
}
|
||||
// we don't specify payload writer because we accumulate
|
||||
// the object locally (even huge).
|
||||
if rng := exec.ctxRange(); rng != nil {
|
||||
// Current spec allows other storage node to deny access,
|
||||
// fallback to GET here.
|
||||
return s.getRange(ctx, exec, key, rng)
|
||||
}
|
||||
|
||||
return s.get(ctx, exec, key)
|
||||
}
|
||||
|
||||
func (s *multiaddressRemoteStorage) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
|
||||
var prm internalclient.PayloadRangePrm
|
||||
|
||||
prm.SetClient(s.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
prm.SetRange(rng)
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.PayloadRange(ctx, prm)
|
||||
if err != nil {
|
||||
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||
if errors.As(err, &errAccessDenied) {
|
||||
obj, err := s.get(ctx, exec, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
payload := obj.Payload()
|
||||
from := rng.GetOffset()
|
||||
to := from + rng.GetLength()
|
||||
|
||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||
return nil, new(apistatus.ObjectOutOfRange)
|
||||
}
|
||||
|
||||
return payloadOnlyObject(payload[from:to]), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return payloadOnlyObject(res.PayloadRange()), nil
|
||||
}
|
||||
|
||||
func (s *multiaddressRemoteStorage) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
||||
var prm internalclient.HeadObjectPrm
|
||||
|
||||
prm.SetClient(s.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.HeadObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Header(), nil
|
||||
}
|
||||
|
||||
func (s *multiaddressRemoteStorage) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
||||
var prm internalclient.GetObjectPrm
|
||||
|
||||
prm.SetClient(s.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.GetObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Object(), nil
|
||||
}
|
|
@ -2,15 +2,8 @@ package getsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
)
|
||||
|
||||
|
@ -20,18 +13,6 @@ type SimpleObjectWriter struct {
|
|||
pld []byte
|
||||
}
|
||||
|
||||
type clientCacheWrapper struct {
|
||||
cache ClientConstructor
|
||||
}
|
||||
|
||||
type clientWrapper struct {
|
||||
client coreclient.MultiAddressClient
|
||||
}
|
||||
|
||||
type storageEngineWrapper struct {
|
||||
engine *engine.StorageEngine
|
||||
}
|
||||
|
||||
type partWriter struct {
|
||||
ObjectWriter
|
||||
|
||||
|
@ -44,10 +25,6 @@ type hasherWrapper struct {
|
|||
hash io.Writer
|
||||
}
|
||||
|
||||
type nmSrcWrapper struct {
|
||||
nmSrc netmap.Source
|
||||
}
|
||||
|
||||
func NewSimpleObjectWriter() *SimpleObjectWriter {
|
||||
return &SimpleObjectWriter{
|
||||
obj: object.New(),
|
||||
|
@ -75,167 +52,6 @@ func (s *SimpleObjectWriter) Object() *object.Object {
|
|||
return s.obj
|
||||
}
|
||||
|
||||
func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
|
||||
clt, err := c.cache.Get(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &clientWrapper{
|
||||
client: clt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
|
||||
if exec.isForwardingEnabled() {
|
||||
return exec.prm.forwarder(ctx, info, c.client)
|
||||
}
|
||||
|
||||
key, err := exec.key()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if exec.headOnly() {
|
||||
return c.getHeadOnly(ctx, exec, key)
|
||||
}
|
||||
// we don't specify payload writer because we accumulate
|
||||
// the object locally (even huge).
|
||||
if rng := exec.ctxRange(); rng != nil {
|
||||
// Current spec allows other storage node to deny access,
|
||||
// fallback to GET here.
|
||||
return c.getRange(ctx, exec, key, rng)
|
||||
}
|
||||
|
||||
return c.get(ctx, exec, key)
|
||||
}
|
||||
|
||||
func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
|
||||
var prm internalclient.PayloadRangePrm
|
||||
|
||||
prm.SetClient(c.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
prm.SetRange(rng)
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.PayloadRange(ctx, prm)
|
||||
if err != nil {
|
||||
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||
if errors.As(err, &errAccessDenied) {
|
||||
obj, err := c.get(ctx, exec, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
payload := obj.Payload()
|
||||
from := rng.GetOffset()
|
||||
to := from + rng.GetLength()
|
||||
|
||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||
return nil, new(apistatus.ObjectOutOfRange)
|
||||
}
|
||||
|
||||
return payloadOnlyObject(payload[from:to]), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return payloadOnlyObject(res.PayloadRange()), nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
||||
var prm internalclient.HeadObjectPrm
|
||||
|
||||
prm.SetClient(c.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.HeadObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Header(), nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
||||
var prm internalclient.GetObjectPrm
|
||||
|
||||
prm.SetClient(c.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.GetObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Object(), nil
|
||||
}
|
||||
|
||||
func (e *storageEngineWrapper) get(ctx context.Context, exec *execCtx) (*object.Object, error) {
|
||||
if exec.headOnly() {
|
||||
var headPrm engine.HeadPrm
|
||||
headPrm.WithAddress(exec.address())
|
||||
headPrm.WithRaw(exec.isRaw())
|
||||
|
||||
r, err := e.engine.Head(ctx, headPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Header(), nil
|
||||
} else if rng := exec.ctxRange(); rng != nil {
|
||||
var getRange engine.RngPrm
|
||||
getRange.WithAddress(exec.address())
|
||||
getRange.WithPayloadRange(rng)
|
||||
|
||||
r, err := e.engine.GetRange(ctx, getRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Object(), nil
|
||||
} else {
|
||||
var getPrm engine.GetPrm
|
||||
getPrm.WithAddress(exec.address())
|
||||
|
||||
r, err := e.engine.Get(ctx, getPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Object(), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error {
|
||||
return w.chunkWriter.WriteChunk(ctx, p)
|
||||
}
|
||||
|
@ -255,7 +71,3 @@ func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
|
|||
_, err := h.hash.Write(p)
|
||||
return err
|
||||
}
|
||||
|
||||
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
|
||||
return n.nmSrc.Epoch()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue