From e1be0180f654686655d48538cd92aacbddf55700 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Wed, 19 Oct 2022 00:33:45 +0300
Subject: [PATCH] [#1329] tree: Sync trees when a node appears in a container for the first time

Signed-off-by: Pavel Karpy
---
 cmd/neofs-node/config.go            |  15 ----
 cmd/neofs-node/tree.go              |  65 ++++++++--------
 pkg/services/tree/options.go        |  13 +++-
 pkg/services/tree/service.go        |  14 ++++
 pkg/services/tree/signature_test.go |  16 ++++
 pkg/services/tree/sync.go           | 110 ++++++++++++++++++++++++++++
 6 files changed, 181 insertions(+), 52 deletions(-)

diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go
index 21ef0d94e..719398acc 100644
--- a/cmd/neofs-node/config.go
+++ b/cmd/neofs-node/config.go
@@ -305,10 +305,6 @@ type internals struct {
 	workers []worker
 	closers []func()
 
-	// onlineStateHandlers are executed in a separate
-	// goroutine on every !ONLINE -> ONLINE state transition
-	onlineStateHandlers []func(context.Context)
-
 	apiVersion   version.Version
 	healthStatus *atomic.Int32
 	// is node under maintenance
@@ -837,13 +833,6 @@ func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
 // Called with nil when storage node is outside the NeoFS network map
 // (before entering the network and after leaving it).
 func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
-	if c.cfgNetmap.state.controlNetmapStatus() != control.NetmapStatus_ONLINE &&
-		ni != nil && ni.IsOnline() {
-		for _, h := range c.onlineStateHandlers {
-			go h(c.ctx)
-		}
-	}
-
 	c.cfgNetmap.state.setNodeInfo(ni)
 }
 
@@ -950,7 +939,3 @@ func (c *cfg) configWatcher(ctx context.Context) {
 		}
 	}
 }
-
-func (c *cfg) addOnlineStateHandler(h func(ctx context.Context)) {
-	c.onlineStateHandlers = append(c.onlineStateHandlers, h)
-}
diff --git a/cmd/neofs-node/tree.go b/cmd/neofs-node/tree.go
index befa5fdc2..e36d67303 100644
--- a/cmd/neofs-node/tree.go
+++ b/cmd/neofs-node/tree.go
@@ -5,16 +5,34 @@ import (
 	"errors"
 
 	treeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/tree"
+	"github.com/nspcc-dev/neofs-node/pkg/core/container"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
 	containerClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
 	"github.com/nspcc-dev/neofs-node/pkg/morph/event"
 	containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
-	"github.com/nspcc-dev/neofs-node/pkg/services/control"
 	"github.com/nspcc-dev/neofs-node/pkg/services/tree"
-	"github.com/nspcc-dev/neofs-node/pkg/util/logger"
+	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
 	"go.uber.org/zap"
 )
 
+type cnrSource struct {
+	// cached container source.
+	src container.Source
+	// raw client; no need to cache request results
+	// since sync is performed once per epoch and is
+	// expected to receive different results every
+	// call.
+	cli *containerClient.Client
+}
+
+func (c cnrSource) Get(id cid.ID) (*container.Container, error) {
+	return c.src.Get(id)
+}
+
+func (c cnrSource) List() ([]cid.ID, error) {
+	return c.cli.List(nil)
+}
+
 func initTreeService(c *cfg) {
 	treeConfig := treeconfig.Tree(c.appCfg)
 	if !treeConfig.Enabled() {
@@ -23,7 +41,10 @@ func initTreeService(c *cfg) {
 	}
 
 	c.treeService = tree.New(
-		tree.WithContainerSource(c.cfgObject.cnrSource),
+		tree.WithContainerSource(cnrSource{
+			src: c.cfgObject.cnrSource,
+			cli: c.shared.cnrClient,
+		}),
 		tree.WithEACLSource(c.cfgObject.eaclSource),
 		tree.WithNetmapSource(c.netMapSource),
 		tree.WithPrivateKey(&c.key.PrivateKey),
@@ -41,15 +62,12 @@ func initTreeService(c *cfg) {
 		c.treeService.Start(ctx)
 	}))
 
-	syncTreeFunc := func(ctx context.Context) {
-		syncTrees(ctx, c.treeService, c.shared.cnrClient, c.log)
-	}
-
-	if c.cfgNetmap.state.controlNetmapStatus() == control.NetmapStatus_ONLINE {
-		c.workers = append(c.workers, newWorkerFromFunc(syncTreeFunc))
-	}
-
-	c.addOnlineStateHandler(syncTreeFunc)
+	addNewEpochNotificationHandler(c, func(_ event.Event) {
+		err := c.treeService.SynchronizeAll()
+		if err != nil {
+			c.log.Error("could not synchronize Tree Service", zap.Error(err))
+		}
+	})
 
 	subscribeToContainerRemoval(c, func(e event.Event) {
 		ev := e.(containerEvent.DeleteSuccess)
@@ -66,26 +84,3 @@ func initTreeService(c *cfg) {
 
 	c.onShutdown(c.treeService.Shutdown)
 }
-
-func syncTrees(ctx context.Context, treeSvc *tree.Service, cnrCli *containerClient.Client, log *logger.Logger) {
-	log.Info("synchronizing trees...")
-
-	ids, err := cnrCli.List(nil)
-	if err != nil {
-		log.Error("trees are not synchronized", zap.Error(err))
-		return
-	}
-
-	for _, id := range ids {
-		err = treeSvc.SynchronizeAllTrees(ctx, id)
-		if err != nil && !errors.Is(err, tree.ErrNotInContainer) {
-			log.Warn(
-				"tree synchronization failed",
-				zap.Stringer("cid", id),
-				zap.Error(err),
-			)
-		}
-	}
-
-	log.Info("trees have been synchronized")
-}
diff --git a/pkg/services/tree/options.go b/pkg/services/tree/options.go
index b3f01617a..8646e2a1e 100644
--- a/pkg/services/tree/options.go
+++ b/pkg/services/tree/options.go
@@ -8,14 +8,23 @@ import (
 	"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
 	"github.com/nspcc-dev/neofs-node/pkg/util/logger"
+	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
 )
 
+type ContainerSource interface {
+	container.Source
+
+	// List must return the list of all containers in the NeoFS network
+	// at the moment of the call, or any error that prevents fetching
+	// container information.
+	List() ([]cid.ID, error)
+}
+
 type cfg struct {
 	log        *logger.Logger
 	key        *ecdsa.PrivateKey
 	rawPub     []byte
 	nmSource   netmap.Source
-	cnrSource  container.Source
+	cnrSource  ContainerSource
 	eaclSource container.EACLSource
 	forest     pilorama.Forest
 	// replication-related parameters
@@ -29,7 +38,7 @@ type Option func(*cfg)
 
 // WithContainerSource sets a container source for a tree service.
 // This option is required.
-func WithContainerSource(src container.Source) Option {
+func WithContainerSource(src ContainerSource) Option {
 	return func(c *cfg) {
 		c.cnrSource = src
 	}
diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go
index 16ec780ca..18dc1518a 100644
--- a/pkg/services/tree/service.go
+++ b/pkg/services/tree/service.go
@@ -24,6 +24,9 @@ type Service struct {
 	replicationTasks chan replicationTask
 	closeCh          chan struct{}
 	containerCache   containerCache
+
+	syncChan chan struct{}
+	cnrMap   map[cidSDK.ID]struct{}
 }
 
 var _ TreeServiceServer = (*Service)(nil)
@@ -48,6 +51,8 @@ func New(opts ...Option) *Service {
 	s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
 	s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
 	s.containerCache.init(s.containerCacheSize)
+	s.cnrMap = make(map[cidSDK.ID]struct{})
+	s.syncChan = make(chan struct{})
 
 	return &s
 }
@@ -55,6 +60,15 @@ func New(opts ...Option) *Service {
 // Start starts the service.
 func (s *Service) Start(ctx context.Context) {
 	go s.replicateLoop(ctx)
+	go s.syncLoop(ctx)
+
+	select {
+	case <-s.closeCh:
+	case <-ctx.Done():
+	default:
+		// initial sync
+		s.syncChan <- struct{}{}
+	}
 }
 
 // Shutdown shutdowns the service.
diff --git a/pkg/services/tree/signature_test.go b/pkg/services/tree/signature_test.go
index aaab9a00c..6e3189cc6 100644
--- a/pkg/services/tree/signature_test.go
+++ b/pkg/services/tree/signature_test.go
@@ -29,6 +29,22 @@ type dummyNetmapSource struct {
 
 type dummyContainerSource map[string]*containercore.Container
 
+func (s dummyContainerSource) List() ([]cid.ID, error) {
+	res := make([]cid.ID, 0, len(s))
+	var cnr cid.ID
+
+	for cidStr := range s {
+		err := cnr.DecodeString(cidStr)
+		if err != nil {
+			return nil, err
+		}
+
+		res = append(res, cnr)
+	}
+
+	return res, nil
+}
+
 func (s dummyContainerSource) Get(id cid.ID) (*containercore.Container, error) {
 	cnt, ok := s[id.String()]
 	if !ok {
diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go
index 37d559cbe..8b964c186 100644
--- a/pkg/services/tree/sync.go
+++ b/pkg/services/tree/sync.go
@@ -200,3 +200,113 @@ func (s *Service) synchronizeSingle(ctx context.Context, d pilorama.CIDDescripto
 		height = newHeight
 	}
 }
+
+// ErrAlreadySyncing is returned when a service synchronization has already
+// been started.
+var ErrAlreadySyncing = errors.New("service is being synchronized")
+
+// ErrShuttingDown is returned when the service is shutting down and cannot
+// accept any calls.
+var ErrShuttingDown = errors.New("service is shutting down")
+
+// SynchronizeAll forces the tree service to synchronize all the trees according
+// to netmap information. Must not be called before Service.Start.
+// Returns ErrAlreadySyncing if a synchronization has already been started by
+// another routine.
+// Note: non-blocking operation.
+func (s *Service) SynchronizeAll() error {
+	select {
+	case <-s.closeCh:
+		return ErrShuttingDown
+	default:
+	}
+
+	select {
+	case s.syncChan <- struct{}{}:
+		return nil
+	default:
+		return ErrAlreadySyncing
+	}
+}
+
+func (s *Service) syncLoop(ctx context.Context) {
+	for {
+		select {
+		case <-s.closeCh:
+			return
+		case <-ctx.Done():
+			return
+		case <-s.syncChan:
+			s.log.Debug("syncing trees...")
+
+			cnrs, err := s.cfg.cnrSource.List()
+			if err != nil {
+				s.log.Error("could not fetch containers", zap.Error(err))
+				continue
+			}
+
+			newMap := make(map[cid.ID]struct{}, len(s.cnrMap))
+			cnrsToSync := make([]cid.ID, 0, len(cnrs))
+
+			for _, cnr := range cnrs {
+				_, pos, err := s.getContainerNodes(cnr)
+				if err != nil {
+					s.log.Error("could not calculate container nodes",
+						zap.Stringer("cid", cnr),
+						zap.Error(err))
+					delete(s.cnrMap, cnr)
+
+					continue
+				}
+
+				if pos < 0 {
+					// node is not included in the container.
+					continue
+				}
+
+				_, ok := s.cnrMap[cnr]
+				if ok {
+					// known container; already in sync.
+					delete(s.cnrMap, cnr)
+					newMap[cnr] = struct{}{}
+				} else {
+					// unknown container; need to sync.
+					cnrsToSync = append(cnrsToSync, cnr)
+				}
+			}
+
+			// sync new containers
+			for _, cnr := range cnrsToSync {
+				s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr))
+
+				err = s.SynchronizeAllTrees(ctx, cnr)
+				if err != nil {
+					s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err))
+					continue
+				}
+
+				// mark as synced
+				newMap[cnr] = struct{}{}
+
+				s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr))
+			}
+
+			// remove stored redundant trees
+			for cnr := range s.cnrMap {
+				s.log.Debug("removing redundant trees...", zap.Stringer("cid", cnr))
+
+				err = s.DropTree(ctx, cnr, "") // TODO: #1940 drop all the trees here
+				if err != nil {
+					s.log.Error("could not remove redundant tree",
+						zap.Stringer("cid", cnr),
+						zap.Error(err))
+					continue
+				}
+			}
+
+			s.cnrMap = newMap
+
+			s.log.Debug("trees have been synchronized")
+		}
+	}
+}
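
Usage note (illustrative sketch, not part of the patch): the doc comment above describes SynchronizeAll as a non-blocking request that either gets queued for syncLoop or is rejected with a sentinel error. The snippet below shows how a caller might interpret those results; the example package, the triggerSync helper and the bare *zap.Logger parameter are assumptions for illustration only (the node itself wires SynchronizeAll to new-epoch notifications in initTreeService and uses its own logger wrapper).

package example

import (
	"errors"

	"github.com/nspcc-dev/neofs-node/pkg/services/tree"
	"go.uber.org/zap"
)

// triggerSync asks an already started tree service to resynchronize and
// interprets the sentinel errors introduced by this patch. SynchronizeAll
// only queues the request; the actual work happens in syncLoop.
func triggerSync(svc *tree.Service, log *zap.Logger) {
	switch err := svc.SynchronizeAll(); {
	case err == nil:
		// accepted; syncLoop will process the request asynchronously
	case errors.Is(err, tree.ErrAlreadySyncing):
		// a synchronization is already queued or running; nothing to do
		log.Debug("tree sync already in progress")
	case errors.Is(err, tree.ErrShuttingDown):
		log.Warn("tree service is shutting down, sync request dropped")
	default:
		log.Error("could not synchronize tree service", zap.Error(err))
	}
}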