Unify parameters in object services #495
17 changed files with 179 additions and 497 deletions
|
@ -313,48 +313,40 @@ func createPutSvc(c *cfg, keyStorage *util.KeyStorage) *putsvc.Service {
|
|||
}
|
||||
|
||||
return putsvc.NewService(
|
||||
putsvc.WithKeyStorage(keyStorage),
|
||||
putsvc.WithClientConstructor(c.putClientCache),
|
||||
putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)),
|
||||
putsvc.WithObjectStorage(os),
|
||||
putsvc.WithContainerSource(c.cfgObject.cnrSource),
|
||||
putsvc.WithNetworkMapSource(c.netMapSource),
|
||||
putsvc.WithNetmapKeys(c),
|
||||
putsvc.WithNetworkState(c.cfgNetmap.state),
|
||||
keyStorage,
|
||||
c.putClientCache,
|
||||
newCachedMaxObjectSizeSource(c),
|
||||
os,
|
||||
c.cfgObject.cnrSource,
|
||||
c.netMapSource,
|
||||
c,
|
||||
c.cfgNetmap.state,
|
||||
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
|
||||
putsvc.WithLogger(c.log),
|
||||
)
|
||||
}
|
||||
|
||||
func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2.Service {
|
||||
return putsvcV2.NewService(
|
||||
putsvcV2.WithInternalService(sPut),
|
||||
putsvcV2.WithKeyStorage(keyStorage),
|
||||
)
|
||||
return putsvcV2.NewService(sPut, keyStorage)
|
||||
}
|
||||
|
||||
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
|
||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
return searchsvc.New(
|
||||
searchsvc.WithLogger(c.log),
|
||||
searchsvc.WithLocalStorageEngine(ls),
|
||||
searchsvc.WithClientConstructor(coreConstructor),
|
||||
searchsvc.WithTraverserGenerator(
|
||||
ls,
|
||||
coreConstructor,
|
||||
traverseGen.WithTraverseOptions(
|
||||
placement.WithoutSuccessTracking(),
|
||||
),
|
||||
),
|
||||
searchsvc.WithNetMapSource(c.netMapSource),
|
||||
searchsvc.WithKeyStorage(keyStorage),
|
||||
c.netMapSource,
|
||||
keyStorage,
|
||||
searchsvc.WithLogger(c.log),
|
||||
)
|
||||
}
|
||||
|
||||
func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage) *searchsvcV2.Service {
|
||||
return searchsvcV2.NewService(
|
||||
searchsvcV2.WithInternalService(sSearch),
|
||||
searchsvcV2.WithKeyStorage(keyStorage),
|
||||
)
|
||||
return searchsvcV2.NewService(sSearch, keyStorage)
|
||||
}
|
||||
|
||||
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
||||
|
@ -382,24 +374,22 @@ func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *gets
|
|||
func createDeleteService(c *cfg, keyStorage *util.KeyStorage, sGet *getsvc.Service,
|
||||
sSearch *searchsvc.Service, sPut *putsvc.Service) *deletesvc.Service {
|
||||
return deletesvc.New(
|
||||
deletesvc.WithLogger(c.log),
|
||||
deletesvc.WithHeadService(sGet),
|
||||
deletesvc.WithSearchService(sSearch),
|
||||
deletesvc.WithPutService(sPut),
|
||||
deletesvc.WithNetworkInfo(&delNetInfo{
|
||||
sGet,
|
||||
sSearch,
|
||||
sPut,
|
||||
&delNetInfo{
|
||||
State: c.cfgNetmap.state,
|
||||
tsLifetime: c.cfgObject.tombstoneLifetime,
|
||||
|
||||
cfg: c,
|
||||
}),
|
||||
deletesvc.WithKeyStorage(keyStorage),
|
||||
},
|
||||
keyStorage,
|
||||
deletesvc.WithLogger(c.log),
|
||||
)
|
||||
}
|
||||
|
||||
func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service {
|
||||
return deletesvcV2.NewService(
|
||||
deletesvcV2.WithInternalService(sDelete),
|
||||
)
|
||||
return deletesvcV2.NewService(sDelete)
|
||||
}
|
||||
|
||||
func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service,
|
||||
|
@ -421,21 +411,16 @@ func createACLServiceV2(c *cfg, splitSvc *objectService.TransportSplitter) v2.Se
|
|||
irFetcher := createInnerRingFetcher(c)
|
||||
|
||||
return v2.New(
|
||||
v2.WithLogger(c.log),
|
||||
v2.WithIRFetcher(newCachedIRFetcher(irFetcher)),
|
||||
v2.WithNetmapSource(c.netMapSource),
|
||||
v2.WithContainerSource(
|
||||
splitSvc,
|
||||
c.netMapSource,
|
||||
newCachedIRFetcher(irFetcher),
|
||||
acl.NewChecker(
|
||||
c.cfgNetmap.state,
|
||||
c.cfgObject.eaclSource,
|
||||
eaclSDK.NewValidator(),
|
||||
ls),
|
||||
c.cfgObject.cnrSource,
|
||||
),
|
||||
v2.WithNextService(splitSvc),
|
||||
v2.WithEACLChecker(
|
||||
acl.NewChecker(new(acl.CheckerPrm).
|
||||
SetNetmapState(c.cfgNetmap.state).
|
||||
SetEACLSource(c.cfgObject.eaclSource).
|
||||
SetValidator(eaclSDK.NewValidator()).
|
||||
SetLocalStorage(ls),
|
||||
),
|
||||
),
|
||||
v2.WithLogger(c.log),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package acl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
|
@ -17,39 +19,12 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
|
||||
eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
)
|
||||
|
||||
// CheckerPrm groups parameters for Checker
|
||||
// constructor.
|
||||
type CheckerPrm struct {
|
||||
eaclSrc container.EACLSource
|
||||
validator *eaclSDK.Validator
|
||||
localStorage *engine.StorageEngine
|
||||
state netmap.State
|
||||
}
|
||||
|
||||
func (c *CheckerPrm) SetEACLSource(v container.EACLSource) *CheckerPrm {
|
||||
c.eaclSrc = v
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *CheckerPrm) SetValidator(v *eaclSDK.Validator) *CheckerPrm {
|
||||
c.validator = v
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *CheckerPrm) SetLocalStorage(v *engine.StorageEngine) *CheckerPrm {
|
||||
c.localStorage = v
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *CheckerPrm) SetNetmapState(v netmap.State) *CheckerPrm {
|
||||
c.state = v
|
||||
return c
|
||||
}
|
||||
|
||||
// Checker implements v2.ACLChecker interfaces and provides
|
||||
// ACL/eACL validation functionality.
|
||||
type Checker struct {
|
||||
|
@ -59,6 +34,18 @@ type Checker struct {
|
|||
state netmap.State
|
||||
}
|
||||
|
||||
type localStorage struct {
|
||||
ls *engine.StorageEngine
|
||||
}
|
||||
|
||||
func (s *localStorage) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||
if s.ls == nil {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
return engine.Head(ctx, s.ls, addr)
|
||||
}
|
||||
|
||||
// Various EACL check errors.
|
||||
var (
|
||||
errEACLDeniedByRule = errors.New("denied by rule")
|
||||
|
@ -71,23 +58,17 @@ var (
|
|||
|
||||
// NewChecker creates Checker.
|
||||
// Panics if at least one of the parameter is nil.
|
||||
func NewChecker(prm *CheckerPrm) *Checker {
|
||||
panicOnNil := func(fieldName string, field any) {
|
||||
if field == nil {
|
||||
panic(fmt.Sprintf("incorrect field %s (%T): %v", fieldName, field, field))
|
||||
}
|
||||
}
|
||||
|
||||
panicOnNil("EACLSource", prm.eaclSrc)
|
||||
panicOnNil("EACLValidator", prm.validator)
|
||||
panicOnNil("LocalStorageEngine", prm.localStorage)
|
||||
panicOnNil("NetmapState", prm.state)
|
||||
|
||||
func NewChecker(
|
||||
state netmap.State,
|
||||
eaclSrc container.EACLSource,
|
||||
validator *eaclSDK.Validator,
|
||||
localStorage *engine.StorageEngine,
|
||||
) *Checker {
|
||||
return &Checker{
|
||||
eaclSrc: prm.eaclSrc,
|
||||
validator: prm.validator,
|
||||
localStorage: prm.localStorage,
|
||||
state: prm.state,
|
||||
eaclSrc: eaclSrc,
|
||||
validator: validator,
|
||||
localStorage: localStorage,
|
||||
state: state,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -193,26 +174,14 @@ func getRole(reqInfo v2.RequestInfo) eaclSDK.Role {
|
|||
}
|
||||
|
||||
func (c *Checker) getHeaderSource(cnr cid.ID, msg any, reqInfo v2.RequestInfo) (eaclSDK.TypedHeaderSource, error) {
|
||||
hdrSrcOpts := make([]eaclV2.Option, 0, 3)
|
||||
|
||||
hdrSrcOpts = append(hdrSrcOpts,
|
||||
eaclV2.WithLocalObjectStorage(c.localStorage),
|
||||
eaclV2.WithCID(cnr),
|
||||
eaclV2.WithOID(reqInfo.ObjectID()),
|
||||
)
|
||||
|
||||
var xHeaderSource eaclV2.XHeaderSource
|
||||
if req, ok := msg.(eaclV2.Request); ok {
|
||||
hdrSrcOpts = append(hdrSrcOpts, eaclV2.WithServiceRequest(req))
|
||||
xHeaderSource = eaclV2.NewRequestXHeaderSource(req)
|
||||
} else {
|
||||
hdrSrcOpts = append(hdrSrcOpts,
|
||||
eaclV2.WithServiceResponse(
|
||||
msg.(eaclV2.Response),
|
||||
reqInfo.Request().(eaclV2.Request),
|
||||
),
|
||||
)
|
||||
xHeaderSource = eaclV2.NewResponseXHeaderSource(msg.(eaclV2.Response), reqInfo.Request().(eaclV2.Request))
|
||||
}
|
||||
|
||||
hdrSrc, err := eaclV2.NewMessageHeaderSource(hdrSrcOpts...)
|
||||
hdrSrc, err := eaclV2.NewMessageHeaderSource(&localStorage{ls: c.localStorage}, xHeaderSource, cnr, eaclV2.WithOID(reqInfo.ObjectID()))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't parse headers: %w", err)
|
||||
}
|
||||
|
|
|
@ -27,12 +27,11 @@ func (e emptyNetmapState) CurrentEpoch() uint64 {
|
|||
}
|
||||
|
||||
func TestStickyCheck(t *testing.T) {
|
||||
checker := NewChecker(new(CheckerPrm).
|
||||
SetLocalStorage(&engine.StorageEngine{}).
|
||||
SetValidator(eaclSDK.NewValidator()).
|
||||
SetEACLSource(emptyEACLSource{}).
|
||||
SetNetmapState(emptyNetmapState{}),
|
||||
)
|
||||
checker := NewChecker(
|
||||
emptyNetmapState{},
|
||||
emptyEACLSource{},
|
||||
eaclSDK.NewValidator(),
|
||||
&engine.StorageEngine{})
|
||||
|
||||
t.Run("system role", func(t *testing.T) {
|
||||
var info v2.RequestInfo
|
||||
|
|
|
@ -103,9 +103,9 @@ func TestHeadRequest(t *testing.T) {
|
|||
|
||||
newSource := func(t *testing.T) eaclSDK.TypedHeaderSource {
|
||||
hdrSrc, err := NewMessageHeaderSource(
|
||||
WithObjectStorage(lStorage),
|
||||
WithServiceRequest(req),
|
||||
WithCID(addr.Container()),
|
||||
lStorage,
|
||||
NewRequestXHeaderSource(req),
|
||||
addr.Container(),
|
||||
WithOID(&id))
|
||||
require.NoError(t, err)
|
||||
return hdrSrc
|
||||
|
|
|
@ -21,7 +21,7 @@ type Option func(*cfg)
|
|||
type cfg struct {
|
||||
storage ObjectStorage
|
||||
|
||||
msg xHeaderSource
|
||||
msg XHeaderSource
|
||||
|
||||
cnr cid.ID
|
||||
obj *oid.ID
|
||||
|
@ -46,14 +46,12 @@ type headerSource struct {
|
|||
incompleteObjectHeaders bool
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
storage: new(localStorage),
|
||||
func NewMessageHeaderSource(os ObjectStorage, xhs XHeaderSource, cnrID cid.ID, opts ...Option) (eaclSDK.TypedHeaderSource, error) {
|
||||
cfg := &cfg{
|
||||
storage: os,
|
||||
cnr: cnrID,
|
||||
msg: xhs,
|
||||
}
|
||||
}
|
||||
|
||||
func NewMessageHeaderSource(opts ...Option) (eaclSDK.TypedHeaderSource, error) {
|
||||
cfg := defaultCfg()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](cfg)
|
||||
|
@ -70,7 +68,7 @@ func NewMessageHeaderSource(opts ...Option) (eaclSDK.TypedHeaderSource, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
res.requestHeaders = requestHeaders(cfg.msg)
|
||||
res.requestHeaders = cfg.msg.GetXHeaders()
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
@ -96,10 +94,6 @@ func (x xHeader) Value() string {
|
|||
return (*session.XHeader)(&x).GetValue()
|
||||
}
|
||||
|
||||
func requestHeaders(msg xHeaderSource) []eaclSDK.Header {
|
||||
return msg.GetXHeaders()
|
||||
}
|
||||
|
||||
var errMissingOID = errors.New("object ID is missing")
|
||||
|
||||
func (h *cfg) readObjectHeaders(dst *headerSource) error {
|
||||
|
|
|
@ -1,22 +0,0 @@
|
|||
package v2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type localStorage struct {
|
||||
ls *engine.StorageEngine
|
||||
}
|
||||
|
||||
func (s *localStorage) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||
if s.ls == nil {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
return engine.Head(ctx, s.ls, addr)
|
||||
}
|
|
@ -1,48 +1,9 @@
|
|||
package v2
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
func WithObjectStorage(v ObjectStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.storage = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithLocalObjectStorage(v *engine.StorageEngine) Option {
|
||||
return func(c *cfg) {
|
||||
c.storage = &localStorage{
|
||||
ls: v,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WithServiceRequest(v Request) Option {
|
||||
return func(c *cfg) {
|
||||
c.msg = requestXHeaderSource{
|
||||
req: v,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WithServiceResponse(resp Response, req Request) Option {
|
||||
return func(c *cfg) {
|
||||
c.msg = responseXHeaderSource{
|
||||
resp: resp,
|
||||
req: req,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WithCID(v cid.ID) Option {
|
||||
return func(c *cfg) {
|
||||
c.cnr = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithOID(v *oid.ID) Option {
|
||||
return func(c *cfg) {
|
||||
c.obj = v
|
||||
|
|
|
@ -5,7 +5,7 @@ import (
|
|||
eaclSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||
)
|
||||
|
||||
type xHeaderSource interface {
|
||||
type XHeaderSource interface {
|
||||
GetXHeaders() []eaclSDK.Header
|
||||
}
|
||||
|
||||
|
@ -13,12 +13,20 @@ type requestXHeaderSource struct {
|
|||
req Request
|
||||
}
|
||||
|
||||
func NewRequestXHeaderSource(req Request) XHeaderSource {
|
||||
return requestXHeaderSource{req: req}
|
||||
}
|
||||
|
||||
type responseXHeaderSource struct {
|
||||
resp Response
|
||||
|
||||
req Request
|
||||
}
|
||||
|
||||
func NewResponseXHeaderSource(resp Response, req Request) XHeaderSource {
|
||||
return responseXHeaderSource{resp: resp, req: req}
|
||||
}
|
||||
|
||||
func (s requestXHeaderSource) GetXHeaders() []eaclSDK.Header {
|
||||
ln := 0
|
||||
|
||||
|
|
|
@ -1,9 +1,6 @@
|
|||
package v2
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
)
|
||||
|
||||
|
@ -13,39 +10,3 @@ func WithLogger(v *logger.Logger) Option {
|
|||
c.log = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithNetmapSource return option to set
|
||||
// netmap source.
|
||||
func WithNetmapSource(v netmap.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.nm = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithContainerSource returns option to set container source.
|
||||
func WithContainerSource(v container.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.containers = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithNextService returns option to set next object service.
|
||||
func WithNextService(v objectSvc.ServiceServer) Option {
|
||||
return func(c *cfg) {
|
||||
c.next = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithEACLChecker returns option to set eACL checker.
|
||||
func WithEACLChecker(v ACLChecker) Option {
|
||||
return func(c *cfg) {
|
||||
c.checker = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithIRFetcher returns option to set inner ring fetcher.
|
||||
func WithIRFetcher(v InnerRingFetcher) Option {
|
||||
return func(c *cfg) {
|
||||
c.irFetcher = v
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,32 +73,26 @@ type cfg struct {
|
|||
next object.ServiceServer
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
}
|
||||
}
|
||||
|
||||
// New is a constructor for object ACL checking service.
|
||||
func New(opts ...Option) Service {
|
||||
cfg := defaultCfg()
|
||||
func New(next object.ServiceServer,
|
||||
nm netmap.Source,
|
||||
irf InnerRingFetcher,
|
||||
acl ACLChecker,
|
||||
cs container.Source,
|
||||
opts ...Option) Service {
|
||||
cfg := &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
next: next,
|
||||
nm: nm,
|
||||
irFetcher: irf,
|
||||
checker: acl,
|
||||
containers: cs,
|
||||
}
|
||||
|
||||
for i := range opts {
|
||||
opts[i](cfg)
|
||||
}
|
||||
|
||||
panicOnNil := func(v any, name string) {
|
||||
if v == nil {
|
||||
panic(fmt.Sprintf("ACL service: %s is nil", name))
|
||||
}
|
||||
}
|
||||
|
||||
panicOnNil(cfg.next, "next Service")
|
||||
panicOnNil(cfg.nm, "netmap client")
|
||||
panicOnNil(cfg.irFetcher, "inner Ring fetcher")
|
||||
panicOnNil(cfg.checker, "acl checker")
|
||||
panicOnNil(cfg.containers, "container source")
|
||||
|
||||
return Service{
|
||||
cfg: cfg,
|
||||
c: senderClassifier{
|
||||
|
|
|
@ -62,16 +62,22 @@ type cfg struct {
|
|||
keyStorage *util.KeyStorage
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
}
|
||||
}
|
||||
|
||||
// New creates, initializes and returns utility serving
|
||||
// Object.Get service requests.
|
||||
func New(opts ...Option) *Service {
|
||||
c := defaultCfg()
|
||||
func New(gs *getsvc.Service,
|
||||
ss *searchsvc.Service,
|
||||
ps *putsvc.Service,
|
||||
ni NetworkInfo,
|
||||
ks *util.KeyStorage,
|
||||
opts ...Option) *Service {
|
||||
c := &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
header: &headSvcWrapper{s: gs},
|
||||
searcher: &searchSvcWrapper{s: ss},
|
||||
placer: &putSvcWrapper{s: ps},
|
||||
netInfo: ni,
|
||||
keyStorage: ks,
|
||||
}
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
|
@ -88,39 +94,3 @@ func WithLogger(l *logger.Logger) Option {
|
|||
c.log = &logger.Logger{Logger: l.With(zap.String("component", "objectSDK.Delete service"))}
|
||||
}
|
||||
}
|
||||
|
||||
// WithHeadService returns option to set Head service
|
||||
// to work with object headers.
|
||||
func WithHeadService(h *getsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.header = (*headSvcWrapper)(h)
|
||||
}
|
||||
}
|
||||
|
||||
// WithSearchService returns option to set search service.
|
||||
func WithSearchService(s *searchsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.searcher = (*searchSvcWrapper)(s)
|
||||
}
|
||||
}
|
||||
|
||||
// WithPutService returns option to specify put service.
|
||||
func WithPutService(p *putsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.placer = (*putSvcWrapper)(p)
|
||||
}
|
||||
}
|
||||
|
||||
// WithNetworkInfo returns option to set network information source.
|
||||
func WithNetworkInfo(netInfo NetworkInfo) Option {
|
||||
return func(c *cfg) {
|
||||
c.netInfo = netInfo
|
||||
}
|
||||
}
|
||||
|
||||
// WithKeyStorage returns option to set local private key storage.
|
||||
func WithKeyStorage(ks *util.KeyStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.keyStorage = ks
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,11 +11,17 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type headSvcWrapper getsvc.Service
|
||||
type headSvcWrapper struct {
|
||||
s *getsvc.Service
|
||||
}
|
||||
|
||||
type searchSvcWrapper searchsvc.Service
|
||||
type searchSvcWrapper struct {
|
||||
s *searchsvc.Service
|
||||
}
|
||||
|
||||
type putSvcWrapper putsvc.Service
|
||||
type putSvcWrapper struct {
|
||||
s *putsvc.Service
|
||||
}
|
||||
|
||||
type simpleIDWriter struct {
|
||||
ids []oid.ID
|
||||
|
@ -30,7 +36,7 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
|
|||
p.WithRawFlag(true)
|
||||
p.WithAddress(addr)
|
||||
|
||||
err := (*getsvc.Service)(w).Head(ctx, p)
|
||||
err := w.s.Head(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -94,7 +100,7 @@ func (w *searchSvcWrapper) splitMembers(ctx context.Context, exec *execCtx) ([]o
|
|||
p.WithContainerID(exec.containerID())
|
||||
p.WithSearchFilters(fs)
|
||||
|
||||
err := (*searchsvc.Service)(w).Search(ctx, p)
|
||||
err := w.s.Search(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -109,7 +115,7 @@ func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error {
|
|||
}
|
||||
|
||||
func (w *putSvcWrapper) put(ctx context.Context, exec *execCtx) (*oid.ID, error) {
|
||||
streamer, err := (*putsvc.Service)(w).Put()
|
||||
streamer, err := w.s.Put()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -9,26 +9,13 @@ import (
|
|||
|
||||
// Service implements Delete operation of Object service v2.
|
||||
type Service struct {
|
||||
*cfg
|
||||
}
|
||||
|
||||
// Option represents Service constructor option.
|
||||
type Option func(*cfg)
|
||||
|
||||
type cfg struct {
|
||||
svc *deletesvc.Service
|
||||
}
|
||||
|
||||
// NewService constructs Service instance from provided options.
|
||||
func NewService(opts ...Option) *Service {
|
||||
c := new(cfg)
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
}
|
||||
|
||||
func NewService(svc *deletesvc.Service) *Service {
|
||||
return &Service{
|
||||
cfg: c,
|
||||
svc: svc,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -51,9 +38,3 @@ func (s *Service) Delete(ctx context.Context, req *objectV2.DeleteRequest) (*obj
|
|||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func WithInternalService(v *deletesvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.svc = v
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,8 +46,6 @@ type cfg struct {
|
|||
|
||||
fmtValidator *object.FormatValidator
|
||||
|
||||
fmtValidatorOpts []object.FormatValidatorOption
|
||||
|
||||
networkState netmap.State
|
||||
|
||||
clientConstructor ClientConstructor
|
||||
|
@ -55,22 +53,34 @@ type cfg struct {
|
|||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
func NewService(ks *objutil.KeyStorage,
|
||||
cc ClientConstructor,
|
||||
ms MaxSizeSource,
|
||||
os ObjectStorage,
|
||||
cs container.Source,
|
||||
ns netmap.Source,
|
||||
nk netmap.AnnouncedKeys,
|
||||
nst netmap.State,
|
||||
opts ...Option) *Service {
|
||||
c := &cfg{
|
||||
remotePool: util.NewPseudoWorkerPool(),
|
||||
localPool: util.NewPseudoWorkerPool(),
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
keyStorage: ks,
|
||||
clientConstructor: cc,
|
||||
maxSizeSrc: ms,
|
||||
localStore: os,
|
||||
cnrSrc: cs,
|
||||
netMapSrc: ns,
|
||||
netmapKeys: nk,
|
||||
networkState: nst,
|
||||
}
|
||||
}
|
||||
|
||||
func NewService(opts ...Option) *Service {
|
||||
c := defaultCfg()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
}
|
||||
|
||||
c.fmtValidator = object.NewFormatValidator(c.fmtValidatorOpts...)
|
||||
c.fmtValidator = object.NewFormatValidator(object.WithLockSource(os), object.WithNetState(nst))
|
||||
|
||||
return &Service{
|
||||
cfg: c,
|
||||
|
@ -83,62 +93,12 @@ func (p *Service) Put() (*Streamer, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func WithKeyStorage(v *objutil.KeyStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.keyStorage = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxSizeSource(v MaxSizeSource) Option {
|
||||
return func(c *cfg) {
|
||||
c.maxSizeSrc = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithObjectStorage(v ObjectStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.localStore = v
|
||||
c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithLockSource(v))
|
||||
}
|
||||
}
|
||||
|
||||
func WithContainerSource(v container.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.cnrSrc = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithNetworkMapSource(v netmap.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.netMapSrc = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithWorkerPools(remote, local util.WorkerPool) Option {
|
||||
return func(c *cfg) {
|
||||
c.remotePool, c.localPool = remote, local
|
||||
}
|
||||
}
|
||||
|
||||
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
||||
return func(c *cfg) {
|
||||
c.netmapKeys = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithNetworkState(v netmap.State) Option {
|
||||
return func(c *cfg) {
|
||||
c.networkState = v
|
||||
c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithNetState(v))
|
||||
}
|
||||
}
|
||||
|
||||
func WithClientConstructor(v ClientConstructor) Option {
|
||||
return func(c *cfg) {
|
||||
c.clientConstructor = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = l
|
||||
|
|
|
@ -12,27 +12,15 @@ import (
|
|||
|
||||
// Service implements Put operation of Object service v2.
|
||||
type Service struct {
|
||||
*cfg
|
||||
}
|
||||
|
||||
// Option represents Service constructor option.
|
||||
type Option func(*cfg)
|
||||
|
||||
type cfg struct {
|
||||
svc *putsvc.Service
|
||||
keyStorage *util.KeyStorage
|
||||
}
|
||||
|
||||
// NewService constructs Service instance from provided options.
|
||||
func NewService(opts ...Option) *Service {
|
||||
c := new(cfg)
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
}
|
||||
|
||||
func NewService(svc *putsvc.Service, ks *util.KeyStorage) *Service {
|
||||
return &Service{
|
||||
cfg: c,
|
||||
svc: svc,
|
||||
keyStorage: ks,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -52,15 +40,3 @@ func (s *Service) Put() (object.PutObjectStream, error) {
|
|||
func (s *Service) PutSingle(ctx context.Context, req *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) {
|
||||
return s.svc.PutSingle(ctx, req)
|
||||
}
|
||||
|
||||
func WithInternalService(v *putsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.svc = v
|
||||
}
|
||||
}
|
||||
|
||||
func WithKeyStorage(ks *util.KeyStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.keyStorage = ks
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,17 +55,28 @@ type cfg struct {
|
|||
keyStore *util.KeyStorage
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
clientConstructor: new(clientConstructorWrapper),
|
||||
}
|
||||
}
|
||||
|
||||
// New creates, initializes and returns utility serving
|
||||
// Object.Get service requests.
|
||||
func New(opts ...Option) *Service {
|
||||
c := defaultCfg()
|
||||
func New(e *engine.StorageEngine,
|
||||
cc ClientConstructor,
|
||||
tg *util.TraverserGenerator,
|
||||
ns netmap.Source,
|
||||
ks *util.KeyStorage,
|
||||
opts ...Option) *Service {
|
||||
c := &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
clientConstructor: &clientConstructorWrapper{
|
||||
constructor: cc,
|
||||
},
|
||||
localStorage: &storageEngineWrapper{
|
||||
storage: e,
|
||||
},
|
||||
traverserGenerator: (*traverseGeneratorWrapper)(tg),
|
||||
currentEpochReceiver: &nmSrcWrapper{
|
||||
nmSrc: ns,
|
||||
},
|
||||
keyStore: ks,
|
||||
}
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
|
@ -82,46 +93,3 @@ func WithLogger(l *logger.Logger) Option {
|
|||
c.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Search service"))}
|
||||
}
|
||||
}
|
||||
|
||||
// WithLocalStorageEngine returns option to set local storage
|
||||
// instance.
|
||||
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
||||
return func(c *cfg) {
|
||||
c.localStorage = &storageEngineWrapper{
|
||||
storage: e,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithClientConstructor returns option to set constructor of remote node clients.
|
||||
func WithClientConstructor(v ClientConstructor) Option {
|
||||
return func(c *cfg) {
|
||||
c.clientConstructor.(*clientConstructorWrapper).constructor = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithTraverserGenerator returns option to set generator of
|
||||
// placement traverser to get the objects from containers.
|
||||
func WithTraverserGenerator(t *util.TraverserGenerator) Option {
|
||||
return func(c *cfg) {
|
||||
c.traverserGenerator = (*traverseGeneratorWrapper)(t)
|
||||
}
|
||||
}
|
||||
|
||||
// WithNetMapSource returns option to set network
|
||||
// map storage to receive current network state.
|
||||
func WithNetMapSource(nmSrc netmap.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.currentEpochReceiver = &nmSrcWrapper{
|
||||
nmSrc: nmSrc,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithKeyStorage returns option to set private
|
||||
// key storage for session tokens and node key.
|
||||
func WithKeyStorage(store *util.KeyStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.keyStore = store
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,28 +9,15 @@ import (
|
|||
|
||||
// Service implements Search operation of Object service v2.
|
||||
type Service struct {
|
||||
*cfg
|
||||
}
|
||||
|
||||
// Option represents Service constructor option.
|
||||
type Option func(*cfg)
|
||||
|
||||
type cfg struct {
|
||||
svc *searchsvc.Service
|
||||
|
||||
keyStorage *objutil.KeyStorage
|
||||
}
|
||||
|
||||
// NewService constructs Service instance from provided options.
|
||||
func NewService(opts ...Option) *Service {
|
||||
c := new(cfg)
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
}
|
||||
|
||||
func NewService(s *searchsvc.Service, ks *objutil.KeyStorage) *Service {
|
||||
return &Service{
|
||||
cfg: c,
|
||||
svc: s,
|
||||
keyStorage: ks,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -43,18 +30,3 @@ func (s *Service) Search(req *objectV2.SearchRequest, stream objectSvc.SearchStr
|
|||
|
||||
return s.svc.Search(stream.Context(), *p)
|
||||
}
|
||||
|
||||
// WithInternalService returns option to set entity
|
||||
// that handles request payload.
|
||||
func WithInternalService(v *searchsvc.Service) Option {
|
||||
return func(c *cfg) {
|
||||
c.svc = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithKeyStorage returns option to set local private key storage.
|
||||
func WithKeyStorage(ks *objutil.KeyStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.keyStorage = ks
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue