forked from TrueCloudLab/frostfs-node
203 lines
4.7 KiB
Go
203 lines
4.7 KiB
Go
package tree
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type movePair struct {
|
|
cid cidSDK.ID
|
|
treeID string
|
|
op *pilorama.Move
|
|
}
|
|
|
|
type replicationTask struct {
|
|
n netmapSDK.NodeInfo
|
|
req *ApplyRequest
|
|
}
|
|
|
|
type applyOp struct {
|
|
treeID string
|
|
cid cidSDK.ID
|
|
pilorama.Move
|
|
}
|
|
|
|
// Default replicator settings.
//
// NOTE(review): the places where these defaults are applied are outside this
// part of the file; the per-constant descriptions below are inferred from the
// names and should be confirmed against the Service constructor.
const (
	// defaultReplicatorCapacity is presumably the default buffer size of the
	// replication queue(s).
	defaultReplicatorCapacity = 64
	// defaultReplicatorWorkerCount is presumably the default value of
	// Service.replicatorWorkerCount.
	defaultReplicatorWorkerCount = 64
	// defaultReplicatorSendTimeout is presumably the default value of
	// Service.replicatorTimeout, the per-endpoint Apply deadline.
	defaultReplicatorSendTimeout = 5 * time.Second
)
|
|
|
|
func (s *Service) localReplicationWorker(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-s.closeCh:
|
|
return
|
|
case op := <-s.replicateLocalCh:
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationOperation",
|
|
trace.WithAttributes(
|
|
attribute.String("tree_id", op.treeID),
|
|
attribute.String("container_id", op.cid.EncodeToString()),
|
|
),
|
|
)
|
|
|
|
err := s.forest.TreeApply(ctx, op.cid, op.treeID, &op.Move, false)
|
|
if err != nil {
|
|
s.log.Error(logs.TreeFailedToApplyReplicatedOperation,
|
|
zap.String("err", err.Error()))
|
|
}
|
|
span.End()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) replicationWorker(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-s.closeCh:
|
|
return
|
|
case task := <-s.replicationTasks:
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask",
|
|
trace.WithAttributes(
|
|
attribute.String("public_key", hex.EncodeToString(task.n.PublicKey())),
|
|
),
|
|
)
|
|
start := time.Now()
|
|
|
|
var lastErr error
|
|
var lastAddr string
|
|
|
|
task.n.IterateNetworkEndpoints(func(addr string) bool {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
|
|
trace.WithAttributes(
|
|
attribute.String("public_key", hex.EncodeToString(task.n.PublicKey())),
|
|
attribute.String("address", addr),
|
|
),
|
|
)
|
|
defer span.End()
|
|
|
|
lastAddr = addr
|
|
|
|
c, err := s.cache.get(ctx, addr)
|
|
if err != nil {
|
|
lastErr = fmt.Errorf("can't create client: %w", err)
|
|
return false
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
|
|
_, lastErr = c.Apply(ctx, task.req)
|
|
cancel()
|
|
|
|
return lastErr == nil
|
|
})
|
|
|
|
if lastErr != nil {
|
|
if errors.Is(lastErr, errRecentlyFailed) {
|
|
s.log.Debug(logs.TreeDoNotSendUpdateToTheNode,
|
|
zap.String("last_error", lastErr.Error()))
|
|
} else {
|
|
s.log.Warn(logs.TreeFailedToSentUpdateToTheNode,
|
|
zap.String("last_error", lastErr.Error()),
|
|
zap.String("address", lastAddr),
|
|
zap.String("key", hex.EncodeToString(task.n.PublicKey())))
|
|
}
|
|
s.metrics.AddReplicateTaskDuration(time.Since(start), false)
|
|
} else {
|
|
s.metrics.AddReplicateTaskDuration(time.Since(start), true)
|
|
}
|
|
span.End()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) replicateLoop(ctx context.Context) {
|
|
for i := 0; i < s.replicatorWorkerCount; i++ {
|
|
go s.replicationWorker(ctx)
|
|
go s.localReplicationWorker(ctx)
|
|
}
|
|
defer func() {
|
|
for len(s.replicationTasks) != 0 {
|
|
<-s.replicationTasks
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-s.closeCh:
|
|
return
|
|
case <-ctx.Done():
|
|
return
|
|
case op := <-s.replicateCh:
|
|
start := time.Now()
|
|
err := s.replicate(op)
|
|
if err != nil {
|
|
s.log.Error(logs.TreeErrorDuringReplication,
|
|
zap.String("err", err.Error()),
|
|
zap.Stringer("cid", op.cid),
|
|
zap.String("treeID", op.treeID))
|
|
}
|
|
s.metrics.AddReplicateWaitDuration(time.Since(start), err == nil)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) replicate(op movePair) error {
|
|
req := newApplyRequest(&op)
|
|
err := SignMessage(req, s.key)
|
|
if err != nil {
|
|
return fmt.Errorf("can't sign data: %w", err)
|
|
}
|
|
|
|
nodes, localIndex, err := s.getContainerNodes(op.cid)
|
|
if err != nil {
|
|
return fmt.Errorf("can't get container nodes: %w", err)
|
|
}
|
|
|
|
for i := range nodes {
|
|
if i != localIndex {
|
|
s.replicationTasks <- replicationTask{nodes[i], req}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
|
|
select {
|
|
case s.replicateCh <- movePair{
|
|
cid: cid,
|
|
treeID: treeID,
|
|
op: op,
|
|
}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func newApplyRequest(op *movePair) *ApplyRequest {
|
|
rawCID := make([]byte, sha256.Size)
|
|
op.cid.Encode(rawCID)
|
|
|
|
return &ApplyRequest{
|
|
Body: &ApplyRequest_Body{
|
|
ContainerId: rawCID,
|
|
TreeId: op.treeID,
|
|
Operation: &LogMove{
|
|
ParentId: op.op.Parent,
|
|
Meta: op.op.Meta.Bytes(),
|
|
ChildId: op.op.Child,
|
|
},
|
|
},
|
|
}
|
|
}
|