From db67c21d5578729c2a329e3a475ac6ca08fd1f80 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 6 Feb 2024 17:34:32 +0300 Subject: [PATCH] [#947] engine: Evacuate trees to remote nodes Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/control.go | 14 +- internal/logs/logs.go | 3 + pkg/local_object_storage/engine/evacuate.go | 37 +++++- .../engine/evacuate_test.go | 85 ++++++++++++ pkg/services/control/server/evacuate.go | 121 ++++++++++++++---- pkg/services/control/server/evacuate_async.go | 3 +- .../control/server/syncronize_tree.go | 7 +- pkg/services/tree/replicator.go | 108 ++++++++-------- 8 files changed, 283 insertions(+), 95 deletions(-) diff --git a/cmd/frostfs-node/control.go b/cmd/frostfs-node/control.go index ee2a2952a..e1e6e3ac9 100644 --- a/cmd/frostfs-node/control.go +++ b/cmd/frostfs-node/control.go @@ -9,23 +9,13 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.uber.org/zap" "google.golang.org/grpc" ) const serviceNameControl = "control" -type treeSynchronizer struct { - treeSvc *tree.Service -} - -func (t treeSynchronizer) Synchronize(ctx context.Context, cnr cid.ID, treeID string) error { - return t.treeSvc.SynchronizeTree(ctx, cnr, treeID) -} - func initControlService(c *cfg) { endpoint := controlconfig.GRPC(c.appCfg).Endpoint() if endpoint == controlconfig.GRPCEndpointDefault { @@ -50,9 +40,7 @@ func initControlService(c *cfg) { controlSvc.WithReplicator(c.replicator), controlSvc.WithNodeState(c), controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage), - controlSvc.WithTreeService(treeSynchronizer{ - c.treeService, - }), + controlSvc.WithTreeService(c.treeService), controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine), ) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index c3d2e5150..d839915d8 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -575,4 +575,7 @@ const ( GetSvcV2FailedToGetRangeHashFromNode = "failed to get range hash from node" GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes = "failed to get range hash from all of container nodes" FailedToUpdateShardID = "failed to update shard id" + EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node" + EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node" + EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node" ) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 15fb69c2f..becddfa8b 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -17,6 +17,7 @@ import ( tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/hrw" @@ -74,6 +75,7 @@ func (s EvacuateScope) TreesOnly() bool { type EvacuateShardPrm struct { ShardID []*shard.ID 
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error
+	TreeHandler    func(context.Context, cid.ID, string, pilorama.Forest) (string, error)
 	IgnoreErrors   bool
 	Async          bool
 	Scope          EvacuateScope
@@ -436,19 +438,44 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
 		default:
 		}
 
-		success, _, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, weights, shardsToEvacuate)
+		success, shardID, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, weights, shardsToEvacuate)
 		if err != nil {
 			return err
 		}
 		if success {
+			e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedLocal,
+				zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
+				zap.String("from_shard_id", sh.ID().String()), zap.String("to_shard_id", shardID),
+				evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
 			res.trEvacuated.Add(1)
-		} else {
-			res.trFailed.Add(1)
+			continue
 		}
+
+		nodePK, err := e.evacuateTreeToOtherNode(ctx, sh, contTree, prm)
+		if err != nil {
+			e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree,
+				zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
+				zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
+				zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+			return err
+		}
+		e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedRemote,
+			zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
+			zap.String("from_shard_id", sh.ID().String()), zap.String("to_node", nodePK),
+			evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+		res.trEvacuated.Add(1)
 	}
 	return nil
 }
 
+func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (string, error) {
+	if prm.TreeHandler == nil {
+		return "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
+	}
+
+	return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
+}
+
 func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm,
 	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
 ) (bool, string, error) {
@@ -579,6 +606,10 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
 		return nil, nil, errMustHaveTwoShards
 	}
 
+	if len(e.shards)-len(shardIDs) < 1 && prm.TreeHandler == nil && prm.Scope.WithTrees() {
+		return nil, nil, errMustHaveTwoShards
+	}
+
 	// We must have all shards, to have correct information about their
 	// indexes in a sorted slice and set appropriate marks in the metabase.
 	// Evacuated shard is skipped during put.
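The engine-side flow above tries another shard on the same node first (tryEvacuateTreeLocal) and falls back to prm.TreeHandler only when no local shard can accept the tree. Every consumer of the tree operation log in this patch (replicateTreeToNode below and both loops in the new test) pages through pilorama the same way, so the pattern is worth spelling out. A minimal sketch, assuming only the pilorama.Forest interface and pilorama.Move type used in this patch; drainOpLog is a hypothetical helper, not part of the change:

package sketch

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

// drainOpLog visits every operation of a single tree in timestamp order.
// TreeGetOpLog returns the first operation with Time >= height; an
// operation with Time == 0 means the log is exhausted. Hypothetical
// helper, not part of this patch.
func drainOpLog(ctx context.Context, f pilorama.Forest, cnr cid.ID, treeID string,
	apply func(pilorama.Move) error,
) error {
	var height uint64
	for {
		op, err := f.TreeGetOpLog(ctx, cnr, treeID, height)
		if err != nil {
			return err
		}
		if op.Time == 0 {
			return nil // end of log
		}
		if err := apply(op); err != nil {
			return err
		}
		height = op.Time + 1 // resume strictly after the last seen operation
	}
}

Resuming from op.Time + 1 visits each operation exactly once, which is the contract the TreeHandler in TestEvacuateTreesRemote relies on when it compares the evacuated op logs against the source shards.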
diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index d234f8ee3..32582cd2a 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -18,6 +18,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -480,3 +481,87 @@ func TestEvacuateTreesLocal(t *testing.T) { require.Equal(t, sourceOps, targetOps) } } + +func TestEvacuateTreesRemote(t *testing.T) { + e, ids, _ := newEngineEvacuate(t, 2, 3) + defer func() { + require.NoError(t, e.Close(context.Background())) + }() + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + + evacuatedTreeOps := make(map[string][]*pilorama.Move) + var prm EvacuateShardPrm + prm.ShardID = ids + prm.Scope = EvacuateScopeTrees + prm.TreeHandler = func(ctx context.Context, contID cid.ID, treeID string, f pilorama.Forest) (string, error) { + key := contID.String() + treeID + var height uint64 + for { + op, err := f.TreeGetOpLog(ctx, contID, treeID, height) + require.NoError(t, err) + + if op.Time == 0 { + return "", nil + } + evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op) + height = op.Time + 1 + } + } + + expectedShardIDs := make([]string, 0, len(ids)) + for _, id := range ids { + expectedShardIDs = append(expectedShardIDs, id.String()) + } + + st, err := e.GetEvacuationState(context.Background()) + require.NoError(t, err, "get init state failed") + require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state") + require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count") + require.Nil(t, st.StartedAt(), "invalid init started at") + require.Nil(t, st.FinishedAt(), "invalid init finished at") + require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") + require.Equal(t, "", st.ErrorMessage(), "invalid init error message") + + res, err := e.Evacuate(context.Background(), prm) + require.NotNil(t, res, "sync evacuation must return not nil") + require.NoError(t, err, "evacuation failed") + + st, err = e.GetEvacuationState(context.Background()) + require.NoError(t, err, "get evacuation state failed") + require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus()) + + require.NoError(t, err, "get final state failed") + require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count") + require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count") + require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count") + require.NotNil(t, st.StartedAt(), "invalid final started at") + require.NotNil(t, st.FinishedAt(), "invalid final finished at") + require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids") + require.Equal(t, "", st.ErrorMessage(), "invalid final error message") + + expectedTreeOps := make(map[string][]*pilorama.Move) + for i := 0; i < len(e.shards); i++ { + sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[i].String()]) + require.NoError(t, 
err, "list source trees failed") + require.Len(t, sourceTrees, 3) + + for _, tr := range sourceTrees { + key := tr.CID.String() + tr.TreeID + var height uint64 + for { + op, err := e.shards[ids[i].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height) + require.NoError(t, err) + + if op.Time == 0 { + break + } + expectedTreeOps[key] = append(expectedTreeOps[key], &op) + height = op.Time + 1 + } + } + } + + require.Equal(t, expectedTreeOps, evacuatedTreeOps) +} diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index ac8b3d548..99082b30d 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -4,13 +4,17 @@ import ( "bytes" "context" "crypto/sha256" + "encoding/hex" "errors" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -28,7 +32,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe prm := engine.EvacuateShardPrm{ ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), IgnoreErrors: req.GetBody().GetIgnoreErrors(), - ObjectsHandler: s.replicate, + ObjectsHandler: s.replicateObject, Scope: engine.EvacuateScopeObjects, } @@ -50,7 +54,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe return resp, nil } -func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error { +func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error { cid, ok := obj.ContainerID() if !ok { // Return nil to prevent situations where a shard can't be evacuated @@ -58,33 +62,11 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK return nil } - nm, err := s.netMapSrc.GetNetMap(0) + nodes, err := s.getContainerNodes(cid) if err != nil { return err } - c, err := s.cnrSrc.Get(cid) - if err != nil { - return err - } - - binCnr := make([]byte, sha256.Size) - cid.Encode(binCnr) - - ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr) - if err != nil { - return fmt.Errorf("can't build a list of container nodes") - } - - nodes := placement.FlattenNodes(ns) - bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes() - for i := 0; i < len(nodes); i++ { - if bytes.Equal(nodes[i].PublicKey(), bs) { - copy(nodes[i:], nodes[i+1:]) - nodes = nodes[:len(nodes)-1] - } - } - var res replicatorResult task := replicator.Task{ NumCopies: 1, @@ -100,6 +82,95 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK return nil } +func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (string, error) { + nodes, err := s.getContainerNodes(contID) + if err != nil { + return "", err + } + if len(nodes) == 0 { + return "", fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", treeID, contID) + } + + for _, node := range nodes { + err = 
s.replicateTreeToNode(ctx, forest, contID, treeID, node) + if err == nil { + return hex.EncodeToString(node.PublicKey()), nil + } + } + return "", err +} + +func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error { + rawCID := make([]byte, sha256.Size) + contID.Encode(rawCID) + + var height uint64 + for { + op, err := forest.TreeGetOpLog(ctx, contID, treeID, height) + if err != nil { + return err + } + + if op.Time == 0 { + return nil + } + + req := &tree.ApplyRequest{ + Body: &tree.ApplyRequest_Body{ + ContainerId: rawCID, + TreeId: treeID, + Operation: &tree.LogMove{ + ParentId: op.Parent, + Meta: op.Meta.Bytes(), + ChildId: op.Child, + }, + }, + } + + err = tree.SignMessage(req, s.key) + if err != nil { + return fmt.Errorf("can't sign apply request: %w", err) + } + + err = s.treeService.ReplicateTreeOp(ctx, node, req) + if err != nil { + return err + } + + height = op.Time + 1 + } +} + +func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) { + nm, err := s.netMapSrc.GetNetMap(0) + if err != nil { + return nil, err + } + + c, err := s.cnrSrc.Get(contID) + if err != nil { + return nil, err + } + + binCnr := make([]byte, sha256.Size) + contID.Encode(binCnr) + + ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr) + if err != nil { + return nil, fmt.Errorf("can't build a list of container nodes") + } + + nodes := placement.FlattenNodes(ns) + bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes() + for i := 0; i < len(nodes); i++ { + if bytes.Equal(nodes[i].PublicKey(), bs) { + copy(nodes[i:], nodes[i+1:]) + nodes = nodes[:len(nodes)-1] + } + } + return nodes, nil +} + type replicatorResult struct { count int } diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index 91f0731c4..cd7e8a2c7 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -24,7 +24,8 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha prm := engine.EvacuateShardPrm{ ShardID: s.getShardIDList(req.GetBody().GetShard_ID()), IgnoreErrors: req.GetBody().GetIgnoreErrors(), - ObjectsHandler: s.replicate, + ObjectsHandler: s.replicateObject, + TreeHandler: s.replicateTree, Async: true, Scope: engine.EvacuateScope(req.GetBody().GetScope()), } diff --git a/pkg/services/control/server/syncronize_tree.go b/pkg/services/control/server/syncronize_tree.go index dce3e8831..678f87d0a 100644 --- a/pkg/services/control/server/syncronize_tree.go +++ b/pkg/services/control/server/syncronize_tree.go @@ -4,14 +4,17 @@ import ( "context" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // TreeService represents a tree service instance. 
type TreeService interface { - Synchronize(ctx context.Context, cnr cid.ID, treeID string) error + SynchronizeTree(ctx context.Context, cnr cid.ID, treeID string) error + ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *tree.ApplyRequest) error } func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTreeRequest) (*control.SynchronizeTreeResponse, error) { @@ -31,7 +34,7 @@ func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTr return nil, status.Error(codes.Internal, err.Error()) } - err = s.treeService.Synchronize(ctx, cnr, b.GetTreeId()) + err = s.treeService.SynchronizeTree(ctx, cnr, b.GetTreeId()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index 49a37b8be..346198b3c 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -71,61 +71,67 @@ func (s *Service) replicationWorker(ctx context.Context) { case <-s.closeCh: return case task := <-s.replicationTasks: - ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask", - trace.WithAttributes( - attribute.String("public_key", hex.EncodeToString(task.n.PublicKey())), - ), - ) - start := time.Now() - - var lastErr error - var lastAddr string - - task.n.IterateNetworkEndpoints(func(addr string) bool { - ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint", - trace.WithAttributes( - attribute.String("public_key", hex.EncodeToString(task.n.PublicKey())), - attribute.String("address", addr), - ), - ) - defer span.End() - - lastAddr = addr - - c, err := s.cache.get(ctx, addr) - if err != nil { - lastErr = fmt.Errorf("can't create client: %w", err) - return false - } - - ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout) - _, lastErr = c.Apply(ctx, task.req) - cancel() - - return lastErr == nil - }) - - if lastErr != nil { - if errors.Is(lastErr, errRecentlyFailed) { - s.log.Debug(logs.TreeDoNotSendUpdateToTheNode, - zap.String("last_error", lastErr.Error()), - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - } else { - s.log.Warn(logs.TreeFailedToSentUpdateToTheNode, - zap.String("last_error", lastErr.Error()), - zap.String("address", lastAddr), - zap.String("key", hex.EncodeToString(task.n.PublicKey())), - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - } - s.metrics.AddReplicateTaskDuration(time.Since(start), false) - } else { - s.metrics.AddReplicateTaskDuration(time.Since(start), true) - } - span.End() + _ = s.ReplicateTreeOp(ctx, task.n, task.req) } } } +func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *ApplyRequest) error { + ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask", + trace.WithAttributes( + attribute.String("public_key", hex.EncodeToString(n.PublicKey())), + ), + ) + defer span.End() + + start := time.Now() + + var lastErr error + var lastAddr string + + n.IterateNetworkEndpoints(func(addr string) bool { + ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint", + trace.WithAttributes( + attribute.String("public_key", hex.EncodeToString(n.PublicKey())), + attribute.String("address", addr), + ), + ) + defer span.End() + + lastAddr = addr + + c, err := s.cache.get(ctx, addr) + if err != nil { + lastErr = fmt.Errorf("can't create client: %w", err) + return false + } + + ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout) + _, lastErr = 
c.Apply(ctx, req) + cancel() + + return lastErr == nil + }) + + if lastErr != nil { + if errors.Is(lastErr, errRecentlyFailed) { + s.log.Debug(logs.TreeDoNotSendUpdateToTheNode, + zap.String("last_error", lastErr.Error()), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } else { + s.log.Warn(logs.TreeFailedToSentUpdateToTheNode, + zap.String("last_error", lastErr.Error()), + zap.String("address", lastAddr), + zap.String("key", hex.EncodeToString(n.PublicKey())), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } + s.metrics.AddReplicateTaskDuration(time.Since(start), false) + return lastErr + } + s.metrics.AddReplicateTaskDuration(time.Since(start), true) + return nil +} + func (s *Service) replicateLoop(ctx context.Context) { for i := 0; i < s.replicatorWorkerCount; i++ { go s.replicationWorker(ctx)
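Exporting ReplicateTreeOp above is what lets the control server push individual signed tree operations to a chosen node during evacuation instead of going through the internal replication queue. A minimal sketch of a direct caller, assuming the ApplyRequest/LogMove wire types shown in this patch and an *ecdsa.PrivateKey like the control server's s.key; pushOp is a hypothetical helper, not part of the change:

package sketch

import (
	"context"
	"crypto/ecdsa"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

// pushOp signs one tree operation and hands it to a specific node, mirroring
// what replicateTreeToNode does for each entry of the op log. Hypothetical
// helper, not part of this patch.
func pushOp(ctx context.Context, svc *tree.Service, key *ecdsa.PrivateKey,
	node netmap.NodeInfo, rawCID []byte, treeID string, op pilorama.Move,
) error {
	req := &tree.ApplyRequest{
		Body: &tree.ApplyRequest_Body{
			ContainerId: rawCID, // sha256-sized binary container ID, as in replicateTreeToNode
			TreeId:      treeID,
			Operation: &tree.LogMove{
				ParentId: op.Parent,
				Meta:     op.Meta.Bytes(),
				ChildId:  op.Child,
			},
		},
	}
	if err := tree.SignMessage(req, key); err != nil {
		return fmt.Errorf("can't sign apply request: %w", err)
	}
	return svc.ReplicateTreeOp(ctx, node, req)
}

On success the control server records the hex-encoded public key of the accepting node, which is the value surfaced in the "tree evacuated to other node" log entry.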