From f4a3fa2977de8b929af1b9cdd7b245af2c4ad297 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 6 Oct 2022 23:18:46 +0300 Subject: [PATCH] [#1329] tree: Sync tree on startup Signed-off-by: Pavel Karpy --- cmd/neofs-node/config.go | 18 ++++++++++++++++ cmd/neofs-node/container.go | 2 ++ cmd/neofs-node/tree.go | 42 +++++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index e6abbfd8..3f2ac1fe 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -42,6 +42,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" "github.com/nspcc-dev/neofs-node/pkg/metrics" "github.com/nspcc-dev/neofs-node/pkg/morph/client" + containerClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmap2 "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" @@ -305,6 +306,10 @@ type internals struct { workers []worker closers []func() + // onlineStateHandlers are executed in a separate + // goroutine on every !ONLINE -> ONLINE state transition + onlineStateHandlers []func(context.Context) + apiVersion version.Version healthStatus *atomic.Int32 // is node under maintenance @@ -347,6 +352,8 @@ type shared struct { netMap atomicstd.Value // type netmap.NetMap netMapSource netmapCore.Source + cnrClient *containerClient.Client + respSvc *response.Service replicator *replicator.Replicator @@ -831,6 +838,13 @@ func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) { // Called with nil when storage node is outside the NeoFS network map // (before entering the network and after leaving it). func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) { + if c.cfgNetmap.state.controlNetmapStatus() != control.NetmapStatus_ONLINE && + ni != nil && ni.IsOnline() { + for _, h := range c.onlineStateHandlers { + go h(c.ctx) + } + } + c.cfgNetmap.state.setNodeInfo(ni) } @@ -935,3 +949,7 @@ func (c *cfg) configWatcher(ctx context.Context) { } } } + +func (c *cfg) addOnlineStateHandler(h func(ctx context.Context)) { + c.onlineStateHandlers = append(c.onlineStateHandlers, h) +} diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 2cbe3093..1e57873a 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -45,6 +45,8 @@ func initContainerService(c *cfg) { wrap, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, cntClient.TryNotary()) fatalOnErr(err) + c.shared.cnrClient = wrap + // container wrapper that always sends non-notary // requests wrapperNoNotary, err := cntClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0) diff --git a/cmd/neofs-node/tree.go b/cmd/neofs-node/tree.go index f7050238..cd38d42a 100644 --- a/cmd/neofs-node/tree.go +++ b/cmd/neofs-node/tree.go @@ -6,9 +6,12 @@ import ( treeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/tree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" + containerClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" "github.com/nspcc-dev/neofs-node/pkg/morph/event" containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" + "github.com/nspcc-dev/neofs-node/pkg/services/control" "github.com/nspcc-dev/neofs-node/pkg/services/tree" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -38,6 +41,16 @@ func initTreeService(c *cfg) { c.treeService.Start(ctx) })) + syncTreeFunc := func(ctx context.Context) { + syncTrees(ctx, c.treeService, c.shared.cnrClient, c.log) + } + + if c.cfgNetmap.state.controlNetmapStatus() == control.NetmapStatus_ONLINE { + c.workers = append(c.workers, newWorkerFromFunc(syncTreeFunc)) + } + + c.addOnlineStateHandler(syncTreeFunc) + subscribeToContainerRemoval(c, func(e event.Event) { ev := e.(containerEvent.DeleteSuccess) @@ -53,3 +66,32 @@ func initTreeService(c *cfg) { c.onShutdown(c.treeService.Shutdown) } + +func syncTrees(ctx context.Context, treeSvc *tree.Service, cnrCli *containerClient.Client, log *logger.Logger) { + log.Info("synchronizing trees...") + + ids, err := cnrCli.List(nil) + if err != nil { + log.Error("trees are not synchronized", zap.Error(err)) + return + } + + // TODO: #1902 fetch all the trees via a new tree RPC + wellKnownTrees := [...]string{"version", "system"} + + for _, id := range ids { + for _, tID := range wellKnownTrees { + err = treeSvc.Synchronize(ctx, id, tID) + if err != nil && !errors.Is(err, tree.ErrNotInContainer) { + log.Warn( + "tree synchronization failed", + zap.Stringer("cid", id), + zap.String("tree_id", tID), + zap.Error(err), + ) + } + } + } + + log.Info("trees have been synchronized") +}