package tree

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

// movePair is the unit queued on Service.replicateCh: a tree operation
// together with the container and tree it belongs to.
type movePair struct {
	cid    cidSDK.ID
	treeID string
	op     *pilorama.Move
}

// replicationTask pairs an already signed ApplyRequest with the node it has
// to be delivered to. It is the unit consumed by replicationWorker.
type replicationTask struct {
	n   netmapSDK.NodeInfo
	req *ApplyRequest
}

// applyOp is the unit consumed by localReplicationWorker: a move operation
// that is applied to the local forest for the given container and tree.
type applyOp struct {
	treeID string
	cid    cidSDK.ID
	pilorama.Move
}

// Default tuning values. They are not referenced in this file.
// NOTE(review): presumably consumed by the Service constructor/options —
// confirm against the rest of the package.
const (
	defaultReplicatorCapacity    = 64
	defaultReplicatorWorkerCount = 64
	defaultReplicatorSendTimeout = time.Second * 5
	defaultSyncBatchSize         = 1000
)

// localReplicationWorker receives operations from s.replicateLocalCh and
// applies each of them to the local forest via TreeApply, wrapping every
// application in its own tracing span. Failures are logged and otherwise
// ignored. The worker returns once a receive on s.closeCh succeeds.
func (s *Service) localReplicationWorker(ctx context.Context) {
	for {
		select {
		case <-s.closeCh:
			return
		case op := <-s.replicateLocalCh:
			// A dedicated name keeps the worker-level ctx untouched, so every
			// span is started from the same parent context.
			opCtx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationOperation",
				trace.WithAttributes(
					attribute.String("tree_id", op.treeID),
					attribute.String("container_id", op.cid.EncodeToString()),
				),
			)

			err := s.forest.TreeApply(opCtx, op.cid, op.treeID, &op.Move, false)
			if err != nil {
				s.log.Error(logs.TreeFailedToApplyReplicatedOperation,
					zap.String("err", err.Error()))
			}
			span.End()
		}
	}
}

// replicationWorker receives tasks from s.replicationTasks and sends each of
// them with ReplicateTreeOp. The worker returns once a receive on s.closeCh
// succeeds.
func (s *Service) replicationWorker(ctx context.Context) {
	for {
		select {
		case <-s.closeCh:
			return
		case task := <-s.replicationTasks:
			// The error is deliberately dropped here: ReplicateTreeOp already
			// logs the failure and records it in the metrics.
			_ = s.ReplicateTreeOp(ctx, task.n, task.req)
		}
	}
}

