From 713aea06fafb50aa346605d88c15ab203c205855 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 4 Oct 2022 17:01:16 +0400 Subject: [PATCH] [#1681] node: Block only Object service ops under maintenance In previous implementation node blocked any operation of local object storage in maintenance mode. There is a need to perform some storage operations like data evacuation or restoration. Do not call block storage engine in maintenance mode. Make all Object service operations to return `apistatus.NodeUnderMaintenance` error from each local op. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 22 ++++++++++++++++++ cmd/neofs-node/netmap.go | 18 ++------------- cmd/neofs-node/object.go | 31 +++++++++++++++++--------- pkg/services/object/get/service.go | 7 ++++++ pkg/services/object/get/util.go | 9 ++++++++ pkg/services/object/search/service.go | 7 ++++-- pkg/services/object/search/util.go | 14 ++++++++++-- pkg/services/object/util/node_state.go | 9 ++++++++ 8 files changed, 86 insertions(+), 31 deletions(-) create mode 100644 pkg/services/object/util/node_state.go diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index e753e07f..085c239d 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -335,6 +335,28 @@ type cfg struct { // current network map netMap atomicstd.Value // type netmap.NetMap + + // is node under maintenance + isMaintenance atomic.Bool +} + +// starts node's maintenance. +func (c *cfg) startMaintenance() { + c.isMaintenance.Store(true) + c.log.Info("started local node's maintenance") +} + +// stops node's maintenance. +func (c *cfg) stopMaintenance() { + c.isMaintenance.Store(false) + c.log.Info("stopped local node's maintenance") +} + +// IsMaintenance checks if storage node is under maintenance. +// +// Provides util.NodeState to Object service. +func (c *cfg) IsMaintenance() bool { + return c.isMaintenance.Load() } // ReadCurrentNetMap reads network map which has been cached at the diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index 1587e7bf..1af15956 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -16,7 +16,6 @@ import ( netmapTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/netmap/grpc" "github.com/nspcc-dev/neofs-node/pkg/services/control" netmapService "github.com/nspcc-dev/neofs-node/pkg/services/netmap" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap" subnetid "github.com/nspcc-dev/neofs-sdk-go/subnet/id" "github.com/nspcc-dev/neofs-sdk-go/version" @@ -319,30 +318,17 @@ func addNewEpochAsyncNotificationHandler(c *cfg, h event.Handler) { var errRelayBootstrap = errors.New("setting netmap status is forbidden in relay mode") -var errNodeMaintenance apistatus.NodeUnderMaintenance - func (c *cfg) SetNetmapStatus(st control.NetmapStatus) error { switch st { default: return fmt.Errorf("unsupported status %v", st) case control.NetmapStatus_MAINTENANCE: - err := c.cfgObject.cfgLocalStorage.localStorage.BlockExecution(errNodeMaintenance) - if err != nil { - return fmt.Errorf("block execution of local object storage: %w", err) - } - - // TODO: #1691 think how to process two different actions which can fail both - + c.startMaintenance() return c.updateNetMapState((*nmClient.UpdatePeerPrm).SetMaintenance) case control.NetmapStatus_ONLINE, control.NetmapStatus_OFFLINE: } - err := c.cfgObject.cfgLocalStorage.localStorage.ResumeExecution() - if err != nil { - c.log.Error("failed to resume local object operations", - zap.String("error", err.Error()), - ) - } + c.stopMaintenance() if !c.needBootstrap() { return errRelayBootstrap diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index f1614090..844f84e2 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -253,18 +253,18 @@ func initObjectService(c *cfg) { c.workers = append(c.workers, pol) - var os putsvc.ObjectStorage + var os putsvc.ObjectStorage = engineWithoutNotifications{ + e: ls, + state: c, + } + if c.cfgNotifications.enabled { os = engineWithNotifications{ - e: ls, + base: os, nw: c.cfgNotifications.nw, ns: c.cfgNetmap.state, defaultTopic: c.cfgNotifications.defaultTopic, } - } else { - os = engineWithoutNotifications{ - e: ls, - } } sPut := putsvc.NewService( @@ -291,7 +291,7 @@ func initObjectService(c *cfg) { sSearch := searchsvc.New( searchsvc.WithLogger(c.log), - searchsvc.WithLocalStorageEngine(ls), + searchsvc.WithLocalStorageEngine(ls, c), searchsvc.WithClientConstructor(coreConstructor), searchsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( @@ -318,6 +318,7 @@ func initObjectService(c *cfg) { ), getsvc.WithNetMapSource(c.netMapSource), getsvc.WithKeyStorage(keyStorage), + getsvc.WithNodeState(c), ) *c.cfgObject.getSvc = *sGet // need smth better @@ -552,15 +553,15 @@ func (c *reputationClientConstructor) Get(info coreclient.NodeInfo) (coreclient. } type engineWithNotifications struct { - e *engine.StorageEngine - nw notificationWriter - ns netmap.State + base putsvc.ObjectStorage + nw notificationWriter + ns netmap.State defaultTopic string } func (e engineWithNotifications) Put(o *objectSDK.Object) error { - if err := engine.Put(e.e, o); err != nil { + if err := e.base.Put(o); err != nil { return err } @@ -582,8 +583,16 @@ func (e engineWithNotifications) Put(o *objectSDK.Object) error { type engineWithoutNotifications struct { e *engine.StorageEngine + + state util.NodeState } func (e engineWithoutNotifications) Put(o *objectSDK.Object) error { + if e.state.IsMaintenance() { + var st apistatus.NodeUnderMaintenance + + return st + } + return engine.Put(e.e, o) } diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index c48fc554..aeda2b15 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -130,3 +130,10 @@ func WithKeyStorage(store *util.KeyStorage) Option { c.keyStore = store } } + +// WithNodeState provides util.NodeState to Service. +func WithNodeState(v util.NodeState) Option { + return func(c *cfg) { + c.localStorage.(*storageEngineWrapper).state = v + } +} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 104f2a0d..be8c22d2 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -8,6 +8,8 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" internal "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" + "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/object" ) @@ -26,6 +28,8 @@ type clientWrapper struct { } type storageEngineWrapper struct { + state util.NodeState + engine *engine.StorageEngine } @@ -170,6 +174,11 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj } func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { + if e.state != nil && e.state.IsMaintenance() { + var st apistatus.NodeUnderMaintenance + return nil, st + } + if exec.headOnly() { var headPrm engine.HeadPrm headPrm.WithAddress(exec.address()) diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index 43ca797f..cf14cdc3 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -81,9 +81,12 @@ func WithLogger(l *logger.Logger) Option { // WithLocalStorageEngine returns option to set local storage // instance. -func WithLocalStorageEngine(e *engine.StorageEngine) Option { +func WithLocalStorageEngine(e *engine.StorageEngine, state util.NodeState) Option { return func(c *cfg) { - c.localStorage = (*storageEngineWrapper)(e) + c.localStorage = &storageEngineWrapper{ + state: state, + storage: e, + } } } diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index 772b0991..411f0db9 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -9,6 +9,7 @@ import ( internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) @@ -29,7 +30,11 @@ type clientWrapper struct { client client.MultiAddressClient } -type storageEngineWrapper engine.StorageEngine +type storageEngineWrapper struct { + state util.NodeState + + storage *engine.StorageEngine +} type traverseGeneratorWrapper util.TraverserGenerator @@ -120,11 +125,16 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi } func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) { + if e.state != nil && e.state.IsMaintenance() { + var st apistatus.NodeUnderMaintenance + return nil, st + } + var selectPrm engine.SelectPrm selectPrm.WithFilters(exec.searchFilters()) selectPrm.WithContainerID(exec.containerID()) - r, err := (*engine.StorageEngine)(e).Select(selectPrm) + r, err := e.storage.Select(selectPrm) if err != nil { return nil, err } diff --git a/pkg/services/object/util/node_state.go b/pkg/services/object/util/node_state.go new file mode 100644 index 00000000..7ef31e3c --- /dev/null +++ b/pkg/services/object/util/node_state.go @@ -0,0 +1,9 @@ +package util + +// NodeState is storage node state processed by Object service. +type NodeState interface { + // IsMaintenance checks if node is under maintenance. Node MUST NOT serve + // local object operations. Node MUST respond with apistatus.NodeUnderMaintenance + // error if IsMaintenance returns true. + IsMaintenance() bool +}