From 1766ca20394e328b9438b3a87cdb6050a7d1382c Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 18 Oct 2022 17:15:24 +0300 Subject: [PATCH] [#1902] tree: Allow synchronize all the container trees Add `SynchronizeAllTrees` method of the Tree service. It allows fetching tree IDs and sync all of them. Share common logic b/w the new method and the `SynchronizeTree`. Signed-off-by: Pavel Karpy --- cmd/neofs-node/control.go | 14 +++++- cmd/neofs-node/tree.go | 6 +-- pkg/services/tree/sync.go | 90 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 103 insertions(+), 7 deletions(-) diff --git a/cmd/neofs-node/control.go b/cmd/neofs-node/control.go index 2e6d8b32..f9d3c0aa 100644 --- a/cmd/neofs-node/control.go +++ b/cmd/neofs-node/control.go @@ -7,9 +7,19 @@ import ( controlconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/control" "github.com/nspcc-dev/neofs-node/pkg/services/control" controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server" + "github.com/nspcc-dev/neofs-node/pkg/services/tree" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "google.golang.org/grpc" ) +type treeSynchronizer struct { + treeSvc *tree.Service +} + +func (t treeSynchronizer) Synchronize(ctx context.Context, cnr cid.ID, treeID string) error { + return t.treeSvc.SynchronizeTree(ctx, cnr, treeID) +} + func initControlService(c *cfg) { endpoint := controlconfig.GRPC(c.appCfg).Endpoint() if endpoint == controlconfig.GRPCEndpointDefault { @@ -34,7 +44,9 @@ func initControlService(c *cfg) { controlSvc.WithReplicator(c.replicator), controlSvc.WithNodeState(c), controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage), - controlSvc.WithTreeService(c.treeService), + controlSvc.WithTreeService(treeSynchronizer{ + c.treeService, + }), ) lis, err := net.Listen("tcp", endpoint) diff --git a/cmd/neofs-node/tree.go b/cmd/neofs-node/tree.go index cd38d42a..e7c8a839 100644 --- a/cmd/neofs-node/tree.go +++ b/cmd/neofs-node/tree.go @@ -80,13 +80,13 @@ func syncTrees(ctx context.Context, treeSvc *tree.Service, cnrCli *containerClie wellKnownTrees := [...]string{"version", "system"} for _, id := range ids { - for _, tID := range wellKnownTrees { - err = treeSvc.Synchronize(ctx, id, tID) + for i := range wellKnownTrees { + err = treeSvc.SynchronizeTree(ctx, id, wellKnownTrees[i]) if err != nil && !errors.Is(err, tree.ErrNotInContainer) { log.Warn( "tree synchronization failed", zap.Stringer("cid", id), - zap.String("tree_id", tID), + zap.String("tree_id", wellKnownTrees[i]), zap.Error(err), ) } diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 32052d09..37d559cb 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -10,6 +10,8 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" "github.com/nspcc-dev/neofs-node/pkg/network" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -17,9 +19,12 @@ import ( // ErrNotInContainer is returned when operation could not be performed // because the node is not included in the container. var ErrNotInContainer = errors.New("node is not in container") +var errNoOtherNodes = errors.New("no nodes to fetch trees from") -// Synchronize tries to synchronize log starting from the last stored height. -func (s *Service) Synchronize(ctx context.Context, cid cid.ID, treeID string) error { +// SynchronizeAllTrees synchronizes all the trees of the container. It fetches +// tree IDs from the other container nodes. Returns ErrNotInContainer if the node +// is not included in the container. +func (s *Service) SynchronizeAllTrees(ctx context.Context, cid cid.ID) error { nodes, pos, err := s.getContainerNodes(cid) if err != nil { return fmt.Errorf("can't get container nodes: %w", err) @@ -34,7 +39,86 @@ func (s *Service) Synchronize(ctx context.Context, cid cid.ID, treeID string) er d.Position = pos d.Size = len(nodes) - lm, err := s.forest.TreeGetOpLog(cid, treeID, 0) + nodes = append(nodes[:pos], nodes[pos+1:]...) // exclude that node + if len(nodes) == 0 { + return errNoOtherNodes + } + + rawCID := make([]byte, sha256.Size) + cid.Encode(rawCID) + + req := &TreeListRequest{ + Body: &TreeListRequest_Body{ + ContainerId: rawCID, + }, + } + + err = SignMessage(req, s.key) + if err != nil { + return fmt.Errorf("could not sign request: %w", err) + } + + var resp *TreeListResponse + var treesToSync []string + var outErr error + + err = s.forEachNode(ctx, nodes, func(c TreeServiceClient) bool { + resp, outErr = c.TreeList(ctx, req) + if outErr != nil { + return false + } + + treesToSync = resp.GetBody().GetIds() + + return true + }) + if err != nil { + outErr = err + } + + if outErr != nil { + return fmt.Errorf("could not fetch tree ID list: %w", outErr) + } + + for _, tid := range treesToSync { + err = s.synchronizeTree(ctx, d, tid, nodes) + if err != nil { + s.log.Error("could not sync tree", + zap.Stringer("cid", cid), + zap.String("treeID", tid)) + } + } + + return nil +} + +// SynchronizeTree tries to synchronize log starting from the last stored height. +func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string) error { + nodes, pos, err := s.getContainerNodes(cid) + if err != nil { + return fmt.Errorf("can't get container nodes: %w", err) + } + + if pos < 0 { + return ErrNotInContainer + } + + var d pilorama.CIDDescriptor + d.CID = cid + d.Position = pos + d.Size = len(nodes) + + nodes = append(nodes[:pos], nodes[pos+1:]...) // exclude that node + if len(nodes) == 0 { + return errNoOtherNodes + } + + return s.synchronizeTree(ctx, d, treeID, nodes) +} + +func (s *Service) synchronizeTree(ctx context.Context, d pilorama.CIDDescriptor, + treeID string, nodes []netmapSDK.NodeInfo) error { + lm, err := s.forest.TreeGetOpLog(d.CID, treeID, 0) if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { return err }