node: Implement Get\Head requests for EC object #1103

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:feature/ec-get-head into master 2024-09-04 19:51:08 +00:00
30 changed files with 575 additions and 7 deletions

View file

@ -99,6 +99,10 @@ func getObject(cmd *cobra.Command, _ []string) {
return return
} }
if ok := printECInfoErr(cmd, err); ok {
return
}
commonCmd.ExitOnErr(cmd, "rpc error: %w", err) commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
} }

View file

@ -70,6 +70,10 @@ func getObjectHeader(cmd *cobra.Command, _ []string) {
return return
} }
if ok := printECInfoErr(cmd, err); ok {
return
}
commonCmd.ExitOnErr(cmd, "rpc error: %w", err) commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
} }

View file

@ -146,6 +146,50 @@ func marshalSplitInfo(cmd *cobra.Command, info *objectSDK.SplitInfo) ([]byte, er
} }
} }
func printECInfoErr(cmd *cobra.Command, err error) bool {
var errECInfo *objectSDK.ECInfoError
ok := errors.As(err, &errECInfo)
if ok {
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
printECInfo(cmd, errECInfo.ECInfo())
}
return ok
}
func printECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) {
bs, err := marshalECInfo(cmd, info)
commonCmd.ExitOnErr(cmd, "can't marshal split info: %w", err)
cmd.Println(string(bs))
}
func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) {
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
toProto, _ := cmd.Flags().GetBool("proto")
switch {
case toJSON && toProto:
return nil, errors.New("'--json' and '--proto' flags are mutually exclusive")
case toJSON:
return info.MarshalJSON()
case toProto:
return info.Marshal()
default:
b := bytes.NewBuffer(nil)
b.WriteString("Total chunks: " + strconv.Itoa(int(info.Chunks[0].Total)))
for _, chunk := range info.Chunks {
var id oid.ID
if err := id.Decode(chunk.ID.GetValue()); err != nil {
return nil, fmt.Errorf("unable to decode chunk id: %w", err)
}
b.WriteString("\n Index: " + strconv.Itoa(int(chunk.Index)) + " ID: " + id.String())
}
return b.Bytes(), nil
}
}
func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) { func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
v := cmd.Flag("range").Value.String() v := cmd.Flag("range").Value.String()
if len(v) == 0 { if len(v) == 0 {

View file

@ -173,7 +173,7 @@ func initObjectService(c *cfg) {
sSearchV2 := createSearchSvcV2(sSearch, keyStorage) sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache) sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
*c.cfgObject.getSvc = *sGet // need smth better *c.cfgObject.getSvc = *sGet // need smth better
@ -358,6 +358,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
coreConstructor *cache.ClientCache, coreConstructor *cache.ClientCache,
containerSource containercore.Source,
) *getsvc.Service { ) *getsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage ls := c.cfgObject.cfgLocalStorage.localStorage
@ -369,6 +370,7 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
placement.SuccessAfter(1), placement.SuccessAfter(1),
), ),
coreConstructor, coreConstructor,
containerSource,
getsvc.WithLogger(c.log)) getsvc.WithLogger(c.log))
} }

4
go.mod
View file

