node: Implement Get\Head requests for EC object #1103

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:feature/ec-get-head into master 2024-09-04 19:51:08 +00:00
30 changed files with 575 additions and 7 deletions

View file

@ -99,6 +99,10 @@ func getObject(cmd *cobra.Command, _ []string) {
return
}
if ok := printECInfoErr(cmd, err); ok {
return
}
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
}

View file

@ -70,6 +70,10 @@ func getObjectHeader(cmd *cobra.Command, _ []string) {
return
}
if ok := printECInfoErr(cmd, err); ok {
return
}
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
}

View file

@ -146,6 +146,50 @@ func marshalSplitInfo(cmd *cobra.Command, info *objectSDK.SplitInfo) ([]byte, er
}
}
func printECInfoErr(cmd *cobra.Command, err error) bool {
var errECInfo *objectSDK.ECInfoError
ok := errors.As(err, &errECInfo)
if ok {
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
printECInfo(cmd, errECInfo.ECInfo())
}
return ok
}
func printECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) {
bs, err := marshalECInfo(cmd, info)
commonCmd.ExitOnErr(cmd, "can't marshal split info: %w", err)
cmd.Println(string(bs))
}
func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) {
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
toProto, _ := cmd.Flags().GetBool("proto")
switch {
case toJSON && toProto:
return nil, errors.New("'--json' and '--proto' flags are mutually exclusive")
case toJSON:
return info.MarshalJSON()
case toProto:
return info.Marshal()
default:
b := bytes.NewBuffer(nil)
b.WriteString("Total chunks: " + strconv.Itoa(int(info.Chunks[0].Total)))
for _, chunk := range info.Chunks {
var id oid.ID
if err := id.Decode(chunk.ID.GetValue()); err != nil {
return nil, fmt.Errorf("unable to decode chunk id: %w", err)
}
b.WriteString("\n Index: " + strconv.Itoa(int(chunk.Index)) + " ID: " + id.String())
}
return b.Bytes(), nil
}
}
func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
v := cmd.Flag("range").Value.String()
if len(v) == 0 {

View file

@ -173,7 +173,7 @@ func initObjectService(c *cfg) {
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
*c.cfgObject.getSvc = *sGet // need smth better
@ -358,6 +358,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
coreConstructor *cache.ClientCache,
containerSource containercore.Source,
) *getsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
@ -369,6 +370,7 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
placement.SuccessAfter(1),
),
coreConstructor,
containerSource,
getsvc.WithLogger(c.log))
}

4
go.mod
View file

@ -4,10 +4,10 @@ go 1.21
require (
code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a
git.frostfs.info/TrueCloudLab/tzhash v1.8.0

BIN
go.sum

Binary file not shown.

View file

@ -99,9 +99,17 @@ const (
GetRemoteCallFailed = "remote call failed"
GetCanNotAssembleTheObject = "can not assemble the object"
GetTryingToAssembleTheObject = "trying to assemble the object..."
GetTryingToAssembleTheECObject = "trying to assemble the ec object..."
GetAssemblingSplittedObject = "assembling splitted object..."
GetAssemblingECObject = "assembling erasure-coded object..."
GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed"
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
GetFailedToAssembleECObject = "failed to assemble erasure-coded object"
GetCouldNotGenerateContainerTraverser = "could not generate container traverser"
GetCouldNotConstructRemoteNodeClient = "could not construct remote node client"
GetCouldNotWriteHeader = "could not write header"
@ -111,6 +119,7 @@ const (
GetCompletingTheOperation = "completing the operation"
GetRequestedObjectWasMarkedAsRemoved = "requested object was marked as removed"
GetRequestedObjectIsVirtual = "requested object is virtual"
GetRequestedObjectIsEC = "requested object is erasure-coded"
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
SearchReturnResultDirectly = "return result directly"

View file

@ -88,6 +88,10 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
}
if it.ECInfo != nil {
return GetRes{}, logicerr.Wrap(objectSDK.NewECInfoError(it.ECInfo))
}
if it.ObjectExpired {
return GetRes{}, errNotFound
}
@ -119,6 +123,7 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
type getShardIterator struct {
Object *objectSDK.Object
SplitInfo *objectSDK.SplitInfo
ECInfo *objectSDK.ECInfo
OutError error
ShardWithMeta hashedShard
MetaError error
@ -130,6 +135,7 @@ type getShardIterator struct {
Engine *StorageEngine
splitInfoErr *objectSDK.SplitInfoError
ecInfoErr *objectSDK.ECInfoError
}
func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
@ -164,6 +170,14 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
// stop iterating over shards if SplitInfo structure is complete
return withLink && withLast
case errors.As(err, &i.ecInfoErr):
if i.ECInfo == nil {
i.ECInfo = objectSDK.NewECInfo()
}
util.MergeECInfo(i.ecInfoErr.ECInfo(), i.ECInfo)
// stop iterating over shards if ECInfo structure is complete
return len(i.ECInfo.Chunks) == int(i.ECInfo.Chunks[0].Total)
case client.IsErrObjectAlreadyRemoved(err):
i.OutError = err
return true // stop, return it back

View file

@ -75,6 +75,8 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
head *objectSDK.Object
siErr *objectSDK.SplitInfoError
outSI *objectSDK.SplitInfo
eiErr *objectSDK.ECInfoError
outEI *objectSDK.ECInfo
outError error = new(apistatus.ObjectNotFound)
shPrm shard.HeadPrm
)
@ -99,6 +101,13 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
return true
}
return false
case errors.As(err, &eiErr):
if outEI == nil {
outEI = objectSDK.NewECInfo()
}
util.MergeECInfo(eiErr.ECInfo(), outEI)
// stop iterating over shards if ECInfo structure is complete
return len(outEI.Chunks) == int(outEI.Chunks[0].Total)
case client.IsErrObjectAlreadyRemoved(err):
outError = err
return true // stop, return it back
@ -118,6 +127,8 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
if outSI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
} else if outEI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
} else if head == nil {
return HeadRes{}, outError
}

