node: Implement Get\Head requests for EC object #1103

Merged
fyrchik merged 4 commits from acid-ant/frostfs-node:feature/ec-get-head into master 2024-09-04 19:51:08 +00:00
35 changed files with 590 additions and 37 deletions

View file

@ -16,7 +16,7 @@ repos:
- id: trailing-whitespace
args: [--markdown-linebreak-ext=md]
- id: end-of-file-fixer
exclude: ".key$"
exclude: "(.key|.svg)$"
- repo: https://github.com/shellcheck-py/shellcheck-py
rev: v0.9.0.6

View file

@ -99,6 +99,10 @@ func getObject(cmd *cobra.Command, _ []string) {
return
}
if ok := printECInfoErr(cmd, err); ok {
return
}
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
}

View file

@ -70,6 +70,10 @@ func getObjectHeader(cmd *cobra.Command, _ []string) {
return
}
if ok := printECInfoErr(cmd, err); ok {
return
}
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
}

View file

@ -146,6 +146,50 @@ func marshalSplitInfo(cmd *cobra.Command, info *objectSDK.SplitInfo) ([]byte, er
}
}
func printECInfoErr(cmd *cobra.Command, err error) bool {
var errECInfo *objectSDK.ECInfoError
ok := errors.As(err, &errECInfo)
if ok {
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
printECInfo(cmd, errECInfo.ECInfo())
}
return ok
}
func printECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) {
bs, err := marshalECInfo(cmd, info)
commonCmd.ExitOnErr(cmd, "can't marshal split info: %w", err)
cmd.Println(string(bs))
}
func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) {
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
toProto, _ := cmd.Flags().GetBool("proto")
switch {
case toJSON && toProto:
return nil, errors.New("'--json' and '--proto' flags are mutually exclusive")
case toJSON:
return info.MarshalJSON()
case toProto:
return info.Marshal()
default:
b := bytes.NewBuffer(nil)
b.WriteString("Total chunks: " + strconv.Itoa(int(info.Chunks[0].Total)))
for _, chunk := range info.Chunks {
var id oid.ID
if err := id.Decode(chunk.ID.GetValue()); err != nil {
return nil, fmt.Errorf("unable to decode chunk id: %w", err)
}
b.WriteString("\n Index: " + strconv.Itoa(int(chunk.Index)) + " ID: " + id.String())
}
return b.Bytes(), nil
}
}
func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
v := cmd.Flag("range").Value.String()
if len(v) == 0 {

View file

@ -173,7 +173,7 @@ func initObjectService(c *cfg) {
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache)
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
*c.cfgObject.getSvc = *sGet // need smth better
@ -358,6 +358,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
coreConstructor *cache.ClientCache,
containerSource containercore.Source,
) *getsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
@ -369,6 +370,7 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
placement.SuccessAfter(1),
),
coreConstructor,
containerSource,
getsvc.WithLogger(c.log))
}

4
go.mod
View file

