From b5f3c6b60b7bd6b99bd279223b905968ac6eb3c3 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Mon, 22 Apr 2024 09:17:40 +0300 Subject: [PATCH 1/2] [#xx] Fix end of file Signed-off-by: Anton Nikiforov --- dev/.vscode-example/launch.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/.vscode-example/launch.json b/dev/.vscode-example/launch.json index 6aedde85e..990fd42a8 100644 --- a/dev/.vscode-example/launch.json +++ b/dev/.vscode-example/launch.json @@ -250,4 +250,4 @@ "stopAll": true } ] -} \ No newline at end of file +} -- 2.45.2 From a7a14f0ff6fc07eb08d74425bb565af29f03dca6 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Mon, 22 Apr 2024 09:43:42 +0300 Subject: [PATCH 2/2] [#xx] node: Implement `Get\Head` requests for EC object Signed-off-by: Anton Nikiforov --- cmd/frostfs-cli/modules/object/get.go | 4 + cmd/frostfs-cli/modules/object/head.go | 4 + cmd/frostfs-cli/modules/object/range.go | 44 +++++++++ cmd/frostfs-node/object.go | 4 +- go.mod | 5 + go.sum | Bin 43242 -> 42704 bytes internal/logs/logs.go | 5 + pkg/local_object_storage/engine/get.go | 14 +++ pkg/local_object_storage/engine/head.go | 17 +++- pkg/local_object_storage/metabase/exists.go | 4 + pkg/local_object_storage/metabase/get.go | 33 +++++++ pkg/local_object_storage/metabase/get_test.go | 39 ++++++++ pkg/local_object_storage/metabase/put.go | 39 ++++++++ pkg/local_object_storage/metabase/util.go | 10 ++ pkg/local_object_storage/util/ecinfo.go | 28 ++++++ pkg/local_object_storage/util/ecinfo_test.go | 58 +++++++++++ pkg/network/cache/multi.go | 6 +- pkg/services/object/get/assembleec.go | 87 ++++++++++++++++ pkg/services/object/get/assemblerec.go | 93 ++++++++++++++++++ pkg/services/object/get/get.go | 12 +++ pkg/services/object/get/local.go | 6 ++ pkg/services/object/get/remote.go | 15 ++- pkg/services/object/get/request.go | 4 + pkg/services/object/get/service.go | 4 + pkg/services/object/get/status.go | 1 + pkg/services/object/get/v2/get_forwarder.go | 3 + pkg/services/object/get/v2/head_forwarder.go | 3 + pkg/services/object/get/v2/service.go | 8 ++ pkg/services/object/get/v2/util.go | 15 +++ 29 files changed, 555 insertions(+), 10 deletions(-) create mode 100644 pkg/local_object_storage/util/ecinfo.go create mode 100644 pkg/local_object_storage/util/ecinfo_test.go create mode 100644 pkg/services/object/get/assembleec.go create mode 100644 pkg/services/object/get/assemblerec.go diff --git a/cmd/frostfs-cli/modules/object/get.go b/cmd/frostfs-cli/modules/object/get.go index 9a888ccd3..f1edccba2 100644 --- a/cmd/frostfs-cli/modules/object/get.go +++ b/cmd/frostfs-cli/modules/object/get.go @@ -99,6 +99,10 @@ func getObject(cmd *cobra.Command, _ []string) { return } + if ok := printECInfoErr(cmd, err); ok { + return + } + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) } diff --git a/cmd/frostfs-cli/modules/object/head.go b/cmd/frostfs-cli/modules/object/head.go index f97ef952d..14797dc41 100644 --- a/cmd/frostfs-cli/modules/object/head.go +++ b/cmd/frostfs-cli/modules/object/head.go @@ -70,6 +70,10 @@ func getObjectHeader(cmd *cobra.Command, _ []string) { return } + if ok := printECInfoErr(cmd, err); ok { + return + } + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) } diff --git a/cmd/frostfs-cli/modules/object/range.go b/cmd/frostfs-cli/modules/object/range.go index 0eee7bdba..9ba752237 100644 --- a/cmd/frostfs-cli/modules/object/range.go +++ b/cmd/frostfs-cli/modules/object/range.go @@ -146,6 +146,50 @@ func marshalSplitInfo(cmd *cobra.Command, info *objectSDK.SplitInfo) ([]byte, er } } +func printECInfoErr(cmd *cobra.Command, err error) bool { + var errECInfo *objectSDK.ECInfoError + + ok := errors.As(err, &errECInfo) + + if ok { + cmd.PrintErrln("Object is erasure-encoded, ec information received.") + printECInfo(cmd, errECInfo.ECInfo()) + } + + return ok +} + +func printECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) { + bs, err := marshalECInfo(cmd, info) + commonCmd.ExitOnErr(cmd, "can't marshal split info: %w", err) + + cmd.Println(string(bs)) +} + +func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) { + toJSON, _ := cmd.Flags().GetBool(commonflags.JSON) + toProto, _ := cmd.Flags().GetBool("proto") + switch { + case toJSON && toProto: + return nil, errors.New("'--json' and '--proto' flags are mutually exclusive") + case toJSON: + return info.MarshalJSON() + case toProto: + return info.Marshal() + default: + b := bytes.NewBuffer(nil) + b.WriteString("Total chunks: " + strconv.Itoa(int(info.Chunks[0].Total))) + for _, chunk := range info.Chunks { + var id oid.ID + if err := id.Decode(chunk.ID.GetValue()); err != nil { + return nil, fmt.Errorf("unable to decode chunk id: %w", err) + } + b.WriteString("\n Index: " + strconv.Itoa(int(chunk.Index)) + " ID: " + id.String()) + } + return b.Bytes(), nil + } +} + func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) { v := cmd.Flag("range").Value.String() if len(v) == 0 { diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 6160bbc76..a7a084fd1 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -173,7 +173,7 @@ func initObjectService(c *cfg) { sSearchV2 := createSearchSvcV2(sSearch, keyStorage) - sGet := createGetService(c, keyStorage, traverseGen, c.clientCache) + sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource) *c.cfgObject.getSvc = *sGet // need smth better @@ -358,6 +358,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage) func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache, + containerSource containercore.Source, ) *getsvc.Service { ls := c.cfgObject.cfgLocalStorage.localStorage @@ -369,6 +370,7 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra placement.SuccessAfter(1), ), coreConstructor, + containerSource, getsvc.WithLogger(c.log)) } diff --git a/go.mod b/go.mod index 8cfea305d..a85493be2 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,11 @@ module git.frostfs.info/TrueCloudLab/frostfs-node go 1.21 +replace ( + git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24 => /home/annikifa/workspace/frostfs-api-go + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92 => /home/annikifa/workspace/frostfs-sdk-go +) + require ( code.gitea.io/sdk/gitea v0.17.1 git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24 diff --git a/go.sum b/go.sum index 76ed7e5dc963cac023ebe5deadddb39c17afc41a..20dc153edc664e32f4a3be9b7fa057332c3ff4f2 100644 GIT binary patch delta 18 acmaELk?F!&rU_>^KVaO*w0Q<=&|&~!lnF)v delta 419 zcmb8qOHRT-0LJm8ZahYpCZuIv0^^P{5z70~2g5>B+M%r!3MI6{!ZVm~1W!Ob2MJ!o zl}C^!E?jE9{rAiNe3Cvtq}S7fvkDbIE~ZFMkqS#HRDg;AA^@W@ptmJ}6+)py%A!0I zh{yrF+v@WOELwKP^qM&_rd@l2D~&d1nog6B?^OdnuqU8K!@=P!-H3k@ML$*-v6o9d zjg685TU&5XEEaF$G3K2}bX~&wnx)xB9ox)6{T7UK>3BLyhcD@Ryz%CjC^-e{Wy((u zg@9B5QG^TJDS{FH diff --git a/internal/logs/logs.go b/internal/logs/logs.go index bd67217c4..82e9e837e 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -99,9 +99,13 @@ const ( GetRemoteCallFailed = "remote call failed" GetCanNotAssembleTheObject = "can not assemble the object" GetTryingToAssembleTheObject = "trying to assemble the object..." + GetTryingToAssembleTheECObject = "trying to assemble the ec object..." GetAssemblingSplittedObject = "assembling splitted object..." + GetAssemblingECObject = "assembling erasure coded object..." GetAssemblingSplittedObjectCompleted = "assembling splitted object completed" + GetAssemblingECObjectCompleted = "assembling erasure coded object completed" GetFailedToAssembleSplittedObject = "failed to assemble splitted object" + GetFailedToAssembleECObject = "failed to assemble erasure coded object" GetCouldNotGenerateContainerTraverser = "could not generate container traverser" GetCouldNotConstructRemoteNodeClient = "could not construct remote node client" GetCouldNotWriteHeader = "could not write header" @@ -111,6 +115,7 @@ const ( GetCompletingTheOperation = "completing the operation" GetRequestedObjectWasMarkedAsRemoved = "requested object was marked as removed" GetRequestedObjectIsVirtual = "requested object is virtual" + GetRequestedObjectIsEC = "requested object is erasure coded" GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds" PutAdditionalContainerBroadcastFailure = "additional container broadcast failure" SearchReturnResultDirectly = "return result directly" diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index f77c44226..991af3d1a 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -88,6 +88,10 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) { return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo)) } + if it.ECInfo != nil { + return GetRes{}, logicerr.Wrap(objectSDK.NewECInfoError(it.ECInfo)) + } + if it.ObjectExpired { return GetRes{}, errNotFound } @@ -119,6 +123,7 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) { type getShardIterator struct { Object *objectSDK.Object SplitInfo *objectSDK.SplitInfo + ECInfo *objectSDK.ECInfo OutError error ShardWithMeta hashedShard MetaError error @@ -130,6 +135,7 @@ type getShardIterator struct { Engine *StorageEngine splitInfoErr *objectSDK.SplitInfoError + ecInfoErr *objectSDK.ECInfoError } func (i *getShardIterator) tryGetWithMeta(ctx context.Context) { @@ -164,6 +170,14 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) { // stop iterating over shards if SplitInfo structure is complete return withLink && withLast + case errors.As(err, &i.ecInfoErr): + if i.ECInfo == nil { + i.ECInfo = objectSDK.NewECInfo() + } + + util.MergeECInfo(i.ecInfoErr.ECInfo(), i.ECInfo) + // stop iterating over shards if ECInfo structure is complete + return len(i.ECInfo.Chunks) == int(i.ECInfo.Chunks[0].Total) case client.IsErrObjectAlreadyRemoved(err): i.OutError = err return true // stop, return it back diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index ba5e7cc1d..d5411beb1 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -76,6 +76,8 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) head *objectSDK.Object siErr *objectSDK.SplitInfoError outSI *objectSDK.SplitInfo + eiErr *objectSDK.ECInfoError + outEI *objectSDK.ECInfo outError error = new(apistatus.ObjectNotFound) ) @@ -106,9 +108,15 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) } return false + case errors.As(err, &eiErr): + if outEI == nil { + outEI = objectSDK.NewECInfo() + } + util.MergeECInfo(eiErr.ECInfo(), outEI) + // stop iterating over shards if ECInfo structure is complete + return len(outEI.Chunks) == int(outEI.Chunks[0].Total) case client.IsErrObjectAlreadyRemoved(err): outError = err - return true // stop, return it back case shard.IsErrObjectExpired(err): // object is found but should not @@ -123,15 +131,14 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) } head = res.Object() - return true }) if outSI != nil { return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) - } - - if head == nil { + } else if outEI != nil { + return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI)) + } else if head == nil { return HeadRes{}, outError } diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index aa9aba106..bf6766c05 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -108,6 +108,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } + // if parent bucket is empty, then check if object exists in ec bucket + if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 { + return false, getECInfoError(tx, cnr, data) + } // if parent bucket is empty, then check if object exists in typed buckets return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index d18331a3d..7bcd65252 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" @@ -110,6 +112,11 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b return obj, obj.Unmarshal(data) } + data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key) + if len(data) != 0 { + return nil, getECInfoError(tx, cnr, data) + } + // if not found then check in tombstone index data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key) if len(data) != 0 { @@ -185,3 +192,29 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error { return logicerr.Wrap(new(apistatus.ObjectNotFound)) } + +func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error { + offset := 0 + ecInfo := objectSDK.NewECInfo() + for offset < len(data) { + key := data[offset : offset+objectKeySize] + // check in primary index + ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key) + if len(data) != 0 { + obj := objectSDK.New() + if err := obj.Unmarshal(ojbData); err != nil { + return err + } + chunk := object.ECChunk{} + id, _ := obj.ID() + objV2 := new(refs.ObjectID) + id.WriteToV2(objV2) + chunk.ID = *objV2 + chunk.Index = obj.ECHeader().Index() + chunk.Total = obj.ECHeader().Total() + ecInfo.Chunks = append(ecInfo.Chunks, chunk) + } + offset += objectKeySize + } + return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo)) +} diff --git a/pkg/local_object_storage/metabase/get_test.go b/pkg/local_object_storage/metabase/get_test.go index af6b41327..2b92778af 100644 --- a/pkg/local_object_storage/metabase/get_test.go +++ b/pkg/local_object_storage/metabase/get_test.go @@ -3,6 +3,7 @@ package meta_test import ( "bytes" "context" + "errors" "fmt" "os" "runtime" @@ -15,8 +16,10 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/stretchr/testify/require" ) @@ -109,6 +112,42 @@ func TestDB_Get(t *testing.T) { require.True(t, binaryEqual(child.CutPayload(), newChild)) }) + t.Run("put erasure coded object", func(t *testing.T) { + cnr := cidtest.ID() + virtual := testutil.GenerateObjectWithCID(cnr) + c, err := erasurecode.NewConstructor(3, 1) + require.NoError(t, err) + pk, err := keys.NewPrivateKey() + require.NoError(t, err) + parts, err := c.Split(virtual, &pk.PrivateKey) + require.NoError(t, err) + for _, part := range parts { + err = putBig(db, part) + var eiError *objectSDK.ECInfoError + if err != nil && !errors.As(err, &eiError) { + require.NoError(t, err) + } + } + _, err = metaGet(db, object.AddressOf(virtual), true) + var eiError *objectSDK.ECInfoError + require.ErrorAs(t, err, &eiError) + require.Equal(t, len(eiError.ECInfo().Chunks), len(parts)) + for _, chunk := range eiError.ECInfo().Chunks { + var found bool + for _, part := range parts { + partID, _ := part.ID() + var chunkID oid.ID + require.NoError(t, chunkID.ReadFromV2(chunk.ID)) + if chunkID.Equals(partID) { + found = true + } + } + if !found { + require.Fail(t, "chunk not found") + } + } + }) + t.Run("get removed object", func(t *testing.T) { obj := oidtest.Address() ts := oidtest.Address() diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 429d981fe..2822303a0 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -1,6 +1,7 @@ package meta import ( + "bytes" "context" "encoding/binary" "errors" @@ -264,6 +265,13 @@ func putUniqueIndexes( if err != nil { return err } + + if ecHead := obj.GetECHeader(); ecHead != nil { + err = putECInfo(tx, cnr, objKey, ecHead) + if err != nil { + return err + } + } } return nil @@ -571,3 +579,34 @@ func isLinkObject(obj *objectSDK.Object) bool { func isLastObject(obj *objectSDK.Object) bool { return len(obj.Children()) == 0 && obj.Parent() != nil } + +func putECInfo(tx *bbolt.Tx, + cnr cid.ID, objKey []byte, + ecHead *objectSDK.ECHeader, +) error { + parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize)) + bucketName := make([]byte, bucketKeySize) + + val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID) + if len(val) == 0 { + val = objKey + } else { + offset := 0 + found := false + for offset < len(val) { + if bytes.Equal(objKey, val[offset:offset+objectKeySize]) { + found = true + break + } + offset += objectKeySize + } + if !found { + val = append(val, objKey...) + } + } + return putUniqueIndexItem(tx, namedBucketItem{ + name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)), + key: parentID, + val: val, + }) +} diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index d46a421a3..9249ae49b 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -119,6 +119,11 @@ const ( // Key: container ID + type // Value: container size in bytes as little-endian uint64 containerCountersPrefix + + // ecInfoPrefix is used for storing relation between EC parent id and chunk id. + // Key: container ID + type + // Value: Object id + ecInfoPrefix ) const ( @@ -190,6 +195,11 @@ func splitBucketName(cnr cid.ID, key []byte) []byte { return bucketName(cnr, splitPrefix, key) } +// ecInfoBucketName returns _ecinfo. +func ecInfoBucketName(cnr cid.ID, key []byte) []byte { + return bucketName(cnr, ecInfoPrefix, key) +} + // addressKey returns key for K-V tables when key is a whole address. func addressKey(addr oid.Address, key []byte) []byte { addr.Container().Encode(key) diff --git a/pkg/local_object_storage/util/ecinfo.go b/pkg/local_object_storage/util/ecinfo.go new file mode 100644 index 000000000..258cce763 --- /dev/null +++ b/pkg/local_object_storage/util/ecinfo.go @@ -0,0 +1,28 @@ +package util + +import ( + "bytes" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" +) + +// MergeECInfo ignores conflicts and rewrites `to` with non empty values +// from `from`. +func MergeECInfo(from, to *objectSDK.ECInfo) *objectSDK.ECInfo { + var ext []object.ECChunk + for _, fchunk := range from.Chunks { + add := true + for _, tchunk := range to.Chunks { + if bytes.Equal(tchunk.ID.GetValue(), fchunk.ID.GetValue()) { + add = false + break + } + } + if add { + ext = append(ext, fchunk) + } + } + to.Chunks = append(to.Chunks, ext...) + return to +} diff --git a/pkg/local_object_storage/util/ecinfo_test.go b/pkg/local_object_storage/util/ecinfo_test.go new file mode 100644 index 000000000..e2d6d471c --- /dev/null +++ b/pkg/local_object_storage/util/ecinfo_test.go @@ -0,0 +1,58 @@ +package util + +import ( + "crypto/rand" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "github.com/stretchr/testify/require" +) + +func TestMergeECInfo(t *testing.T) { + id := generateV2ID() + target := objectSDK.NewECInfo() + var chunk object.ECChunk + chunk.Total = 2 + chunk.Index = 0 + chunk.ID = id + target.Chunks = append(target.Chunks, chunk) + + t.Run("merge empty", func(t *testing.T) { + to := objectSDK.NewECInfo() + + result := MergeECInfo(target, to) + require.Equal(t, result, target) + }) + + t.Run("merge existed", func(t *testing.T) { + to := objectSDK.NewECInfo() + to.Chunks = append(to.Chunks, chunk) + + result := MergeECInfo(target, to) + require.Equal(t, result, target) + }) + t.Run("merge extend", func(t *testing.T) { + to := objectSDK.NewECInfo() + id := generateV2ID() + var chunk object.ECChunk + chunk.Total = 2 + chunk.Index = 1 + chunk.ID = id + to.Chunks = append(to.Chunks, chunk) + + result := MergeECInfo(target, to) + require.Equal(t, len(result.Chunks), 2) + }) +} + +func generateV2ID() refs.ObjectID { + var buf [32]byte + _, _ = rand.Read(buf[:]) + + var id refs.ObjectID + id.SetValue(buf[:]) + + return id +} diff --git a/pkg/network/cache/multi.go b/pkg/network/cache/multi.go index 5dd206283..f19510d76 100644 --- a/pkg/network/cache/multi.go +++ b/pkg/network/cache/multi.go @@ -167,8 +167,9 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie // from the SDK client; should not be considered // as a connection error var siErr *objectSDK.SplitInfoError + var eiErr *objectSDK.ECInfoError - success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) + success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) || errors.As(err, &eiErr) if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) { firstErr = err } @@ -196,7 +197,8 @@ func (x *multiClient) ReportError(err error) { // from the SDK client; should not be considered // as a connection error var siErr *objectSDK.SplitInfoError - if errors.As(err, &siErr) { + var eiErr *objectSDK.ECInfoError + if errors.As(err, &siErr) || errors.As(err, &eiErr) { return } diff --git a/pkg/services/object/get/assembleec.go b/pkg/services/object/get/assembleec.go new file mode 100644 index 000000000..3989e44b8 --- /dev/null +++ b/pkg/services/object/get/assembleec.go @@ -0,0 +1,87 @@ +package getsvc + +import ( + "context" + "errors" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "go.uber.org/zap" +) + +func (r *request) assembleEC(ctx context.Context) { + if r.isRaw() { + r.log.Debug(logs.GetCanNotAssembleTheObject) + return + } + + // Any access tokens are not expected to be used in the assembly process: + // - there is no requirement to specify child objects in session/bearer + // token for `GET`/`GETRANGE`/`RANGEHASH` requests in the API protocol, + // and, therefore, their missing in the original request should not be + // considered as error; on the other hand, without session for every child + // object, it is impossible to attach bearer token in the new generated + // requests correctly because the token has not been issued for that node's + // key; + // - the assembly process is expected to be handled on a container node + // only since the requests forwarding mechanism presentation; such the + // node should have enough rights for getting any child object by design. + r.prm.common.ForgetTokens() + + // Do not use forwarding during assembly stage. + // Request forwarding closure inherited in produced + // `execCtx` so it should be disabled there. + r.disableForwarding() + + r.log.Debug(logs.GetTryingToAssembleTheECObject) + + assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource) + + r.log.Debug(logs.GetAssemblingECObject, + zap.Stringer("address", r.address()), + zap.Uint64("range_offset", r.ctxRange().GetOffset()), + zap.Uint64("range_length", r.ctxRange().GetLength()), + ) + defer r.log.Debug(logs.GetAssemblingECObjectCompleted, + zap.Stringer("address", r.address()), + zap.Uint64("range_offset", r.ctxRange().GetOffset()), + zap.Uint64("range_length", r.ctxRange().GetLength()), + ) + + obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly()) + if err != nil { + r.log.Warn(logs.GetFailedToAssembleECObject, + zap.Error(err), + zap.Stringer("address", r.address()), + zap.Uint64("range_offset", r.ctxRange().GetOffset()), + zap.Uint64("range_length", r.ctxRange().GetLength()), + ) + } + + var errRemovedRemote *apistatus.ObjectAlreadyRemoved + var errOutOfRangeRemote *apistatus.ObjectOutOfRange + var errRemovedLocal *apistatus.ObjectAlreadyRemoved + var errOutOfRangeLocal *apistatus.ObjectOutOfRange + + switch { + default: + r.status = statusUndefined + r.err = err + case err == nil: + r.status = statusOK + r.err = nil + r.collectedObject = obj + case errors.As(err, &errRemovedRemote): + r.status = statusINHUMED + r.err = errRemovedRemote + case errors.As(err, &errRemovedLocal): + r.status = statusINHUMED + r.err = errRemovedLocal + case errors.As(err, &errOutOfRangeRemote): + r.status = statusOutOfRange + r.err = errOutOfRangeRemote + case errors.As(err, &errOutOfRangeLocal): + r.status = statusOutOfRange + r.err = errOutOfRangeLocal + } +} diff --git a/pkg/services/object/get/assemblerec.go b/pkg/services/object/get/assemblerec.go new file mode 100644 index 000000000..9e1bc5ab5 --- /dev/null +++ b/pkg/services/object/get/assemblerec.go @@ -0,0 +1,93 @@ +package getsvc + +import ( + "context" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type assemblerec struct { + addr oid.Address + ecInfo *objectSDK.ECInfo + rng *objectSDK.Range + objGetter objectGetter + cs container.Source +} + +func newAssemblerEC( + addr oid.Address, + ecInfo *objectSDK.ECInfo, + rng *objectSDK.Range, + objGetter objectGetter, + cs container.Source, +) *assemblerec { + return &assemblerec{ + addr: addr, + rng: rng, + ecInfo: ecInfo, + objGetter: objGetter, + cs: cs, + } +} + +// Assemble assembles erasure coded object and writes it's content to ObjectWriter. +// It returns parent object. +func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) { + parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total)) + for _, chunk := range a.ecInfo.Chunks { + objID := new(oid.ID) + err := objID.ReadFromV2(chunk.ID) + if err != nil { + return nil, fmt.Errorf("invalid object ID: %w", err) + } + var obj *objectSDK.Object + if headOnly { + obj, err = a.objGetter.HeadObject(ctx, *objID) + if err != nil { + return nil, err + } + } else { + sow := NewSimpleObjectWriter() + obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow) + if err != nil { + return nil, err + } + obj.SetPayload(sow.pld) + } + parts[chunk.Index] = obj + } + cnt, err := a.cs.Get(a.addr.Container()) + if err != nil { + return nil, err + } + c, err := erasurecode.NewConstructor( + policy.ECDataCount(cnt.Value.PlacementPolicy()), + policy.ECParityCount(cnt.Value.PlacementPolicy()), + ) + if err != nil { + return nil, err + } + if headOnly { + obj, err := c.ReconstructHeader(parts) + if err == nil { + return obj, writer.WriteHeader(ctx, obj) + } + return nil, err + } + obj, err := c.Reconstruct(parts) + if err == nil { + err = writer.WriteHeader(ctx, obj.CutPayload()) + if err == nil { + err = writer.WriteChunk(ctx, obj.Payload()) + if err != nil { + return nil, err + } + } + } + return obj, err +} diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 3d5547047..3e70200ee 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -73,9 +73,11 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error { remoteStorageConstructor: s.remoteStorageConstructor, epochSource: s.epochSource, localStorage: s.localStorage, + containerSource: s.containerSource, prm: prm, infoSplit: objectSDK.NewSplitInfo(), + infoEC: objectSDK.NewECInfo(), } exec.setLogger(s.log) @@ -106,6 +108,16 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) { exec.assemble(ctx) case statusOutOfRange: exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds) + case statusEC: + if !exec.isLocal() { + if execCnr { + exec.executeOnContainer(ctx) + exec.analyzeStatus(ctx, false) + } else { + exec.log.Debug(logs.GetRequestedObjectIsEC) + exec.assembleEC(ctx) + } + } default: exec.log.Debug(logs.OperationFinishedWithError, zap.Error(exec.err), diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index 257465019..fcfc9befc 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -5,6 +5,7 @@ import ( "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -22,6 +23,7 @@ func (r *request) executeLocal(ctx context.Context) { r.collectedObject, err = r.get(ctx) var errSplitInfo *objectSDK.SplitInfoError + var errECInfo *objectSDK.ECInfoError var errRemoved *apistatus.ObjectAlreadyRemoved var errOutOfRange *apistatus.ObjectOutOfRange @@ -42,6 +44,10 @@ func (r *request) executeLocal(ctx context.Context) { r.status = statusVIRTUAL mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) r.err = objectSDK.NewSplitInfoError(r.infoSplit) + case errors.As(err, &errECInfo): + r.status = statusEC + util.MergeECInfo(errECInfo.ECInfo(), r.infoEC) + r.err = objectSDK.NewECInfoError(r.infoEC) case errors.As(err, &errOutOfRange): r.status = statusOutOfRange r.err = errOutOfRange diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index cd94434cf..c44f6ce6d 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -7,6 +7,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -27,15 +28,18 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { obj, err := r.getRemote(ctx, rs, info) var errSplitInfo *objectSDK.SplitInfoError + var errECInfo *objectSDK.ECInfoError var errRemoved *apistatus.ObjectAlreadyRemoved var errOutOfRange *apistatus.ObjectOutOfRange switch { default: + r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) + if r.status == statusEC { + return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) + } r.status = statusUndefined r.err = new(apistatus.ObjectNotFound) - - r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) case err == nil: r.status = statusOK r.err = nil @@ -57,6 +61,13 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { r.status = statusVIRTUAL mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) r.err = objectSDK.NewSplitInfoError(r.infoSplit) + case errors.As(err, &errECInfo): + r.status = statusEC + util.MergeECInfo(r.infoEC, errECInfo.ECInfo()) + r.infoEC = errECInfo.ECInfo() + r.err = objectSDK.NewECInfoError(r.infoEC) + // TODO maybe we don't need to get all chunks here + return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) } return r.status != statusUndefined diff --git a/pkg/services/object/get/request.go b/pkg/services/object/get/request.go index b9223a637..d0b79e30c 100644 --- a/pkg/services/object/get/request.go +++ b/pkg/services/object/get/request.go @@ -6,6 +6,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -22,6 +23,8 @@ type request struct { infoSplit *objectSDK.SplitInfo + infoEC *objectSDK.ECInfo + log *logger.Logger collectedObject *objectSDK.Object @@ -33,6 +36,7 @@ type request struct { traverserGenerator traverserGenerator remoteStorageConstructor remoteStorageConstructor localStorage localStorage + containerSource container.Source } func (r *request) setLogger(l *logger.Logger) { diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index bdf01a977..3413abeb7 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,6 +1,7 @@ package getsvc import ( + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -16,6 +17,7 @@ type Service struct { epochSource epochSource keyStore keyStorage remoteStorageConstructor remoteStorageConstructor + containerSource container.Source } // New creates, initializes and returns utility serving @@ -26,6 +28,7 @@ func New( e localStorageEngine, tg traverserGenerator, cc clientConstructor, + cs container.Source, opts ...Option, ) *Service { result := &Service{ @@ -39,6 +42,7 @@ func New( remoteStorageConstructor: &multiclientRemoteStorageConstructor{ clientConstructor: cc, }, + containerSource: cs, } for _, option := range opts { option(result) diff --git a/pkg/services/object/get/status.go b/pkg/services/object/get/status.go index 3a5eebe32..919338d7f 100644 --- a/pkg/services/object/get/status.go +++ b/pkg/services/object/get/status.go @@ -6,6 +6,7 @@ const ( statusINHUMED statusVIRTUAL statusOutOfRange + statusEC ) type statusError struct { diff --git a/pkg/services/object/get/v2/get_forwarder.go b/pkg/services/object/get/v2/get_forwarder.go index 40aa3f62e..774f98643 100644 --- a/pkg/services/object/get/v2/get_forwarder.go +++ b/pkg/services/object/get/v2/get_forwarder.go @@ -166,6 +166,9 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr case *objectV2.SplitInfo: si := objectSDK.NewSplitInfoFromV2(v) return objectSDK.NewSplitInfoError(si) + case *objectV2.ECInfo: + ei := objectSDK.NewECInfoFromV2(v) + return objectSDK.NewECInfoError(ei) } } return nil diff --git a/pkg/services/object/get/v2/head_forwarder.go b/pkg/services/object/get/v2/head_forwarder.go index a1bce1517..11286321a 100644 --- a/pkg/services/object/get/v2/head_forwarder.go +++ b/pkg/services/object/get/v2/head_forwarder.go @@ -84,6 +84,9 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne case *objectV2.SplitInfo: si := objectSDK.NewSplitInfoFromV2(v) return nil, objectSDK.NewSplitInfoError(si) + case *objectV2.ECInfo: + ei := objectSDK.NewECInfoFromV2(v) + return nil, objectSDK.NewECInfoError(ei) } objv2 := new(objectV2.Object) diff --git a/pkg/services/object/get/v2/service.go b/pkg/services/object/get/v2/service.go index bcdc4120e..682128df6 100644 --- a/pkg/services/object/get/v2/service.go +++ b/pkg/services/object/get/v2/service.go @@ -82,10 +82,13 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream err = s.svc.Get(stream.Context(), *p) var splitErr *objectSDK.SplitInfoError + var ecErr *objectSDK.ECInfoError switch { case errors.As(err, &splitErr): return stream.Send(splitInfoResponse(splitErr.SplitInfo())) + case errors.As(err, &ecErr): + return stream.Send(ecInfoResponse(ecErr.ECInfo())) default: return err } @@ -123,11 +126,16 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV err = s.svc.Head(ctx, *p) var splitErr *objectSDK.SplitInfoError + var ecErr *objectSDK.ECInfoError if errors.As(err, &splitErr) { setSplitInfoHeadResponse(splitErr.SplitInfo(), resp) err = nil } + if errors.As(err, &ecErr) { + setECInfoHeadResponse(ecErr.ECInfo(), resp) + err = nil + } return resp, err } diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 7f7dd7480..da6428985 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -270,6 +270,17 @@ func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse { return resp } +func ecInfoResponse(info *objectSDK.ECInfo) *objectV2.GetResponse { + resp := new(objectV2.GetResponse) + + body := new(objectV2.GetResponseBody) + resp.SetBody(body) + + body.SetObjectPart(info.ToV2()) + + return resp +} + func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse { resp := new(objectV2.GetRangeResponse) @@ -285,6 +296,10 @@ func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResp resp.GetBody().SetHeaderPart(info.ToV2()) } +func setECInfoHeadResponse(info *objectSDK.ECInfo, resp *objectV2.HeadResponse) { + resp.GetBody().SetHeaderPart(info.ToV2()) +} + func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse { resp := new(objectV2.GetRangeHashResponse) -- 2.45.2