@ -4,10 +4,10 @@ go 1.21
require ( require (
code.gitea.io/sdk/gitea v0.17.1 code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24 git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0 git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3
git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 git.frostfs.info/TrueCloudLab/tzhash v1.8.0

BIN
go.sum

Binary file not shown.

View file

@ -99,9 +99,17 @@ const (
GetRemoteCallFailed = "remote call failed" GetRemoteCallFailed = "remote call failed"
GetCanNotAssembleTheObject = "can not assemble the object" GetCanNotAssembleTheObject = "can not assemble the object"
GetTryingToAssembleTheObject = "trying to assemble the object..." GetTryingToAssembleTheObject = "trying to assemble the object..."
GetTryingToAssembleTheECObject = "trying to assemble the ec object..."
GetAssemblingSplittedObject = "assembling splitted object..." GetAssemblingSplittedObject = "assembling splitted object..."
GetAssemblingECObject = "assembling erasure-coded object..."
GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed"
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed" GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
GetFailedToAssembleSplittedObject = "failed to assemble splitted object" GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
GetFailedToAssembleECObject = "failed to assemble erasure-coded object"
GetCouldNotGenerateContainerTraverser = "could not generate container traverser" GetCouldNotGenerateContainerTraverser = "could not generate container traverser"
GetCouldNotConstructRemoteNodeClient = "could not construct remote node client" GetCouldNotConstructRemoteNodeClient = "could not construct remote node client"
GetCouldNotWriteHeader = "could not write header" GetCouldNotWriteHeader = "could not write header"
@ -111,6 +119,7 @@ const (
GetCompletingTheOperation = "completing the operation" GetCompletingTheOperation = "completing the operation"
fyrchik marked this conversation as resolved Outdated

We have "erasure coded" here and "erasure-encoded" in CLI print.
Wikipedia uses "erasure-coded" https://en.wikipedia.org/wiki/Erasure_code, let's stick with it.

We have "erasure coded" here and "erasure-encoded" in CLI print. Wikipedia uses "erasure-coded" https://en.wikipedia.org/wiki/Erasure_code, let's stick with it.

Agree, updated here in other places.

Agree, updated here in other places.
GetRequestedObjectWasMarkedAsRemoved = "requested object was marked as removed" GetRequestedObjectWasMarkedAsRemoved = "requested object was marked as removed"
GetRequestedObjectIsVirtual = "requested object is virtual" GetRequestedObjectIsVirtual = "requested object is virtual"
GetRequestedObjectIsEC = "requested object is erasure-coded"
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds" GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure" PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
SearchReturnResultDirectly = "return result directly" SearchReturnResultDirectly = "return result directly"

View file

@ -88,6 +88,10 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo)) return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
} }
if it.ECInfo != nil {
return GetRes{}, logicerr.Wrap(objectSDK.NewECInfoError(it.ECInfo))
}
if it.ObjectExpired { if it.ObjectExpired {
return GetRes{}, errNotFound return GetRes{}, errNotFound
} }
@ -119,6 +123,7 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
type getShardIterator struct { type getShardIterator struct {
Object *objectSDK.Object Object *objectSDK.Object
SplitInfo *objectSDK.SplitInfo SplitInfo *objectSDK.SplitInfo
ECInfo *objectSDK.ECInfo
OutError error OutError error
ShardWithMeta hashedShard ShardWithMeta hashedShard
MetaError error MetaError error
@ -130,6 +135,7 @@ type getShardIterator struct {
Engine *StorageEngine Engine *StorageEngine
splitInfoErr *objectSDK.SplitInfoError splitInfoErr *objectSDK.SplitInfoError
ecInfoErr *objectSDK.ECInfoError
} }
func (i *getShardIterator) tryGetWithMeta(ctx context.Context) { func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
@ -164,6 +170,14 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
// stop iterating over shards if SplitInfo structure is complete // stop iterating over shards if SplitInfo structure is complete
return withLink && withLast return withLink && withLast
case errors.As(err, &i.ecInfoErr):
if i.ECInfo == nil {
i.ECInfo = objectSDK.NewECInfo()
}
util.MergeECInfo(i.ecInfoErr.ECInfo(), i.ECInfo)
// stop iterating over shards if ECInfo structure is complete
return len(i.ECInfo.Chunks) == int(i.ECInfo.Chunks[0].Total)
case client.IsErrObjectAlreadyRemoved(err): case client.IsErrObjectAlreadyRemoved(err):
i.OutError = err i.OutError = err
return true // stop, return it back return true // stop, return it back

View file

@ -75,6 +75,8 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
head *objectSDK.Object head *objectSDK.Object
siErr *objectSDK.SplitInfoError siErr *objectSDK.SplitInfoError
outSI *objectSDK.SplitInfo outSI *objectSDK.SplitInfo
eiErr *objectSDK.ECInfoError
outEI *objectSDK.ECInfo
outError error = new(apistatus.ObjectNotFound) outError error = new(apistatus.ObjectNotFound)
shPrm shard.HeadPrm shPrm shard.HeadPrm
) )
@ -99,6 +101,13 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
return true return true
} }
return false return false
case errors.As(err, &eiErr):
if outEI == nil {
outEI = objectSDK.NewECInfo()
}
util.MergeECInfo(eiErr.ECInfo(), outEI)
// stop iterating over shards if ECInfo structure is complete
return len(outEI.Chunks) == int(outEI.Chunks[0].Total)
case client.IsErrObjectAlreadyRemoved(err): case client.IsErrObjectAlreadyRemoved(err):
outError = err outError = err
return true // stop, return it back return true // stop, return it back
@ -118,6 +127,8 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
if outSI != nil { if outSI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
} else if outEI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
} else if head == nil { } else if head == nil {
return HeadRes{}, outError return HeadRes{}, outError
} }