@ -4,10 +4,10 @@ go 1.21
require (
code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a
git.frostfs.info/TrueCloudLab/tzhash v1.8.0

8
go.sum
View file

@ -1,15 +1,15 @@
code.gitea.io/sdk/gitea v0.17.1 h1:3jCPOG2ojbl8AcfaUCRYLT5MUcBMFwS0OSK2mA5Zok8=
code.gitea.io/sdk/gitea v0.17.1/go.mod h1:aCnBqhHpoEWA180gMbaCtdX9Pl6BWBAuuP2miadoTNM=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24 h1:uIkl0mKWwDICUZTbNWZ38HLYDBI9rMgdAhYQWZ0C9iQ=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24/go.mod h1:OBDSr+DqV1z4VDouoX3YMleNc4DPBVBWTG3WDT2PK1o=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c h1:RFDrNsF2e+EJfaB8lZrRRxNjQkLfM09gnEyudvGuc10=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c/go.mod h1:OBDSr+DqV1z4VDouoX3YMleNc4DPBVBWTG3WDT2PK1o=
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0 h1:FzurjElUwC7InY9v5rzXReKbfBL5yRJKSWJPq6BKhH0=
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0/go.mod h1:F/fe1OoIDKr5Bz99q4sriuHDuf3aZefZy9ZsCqEtgxc=
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65 h1:PaZ8GpnUoXxUoNsc1qp36bT2u7FU+neU4Jn9cl8AWqI=
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65/go.mod h1:6aAX80dvJ3r5fjN9CzzPglRptoiPgIC9KFGGsUA+1Hw=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92 h1:hSyM52d8yIaOpYQlLlVYdrGbgCsvIDjwl3AJaJUlYPU=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92/go.mod h1:i0RKqiF4z3UOxLSNwhHw+cUz/JyYWuTRpnn9ere4Y3w=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3 h1:7Sd/J2IM0uGpmFKBgseUh6/JsdJN06b8W8UZMKAUDZg=
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3/go.mod h1:wDFmMP7l00Xd5VZVzF2MuhyJCnotyhfxHYnvrEEG/e4=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a h1:wbndKvHbwDQiSMQWL75RxiTZCeUyCi7NUj1lsfdAGkc=

View file

@ -99,9 +99,17 @@ const (
GetRemoteCallFailed = "remote call failed"
GetCanNotAssembleTheObject = "can not assemble the object"
GetTryingToAssembleTheObject = "trying to assemble the object..."
GetTryingToAssembleTheECObject = "trying to assemble the ec object..."
GetAssemblingSplittedObject = "assembling splitted object..."
GetAssemblingECObject = "assembling erasure-coded object..."
GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed"
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
GetFailedToAssembleECObject = "failed to assemble erasure-coded object"
GetCouldNotGenerateContainerTraverser = "could not generate container traverser"
GetCouldNotConstructRemoteNodeClient = "could not construct remote node client"
GetCouldNotWriteHeader = "could not write header"
@ -111,6 +119,7 @@ const (
GetCompletingTheOperation = "completing the operation"
fyrchik marked this conversation as resolved Outdated

We have "erasure coded" here and "erasure-encoded" in CLI print.
Wikipedia uses "erasure-coded" https://en.wikipedia.org/wiki/Erasure_code, let's stick with it.

We have "erasure coded" here and "erasure-encoded" in CLI print. Wikipedia uses "erasure-coded" https://en.wikipedia.org/wiki/Erasure_code, let's stick with it.

Agree, updated here in other places.

Agree, updated here in other places.
GetRequestedObjectWasMarkedAsRemoved = "requested object was marked as removed"
GetRequestedObjectIsVirtual = "requested object is virtual"
GetRequestedObjectIsEC = "requested object is erasure-coded"
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
SearchReturnResultDirectly = "return result directly"

View file

@ -88,6 +88,10 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
}
if it.ECInfo != nil {
return GetRes{}, logicerr.Wrap(objectSDK.NewECInfoError(it.ECInfo))
}
if it.ObjectExpired {
return GetRes{}, errNotFound
}
@ -119,6 +123,7 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
type getShardIterator struct {
Object *objectSDK.Object
SplitInfo *objectSDK.SplitInfo
ECInfo *objectSDK.ECInfo
OutError error
ShardWithMeta hashedShard
MetaError error
@ -130,6 +135,7 @@ type getShardIterator struct {
Engine *StorageEngine
splitInfoErr *objectSDK.SplitInfoError
ecInfoErr *objectSDK.ECInfoError
}
func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
@ -164,6 +170,14 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
// stop iterating over shards if SplitInfo structure is complete
return withLink && withLast
case errors.As(err, &i.ecInfoErr):
if i.ECInfo == nil {
i.ECInfo = objectSDK.NewECInfo()
}
util.MergeECInfo(i.ecInfoErr.ECInfo(), i.ECInfo)
// stop iterating over shards if ECInfo structure is complete
return len(i.ECInfo.Chunks) == int(i.ECInfo.Chunks[0].Total)
case client.IsErrObjectAlreadyRemoved(err):
i.OutError = err
return true // stop, return it back

View file

@ -67,7 +67,6 @@ func (e *StorageEngine) Head(ctx context.Context, prm HeadPrm) (res HeadRes, err
func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.head")
defer span.End()
if e.metrics != nil {
defer elapsed("Head", e.metrics.AddMethodDuration)()
}
@ -76,11 +75,11 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
head *objectSDK.Object
siErr *objectSDK.SplitInfoError
outSI *objectSDK.SplitInfo
eiErr *objectSDK.ECInfoError
outEI *objectSDK.ECInfo
outError error = new(apistatus.ObjectNotFound)
shPrm shard.HeadPrm
)
var shPrm shard.HeadPrm
shPrm.SetAddress(prm.addr)
shPrm.SetRaw(prm.raw)
@ -94,44 +93,43 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
if outSI == nil {
outSI = objectSDK.NewSplitInfo()
}
util.MergeSplitInfo(siErr.SplitInfo(), outSI)
_, withLink := outSI.Link()
_, withLast := outSI.LastPart()
// stop iterating over shards if SplitInfo structure is complete
if withLink && withLast {
return true
}
return false
case errors.As(err, &eiErr):
if outEI == nil {
outEI = objectSDK.NewECInfo()
}
util.MergeECInfo(eiErr.ECInfo(), outEI)
// stop iterating over shards if ECInfo structure is complete
return len(outEI.Chunks) == int(outEI.Chunks[0].Total)
case client.IsErrObjectAlreadyRemoved(err):
outError = err
fyrchik marked this conversation as resolved Outdated

unrelated to the commit

unrelated to the commit

This change required because we have check for amount of lines in method. Don't want to do refactoring in scope of this PR.

This change required because we have check for amount of lines in method. Don't want to do refactoring in scope of this PR.

That is what I mean by unrelated, can we have a separate commit for it, before the EC changes?
Well, there is another whitespace change commit already.

That is what I mean by unrelated, can we have a separate commit for it, before the EC changes? Well, there is another whitespace change commit already.

Ok, moved to the separate commit.

Ok, moved to the separate commit.
return true // stop, return it back
case shard.IsErrObjectExpired(err):
// object is found but should not
// be returned
outError = new(apistatus.ObjectNotFound)
return true
default:
e.reportShardError(sh, "could not head object from shard", err)
return false
}
}
head = res.Object()
return true
})
if outSI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
}
if head == nil {
} else if outEI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
} else if head == nil {
return HeadRes{}, outError
}

