[#1681] node: Block only Object service ops under maintenance
In previous implementation node blocked any operation of local object storage in maintenance mode. There is a need to perform some storage operations like data evacuation or restoration. Do not call block storage engine in maintenance mode. Make all Object service operations to return `apistatus.NodeUnderMaintenance` error from each local op. Signed-off-by: Leonard Lyubich <ctulhurider@gmail.com>
This commit is contained in:
parent
082602b668
commit
713aea06fa
8 changed files with 86 additions and 31 deletions
|
@ -335,6 +335,28 @@ type cfg struct {
|
||||||
|
|
||||||
// current network map
|
// current network map
|
||||||
netMap atomicstd.Value // type netmap.NetMap
|
netMap atomicstd.Value // type netmap.NetMap
|
||||||
|
|
||||||
|
// is node under maintenance
|
||||||
|
isMaintenance atomic.Bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// starts node's maintenance.
|
||||||
|
func (c *cfg) startMaintenance() {
|
||||||
|
c.isMaintenance.Store(true)
|
||||||
|
c.log.Info("started local node's maintenance")
|
||||||
|
}
|
||||||
|
|
||||||
|
// stops node's maintenance.
|
||||||
|
func (c *cfg) stopMaintenance() {
|
||||||
|
c.isMaintenance.Store(false)
|
||||||
|
c.log.Info("stopped local node's maintenance")
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsMaintenance checks if storage node is under maintenance.
|
||||||
|
//
|
||||||
|
// Provides util.NodeState to Object service.
|
||||||
|
func (c *cfg) IsMaintenance() bool {
|
||||||
|
return c.isMaintenance.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadCurrentNetMap reads network map which has been cached at the
|
// ReadCurrentNetMap reads network map which has been cached at the
|
||||||
|
|
|
@ -16,7 +16,6 @@ import (
|
||||||
netmapTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/netmap/grpc"
|
netmapTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/netmap/grpc"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
||||||
netmapService "github.com/nspcc-dev/neofs-node/pkg/services/netmap"
|
netmapService "github.com/nspcc-dev/neofs-node/pkg/services/netmap"
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
|
||||||
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
|
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||||
subnetid "github.com/nspcc-dev/neofs-sdk-go/subnet/id"
|
subnetid "github.com/nspcc-dev/neofs-sdk-go/subnet/id"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/version"
|
"github.com/nspcc-dev/neofs-sdk-go/version"
|
||||||
|
@ -319,30 +318,17 @@ func addNewEpochAsyncNotificationHandler(c *cfg, h event.Handler) {
|
||||||
|
|
||||||
var errRelayBootstrap = errors.New("setting netmap status is forbidden in relay mode")
|
var errRelayBootstrap = errors.New("setting netmap status is forbidden in relay mode")
|
||||||
|
|
||||||
var errNodeMaintenance apistatus.NodeUnderMaintenance
|
|
||||||
|
|
||||||
func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error {
|
func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error {
|
||||||
switch st {
|
switch st {
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unsupported status %v", st)
|
return fmt.Errorf("unsupported status %v", st)
|
||||||
case control.NetmapStatus_MAINTENANCE:
|
case control.NetmapStatus_MAINTENANCE:
|
||||||
err := c.cfgObject.cfgLocalStorage.localStorage.BlockExecution(errNodeMaintenance)
|
c.startMaintenance()
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("block execution of local object storage: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: #1691 think how to process two different actions which can fail both
|
|
||||||
|
|
||||||
return c.updateNetMapState((*nmClient.UpdatePeerPrm).SetMaintenance)
|
return c.updateNetMapState((*nmClient.UpdatePeerPrm).SetMaintenance)
|
||||||
case control.NetmapStatus_ONLINE, control.NetmapStatus_OFFLINE:
|
case control.NetmapStatus_ONLINE, control.NetmapStatus_OFFLINE:
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.cfgObject.cfgLocalStorage.localStorage.ResumeExecution()
|
c.stopMaintenance()
|
||||||
if err != nil {
|
|
||||||
c.log.Error("failed to resume local object operations",
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !c.needBootstrap() {
|
if !c.needBootstrap() {
|
||||||
return errRelayBootstrap
|
return errRelayBootstrap
|
||||||
|
|
|
@ -253,18 +253,18 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
c.workers = append(c.workers, pol)
|
c.workers = append(c.workers, pol)
|
||||||
|
|
||||||
var os putsvc.ObjectStorage
|
var os putsvc.ObjectStorage = engineWithoutNotifications{
|
||||||
|
e: ls,
|
||||||
|
state: c,
|
||||||
|
}
|
||||||
|
|
||||||
if c.cfgNotifications.enabled {
|
if c.cfgNotifications.enabled {
|
||||||
os = engineWithNotifications{
|
os = engineWithNotifications{
|
||||||
e: ls,
|
base: os,
|
||||||
nw: c.cfgNotifications.nw,
|
nw: c.cfgNotifications.nw,
|
||||||
ns: c.cfgNetmap.state,
|
ns: c.cfgNetmap.state,
|
||||||
defaultTopic: c.cfgNotifications.defaultTopic,
|
defaultTopic: c.cfgNotifications.defaultTopic,
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
os = engineWithoutNotifications{
|
|
||||||
e: ls,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sPut := putsvc.NewService(
|
sPut := putsvc.NewService(
|
||||||
|
@ -291,7 +291,7 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sSearch := searchsvc.New(
|
sSearch := searchsvc.New(
|
||||||
searchsvc.WithLogger(c.log),
|
searchsvc.WithLogger(c.log),
|
||||||
searchsvc.WithLocalStorageEngine(ls),
|
searchsvc.WithLocalStorageEngine(ls, c),
|
||||||
searchsvc.WithClientConstructor(coreConstructor),
|
searchsvc.WithClientConstructor(coreConstructor),
|
||||||
searchsvc.WithTraverserGenerator(
|
searchsvc.WithTraverserGenerator(
|
||||||
traverseGen.WithTraverseOptions(
|
traverseGen.WithTraverseOptions(
|
||||||
|
@ -318,6 +318,7 @@ func initObjectService(c *cfg) {
|
||||||
),
|
),
|
||||||
getsvc.WithNetMapSource(c.netMapSource),
|
getsvc.WithNetMapSource(c.netMapSource),
|
||||||
getsvc.WithKeyStorage(keyStorage),
|
getsvc.WithKeyStorage(keyStorage),
|
||||||
|
getsvc.WithNodeState(c),
|
||||||
)
|
)
|
||||||
|
|
||||||
*c.cfgObject.getSvc = *sGet // need smth better
|
*c.cfgObject.getSvc = *sGet // need smth better
|
||||||
|
@ -552,7 +553,7 @@ func (c *reputationClientConstructor) Get(info coreclient.NodeInfo) (coreclient.
|
||||||
}
|
}
|
||||||
|
|
||||||
type engineWithNotifications struct {
|
type engineWithNotifications struct {
|
||||||
e *engine.StorageEngine
|
base putsvc.ObjectStorage
|
||||||
nw notificationWriter
|
nw notificationWriter
|
||||||
ns netmap.State
|
ns netmap.State
|
||||||
|
|
||||||
|
@ -560,7 +561,7 @@ type engineWithNotifications struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithNotifications) Put(o *objectSDK.Object) error {
|
func (e engineWithNotifications) Put(o *objectSDK.Object) error {
|
||||||
if err := engine.Put(e.e, o); err != nil {
|
if err := e.base.Put(o); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -582,8 +583,16 @@ func (e engineWithNotifications) Put(o *objectSDK.Object) error {
|
||||||
|
|
||||||
type engineWithoutNotifications struct {
|
type engineWithoutNotifications struct {
|
||||||
e *engine.StorageEngine
|
e *engine.StorageEngine
|
||||||
|
|
||||||
|
state util.NodeState
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Put(o *objectSDK.Object) error {
|
func (e engineWithoutNotifications) Put(o *objectSDK.Object) error {
|
||||||
|
if e.state.IsMaintenance() {
|
||||||
|
var st apistatus.NodeUnderMaintenance
|
||||||
|
|
||||||
|
return st
|
||||||
|
}
|
||||||
|
|
||||||
return engine.Put(e.e, o)
|
return engine.Put(e.e, o)
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,3 +130,10 @@ func WithKeyStorage(store *util.KeyStorage) Option {
|
||||||
c.keyStore = store
|
c.keyStore = store
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithNodeState provides util.NodeState to Service.
|
||||||
|
func WithNodeState(v util.NodeState) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.localStorage.(*storageEngineWrapper).state = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -8,6 +8,8 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
internal "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
internal "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
||||||
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -26,6 +28,8 @@ type clientWrapper struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type storageEngineWrapper struct {
|
type storageEngineWrapper struct {
|
||||||
|
state util.NodeState
|
||||||
|
|
||||||
engine *engine.StorageEngine
|
engine *engine.StorageEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,6 +174,11 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
|
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
|
||||||
|
if e.state != nil && e.state.IsMaintenance() {
|
||||||
|
var st apistatus.NodeUnderMaintenance
|
||||||
|
return nil, st
|
||||||
|
}
|
||||||
|
|
||||||
if exec.headOnly() {
|
if exec.headOnly() {
|
||||||
var headPrm engine.HeadPrm
|
var headPrm engine.HeadPrm
|
||||||
headPrm.WithAddress(exec.address())
|
headPrm.WithAddress(exec.address())
|
||||||
|
|
|
@ -81,9 +81,12 @@ func WithLogger(l *logger.Logger) Option {
|
||||||
|
|
||||||
// WithLocalStorageEngine returns option to set local storage
|
// WithLocalStorageEngine returns option to set local storage
|
||||||
// instance.
|
// instance.
|
||||||
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
func WithLocalStorageEngine(e *engine.StorageEngine, state util.NodeState) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.localStorage = (*storageEngineWrapper)(e)
|
c.localStorage = &storageEngineWrapper{
|
||||||
|
state: state,
|
||||||
|
storage: e,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
@ -29,7 +30,11 @@ type clientWrapper struct {
|
||||||
client client.MultiAddressClient
|
client client.MultiAddressClient
|
||||||
}
|
}
|
||||||
|
|
||||||
type storageEngineWrapper engine.StorageEngine
|
type storageEngineWrapper struct {
|
||||||
|
state util.NodeState
|
||||||
|
|
||||||
|
storage *engine.StorageEngine
|
||||||
|
}
|
||||||
|
|
||||||
type traverseGeneratorWrapper util.TraverserGenerator
|
type traverseGeneratorWrapper util.TraverserGenerator
|
||||||
|
|
||||||
|
@ -120,11 +125,16 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
|
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
|
||||||
|
if e.state != nil && e.state.IsMaintenance() {
|
||||||
|
var st apistatus.NodeUnderMaintenance
|
||||||
|
return nil, st
|
||||||
|
}
|
||||||
|
|
||||||
var selectPrm engine.SelectPrm
|
var selectPrm engine.SelectPrm
|
||||||
selectPrm.WithFilters(exec.searchFilters())
|
selectPrm.WithFilters(exec.searchFilters())
|
||||||
selectPrm.WithContainerID(exec.containerID())
|
selectPrm.WithContainerID(exec.containerID())
|
||||||
|
|
||||||
r, err := (*engine.StorageEngine)(e).Select(selectPrm)
|
r, err := e.storage.Select(selectPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
9
pkg/services/object/util/node_state.go
Normal file
9
pkg/services/object/util/node_state.go
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
package util
|
||||||
|
|
||||||
|
// NodeState is storage node state processed by Object service.
|
||||||
|
type NodeState interface {
|
||||||
|
// IsMaintenance checks if node is under maintenance. Node MUST NOT serve
|
||||||
|
// local object operations. Node MUST respond with apistatus.NodeUnderMaintenance
|
||||||
|
// error if IsMaintenance returns true.
|
||||||
|
IsMaintenance() bool
|
||||||
|
}
|
Loading…
Reference in a new issue