frostfs-node/pkg/services/object/get/remote.go

119 lines
3.5 KiB
Go
Raw Permalink Normal View History

package getsvc
import (
"context"
"encoding/hex"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
defer span.End()
r.log.Debug(logs.ProcessingNode, zap.String("node_key", hex.EncodeToString(info.PublicKey())))
rs, ok := r.getRemoteStorage(info)
if !ok {
return true
}
obj, err := r.getRemote(ctx, rs, info)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
switch {
default:
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
if r.status == statusEC {
// we need to continue getting another chunks from another nodes
// in case of network issue
return false
}
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
case err == nil:
r.status = statusOK
r.err = nil
// both object and err are nil only if the original
// request was forwarded to another node and the object
// has already been streamed to the requesting party
if obj != nil {
r.collectedObject = obj
r.writeCollectedObject(ctx)
}
case errors.As(err, &errRemoved):
r.status = statusINHUMED
r.err = errRemoved
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange
case errors.As(err, &errSplitInfo):
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
r.infoEC = errECInfo.ECInfo()
r.err = objectSDK.NewECInfoError(r.infoEC)
if r.isRaw() {
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
cnt, err := r.containerSource.Get(r.address().Container())
if err == nil {
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
}
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
return r.status != statusUndefined
}
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
if r.isForwardingEnabled() {
return rs.ForwardRequest(ctx, info, r.prm.forwarder)
}
key, err := r.key()
if err != nil {
return nil, err
}
prm := RemoteRequestParams{
Epoch: r.curProcEpoch,
TTL: r.prm.common.TTL(),
PrivateKey: key,
SessionToken: r.prm.common.SessionToken(),
BearerToken: r.prm.common.BearerToken(),
XHeaders: r.prm.common.XHeaders(),
IsRaw: r.isRaw(),
}
if r.headOnly() {
return rs.Head(ctx, r.address(), prm)
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := r.ctxRange(); rng != nil {
// Current spec allows other storage node to deny access,
// fallback to GET here.
return rs.Range(ctx, r.address(), rng, prm)
}
return rs.Get(ctx, r.address(), prm)
}