View file

@ -108,6 +108,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
}
// if parent bucket is empty, then check if object exists in ec bucket
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
return false, getECInfoError(tx, cnr, data)
}
// if parent bucket is empty, then check if object exists in typed buckets
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil

View file

@ -110,6 +110,11 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
return obj, obj.Unmarshal(data)
}
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
if len(data) != 0 {
return nil, getECInfoError(tx, cnr, data)
}
// if not found then check in tombstone index
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
if len(data) != 0 {
@ -185,3 +190,27 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
offset := 0
ecInfo := objectSDK.NewECInfo()
for offset < len(data) {
key := data[offset : offset+objectKeySize]
// check in primary index
ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
if len(data) != 0 {
obj := objectSDK.New()
if err := obj.Unmarshal(ojbData); err != nil {
return err
}
chunk := objectSDK.ECChunk{}
id, _ := obj.ID()
chunk.SetID(id)
chunk.Index = obj.ECHeader().Index()
chunk.Total = obj.ECHeader().Total()
ecInfo.AddChunk(chunk)
}
offset += objectKeySize
}
return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo))
}

View file

@ -3,6 +3,7 @@ package meta_test
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"runtime"
@ -15,8 +16,10 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
)
@ -109,6 +112,42 @@ func TestDB_Get(t *testing.T) {
require.True(t, binaryEqual(child.CutPayload(), newChild))
})
t.Run("put erasure-coded object", func(t *testing.T) {
cnr := cidtest.ID()
virtual := testutil.GenerateObjectWithCID(cnr)
c, err := erasurecode.NewConstructor(3, 1)
require.NoError(t, err)
pk, err := keys.NewPrivateKey()
require.NoError(t, err)
parts, err := c.Split(virtual, &pk.PrivateKey)
require.NoError(t, err)
for _, part := range parts {
err = putBig(db, part)
var eiError *objectSDK.ECInfoError
if err != nil && !errors.As(err, &eiError) {
require.NoError(t, err)
}
}
_, err = metaGet(db, object.AddressOf(virtual), true)
var eiError *objectSDK.ECInfoError
require.ErrorAs(t, err, &eiError)
require.Equal(t, len(eiError.ECInfo().Chunks), len(parts))
for _, chunk := range eiError.ECInfo().Chunks {
var found bool
for _, part := range parts {
partID, _ := part.ID()
var chunkID oid.ID
require.NoError(t, chunkID.ReadFromV2(chunk.ID))
if chunkID.Equals(partID) {
found = true
}
}
if !found {
require.Fail(t, "chunk not found")
}
}
})
t.Run("get removed object", func(t *testing.T) {
obj := oidtest.Address()
ts := oidtest.Address()

View file

@ -1,6 +1,7 @@
package meta
import (
"bytes"
"context"
"encoding/binary"
"errors"
@ -264,6 +265,13 @@ func putUniqueIndexes(
if err != nil {
return err
}
if ecHead := obj.GetECHeader(); ecHead != nil {
err = putECInfo(tx, cnr, objKey, ecHead)
if err != nil {
return err
}
}
}
return nil
@ -571,3 +579,34 @@ func isLinkObject(obj *objectSDK.Object) bool {
func isLastObject(obj *objectSDK.Object) bool {
return len(obj.Children()) == 0 && obj.Parent() != nil
}
func putECInfo(tx *bbolt.Tx,
cnr cid.ID, objKey []byte,
ecHead *objectSDK.ECHeader,
) error {
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
bucketName := make([]byte, bucketKeySize)
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
if len(val) == 0 {
val = objKey
} else {
offset := 0
found := false
for offset < len(val) {
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
found = true
break
}
offset += objectKeySize
}
if !found {
val = append(val, objKey...)
}
}
return putUniqueIndexItem(tx, namedBucketItem{
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
key: parentID,
val: val,
})
}

View file

@ -119,6 +119,11 @@ const (
// Key: container ID + type
// Value: container size in bytes as little-endian uint64
containerCountersPrefix
// ecInfoPrefix is used for storing relation between EC parent id and chunk id.
// Key: container ID + type
// Value: Object id
ecInfoPrefix
)
const (
@ -190,6 +195,11 @@ func splitBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, splitPrefix, key)
}
// ecInfoBucketName returns <CID>_ecinfo.
func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, ecInfoPrefix, key)
}
// addressKey returns key for K-V tables when key is a whole address.
func addressKey(addr oid.Address, key []byte) []byte {
addr.Container().Encode(key)

View file

@ -0,0 +1,25 @@
package util
import (
"bytes"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
// MergeECInfo ignores conflicts and rewrites `to` with non empty values
// from `from`.
func MergeECInfo(from, to *objectSDK.ECInfo) *objectSDK.ECInfo {
for _, fchunk := range from.Chunks {
add := true
for _, tchunk := range to.Chunks {
if bytes.Equal(tchunk.ID.GetValue(), fchunk.ID.GetValue()) {
add = false
break
}
}
if add {
to.AddChunk(*objectSDK.NewECChunkFromV2(&fchunk))
}
}
return to
}

View file

@ -0,0 +1,56 @@
package util
import (
"crypto/rand"
"testing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
func TestMergeECInfo(t *testing.T) {
id := generateV2ID()
target := objectSDK.NewECInfo()
var chunk objectSDK.ECChunk
chunk.Total = 2
chunk.Index = 0
chunk.SetID(id)
target.AddChunk(chunk)
t.Run("merge empty", func(t *testing.T) {
to := objectSDK.NewECInfo()
result := MergeECInfo(target, to)
require.Equal(t, result, target)
})
t.Run("merge existed", func(t *testing.T) {
to := objectSDK.NewECInfo()
to.AddChunk(chunk)
result := MergeECInfo(target, to)
require.Equal(t, result, target)
})
t.Run("merge extend", func(t *testing.T) {
to := objectSDK.NewECInfo()
var chunk objectSDK.ECChunk
chunk.Total = 2
chunk.Index = 1
chunk.SetID(generateV2ID())
to.AddChunk(chunk)
result := MergeECInfo(target, to)
require.Equal(t, len(result.Chunks), 2)
})
}
func generateV2ID() oid.ID {
var buf [32]byte
_, _ = rand.Read(buf[:])
var id oid.ID
_ = id.Decode(buf[:])
return id
}

View file

@ -167,8 +167,9 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
// from the SDK client; should not be considered
// as a connection error
var siErr *objectSDK.SplitInfoError
var eiErr *objectSDK.ECInfoError
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr)
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) || errors.As(err, &eiErr)
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
firstErr = err
}
@ -196,7 +197,8 @@ func (x *multiClient) ReportError(err error) {
// from the SDK client; should not be considered
// as a connection error
var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) {
var eiErr *objectSDK.ECInfoError
if errors.As(err, &siErr) || errors.As(err, &eiErr) {
return
}

View file

@ -139,9 +139,11 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
remoteStorageConstructor: r.remoteStorageConstructor,
epochSource: r.epochSource,
localStorage: r.localStorage,
containerSource: r.containerSource,
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
log: r.log,
}

