forked from TrueCloudLab/frostfs-node
[#1112] node: Implement Range\RangeHash
requests for EC object
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
a23d53b2d4
commit
00b2b77b26
7 changed files with 84 additions and 12 deletions
4
go.mod
4
go.mod
|
@ -4,10 +4,10 @@ go 1.21
|
||||||
|
|
||||||
require (
|
require (
|
||||||
code.gitea.io/sdk/gitea v0.17.1
|
code.gitea.io/sdk/gitea v0.17.1
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240427200446-67c6f305b21f
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240507063414-99e02858af12
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -102,6 +102,9 @@ func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error
|
||||||
if it.SplitInfo != nil {
|
if it.SplitInfo != nil {
|
||||||
return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
|
return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
|
||||||
}
|
}
|
||||||
|
if it.ECInfo != nil {
|
||||||
|
return RngRes{}, logicerr.Wrap(objectSDK.NewECInfoError(it.ECInfo))
|
||||||
|
}
|
||||||
|
|
||||||
if it.Object == nil {
|
if it.Object == nil {
|
||||||
// If any shard is in a degraded mode, we should assume that metabase could store
|
// If any shard is in a degraded mode, we should assume that metabase could store
|
||||||
|
@ -147,6 +150,8 @@ type getRangeShardIterator struct {
|
||||||
Object *objectSDK.Object
|
Object *objectSDK.Object
|
||||||
SplitInfoError *objectSDK.SplitInfoError
|
SplitInfoError *objectSDK.SplitInfoError
|
||||||
SplitInfo *objectSDK.SplitInfo
|
SplitInfo *objectSDK.SplitInfo
|
||||||
|
ECInfoError *objectSDK.ECInfoError
|
||||||
|
ECInfo *objectSDK.ECInfo
|
||||||
OutError error
|
OutError error
|
||||||
ShardWithMeta hashedShard
|
ShardWithMeta hashedShard
|
||||||
MetaError error
|
MetaError error
|
||||||
|
@ -188,6 +193,14 @@ func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
|
||||||
|
|
||||||
// stop iterating over shards if SplitInfo structure is complete
|
// stop iterating over shards if SplitInfo structure is complete
|
||||||
return withLink && withLast
|
return withLink && withLast
|
||||||
|
case errors.As(err, &i.ECInfoError):
|
||||||
|
if i.ECInfo == nil {
|
||||||
|
i.ECInfo = objectSDK.NewECInfo()
|
||||||
|
}
|
||||||
|
|
||||||
|
util.MergeECInfo(i.ECInfoError.ECInfo(), i.ECInfo)
|
||||||
|
// stop iterating over shards if ECInfo structure is complete
|
||||||
|
return len(i.ECInfo.Chunks) == int(i.ECInfo.Chunks[0].Total)
|
||||||
case
|
case
|
||||||
client.IsErrObjectAlreadyRemoved(err),
|
client.IsErrObjectAlreadyRemoved(err),
|
||||||
shard.IsErrOutOfRange(err):
|
shard.IsErrOutOfRange(err):
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -45,24 +46,65 @@ func newAssemblerEC(
|
||||||
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
|
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
|
||||||
// It returns parent object.
|
// It returns parent object.
|
||||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
|
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
|
||||||
parts := a.retrieveParts(ctx, headOnly)
|
if headOnly {
|
||||||
|
return a.reconstructHeader(ctx, writer)
|
||||||
|
} else if a.rng != nil {
|
||||||
|
return a.reconstructRange(ctx, writer)
|
||||||
|
}
|
||||||
|
return a.reconstructObject(ctx, writer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
|
||||||
cnt, err := a.cs.Get(a.addr.Container())
|
cnt, err := a.cs.Get(a.addr.Container())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c, err := erasurecode.NewConstructor(
|
dataCount := policy.ECDataCount(cnt.Value.PlacementPolicy())
|
||||||
policy.ECDataCount(cnt.Value.PlacementPolicy()),
|
parityCount := policy.ECParityCount(cnt.Value.PlacementPolicy())
|
||||||
policy.ECParityCount(cnt.Value.PlacementPolicy()),
|
return erasurecode.NewConstructor(dataCount, parityCount)
|
||||||
)
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
|
parts := a.retrieveParts(ctx, true)
|
||||||
|
c, err := a.getConstructor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if headOnly {
|
|
||||||
obj, err := c.ReconstructHeader(parts)
|
obj, err := c.ReconstructHeader(parts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return obj, writer.WriteHeader(ctx, obj)
|
return obj, writer.WriteHeader(ctx, obj)
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
|
parts := a.retrieveParts(ctx, false)
|
||||||
|
c, err := a.getConstructor()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
obj, err := c.Reconstruct(parts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
from := a.rng.GetOffset()
|
||||||
|
to := from + a.rng.GetLength()
|
||||||
|
if pLen := uint64(len(obj.Payload())); to < from || pLen < from || pLen < to {
|
||||||
|
return nil, &apistatus.ObjectOutOfRange{}
|
||||||
|
}
|
||||||
|
err = writer.WriteChunk(ctx, obj.Payload()[from:to])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return obj, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
|
parts := a.retrieveParts(ctx, false)
|
||||||
|
c, err := a.getConstructor()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
obj, err := c.Reconstruct(parts)
|
obj, err := c.Reconstruct(parts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
|
@ -132,6 +132,9 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
|
||||||
case *objectV2.SplitInfo:
|
case *objectV2.SplitInfo:
|
||||||
si := objectSDK.NewSplitInfoFromV2(v)
|
si := objectSDK.NewSplitInfoFromV2(v)
|
||||||
return objectSDK.NewSplitInfoError(si)
|
return objectSDK.NewSplitInfoError(si)
|
||||||
|
case *objectV2.ECInfo:
|
||||||
|
ei := objectSDK.NewECInfoFromV2(v)
|
||||||
|
return objectSDK.NewECInfoError(ei)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -104,10 +104,13 @@ func (s *Service) GetRange(req *objectV2.GetRangeRequest, stream objectSvc.GetOb
|
||||||
err = s.svc.GetRange(stream.Context(), *p)
|
err = s.svc.GetRange(stream.Context(), *p)
|
||||||
|
|
||||||
var splitErr *objectSDK.SplitInfoError
|
var splitErr *objectSDK.SplitInfoError
|
||||||
|
var ecErr *objectSDK.ECInfoError
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case errors.As(err, &splitErr):
|
case errors.As(err, &splitErr):
|
||||||
return stream.Send(splitInfoRangeResponse(splitErr.SplitInfo()))
|
return stream.Send(splitInfoRangeResponse(splitErr.SplitInfo()))
|
||||||
|
case errors.As(err, &ecErr):
|
||||||
|
return stream.Send(ecInfoRangeResponse(ecErr.ECInfo()))
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -292,6 +292,17 @@ func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeRespons
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ecInfoRangeResponse(info *objectSDK.ECInfo) *objectV2.GetRangeResponse {
|
||||||
|
resp := new(objectV2.GetRangeResponse)
|
||||||
|
|
||||||
|
body := new(objectV2.GetRangeResponseBody)
|
||||||
|
resp.SetBody(body)
|
||||||
|
|
||||||
|
body.SetRangePart(info.ToV2())
|
||||||
|
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResponse) {
|
func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResponse) {
|
||||||
resp.GetBody().SetHeaderPart(info.ToV2())
|
resp.GetBody().SetHeaderPart(info.ToV2())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue