forked from TrueCloudLab/frostfs-node
WIP: node: Implement Get\Head
requests for EC object #2
30 changed files with 556 additions and 11 deletions
|
@ -99,6 +99,10 @@ func getObject(cmd *cobra.Command, _ []string) {
|
|||
return
|
||||
}
|
||||
|
||||
if ok := printECInfoErr(cmd, err); ok {
|
||||
return
|
||||
}
|
||||
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -70,6 +70,10 @@ func getObjectHeader(cmd *cobra.Command, _ []string) {
|
|||
return
|
||||
}
|
||||
|
||||
if ok := printECInfoErr(cmd, err); ok {
|
||||
return
|
||||
}
|
||||
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -146,6 +146,50 @@ func marshalSplitInfo(cmd *cobra.Command, info *objectSDK.SplitInfo) ([]byte, er
|
|||
}
|
||||
}
|
||||
|
||||
func printECInfoErr(cmd *cobra.Command, err error) bool {
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
|
||||
ok := errors.As(err, &errECInfo)
|
||||
|
||||
if ok {
|
||||
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
|
||||
printECInfo(cmd, errECInfo.ECInfo())
|
||||
}
|
||||
|
||||
return ok
|
||||
}
|
||||
|
||||
func printECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) {
|
||||
bs, err := marshalECInfo(cmd, info)
|
||||
commonCmd.ExitOnErr(cmd, "can't marshal split info: %w", err)
|
||||
|
||||
cmd.Println(string(bs))
|
||||
}
|
||||
|
||||
func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) {
|
||||
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
|
||||
toProto, _ := cmd.Flags().GetBool("proto")
|
||||
switch {
|
||||
case toJSON && toProto:
|
||||
return nil, errors.New("'--json' and '--proto' flags are mutually exclusive")
|
||||
case toJSON:
|
||||
return info.MarshalJSON()
|
||||
case toProto:
|
||||
return info.Marshal()
|
||||
default:
|
||||
b := bytes.NewBuffer(nil)
|
||||
b.WriteString("Total chunks: " + strconv.Itoa(int(info.Chunks[0].Total)))
|
||||
for _, chunk := range info.Chunks {
|
||||
var id oid.ID
|
||||
if err := id.Decode(chunk.ID.GetValue()); err != nil {
|
||||
return nil, fmt.Errorf("unable to decode chunk id: %w", err)
|
||||
}
|
||||
b.WriteString("\n Index: " + strconv.Itoa(int(chunk.Index)) + " ID: " + id.String())
|
||||
}
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
}
|
||||
|
||||
func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
|
||||
v := cmd.Flag("range").Value.String()
|
||||
if len(v) == 0 {
|
||||
|
|
|
@ -173,7 +173,7 @@ func initObjectService(c *cfg) {
|
|||
|
||||
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
||||
|
||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache)
|
||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
||||
|
||||
*c.cfgObject.getSvc = *sGet // need smth better
|
||||
|
||||
|
@ -358,6 +358,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
|
|||
|
||||
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
||||
coreConstructor *cache.ClientCache,
|
||||
containerSource containercore.Source,
|
||||
) *getsvc.Service {
|
||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
|
@ -369,6 +370,7 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
|||
placement.SuccessAfter(1),
|
||||
),
|
||||
coreConstructor,
|
||||
containerSource,
|
||||
getsvc.WithLogger(c.log))
|
||||
}
|
||||
|
||||
|
|
|
@ -250,4 +250,4 @@
|
|||
"stopAll": true
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
5
go.mod
5
go.mod
|
@ -2,6 +2,11 @@ module git.frostfs.info/TrueCloudLab/frostfs-node
|
|||
|
||||
go 1.21
|
||||
|
||||
replace (
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24 => /home/annikifa/workspace/frostfs-api-go
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92 => /home/annikifa/workspace/frostfs-sdk-go
|
||||
)
|
||||
|
||||
require (
|
||||
code.gitea.io/sdk/gitea v0.17.1
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -99,9 +99,13 @@ const (
|
|||
GetRemoteCallFailed = "remote call failed"
|
||||
GetCanNotAssembleTheObject = "can not assemble the object"
|
||||
GetTryingToAssembleTheObject = "trying to assemble the object..."
|
||||
GetTryingToAssembleTheECObject = "trying to assemble the ec object..."
|
||||
GetAssemblingSplittedObject = "assembling splitted object..."
|
||||
GetAssemblingECObject = "assembling erasure coded object..."
|
||||
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
|
||||
GetAssemblingECObjectCompleted = "assembling erasure coded object completed"
|
||||
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
|
||||
GetFailedToAssembleECObject = "failed to assemble erasure coded object"
|
||||
GetCouldNotGenerateContainerTraverser = "could not generate container traverser"
|
||||
GetCouldNotConstructRemoteNodeClient = "could not construct remote node client"
|
||||
GetCouldNotWriteHeader = "could not write header"
|
||||
|
@ -111,6 +115,7 @@ const (
|
|||
GetCompletingTheOperation = "completing the operation"
|
||||
GetRequestedObjectWasMarkedAsRemoved = "requested object was marked as removed"
|
||||
GetRequestedObjectIsVirtual = "requested object is virtual"
|
||||
GetRequestedObjectIsEC = "requested object is erasure coded"
|
||||
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
|
||||
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
|
||||
SearchReturnResultDirectly = "return result directly"
|
||||
|
|
|
@ -88,6 +88,10 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
|||
return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
|
||||
}
|
||||
|
||||
if it.ECInfo != nil {
|
||||
return GetRes{}, logicerr.Wrap(objectSDK.NewECInfoError(it.ECInfo))
|
||||
}
|
||||
|
||||
if it.ObjectExpired {
|
||||
return GetRes{}, errNotFound
|
||||
}
|
||||
|
@ -119,6 +123,7 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
|||
type getShardIterator struct {
|
||||
Object *objectSDK.Object
|
||||
SplitInfo *objectSDK.SplitInfo
|
||||
ECInfo *objectSDK.ECInfo
|
||||
OutError error
|
||||
ShardWithMeta hashedShard
|
||||
MetaError error
|
||||
|
@ -130,6 +135,7 @@ type getShardIterator struct {
|
|||
Engine *StorageEngine
|
||||
|
||||
splitInfoErr *objectSDK.SplitInfoError
|
||||
ecInfoErr *objectSDK.ECInfoError
|
||||
}
|
||||
|
||||
func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
|
||||
|
@ -164,6 +170,14 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
|
|||
|
||||
// stop iterating over shards if SplitInfo structure is complete
|
||||
return withLink && withLast
|
||||
case errors.As(err, &i.ecInfoErr):
|
||||
if i.ECInfo == nil {
|
||||
i.ECInfo = objectSDK.NewECInfo()
|
||||
}
|
||||
|
||||
util.MergeECInfo(i.ecInfoErr.ECInfo(), i.ECInfo)
|
||||
// stop iterating over shards if ECInfo structure is complete
|
||||
return len(i.ECInfo.Chunks) == int(i.ECInfo.Chunks[0].Total)
|
||||
case client.IsErrObjectAlreadyRemoved(err):
|
||||
i.OutError = err
|
||||
return true // stop, return it back
|
||||
|
|
|
@ -76,6 +76,8 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
|||
head *objectSDK.Object
|
||||
siErr *objectSDK.SplitInfoError
|
||||
outSI *objectSDK.SplitInfo
|
||||
eiErr *objectSDK.ECInfoError
|
||||
outEI *objectSDK.ECInfo
|
||||
|
||||
outError error = new(apistatus.ObjectNotFound)
|
||||
)
|
||||
|
@ -106,9 +108,15 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
|||
}
|
||||
|
||||
return false
|
||||
case errors.As(err, &eiErr):
|
||||
if outEI == nil {
|
||||
outEI = objectSDK.NewECInfo()
|
||||
}
|
||||
util.MergeECInfo(eiErr.ECInfo(), outEI)
|
||||
// stop iterating over shards if ECInfo structure is complete
|
||||
return len(outEI.Chunks) == int(outEI.Chunks[0].Total)
|
||||
case client.IsErrObjectAlreadyRemoved(err):
|
||||
outError = err
|
||||
|
||||
return true // stop, return it back
|
||||
case shard.IsErrObjectExpired(err):
|
||||
// object is found but should not
|
||||
|
@ -123,15 +131,14 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
|||
}
|
||||
|
||||
head = res.Object()
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
if outSI != nil {
|
||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
|
||||
}
|
||||
|
||||
if head == nil {
|
||||
} else if outEI != nil {
|
||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
|
||||
} else if head == nil {
|
||||
return HeadRes{}, outError
|
||||
}
|
||||
|
||||
|
|
|
@ -108,6 +108,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
|
|||
|
||||
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
||||
}
|
||||
// if parent bucket is empty, then check if object exists in ec bucket
|
||||
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
|
||||
return false, getECInfoError(tx, cnr, data)
|
||||
}
|
||||
|
||||
// if parent bucket is empty, then check if object exists in typed buckets
|
||||
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil
|
||||
|
|
|
@ -5,6 +5,8 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
|
@ -110,6 +112,11 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
|
|||
return obj, obj.Unmarshal(data)
|
||||
}
|
||||
|
||||
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
|
||||
if len(data) != 0 {
|
||||
return nil, getECInfoError(tx, cnr, data)
|
||||
}
|
||||
|
||||
// if not found then check in tombstone index
|
||||
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
|
||||
if len(data) != 0 {
|
||||
|
@ -185,3 +192,29 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
|
|||
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
|
||||
offset := 0
|
||||
ecInfo := objectSDK.NewECInfo()
|
||||
for offset < len(data) {
|
||||
key := data[offset : offset+objectKeySize]
|
||||
// check in primary index
|
||||
ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
|
||||
if len(data) != 0 {
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(ojbData); err != nil {
|
||||
return err
|
||||
}
|
||||
chunk := object.ECChunk{}
|
||||
id, _ := obj.ID()
|
||||
objV2 := new(refs.ObjectID)
|
||||
id.WriteToV2(objV2)
|
||||
chunk.ID = *objV2
|
||||
chunk.Index = obj.ECHeader().Index()
|
||||
chunk.Total = obj.ECHeader().Total()
|
||||
ecInfo.Chunks = append(ecInfo.Chunks, chunk)
|
||||
}
|
||||
offset += objectKeySize
|
||||
}
|
||||
return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo))
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package meta_test
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
|
@ -15,8 +16,10 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -109,6 +112,42 @@ func TestDB_Get(t *testing.T) {
|
|||
require.True(t, binaryEqual(child.CutPayload(), newChild))
|
||||
})
|
||||
|
||||
t.Run("put erasure coded object", func(t *testing.T) {
|
||||
cnr := cidtest.ID()
|
||||
virtual := testutil.GenerateObjectWithCID(cnr)
|
||||
c, err := erasurecode.NewConstructor(3, 1)
|
||||
require.NoError(t, err)
|
||||
pk, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
parts, err := c.Split(virtual, &pk.PrivateKey)
|
||||
require.NoError(t, err)
|
||||
for _, part := range parts {
|
||||
err = putBig(db, part)
|
||||
var eiError *objectSDK.ECInfoError
|
||||
if err != nil && !errors.As(err, &eiError) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
_, err = metaGet(db, object.AddressOf(virtual), true)
|
||||
var eiError *objectSDK.ECInfoError
|
||||
require.ErrorAs(t, err, &eiError)
|
||||
require.Equal(t, len(eiError.ECInfo().Chunks), len(parts))
|
||||
for _, chunk := range eiError.ECInfo().Chunks {
|
||||
var found bool
|
||||
for _, part := range parts {
|
||||
partID, _ := part.ID()
|
||||
var chunkID oid.ID
|
||||
require.NoError(t, chunkID.ReadFromV2(chunk.ID))
|
||||
if chunkID.Equals(partID) {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
require.Fail(t, "chunk not found")
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("get removed object", func(t *testing.T) {
|
||||
obj := oidtest.Address()
|
||||
ts := oidtest.Address()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
|
@ -264,6 +265,13 @@ func putUniqueIndexes(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ecHead := obj.GetECHeader(); ecHead != nil {
|
||||
err = putECInfo(tx, cnr, objKey, ecHead)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -571,3 +579,34 @@ func isLinkObject(obj *objectSDK.Object) bool {
|
|||
func isLastObject(obj *objectSDK.Object) bool {
|
||||
return len(obj.Children()) == 0 && obj.Parent() != nil
|
||||
}
|
||||
|
||||
func putECInfo(tx *bbolt.Tx,
|
||||
cnr cid.ID, objKey []byte,
|
||||
ecHead *objectSDK.ECHeader,
|
||||
) error {
|
||||
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
|
||||
bucketName := make([]byte, bucketKeySize)
|
||||
|
||||
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
|
||||
if len(val) == 0 {
|
||||
val = objKey
|
||||
} else {
|
||||
offset := 0
|
||||
found := false
|
||||
for offset < len(val) {
|
||||
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
offset += objectKeySize
|
||||
}
|
||||
if !found {
|
||||
val = append(val, objKey...)
|
||||
}
|
||||
}
|
||||
return putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
|
||||
key: parentID,
|
||||
val: val,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -119,6 +119,11 @@ const (
|
|||
// Key: container ID + type
|
||||
// Value: container size in bytes as little-endian uint64
|
||||
containerCountersPrefix
|
||||
|
||||
// ecInfoPrefix is used for storing relation between EC parent id and chunk id.
|
||||
// Key: container ID + type
|
||||
// Value: Object id
|
||||
ecInfoPrefix
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -190,6 +195,11 @@ func splitBucketName(cnr cid.ID, key []byte) []byte {
|
|||
return bucketName(cnr, splitPrefix, key)
|
||||
}
|
||||
|
||||
// ecInfoBucketName returns <CID>_ecinfo.
|
||||
func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, ecInfoPrefix, key)
|
||||
}
|
||||
|
||||
// addressKey returns key for K-V tables when key is a whole address.
|
||||
func addressKey(addr oid.Address, key []byte) []byte {
|
||||
addr.Container().Encode(key)
|
||||
|
|
28
pkg/local_object_storage/util/ecinfo.go
Normal file
28
pkg/local_object_storage/util/ecinfo.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
)
|
||||
|
||||
// MergeECInfo ignores conflicts and rewrites `to` with non empty values
|
||||
// from `from`.
|
||||
func MergeECInfo(from, to *objectSDK.ECInfo) *objectSDK.ECInfo {
|
||||
var ext []object.ECChunk
|
||||
for _, fchunk := range from.Chunks {
|
||||
add := true
|
||||
for _, tchunk := range to.Chunks {
|
||||
if bytes.Equal(tchunk.ID.GetValue(), fchunk.ID.GetValue()) {
|
||||
add = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if add {
|
||||
ext = append(ext, fchunk)
|
||||
}
|
||||
}
|
||||
to.Chunks = append(to.Chunks, ext...)
|
||||
return to
|
||||
}
|
58
pkg/local_object_storage/util/ecinfo_test.go
Normal file
58
pkg/local_object_storage/util/ecinfo_test.go
Normal file
|
@ -0,0 +1,58 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMergeECInfo(t *testing.T) {
|
||||
id := generateV2ID()
|
||||
target := objectSDK.NewECInfo()
|
||||
var chunk object.ECChunk
|
||||
chunk.Total = 2
|
||||
chunk.Index = 0
|
||||
chunk.ID = id
|
||||
target.Chunks = append(target.Chunks, chunk)
|
||||
|
||||
t.Run("merge empty", func(t *testing.T) {
|
||||
to := objectSDK.NewECInfo()
|
||||
|
||||
result := MergeECInfo(target, to)
|
||||
require.Equal(t, result, target)
|
||||
})
|
||||
|
||||
t.Run("merge existed", func(t *testing.T) {
|
||||
to := objectSDK.NewECInfo()
|
||||
to.Chunks = append(to.Chunks, chunk)
|
||||
|
||||
result := MergeECInfo(target, to)
|
||||
require.Equal(t, result, target)
|
||||
})
|
||||
t.Run("merge extend", func(t *testing.T) {
|
||||
to := objectSDK.NewECInfo()
|
||||
id := generateV2ID()
|
||||
var chunk object.ECChunk
|
||||
chunk.Total = 2
|
||||
chunk.Index = 1
|
||||
chunk.ID = id
|
||||
to.Chunks = append(to.Chunks, chunk)
|
||||
|
||||
result := MergeECInfo(target, to)
|
||||
require.Equal(t, len(result.Chunks), 2)
|
||||
})
|
||||
}
|
||||
|
||||
func generateV2ID() refs.ObjectID {
|
||||
var buf [32]byte
|
||||
_, _ = rand.Read(buf[:])
|
||||
|
||||
var id refs.ObjectID
|
||||
id.SetValue(buf[:])
|
||||
|
||||
return id
|
||||
}
|
6
pkg/network/cache/multi.go
vendored
6
pkg/network/cache/multi.go
vendored
|
@ -167,8 +167,9 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
|
|||
// from the SDK client; should not be considered
|
||||
// as a connection error
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
var eiErr *objectSDK.ECInfoError
|
||||
|
||||
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr)
|
||||
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) || errors.As(err, &eiErr)
|
||||
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
|
||||
firstErr = err
|
||||
}
|
||||
|
@ -196,7 +197,8 @@ func (x *multiClient) ReportError(err error) {
|
|||
// from the SDK client; should not be considered
|
||||
// as a connection error
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
if errors.As(err, &siErr) {
|
||||
var eiErr *objectSDK.ECInfoError
|
||||
if errors.As(err, &siErr) || errors.As(err, &eiErr) {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
87
pkg/services/object/get/assembleec.go
Normal file
87
pkg/services/object/get/assembleec.go
Normal file
|
@ -0,0 +1,87 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (r *request) assembleEC(ctx context.Context) {
|
||||
if r.isRaw() {
|
||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
return
|
||||
}
|
||||
|
||||
// Any access tokens are not expected to be used in the assembly process:
|
||||
// - there is no requirement to specify child objects in session/bearer
|
||||
// token for `GET`/`GETRANGE`/`RANGEHASH` requests in the API protocol,
|
||||
// and, therefore, their missing in the original request should not be
|
||||
// considered as error; on the other hand, without session for every child
|
||||
// object, it is impossible to attach bearer token in the new generated
|
||||
// requests correctly because the token has not been issued for that node's
|
||||
// key;
|
||||
// - the assembly process is expected to be handled on a container node
|
||||
// only since the requests forwarding mechanism presentation; such the
|
||||
// node should have enough rights for getting any child object by design.
|
||||
r.prm.common.ForgetTokens()
|
||||
|
||||
// Do not use forwarding during assembly stage.
|
||||
// Request forwarding closure inherited in produced
|
||||
// `execCtx` so it should be disabled there.
|
||||
r.disableForwarding()
|
||||
|
||||
r.log.Debug(logs.GetTryingToAssembleTheECObject)
|
||||
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource)
|
||||
|
||||
r.log.Debug(logs.GetAssemblingECObject,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
|
||||
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
|
||||
if err != nil {
|
||||
r.log.Warn(logs.GetFailedToAssembleECObject,
|
||||
zap.Error(err),
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
}
|
||||
|
||||
var errRemovedRemote *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRangeRemote *apistatus.ObjectOutOfRange
|
||||
var errRemovedLocal *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRangeLocal *apistatus.ObjectOutOfRange
|
||||
|
||||
switch {
|
||||
default:
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
r.collectedObject = obj
|
||||
case errors.As(err, &errRemovedRemote):
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemovedRemote
|
||||
case errors.As(err, &errRemovedLocal):
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemovedLocal
|
||||
case errors.As(err, &errOutOfRangeRemote):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRangeRemote
|
||||
case errors.As(err, &errOutOfRangeLocal):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRangeLocal
|
||||
}
|
||||
}
|
93
pkg/services/object/get/assemblerec.go
Normal file
93
pkg/services/object/get/assemblerec.go
Normal file
|
@ -0,0 +1,93 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type assemblerec struct {
|
||||
addr oid.Address
|
||||
ecInfo *objectSDK.ECInfo
|
||||
rng *objectSDK.Range
|
||||
objGetter objectGetter
|
||||
cs container.Source
|
||||
}
|
||||
|
||||
func newAssemblerEC(
|
||||
addr oid.Address,
|
||||
ecInfo *objectSDK.ECInfo,
|
||||
rng *objectSDK.Range,
|
||||
objGetter objectGetter,
|
||||
cs container.Source,
|
||||
) *assemblerec {
|
||||
return &assemblerec{
|
||||
addr: addr,
|
||||
rng: rng,
|
||||
ecInfo: ecInfo,
|
||||
objGetter: objGetter,
|
||||
cs: cs,
|
||||
}
|
||||
}
|
||||
|
||||
// Assemble assembles erasure coded object and writes it's content to ObjectWriter.
|
||||
// It returns parent object.
|
||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
|
||||
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
|
||||
for _, chunk := range a.ecInfo.Chunks {
|
||||
objID := new(oid.ID)
|
||||
err := objID.ReadFromV2(chunk.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid object ID: %w", err)
|
||||
}
|
||||
var obj *objectSDK.Object
|
||||
if headOnly {
|
||||
obj, err = a.objGetter.HeadObject(ctx, *objID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
sow := NewSimpleObjectWriter()
|
||||
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
obj.SetPayload(sow.pld)
|
||||
}
|
||||
parts[chunk.Index] = obj
|
||||
}
|
||||
cnt, err := a.cs.Get(a.addr.Container())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c, err := erasurecode.NewConstructor(
|
||||
policy.ECDataCount(cnt.Value.PlacementPolicy()),
|
||||
policy.ECParityCount(cnt.Value.PlacementPolicy()),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if headOnly {
|
||||
obj, err := c.ReconstructHeader(parts)
|
||||
if err == nil {
|
||||
return obj, writer.WriteHeader(ctx, obj)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
obj, err := c.Reconstruct(parts)
|
||||
if err == nil {
|
||||
err = writer.WriteHeader(ctx, obj.CutPayload())
|
||||
if err == nil {
|
||||
err = writer.WriteChunk(ctx, obj.Payload())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return obj, err
|
||||
}
|
|
@ -73,9 +73,11 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
|
|||
remoteStorageConstructor: s.remoteStorageConstructor,
|
||||
epochSource: s.epochSource,
|
||||
localStorage: s.localStorage,
|
||||
containerSource: s.containerSource,
|
||||
|
||||
prm: prm,
|
||||
infoSplit: objectSDK.NewSplitInfo(),
|
||||
infoEC: objectSDK.NewECInfo(),
|
||||
}
|
||||
|
||||
exec.setLogger(s.log)
|
||||
|
@ -106,6 +108,16 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
|||
exec.assemble(ctx)
|
||||
case statusOutOfRange:
|
||||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||
case statusEC:
|
||||
if !exec.isLocal() {
|
||||
if execCnr {
|
||||
exec.executeOnContainer(ctx)
|
||||
exec.analyzeStatus(ctx, false)
|
||||
} else {
|
||||
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||
exec.assembleEC(ctx)
|
||||
}
|
||||
}
|
||||
default:
|
||||
exec.log.Debug(logs.OperationFinishedWithError,
|
||||
zap.Error(exec.err),
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -22,6 +23,7 @@ func (r *request) executeLocal(ctx context.Context) {
|
|||
r.collectedObject, err = r.get(ctx)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||
|
||||
|
@ -42,6 +44,10 @@ func (r *request) executeLocal(ctx context.Context) {
|
|||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
|
||||
r.err = objectSDK.NewECInfoError(r.infoEC)
|
||||
case errors.As(err, &errOutOfRange):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -27,15 +28,18 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
obj, err := r.getRemote(ctx, rs, info)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||
|
||||
switch {
|
||||
default:
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
if r.status == statusEC {
|
||||
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
||||
}
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
|
@ -57,6 +61,13 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
|
||||
r.infoEC = errECInfo.ECInfo()
|
||||
r.err = objectSDK.NewECInfoError(r.infoEC)
|
||||
// TODO maybe we don't need to get all chunks here
|
||||
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
||||
}
|
||||
|
||||
return r.status != statusUndefined
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
|
@ -22,6 +23,8 @@ type request struct {
|
|||
|
||||
infoSplit *objectSDK.SplitInfo
|
||||
|
||||
infoEC *objectSDK.ECInfo
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
collectedObject *objectSDK.Object
|
||||
|
@ -33,6 +36,7 @@ type request struct {
|
|||
traverserGenerator traverserGenerator
|
||||
remoteStorageConstructor remoteStorageConstructor
|
||||
localStorage localStorage
|
||||
containerSource container.Source
|
||||
}
|
||||
|
||||
func (r *request) setLogger(l *logger.Logger) {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -16,6 +17,7 @@ type Service struct {
|
|||
epochSource epochSource
|
||||
keyStore keyStorage
|
||||
remoteStorageConstructor remoteStorageConstructor
|
||||
containerSource container.Source
|
||||
}
|
||||
|
||||
// New creates, initializes and returns utility serving
|
||||
|
@ -26,6 +28,7 @@ func New(
|
|||
e localStorageEngine,
|
||||
tg traverserGenerator,
|
||||
cc clientConstructor,
|
||||
cs container.Source,
|
||||
opts ...Option,
|
||||
) *Service {
|
||||
result := &Service{
|
||||
|
@ -39,6 +42,7 @@ func New(
|
|||
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
|
||||
clientConstructor: cc,
|
||||
},
|
||||
containerSource: cs,
|
||||
}
|
||||
for _, option := range opts {
|
||||
option(result)
|
||||
|
|
|
@ -6,6 +6,7 @@ const (
|
|||
statusINHUMED
|
||||
statusVIRTUAL
|
||||
statusOutOfRange
|
||||
statusEC
|
||||
)
|
||||
|
||||
type statusError struct {
|
||||
|
|
|
@ -166,6 +166,9 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
|
|||
case *objectV2.SplitInfo:
|
||||
si := objectSDK.NewSplitInfoFromV2(v)
|
||||
return objectSDK.NewSplitInfoError(si)
|
||||
case *objectV2.ECInfo:
|
||||
ei := objectSDK.NewECInfoFromV2(v)
|
||||
return objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -84,6 +84,9 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
|
|||
case *objectV2.SplitInfo:
|
||||
si := objectSDK.NewSplitInfoFromV2(v)
|
||||
return nil, objectSDK.NewSplitInfoError(si)
|
||||
case *objectV2.ECInfo:
|
||||
ei := objectSDK.NewECInfoFromV2(v)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
|
||||
objv2 := new(objectV2.Object)
|
||||
|
|
|
@ -82,10 +82,13 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream
|
|||
err = s.svc.Get(stream.Context(), *p)
|
||||
|
||||
var splitErr *objectSDK.SplitInfoError
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
|
||||
switch {
|
||||
case errors.As(err, &splitErr):
|
||||
return stream.Send(splitInfoResponse(splitErr.SplitInfo()))
|
||||
case errors.As(err, &ecErr):
|
||||
return stream.Send(ecInfoResponse(ecErr.ECInfo()))
|
||||
default:
|
||||
return err
|
||||
}
|
||||
|
@ -123,11 +126,16 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV
|
|||
err = s.svc.Head(ctx, *p)
|
||||
|
||||
var splitErr *objectSDK.SplitInfoError
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
|
||||
if errors.As(err, &splitErr) {
|
||||
setSplitInfoHeadResponse(splitErr.SplitInfo(), resp)
|
||||
err = nil
|
||||
}
|
||||
if errors.As(err, &ecErr) {
|
||||
setECInfoHeadResponse(ecErr.ECInfo(), resp)
|
||||
err = nil
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
|
|
@ -270,6 +270,17 @@ func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse {
|
|||
return resp
|
||||
}
|
||||
|
||||
func ecInfoResponse(info *objectSDK.ECInfo) *objectV2.GetResponse {
|
||||
resp := new(objectV2.GetResponse)
|
||||
|
||||
body := new(objectV2.GetResponseBody)
|
||||
resp.SetBody(body)
|
||||
|
||||
body.SetObjectPart(info.ToV2())
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse {
|
||||
resp := new(objectV2.GetRangeResponse)
|
||||
|
||||
|
@ -285,6 +296,10 @@ func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResp
|
|||
resp.GetBody().SetHeaderPart(info.ToV2())
|
||||
}
|
||||
|
||||
func setECInfoHeadResponse(info *objectSDK.ECInfo, resp *objectV2.HeadResponse) {
|
||||
resp.GetBody().SetHeaderPart(info.ToV2())
|
||||
}
|
||||
|
||||
func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse {
|
||||
resp := new(objectV2.GetRangeHashResponse)
|
||||
|
||||
|
|
Loading…
Reference in a new issue