forked from TrueCloudLab/frostfs-node
[#947] engine: Evacuate trees to remote nodes
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
728150d1d2
commit
db67c21d55
8 changed files with 283 additions and 95 deletions
cmd/frostfs-node
internal/logs
pkg
local_object_storage/engine
services
|
@ -9,23 +9,13 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const serviceNameControl = "control"
|
||||
|
||||
type treeSynchronizer struct {
|
||||
treeSvc *tree.Service
|
||||
}
|
||||
|
||||
func (t treeSynchronizer) Synchronize(ctx context.Context, cnr cid.ID, treeID string) error {
|
||||
return t.treeSvc.SynchronizeTree(ctx, cnr, treeID)
|
||||
}
|
||||
|
||||
func initControlService(c *cfg) {
|
||||
endpoint := controlconfig.GRPC(c.appCfg).Endpoint()
|
||||
if endpoint == controlconfig.GRPCEndpointDefault {
|
||||
|
@ -50,9 +40,7 @@ func initControlService(c *cfg) {
|
|||
controlSvc.WithReplicator(c.replicator),
|
||||
controlSvc.WithNodeState(c),
|
||||
controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage),
|
||||
controlSvc.WithTreeService(treeSynchronizer{
|
||||
c.treeService,
|
||||
}),
|
||||
controlSvc.WithTreeService(c.treeService),
|
||||
controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine),
|
||||
)
|
||||
|
||||
|
|
|
@ -575,4 +575,7 @@ const (
|
|||
GetSvcV2FailedToGetRangeHashFromNode = "failed to get range hash from node"
|
||||
GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes = "failed to get range hash from all of container nodes"
|
||||
FailedToUpdateShardID = "failed to update shard id"
|
||||
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
|
||||
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
|
||||
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
|
||||
)
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/hrw"
|
||||
|
@ -74,6 +75,7 @@ func (s EvacuateScope) TreesOnly() bool {
|
|||
type EvacuateShardPrm struct {
|
||||
ShardID []*shard.ID
|
||||
ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error
|
||||
TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (string, error)
|
||||
IgnoreErrors bool
|
||||
Async bool
|
||||
Scope EvacuateScope
|
||||
|
@ -436,19 +438,44 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
|
|||
default:
|
||||
}
|
||||
|
||||
success, _, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, weights, shardsToEvacuate)
|
||||
success, shardID, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, weights, shardsToEvacuate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if success {
|
||||
e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedLocal,
|
||||
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
|
||||
zap.String("from_shard_id", sh.ID().String()), zap.String("to_shard_id", shardID),
|
||||
evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
res.trEvacuated.Add(1)
|
||||
} else {
|
||||
res.trFailed.Add(1)
|
||||
continue
|
||||
}
|
||||
|
||||
nodePK, err := e.evacuateTreeToOtherNode(ctx, sh, contTree, prm)
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree,
|
||||
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
|
||||
zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
|
||||
zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedRemote,
|
||||
zap.String("cid", contTree.CID.EncodeToString()), zap.String("treeID", contTree.TreeID),
|
||||
zap.String("from_shardID", sh.ID().String()), zap.String("to_node", nodePK),
|
||||
evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
res.trEvacuated.Add(1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (string, error) {
|
||||
if prm.TreeHandler == nil {
|
||||
return "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
|
||||
}
|
||||
|
||||
return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
|
||||
}
|
||||
|
||||
func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID,
|
||||
prm EvacuateShardPrm, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
|
||||
) (bool, string, error) {
|
||||
|
@ -579,6 +606,10 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
|||
return nil, nil, errMustHaveTwoShards
|
||||
}
|
||||
|
||||
if len(e.shards)-len(shardIDs) < 1 && prm.TreeHandler == nil && prm.Scope.WithTrees() {
|
||||
return nil, nil, errMustHaveTwoShards
|
||||
}
|
||||
|
||||
// We must have all shards, to have correct information about their
|
||||
// indexes in a sorted slice and set appropriate marks in the metabase.
|
||||
// Evacuated shard is skipped during put.
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -480,3 +481,87 @@ func TestEvacuateTreesLocal(t *testing.T) {
|
|||
require.Equal(t, sourceOps, targetOps)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvacuateTreesRemote(t *testing.T) {
|
||||
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
||||
defer func() {
|
||||
require.NoError(t, e.Close(context.Background()))
|
||||
}()
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
|
||||
evacuatedTreeOps := make(map[string][]*pilorama.Move)
|
||||
var prm EvacuateShardPrm
|
||||
prm.ShardID = ids
|
||||
prm.Scope = EvacuateScopeTrees
|
||||
prm.TreeHandler = func(ctx context.Context, contID cid.ID, treeID string, f pilorama.Forest) (string, error) {
|
||||
key := contID.String() + treeID
|
||||
var height uint64
|
||||
for {
|
||||
op, err := f.TreeGetOpLog(ctx, contID, treeID, height)
|
||||
require.NoError(t, err)
|
||||
|
||||
if op.Time == 0 {
|
||||
return "", nil
|
||||
}
|
||||
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
|
||||
height = op.Time + 1
|
||||
}
|
||||
}
|
||||
|
||||
expectedShardIDs := make([]string, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
expectedShardIDs = append(expectedShardIDs, id.String())
|
||||
}
|
||||
|
||||
st, err := e.GetEvacuationState(context.Background())
|
||||
require.NoError(t, err, "get init state failed")
|
||||
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
|
||||
require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count")
|
||||
require.Nil(t, st.StartedAt(), "invalid init started at")
|
||||
require.Nil(t, st.FinishedAt(), "invalid init finished at")
|
||||
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
|
||||
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
|
||||
|
||||
res, err := e.Evacuate(context.Background(), prm)
|
||||
require.NotNil(t, res, "sync evacuation must return not nil")
|
||||
require.NoError(t, err, "evacuation failed")
|
||||
|
||||
st, err = e.GetEvacuationState(context.Background())
|
||||
require.NoError(t, err, "get evacuation state failed")
|
||||
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
|
||||
|
||||
require.NoError(t, err, "get final state failed")
|
||||
require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count")
|
||||
require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count")
|
||||
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
|
||||
require.NotNil(t, st.StartedAt(), "invalid final started at")
|
||||
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
|
||||
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
|
||||
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
|
||||
|
||||
expectedTreeOps := make(map[string][]*pilorama.Move)
|
||||
for i := 0; i < len(e.shards); i++ {
|
||||
sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[i].String()])
|
||||
require.NoError(t, err, "list source trees failed")
|
||||
require.Len(t, sourceTrees, 3)
|
||||
|
||||
for _, tr := range sourceTrees {
|
||||
key := tr.CID.String() + tr.TreeID
|
||||
var height uint64
|
||||
for {
|
||||
op, err := e.shards[ids[i].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
|
||||
require.NoError(t, err)
|
||||
|
||||
if op.Time == 0 {
|
||||
break
|
||||
}
|
||||
expectedTreeOps[key] = append(expectedTreeOps[key], &op)
|
||||
height = op.Time + 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, expectedTreeOps, evacuatedTreeOps)
|
||||
}
|
||||
|
|
|
@ -4,13 +4,17 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -28,7 +32,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
|
|||
prm := engine.EvacuateShardPrm{
|
||||
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
|
||||
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
|
||||
ObjectsHandler: s.replicate,
|
||||
ObjectsHandler: s.replicateObject,
|
||||
Scope: engine.EvacuateScopeObjects,
|
||||
}
|
||||
|
||||
|
@ -50,7 +54,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error {
|
||||
func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error {
|
||||
cid, ok := obj.ContainerID()
|
||||
if !ok {
|
||||
// Return nil to prevent situations where a shard can't be evacuated
|
||||
|
@ -58,33 +62,11 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK
|
|||
return nil
|
||||
}
|
||||
|
||||
nm, err := s.netMapSrc.GetNetMap(0)
|
||||
nodes, err := s.getContainerNodes(cid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c, err := s.cnrSrc.Get(cid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
cid.Encode(binCnr)
|
||||
|
||||
ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't build a list of container nodes")
|
||||
}
|
||||
|
||||
nodes := placement.FlattenNodes(ns)
|
||||
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
if bytes.Equal(nodes[i].PublicKey(), bs) {
|
||||
copy(nodes[i:], nodes[i+1:])
|
||||
nodes = nodes[:len(nodes)-1]
|
||||
}
|
||||
}
|
||||
|
||||
var res replicatorResult
|
||||
task := replicator.Task{
|
||||
NumCopies: 1,
|
||||
|
@ -100,6 +82,95 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (string, error) {
|
||||
nodes, err := s.getContainerNodes(contID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(nodes) == 0 {
|
||||
return "", fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", treeID, contID)
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
|
||||
if err == nil {
|
||||
return hex.EncodeToString(node.PublicKey()), nil
|
||||
}
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
contID.Encode(rawCID)
|
||||
|
||||
var height uint64
|
||||
for {
|
||||
op, err := forest.TreeGetOpLog(ctx, contID, treeID, height)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if op.Time == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
req := &tree.ApplyRequest{
|
||||
Body: &tree.ApplyRequest_Body{
|
||||
ContainerId: rawCID,
|
||||
TreeId: treeID,
|
||||
Operation: &tree.LogMove{
|
||||
ParentId: op.Parent,
|
||||
Meta: op.Meta.Bytes(),
|
||||
ChildId: op.Child,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = tree.SignMessage(req, s.key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't sign apply request: %w", err)
|
||||
}
|
||||
|
||||
err = s.treeService.ReplicateTreeOp(ctx, node, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
height = op.Time + 1
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) {
|
||||
nm, err := s.netMapSrc.GetNetMap(0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := s.cnrSrc.Get(contID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
binCnr := make([]byte, sha256.Size)
|
||||
contID.Encode(binCnr)
|
||||
|
||||
ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't build a list of container nodes")
|
||||
}
|
||||
|
||||
nodes := placement.FlattenNodes(ns)
|
||||
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
|
||||
for i := 0; i < len(nodes); i++ {
|
||||
if bytes.Equal(nodes[i].PublicKey(), bs) {
|
||||
copy(nodes[i:], nodes[i+1:])
|
||||
nodes = nodes[:len(nodes)-1]
|
||||
}
|
||||
}
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
type replicatorResult struct {
|
||||
count int
|
||||
}
|
||||
|
|
|
@ -24,7 +24,8 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
|
|||
prm := engine.EvacuateShardPrm{
|
||||
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
|
||||
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
|
||||
ObjectsHandler: s.replicate,
|
||||
ObjectsHandler: s.replicateObject,
|
||||
TreeHandler: s.replicateTree,
|
||||
Async: true,
|
||||
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
|
||||
}
|
||||
|
|
|
@ -4,14 +4,17 @@ import (
|
|||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// TreeService represents a tree service instance.
|
||||
type TreeService interface {
|
||||
Synchronize(ctx context.Context, cnr cid.ID, treeID string) error
|
||||
SynchronizeTree(ctx context.Context, cnr cid.ID, treeID string) error
|
||||
ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *tree.ApplyRequest) error
|
||||
}
|
||||
|
||||
func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTreeRequest) (*control.SynchronizeTreeResponse, error) {
|
||||
|
@ -31,7 +34,7 @@ func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTr
|
|||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
err = s.treeService.Synchronize(ctx, cnr, b.GetTreeId())
|
||||
err = s.treeService.SynchronizeTree(ctx, cnr, b.GetTreeId())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
|
|
@ -71,61 +71,67 @@ func (s *Service) replicationWorker(ctx context.Context) {
|
|||
case <-s.closeCh:
|
||||
return
|
||||
case task := <-s.replicationTasks:
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask",
|
||||
trace.WithAttributes(
|
||||
attribute.String("public_key", hex.EncodeToString(task.n.PublicKey())),
|
||||
),
|
||||
)
|
||||
start := time.Now()
|
||||
|
||||
var lastErr error
|
||||
var lastAddr string
|
||||
|
||||
task.n.IterateNetworkEndpoints(func(addr string) bool {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
|
||||
trace.WithAttributes(
|
||||
attribute.String("public_key", hex.EncodeToString(task.n.PublicKey())),
|
||||
attribute.String("address", addr),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
lastAddr = addr
|
||||
|
||||
c, err := s.cache.get(ctx, addr)
|
||||
if err != nil {
|
||||
lastErr = fmt.Errorf("can't create client: %w", err)
|
||||
return false
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
|
||||
_, lastErr = c.Apply(ctx, task.req)
|
||||
cancel()
|
||||
|
||||
return lastErr == nil
|
||||
})
|
||||
|
||||
if lastErr != nil {
|
||||
if errors.Is(lastErr, errRecentlyFailed) {
|
||||
s.log.Debug(logs.TreeDoNotSendUpdateToTheNode,
|
||||
zap.String("last_error", lastErr.Error()),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
} else {
|
||||
s.log.Warn(logs.TreeFailedToSentUpdateToTheNode,
|
||||
zap.String("last_error", lastErr.Error()),
|
||||
zap.String("address", lastAddr),
|
||||
zap.String("key", hex.EncodeToString(task.n.PublicKey())),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
s.metrics.AddReplicateTaskDuration(time.Since(start), false)
|
||||
} else {
|
||||
s.metrics.AddReplicateTaskDuration(time.Since(start), true)
|
||||
}
|
||||
span.End()
|
||||
_ = s.ReplicateTreeOp(ctx, task.n, task.req)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *ApplyRequest) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask",
|
||||
trace.WithAttributes(
|
||||
attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
start := time.Now()
|
||||
|
||||
var lastErr error
|
||||
var lastAddr string
|
||||
|
||||
n.IterateNetworkEndpoints(func(addr string) bool {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
|
||||
trace.WithAttributes(
|
||||
attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
|
||||
attribute.String("address", addr),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
lastAddr = addr
|
||||
|
||||
c, err := s.cache.get(ctx, addr)
|
||||
if err != nil {
|
||||
lastErr = fmt.Errorf("can't create client: %w", err)
|
||||
return false
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
|
||||
_, lastErr = c.Apply(ctx, req)
|
||||
cancel()
|
||||
|
||||
return lastErr == nil
|
||||
})
|
||||
|
||||
if lastErr != nil {
|
||||
if errors.Is(lastErr, errRecentlyFailed) {
|
||||
s.log.Debug(logs.TreeDoNotSendUpdateToTheNode,
|
||||
zap.String("last_error", lastErr.Error()),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
} else {
|
||||
s.log.Warn(logs.TreeFailedToSentUpdateToTheNode,
|
||||
zap.String("last_error", lastErr.Error()),
|
||||
zap.String("address", lastAddr),
|
||||
zap.String("key", hex.EncodeToString(n.PublicKey())),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
s.metrics.AddReplicateTaskDuration(time.Since(start), false)
|
||||
return lastErr
|
||||
}
|
||||
s.metrics.AddReplicateTaskDuration(time.Since(start), true)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) replicateLoop(ctx context.Context) {
|
||||
for i := 0; i < s.replicatorWorkerCount; i++ {
|
||||
go s.replicationWorker(ctx)
|
||||
|
|
Loading…
Reference in a new issue