View file

@ -0,0 +1,79 @@
package getsvc
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"go.uber.org/zap"
)
func (r *request) assembleEC(ctx context.Context) {
if r.isRaw() {
r.log.Debug(logs.GetCanNotAssembleTheObject)
return
}
// Any access tokens are not expected to be used in the assembly process:
// - there is no requirement to specify child objects in session/bearer
// token for `GET`/`GETRANGE`/`RANGEHASH` requests in the API protocol,
// and, therefore, their missing in the original request should not be
// considered as error; on the other hand, without session for every child
// object, it is impossible to attach bearer token in the new generated
// requests correctly because the token has not been issued for that node's
// key;
// - the assembly process is expected to be handled on a container node
// only since the requests forwarding mechanism presentation; such the
// node should have enough rights for getting any child object by design.
r.prm.common.ForgetTokens()
// Do not use forwarding during assembly stage.
// Request forwarding closure inherited in produced
// `execCtx` so it should be disabled there.
r.disableForwarding()
r.log.Debug(logs.GetTryingToAssembleTheECObject)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
r.log.Debug(logs.GetAssemblingECObject,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
if err != nil {
r.log.Warn(logs.GetFailedToAssembleECObject,
zap.Error(err),
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
}
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
switch {
default:
r.status = statusUndefined
r.err = err
case err == nil:
r.status = statusOK
r.err = nil
r.collectedObject = obj
case errors.As(err, &errRemoved):
r.status = statusINHUMED
r.err = errRemoved
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange
}
}

View file

@ -0,0 +1,117 @@
package getsvc
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
type assemblerec struct {
addr oid.Address
ecInfo *objectSDK.ECInfo
rng *objectSDK.Range
objGetter objectGetter
cs container.Source
log *logger.Logger
}
func newAssemblerEC(
addr oid.Address,
ecInfo *objectSDK.ECInfo,
rng *objectSDK.Range,
objGetter objectGetter,
cs container.Source,
log *logger.Logger,
) *assemblerec {
return &assemblerec{
addr: addr,
rng: rng,
ecInfo: ecInfo,
objGetter: objGetter,
cs: cs,
log: log,
}
}
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
// It returns parent object.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, headOnly)
cnt, err := a.cs.Get(a.addr.Container())
if err != nil {
return nil, err
}
c, err := erasurecode.NewConstructor(
policy.ECDataCount(cnt.Value.PlacementPolicy()),
policy.ECParityCount(cnt.Value.PlacementPolicy()),
)
if err != nil {
return nil, err
}
if headOnly {
obj, err := c.ReconstructHeader(parts)
if err == nil {
return obj, writer.WriteHeader(ctx, obj)
}
return nil, err
}
obj, err := c.Reconstruct(parts)
if err == nil {
err = writer.WriteHeader(ctx, obj.CutPayload())
if err == nil {
err = writer.WriteChunk(ctx, obj.Payload())
if err != nil {
return nil, err
}
}
}
return obj, err
}
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
errGroup, ctx := errgroup.WithContext(mainCtx)
for i := range a.ecInfo.Chunks {
chunk := a.ecInfo.Chunks[i]
errGroup.Go(func() error {
objID := new(oid.ID)
err := objID.ReadFromV2(chunk.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var obj *objectSDK.Object
if headOnly {
obj, err = a.objGetter.HeadObject(ctx, *objID)
if err != nil {
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
} else {
sow := NewSimpleObjectWriter()
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
if err != nil {
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
obj.SetPayload(sow.pld)
}
parts[chunk.Index] = obj
return nil
})
}
if err := errGroup.Wait(); err != nil {
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
}
return parts
}

View file

@ -73,9 +73,12 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
remoteStorageConstructor: s.remoteStorageConstructor,
epochSource: s.epochSource,
localStorage: s.localStorage,
containerSource: s.containerSource,
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
log: s.log,
}
exec.setLogger(s.log)
@ -106,6 +109,16 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
exec.assemble(ctx)
case statusOutOfRange:
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
case statusEC:
if !exec.isLocal() {
if execCnr {
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
} else {
exec.log.Debug(logs.GetRequestedObjectIsEC)
exec.assembleEC(ctx)
}
}
default:
exec.log.Debug(logs.OperationFinishedWithError,
zap.Error(exec.err),

View file

@ -5,6 +5,7 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -22,6 +23,7 @@ func (r *request) executeLocal(ctx context.Context) {
r.collectedObject, err = r.get(ctx)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
@ -42,6 +44,10 @@ func (r *request) executeLocal(ctx context.Context) {
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
r.err = objectSDK.NewECInfoError(r.infoEC)
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange

View file

@ -7,6 +7,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -27,15 +29,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
obj, err := r.getRemote(ctx, rs, info)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
switch {
default:
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
if r.status == statusEC {
// we need to continue getting another chunks from another nodes
// in case of network issue
return false
}
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
case err == nil:
r.status = statusOK
r.err = nil
@ -57,6 +64,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
r.infoEC = errECInfo.ECInfo()
r.err = objectSDK.NewECInfoError(r.infoEC)
if r.isRaw() {
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
cnt, err := r.containerSource.Get(r.address().Container())
if err == nil {
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
}
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
return r.status != statusUndefined

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -22,6 +23,8 @@ type request struct {
infoSplit *objectSDK.SplitInfo
infoEC *objectSDK.ECInfo
log *logger.Logger
collectedObject *objectSDK.Object
@ -33,6 +36,7 @@ type request struct {
traverserGenerator traverserGenerator
remoteStorageConstructor remoteStorageConstructor
localStorage localStorage
containerSource container.Source
}
func (r *request) setLogger(l *logger.Logger) {

View file

@ -1,6 +1,7 @@
package getsvc
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
@ -16,6 +17,7 @@ type Service struct {
epochSource epochSource
keyStore keyStorage
remoteStorageConstructor remoteStorageConstructor
containerSource container.Source
}
// New creates, initializes and returns utility serving
@ -26,6 +28,7 @@ func New(
e localStorageEngine,
tg traverserGenerator,
cc clientConstructor,
cs container.Source,
opts ...Option,
) *Service {
result := &Service{
@ -39,6 +42,7 @@ func New(
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
clientConstructor: cc,
},
containerSource: cs,
}
for _, option := range opts {
option(result)

View file

@ -6,6 +6,7 @@ const (
statusINHUMED
statusVIRTUAL
statusOutOfRange
statusEC
)
type statusError struct {

View file

@ -166,6 +166,9 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
case *objectV2.SplitInfo:
si := objectSDK.NewSplitInfoFromV2(v)
return objectSDK.NewSplitInfoError(si)
case *objectV2.ECInfo:
ei := objectSDK.NewECInfoFromV2(v)
return objectSDK.NewECInfoError(ei)
}
}
return nil

View file

@ -84,6 +84,9 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
case *objectV2.SplitInfo:
si := objectSDK.NewSplitInfoFromV2(v)
return nil, objectSDK.NewSplitInfoError(si)
case *objectV2.ECInfo:
ei := objectSDK.NewECInfoFromV2(v)
return nil, objectSDK.NewECInfoError(ei)
}
objv2 := new(objectV2.Object)

View file

@ -82,10 +82,13 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream
err = s.svc.Get(stream.Context(), *p)
var splitErr *objectSDK.SplitInfoError
var ecErr *objectSDK.ECInfoError
switch {
case errors.As(err, &splitErr):
return stream.Send(splitInfoResponse(splitErr.SplitInfo()))
case errors.As(err, &ecErr):
return stream.Send(ecInfoResponse(ecErr.ECInfo()))
default:
return err
}
@ -123,11 +126,16 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV
err = s.svc.Head(ctx, *p)
var splitErr *objectSDK.SplitInfoError
var ecErr *objectSDK.ECInfoError
if errors.As(err, &splitErr) {
setSplitInfoHeadResponse(splitErr.SplitInfo(), resp)
err = nil
}
if errors.As(err, &ecErr) {
setECInfoHeadResponse(ecErr.ECInfo(), resp)
err = nil
}
return resp, err
}

View file

@ -270,6 +270,17 @@ func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse {
return resp
}
func ecInfoResponse(info *objectSDK.ECInfo) *objectV2.GetResponse {
resp := new(objectV2.GetResponse)
body := new(objectV2.GetResponseBody)
resp.SetBody(body)
body.SetObjectPart(info.ToV2())
return resp
}
func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse {
resp := new(objectV2.GetRangeResponse)
@ -285,6 +296,10 @@ func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResp
resp.GetBody().SetHeaderPart(info.ToV2())
}
func setECInfoHeadResponse(info *objectSDK.ECInfo, resp *objectV2.HeadResponse) {
resp.GetBody().SetHeaderPart(info.ToV2())
}
func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse {
resp := new(objectV2.GetRangeHashResponse)