Refactor getsvc #277
8 changed files with 339 additions and 359 deletions
@@ -336,17 +336,14 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
 	ls := c.cfgObject.cfgLocalStorage.localStorage

 	return getsvc.New(
-		getsvc.WithLogger(c.log),
-		getsvc.WithLocalStorageEngine(ls),
-		getsvc.WithClientConstructor(coreConstructor),
-		getsvc.WithTraverserGenerator(
-			traverseGen.WithTraverseOptions(
-				placement.SuccessAfter(1),
-			),
-		),
-		getsvc.WithNetMapSource(c.netMapSource),
-		getsvc.WithKeyStorage(keyStorage),
-	)
+		keyStorage,
+		c.netMapSource,
+		ls,
+		traverseGen.WithTraverseOptions(
+			placement.SuccessAfter(1),
+		),
+		coreConstructor,
+		getsvc.WithLogger(c.log))
 }

 func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {
@@ -143,7 +143,7 @@ func (exec *execCtx) initEpoch() bool {
 		return true
 	}

-	e, err := exec.svc.currentEpochReceiver.currentEpoch()
+	e, err := exec.svc.epochSource.Epoch()

 	switch {
 	default:
@@ -181,22 +181,20 @@ func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser,
 	}
 }

-func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
-	c, err := exec.svc.clientCache.get(info)
-
-	switch {
-	default:
+func (exec execCtx) remoteClient(info clientcore.NodeInfo) (remoteStorage, bool) {
+	rs, err := exec.svc.remoteStorageConstructor.Get(info)
+	if err != nil {
 		exec.status = statusUndefined
 		exec.err = err

 		exec.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
-	case err == nil:
-		return c, true
-	}
-
-	return nil, false
-}
+
+		return nil, false
+	}
+
+	return rs, true
+}

 func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
 	if last, ok := src.LastPart(); ok {
 		dst.SetLastPart(last)
@@ -56,7 +56,7 @@ type testClient struct {

 type testEpochReceiver uint64

-func (e testEpochReceiver) currentEpoch() (uint64, error) {
+func (e testEpochReceiver) Epoch() (uint64, error) {
 	return uint64(e), nil
 }

@@ -99,7 +99,7 @@ func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.
 	return vs, nil
 }

-func (c *testClientCache) get(info client.NodeInfo) (getClient, error) {
+func (c *testClientCache) Get(info client.NodeInfo) (remoteStorage, error) {
 	v, ok := c.clients[network.StringifyGroup(info.AddressGroup())]
 	if !ok {
 		return nil, errors.New("could not construct client")
@@ -117,7 +117,7 @@ func newTestClient() *testClient {
 	}
 }

-func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
+func (c *testClient) GetObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
 	v, ok := c.results[exec.address().EncodeToString()]
 	if !ok {
 		var errNotFound apistatus.ObjectNotFound
@@ -139,11 +139,19 @@ func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err erro
 	}{obj: obj, err: err}
 }

-func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object, error) {
+func (s *testStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
+	return s.Range(ctx, address, nil)
+}
+
+func (s *testStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
+	return s.Range(ctx, address, nil)
+}
+
+func (s *testStorage) Range(_ context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
 	var (
 		ok    bool
 		obj   *objectSDK.Object
-		sAddr = exec.address().EncodeToString()
+		sAddr = address.EncodeToString()
 	)

 	if _, ok = s.inhumed[sAddr]; ok {
@@ -157,7 +165,7 @@ func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object,
 	}

 	if obj, ok = s.phy[sAddr]; ok {
-		return cutToRange(obj, exec.ctxRange()), nil
+		return cutToRange(obj, rng), nil
 	}

 	var errNotFound apistatus.ObjectNotFound
@@ -245,11 +253,10 @@ func TestGetLocalOnly(t *testing.T) {
 	ctx := context.Background()

 	newSvc := func(storage *testStorage) *Service {
-		svc := &Service{cfg: new(cfg)}
-		svc.log = test.NewLogger(t, false)
-		svc.localStorage = storage
-
-		return svc
+		return &Service{
+			log:          test.NewLogger(t, false),
+			localStorage: storage,
+		}
 	}

 	newPrm := func(raw bool, w ObjectWriter) Prm {
@@ -506,22 +513,20 @@ func TestGetRemoteSmall(t *testing.T) {
 	container.CalculateID(&idCnr, cnr)

 	newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
-		svc := &Service{cfg: new(cfg)}
-		svc.log = test.NewLogger(t, false)
-		svc.localStorage = newTestStorage()
-
 		const curEpoch = 13

-		svc.traverserGenerator = &testTraverserGenerator{
-			c: cnr,
-			b: map[uint64]placement.Builder{
-				curEpoch: b,
-			},
-		}
-		svc.clientCache = c
-		svc.currentEpochReceiver = testEpochReceiver(curEpoch)
-
-		return svc
+		return &Service{
+			log:          test.NewLogger(t, false),
+			localStorage: newTestStorage(),
+			traverserGenerator: &testTraverserGenerator{
+				c: cnr,
+				b: map[uint64]placement.Builder{
+					curEpoch: b,
+				},
+			},
+			epochSource:              testEpochReceiver(curEpoch),
+			remoteStorageConstructor: c,
+		}
 	}

 	newPrm := func(raw bool, w ObjectWriter) Prm {
@@ -1639,13 +1644,13 @@ func TestGetFromPastEpoch(t *testing.T) {
 	c22 := newTestClient()
 	c22.addResult(addr, obj, nil)

-	svc := &Service{cfg: new(cfg)}
-	svc.log = test.NewLogger(t, false)
-	svc.localStorage = newTestStorage()
-
 	const curEpoch = 13

-	svc.traverserGenerator = &testTraverserGenerator{
-		c: cnr,
-		b: map[uint64]placement.Builder{
-			curEpoch: &testPlacementBuilder{
+	svc := &Service{
+		log:          test.NewLogger(t, false),
+		localStorage: newTestStorage(),
+		epochSource:  testEpochReceiver(curEpoch),
+		traverserGenerator: &testTraverserGenerator{
+			c: cnr,
+			b: map[uint64]placement.Builder{
+				curEpoch: &testPlacementBuilder{
@@ -1659,19 +1664,17 @@ func TestGetFromPastEpoch(t *testing.T) {
 				},
 			},
 		},
-	}
-
-	svc.clientCache = &testClientCache{
-		clients: map[string]*testClient{
-			as[0][0]: c11,
-			as[0][1]: c12,
-			as[1][0]: c21,
-			as[1][1]: c22,
-		},
-	}
-
-	svc.currentEpochReceiver = testEpochReceiver(curEpoch)
+		},
+		remoteStorageConstructor: &testClientCache{
+			clients: map[string]*testClient{
+				as[0][0]: c11,
+				as[0][1]: c12,
+				as[1][0]: c21,
+				as[1][1]: c22,
+			},
+		},
+	}

 	w := NewSimpleObjectWriter()

 	commonPrm := new(util.CommonPrm)
@@ -19,7 +19,7 @@ func (exec *execCtx) executeLocal(ctx context.Context) {

 	var err error

-	exec.collectedObject, err = exec.svc.localStorage.get(ctx, exec)
+	exec.collectedObject, err = exec.get(ctx)

 	var errSplitInfo *objectSDK.SplitInfoError
 	var errRemoved apistatus.ObjectAlreadyRemoved
@@ -49,3 +49,13 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
 		exec.err = errOutOfRange
 	}
 }
+
+func (exec *execCtx) get(ctx context.Context) (*objectSDK.Object, error) {
+	if exec.headOnly() {
+		return exec.svc.localStorage.Head(ctx, exec.address(), exec.isRaw())
+	}
+	if rng := exec.ctxRange(); rng != nil {
+		return exec.svc.localStorage.Range(ctx, exec.address(), rng)
+	}
+	return exec.svc.localStorage.Get(ctx, exec.address())
+}
@@ -23,7 +23,7 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
 		return true
 	}

-	obj, err := client.getObject(ctx, exec, info)
+	obj, err := client.GetObject(ctx, exec, info)

 	var errSplitInfo *objectSDK.SplitInfoError
 	var errRemoved *apistatus.ObjectAlreadyRemoved
@@ -1,124 +1,54 @@
 package getsvc

 import (
-	"context"
-
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
-	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
-	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
-	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.uber.org/zap"
 )

+// Option is a Service's constructor option.
+type Option func(*Service)
+
 // Service utility serving requests of Object.Get service.
 type Service struct {
-	*cfg
-}
-
-// Option is a Service's constructor option.
-type Option func(*cfg)
-
-type getClient interface {
-	getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error)
-}
-
-type cfg struct {
 	log *logger.Logger

-	localStorage interface {
-		get(context.Context, *execCtx) (*object.Object, error)
-	}
-
-	clientCache interface {
-		get(client.NodeInfo) (getClient, error)
-	}
-
-	traverserGenerator interface {
-		GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
-	}
-
-	currentEpochReceiver interface {
-		currentEpoch() (uint64, error)
-	}
-
-	keyStore *util.KeyStorage
-}
-
-func defaultCfg() *cfg {
-	return &cfg{
-		log:          &logger.Logger{Logger: zap.L()},
-		localStorage: new(storageEngineWrapper),
-		clientCache:  new(clientCacheWrapper),
-	}
+	localStorage             localStorage
+	traverserGenerator       traverserGenerator
+	epochSource              epochSource
+	keyStore                 keyStorage
+	remoteStorageConstructor remoteStorageConstructor
 }

 // New creates, initializes and returns utility serving
 // Object.Get service requests.
-func New(opts ...Option) *Service {
-	c := defaultCfg()
-
-	for i := range opts {
-		opts[i](c)
-	}
-
-	return &Service{
-		cfg: c,
-	}
-}
+func New(
+	ks keyStorage,
+	es epochSource,
+	e localStorageEngine,
+	tg traverserGenerator,
+	cc clientConstructor,
+	opts ...Option,
+) *Service {
+	result := &Service{
+		keyStore:    ks,
+		epochSource: es,
+		log:         &logger.Logger{Logger: zap.L()},
+		localStorage: &engineLocalStorage{
+			engine: e,
+		},
+		traverserGenerator: tg,
+		remoteStorageConstructor: &multiclientRemoteStorageConstructor{
+			clientConstructor: cc,
+		},
+	}
+	for _, option := range opts {
+		option(result)
+	}
+	return result
+}

 // WithLogger returns option to specify Get service's logger.
 func WithLogger(l *logger.Logger) Option {
-	return func(c *cfg) {
-		c.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))}
+	return func(s *Service) {
+		s.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))}
 	}
 }
-
-// WithLocalStorageEngine returns option to set local storage
-// instance.
-func WithLocalStorageEngine(e *engine.StorageEngine) Option {
-	return func(c *cfg) {
-		c.localStorage.(*storageEngineWrapper).engine = e
-	}
-}
-
-type ClientConstructor interface {
-	Get(client.NodeInfo) (client.MultiAddressClient, error)
-}
-
-// WithClientConstructor returns option to set constructor of remote node clients.
-func WithClientConstructor(v ClientConstructor) Option {
-	return func(c *cfg) {
-		c.clientCache.(*clientCacheWrapper).cache = v
-	}
-}
-
-// WithTraverserGenerator returns option to set generator of
-// placement traverser to get the objects from containers.
-func WithTraverserGenerator(t *util.TraverserGenerator) Option {
-	return func(c *cfg) {
-		c.traverserGenerator = t
-	}
-}
-
-// WithNetMapSource returns option to set network
-// map storage to receive current network state.
-func WithNetMapSource(nmSrc netmap.Source) Option {
-	return func(c *cfg) {
-		c.currentEpochReceiver = &nmSrcWrapper{
-			nmSrc: nmSrc,
-		}
-	}
-}
-
-// WithKeyStorage returns option to set private
-// key storage for session tokens and node key.
-func WithKeyStorage(store *util.KeyStorage) Option {
-	return func(c *cfg) {
-		c.keyStore = store
-	}
-}
pkg/services/object/get/types.go (new file, 230 additions)

@@ -0,0 +1,230 @@
package getsvc

import (
	"context"
	"crypto/ecdsa"
	"errors"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
	coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

type epochSource interface {
	Epoch() (uint64, error)
}

type traverserGenerator interface {
	GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
}

type keyStorage interface {
	GetKey(info *util.SessionInfo) (*ecdsa.PrivateKey, error)
}

type localStorageEngine interface {
	Head(ctx context.Context, p engine.HeadPrm) (engine.HeadRes, error)
	GetRange(ctx context.Context, p engine.RngPrm) (engine.RngRes, error)
	Get(ctx context.Context, p engine.GetPrm) (engine.GetRes, error)
}

type clientConstructor interface {
	Get(client.NodeInfo) (client.MultiAddressClient, error)
}

type remoteStorageConstructor interface {
	Get(client.NodeInfo) (remoteStorage, error)
}

type multiclientRemoteStorageConstructor struct {
	clientConstructor clientConstructor
}

func (c *multiclientRemoteStorageConstructor) Get(info client.NodeInfo) (remoteStorage, error) {
	clt, err := c.clientConstructor.Get(info)
	if err != nil {
		return nil, err
	}

	return &multiaddressRemoteStorage{
		client: clt,
	}, nil
}

type remoteStorage interface {
	GetObject(context.Context, *execCtx, client.NodeInfo) (*objectSDK.Object, error)
}

type localStorage interface {
	Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error)
	Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error)
	Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error)
}

type engineLocalStorage struct {
	engine localStorageEngine
}

func (s *engineLocalStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
	var headPrm engine.HeadPrm
	headPrm.WithAddress(address)
	headPrm.WithRaw(isRaw)

	r, err := s.engine.Head(ctx, headPrm)
	if err != nil {
		return nil, err
	}

	return r.Header(), nil
}

func (s *engineLocalStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
	var getRange engine.RngPrm
	getRange.WithAddress(address)
	getRange.WithPayloadRange(rng)

	r, err := s.engine.GetRange(ctx, getRange)
	if err != nil {
		return nil, err
	}

	return r.Object(), nil
}

func (s *engineLocalStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
	var getPrm engine.GetPrm
	getPrm.WithAddress(address)

	r, err := s.engine.Get(ctx, getPrm)
	if err != nil {
		return nil, err
	}

	return r.Object(), nil
}

type multiaddressRemoteStorage struct {
	client coreclient.MultiAddressClient
}

func (s *multiaddressRemoteStorage) GetObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*objectSDK.Object, error) {
	if exec.isForwardingEnabled() {
		return exec.prm.forwarder(ctx, info, s.client)
	}

	key, err := exec.key()
	if err != nil {
		return nil, err
	}

	if exec.headOnly() {
		return s.getHeadOnly(ctx, exec, key)
	}
	// we don't specify payload writer because we accumulate
	// the object locally (even huge).
	if rng := exec.ctxRange(); rng != nil {
		// Current spec allows other storage node to deny access,
		// fallback to GET here.
		return s.getRange(ctx, exec, key, rng)
	}

	return s.get(ctx, exec, key)
}

func (s *multiaddressRemoteStorage) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
	var prm internalclient.PayloadRangePrm

	prm.SetClient(s.client)
	prm.SetTTL(exec.prm.common.TTL())
	prm.SetNetmapEpoch(exec.curProcEpoch)
	prm.SetAddress(exec.address())
	prm.SetPrivateKey(key)
	prm.SetSessionToken(exec.prm.common.SessionToken())
	prm.SetBearerToken(exec.prm.common.BearerToken())
	prm.SetXHeaders(exec.prm.common.XHeaders())
	prm.SetRange(rng)

	if exec.isRaw() {
		prm.SetRawFlag()
	}

	res, err := internalclient.PayloadRange(ctx, prm)
	if err != nil {
		var errAccessDenied *apistatus.ObjectAccessDenied
		if errors.As(err, &errAccessDenied) {
			obj, err := s.get(ctx, exec, key)
			if err != nil {
				return nil, err
			}

			payload := obj.Payload()
			from := rng.GetOffset()
			to := from + rng.GetLength()

			if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
				return nil, new(apistatus.ObjectOutOfRange)
			}

			return payloadOnlyObject(payload[from:to]), nil
		}
		return nil, err
	}

	return payloadOnlyObject(res.PayloadRange()), nil
}

func (s *multiaddressRemoteStorage) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
	var prm internalclient.HeadObjectPrm

	prm.SetClient(s.client)
	prm.SetTTL(exec.prm.common.TTL())
	prm.SetNetmapEpoch(exec.curProcEpoch)
	prm.SetAddress(exec.address())
	prm.SetPrivateKey(key)
	prm.SetSessionToken(exec.prm.common.SessionToken())
	prm.SetBearerToken(exec.prm.common.BearerToken())
	prm.SetXHeaders(exec.prm.common.XHeaders())

	if exec.isRaw() {
		prm.SetRawFlag()
	}

	res, err := internalclient.HeadObject(ctx, prm)
	if err != nil {
		return nil, err
	}

	return res.Header(), nil
}

func (s *multiaddressRemoteStorage) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
	var prm internalclient.GetObjectPrm

	prm.SetClient(s.client)
	prm.SetTTL(exec.prm.common.TTL())
	prm.SetNetmapEpoch(exec.curProcEpoch)
	prm.SetAddress(exec.address())
	prm.SetPrivateKey(key)
	prm.SetSessionToken(exec.prm.common.SessionToken())
	prm.SetBearerToken(exec.prm.common.BearerToken())
	prm.SetXHeaders(exec.prm.common.XHeaders())

	if exec.isRaw() {
		prm.SetRawFlag()
	}

	res, err := internalclient.GetObject(ctx, prm)
	if err != nil {
		return nil, err
	}

	return res.Object(), nil
}
@@ -2,15 +2,8 @@ package getsvc

 import (
 	"context"
-	"crypto/ecdsa"
-	"errors"
 	"io"

-	coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
-	internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
-	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 )

@@ -20,18 +13,6 @@ type SimpleObjectWriter struct {
 	pld []byte
 }

-type clientCacheWrapper struct {
-	cache ClientConstructor
-}
-
-type clientWrapper struct {
-	client coreclient.MultiAddressClient
-}
-
-type storageEngineWrapper struct {
-	engine *engine.StorageEngine
-}
-
 type partWriter struct {
 	ObjectWriter

@@ -44,10 +25,6 @@ type hasherWrapper struct {
 	hash io.Writer
 }

-type nmSrcWrapper struct {
-	nmSrc netmap.Source
-}
-
 func NewSimpleObjectWriter() *SimpleObjectWriter {
 	return &SimpleObjectWriter{
 		obj: object.New(),
@@ -75,167 +52,6 @@ func (s *SimpleObjectWriter) Object() *object.Object {
 	return s.obj
 }

-func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
-	clt, err := c.cache.Get(info)
-	if err != nil {
-		return nil, err
-	}
-
-	return &clientWrapper{
-		client: clt,
-	}, nil
-}
-
-func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
-	if exec.isForwardingEnabled() {
-		return exec.prm.forwarder(ctx, info, c.client)
-	}
-
-	key, err := exec.key()
-	if err != nil {
-		return nil, err
-	}
-
-	if exec.headOnly() {
-		return c.getHeadOnly(ctx, exec, key)
-	}
-	// we don't specify payload writer because we accumulate
-	// the object locally (even huge).
-	if rng := exec.ctxRange(); rng != nil {
-		// Current spec allows other storage node to deny access,
-		// fallback to GET here.
-		return c.getRange(ctx, exec, key, rng)
-	}
-
-	return c.get(ctx, exec, key)
-}
-
-func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
-	var prm internalclient.PayloadRangePrm
-
-	prm.SetClient(c.client)
-	prm.SetTTL(exec.prm.common.TTL())
-	prm.SetNetmapEpoch(exec.curProcEpoch)
-	prm.SetAddress(exec.address())
-	prm.SetPrivateKey(key)
-	prm.SetSessionToken(exec.prm.common.SessionToken())
-	prm.SetBearerToken(exec.prm.common.BearerToken())
-	prm.SetXHeaders(exec.prm.common.XHeaders())
-	prm.SetRange(rng)
-
-	if exec.isRaw() {
-		prm.SetRawFlag()
-	}
-
-	res, err := internalclient.PayloadRange(ctx, prm)
-	if err != nil {
-		var errAccessDenied *apistatus.ObjectAccessDenied
-		if errors.As(err, &errAccessDenied) {
-			obj, err := c.get(ctx, exec, key)
-			if err != nil {
-				return nil, err
-			}
-
-			payload := obj.Payload()
-			from := rng.GetOffset()
-			to := from + rng.GetLength()
-
-			if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
-				return nil, new(apistatus.ObjectOutOfRange)
-			}
-
-			return payloadOnlyObject(payload[from:to]), nil
-		}
-		return nil, err
-	}
-
-	return payloadOnlyObject(res.PayloadRange()), nil
-}
-
-func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
-	var prm internalclient.HeadObjectPrm
-
-	prm.SetClient(c.client)
-	prm.SetTTL(exec.prm.common.TTL())
-	prm.SetNetmapEpoch(exec.curProcEpoch)
-	prm.SetAddress(exec.address())
-	prm.SetPrivateKey(key)
-	prm.SetSessionToken(exec.prm.common.SessionToken())
-	prm.SetBearerToken(exec.prm.common.BearerToken())
-	prm.SetXHeaders(exec.prm.common.XHeaders())
-
-	if exec.isRaw() {
-		prm.SetRawFlag()
-	}
-
-	res, err := internalclient.HeadObject(ctx, prm)
-	if err != nil {
-		return nil, err
-	}
-
-	return res.Header(), nil
-}
-
-func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
-	var prm internalclient.GetObjectPrm
-
-	prm.SetClient(c.client)
-	prm.SetTTL(exec.prm.common.TTL())
-	prm.SetNetmapEpoch(exec.curProcEpoch)
-	prm.SetAddress(exec.address())
-	prm.SetPrivateKey(key)
-	prm.SetSessionToken(exec.prm.common.SessionToken())
-	prm.SetBearerToken(exec.prm.common.BearerToken())
-	prm.SetXHeaders(exec.prm.common.XHeaders())
-
-	if exec.isRaw() {
-		prm.SetRawFlag()
-	}
-
-	res, err := internalclient.GetObject(ctx, prm)
-	if err != nil {
-		return nil, err
-	}
-
-	return res.Object(), nil
-}
-
-func (e *storageEngineWrapper) get(ctx context.Context, exec *execCtx) (*object.Object, error) {
-	if exec.headOnly() {
-		var headPrm engine.HeadPrm
-		headPrm.WithAddress(exec.address())
-		headPrm.WithRaw(exec.isRaw())
-
-		r, err := e.engine.Head(ctx, headPrm)
-		if err != nil {
-			return nil, err
-		}
-
-		return r.Header(), nil
-	} else if rng := exec.ctxRange(); rng != nil {
-		var getRange engine.RngPrm
-		getRange.WithAddress(exec.address())
-		getRange.WithPayloadRange(rng)
-
-		r, err := e.engine.GetRange(ctx, getRange)
-		if err != nil {
-			return nil, err
-		}
-
-		return r.Object(), nil
-	} else {
-		var getPrm engine.GetPrm
-		getPrm.WithAddress(exec.address())
-
-		r, err := e.engine.Get(ctx, getPrm)
-		if err != nil {
-			return nil, err
-		}
-
-		return r.Object(), nil
-	}
-}
-
 func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error {
 	return w.chunkWriter.WriteChunk(ctx, p)
 }
@@ -255,7 +71,3 @@ func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
 	_, err := h.hash.Write(p)
 	return err
 }
-
-func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
-	return n.nmSrc.Epoch()
-}
Why this change? We use functional options in services, and you can do everything with both?

For example, it was not obvious to me which of the dependencies and options were required and which were not. So `New` requires the mandatory deps, and `With...` covers the optional ones.

Fixed back to an Option for the logger.

From https://github.com/uber-go/guide/blob/master/style.md#functional-options
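To make the scheme being discussed concrete, here is a minimal, self-contained sketch of it (a required dependency as a positional parameter, optional behaviour via options); the names are generic placeholders, not code from this PR:

```go
package example

import "log"

// Storage stands in for any required dependency.
type Storage interface {
	Get(key string) ([]byte, error)
}

// Service needs a Storage to work at all; the logger merely has a default.
type Service struct {
	storage Storage
	log     *log.Logger
}

// Option tweaks optional behaviour only.
type Option func(*Service)

// WithLogger overrides the default logger.
func WithLogger(l *log.Logger) Option {
	return func(s *Service) { s.log = l }
}

// New takes the mandatory dependency positionally, so forgetting it is a
// compile-time error at every call site; options stay variadic.
func New(storage Storage, opts ...Option) *Service {
	s := &Service{storage: storage, log: log.Default()}
	for _, o := range opts {
		o(s)
	}
	return s
}
```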
Yes, but 5 positional arguments don't look nice either.

Not that I am against the change, but if we were to commit to a new scheme, I would like to do this atomically across the whole repo. We can create an issue for the discussion.

I would prefer the positional arguments if they are required. They can be packed into an `Options` struct if needed.

It seems relatively easy to build incorrect objects in other places as well, because of this.

I like the struct, but it seems we do not enforce anything with it either. However, it may be easier to see which arguments are required.

We can try adopting this linter: https://github.com/GaijinEntertainment/go-exhaustruct

Of course it's not enforced, especially when there are pointer fields. But I guess it's a combination of it being conventional and it being easier to look at the struct and its doc when calling `New`, rather than at the multiple option builders, which might be spread around.

How about a struct that shows the caller that it contains only the required params, plus validation inside `New` (pointers, to be sure it is not a default value)? I do not like 5 positional args either.
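A rough sketch of that idea, reusing the interfaces this PR adds in types.go; `Params` and `NewFromParams` are hypothetical names, not part of the change:

```go
package getsvc

import "errors"

// Params would group only the required dependencies; optional settings
// would stay as functional options.
type Params struct {
	KeyStorage         keyStorage
	EpochSource        epochSource
	LocalStorage       localStorageEngine
	TraverserGenerator traverserGenerator
	ClientConstructor  clientConstructor
}

// NewFromParams rejects a half-filled Params value instead of silently
// accepting zero values, then delegates to the constructor from this PR.
func NewFromParams(p Params, opts ...Option) (*Service, error) {
	if p.KeyStorage == nil || p.EpochSource == nil || p.LocalStorage == nil ||
		p.TraverserGenerator == nil || p.ClientConstructor == nil {
		return nil, errors.New("getsvc: missing required dependency")
	}

	return New(p.KeyStorage, p.EpochSource, p.LocalStorage,
		p.TraverserGenerator, p.ClientConstructor, opts...), nil
}
```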
What's the difference? Instead of 5 arguments, there will be a structure with 5 fields that will most likely be filled in when calling the `New` method. For example:

`New` declaration: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/innerring/processors/frostfs/processor.go#L88

`New` call: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/innerring/initialization.go#L412

But positional arguments have an advantage: if a new dependency is added to the `New` method, then with a very high probability the program will not compile until this dependency is added everywhere.

For me, it looks unreadable, especially when we provide a bunch of nil vars or empty strings.

It's about required arguments only. If you can pass nil or an empty string as an argument value, I think that this argument is not required. For not-required arguments, use Option.
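A tiny, generic illustration of that trade-off (hypothetical types, not from this repository): adding a required positional parameter breaks callers at compile time, while a new field in a params struct keeps compiling and silently stays nil unless the constructor, or a linter such as go-exhaustruct, checks it:

```go
package example

type A interface{ DoA() }
type B interface{ DoB() }

type Svc struct {
	a A
	b B
}

// Positional style: once the b parameter is added, every existing call
// NewPositional(a) stops compiling until callers supply a B.
func NewPositional(a A, b B) *Svc {
	return &Svc{a: a, b: b}
}

// Struct style: old call sites NewFromStruct(Deps{A: a}) keep compiling
// after the B field is added; b just stays nil unless it is validated here.
type Deps struct {
	A A
	B B
}

func NewFromStruct(d Deps) *Svc {
	return &Svc{a: d.A, b: d.B}
}
```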