View file

@ -108,6 +108,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
} }
// if parent bucket is empty, then check if object exists in ec bucket
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
return false, getECInfoError(tx, cnr, data)
}
// if parent bucket is empty, then check if object exists in typed buckets // if parent bucket is empty, then check if object exists in typed buckets
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil

View file

@ -110,6 +110,11 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
return obj, obj.Unmarshal(data) return obj, obj.Unmarshal(data)
} }
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
if len(data) != 0 {
return nil, getECInfoError(tx, cnr, data)
}
// if not found then check in tombstone index // if not found then check in tombstone index
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key) data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
if len(data) != 0 { if len(data) != 0 {
@ -185,3 +190,27 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
return logicerr.Wrap(new(apistatus.ObjectNotFound)) return logicerr.Wrap(new(apistatus.ObjectNotFound))
} }
func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
offset := 0
ecInfo := objectSDK.NewECInfo()
for offset < len(data) {
key := data[offset : offset+objectKeySize]
// check in primary index
ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
if len(data) != 0 {
obj := objectSDK.New()
if err := obj.Unmarshal(ojbData); err != nil {
return err
}
chunk := objectSDK.ECChunk{}
id, _ := obj.ID()
chunk.SetID(id)
chunk.Index = obj.ECHeader().Index()
chunk.Total = obj.ECHeader().Total()
ecInfo.AddChunk(chunk)
}
offset += objectKeySize
}
return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo))
}

View file

