node: Implement Get\Head
requests for EC object #1103
30 changed files with 575 additions and 7 deletions
|
@ -99,6 +99,10 @@ func getObject(cmd *cobra.Command, _ []string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ok := printECInfoErr(cmd, err); ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,6 +70,10 @@ func getObjectHeader(cmd *cobra.Command, _ []string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ok := printECInfoErr(cmd, err); ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -146,6 +146,50 @@ func marshalSplitInfo(cmd *cobra.Command, info *objectSDK.SplitInfo) ([]byte, er
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func printECInfoErr(cmd *cobra.Command, err error) bool {
|
||||||
|
var errECInfo *objectSDK.ECInfoError
|
||||||
|
|
||||||
|
ok := errors.As(err, &errECInfo)
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
|
||||||
|
printECInfo(cmd, errECInfo.ECInfo())
|
||||||
|
}
|
||||||
|
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func printECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) {
|
||||||
|
bs, err := marshalECInfo(cmd, info)
|
||||||
|
commonCmd.ExitOnErr(cmd, "can't marshal split info: %w", err)
|
||||||
|
|
||||||
|
cmd.Println(string(bs))
|
||||||
|
}
|
||||||
|
|
||||||
|
func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) {
|
||||||
|
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
|
||||||
|
toProto, _ := cmd.Flags().GetBool("proto")
|
||||||
|
switch {
|
||||||
|
case toJSON && toProto:
|
||||||
|
return nil, errors.New("'--json' and '--proto' flags are mutually exclusive")
|
||||||
|
case toJSON:
|
||||||
|
return info.MarshalJSON()
|
||||||
|
case toProto:
|
||||||
|
return info.Marshal()
|
||||||
|
default:
|
||||||
|
b := bytes.NewBuffer(nil)
|
||||||
|
b.WriteString("Total chunks: " + strconv.Itoa(int(info.Chunks[0].Total)))
|
||||||
|
for _, chunk := range info.Chunks {
|
||||||
|
var id oid.ID
|
||||||
|
if err := id.Decode(chunk.ID.GetValue()); err != nil {
|
||||||
|
return nil, fmt.Errorf("unable to decode chunk id: %w", err)
|
||||||
|
}
|
||||||
|
b.WriteString("\n Index: " + strconv.Itoa(int(chunk.Index)) + " ID: " + id.String())
|
||||||
|
}
|
||||||
|
return b.Bytes(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
|
func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
|
||||||
v := cmd.Flag("range").Value.String()
|
v := cmd.Flag("range").Value.String()
|
||||||
if len(v) == 0 {
|
if len(v) == 0 {
|
||||||
|
|
|
@ -173,7 +173,7 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
||||||
|
|
||||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache)
|
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
||||||
|
|
||||||
*c.cfgObject.getSvc = *sGet // need smth better
|
*c.cfgObject.getSvc = *sGet // need smth better
|
||||||
|
|
||||||
|
@ -358,6 +358,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
|
||||||
|
|
||||||
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
||||||
coreConstructor *cache.ClientCache,
|
coreConstructor *cache.ClientCache,
|
||||||
|
containerSource containercore.Source,
|
||||||
) *getsvc.Service {
|
) *getsvc.Service {
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
|
@ -369,6 +370,7 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
||||||
placement.SuccessAfter(1),
|
placement.SuccessAfter(1),
|
||||||
),
|
),
|
||||||
coreConstructor,
|
coreConstructor,
|
||||||
|
containerSource,
|
||||||
getsvc.WithLogger(c.log))
|
getsvc.WithLogger(c.log))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
4
go.mod
4
go.mod
|
@ -4,10 +4,10 @@ go 1.21
|
||||||
|
|
||||||
require (
|
require (
|
||||||
code.gitea.io/sdk/gitea v0.17.1
|
code.gitea.io/sdk/gitea v0.17.1
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -99,9 +99,17 @@ const (
|
||||||
GetRemoteCallFailed = "remote call failed"
|
GetRemoteCallFailed = "remote call failed"
|
||||||
GetCanNotAssembleTheObject = "can not assemble the object"
|
GetCanNotAssembleTheObject = "can not assemble the object"
|
||||||
GetTryingToAssembleTheObject = "trying to assemble the object..."
|
GetTryingToAssembleTheObject = "trying to assemble the object..."
|
||||||
|
GetTryingToAssembleTheECObject = "trying to assemble the ec object..."
|
||||||
GetAssemblingSplittedObject = "assembling splitted object..."
|
GetAssemblingSplittedObject = "assembling splitted object..."
|
||||||
|
GetAssemblingECObject = "assembling erasure-coded object..."
|
||||||
|
GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed"
|
||||||
|
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
|
||||||
|
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
|
||||||
|
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
|
||||||
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
|
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
|
||||||
|
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
|
||||||
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
|
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
|
||||||
|
GetFailedToAssembleECObject = "failed to assemble erasure-coded object"
|
||||||
GetCouldNotGenerateContainerTraverser = "could not generate container traverser"
|
GetCouldNotGenerateContainerTraverser = "could not generate container traverser"
|
||||||
GetCouldNotConstructRemoteNodeClient = "could not construct remote node client"
|
GetCouldNotConstructRemoteNodeClient = "could not construct remote node client"
|
||||||
GetCouldNotWriteHeader = "could not write header"
|
GetCouldNotWriteHeader = "could not write header"
|
||||||
|
@ -111,6 +119,7 @@ const (
|
||||||
GetCompletingTheOperation = "completing the operation"
|
GetCompletingTheOperation = "completing the operation"
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
GetRequestedObjectWasMarkedAsRemoved = "requested object was marked as removed"
|
GetRequestedObjectWasMarkedAsRemoved = "requested object was marked as removed"
|
||||||
GetRequestedObjectIsVirtual = "requested object is virtual"
|
GetRequestedObjectIsVirtual = "requested object is virtual"
|
||||||
|
GetRequestedObjectIsEC = "requested object is erasure-coded"
|
||||||
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
|
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
|
||||||
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
|
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
|
||||||
SearchReturnResultDirectly = "return result directly"
|
SearchReturnResultDirectly = "return result directly"
|
||||||
|
|
|
@ -88,6 +88,10 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
||||||
return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
|
return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if it.ECInfo != nil {
|
||||||
|
return GetRes{}, logicerr.Wrap(objectSDK.NewECInfoError(it.ECInfo))
|
||||||
|
}
|
||||||
|
|
||||||
if it.ObjectExpired {
|
if it.ObjectExpired {
|
||||||
return GetRes{}, errNotFound
|
return GetRes{}, errNotFound
|
||||||
}
|
}
|
||||||
|
@ -119,6 +123,7 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
||||||
type getShardIterator struct {
|
type getShardIterator struct {
|
||||||
Object *objectSDK.Object
|
Object *objectSDK.Object
|
||||||
SplitInfo *objectSDK.SplitInfo
|
SplitInfo *objectSDK.SplitInfo
|
||||||
|
ECInfo *objectSDK.ECInfo
|
||||||
OutError error
|
OutError error
|
||||||
ShardWithMeta hashedShard
|
ShardWithMeta hashedShard
|
||||||
MetaError error
|
MetaError error
|
||||||
|
@ -130,6 +135,7 @@ type getShardIterator struct {
|
||||||
Engine *StorageEngine
|
Engine *StorageEngine
|
||||||
|
|
||||||
splitInfoErr *objectSDK.SplitInfoError
|
splitInfoErr *objectSDK.SplitInfoError
|
||||||
|
ecInfoErr *objectSDK.ECInfoError
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
|
func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
|
||||||
|
@ -164,6 +170,14 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
|
||||||
|
|
||||||
// stop iterating over shards if SplitInfo structure is complete
|
// stop iterating over shards if SplitInfo structure is complete
|
||||||
return withLink && withLast
|
return withLink && withLast
|
||||||
|
case errors.As(err, &i.ecInfoErr):
|
||||||
|
if i.ECInfo == nil {
|
||||||
|
i.ECInfo = objectSDK.NewECInfo()
|
||||||
|
}
|
||||||
|
|
||||||
|
util.MergeECInfo(i.ecInfoErr.ECInfo(), i.ECInfo)
|
||||||
|
// stop iterating over shards if ECInfo structure is complete
|
||||||
|
return len(i.ECInfo.Chunks) == int(i.ECInfo.Chunks[0].Total)
|
||||||
case client.IsErrObjectAlreadyRemoved(err):
|
case client.IsErrObjectAlreadyRemoved(err):
|
||||||
i.OutError = err
|
i.OutError = err
|
||||||
return true // stop, return it back
|
return true // stop, return it back
|
||||||
|
|
|
@ -75,6 +75,8 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
||||||
head *objectSDK.Object
|
head *objectSDK.Object
|
||||||
siErr *objectSDK.SplitInfoError
|
siErr *objectSDK.SplitInfoError
|
||||||
outSI *objectSDK.SplitInfo
|
outSI *objectSDK.SplitInfo
|
||||||
|
eiErr *objectSDK.ECInfoError
|
||||||
|
outEI *objectSDK.ECInfo
|
||||||
outError error = new(apistatus.ObjectNotFound)
|
outError error = new(apistatus.ObjectNotFound)
|
||||||
shPrm shard.HeadPrm
|
shPrm shard.HeadPrm
|
||||||
)
|
)
|
||||||
|
@ -99,6 +101,13 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
case errors.As(err, &eiErr):
|
||||||
|
if outEI == nil {
|
||||||
|
outEI = objectSDK.NewECInfo()
|
||||||
|
}
|
||||||
|
util.MergeECInfo(eiErr.ECInfo(), outEI)
|
||||||
|
// stop iterating over shards if ECInfo structure is complete
|
||||||
|
return len(outEI.Chunks) == int(outEI.Chunks[0].Total)
|
||||||
case client.IsErrObjectAlreadyRemoved(err):
|
case client.IsErrObjectAlreadyRemoved(err):
|
||||||
outError = err
|
outError = err
|
||||||
return true // stop, return it back
|
return true // stop, return it back
|
||||||
|
@ -118,6 +127,8 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
||||||
|
|
||||||
if outSI != nil {
|
if outSI != nil {
|
||||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
|
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
|
||||||
|
} else if outEI != nil {
|
||||||
|
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
|
||||||
} else if head == nil {
|
} else if head == nil {
|
||||||
return HeadRes{}, outError
|
return HeadRes{}, outError
|
||||||
}
|
}
|
||||||
|
|
|
@ -108,6 +108,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
|
||||||
|
|
||||||
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
||||||
}
|
}
|
||||||
|
// if parent bucket is empty, then check if object exists in ec bucket
|
||||||
|
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
|
||||||
|
return false, getECInfoError(tx, cnr, data)
|
||||||
|
}
|
||||||
|
|
||||||
// if parent bucket is empty, then check if object exists in typed buckets
|
// if parent bucket is empty, then check if object exists in typed buckets
|
||||||
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil
|
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil
|
||||||
|
|
|
@ -110,6 +110,11 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
|
||||||
return obj, obj.Unmarshal(data)
|
return obj, obj.Unmarshal(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
|
||||||
|
if len(data) != 0 {
|
||||||
|
return nil, getECInfoError(tx, cnr, data)
|
||||||
|
}
|
||||||
|
|
||||||
// if not found then check in tombstone index
|
// if not found then check in tombstone index
|
||||||
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
|
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
|
||||||
if len(data) != 0 {
|
if len(data) != 0 {
|
||||||
|
@ -185,3 +190,27 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
|
||||||
|
|
||||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
|
||||||
|
offset := 0
|
||||||
|
ecInfo := objectSDK.NewECInfo()
|
||||||
|
for offset < len(data) {
|
||||||
|
key := data[offset : offset+objectKeySize]
|
||||||
|
// check in primary index
|
||||||
|
ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
|
||||||
|
if len(data) != 0 {
|
||||||
|
obj := objectSDK.New()
|
||||||
|
if err := obj.Unmarshal(ojbData); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
chunk := objectSDK.ECChunk{}
|
||||||
|
id, _ := obj.ID()
|
||||||
|
chunk.SetID(id)
|
||||||
|
chunk.Index = obj.ECHeader().Index()
|
||||||
|
chunk.Total = obj.ECHeader().Total()
|
||||||
|
ecInfo.AddChunk(chunk)
|
||||||
|
}
|
||||||
|
offset += objectKeySize
|
||||||
|
}
|
||||||
|
return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo))
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package meta_test
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
@ -15,8 +16,10 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -109,6 +112,42 @@ func TestDB_Get(t *testing.T) {
|
||||||
require.True(t, binaryEqual(child.CutPayload(), newChild))
|
require.True(t, binaryEqual(child.CutPayload(), newChild))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("put erasure-coded object", func(t *testing.T) {
|
||||||
|
cnr := cidtest.ID()
|
||||||
|
virtual := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
c, err := erasurecode.NewConstructor(3, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
pk, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
parts, err := c.Split(virtual, &pk.PrivateKey)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for _, part := range parts {
|
||||||
|
err = putBig(db, part)
|
||||||
|
var eiError *objectSDK.ECInfoError
|
||||||
|
if err != nil && !errors.As(err, &eiError) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, err = metaGet(db, object.AddressOf(virtual), true)
|
||||||
|
var eiError *objectSDK.ECInfoError
|
||||||
|
require.ErrorAs(t, err, &eiError)
|
||||||
|
require.Equal(t, len(eiError.ECInfo().Chunks), len(parts))
|
||||||
|
for _, chunk := range eiError.ECInfo().Chunks {
|
||||||
|
var found bool
|
||||||
|
for _, part := range parts {
|
||||||
|
partID, _ := part.ID()
|
||||||
|
var chunkID oid.ID
|
||||||
|
require.NoError(t, chunkID.ReadFromV2(chunk.ID))
|
||||||
|
if chunkID.Equals(partID) {
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
require.Fail(t, "chunk not found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("get removed object", func(t *testing.T) {
|
t.Run("get removed object", func(t *testing.T) {
|
||||||
obj := oidtest.Address()
|
obj := oidtest.Address()
|
||||||
ts := oidtest.Address()
|
ts := oidtest.Address()
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -264,6 +265,13 @@ func putUniqueIndexes(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ecHead := obj.GetECHeader(); ecHead != nil {
|
||||||
|
err = putECInfo(tx, cnr, objKey, ecHead)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -571,3 +579,34 @@ func isLinkObject(obj *objectSDK.Object) bool {
|
||||||
func isLastObject(obj *objectSDK.Object) bool {
|
func isLastObject(obj *objectSDK.Object) bool {
|
||||||
return len(obj.Children()) == 0 && obj.Parent() != nil
|
return len(obj.Children()) == 0 && obj.Parent() != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func putECInfo(tx *bbolt.Tx,
|
||||||
|
cnr cid.ID, objKey []byte,
|
||||||
|
ecHead *objectSDK.ECHeader,
|
||||||
|
) error {
|
||||||
|
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
|
||||||
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
|
||||||
|
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
|
||||||
|
if len(val) == 0 {
|
||||||
|
val = objKey
|
||||||
|
} else {
|
||||||
|
offset := 0
|
||||||
|
found := false
|
||||||
|
for offset < len(val) {
|
||||||
|
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
offset += objectKeySize
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
val = append(val, objKey...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return putUniqueIndexItem(tx, namedBucketItem{
|
||||||
|
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
|
||||||
|
key: parentID,
|
||||||
|
val: val,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -119,6 +119,11 @@ const (
|
||||||
// Key: container ID + type
|
// Key: container ID + type
|
||||||
// Value: container size in bytes as little-endian uint64
|
// Value: container size in bytes as little-endian uint64
|
||||||
containerCountersPrefix
|
containerCountersPrefix
|
||||||
|
|
||||||
|
// ecInfoPrefix is used for storing relation between EC parent id and chunk id.
|
||||||
|
// Key: container ID + type
|
||||||
|
// Value: Object id
|
||||||
|
ecInfoPrefix
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -190,6 +195,11 @@ func splitBucketName(cnr cid.ID, key []byte) []byte {
|
||||||
return bucketName(cnr, splitPrefix, key)
|
return bucketName(cnr, splitPrefix, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ecInfoBucketName returns <CID>_ecinfo.
|
||||||
|
func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
|
||||||
|
return bucketName(cnr, ecInfoPrefix, key)
|
||||||
|
}
|
||||||
|
|
||||||
// addressKey returns key for K-V tables when key is a whole address.
|
// addressKey returns key for K-V tables when key is a whole address.
|
||||||
func addressKey(addr oid.Address, key []byte) []byte {
|
func addressKey(addr oid.Address, key []byte) []byte {
|
||||||
addr.Container().Encode(key)
|
addr.Container().Encode(key)
|
||||||
|
|
25
pkg/local_object_storage/util/ecinfo.go
Normal file
25
pkg/local_object_storage/util/ecinfo.go
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MergeECInfo ignores conflicts and rewrites `to` with non empty values
|
||||||
|
// from `from`.
|
||||||
|
func MergeECInfo(from, to *objectSDK.ECInfo) *objectSDK.ECInfo {
|
||||||
|
for _, fchunk := range from.Chunks {
|
||||||
|
add := true
|
||||||
|
for _, tchunk := range to.Chunks {
|
||||||
|
if bytes.Equal(tchunk.ID.GetValue(), fchunk.ID.GetValue()) {
|
||||||
|
add = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if add {
|
||||||
|
to.AddChunk(*objectSDK.NewECChunkFromV2(&fchunk))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return to
|
||||||
|
}
|
56
pkg/local_object_storage/util/ecinfo_test.go
Normal file
56
pkg/local_object_storage/util/ecinfo_test.go
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMergeECInfo(t *testing.T) {
|
||||||
|
id := generateV2ID()
|
||||||
|
target := objectSDK.NewECInfo()
|
||||||
|
var chunk objectSDK.ECChunk
|
||||||
|
chunk.Total = 2
|
||||||
|
chunk.Index = 0
|
||||||
|
chunk.SetID(id)
|
||||||
|
target.AddChunk(chunk)
|
||||||
|
|
||||||
|
t.Run("merge empty", func(t *testing.T) {
|
||||||
|
to := objectSDK.NewECInfo()
|
||||||
|
|
||||||
|
result := MergeECInfo(target, to)
|
||||||
|
require.Equal(t, result, target)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("merge existed", func(t *testing.T) {
|
||||||
|
to := objectSDK.NewECInfo()
|
||||||
|
to.AddChunk(chunk)
|
||||||
|
|
||||||
|
result := MergeECInfo(target, to)
|
||||||
|
require.Equal(t, result, target)
|
||||||
|
})
|
||||||
|
t.Run("merge extend", func(t *testing.T) {
|
||||||
|
to := objectSDK.NewECInfo()
|
||||||
|
var chunk objectSDK.ECChunk
|
||||||
|
chunk.Total = 2
|
||||||
|
chunk.Index = 1
|
||||||
|
chunk.SetID(generateV2ID())
|
||||||
|
to.AddChunk(chunk)
|
||||||
|
|
||||||
|
result := MergeECInfo(target, to)
|
||||||
|
require.Equal(t, len(result.Chunks), 2)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateV2ID() oid.ID {
|
||||||
|
var buf [32]byte
|
||||||
|
_, _ = rand.Read(buf[:])
|
||||||
|
|
||||||
|
var id oid.ID
|
||||||
|
_ = id.Decode(buf[:])
|
||||||
|
|
||||||
|
return id
|
||||||
|
}
|
6
pkg/network/cache/multi.go
vendored
6
pkg/network/cache/multi.go
vendored
|
@ -167,8 +167,9 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
|
||||||
// from the SDK client; should not be considered
|
// from the SDK client; should not be considered
|
||||||
// as a connection error
|
// as a connection error
|
||||||
var siErr *objectSDK.SplitInfoError
|
var siErr *objectSDK.SplitInfoError
|
||||||
|
var eiErr *objectSDK.ECInfoError
|
||||||
|
|
||||||
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr)
|
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) || errors.As(err, &eiErr)
|
||||||
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
|
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
|
||||||
firstErr = err
|
firstErr = err
|
||||||
}
|
}
|
||||||
|
@ -196,7 +197,8 @@ func (x *multiClient) ReportError(err error) {
|
||||||
// from the SDK client; should not be considered
|
// from the SDK client; should not be considered
|
||||||
// as a connection error
|
// as a connection error
|
||||||
var siErr *objectSDK.SplitInfoError
|
var siErr *objectSDK.SplitInfoError
|
||||||
if errors.As(err, &siErr) {
|
var eiErr *objectSDK.ECInfoError
|
||||||
|
if errors.As(err, &siErr) || errors.As(err, &eiErr) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -139,9 +139,11 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
|
||||||
remoteStorageConstructor: r.remoteStorageConstructor,
|
remoteStorageConstructor: r.remoteStorageConstructor,
|
||||||
epochSource: r.epochSource,
|
epochSource: r.epochSource,
|
||||||
localStorage: r.localStorage,
|
localStorage: r.localStorage,
|
||||||
|
containerSource: r.containerSource,
|
||||||
|
|
||||||
prm: prm,
|
prm: prm,
|
||||||
infoSplit: objectSDK.NewSplitInfo(),
|
infoSplit: objectSDK.NewSplitInfo(),
|
||||||
|
infoEC: objectSDK.NewECInfo(),
|
||||||
log: r.log,
|
log: r.log,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
79
pkg/services/object/get/assembleec.go
Normal file
79
pkg/services/object/get/assembleec.go
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (r *request) assembleEC(ctx context.Context) {
|
||||||
|
if r.isRaw() {
|
||||||
|
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Any access tokens are not expected to be used in the assembly process:
|
||||||
|
// - there is no requirement to specify child objects in session/bearer
|
||||||
|
// token for `GET`/`GETRANGE`/`RANGEHASH` requests in the API protocol,
|
||||||
|
// and, therefore, their missing in the original request should not be
|
||||||
|
// considered as error; on the other hand, without session for every child
|
||||||
|
// object, it is impossible to attach bearer token in the new generated
|
||||||
|
// requests correctly because the token has not been issued for that node's
|
||||||
|
// key;
|
||||||
|
// - the assembly process is expected to be handled on a container node
|
||||||
|
// only since the requests forwarding mechanism presentation; such the
|
||||||
|
// node should have enough rights for getting any child object by design.
|
||||||
|
r.prm.common.ForgetTokens()
|
||||||
|
|
||||||
|
// Do not use forwarding during assembly stage.
|
||||||
|
// Request forwarding closure inherited in produced
|
||||||
|
// `execCtx` so it should be disabled there.
|
||||||
|
r.disableForwarding()
|
||||||
|
|
||||||
|
r.log.Debug(logs.GetTryingToAssembleTheECObject)
|
||||||
|
|
||||||
|
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
|
||||||
|
|
||||||
|
r.log.Debug(logs.GetAssemblingECObject,
|
||||||
|
zap.Stringer("address", r.address()),
|
||||||
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
|
)
|
||||||
|
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
|
||||||
|
zap.Stringer("address", r.address()),
|
||||||
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
|
)
|
||||||
|
|
||||||
|
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
|
||||||
|
if err != nil {
|
||||||
|
r.log.Warn(logs.GetFailedToAssembleECObject,
|
||||||
|
zap.Error(err),
|
||||||
|
zap.Stringer("address", r.address()),
|
||||||
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||||
|
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||||
|
|
||||||
|
switch {
|
||||||
dstepanov-yadro
commented
Look like remote and local errors are the same. Look like remote and local errors are the same.
acid-ant
commented
Removed. Removed.
dstepanov-yadro
commented
Incompleted
Incompleted
```
var errOutOfRangeRemote *apistatus.ObjectOutOfRange
var errOutOfRangeLocal *apistatus.ObjectOutOfRange
```
acid-ant
commented
Sorry, fixed. Sorry, fixed.
|
|||||||
|
default:
|
||||||
|
r.status = statusUndefined
|
||||||
|
r.err = err
|
||||||
|
case err == nil:
|
||||||
|
r.status = statusOK
|
||||||
|
r.err = nil
|
||||||
|
r.collectedObject = obj
|
||||||
|
case errors.As(err, &errRemoved):
|
||||||
|
r.status = statusINHUMED
|
||||||
|
r.err = errRemoved
|
||||||
|
case errors.As(err, &errOutOfRange):
|
||||||
|
r.status = statusOutOfRange
|
||||||
|
r.err = errOutOfRange
|
||||||
|
}
|
||||||
|
}
|
117
pkg/services/object/get/assemblerec.go
Normal file
117
pkg/services/object/get/assemblerec.go
Normal file
|
@ -0,0 +1,117 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
type assemblerec struct {
|
||||||
|
addr oid.Address
|
||||||
|
ecInfo *objectSDK.ECInfo
|
||||||
|
rng *objectSDK.Range
|
||||||
|
objGetter objectGetter
|
||||||
|
cs container.Source
|
||||||
|
log *logger.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func newAssemblerEC(
|
||||||
|
addr oid.Address,
|
||||||
|
ecInfo *objectSDK.ECInfo,
|
||||||
|
rng *objectSDK.Range,
|
||||||
|
objGetter objectGetter,
|
||||||
|
cs container.Source,
|
||||||
|
log *logger.Logger,
|
||||||
|
) *assemblerec {
|
||||||
|
return &assemblerec{
|
||||||
|
addr: addr,
|
||||||
|
rng: rng,
|
||||||
|
ecInfo: ecInfo,
|
||||||
|
objGetter: objGetter,
|
||||||
|
cs: cs,
|
||||||
|
log: log,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
|
||||||
|
// It returns parent object.
|
||||||
|
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
When we are to add GET_RANGE do we need to alter this function or write code in another place? When we are to add GET_RANGE do we need to alter this function or write code in another place?
acid-ant
commented
We need to extend assembler for EC a bit, yes. We need to extend assembler for EC a bit, yes.
|
|||||||
|
parts := a.retrieveParts(ctx, headOnly)
|
||||||
|
cnt, err := a.cs.Get(a.addr.Container())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
c, err := erasurecode.NewConstructor(
|
||||||
|
policy.ECDataCount(cnt.Value.PlacementPolicy()),
|
||||||
|
policy.ECParityCount(cnt.Value.PlacementPolicy()),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if headOnly {
|
||||||
|
obj, err := c.ReconstructHeader(parts)
|
||||||
|
if err == nil {
|
||||||
|
return obj, writer.WriteHeader(ctx, obj)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
obj, err := c.Reconstruct(parts)
|
||||||
|
if err == nil {
|
||||||
|
err = writer.WriteHeader(ctx, obj.CutPayload())
|
||||||
|
if err == nil {
|
||||||
|
err = writer.WriteChunk(ctx, obj.Payload())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return obj, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
|
||||||
|
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
|
||||||
|
errGroup, ctx := errgroup.WithContext(mainCtx)
|
||||||
|
|
||||||
|
for i := range a.ecInfo.Chunks {
|
||||||
|
chunk := a.ecInfo.Chunks[i]
|
||||||
|
errGroup.Go(func() error {
|
||||||
|
objID := new(oid.ID)
|
||||||
|
err := objID.ReadFromV2(chunk.ID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid object ID: %w", err)
|
||||||
|
}
|
||||||
|
var obj *objectSDK.Object
|
||||||
|
if headOnly {
|
||||||
|
obj, err = a.objGetter.HeadObject(ctx, *objID)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sow := NewSimpleObjectWriter()
|
||||||
|
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
|
||||||
|
if err != nil {
|
||||||
dstepanov-yadro
commented
ctx -> mainCtx, or even better to move ctx -> mainCtx, or even better to move `errgroup` related code to separate func.
As far as I remember errgoup's ctx is cancelled after Wait.
acid-ant
commented
Thanks a lot! Split on two functions. Thanks a lot! Split on two functions.
|
|||||||
|
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
obj.SetPayload(sow.pld)
|
||||||
|
}
|
||||||
|
parts[chunk.Index] = obj
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := errGroup.Wait(); err != nil {
|
||||||
|
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
|
||||||
|
}
|
||||||
|
return parts
|
||||||
|
}
|
|
@ -73,9 +73,12 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
|
||||||
remoteStorageConstructor: s.remoteStorageConstructor,
|
remoteStorageConstructor: s.remoteStorageConstructor,
|
||||||
epochSource: s.epochSource,
|
epochSource: s.epochSource,
|
||||||
localStorage: s.localStorage,
|
localStorage: s.localStorage,
|
||||||
|
containerSource: s.containerSource,
|
||||||
|
|
||||||
prm: prm,
|
prm: prm,
|
||||||
infoSplit: objectSDK.NewSplitInfo(),
|
infoSplit: objectSDK.NewSplitInfo(),
|
||||||
|
infoEC: objectSDK.NewECInfo(),
|
||||||
|
log: s.log,
|
||||||
}
|
}
|
||||||
|
|
||||||
exec.setLogger(s.log)
|
exec.setLogger(s.log)
|
||||||
|
@ -106,6 +109,16 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
||||||
exec.assemble(ctx)
|
exec.assemble(ctx)
|
||||||
case statusOutOfRange:
|
case statusOutOfRange:
|
||||||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||||
|
case statusEC:
|
||||||
|
if !exec.isLocal() {
|
||||||
|
if execCnr {
|
||||||
|
exec.executeOnContainer(ctx)
|
||||||
|
exec.analyzeStatus(ctx, false)
|
||||||
|
} else {
|
||||||
|
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||||
|
exec.assembleEC(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
exec.log.Debug(logs.OperationFinishedWithError,
|
exec.log.Debug(logs.OperationFinishedWithError,
|
||||||
zap.Error(exec.err),
|
zap.Error(exec.err),
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -22,6 +23,7 @@ func (r *request) executeLocal(ctx context.Context) {
|
||||||
r.collectedObject, err = r.get(ctx)
|
r.collectedObject, err = r.get(ctx)
|
||||||
|
|
||||||
var errSplitInfo *objectSDK.SplitInfoError
|
var errSplitInfo *objectSDK.SplitInfoError
|
||||||
|
var errECInfo *objectSDK.ECInfoError
|
||||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||||
|
|
||||||
|
@ -42,6 +44,10 @@ func (r *request) executeLocal(ctx context.Context) {
|
||||||
r.status = statusVIRTUAL
|
r.status = statusVIRTUAL
|
||||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||||
|
case errors.As(err, &errECInfo):
|
||||||
|
r.status = statusEC
|
||||||
|
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
|
||||||
|
r.err = objectSDK.NewECInfoError(r.infoEC)
|
||||||
case errors.As(err, &errOutOfRange):
|
case errors.As(err, &errOutOfRange):
|
||||||
r.status = statusOutOfRange
|
r.status = statusOutOfRange
|
||||||
r.err = errOutOfRange
|
r.err = errOutOfRange
|
||||||
|
|
|
@ -7,6 +7,8 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -27,15 +29,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||||
obj, err := r.getRemote(ctx, rs, info)
|
obj, err := r.getRemote(ctx, rs, info)
|
||||||
|
|
||||||
var errSplitInfo *objectSDK.SplitInfoError
|
var errSplitInfo *objectSDK.SplitInfoError
|
||||||
|
var errECInfo *objectSDK.ECInfoError
|
||||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
|
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||||
|
if r.status == statusEC {
|
||||||
fyrchik
commented
What does this condition check?
acid-ant
commented
We need to continue processing the other nodes if we get an error from one node. Updated this line a bit.
|
|||||||
|
// we need to continue getting another chunks from another nodes
|
||||||
|
// in case of network issue
|
||||||
|
return false
|
||||||
|
}
|
||||||
r.status = statusUndefined
|
r.status = statusUndefined
|
||||||
r.err = new(apistatus.ObjectNotFound)
|
r.err = new(apistatus.ObjectNotFound)
|
||||||
|
|
||||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
|
||||||
case err == nil:
|
case err == nil:
|
||||||
r.status = statusOK
|
r.status = statusOK
|
||||||
r.err = nil
|
r.err = nil
|
||||||
|
@ -57,6 +64,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||||
r.status = statusVIRTUAL
|
r.status = statusVIRTUAL
|
||||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||||
|
case errors.As(err, &errECInfo):
|
||||||
|
r.status = statusEC
|
||||||
|
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Each TODO should be accompanied by an issue — could you create one?
fyrchik
commented
Anyway, the condition here is easy to fix: the number of chunks we must have is equal to `placementpolicy.ECDataCount()`.
acid-ant
commented
Fixed.
|
|||||||
|
r.infoEC = errECInfo.ECInfo()
|
||||||
|
r.err = objectSDK.NewECInfoError(r.infoEC)
|
||||||
|
if r.isRaw() {
|
||||||
|
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
||||||
|
}
|
||||||
|
cnt, err := r.containerSource.Get(r.address().Container())
|
||||||
|
if err == nil {
|
||||||
|
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
|
||||||
|
}
|
||||||
|
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
|
||||||
|
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
||||||
}
|
}
|
||||||
|
|
||||||
return r.status != statusUndefined
|
return r.status != statusUndefined
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
@ -22,6 +23,8 @@ type request struct {
|
||||||
|
|
||||||
infoSplit *objectSDK.SplitInfo
|
infoSplit *objectSDK.SplitInfo
|
||||||
|
|
||||||
|
infoEC *objectSDK.ECInfo
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
|
||||||
collectedObject *objectSDK.Object
|
collectedObject *objectSDK.Object
|
||||||
|
@ -33,6 +36,7 @@ type request struct {
|
||||||
traverserGenerator traverserGenerator
|
traverserGenerator traverserGenerator
|
||||||
remoteStorageConstructor remoteStorageConstructor
|
remoteStorageConstructor remoteStorageConstructor
|
||||||
localStorage localStorage
|
localStorage localStorage
|
||||||
|
containerSource container.Source
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *request) setLogger(l *logger.Logger) {
|
func (r *request) setLogger(l *logger.Logger) {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package getsvc
|
package getsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -16,6 +17,7 @@ type Service struct {
|
||||||
epochSource epochSource
|
epochSource epochSource
|
||||||
keyStore keyStorage
|
keyStore keyStorage
|
||||||
remoteStorageConstructor remoteStorageConstructor
|
remoteStorageConstructor remoteStorageConstructor
|
||||||
|
containerSource container.Source
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates, initializes and returns utility serving
|
// New creates, initializes and returns utility serving
|
||||||
|
@ -26,6 +28,7 @@ func New(
|
||||||
e localStorageEngine,
|
e localStorageEngine,
|
||||||
tg traverserGenerator,
|
tg traverserGenerator,
|
||||||
cc clientConstructor,
|
cc clientConstructor,
|
||||||
|
cs container.Source,
|
||||||
opts ...Option,
|
opts ...Option,
|
||||||
) *Service {
|
) *Service {
|
||||||
result := &Service{
|
result := &Service{
|
||||||
|
@ -39,6 +42,7 @@ func New(
|
||||||
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
|
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
|
||||||
clientConstructor: cc,
|
clientConstructor: cc,
|
||||||
},
|
},
|
||||||
|
containerSource: cs,
|
||||||
}
|
}
|
||||||
for _, option := range opts {
|
for _, option := range opts {
|
||||||
option(result)
|
option(result)
|
||||||
|
|
|
@ -6,6 +6,7 @@ const (
|
||||||
statusINHUMED
|
statusINHUMED
|
||||||
statusVIRTUAL
|
statusVIRTUAL
|
||||||
statusOutOfRange
|
statusOutOfRange
|
||||||
|
statusEC
|
||||||
)
|
)
|
||||||
|
|
||||||
type statusError struct {
|
type statusError struct {
|
||||||
|
|
|
@ -166,6 +166,9 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
|
||||||
case *objectV2.SplitInfo:
|
case *objectV2.SplitInfo:
|
||||||
si := objectSDK.NewSplitInfoFromV2(v)
|
si := objectSDK.NewSplitInfoFromV2(v)
|
||||||
return objectSDK.NewSplitInfoError(si)
|
return objectSDK.NewSplitInfoError(si)
|
||||||
|
case *objectV2.ECInfo:
|
||||||
|
ei := objectSDK.NewECInfoFromV2(v)
|
||||||
|
return objectSDK.NewECInfoError(ei)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -84,6 +84,9 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
|
||||||
case *objectV2.SplitInfo:
|
case *objectV2.SplitInfo:
|
||||||
si := objectSDK.NewSplitInfoFromV2(v)
|
si := objectSDK.NewSplitInfoFromV2(v)
|
||||||
return nil, objectSDK.NewSplitInfoError(si)
|
return nil, objectSDK.NewSplitInfoError(si)
|
||||||
|
case *objectV2.ECInfo:
|
||||||
|
ei := objectSDK.NewECInfoFromV2(v)
|
||||||
|
return nil, objectSDK.NewECInfoError(ei)
|
||||||
}
|
}
|
||||||
|
|
||||||
objv2 := new(objectV2.Object)
|
objv2 := new(objectV2.Object)
|
||||||
|
|
|
@ -82,10 +82,13 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream
|
||||||
err = s.svc.Get(stream.Context(), *p)
|
err = s.svc.Get(stream.Context(), *p)
|
||||||
|
|
||||||
var splitErr *objectSDK.SplitInfoError
|
var splitErr *objectSDK.SplitInfoError
|
||||||
|
var ecErr *objectSDK.ECInfoError
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case errors.As(err, &splitErr):
|
case errors.As(err, &splitErr):
|
||||||
return stream.Send(splitInfoResponse(splitErr.SplitInfo()))
|
return stream.Send(splitInfoResponse(splitErr.SplitInfo()))
|
||||||
|
case errors.As(err, &ecErr):
|
||||||
|
return stream.Send(ecInfoResponse(ecErr.ECInfo()))
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -123,11 +126,16 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV
|
||||||
err = s.svc.Head(ctx, *p)
|
err = s.svc.Head(ctx, *p)
|
||||||
|
|
||||||
var splitErr *objectSDK.SplitInfoError
|
var splitErr *objectSDK.SplitInfoError
|
||||||
|
var ecErr *objectSDK.ECInfoError
|
||||||
|
|
||||||
if errors.As(err, &splitErr) {
|
if errors.As(err, &splitErr) {
|
||||||
setSplitInfoHeadResponse(splitErr.SplitInfo(), resp)
|
setSplitInfoHeadResponse(splitErr.SplitInfo(), resp)
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
|
if errors.As(err, &ecErr) {
|
||||||
|
setECInfoHeadResponse(ecErr.ECInfo(), resp)
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -270,6 +270,17 @@ func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse {
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ecInfoResponse(info *objectSDK.ECInfo) *objectV2.GetResponse {
|
||||||
|
resp := new(objectV2.GetResponse)
|
||||||
|
|
||||||
|
body := new(objectV2.GetResponseBody)
|
||||||
|
resp.SetBody(body)
|
||||||
|
|
||||||
|
body.SetObjectPart(info.ToV2())
|
||||||
|
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse {
|
func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse {
|
||||||
resp := new(objectV2.GetRangeResponse)
|
resp := new(objectV2.GetRangeResponse)
|
||||||
|
|
||||||
|
@ -285,6 +296,10 @@ func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResp
|
||||||
resp.GetBody().SetHeaderPart(info.ToV2())
|
resp.GetBody().SetHeaderPart(info.ToV2())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setECInfoHeadResponse places the collected EC info into the HEAD response
// body as its header part, mirroring setSplitInfoHeadResponse for split info.
func setECInfoHeadResponse(info *objectSDK.ECInfo, resp *objectV2.HeadResponse) {
	resp.GetBody().SetHeaderPart(info.ToV2())
}
|
||||||
|
|
||||||
func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse {
|
func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse {
|
||||||
resp := new(objectV2.GetRangeHashResponse)
|
resp := new(objectV2.GetRangeHashResponse)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue
We have "erasure coded" here and "erasure-encoded" in CLI print.
Wikipedia uses "erasure-coded" https://en.wikipedia.org/wiki/Erasure_code, let's stick with it.
Agree, updated here and in other places.