// ReplicateTreeOp sends req to node n, trying the node's network endpoints
// in the order IterateNetworkEndpoints yields them.
//
// For every endpoint a client is taken from s.cache and Apply is called with
// a timeout of s.replicatorTimeout. The error of the last attempt, if any, is
// logged (at debug level when it is errRecentlyFailed, at warn level
// otherwise) and returned. The total duration and the outcome are reported
// via s.metrics.AddReplicateTaskDuration.
func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *ApplyRequest) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask",
		trace.WithAttributes(
			attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
		),
	)
	defer span.End()

	start := time.Now()

	var lastErr error
	var lastAddr string

	// NOTE(review): the callback returns true only after a successful Apply;
	// presumably a true result stops the iteration — confirm against the
	// netmap SDK documentation.
	n.IterateNetworkEndpoints(func(addr string) bool {
		epCtx, epSpan := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
			trace.WithAttributes(
				attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
				attribute.String("address", addr),
			),
		)
		defer epSpan.End()

		lastAddr = addr

		c, err := s.cache.get(epCtx, addr)
		if err != nil {
			lastErr = fmt.Errorf("can't create client: %w", err)
			return false
		}

		// Bound a single Apply call; cancel right away to release the timer.
		callCtx, cancel := context.WithTimeout(epCtx, s.replicatorTimeout)
		_, lastErr = c.Apply(callCtx, req)
		cancel()

		return lastErr == nil
	})

	if lastErr != nil {
		if errors.Is(lastErr, errRecentlyFailed) {
			s.log.Debug(logs.TreeDoNotSendUpdateToTheNode,
				zap.String("last_error", lastErr.Error()),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		} else {
			s.log.Warn(logs.TreeFailedToSentUpdateToTheNode,
				zap.String("last_error", lastErr.Error()),
				zap.String("address", lastAddr),
				zap.String("key", hex.EncodeToString(n.PublicKey())),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
		s.metrics.AddReplicateTaskDuration(time.Since(start), false)
		return lastErr
	}
	s.metrics.AddReplicateTaskDuration(time.Since(start), true)
	return nil
}

// replicateLoop starts s.replicatorWorkerCount replicationWorker goroutines
// and the same number of localReplicationWorker goroutines, then turns every
// operation received from s.replicateCh into replication tasks via replicate.
//
// The loop returns when a receive on s.closeCh succeeds or ctx is done. On
// return, whatever is still buffered in s.replicationTasks is discarded.
func (s *Service) replicateLoop(ctx context.Context) {
	for range s.replicatorWorkerCount {
		go s.replicationWorker(ctx)
		go s.localReplicationWorker(ctx)
	}
	defer func() {
		// Drop the tasks that were queued but never picked up by a worker.
		for len(s.replicationTasks) != 0 {
			<-s.replicationTasks
		}
	}()

	for {
		select {
		case <-s.closeCh:
			return
		case <-ctx.Done():
			return
		case op := <-s.replicateCh:
			start := time.Now()
			err := s.replicate(op)
			if err != nil {
				s.log.Error(logs.TreeErrorDuringReplication,
					zap.String("err", err.Error()),
					zap.Stringer("cid", op.cid),
					zap.String("treeID", op.treeID))
			}
			s.metrics.AddReplicateWaitDuration(time.Since(start), err == nil)
		}
	}
}

// replicate builds and signs an ApplyRequest for op and queues one
// replicationTask per container node, skipping the node at the local index
// reported by s.getContainerNodes.
//
// It returns an error when signing or fetching the container nodes fails.
// If s.closeCh fires while a task is waiting to be queued, the remaining
// tasks are abandoned and nil is returned.
func (s *Service) replicate(op movePair) error {
	req := newApplyRequest(&op)

	err := SignMessage(req, s.key)
	if err != nil {
		return fmt.Errorf("can't sign data: %w", err)
	}

	nodes, localIndex, err := s.getContainerNodes(op.cid)
	if err != nil {
		return fmt.Errorf("can't get container nodes: %w", err)
	}

	for i := range nodes {
		if i == localIndex {
			continue
		}

		// The workers reading s.replicationTasks exit on s.closeCh, so a bare
		// send could block forever on a full queue during shutdown and keep
		// replicateLoop from ever returning. Watch s.closeCh as well; dropping
		// the task mirrors the drain replicateLoop performs on exit.
		select {
		case s.replicationTasks <- replicationTask{nodes[i], req}:
		case <-s.closeCh:
			return nil
		}
	}
	return nil
}

// pushToQueue offers the operation to s.replicateCh without blocking: when
// the send cannot proceed immediately, the operation is silently dropped.
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
	select {
	case s.replicateCh <- movePair{
		cid:    cid,
		treeID: treeID,
		op:     op,
	}:
	default:
		// Deliberate best-effort: never block the caller on a busy queue.
	}
}

// newApplyRequest builds an unsigned ApplyRequest for op. The container ID is
// encoded into a sha256.Size-byte buffer, and the operation's parent, child
// and serialized meta are copied into the LogMove.
func newApplyRequest(op *movePair) *ApplyRequest {
	rawCID := make([]byte, sha256.Size)
	op.cid.Encode(rawCID)

	return &ApplyRequest{
		Body: &ApplyRequest_Body{
			ContainerId: rawCID,
			TreeId:      op.treeID,
			Operation: &LogMove{
				ParentId: op.op.Parent,
				Meta:     op.op.Meta.Bytes(),
				ChildId:  op.op.Child,
			},
		},
	}
}