@ -3,6 +3,7 @@ package meta_test
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"os" "os"
"runtime" "runtime"
@ -15,8 +16,10 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -109,6 +112,42 @@ func TestDB_Get(t *testing.T) {
require.True(t, binaryEqual(child.CutPayload(), newChild)) require.True(t, binaryEqual(child.CutPayload(), newChild))
}) })
t.Run("put erasure-coded object", func(t *testing.T) {
cnr := cidtest.ID()
virtual := testutil.GenerateObjectWithCID(cnr)
c, err := erasurecode.NewConstructor(3, 1)
require.NoError(t, err)
pk, err := keys.NewPrivateKey()
require.NoError(t, err)
parts, err := c.Split(virtual, &pk.PrivateKey)
require.NoError(t, err)
for _, part := range parts {
err = putBig(db, part)
var eiError *objectSDK.ECInfoError
if err != nil && !errors.As(err, &eiError) {
require.NoError(t, err)
}
}
_, err = metaGet(db, object.AddressOf(virtual), true)
var eiError *objectSDK.ECInfoError
require.ErrorAs(t, err, &eiError)
require.Equal(t, len(eiError.ECInfo().Chunks), len(parts))
for _, chunk := range eiError.ECInfo().Chunks {
var found bool
for _, part := range parts {
partID, _ := part.ID()
var chunkID oid.ID
require.NoError(t, chunkID.ReadFromV2(chunk.ID))
if chunkID.Equals(partID) {
found = true
}
}
if !found {
require.Fail(t, "chunk not found")
}
}
})
t.Run("get removed object", func(t *testing.T) { t.Run("get removed object", func(t *testing.T) {
obj := oidtest.Address() obj := oidtest.Address()
ts := oidtest.Address() ts := oidtest.Address()

View file

@ -1,6 +1,7 @@
package meta package meta
import ( import (
"bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"errors" "errors"
@ -264,6 +265,13 @@ func putUniqueIndexes(
if err != nil { if err != nil {
return err return err
} }
if ecHead := obj.GetECHeader(); ecHead != nil {
err = putECInfo(tx, cnr, objKey, ecHead)
if err != nil {
return err
}
}
} }
return nil return nil
@ -571,3 +579,34 @@ func isLinkObject(obj *objectSDK.Object) bool {
func isLastObject(obj *objectSDK.Object) bool { func isLastObject(obj *objectSDK.Object) bool {
return len(obj.Children()) == 0 && obj.Parent() != nil return len(obj.Children()) == 0 && obj.Parent() != nil
} }
func putECInfo(tx *bbolt.Tx,
cnr cid.ID, objKey []byte,
ecHead *objectSDK.ECHeader,
) error {
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
bucketName := make([]byte, bucketKeySize)
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
if len(val) == 0 {
val = objKey
} else {
offset := 0
found := false
for offset < len(val) {
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
found = true
break
}
offset += objectKeySize
}
if !found {
val = append(val, objKey...)
}
}
return putUniqueIndexItem(tx, namedBucketItem{
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
key: parentID,
val: val,
})
}

View file

@ -119,6 +119,11 @@ const (
// Key: container ID + type // Key: container ID + type
// Value: container size in bytes as little-endian uint64 // Value: container size in bytes as little-endian uint64
containerCountersPrefix containerCountersPrefix
// ecInfoPrefix is used for storing relation between EC parent id and chunk id.
// Key: container ID + type
// Value: Object id
ecInfoPrefix
) )
const ( const (
@ -190,6 +195,11 @@ func splitBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, splitPrefix, key) return bucketName(cnr, splitPrefix, key)
} }
// ecInfoBucketName returns <CID>_ecinfo.
func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, ecInfoPrefix, key)
}
// addressKey returns key for K-V tables when key is a whole address. // addressKey returns key for K-V tables when key is a whole address.
func addressKey(addr oid.Address, key []byte) []byte { func addressKey(addr oid.Address, key []byte) []byte {
addr.Container().Encode(key) addr.Container().Encode(key)

View file

@ -0,0 +1,25 @@
package util
import (
"bytes"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
// MergeECInfo ignores conflicts and rewrites `to` with non empty values
// from `from`.
func MergeECInfo(from, to *objectSDK.ECInfo) *objectSDK.ECInfo {
for _, fchunk := range from.Chunks {
add := true
for _, tchunk := range to.Chunks {
if bytes.Equal(tchunk.ID.GetValue(), fchunk.ID.GetValue()) {
add = false
break
}
}
if add {
to.AddChunk(*objectSDK.NewECChunkFromV2(&fchunk))
}
}
return to
}

View file

@ -0,0 +1,56 @@
package util
import (
"crypto/rand"
"testing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
func TestMergeECInfo(t *testing.T) {
id := generateV2ID()
target := objectSDK.NewECInfo()
var chunk objectSDK.ECChunk
chunk.Total = 2
chunk.Index = 0
chunk.SetID(id)
target.AddChunk(chunk)
t.Run("merge empty", func(t *testing.T) {
to := objectSDK.NewECInfo()
result := MergeECInfo(target, to)
require.Equal(t, result, target)
})
t.Run("merge existed", func(t *testing.T) {
to := objectSDK.NewECInfo()
to.AddChunk(chunk)
result := MergeECInfo(target, to)
require.Equal(t, result, target)
})
t.Run("merge extend", func(t *testing.T) {
to := objectSDK.NewECInfo()
var chunk objectSDK.ECChunk
chunk.Total = 2
chunk.Index = 1
chunk.SetID(generateV2ID())
to.AddChunk(chunk)
result := MergeECInfo(target, to)
require.Equal(t, len(result.Chunks), 2)
})
}
func generateV2ID() oid.ID {
var buf [32]byte
_, _ = rand.Read(buf[:])
var id oid.ID
_ = id.Decode(buf[:])
return id
}

View file

@ -167,8 +167,9 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
// from the SDK client; should not be considered // from the SDK client; should not be considered
// as a connection error // as a connection error
var siErr *objectSDK.SplitInfoError var siErr *objectSDK.SplitInfoError
var eiErr *objectSDK.ECInfoError
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) || errors.As(err, &eiErr)
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) { if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
firstErr = err firstErr = err
} }
@ -196,7 +197,8 @@ func (x *multiClient) ReportError(err error) {
// from the SDK client; should not be considered // from the SDK client; should not be considered
// as a connection error // as a connection error
var siErr *objectSDK.SplitInfoError var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) { var eiErr *objectSDK.ECInfoError
if errors.As(err, &siErr) || errors.As(err, &eiErr) {
return return
} }

View file

@ -139,9 +139,11 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
remoteStorageConstructor: r.remoteStorageConstructor, remoteStorageConstructor: r.remoteStorageConstructor,
epochSource: r.epochSource, epochSource: r.epochSource,
localStorage: r.localStorage, localStorage: r.localStorage,
containerSource: r.containerSource,
prm: prm, prm: prm,
infoSplit: objectSDK.NewSplitInfo(), infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
log: r.log, log: r.log,
} }

