frostfs-node/pkg/services/tree/replicator.go
Ekaterina Lebedeva a685fcdc96
[#1317] go.mod: Use range over int
Since Go 1.22, a "for" statement with a "range" clause can iterate
over the integers from zero up to (but not including) an upper limit.

gopatch script:
@@
var i, e expression
@@
-for i := 0; i <= e - 1; i++ {
+for i := range e {
    ...
}

@@
var i, e expression
@@
-for i := 0; i <= e; i++ {
+for i := range e + 1 {
    ...
}

@@
var i, e expression
@@
-for i := 0; i < e; i++ {
+for i := range e {
    ...
}
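
For illustration, a minimal sketch of the rewrite (workerCount and worker are
hypothetical names, not taken from this change):

Before:
    for i := 0; i < workerCount; i++ {
        go worker(ctx, i)
    }

After (Go 1.22+):
    for i := range workerCount {
        go worker(ctx, i)
    }

In this file the rewrite yields, for example, "for range s.replicatorWorkerCount"
in replicateLoop, where the loop index is unused and therefore dropped.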

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-09-03 13:00:54 +03:00


package tree

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
)

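// movePair groups a single pilorama move with the container and tree it belongs to.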
type movePair struct {
	cid    cidSDK.ID
	treeID string
	op     *pilorama.Move
}

type replicationTask struct {
	n   netmapSDK.NodeInfo
	req *ApplyRequest
}

type applyOp struct {
	treeID string
	cid    cidSDK.ID
	pilorama.Move
}

const (
	defaultReplicatorCapacity    = 64
	defaultReplicatorWorkerCount = 64
	defaultReplicatorSendTimeout = time.Second * 5
)

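// localReplicationWorker applies operations received on replicateLocalCh to the
// local forest, logging any failure to apply a replicated operation.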
func (s *Service) localReplicationWorker(ctx context.Context) {
	for {
		select {
		case <-s.closeCh:
			return
		case op := <-s.replicateLocalCh:
			ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationOperation",
				trace.WithAttributes(
					attribute.String("tree_id", op.treeID),
					attribute.String("container_id", op.cid.EncodeToString()),
				),
			)

			err := s.forest.TreeApply(ctx, op.cid, op.treeID, &op.Move, false)
			if err != nil {
				s.log.Error(logs.TreeFailedToApplyReplicatedOperation,
					zap.String("err", err.Error()))
			}
			span.End()
		}
	}
}

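// replicationWorker forwards queued replication tasks to their target nodes.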
func (s *Service) replicationWorker(ctx context.Context) {
	for {
		select {
		case <-s.closeCh:
			return
		case task := <-s.replicationTasks:
			_ = s.ReplicateTreeOp(ctx, task.n, task.req)
		}
	}
}

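// ReplicateTreeOp sends the apply request to the given node, trying each of its
// network endpoints until one succeeds, and records the replication task duration.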
func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *ApplyRequest) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask",
		trace.WithAttributes(
			attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
		),
	)
	defer span.End()

	start := time.Now()

	var lastErr error
	var lastAddr string

	n.IterateNetworkEndpoints(func(addr string) bool {
		ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
			trace.WithAttributes(
				attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
				attribute.String("address", addr),
			),
		)
		defer span.End()

		lastAddr = addr

		c, err := s.cache.get(ctx, addr)
		if err != nil {
			lastErr = fmt.Errorf("can't create client: %w", err)
			return false
		}

		ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
		_, lastErr = c.Apply(ctx, req)
		cancel()

		return lastErr == nil
	})

	if lastErr != nil {
		if errors.Is(lastErr, errRecentlyFailed) {
			s.log.Debug(logs.TreeDoNotSendUpdateToTheNode,
				zap.String("last_error", lastErr.Error()),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		} else {
			s.log.Warn(logs.TreeFailedToSentUpdateToTheNode,
				zap.String("last_error", lastErr.Error()),
				zap.String("address", lastAddr),
				zap.String("key", hex.EncodeToString(n.PublicKey())),
				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
		}
		s.metrics.AddReplicateTaskDuration(time.Since(start), false)
		return lastErr
	}
	s.metrics.AddReplicateTaskDuration(time.Since(start), true)
	return nil
}

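// replicateLoop starts the replication workers and dispatches operations from
// replicateCh until the service is closed or the context is canceled; pending
// replication tasks are drained on exit.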
func (s *Service) replicateLoop(ctx context.Context) {
	for range s.replicatorWorkerCount {
		go s.replicationWorker(ctx)
		go s.localReplicationWorker(ctx)
	}
	defer func() {
		for len(s.replicationTasks) != 0 {
			<-s.replicationTasks
		}
	}()

	for {
		select {
		case <-s.closeCh:
			return
		case <-ctx.Done():
			return
		case op := <-s.replicateCh:
			start := time.Now()
			err := s.replicate(op)
			if err != nil {
				s.log.Error(logs.TreeErrorDuringReplication,
					zap.String("err", err.Error()),
					zap.Stringer("cid", op.cid),
					zap.String("treeID", op.treeID))
			}
			s.metrics.AddReplicateWaitDuration(time.Since(start), err == nil)
		}
	}
}

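// replicate signs the apply request and queues a replication task for every
// container node except the local one.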
func (s *Service) replicate(op movePair) error {
	req := newApplyRequest(&op)
	err := SignMessage(req, s.key)
	if err != nil {
		return fmt.Errorf("can't sign data: %w", err)
	}

	nodes, localIndex, err := s.getContainerNodes(op.cid)
	if err != nil {
		return fmt.Errorf("can't get container nodes: %w", err)
	}

	for i := range nodes {
		if i != localIndex {
			s.replicationTasks <- replicationTask{nodes[i], req}
		}
	}
	return nil
}

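// pushToQueue enqueues an operation for replication without blocking: if the
// replication channel is full, the operation is dropped.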
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
	select {
	case s.replicateCh <- movePair{
		cid:    cid,
		treeID: treeID,
		op:     op,
	}:
	default:
	}
}

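// newApplyRequest builds an ApplyRequest from a movePair, encoding the container
// ID and serializing the move operation's metadata.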
func newApplyRequest(op *movePair) *ApplyRequest {
	rawCID := make([]byte, sha256.Size)
	op.cid.Encode(rawCID)

	return &ApplyRequest{
		Body: &ApplyRequest_Body{
			ContainerId: rawCID,
			TreeId:      op.treeID,
			Operation: &LogMove{
				ParentId: op.op.Parent,
				Meta:     op.op.Meta.Bytes(),
				ChildId:  op.op.Child,
			},
		},
	}
}