From 112a7c690f55a334eb05ce2b219f4551d928f45f Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Mon, 22 Apr 2024 09:43:42 +0300 Subject: [PATCH] [#1103] node: Implement `Get\Head` requests for EC object Signed-off-by: Anton Nikiforov --- cmd/frostfs-cli/modules/object/get.go | 4 + cmd/frostfs-cli/modules/object/head.go | 4 + cmd/frostfs-cli/modules/object/range.go | 44 +++++++ cmd/frostfs-node/object.go | 4 +- go.mod | 4 +- go.sum | Bin 43242 -> 43242 bytes internal/logs/logs.go | 9 ++ pkg/local_object_storage/engine/get.go | 14 +++ pkg/local_object_storage/engine/head.go | 11 ++ pkg/local_object_storage/metabase/exists.go | 4 + pkg/local_object_storage/metabase/get.go | 29 +++++ pkg/local_object_storage/metabase/get_test.go | 39 ++++++ pkg/local_object_storage/metabase/put.go | 39 ++++++ pkg/local_object_storage/metabase/util.go | 10 ++ pkg/local_object_storage/util/ecinfo.go | 25 ++++ pkg/local_object_storage/util/ecinfo_test.go | 56 +++++++++ pkg/network/cache/multi.go | 6 +- pkg/services/object/get/assemble.go | 2 + pkg/services/object/get/assembleec.go | 79 ++++++++++++ pkg/services/object/get/assemblerec.go | 117 ++++++++++++++++++ pkg/services/object/get/get.go | 13 ++ pkg/services/object/get/local.go | 6 + pkg/services/object/get/remote.go | 25 +++- pkg/services/object/get/request.go | 4 + pkg/services/object/get/service.go | 4 + pkg/services/object/get/status.go | 1 + pkg/services/object/get/v2/get_forwarder.go | 3 + pkg/services/object/get/v2/head_forwarder.go | 3 + pkg/services/object/get/v2/service.go | 8 ++ pkg/services/object/get/v2/util.go | 15 +++ 30 files changed, 575 insertions(+), 7 deletions(-) create mode 100644 pkg/local_object_storage/util/ecinfo.go create mode 100644 pkg/local_object_storage/util/ecinfo_test.go create mode 100644 pkg/services/object/get/assembleec.go create mode 100644 pkg/services/object/get/assemblerec.go diff --git a/cmd/frostfs-cli/modules/object/get.go b/cmd/frostfs-cli/modules/object/get.go index 9a888ccd3..f1edccba2 100644 --- a/cmd/frostfs-cli/modules/object/get.go +++ b/cmd/frostfs-cli/modules/object/get.go @@ -99,6 +99,10 @@ func getObject(cmd *cobra.Command, _ []string) { return } + if ok := printECInfoErr(cmd, err); ok { + return + } + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) } diff --git a/cmd/frostfs-cli/modules/object/head.go b/cmd/frostfs-cli/modules/object/head.go index f97ef952d..14797dc41 100644 --- a/cmd/frostfs-cli/modules/object/head.go +++ b/cmd/frostfs-cli/modules/object/head.go @@ -70,6 +70,10 @@ func getObjectHeader(cmd *cobra.Command, _ []string) { return } + if ok := printECInfoErr(cmd, err); ok { + return + } + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) } diff --git a/cmd/frostfs-cli/modules/object/range.go b/cmd/frostfs-cli/modules/object/range.go index 0eee7bdba..9ba752237 100644 --- a/cmd/frostfs-cli/modules/object/range.go +++ b/cmd/frostfs-cli/modules/object/range.go @@ -146,6 +146,50 @@ func marshalSplitInfo(cmd *cobra.Command, info *objectSDK.SplitInfo) ([]byte, er } } +func printECInfoErr(cmd *cobra.Command, err error) bool { + var errECInfo *objectSDK.ECInfoError + + ok := errors.As(err, &errECInfo) + + if ok { + cmd.PrintErrln("Object is erasure-encoded, ec information received.") + printECInfo(cmd, errECInfo.ECInfo()) + } + + return ok +} + +func printECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) { + bs, err := marshalECInfo(cmd, info) + commonCmd.ExitOnErr(cmd, "can't marshal split info: %w", err) + + cmd.Println(string(bs)) +} + +func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) { + toJSON, _ := cmd.Flags().GetBool(commonflags.JSON) + toProto, _ := cmd.Flags().GetBool("proto") + switch { + case toJSON && toProto: + return nil, errors.New("'--json' and '--proto' flags are mutually exclusive") + case toJSON: + return info.MarshalJSON() + case toProto: + return info.Marshal() + default: + b := bytes.NewBuffer(nil) + b.WriteString("Total chunks: " + strconv.Itoa(int(info.Chunks[0].Total))) + for _, chunk := range info.Chunks { + var id oid.ID + if err := id.Decode(chunk.ID.GetValue()); err != nil { + return nil, fmt.Errorf("unable to decode chunk id: %w", err) + } + b.WriteString("\n Index: " + strconv.Itoa(int(chunk.Index)) + " ID: " + id.String()) + } + return b.Bytes(), nil + } +} + func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) { v := cmd.Flag("range").Value.String() if len(v) == 0 { diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 6160bbc76..a7a084fd1 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -173,7 +173,7 @@ func initObjectService(c *cfg) { sSearchV2 := createSearchSvcV2(sSearch, keyStorage) - sGet := createGetService(c, keyStorage, traverseGen, c.clientCache) + sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource) *c.cfgObject.getSvc = *sGet // need smth better @@ -358,6 +358,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage) func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache, + containerSource containercore.Source, ) *getsvc.Service { ls := c.cfgObject.cfgLocalStorage.localStorage @@ -369,6 +370,7 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra placement.SuccessAfter(1), ), coreConstructor, + containerSource, getsvc.WithLogger(c.log)) } diff --git a/go.mod b/go.mod index 8cfea305d..f5469dee6 100644 --- a/go.mod +++ b/go.mod @@ -4,10 +4,10 @@ go 1.21 require ( code.gitea.io/sdk/gitea v0.17.1 - git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24 + git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65 - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92 + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a git.frostfs.info/TrueCloudLab/tzhash v1.8.0 diff --git a/go.sum b/go.sum index 76ed7e5dc963cac023ebe5deadddb39c17afc41a..0a885cb2f3c4603957fa8ca007b165e45b729322 100644 GIT binary patch delta 273 zcmaELk?GY%rU}nnO^l2TO$|*<4Rlk|ER)Pkjg3qaO_CKd46TCPT#EdP-HcMTUA@v0 zoh)*qih_bF{IUYGebRglEYtH`D@#+#+)I-U4JLnN(wKNwQVhFco0}N5n1L3V7+4sX z8=2`E86+l|nkS{1rzRVNEHV#H(f2a)^ff4TFUWQCc1kZ!4b3pq_bN{D@-r|?vIw^b zjq>$&40VZ0pIpdd?StJU{q%gj-24=ff#oi4xxNABIR*w1DW+jjVO4HMzNHzJUe0;> eC6yUz6&{g!Wks&8?)s@FlP|JrZC=Fcu^0eNpj99M delta 273 zcmaELk?GY%rU}nnjg8C=EKSV}jCD;c4HHewQ_a&-jZ73W46RB%vvUk`y~E30Je@crdb++EXoM3^ffg~v8eP+^e>1E%<;(yi%cnUPfB+#F7tHBD$g-? z^h)#!&4~;Mom|Lb?StJU{q%gj-24=fftdzD-i4WNCRN6v{uMsKe&rb+<=V-iRr+3) ek>RBwK?QkvmZ?RlCXvSFlP|JrZC=Fcu^0d~z*VdO diff --git a/internal/logs/logs.go b/internal/logs/logs.go index bd67217c4..e76ab10b6 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -99,9 +99,17 @@ const ( GetRemoteCallFailed = "remote call failed" GetCanNotAssembleTheObject = "can not assemble the object" GetTryingToAssembleTheObject = "trying to assemble the object..." + GetTryingToAssembleTheECObject = "trying to assemble the ec object..." GetAssemblingSplittedObject = "assembling splitted object..." + GetAssemblingECObject = "assembling erasure-coded object..." + GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed" + GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object" + GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object" + GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object" GetAssemblingSplittedObjectCompleted = "assembling splitted object completed" + GetAssemblingECObjectCompleted = "assembling erasure-coded object completed" GetFailedToAssembleSplittedObject = "failed to assemble splitted object" + GetFailedToAssembleECObject = "failed to assemble erasure-coded object" GetCouldNotGenerateContainerTraverser = "could not generate container traverser" GetCouldNotConstructRemoteNodeClient = "could not construct remote node client" GetCouldNotWriteHeader = "could not write header" @@ -111,6 +119,7 @@ const ( GetCompletingTheOperation = "completing the operation" GetRequestedObjectWasMarkedAsRemoved = "requested object was marked as removed" GetRequestedObjectIsVirtual = "requested object is virtual" + GetRequestedObjectIsEC = "requested object is erasure-coded" GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds" PutAdditionalContainerBroadcastFailure = "additional container broadcast failure" SearchReturnResultDirectly = "return result directly" diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index f77c44226..991af3d1a 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -88,6 +88,10 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) { return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo)) } + if it.ECInfo != nil { + return GetRes{}, logicerr.Wrap(objectSDK.NewECInfoError(it.ECInfo)) + } + if it.ObjectExpired { return GetRes{}, errNotFound } @@ -119,6 +123,7 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) { type getShardIterator struct { Object *objectSDK.Object SplitInfo *objectSDK.SplitInfo + ECInfo *objectSDK.ECInfo OutError error ShardWithMeta hashedShard MetaError error @@ -130,6 +135,7 @@ type getShardIterator struct { Engine *StorageEngine splitInfoErr *objectSDK.SplitInfoError + ecInfoErr *objectSDK.ECInfoError } func (i *getShardIterator) tryGetWithMeta(ctx context.Context) { @@ -164,6 +170,14 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) { // stop iterating over shards if SplitInfo structure is complete return withLink && withLast + case errors.As(err, &i.ecInfoErr): + if i.ECInfo == nil { + i.ECInfo = objectSDK.NewECInfo() + } + + util.MergeECInfo(i.ecInfoErr.ECInfo(), i.ECInfo) + // stop iterating over shards if ECInfo structure is complete + return len(i.ECInfo.Chunks) == int(i.ECInfo.Chunks[0].Total) case client.IsErrObjectAlreadyRemoved(err): i.OutError = err return true // stop, return it back diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index f56e1b772..1c61ca455 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -75,6 +75,8 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) head *objectSDK.Object siErr *objectSDK.SplitInfoError outSI *objectSDK.SplitInfo + eiErr *objectSDK.ECInfoError + outEI *objectSDK.ECInfo outError error = new(apistatus.ObjectNotFound) shPrm shard.HeadPrm ) @@ -99,6 +101,13 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) return true } return false + case errors.As(err, &eiErr): + if outEI == nil { + outEI = objectSDK.NewECInfo() + } + util.MergeECInfo(eiErr.ECInfo(), outEI) + // stop iterating over shards if ECInfo structure is complete + return len(outEI.Chunks) == int(outEI.Chunks[0].Total) case client.IsErrObjectAlreadyRemoved(err): outError = err return true // stop, return it back @@ -118,6 +127,8 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) if outSI != nil { return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + } else if outEI != nil { + return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI)) } else if head == nil { return HeadRes{}, outError } diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index aa9aba106..bf6766c05 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -108,6 +108,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } + // if parent bucket is empty, then check if object exists in ec bucket + if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 { + return false, getECInfoError(tx, cnr, data) + } // if parent bucket is empty, then check if object exists in typed buckets return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index d18331a3d..aa19502e8 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -110,6 +110,11 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b return obj, obj.Unmarshal(data) } + data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key) + if len(data) != 0 { + return nil, getECInfoError(tx, cnr, data) + } + // if not found then check in tombstone index data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key) if len(data) != 0 { @@ -185,3 +190,27 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } + +func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error { + offset := 0 + ecInfo := objectSDK.NewECInfo() + for offset < len(data) { + key := data[offset : offset+objectKeySize] + // check in primary index + ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key) + if len(data) != 0 { + obj := objectSDK.New() + if err := obj.Unmarshal(ojbData); err != nil { + return err + } + chunk := objectSDK.ECChunk{} + id, _ := obj.ID() + chunk.SetID(id) + chunk.Index = obj.ECHeader().Index() + chunk.Total = obj.ECHeader().Total() + ecInfo.AddChunk(chunk) + } + offset += objectKeySize + } + return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo)) +} diff --git a/pkg/local_object_storage/metabase/get_test.go b/pkg/local_object_storage/metabase/get_test.go index af6b41327..247ddf9cd 100644 --- a/pkg/local_object_storage/metabase/get_test.go +++ b/pkg/local_object_storage/metabase/get_test.go @@ -3,6 +3,7 @@ package meta_test import ( "bytes" "context" + "errors" "fmt" "os" "runtime" @@ -15,8 +16,10 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/stretchr/testify/require" ) @@ -109,6 +112,42 @@ func TestDB_Get(t *testing.T) { require.True(t, binaryEqual(child.CutPayload(), newChild)) }) + t.Run("put erasure-coded object", func(t *testing.T) { + cnr := cidtest.ID() + virtual := testutil.GenerateObjectWithCID(cnr) + c, err := erasurecode.NewConstructor(3, 1) + require.NoError(t, err) + pk, err := keys.NewPrivateKey() + require.NoError(t, err) + parts, err := c.Split(virtual, &pk.PrivateKey) + require.NoError(t, err) + for _, part := range parts { + err = putBig(db, part) + var eiError *objectSDK.ECInfoError + if err != nil && !errors.As(err, &eiError) { + require.NoError(t, err) + } + } + _, err = metaGet(db, object.AddressOf(virtual), true) + var eiError *objectSDK.ECInfoError + require.ErrorAs(t, err, &eiError) + require.Equal(t, len(eiError.ECInfo().Chunks), len(parts)) + for _, chunk := range eiError.ECInfo().Chunks { + var found bool + for _, part := range parts { + partID, _ := part.ID() + var chunkID oid.ID + require.NoError(t, chunkID.ReadFromV2(chunk.ID)) + if chunkID.Equals(partID) { + found = true + } + } + if !found { + require.Fail(t, "chunk not found") + } + } + }) + t.Run("get removed object", func(t *testing.T) { obj := oidtest.Address() ts := oidtest.Address() diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 429d981fe..2822303a0 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -1,6 +1,7 @@ package meta import ( + "bytes" "context" "encoding/binary" "errors" @@ -264,6 +265,13 @@ func putUniqueIndexes( if err != nil { return err } + + if ecHead := obj.GetECHeader(); ecHead != nil { + err = putECInfo(tx, cnr, objKey, ecHead) + if err != nil { + return err + } + } } return nil @@ -571,3 +579,34 @@ func isLinkObject(obj *objectSDK.Object) bool { func isLastObject(obj *objectSDK.Object) bool { return len(obj.Children()) == 0 && obj.Parent() != nil } + +func putECInfo(tx *bbolt.Tx, + cnr cid.ID, objKey []byte, + ecHead *objectSDK.ECHeader, +) error { + parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize)) + bucketName := make([]byte, bucketKeySize) + + val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID) + if len(val) == 0 { + val = objKey + } else { + offset := 0 + found := false + for offset < len(val) { + if bytes.Equal(objKey, val[offset:offset+objectKeySize]) { + found = true + break + } + offset += objectKeySize + } + if !found { + val = append(val, objKey...) + } + } + return putUniqueIndexItem(tx, namedBucketItem{ + name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)), + key: parentID, + val: val, + }) +} diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index d46a421a3..9249ae49b 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -119,6 +119,11 @@ const ( // Key: container ID + type // Value: container size in bytes as little-endian uint64 containerCountersPrefix + + // ecInfoPrefix is used for storing relation between EC parent id and chunk id. + // Key: container ID + type + // Value: Object id + ecInfoPrefix ) const ( @@ -190,6 +195,11 @@ func splitBucketName(cnr cid.ID, key []byte) []byte { return bucketName(cnr, splitPrefix, key) } +// ecInfoBucketName returns _ecinfo. +func ecInfoBucketName(cnr cid.ID, key []byte) []byte { + return bucketName(cnr, ecInfoPrefix, key) +} + // addressKey returns key for K-V tables when key is a whole address. func addressKey(addr oid.Address, key []byte) []byte { addr.Container().Encode(key) diff --git a/pkg/local_object_storage/util/ecinfo.go b/pkg/local_object_storage/util/ecinfo.go new file mode 100644 index 000000000..a92fbceea --- /dev/null +++ b/pkg/local_object_storage/util/ecinfo.go @@ -0,0 +1,25 @@ +package util + +import ( + "bytes" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" +) + +// MergeECInfo ignores conflicts and rewrites `to` with non empty values +// from `from`. +func MergeECInfo(from, to *objectSDK.ECInfo) *objectSDK.ECInfo { + for _, fchunk := range from.Chunks { + add := true + for _, tchunk := range to.Chunks { + if bytes.Equal(tchunk.ID.GetValue(), fchunk.ID.GetValue()) { + add = false + break + } + } + if add { + to.AddChunk(*objectSDK.NewECChunkFromV2(&fchunk)) + } + } + return to +} diff --git a/pkg/local_object_storage/util/ecinfo_test.go b/pkg/local_object_storage/util/ecinfo_test.go new file mode 100644 index 000000000..081006088 --- /dev/null +++ b/pkg/local_object_storage/util/ecinfo_test.go @@ -0,0 +1,56 @@ +package util + +import ( + "crypto/rand" + "testing" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" +) + +func TestMergeECInfo(t *testing.T) { + id := generateV2ID() + target := objectSDK.NewECInfo() + var chunk objectSDK.ECChunk + chunk.Total = 2 + chunk.Index = 0 + chunk.SetID(id) + target.AddChunk(chunk) + + t.Run("merge empty", func(t *testing.T) { + to := objectSDK.NewECInfo() + + result := MergeECInfo(target, to) + require.Equal(t, result, target) + }) + + t.Run("merge existed", func(t *testing.T) { + to := objectSDK.NewECInfo() + to.AddChunk(chunk) + + result := MergeECInfo(target, to) + require.Equal(t, result, target) + }) + t.Run("merge extend", func(t *testing.T) { + to := objectSDK.NewECInfo() + var chunk objectSDK.ECChunk + chunk.Total = 2 + chunk.Index = 1 + chunk.SetID(generateV2ID()) + to.AddChunk(chunk) + + result := MergeECInfo(target, to) + require.Equal(t, len(result.Chunks), 2) + }) +} + +func generateV2ID() oid.ID { + var buf [32]byte + _, _ = rand.Read(buf[:]) + + var id oid.ID + _ = id.Decode(buf[:]) + + return id +} diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 5dd206283..f19510d76 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -167,8 +167,9 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie // from the SDK client; should not be considered // as a connection error var siErr *objectSDK.SplitInfoError + var eiErr *objectSDK.ECInfoError - success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) + success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) || errors.As(err, &eiErr) if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) { firstErr = err } @@ -196,7 +197,8 @@ func (x *multiClient) ReportError(err error) { // from the SDK client; should not be considered // as a connection error var siErr *objectSDK.SplitInfoError - if errors.As(err, &siErr) { + var eiErr *objectSDK.ECInfoError + if errors.As(err, &siErr) || errors.As(err, &eiErr) { return } diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index 6a8c5c818..548e8e45d 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -139,9 +139,11 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque remoteStorageConstructor: r.remoteStorageConstructor, epochSource: r.epochSource, localStorage: r.localStorage, + containerSource: r.containerSource, prm: prm, infoSplit: objectSDK.NewSplitInfo(), + infoEC: objectSDK.NewECInfo(), log: r.log, } diff --git a/pkg/services/object/get/assembleec.go b/pkg/services/object/get/assembleec.go new file mode 100644 index 000000000..d288ef249 --- /dev/null +++ b/pkg/services/object/get/assembleec.go @@ -0,0 +1,79 @@ +package getsvc + +import ( + "context" + "errors" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "go.uber.org/zap" +) + +func (r *request) assembleEC(ctx context.Context) { + if r.isRaw() { + r.log.Debug(logs.GetCanNotAssembleTheObject) + return + } + + // Any access tokens are not expected to be used in the assembly process: + // - there is no requirement to specify child objects in session/bearer + // token for `GET`/`GETRANGE`/`RANGEHASH` requests in the API protocol, + // and, therefore, their missing in the original request should not be + // considered as error; on the other hand, without session for every child + // object, it is impossible to attach bearer token in the new generated + // requests correctly because the token has not been issued for that node's + // key; + // - the assembly process is expected to be handled on a container node + // only since the requests forwarding mechanism presentation; such the + // node should have enough rights for getting any child object by design. + r.prm.common.ForgetTokens() + + // Do not use forwarding during assembly stage. + // Request forwarding closure inherited in produced + // `execCtx` so it should be disabled there. + r.disableForwarding() + + r.log.Debug(logs.GetTryingToAssembleTheECObject) + + assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log) + + r.log.Debug(logs.GetAssemblingECObject, + zap.Stringer("address", r.address()), + zap.Uint64("range_offset", r.ctxRange().GetOffset()), + zap.Uint64("range_length", r.ctxRange().GetLength()), + ) + defer r.log.Debug(logs.GetAssemblingECObjectCompleted, + zap.Stringer("address", r.address()), + zap.Uint64("range_offset", r.ctxRange().GetOffset()), + zap.Uint64("range_length", r.ctxRange().GetLength()), + ) + + obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly()) + if err != nil { + r.log.Warn(logs.GetFailedToAssembleECObject, + zap.Error(err), + zap.Stringer("address", r.address()), + zap.Uint64("range_offset", r.ctxRange().GetOffset()), + zap.Uint64("range_length", r.ctxRange().GetLength()), + ) + } + + var errRemoved *apistatus.ObjectAlreadyRemoved + var errOutOfRange *apistatus.ObjectOutOfRange + + switch { + default: + r.status = statusUndefined + r.err = err + case err == nil: + r.status = statusOK + r.err = nil + r.collectedObject = obj + case errors.As(err, &errRemoved): + r.status = statusINHUMED + r.err = errRemoved + case errors.As(err, &errOutOfRange): + r.status = statusOutOfRange + r.err = errOutOfRange + } +} diff --git a/pkg/services/object/get/assemblerec.go b/pkg/services/object/get/assemblerec.go new file mode 100644 index 000000000..d73d771cf --- /dev/null +++ b/pkg/services/object/get/assemblerec.go @@ -0,0 +1,117 @@ +package getsvc + +import ( + "context" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +type assemblerec struct { + addr oid.Address + ecInfo *objectSDK.ECInfo + rng *objectSDK.Range + objGetter objectGetter + cs container.Source + log *logger.Logger +} + +func newAssemblerEC( + addr oid.Address, + ecInfo *objectSDK.ECInfo, + rng *objectSDK.Range, + objGetter objectGetter, + cs container.Source, + log *logger.Logger, +) *assemblerec { + return &assemblerec{ + addr: addr, + rng: rng, + ecInfo: ecInfo, + objGetter: objGetter, + cs: cs, + log: log, + } +} + +// Assemble assembles erasure-coded object and writes it's content to ObjectWriter. +// It returns parent object. +func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) { + parts := a.retrieveParts(ctx, headOnly) + cnt, err := a.cs.Get(a.addr.Container()) + if err != nil { + return nil, err + } + c, err := erasurecode.NewConstructor( + policy.ECDataCount(cnt.Value.PlacementPolicy()), + policy.ECParityCount(cnt.Value.PlacementPolicy()), + ) + if err != nil { + return nil, err + } + if headOnly { + obj, err := c.ReconstructHeader(parts) + if err == nil { + return obj, writer.WriteHeader(ctx, obj) + } + return nil, err + } + obj, err := c.Reconstruct(parts) + if err == nil { + err = writer.WriteHeader(ctx, obj.CutPayload()) + if err == nil { + err = writer.WriteChunk(ctx, obj.Payload()) + if err != nil { + return nil, err + } + } + } + return obj, err +} + +func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object { + parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total)) + errGroup, ctx := errgroup.WithContext(mainCtx) + + for i := range a.ecInfo.Chunks { + chunk := a.ecInfo.Chunks[i] + errGroup.Go(func() error { + objID := new(oid.ID) + err := objID.ReadFromV2(chunk.ID) + if err != nil { + return fmt.Errorf("invalid object ID: %w", err) + } + var obj *objectSDK.Object + if headOnly { + obj, err = a.objGetter.HeadObject(ctx, *objID) + if err != nil { + a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) + return nil + } + } else { + sow := NewSimpleObjectWriter() + obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow) + if err != nil { + a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) + return nil + } + obj.SetPayload(sow.pld) + } + parts[chunk.Index] = obj + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err)) + } + return parts +} diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 3d5547047..9738935d2 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -73,9 +73,12 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error { remoteStorageConstructor: s.remoteStorageConstructor, epochSource: s.epochSource, localStorage: s.localStorage, + containerSource: s.containerSource, prm: prm, infoSplit: objectSDK.NewSplitInfo(), + infoEC: objectSDK.NewECInfo(), + log: s.log, } exec.setLogger(s.log) @@ -106,6 +109,16 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) { exec.assemble(ctx) case statusOutOfRange: exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds) + case statusEC: + if !exec.isLocal() { + if execCnr { + exec.executeOnContainer(ctx) + exec.analyzeStatus(ctx, false) + } else { + exec.log.Debug(logs.GetRequestedObjectIsEC) + exec.assembleEC(ctx) + } + } default: exec.log.Debug(logs.OperationFinishedWithError, zap.Error(exec.err), diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index 257465019..fcfc9befc 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -5,6 +5,7 @@ import ( "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -22,6 +23,7 @@ func (r *request) executeLocal(ctx context.Context) { r.collectedObject, err = r.get(ctx) var errSplitInfo *objectSDK.SplitInfoError + var errECInfo *objectSDK.ECInfoError var errRemoved *apistatus.ObjectAlreadyRemoved var errOutOfRange *apistatus.ObjectOutOfRange @@ -42,6 +44,10 @@ func (r *request) executeLocal(ctx context.Context) { r.status = statusVIRTUAL mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) r.err = objectSDK.NewSplitInfoError(r.infoSplit) + case errors.As(err, &errECInfo): + r.status = statusEC + util.MergeECInfo(errECInfo.ECInfo(), r.infoEC) + r.err = objectSDK.NewECInfoError(r.infoEC) case errors.As(err, &errOutOfRange): r.status = statusOutOfRange r.err = errOutOfRange diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index cd94434cf..302a4a4bc 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -7,6 +7,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -27,15 +29,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { obj, err := r.getRemote(ctx, rs, info) var errSplitInfo *objectSDK.SplitInfoError + var errECInfo *objectSDK.ECInfoError var errRemoved *apistatus.ObjectAlreadyRemoved var errOutOfRange *apistatus.ObjectOutOfRange switch { default: + r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) + if r.status == statusEC { + // we need to continue getting another chunks from another nodes + // in case of network issue + return false + } r.status = statusUndefined r.err = new(apistatus.ObjectNotFound) - - r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) case err == nil: r.status = statusOK r.err = nil @@ -57,6 +64,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { r.status = statusVIRTUAL mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) r.err = objectSDK.NewSplitInfoError(r.infoSplit) + case errors.As(err, &errECInfo): + r.status = statusEC + util.MergeECInfo(r.infoEC, errECInfo.ECInfo()) + r.infoEC = errECInfo.ECInfo() + r.err = objectSDK.NewECInfoError(r.infoEC) + if r.isRaw() { + return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) + } + cnt, err := r.containerSource.Get(r.address().Container()) + if err == nil { + return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy()) + } + r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err)) + return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) } return r.status != statusUndefined diff --git a/pkg/services/object/get/request.go b/pkg/services/object/get/request.go index b9223a637..d0b79e30c 100644 --- a/pkg/services/object/get/request.go +++ b/pkg/services/object/get/request.go @@ -6,6 +6,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -22,6 +23,8 @@ type request struct { infoSplit *objectSDK.SplitInfo + infoEC *objectSDK.ECInfo + log *logger.Logger collectedObject *objectSDK.Object @@ -33,6 +36,7 @@ type request struct { traverserGenerator traverserGenerator remoteStorageConstructor remoteStorageConstructor localStorage localStorage + containerSource container.Source } func (r *request) setLogger(l *logger.Logger) { diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index bdf01a977..3413abeb7 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,6 +1,7 @@ package getsvc import ( + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -16,6 +17,7 @@ type Service struct { epochSource epochSource keyStore keyStorage remoteStorageConstructor remoteStorageConstructor + containerSource container.Source } // New creates, initializes and returns utility serving @@ -26,6 +28,7 @@ func New( e localStorageEngine, tg traverserGenerator, cc clientConstructor, + cs container.Source, opts ...Option, ) *Service { result := &Service{ @@ -39,6 +42,7 @@ func New( remoteStorageConstructor: &multiclientRemoteStorageConstructor{ clientConstructor: cc, }, + containerSource: cs, } for _, option := range opts { option(result) diff --git a/pkg/services/object/get/status.go b/pkg/services/object/get/status.go index 3a5eebe32..919338d7f 100644 --- a/pkg/services/object/get/status.go +++ b/pkg/services/object/get/status.go @@ -6,6 +6,7 @@ const ( statusINHUMED statusVIRTUAL statusOutOfRange + statusEC ) type statusError struct { diff --git a/pkg/services/object/get/v2/get_forwarder.go b/pkg/services/object/get/v2/get_forwarder.go index 40aa3f62e..774f98643 100644 --- a/pkg/services/object/get/v2/get_forwarder.go +++ b/pkg/services/object/get/v2/get_forwarder.go @@ -166,6 +166,9 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr case *objectV2.SplitInfo: si := objectSDK.NewSplitInfoFromV2(v) return objectSDK.NewSplitInfoError(si) + case *objectV2.ECInfo: + ei := objectSDK.NewECInfoFromV2(v) + return objectSDK.NewECInfoError(ei) } } return nil diff --git a/pkg/services/object/get/v2/head_forwarder.go b/pkg/services/object/get/v2/head_forwarder.go index a1bce1517..11286321a 100644 --- a/pkg/services/object/get/v2/head_forwarder.go +++ b/pkg/services/object/get/v2/head_forwarder.go @@ -84,6 +84,9 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne case *objectV2.SplitInfo: si := objectSDK.NewSplitInfoFromV2(v) return nil, objectSDK.NewSplitInfoError(si) + case *objectV2.ECInfo: + ei := objectSDK.NewECInfoFromV2(v) + return nil, objectSDK.NewECInfoError(ei) } objv2 := new(objectV2.Object) diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go index bcdc4120e..682128df6 100644 --- a/pkg/services/object/get/v2/service.go +++ b/pkg/services/object/get/v2/service.go @@ -82,10 +82,13 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream err = s.svc.Get(stream.Context(), *p) var splitErr *objectSDK.SplitInfoError + var ecErr *objectSDK.ECInfoError switch { case errors.As(err, &splitErr): return stream.Send(splitInfoResponse(splitErr.SplitInfo())) + case errors.As(err, &ecErr): + return stream.Send(ecInfoResponse(ecErr.ECInfo())) default: return err } @@ -123,11 +126,16 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV err = s.svc.Head(ctx, *p) var splitErr *objectSDK.SplitInfoError + var ecErr *objectSDK.ECInfoError if errors.As(err, &splitErr) { setSplitInfoHeadResponse(splitErr.SplitInfo(), resp) err = nil } + if errors.As(err, &ecErr) { + setECInfoHeadResponse(ecErr.ECInfo(), resp) + err = nil + } return resp, err } diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 7f7dd7480..da6428985 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -270,6 +270,17 @@ func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse { return resp } +func ecInfoResponse(info *objectSDK.ECInfo) *objectV2.GetResponse { + resp := new(objectV2.GetResponse) + + body := new(objectV2.GetResponseBody) + resp.SetBody(body) + + body.SetObjectPart(info.ToV2()) + + return resp +} + func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse { resp := new(objectV2.GetRangeResponse) @@ -285,6 +296,10 @@ func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResp resp.GetBody().SetHeaderPart(info.ToV2()) } +func setECInfoHeadResponse(info *objectSDK.ECInfo, resp *objectV2.HeadResponse) { + resp.GetBody().SetHeaderPart(info.ToV2()) +} + func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse { resp := new(objectV2.GetRangeHashResponse)