View file

@ -0,0 +1,79 @@
package getsvc
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"go.uber.org/zap"
)
func (r *request) assembleEC(ctx context.Context) {
if r.isRaw() {
r.log.Debug(logs.GetCanNotAssembleTheObject)
return
}
// Any access tokens are not expected to be used in the assembly process:
// - there is no requirement to specify child objects in session/bearer
// token for `GET`/`GETRANGE`/`RANGEHASH` requests in the API protocol,
// and, therefore, their missing in the original request should not be
// considered as error; on the other hand, without session for every child
// object, it is impossible to attach bearer token in the new generated
// requests correctly because the token has not been issued for that node's
// key;
// - the assembly process is expected to be handled on a container node
// only since the requests forwarding mechanism presentation; such the
// node should have enough rights for getting any child object by design.
r.prm.common.ForgetTokens()
// Do not use forwarding during assembly stage.
// Request forwarding closure inherited in produced
// `execCtx` so it should be disabled there.
r.disableForwarding()
r.log.Debug(logs.GetTryingToAssembleTheECObject)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
r.log.Debug(logs.GetAssemblingECObject,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
if err != nil {
r.log.Warn(logs.GetFailedToAssembleECObject,
zap.Error(err),
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
}
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
switch {

Look like remote and local errors are the same.

Look like remote and local errors are the same.

Removed.

Removed.

Incompleted

var errOutOfRangeRemote *apistatus.ObjectOutOfRange
var errOutOfRangeLocal *apistatus.ObjectOutOfRange
Incompleted ``` var errOutOfRangeRemote *apistatus.ObjectOutOfRange var errOutOfRangeLocal *apistatus.ObjectOutOfRange ```

Sorry, fixed.

Sorry, fixed.
default:
r.status = statusUndefined
r.err = err
case err == nil:
r.status = statusOK
r.err = nil
r.collectedObject = obj
case errors.As(err, &errRemoved):
r.status = statusINHUMED
r.err = errRemoved
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange
}
}

View file

@ -0,0 +1,117 @@
package getsvc
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
type assemblerec struct {
addr oid.Address
ecInfo *objectSDK.ECInfo
rng *objectSDK.Range
objGetter objectGetter
cs container.Source
log *logger.Logger
}
func newAssemblerEC(
addr oid.Address,
ecInfo *objectSDK.ECInfo,
rng *objectSDK.Range,
objGetter objectGetter,
cs container.Source,
log *logger.Logger,
) *assemblerec {
return &assemblerec{
addr: addr,
rng: rng,
ecInfo: ecInfo,
objGetter: objGetter,
cs: cs,
log: log,
}
}
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
// It returns parent object.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
fyrchik marked this conversation as resolved Outdated

When we are to add GET_RANGE do we need to alter this function or write code in another place?

When we are to add GET_RANGE do we need to alter this function or write code in another place?

We need to extend assembler for EC a bit, yes.

We need to extend assembler for EC a bit, yes.
parts := a.retrieveParts(ctx, headOnly)
cnt, err := a.cs.Get(a.addr.Container())
if err != nil {
return nil, err
}
c, err := erasurecode.NewConstructor(
policy.ECDataCount(cnt.Value.PlacementPolicy()),
policy.ECParityCount(cnt.Value.PlacementPolicy()),
)
if err != nil {
return nil, err
}
if headOnly {
obj, err := c.ReconstructHeader(parts)
if err == nil {
return obj, writer.WriteHeader(ctx, obj)
}
return nil, err
}
obj, err := c.Reconstruct(parts)
if err == nil {
err = writer.WriteHeader(ctx, obj.CutPayload())
if err == nil {
err = writer.WriteChunk(ctx, obj.Payload())
if err != nil {
return nil, err
}
}
}
return obj, err
}
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
errGroup, ctx := errgroup.WithContext(mainCtx)
for i := range a.ecInfo.Chunks {
chunk := a.ecInfo.Chunks[i]
errGroup.Go(func() error {
objID := new(oid.ID)
err := objID.ReadFromV2(chunk.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var obj *objectSDK.Object
if headOnly {
obj, err = a.objGetter.HeadObject(ctx, *objID)
if err != nil {
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
} else {
sow := NewSimpleObjectWriter()
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
if err != nil {

ctx -> mainCtx, or even better to move errgroup related code to separate func.
As far as I remember errgoup's ctx is cancelled after Wait.

ctx -> mainCtx, or even better to move `errgroup` related code to separate func. As far as I remember errgoup's ctx is cancelled after Wait.

Thanks a lot! Split on two functions.

Thanks a lot! Split on two functions.
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
obj.SetPayload(sow.pld)
}
parts[chunk.Index] = obj
return nil
})
}
if err := errGroup.Wait(); err != nil {
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
}
return parts
}

View file

@ -73,9 +73,12 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
remoteStorageConstructor: s.remoteStorageConstructor, remoteStorageConstructor: s.remoteStorageConstructor,
epochSource: s.epochSource, epochSource: s.epochSource,
localStorage: s.localStorage, localStorage: s.localStorage,
containerSource: s.containerSource,
prm: prm, prm: prm,
infoSplit: objectSDK.NewSplitInfo(), infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
log: s.log,
} }
exec.setLogger(s.log) exec.setLogger(s.log)
@ -106,6 +109,16 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
exec.assemble(ctx) exec.assemble(ctx)
case statusOutOfRange: case statusOutOfRange:
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds) exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
case statusEC:
if !exec.isLocal() {
if execCnr {
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
} else {
exec.log.Debug(logs.GetRequestedObjectIsEC)
exec.assembleEC(ctx)
}
}
default: default:
exec.log.Debug(logs.OperationFinishedWithError, exec.log.Debug(logs.OperationFinishedWithError,
zap.Error(exec.err), zap.Error(exec.err),

View file

@ -5,6 +5,7 @@ import (
"errors" "errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -22,6 +23,7 @@ func (r *request) executeLocal(ctx context.Context) {
r.collectedObject, err = r.get(ctx) r.collectedObject, err = r.get(ctx)
var errSplitInfo *objectSDK.SplitInfoError var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange var errOutOfRange *apistatus.ObjectOutOfRange
@ -42,6 +44,10 @@ func (r *request) executeLocal(ctx context.Context) {
r.status = statusVIRTUAL r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit) r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
r.err = objectSDK.NewECInfoError(r.infoEC)
case errors.As(err, &errOutOfRange): case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange r.status = statusOutOfRange
r.err = errOutOfRange r.err = errOutOfRange

View file

@ -7,6 +7,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -27,15 +29,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
obj, err := r.getRemote(ctx, rs, info) obj, err := r.getRemote(ctx, rs, info)
var errSplitInfo *objectSDK.SplitInfoError var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange var errOutOfRange *apistatus.ObjectOutOfRange
switch { switch {
default: default:
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
if r.status == statusEC {

What does this condition checks?

What does this condition checks?

We need to continue to process the nodes in case of getting error from the node. Updated this line a bit.

We need to continue to process the nodes in case of getting error from the node. Updated this line a bit.
// we need to continue getting another chunks from another nodes
// in case of network issue
return false
}
r.status = statusUndefined r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound) r.err = new(apistatus.ObjectNotFound)
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
case err == nil: case err == nil:
r.status = statusOK r.status = statusOK
r.err = nil r.err = nil
@ -57,6 +64,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
r.status = statusVIRTUAL r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit) r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
fyrchik marked this conversation as resolved Outdated

Each TODO should be accompanied by an issue, could you create one?

Each TODO should be accompanied by an issue, could you create one?

Anyway, the condition here is easy to fix, the number of chunks we must have is equal to placementpolicy.ECDataCount()

Anyway, the condition here is easy to fix, the number of chunks we must have is equal to `placementpolicy.ECDataCount()`

Fixed.

Fixed.
r.infoEC = errECInfo.ECInfo()
r.err = objectSDK.NewECInfoError(r.infoEC)
if r.isRaw() {
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
cnt, err := r.containerSource.Get(r.address().Container())
if err == nil {
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
}
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
} }
return r.status != statusUndefined return r.status != statusUndefined

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -22,6 +23,8 @@ type request struct {
infoSplit *objectSDK.SplitInfo infoSplit *objectSDK.SplitInfo
infoEC *objectSDK.ECInfo
log *logger.Logger log *logger.Logger
collectedObject *objectSDK.Object collectedObject *objectSDK.Object
@ -33,6 +36,7 @@ type request struct {
traverserGenerator traverserGenerator traverserGenerator traverserGenerator
remoteStorageConstructor remoteStorageConstructor remoteStorageConstructor remoteStorageConstructor
localStorage localStorage localStorage localStorage
containerSource container.Source
} }
func (r *request) setLogger(l *logger.Logger) { func (r *request) setLogger(l *logger.Logger) {

View file

@ -1,6 +1,7 @@
package getsvc package getsvc
import ( import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -16,6 +17,7 @@ type Service struct {
epochSource epochSource epochSource epochSource
keyStore keyStorage keyStore keyStorage
remoteStorageConstructor remoteStorageConstructor remoteStorageConstructor remoteStorageConstructor
containerSource container.Source
} }
// New creates, initializes and returns utility serving // New creates, initializes and returns utility serving
@ -26,6 +28,7 @@ func New(
e localStorageEngine, e localStorageEngine,
tg traverserGenerator, tg traverserGenerator,
cc clientConstructor, cc clientConstructor,
cs container.Source,
opts ...Option, opts ...Option,
) *Service { ) *Service {
result := &Service{ result := &Service{
@ -39,6 +42,7 @@ func New(
remoteStorageConstructor: &multiclientRemoteStorageConstructor{ remoteStorageConstructor: &multiclientRemoteStorageConstructor{
clientConstructor: cc, clientConstructor: cc,
}, },
containerSource: cs,
} }
for _, option := range opts { for _, option := range opts {
option(result) option(result)

View file

@ -6,6 +6,7 @@ const (
statusINHUMED statusINHUMED
statusVIRTUAL statusVIRTUAL
statusOutOfRange statusOutOfRange
statusEC
) )
type statusError struct { type statusError struct {

View file

@ -166,6 +166,9 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
case *objectV2.SplitInfo: case *objectV2.SplitInfo:
si := objectSDK.NewSplitInfoFromV2(v) si := objectSDK.NewSplitInfoFromV2(v)
return objectSDK.NewSplitInfoError(si) return objectSDK.NewSplitInfoError(si)
case *objectV2.ECInfo:
ei := objectSDK.NewECInfoFromV2(v)
return objectSDK.NewECInfoError(ei)
} }
} }
return nil return nil

View file

@ -84,6 +84,9 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
case *objectV2.SplitInfo: case *objectV2.SplitInfo:
si := objectSDK.NewSplitInfoFromV2(v) si := objectSDK.NewSplitInfoFromV2(v)
return nil, objectSDK.NewSplitInfoError(si) return nil, objectSDK.NewSplitInfoError(si)
case *objectV2.ECInfo:
ei := objectSDK.NewECInfoFromV2(v)
return nil, objectSDK.NewECInfoError(ei)
} }
objv2 := new(objectV2.Object) objv2 := new(objectV2.Object)

View file

@ -82,10 +82,13 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream
err = s.svc.Get(stream.Context(), *p) err = s.svc.Get(stream.Context(), *p)
var splitErr *objectSDK.SplitInfoError var splitErr *objectSDK.SplitInfoError
var ecErr *objectSDK.ECInfoError
switch { switch {
case errors.As(err, &splitErr): case errors.As(err, &splitErr):
return stream.Send(splitInfoResponse(splitErr.SplitInfo())) return stream.Send(splitInfoResponse(splitErr.SplitInfo()))
case errors.As(err, &ecErr):
return stream.Send(ecInfoResponse(ecErr.ECInfo()))
default: default:
return err return err
} }
@ -123,11 +126,16 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV
err = s.svc.Head(ctx, *p) err = s.svc.Head(ctx, *p)
var splitErr *objectSDK.SplitInfoError var splitErr *objectSDK.SplitInfoError
var ecErr *objectSDK.ECInfoError
if errors.As(err, &splitErr) { if errors.As(err, &splitErr) {
setSplitInfoHeadResponse(splitErr.SplitInfo(), resp) setSplitInfoHeadResponse(splitErr.SplitInfo(), resp)
err = nil err = nil
} }
if errors.As(err, &ecErr) {
setECInfoHeadResponse(ecErr.ECInfo(), resp)
err = nil
}
return resp, err return resp, err
} }

View file

@ -270,6 +270,17 @@ func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse {
return resp return resp
} }
func ecInfoResponse(info *objectSDK.ECInfo) *objectV2.GetResponse {
resp := new(objectV2.GetResponse)
body := new(objectV2.GetResponseBody)
resp.SetBody(body)
body.SetObjectPart(info.ToV2())
return resp
}
func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse { func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse {
resp := new(objectV2.GetRangeResponse) resp := new(objectV2.GetRangeResponse)
@ -285,6 +296,10 @@ func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResp
resp.GetBody().SetHeaderPart(info.ToV2()) resp.GetBody().SetHeaderPart(info.ToV2())
} }
func setECInfoHeadResponse(info *objectSDK.ECInfo, resp *objectV2.HeadResponse) {
resp.GetBody().SetHeaderPart(info.ToV2())
}
func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse { func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse {
resp := new(objectV2.GetRangeHashResponse) resp := new(objectV2.GetRangeHashResponse)