package replicator import ( "context" "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) var errFailedToGetObjectFromAnyNode = errors.New("failed to get object from any node") func (p *Replicator) HandlePullTask(ctx context.Context, task Task) { p.metrics.IncInFlightRequest() defer p.metrics.DecInFlightRequest() defer func() { p.log.Debug(logs.ReplicatorFinishWork, zap.String("type", "pull")) }() ctx, span := tracing.StartSpanFromContext(ctx, "Replicator.HandlePullTask", trace.WithAttributes( attribute.Stringer("address", task.Addr), attribute.Int("nodes_count", len(task.Nodes)), )) defer span.End() var obj *objectSDK.Object for _, node := range task.Nodes { var err error obj, err = p.remoteGetter.Get(ctx, getsvc.RemoteGetPrm{ Address: task.Addr, Node: node, }) if err == nil { break } var endpoints []string node.IterateNetworkEndpoints(func(s string) bool { endpoints = append(endpoints, s) return false }) p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage, zap.Stringer("object", task.Addr), zap.Error(err), zap.Strings("endpoints", endpoints), zap.String("trace_id", tracingPkg.GetTraceID(ctx))) } if obj == nil { p.log.Error(logs.ReplicatorCouldNotGetObjectFromRemoteStorage, zap.Stringer("object", task.Addr), zap.Error(errFailedToGetObjectFromAnyNode), zap.String("trace_id", tracingPkg.GetTraceID(ctx))) return } err := engine.Put(ctx, p.localStorage, obj) if err != nil { p.log.Error(logs.ReplicatorCouldNotPutObjectToLocalStorage, zap.Stringer("object", task.Addr), zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx))) } }