View file

@ -108,6 +108,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
}
// if parent bucket is empty, then check if object exists in ec bucket
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
return false, getECInfoError(tx, cnr, data)
}
// if parent bucket is empty, then check if object exists in typed buckets
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil

View file

@ -110,6 +110,11 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
return obj, obj.Unmarshal(data)
}
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
if len(data) != 0 {
return nil, getECInfoError(tx, cnr, data)
}
// if not found then check in tombstone index
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
if len(data) != 0 {
@ -185,3 +190,27 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
return logicerr.Wrap(new(apistatus.ObjectNotFound))
}
func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
offset := 0
ecInfo := objectSDK.NewECInfo()
for offset < len(data) {
key := data[offset : offset+objectKeySize]
// check in primary index
ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
if len(data) != 0 {
obj := objectSDK.New()
if err := obj.Unmarshal(ojbData); err != nil {
return err
}
chunk := objectSDK.ECChunk{}
id, _ := obj.ID()
chunk.SetID(id)
chunk.Index = obj.ECHeader().Index()
chunk.Total = obj.ECHeader().Total()
ecInfo.AddChunk(chunk)
}
offset += objectKeySize
}
return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo))
}

View file

@ -3,6 +3,7 @@ package meta_test
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"runtime"
@ -15,8 +16,10 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
)
@ -109,6 +112,42 @@ func TestDB_Get(t *testing.T) {
require.True(t, binaryEqual(child.CutPayload(), newChild))
})
t.Run("put erasure-coded object", func(t *testing.T) {
cnr := cidtest.ID()
virtual := testutil.GenerateObjectWithCID(cnr)
c, err := erasurecode.NewConstructor(3, 1)
require.NoError(t, err)
pk, err := keys.NewPrivateKey()
require.NoError(t, err)
parts, err := c.Split(virtual, &pk.PrivateKey)
require.NoError(t, err)
for _, part := range parts {
err = putBig(db, part)
var eiError *objectSDK.ECInfoError
if err != nil && !errors.As(err, &eiError) {
require.NoError(t, err)
}
}
_, err = metaGet(db, object.AddressOf(virtual), true)
var eiError *objectSDK.ECInfoError
require.ErrorAs(t, err, &eiError)
require.Equal(t, len(eiError.ECInfo().Chunks), len(parts))
for _, chunk := range eiError.ECInfo().Chunks {
var found bool
for _, part := range parts {
partID, _ := part.ID()
var chunkID oid.ID
require.NoError(t, chunkID.ReadFromV2(chunk.ID))
if chunkID.Equals(partID) {
found = true
}
}
if !found {
require.Fail(t, "chunk not found")
}
}
})
t.Run("get removed object", func(t *testing.T) {
obj := oidtest.Address()
ts := oidtest.Address()

View file

@ -1,6 +1,7 @@
package meta
import (
"bytes"
"context"
"encoding/binary"
"errors"
@ -264,6 +265,13 @@ func putUniqueIndexes(
if err != nil {
return err
}
if ecHead := obj.GetECHeader(); ecHead != nil {
err = putECInfo(tx, cnr, objKey, ecHead)
if err != nil {
return err
}
}
}
return nil
@ -571,3 +579,34 @@ func isLinkObject(obj *objectSDK.Object) bool {
func isLastObject(obj *objectSDK.Object) bool {
return len(obj.Children()) == 0 && obj.Parent() != nil
}
func putECInfo(tx *bbolt.Tx,
cnr cid.ID, objKey []byte,
ecHead *objectSDK.ECHeader,
) error {
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
bucketName := make([]byte, bucketKeySize)
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
if len(val) == 0 {
val = objKey
} else {
offset := 0
found := false
for offset < len(val) {
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
found = true
break
}
offset += objectKeySize
}
if !found {
val = append(val, objKey...)
}
}
return putUniqueIndexItem(tx, namedBucketItem{
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
key: parentID,
val: val,
})
}

View file

@ -119,6 +119,11 @@ const (
// Key: container ID + type
// Value: container size in bytes as little-endian uint64
containerCountersPrefix
// ecInfoPrefix is used for storing relation between EC parent id and chunk id.
// Key: container ID + type
// Value: Object id
ecInfoPrefix
)
const (
@ -190,6 +195,11 @@ func splitBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, splitPrefix, key)
}
// ecInfoBucketName returns <CID>_ecinfo.
func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, ecInfoPrefix, key)
}
// addressKey returns key for K-V tables when key is a whole address.
func addressKey(addr oid.Address, key []byte) []byte {
addr.Container().Encode(key)

View file

@ -0,0 +1,25 @@
package util
import (
"bytes"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
// MergeECInfo ignores conflicts and rewrites `to` with non empty values
// from `from`.
func MergeECInfo(from, to *objectSDK.ECInfo) *objectSDK.ECInfo {
for _, fchunk := range from.Chunks {
add := true
for _, tchunk := range to.Chunks {
if bytes.Equal(tchunk.ID.GetValue(), fchunk.ID.GetValue()) {
add = false
break
}
}
if add {
to.AddChunk(*objectSDK.NewECChunkFromV2(&fchunk))
}
}
return to
}

View file

@ -0,0 +1,56 @@
package util
import (
"crypto/rand"
"testing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
func TestMergeECInfo(t *testing.T) {
id := generateV2ID()
target := objectSDK.NewECInfo()
var chunk objectSDK.ECChunk
chunk.Total = 2
chunk.Index = 0
chunk.SetID(id)
target.AddChunk(chunk)
t.Run("merge empty", func(t *testing.T) {
to := objectSDK.NewECInfo()
result := MergeECInfo(target, to)
require.Equal(t, result, target)
})
t.Run("merge existed", func(t *testing.T) {
to := objectSDK.NewECInfo()
to.AddChunk(chunk)
result := MergeECInfo(target, to)
require.Equal(t, result, target)
})
t.Run("merge extend", func(t *testing.T) {
to := objectSDK.NewECInfo()
var chunk objectSDK.ECChunk
chunk.Total = 2
chunk.Index = 1
chunk.SetID(generateV2ID())
to.AddChunk(chunk)
result := MergeECInfo(target, to)
require.Equal(t, len(result.Chunks), 2)
})
}
func generateV2ID() oid.ID {
var buf [32]byte
_, _ = rand.Read(buf[:])
var id oid.ID
_ = id.Decode(buf[:])
return id
}

View file

@ -167,8 +167,9 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
// from the SDK client; should not be considered
// as a connection error
var siErr *objectSDK.SplitInfoError
var eiErr *objectSDK.ECInfoError
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr)
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) || errors.As(err, &eiErr)
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
firstErr = err
}
@ -196,7 +197,8 @@ func (x *multiClient) ReportError(err error) {
// from the SDK client; should not be considered
// as a connection error
var siErr *objectSDK.SplitInfoError
if errors.As(err, &siErr) {
var eiErr *objectSDK.ECInfoError
if errors.As(err, &siErr) || errors.As(err, &eiErr) {
return
}

View file

@ -139,9 +139,11 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
remoteStorageConstructor: r.remoteStorageConstructor,
epochSource: r.epochSource,
localStorage: r.localStorage,
containerSource: r.containerSource,
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
log: r.log,
}

View file

@ -0,0 +1,79 @@
package getsvc
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"go.uber.org/zap"
)
func (r *request) assembleEC(ctx context.Context) {
if r.isRaw() {
r.log.Debug(logs.GetCanNotAssembleTheObject)
return
}
// Any access tokens are not expected to be used in the assembly process:
// - there is no requirement to specify child objects in session/bearer
// token for `GET`/`GETRANGE`/`RANGEHASH` requests in the API protocol,
// and, therefore, their missing in the original request should not be
// considered as error; on the other hand, without session for every child
// object, it is impossible to attach bearer token in the new generated
// requests correctly because the token has not been issued for that node's
// key;
// - the assembly process is expected to be handled on a container node
// only since the requests forwarding mechanism presentation; such the
// node should have enough rights for getting any child object by design.
r.prm.common.ForgetTokens()
// Do not use forwarding during assembly stage.
// Request forwarding closure inherited in produced
// `execCtx` so it should be disabled there.
r.disableForwarding()
r.log.Debug(logs.GetTryingToAssembleTheECObject)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
r.log.Debug(logs.GetAssemblingECObject,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
if err != nil {
r.log.Warn(logs.GetFailedToAssembleECObject,
zap.Error(err),
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
}
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
switch {

Look like remote and local errors are the same.

Look like remote and local errors are the same.

Removed.

Removed.

Incompleted

var errOutOfRangeRemote *apistatus.ObjectOutOfRange
var errOutOfRangeLocal *apistatus.ObjectOutOfRange
Incompleted ``` var errOutOfRangeRemote *apistatus.ObjectOutOfRange var errOutOfRangeLocal *apistatus.ObjectOutOfRange ```

Sorry, fixed.

Sorry, fixed.
default:
r.status = statusUndefined
r.err = err
case err == nil:
r.status = statusOK
r.err = nil
r.collectedObject = obj
case errors.As(err, &errRemoved):
r.status = statusINHUMED
r.err = errRemoved
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange
}
}

View file

@ -0,0 +1,117 @@
package getsvc
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
type assemblerec struct {
addr oid.Address
ecInfo *objectSDK.ECInfo
rng *objectSDK.Range
objGetter objectGetter
cs container.Source
log *logger.Logger
}
func newAssemblerEC(
addr oid.Address,
ecInfo *objectSDK.ECInfo,
rng *objectSDK.Range,
objGetter objectGetter,
cs container.Source,
log *logger.Logger,
) *assemblerec {
return &assemblerec{
addr: addr,
rng: rng,
ecInfo: ecInfo,
objGetter: objGetter,
cs: cs,
log: log,
}
}
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
// It returns parent object.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
fyrchik marked this conversation as resolved Outdated

When we are to add GET_RANGE do we need to alter this function or write code in another place?

When we are to add GET_RANGE do we need to alter this function or write code in another place?

We need to extend assembler for EC a bit, yes.

We need to extend assembler for EC a bit, yes.
parts := a.retrieveParts(ctx, headOnly)
cnt, err := a.cs.Get(a.addr.Container())
if err != nil {
return nil, err
}
c, err := erasurecode.NewConstructor(
policy.ECDataCount(cnt.Value.PlacementPolicy()),
policy.ECParityCount(cnt.Value.PlacementPolicy()),
)
if err != nil {
return nil, err
}
if headOnly {
obj, err := c.ReconstructHeader(parts)
if err == nil {
return obj, writer.WriteHeader(ctx, obj)
}
return nil, err
}
obj, err := c.Reconstruct(parts)
if err == nil {
err = writer.WriteHeader(ctx, obj.CutPayload())
if err == nil {
err = writer.WriteChunk(ctx, obj.Payload())
if err != nil {
return nil, err
}
}
}
return obj, err
}
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
errGroup, ctx := errgroup.WithContext(mainCtx)
for i := range a.ecInfo.Chunks {
chunk := a.ecInfo.Chunks[i]
errGroup.Go(func() error {
objID := new(oid.ID)
err := objID.ReadFromV2(chunk.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var obj *objectSDK.Object
if headOnly {
obj, err = a.objGetter.HeadObject(ctx, *objID)
if err != nil {
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
} else {
sow := NewSimpleObjectWriter()
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
if err != nil {

ctx -> mainCtx, or even better to move errgroup related code to separate func.
As far as I remember errgoup's ctx is cancelled after Wait.

ctx -> mainCtx, or even better to move `errgroup` related code to separate func. As far as I remember errgoup's ctx is cancelled after Wait.

Thanks a lot! Split on two functions.

Thanks a lot! Split on two functions.
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
obj.SetPayload(sow.pld)
}
parts[chunk.Index] = obj
return nil
})
}
if err := errGroup.Wait(); err != nil {
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
}
return parts
}

View file

@ -73,9 +73,12 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
remoteStorageConstructor: s.remoteStorageConstructor,
epochSource: s.epochSource,
localStorage: s.localStorage,
containerSource: s.containerSource,
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
log: s.log,
}
exec.setLogger(s.log)
@ -106,6 +109,16 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
exec.assemble(ctx)
case statusOutOfRange:
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
case statusEC:
if !exec.isLocal() {
if execCnr {
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
} else {
exec.log.Debug(logs.GetRequestedObjectIsEC)
exec.assembleEC(ctx)
}
}
default:
exec.log.Debug(logs.OperationFinishedWithError,
zap.Error(exec.err),

View file

@ -5,6 +5,7 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -22,6 +23,7 @@ func (r *request) executeLocal(ctx context.Context) {
r.collectedObject, err = r.get(ctx)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
@ -42,6 +44,10 @@ func (r *request) executeLocal(ctx context.Context) {
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
r.err = objectSDK.NewECInfoError(r.infoEC)
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange

View file

@ -7,6 +7,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -27,15 +29,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
obj, err := r.getRemote(ctx, rs, info)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
switch {
default:
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
if r.status == statusEC {

What does this condition checks?

What does this condition checks?

We need to continue to process the nodes in case of getting error from the node. Updated this line a bit.

We need to continue to process the nodes in case of getting error from the node. Updated this line a bit.
// we need to continue getting another chunks from another nodes
// in case of network issue
return false
}
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
case err == nil:
r.status = statusOK
r.err = nil
@ -57,6 +64,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
fyrchik marked this conversation as resolved Outdated

Each TODO should be accompanied by an issue, could you create one?

Each TODO should be accompanied by an issue, could you create one?

Anyway, the condition here is easy to fix, the number of chunks we must have is equal to placementpolicy.ECDataCount()

Anyway, the condition here is easy to fix, the number of chunks we must have is equal to `placementpolicy.ECDataCount()`

Fixed.

Fixed.
r.infoEC = errECInfo.ECInfo()
r.err = objectSDK.NewECInfoError(r.infoEC)
if r.isRaw() {
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
cnt, err := r.containerSource.Get(r.address().Container())
if err == nil {
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
}
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
return r.status != statusUndefined

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -22,6 +23,8 @@ type request struct {
infoSplit *objectSDK.SplitInfo
infoEC *objectSDK.ECInfo
log *logger.Logger
collectedObject *objectSDK.Object
@ -33,6 +36,7 @@ type request struct {
traverserGenerator traverserGenerator
remoteStorageConstructor remoteStorageConstructor
localStorage localStorage
containerSource container.Source
}
func (r *request) setLogger(l *logger.Logger) {

View file

@ -1,6 +1,7 @@
package getsvc
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap"
)
@ -16,6 +17,7 @@ type Service struct {
epochSource epochSource
keyStore keyStorage
remoteStorageConstructor remoteStorageConstructor
containerSource container.Source
}
// New creates, initializes and returns utility serving
@ -26,6 +28,7 @@ func New(
e localStorageEngine,
tg traverserGenerator,
cc clientConstructor,
cs container.Source,
opts ...Option,
) *Service {
result := &Service{
@ -39,6 +42,7 @@ func New(
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
clientConstructor: cc,
},
containerSource: cs,
}
for _, option := range opts {
option(result)

View file

@ -6,6 +6,7 @@ const (
statusINHUMED
statusVIRTUAL
statusOutOfRange
statusEC
)
type statusError struct {

View file

@ -166,6 +166,9 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
case *objectV2.SplitInfo:
si := objectSDK.NewSplitInfoFromV2(v)
return objectSDK.NewSplitInfoError(si)
case *objectV2.ECInfo:
ei := objectSDK.NewECInfoFromV2(v)
return objectSDK.NewECInfoError(ei)
}
}
return nil

View file

@ -84,6 +84,9 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
case *objectV2.SplitInfo:
si := objectSDK.NewSplitInfoFromV2(v)
return nil, objectSDK.NewSplitInfoError(si)
case *objectV2.ECInfo:
ei := objectSDK.NewECInfoFromV2(v)
return nil, objectSDK.NewECInfoError(ei)
}
objv2 := new(objectV2.Object)

View file

@ -82,10 +82,13 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream
err = s.svc.Get(stream.Context(), *p)
var splitErr *objectSDK.SplitInfoError
var ecErr *objectSDK.ECInfoError
switch {
case errors.As(err, &splitErr):
return stream.Send(splitInfoResponse(splitErr.SplitInfo()))
case errors.As(err, &ecErr):
return stream.Send(ecInfoResponse(ecErr.ECInfo()))
default:
return err
}
@ -123,11 +126,16 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV
err = s.svc.Head(ctx, *p)
var splitErr *objectSDK.SplitInfoError
var ecErr *objectSDK.ECInfoError
if errors.As(err, &splitErr) {
setSplitInfoHeadResponse(splitErr.SplitInfo(), resp)
err = nil
}
if errors.As(err, &ecErr) {
setECInfoHeadResponse(ecErr.ECInfo(), resp)
err = nil
}
return resp, err
}

View file

@ -270,6 +270,17 @@ func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse {
return resp
}
func ecInfoResponse(info *objectSDK.ECInfo) *objectV2.GetResponse {
resp := new(objectV2.GetResponse)
body := new(objectV2.GetResponseBody)
resp.SetBody(body)
body.SetObjectPart(info.ToV2())
return resp
}
func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse {
resp := new(objectV2.GetRangeResponse)
@ -285,6 +296,10 @@ func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResp
resp.GetBody().SetHeaderPart(info.ToV2())
}
func setECInfoHeadResponse(info *objectSDK.ECInfo, resp *objectV2.HeadResponse) {
resp.GetBody().SetHeaderPart(info.ToV2())
}
func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse {
resp := new(objectV2.GetRangeHashResponse)

View file

@ -19,9 +19,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
var (
subjectNotFoundErrorMessage = "subject not found"
)
var subjectNotFoundErrorMessage = "subject not found"
func (s *Service) checkAPE(container *core.Container, cid cid.ID, operation acl.Op, role acl.Role, publicKey *keys.PublicKey) error {
namespace := ""