forked from TrueCloudLab/frostfs-node
[#1103] node: Implement Get\Head
requests for EC object
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
167c52a1a9
commit
112a7c690f
30 changed files with 579 additions and 11 deletions
|
@ -139,9 +139,11 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
|
|||
remoteStorageConstructor: r.remoteStorageConstructor,
|
||||
epochSource: r.epochSource,
|
||||
localStorage: r.localStorage,
|
||||
containerSource: r.containerSource,
|
||||
|
||||
prm: prm,
|
||||
infoSplit: objectSDK.NewSplitInfo(),
|
||||
infoEC: objectSDK.NewECInfo(),
|
||||
log: r.log,
|
||||
}
|
||||
|
||||
|
|
79
pkg/services/object/get/assembleec.go
Normal file
79
pkg/services/object/get/assembleec.go
Normal file
|
@ -0,0 +1,79 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (r *request) assembleEC(ctx context.Context) {
|
||||
if r.isRaw() {
|
||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
return
|
||||
}
|
||||
|
||||
// Any access tokens are not expected to be used in the assembly process:
|
||||
// - there is no requirement to specify child objects in session/bearer
|
||||
// token for `GET`/`GETRANGE`/`RANGEHASH` requests in the API protocol,
|
||||
// and, therefore, their missing in the original request should not be
|
||||
// considered as error; on the other hand, without session for every child
|
||||
// object, it is impossible to attach bearer token in the new generated
|
||||
// requests correctly because the token has not been issued for that node's
|
||||
// key;
|
||||
// - the assembly process is expected to be handled on a container node
|
||||
// only since the requests forwarding mechanism presentation; such the
|
||||
// node should have enough rights for getting any child object by design.
|
||||
r.prm.common.ForgetTokens()
|
||||
|
||||
// Do not use forwarding during assembly stage.
|
||||
// Request forwarding closure inherited in produced
|
||||
// `execCtx` so it should be disabled there.
|
||||
r.disableForwarding()
|
||||
|
||||
r.log.Debug(logs.GetTryingToAssembleTheECObject)
|
||||
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
|
||||
|
||||
r.log.Debug(logs.GetAssemblingECObject,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
|
||||
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
|
||||
if err != nil {
|
||||
r.log.Warn(logs.GetFailedToAssembleECObject,
|
||||
zap.Error(err),
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
}
|
||||
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||
|
||||
switch {
|
||||
default:
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
r.collectedObject = obj
|
||||
case errors.As(err, &errRemoved):
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemoved
|
||||
case errors.As(err, &errOutOfRange):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
}
|
||||
}
|
117
pkg/services/object/get/assemblerec.go
Normal file
117
pkg/services/object/get/assemblerec.go
Normal file
|
@ -0,0 +1,117 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type assemblerec struct {
|
||||
addr oid.Address
|
||||
ecInfo *objectSDK.ECInfo
|
||||
rng *objectSDK.Range
|
||||
objGetter objectGetter
|
||||
cs container.Source
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func newAssemblerEC(
|
||||
addr oid.Address,
|
||||
ecInfo *objectSDK.ECInfo,
|
||||
rng *objectSDK.Range,
|
||||
objGetter objectGetter,
|
||||
cs container.Source,
|
||||
log *logger.Logger,
|
||||
) *assemblerec {
|
||||
return &assemblerec{
|
||||
addr: addr,
|
||||
rng: rng,
|
||||
ecInfo: ecInfo,
|
||||
objGetter: objGetter,
|
||||
cs: cs,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
|
||||
// It returns parent object.
|
||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
|
||||
parts := a.retrieveParts(ctx, headOnly)
|
||||
cnt, err := a.cs.Get(a.addr.Container())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c, err := erasurecode.NewConstructor(
|
||||
policy.ECDataCount(cnt.Value.PlacementPolicy()),
|
||||
policy.ECParityCount(cnt.Value.PlacementPolicy()),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if headOnly {
|
||||
obj, err := c.ReconstructHeader(parts)
|
||||
if err == nil {
|
||||
return obj, writer.WriteHeader(ctx, obj)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
obj, err := c.Reconstruct(parts)
|
||||
if err == nil {
|
||||
err = writer.WriteHeader(ctx, obj.CutPayload())
|
||||
if err == nil {
|
||||
err = writer.WriteChunk(ctx, obj.Payload())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return obj, err
|
||||
}
|
||||
|
||||
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
|
||||
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
|
||||
errGroup, ctx := errgroup.WithContext(mainCtx)
|
||||
|
||||
for i := range a.ecInfo.Chunks {
|
||||
chunk := a.ecInfo.Chunks[i]
|
||||
errGroup.Go(func() error {
|
||||
objID := new(oid.ID)
|
||||
err := objID.ReadFromV2(chunk.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid object ID: %w", err)
|
||||
}
|
||||
var obj *objectSDK.Object
|
||||
if headOnly {
|
||||
obj, err = a.objGetter.HeadObject(ctx, *objID)
|
||||
if err != nil {
|
||||
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
sow := NewSimpleObjectWriter()
|
||||
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
|
||||
if err != nil {
|
||||
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
obj.SetPayload(sow.pld)
|
||||
}
|
||||
parts[chunk.Index] = obj
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err := errGroup.Wait(); err != nil {
|
||||
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
|
||||
}
|
||||
return parts
|
||||
}
|
|
@ -73,9 +73,12 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
|
|||
remoteStorageConstructor: s.remoteStorageConstructor,
|
||||
epochSource: s.epochSource,
|
||||
localStorage: s.localStorage,
|
||||
containerSource: s.containerSource,
|
||||
|
||||
prm: prm,
|
||||
infoSplit: objectSDK.NewSplitInfo(),
|
||||
infoEC: objectSDK.NewECInfo(),
|
||||
log: s.log,
|
||||
}
|
||||
|
||||
exec.setLogger(s.log)
|
||||
|
@ -106,6 +109,16 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
|||
exec.assemble(ctx)
|
||||
case statusOutOfRange:
|
||||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||
case statusEC:
|
||||
if !exec.isLocal() {
|
||||
if execCnr {
|
||||
exec.executeOnContainer(ctx)
|
||||
exec.analyzeStatus(ctx, false)
|
||||
} else {
|
||||
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||
exec.assembleEC(ctx)
|
||||
}
|
||||
}
|
||||
default:
|
||||
exec.log.Debug(logs.OperationFinishedWithError,
|
||||
zap.Error(exec.err),
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -22,6 +23,7 @@ func (r *request) executeLocal(ctx context.Context) {
|
|||
r.collectedObject, err = r.get(ctx)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||
|
||||
|
@ -42,6 +44,10 @@ func (r *request) executeLocal(ctx context.Context) {
|
|||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
|
||||
r.err = objectSDK.NewECInfoError(r.infoEC)
|
||||
case errors.As(err, &errOutOfRange):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -27,15 +29,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
obj, err := r.getRemote(ctx, rs, info)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||
|
||||
switch {
|
||||
default:
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
if r.status == statusEC {
|
||||
// we need to continue getting another chunks from another nodes
|
||||
// in case of network issue
|
||||
return false
|
||||
}
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
|
@ -57,6 +64,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
|
||||
r.infoEC = errECInfo.ECInfo()
|
||||
r.err = objectSDK.NewECInfoError(r.infoEC)
|
||||
if r.isRaw() {
|
||||
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
||||
}
|
||||
cnt, err := r.containerSource.Get(r.address().Container())
|
||||
if err == nil {
|
||||
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
|
||||
}
|
||||
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
|
||||
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
||||
}
|
||||
|
||||
return r.status != statusUndefined
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
|
@ -22,6 +23,8 @@ type request struct {
|
|||
|
||||
infoSplit *objectSDK.SplitInfo
|
||||
|
||||
infoEC *objectSDK.ECInfo
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
collectedObject *objectSDK.Object
|
||||
|
@ -33,6 +36,7 @@ type request struct {
|
|||
traverserGenerator traverserGenerator
|
||||
remoteStorageConstructor remoteStorageConstructor
|
||||
localStorage localStorage
|
||||
containerSource container.Source
|
||||
}
|
||||
|
||||
func (r *request) setLogger(l *logger.Logger) {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -16,6 +17,7 @@ type Service struct {
|
|||
epochSource epochSource
|
||||
keyStore keyStorage
|
||||
remoteStorageConstructor remoteStorageConstructor
|
||||
containerSource container.Source
|
||||
}
|
||||
|
||||
// New creates, initializes and returns utility serving
|
||||
|
@ -26,6 +28,7 @@ func New(
|
|||
e localStorageEngine,
|
||||
tg traverserGenerator,
|
||||
cc clientConstructor,
|
||||
cs container.Source,
|
||||
opts ...Option,
|
||||
) *Service {
|
||||
result := &Service{
|
||||
|
@ -39,6 +42,7 @@ func New(
|
|||
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
|
||||
clientConstructor: cc,
|
||||
},
|
||||
containerSource: cs,
|
||||
}
|
||||
for _, option := range opts {
|
||||
option(result)
|
||||
|
|
|
@ -6,6 +6,7 @@ const (
|
|||
statusINHUMED
|
||||
statusVIRTUAL
|
||||
statusOutOfRange
|
||||
statusEC
|
||||
)
|
||||
|
||||
type statusError struct {
|
||||
|
|
|
@ -166,6 +166,9 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
|
|||
case *objectV2.SplitInfo:
|
||||
si := objectSDK.NewSplitInfoFromV2(v)
|
||||
return objectSDK.NewSplitInfoError(si)
|
||||
case *objectV2.ECInfo:
|
||||
ei := objectSDK.NewECInfoFromV2(v)
|
||||
return objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -84,6 +84,9 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
|
|||
case *objectV2.SplitInfo:
|
||||
si := objectSDK.NewSplitInfoFromV2(v)
|
||||
return nil, objectSDK.NewSplitInfoError(si)
|
||||
case *objectV2.ECInfo:
|
||||
ei := objectSDK.NewECInfoFromV2(v)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
|
||||
objv2 := new(objectV2.Object)
|
||||
|
|
|
@ -82,10 +82,13 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream
|
|||
err = s.svc.Get(stream.Context(), *p)
|
||||
|
||||
var splitErr *objectSDK.SplitInfoError
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
|
||||
switch {
|
||||
case errors.As(err, &splitErr):
|
||||
return stream.Send(splitInfoResponse(splitErr.SplitInfo()))
|
||||
case errors.As(err, &ecErr):
|
||||
return stream.Send(ecInfoResponse(ecErr.ECInfo()))
|
||||
default:
|
||||
return err
|
||||
}
|
||||
|
@ -123,11 +126,16 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV
|
|||
err = s.svc.Head(ctx, *p)
|
||||
|
||||
var splitErr *objectSDK.SplitInfoError
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
|
||||
if errors.As(err, &splitErr) {
|
||||
setSplitInfoHeadResponse(splitErr.SplitInfo(), resp)
|
||||
err = nil
|
||||
}
|
||||
if errors.As(err, &ecErr) {
|
||||
setECInfoHeadResponse(ecErr.ECInfo(), resp)
|
||||
err = nil
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
|
|
@ -270,6 +270,17 @@ func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse {
|
|||
return resp
|
||||
}
|
||||
|
||||
func ecInfoResponse(info *objectSDK.ECInfo) *objectV2.GetResponse {
|
||||
resp := new(objectV2.GetResponse)
|
||||
|
||||
body := new(objectV2.GetResponseBody)
|
||||
resp.SetBody(body)
|
||||
|
||||
body.SetObjectPart(info.ToV2())
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse {
|
||||
resp := new(objectV2.GetRangeResponse)
|
||||
|
||||
|
@ -285,6 +296,10 @@ func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResp
|
|||
resp.GetBody().SetHeaderPart(info.ToV2())
|
||||
}
|
||||
|
||||
func setECInfoHeadResponse(info *objectSDK.ECInfo, resp *objectV2.HeadResponse) {
|
||||
resp.GetBody().SetHeaderPart(info.ToV2())
|
||||
}
|
||||
|
||||
func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse {
|
||||
resp := new(objectV2.GetRangeHashResponse)
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue