forked from TrueCloudLab/frostfs-node
[#1680] service/object: Fail all operations in maintenance mode
Storage node should not provide NeoFS Object API service when it is under maintenance. Declare `Common` service that unifies behavior of all object operations. The implementation pre-checks if node is under maintenance and returns `apistatus.NodeUnderMaintenance` if so. Use `Common` service as a first logical processor in object service pipeline. Signed-off-by: Leonard Lyubich <ctulhurider@gmail.com>
This commit is contained in:
parent
05420173cc
commit
1406d096a2
7 changed files with 98 additions and 47 deletions
|
@ -254,8 +254,7 @@ func initObjectService(c *cfg) {
|
||||||
c.workers = append(c.workers, pol)
|
c.workers = append(c.workers, pol)
|
||||||
|
|
||||||
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
||||||
e: ls,
|
e: ls,
|
||||||
state: &c.internals,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.cfgNotifications.enabled {
|
if c.cfgNotifications.enabled {
|
||||||
|
@ -291,7 +290,7 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sSearch := searchsvc.New(
|
sSearch := searchsvc.New(
|
||||||
searchsvc.WithLogger(c.log),
|
searchsvc.WithLogger(c.log),
|
||||||
searchsvc.WithLocalStorageEngine(ls, &c.internals),
|
searchsvc.WithLocalStorageEngine(ls),
|
||||||
searchsvc.WithClientConstructor(coreConstructor),
|
searchsvc.WithClientConstructor(coreConstructor),
|
||||||
searchsvc.WithTraverserGenerator(
|
searchsvc.WithTraverserGenerator(
|
||||||
traverseGen.WithTraverseOptions(
|
traverseGen.WithTraverseOptions(
|
||||||
|
@ -318,7 +317,6 @@ func initObjectService(c *cfg) {
|
||||||
),
|
),
|
||||||
getsvc.WithNetMapSource(c.netMapSource),
|
getsvc.WithNetMapSource(c.netMapSource),
|
||||||
getsvc.WithKeyStorage(keyStorage),
|
getsvc.WithKeyStorage(keyStorage),
|
||||||
getsvc.WithNodeState(&c.internals),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
*c.cfgObject.getSvc = *sGet // need smth better
|
*c.cfgObject.getSvc = *sGet // need smth better
|
||||||
|
@ -378,8 +376,11 @@ func initObjectService(c *cfg) {
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var commonSvc objectService.Common
|
||||||
|
commonSvc.Init(&c.internals, aclSvc)
|
||||||
|
|
||||||
respSvc := objectService.NewResponseService(
|
respSvc := objectService.NewResponseService(
|
||||||
aclSvc,
|
&commonSvc,
|
||||||
c.respSvc,
|
c.respSvc,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -583,16 +584,8 @@ func (e engineWithNotifications) Put(o *objectSDK.Object) error {
|
||||||
|
|
||||||
type engineWithoutNotifications struct {
|
type engineWithoutNotifications struct {
|
||||||
e *engine.StorageEngine
|
e *engine.StorageEngine
|
||||||
|
|
||||||
state util.NodeState
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Put(o *objectSDK.Object) error {
|
func (e engineWithoutNotifications) Put(o *objectSDK.Object) error {
|
||||||
if e.state.IsMaintenance() {
|
|
||||||
var st apistatus.NodeUnderMaintenance
|
|
||||||
|
|
||||||
return st
|
|
||||||
}
|
|
||||||
|
|
||||||
return engine.Put(e.e, o)
|
return engine.Put(e.e, o)
|
||||||
}
|
}
|
||||||
|
|
91
pkg/services/object/common.go
Normal file
91
pkg/services/object/common.go
Normal file
|
@ -0,0 +1,91 @@
|
||||||
|
package object
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||||
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NodeState is storage node state processed by Object service.
|
||||||
|
type NodeState interface {
|
||||||
|
// IsMaintenance checks if node is under maintenance. Node MUST NOT serve
|
||||||
|
// local object operations. Node MUST respond with apistatus.NodeUnderMaintenance
|
||||||
|
// error if IsMaintenance returns true.
|
||||||
|
IsMaintenance() bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Common is an Object API ServiceServer which encapsulates logic spread to all
|
||||||
|
// object operations.
|
||||||
|
//
|
||||||
|
// If underlying NodeState.IsMaintenance returns true, all operations are
|
||||||
|
// immediately failed with apistatus.NodeUnderMaintenance.
|
||||||
|
type Common struct {
|
||||||
|
state NodeState
|
||||||
|
|
||||||
|
nextHandler ServiceServer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init initializes the Common instance.
|
||||||
|
func (x *Common) Init(state NodeState, nextHandler ServiceServer) {
|
||||||
|
x.state = state
|
||||||
|
x.nextHandler = nextHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
var errMaintenance apistatus.NodeUnderMaintenance
|
||||||
|
|
||||||
|
func (x *Common) Get(req *objectV2.GetRequest, stream GetObjectStream) error {
|
||||||
|
if x.state.IsMaintenance() {
|
||||||
|
return errMaintenance
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.nextHandler.Get(req, stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Common) Put(ctx context.Context) (PutObjectStream, error) {
|
||||||
|
if x.state.IsMaintenance() {
|
||||||
|
return nil, errMaintenance
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.nextHandler.Put(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
||||||
|
if x.state.IsMaintenance() {
|
||||||
|
return nil, errMaintenance
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.nextHandler.Head(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Common) Search(req *objectV2.SearchRequest, stream SearchStream) error {
|
||||||
|
if x.state.IsMaintenance() {
|
||||||
|
return errMaintenance
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.nextHandler.Search(req, stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Common) Delete(ctx context.Context, req *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) {
|
||||||
|
if x.state.IsMaintenance() {
|
||||||
|
return nil, errMaintenance
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.nextHandler.Delete(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Common) GetRange(req *objectV2.GetRangeRequest, stream GetObjectRangeStream) error {
|
||||||
|
if x.state.IsMaintenance() {
|
||||||
|
return errMaintenance
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.nextHandler.GetRange(req, stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *Common) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
|
||||||
|
if x.state.IsMaintenance() {
|
||||||
|
return nil, errMaintenance
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.nextHandler.GetRangeHash(ctx, req)
|
||||||
|
}
|
|
@ -130,10 +130,3 @@ func WithKeyStorage(store *util.KeyStorage) Option {
|
||||||
c.keyStore = store
|
c.keyStore = store
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNodeState provides util.NodeState to Service.
|
|
||||||
func WithNodeState(v util.NodeState) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.localStorage.(*storageEngineWrapper).state = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
internal "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
internal "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
||||||
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
)
|
)
|
||||||
|
@ -30,8 +29,6 @@ type clientWrapper struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type storageEngineWrapper struct {
|
type storageEngineWrapper struct {
|
||||||
state util.NodeState
|
|
||||||
|
|
||||||
engine *engine.StorageEngine
|
engine *engine.StorageEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -199,11 +196,6 @@ func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Objec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
|
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
|
||||||
if e.state != nil && e.state.IsMaintenance() {
|
|
||||||
var st apistatus.NodeUnderMaintenance
|
|
||||||
return nil, st
|
|
||||||
}
|
|
||||||
|
|
||||||
if exec.headOnly() {
|
if exec.headOnly() {
|
||||||
var headPrm engine.HeadPrm
|
var headPrm engine.HeadPrm
|
||||||
headPrm.WithAddress(exec.address())
|
headPrm.WithAddress(exec.address())
|
||||||
|
|
|
@ -83,10 +83,9 @@ func WithLogger(l *logger.Logger) Option {
|
||||||
|
|
||||||
// WithLocalStorageEngine returns option to set local storage
|
// WithLocalStorageEngine returns option to set local storage
|
||||||
// instance.
|
// instance.
|
||||||
func WithLocalStorageEngine(e *engine.StorageEngine, state util.NodeState) Option {
|
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.localStorage = &storageEngineWrapper{
|
c.localStorage = &storageEngineWrapper{
|
||||||
state: state,
|
|
||||||
storage: e,
|
storage: e,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
|
||||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
@ -31,8 +30,6 @@ type clientWrapper struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type storageEngineWrapper struct {
|
type storageEngineWrapper struct {
|
||||||
state util.NodeState
|
|
||||||
|
|
||||||
storage *engine.StorageEngine
|
storage *engine.StorageEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,11 +118,6 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
|
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
|
||||||
if e.state != nil && e.state.IsMaintenance() {
|
|
||||||
var st apistatus.NodeUnderMaintenance
|
|
||||||
return nil, st
|
|
||||||
}
|
|
||||||
|
|
||||||
var selectPrm engine.SelectPrm
|
var selectPrm engine.SelectPrm
|
||||||
selectPrm.WithFilters(exec.searchFilters())
|
selectPrm.WithFilters(exec.searchFilters())
|
||||||
selectPrm.WithContainerID(exec.containerID())
|
selectPrm.WithContainerID(exec.containerID())
|
||||||
|
|
|
@ -1,9 +0,0 @@
|
||||||
package util
|
|
||||||
|
|
||||||
// NodeState is storage node state processed by Object service.
|
|
||||||
type NodeState interface {
|
|
||||||
// IsMaintenance checks if node is under maintenance. Node MUST NOT serve
|
|
||||||
// local object operations. Node MUST respond with apistatus.NodeUnderMaintenance
|
|
||||||
// error if IsMaintenance returns true.
|
|
||||||
IsMaintenance() bool
|
|
||||||
}
|
|
Loading…
Reference in a new issue