node: Implement Get\Head
requests for EC object #1103
|
@ -16,7 +16,7 @@ repos:
|
|||
- id: trailing-whitespace
|
||||
args: [--markdown-linebreak-ext=md]
|
||||
- id: end-of-file-fixer
|
||||
exclude: ".key$"
|
||||
exclude: "(.key|.svg)$"
|
||||
|
||||
- repo: https://github.com/shellcheck-py/shellcheck-py
|
||||
rev: v0.9.0.6
|
||||
|
|
|
@ -99,6 +99,10 @@ func getObject(cmd *cobra.Command, _ []string) {
|
|||
return
|
||||
}
|
||||
|
||||
if ok := printECInfoErr(cmd, err); ok {
|
||||
return
|
||||
}
|
||||
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -70,6 +70,10 @@ func getObjectHeader(cmd *cobra.Command, _ []string) {
|
|||
return
|
||||
}
|
||||
|
||||
if ok := printECInfoErr(cmd, err); ok {
|
||||
return
|
||||
}
|
||||
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -146,6 +146,50 @@ func marshalSplitInfo(cmd *cobra.Command, info *objectSDK.SplitInfo) ([]byte, er
|
|||
}
|
||||
}
|
||||
|
||||
func printECInfoErr(cmd *cobra.Command, err error) bool {
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
|
||||
ok := errors.As(err, &errECInfo)
|
||||
|
||||
if ok {
|
||||
cmd.PrintErrln("Object is erasure-encoded, ec information received.")
|
||||
printECInfo(cmd, errECInfo.ECInfo())
|
||||
}
|
||||
|
||||
return ok
|
||||
}
|
||||
|
||||
func printECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) {
|
||||
bs, err := marshalECInfo(cmd, info)
|
||||
commonCmd.ExitOnErr(cmd, "can't marshal split info: %w", err)
|
||||
|
||||
cmd.Println(string(bs))
|
||||
}
|
||||
|
||||
func marshalECInfo(cmd *cobra.Command, info *objectSDK.ECInfo) ([]byte, error) {
|
||||
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
|
||||
toProto, _ := cmd.Flags().GetBool("proto")
|
||||
switch {
|
||||
case toJSON && toProto:
|
||||
return nil, errors.New("'--json' and '--proto' flags are mutually exclusive")
|
||||
case toJSON:
|
||||
return info.MarshalJSON()
|
||||
case toProto:
|
||||
return info.Marshal()
|
||||
default:
|
||||
b := bytes.NewBuffer(nil)
|
||||
b.WriteString("Total chunks: " + strconv.Itoa(int(info.Chunks[0].Total)))
|
||||
for _, chunk := range info.Chunks {
|
||||
var id oid.ID
|
||||
if err := id.Decode(chunk.ID.GetValue()); err != nil {
|
||||
return nil, fmt.Errorf("unable to decode chunk id: %w", err)
|
||||
}
|
||||
b.WriteString("\n Index: " + strconv.Itoa(int(chunk.Index)) + " ID: " + id.String())
|
||||
}
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
}
|
||||
|
||||
func getRangeList(cmd *cobra.Command) ([]objectSDK.Range, error) {
|
||||
v := cmd.Flag("range").Value.String()
|
||||
if len(v) == 0 {
|
||||
|
|
|
@ -173,7 +173,7 @@ func initObjectService(c *cfg) {
|
|||
|
||||
sSearchV2 := createSearchSvcV2(sSearch, keyStorage)
|
||||
|
||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache)
|
||||
sGet := createGetService(c, keyStorage, traverseGen, c.clientCache, c.cfgObject.cnrSource)
|
||||
|
||||
*c.cfgObject.getSvc = *sGet // need smth better
|
||||
|
||||
|
@ -358,6 +358,7 @@ func createSearchSvcV2(sSearch *searchsvc.Service, keyStorage *util.KeyStorage)
|
|||
|
||||
func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator,
|
||||
coreConstructor *cache.ClientCache,
|
||||
containerSource containercore.Source,
|
||||
) *getsvc.Service {
|
||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
|
@ -369,6 +370,7 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
|||
placement.SuccessAfter(1),
|
||||
),
|
||||
coreConstructor,
|
||||
containerSource,
|
||||
getsvc.WithLogger(c.log))
|
||||
}
|
||||
|
||||
|
|
4
go.mod
|
@ -4,10 +4,10 @@ go 1.21
|
|||
|
||||
require (
|
||||
code.gitea.io/sdk/gitea v0.17.1
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
|
|
8
go.sum
|
@ -1,15 +1,15 @@
|
|||
code.gitea.io/sdk/gitea v0.17.1 h1:3jCPOG2ojbl8AcfaUCRYLT5MUcBMFwS0OSK2mA5Zok8=
|
||||
code.gitea.io/sdk/gitea v0.17.1/go.mod h1:aCnBqhHpoEWA180gMbaCtdX9Pl6BWBAuuP2miadoTNM=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24 h1:uIkl0mKWwDICUZTbNWZ38HLYDBI9rMgdAhYQWZ0C9iQ=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240327095603-491a47e7fe24/go.mod h1:OBDSr+DqV1z4VDouoX3YMleNc4DPBVBWTG3WDT2PK1o=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c h1:RFDrNsF2e+EJfaB8lZrRRxNjQkLfM09gnEyudvGuc10=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c/go.mod h1:OBDSr+DqV1z4VDouoX3YMleNc4DPBVBWTG3WDT2PK1o=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0 h1:FzurjElUwC7InY9v5rzXReKbfBL5yRJKSWJPq6BKhH0=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0/go.mod h1:F/fe1OoIDKr5Bz99q4sriuHDuf3aZefZy9ZsCqEtgxc=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65 h1:PaZ8GpnUoXxUoNsc1qp36bT2u7FU+neU4Jn9cl8AWqI=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65/go.mod h1:6aAX80dvJ3r5fjN9CzzPglRptoiPgIC9KFGGsUA+1Hw=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92 h1:hSyM52d8yIaOpYQlLlVYdrGbgCsvIDjwl3AJaJUlYPU=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240329104804-ec0cb2169f92/go.mod h1:i0RKqiF4z3UOxLSNwhHw+cUz/JyYWuTRpnn9ere4Y3w=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3 h1:7Sd/J2IM0uGpmFKBgseUh6/JsdJN06b8W8UZMKAUDZg=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3/go.mod h1:wDFmMP7l00Xd5VZVzF2MuhyJCnotyhfxHYnvrEEG/e4=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a h1:wbndKvHbwDQiSMQWL75RxiTZCeUyCi7NUj1lsfdAGkc=
|
||||
|
|
|
@ -99,9 +99,17 @@ const (
|
|||
GetRemoteCallFailed = "remote call failed"
|
||||
GetCanNotAssembleTheObject = "can not assemble the object"
|
||||
GetTryingToAssembleTheObject = "trying to assemble the object..."
|
||||
GetTryingToAssembleTheECObject = "trying to assemble the ec object..."
|
||||
GetAssemblingSplittedObject = "assembling splitted object..."
|
||||
GetAssemblingECObject = "assembling erasure-coded object..."
|
||||
GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed"
|
||||
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
|
||||
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
|
||||
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
|
||||
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
|
||||
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
|
||||
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
|
||||
GetFailedToAssembleECObject = "failed to assemble erasure-coded object"
|
||||
GetCouldNotGenerateContainerTraverser = "could not generate container traverser"
|
||||
GetCouldNotConstructRemoteNodeClient = "could not construct remote node client"
|
||||
GetCouldNotWriteHeader = "could not write header"
|
||||
|
@ -111,6 +119,7 @@ const (
|
|||
GetCompletingTheOperation = "completing the operation"
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
GetRequestedObjectWasMarkedAsRemoved = "requested object was marked as removed"
|
||||
GetRequestedObjectIsVirtual = "requested object is virtual"
|
||||
GetRequestedObjectIsEC = "requested object is erasure-coded"
|
||||
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
|
||||
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
|
||||
SearchReturnResultDirectly = "return result directly"
|
||||
|
|
|
@ -88,6 +88,10 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
|||
return GetRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
|
||||
}
|
||||
|
||||
if it.ECInfo != nil {
|
||||
return GetRes{}, logicerr.Wrap(objectSDK.NewECInfoError(it.ECInfo))
|
||||
}
|
||||
|
||||
if it.ObjectExpired {
|
||||
return GetRes{}, errNotFound
|
||||
}
|
||||
|
@ -119,6 +123,7 @@ func (e *StorageEngine) get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
|||
type getShardIterator struct {
|
||||
Object *objectSDK.Object
|
||||
SplitInfo *objectSDK.SplitInfo
|
||||
ECInfo *objectSDK.ECInfo
|
||||
OutError error
|
||||
ShardWithMeta hashedShard
|
||||
MetaError error
|
||||
|
@ -130,6 +135,7 @@ type getShardIterator struct {
|
|||
Engine *StorageEngine
|
||||
|
||||
splitInfoErr *objectSDK.SplitInfoError
|
||||
ecInfoErr *objectSDK.ECInfoError
|
||||
}
|
||||
|
||||
func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
|
||||
|
@ -164,6 +170,14 @@ func (i *getShardIterator) tryGetWithMeta(ctx context.Context) {
|
|||
|
||||
// stop iterating over shards if SplitInfo structure is complete
|
||||
return withLink && withLast
|
||||
case errors.As(err, &i.ecInfoErr):
|
||||
if i.ECInfo == nil {
|
||||
i.ECInfo = objectSDK.NewECInfo()
|
||||
}
|
||||
|
||||
util.MergeECInfo(i.ecInfoErr.ECInfo(), i.ECInfo)
|
||||
// stop iterating over shards if ECInfo structure is complete
|
||||
return len(i.ECInfo.Chunks) == int(i.ECInfo.Chunks[0].Total)
|
||||
case client.IsErrObjectAlreadyRemoved(err):
|
||||
i.OutError = err
|
||||
return true // stop, return it back
|
||||
|
|
|
@ -67,7 +67,6 @@ func (e *StorageEngine) Head(ctx context.Context, prm HeadPrm) (res HeadRes, err
|
|||
func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.head")
|
||||
defer span.End()
|
||||
|
||||
if e.metrics != nil {
|
||||
defer elapsed("Head", e.metrics.AddMethodDuration)()
|
||||
}
|
||||
|
@ -76,11 +75,11 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
|||
head *objectSDK.Object
|
||||
siErr *objectSDK.SplitInfoError
|
||||
outSI *objectSDK.SplitInfo
|
||||
|
||||
eiErr *objectSDK.ECInfoError
|
||||
outEI *objectSDK.ECInfo
|
||||
outError error = new(apistatus.ObjectNotFound)
|
||||
shPrm shard.HeadPrm
|
||||
)
|
||||
|
||||
var shPrm shard.HeadPrm
|
||||
shPrm.SetAddress(prm.addr)
|
||||
shPrm.SetRaw(prm.raw)
|
||||
|
||||
|
@ -94,44 +93,43 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
|||
if outSI == nil {
|
||||
outSI = objectSDK.NewSplitInfo()
|
||||
}
|
||||
|
||||
util.MergeSplitInfo(siErr.SplitInfo(), outSI)
|
||||
|
||||
_, withLink := outSI.Link()
|
||||
_, withLast := outSI.LastPart()
|
||||
|
||||
// stop iterating over shards if SplitInfo structure is complete
|
||||
if withLink && withLast {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
case errors.As(err, &eiErr):
|
||||
if outEI == nil {
|
||||
outEI = objectSDK.NewECInfo()
|
||||
}
|
||||
util.MergeECInfo(eiErr.ECInfo(), outEI)
|
||||
// stop iterating over shards if ECInfo structure is complete
|
||||
return len(outEI.Chunks) == int(outEI.Chunks[0].Total)
|
||||
case client.IsErrObjectAlreadyRemoved(err):
|
||||
outError = err
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
unrelated to the commit unrelated to the commit
acid-ant
commented
This change required because we have check for amount of lines in method. Don't want to do refactoring in scope of this PR. This change required because we have check for amount of lines in method. Don't want to do refactoring in scope of this PR.
fyrchik
commented
That is what I mean by unrelated, can we have a separate commit for it, before the EC changes? That is what I mean by unrelated, can we have a separate commit for it, before the EC changes?
Well, there is another whitespace change commit already.
acid-ant
commented
Ok, moved to the separate commit. Ok, moved to the separate commit.
|
||||
return true // stop, return it back
|
||||
case shard.IsErrObjectExpired(err):
|
||||
// object is found but should not
|
||||
// be returned
|
||||
outError = new(apistatus.ObjectNotFound)
|
||||
|
||||
return true
|
||||
default:
|
||||
e.reportShardError(sh, "could not head object from shard", err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
head = res.Object()
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
if outSI != nil {
|
||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
|
||||
}
|
||||
|
||||
if head == nil {
|
||||
} else if outEI != nil {
|
||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
|
||||
} else if head == nil {
|
||||
return HeadRes{}, outError
|
||||
}
|
||||
|
||||
|
|
|
@ -108,6 +108,10 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
|
|||
|
||||
return false, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo))
|
||||
}
|
||||
// if parent bucket is empty, then check if object exists in ec bucket
|
||||
if data := getFromBucket(tx, ecInfoBucketName(cnr, key), objKey); len(data) != 0 {
|
||||
return false, getECInfoError(tx, cnr, data)
|
||||
}
|
||||
|
||||
// if parent bucket is empty, then check if object exists in typed buckets
|
||||
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil
|
||||
|
|
|
@ -110,6 +110,11 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
|
|||
return obj, obj.Unmarshal(data)
|
||||
}
|
||||
|
||||
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
|
||||
if len(data) != 0 {
|
||||
return nil, getECInfoError(tx, cnr, data)
|
||||
}
|
||||
|
||||
// if not found then check in tombstone index
|
||||
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
|
||||
if len(data) != 0 {
|
||||
|
@ -185,3 +190,27 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
|
|||
|
||||
return logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
|
||||
offset := 0
|
||||
ecInfo := objectSDK.NewECInfo()
|
||||
for offset < len(data) {
|
||||
key := data[offset : offset+objectKeySize]
|
||||
// check in primary index
|
||||
ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
|
||||
if len(data) != 0 {
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(ojbData); err != nil {
|
||||
return err
|
||||
}
|
||||
chunk := objectSDK.ECChunk{}
|
||||
id, _ := obj.ID()
|
||||
chunk.SetID(id)
|
||||
chunk.Index = obj.ECHeader().Index()
|
||||
chunk.Total = obj.ECHeader().Total()
|
||||
ecInfo.AddChunk(chunk)
|
||||
}
|
||||
offset += objectKeySize
|
||||
}
|
||||
return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo))
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package meta_test
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime"
|
||||
|
@ -15,8 +16,10 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -109,6 +112,42 @@ func TestDB_Get(t *testing.T) {
|
|||
require.True(t, binaryEqual(child.CutPayload(), newChild))
|
||||
})
|
||||
|
||||
t.Run("put erasure-coded object", func(t *testing.T) {
|
||||
cnr := cidtest.ID()
|
||||
virtual := testutil.GenerateObjectWithCID(cnr)
|
||||
c, err := erasurecode.NewConstructor(3, 1)
|
||||
require.NoError(t, err)
|
||||
pk, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
parts, err := c.Split(virtual, &pk.PrivateKey)
|
||||
require.NoError(t, err)
|
||||
for _, part := range parts {
|
||||
err = putBig(db, part)
|
||||
var eiError *objectSDK.ECInfoError
|
||||
if err != nil && !errors.As(err, &eiError) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
_, err = metaGet(db, object.AddressOf(virtual), true)
|
||||
var eiError *objectSDK.ECInfoError
|
||||
require.ErrorAs(t, err, &eiError)
|
||||
require.Equal(t, len(eiError.ECInfo().Chunks), len(parts))
|
||||
for _, chunk := range eiError.ECInfo().Chunks {
|
||||
var found bool
|
||||
for _, part := range parts {
|
||||
partID, _ := part.ID()
|
||||
var chunkID oid.ID
|
||||
require.NoError(t, chunkID.ReadFromV2(chunk.ID))
|
||||
if chunkID.Equals(partID) {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
require.Fail(t, "chunk not found")
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("get removed object", func(t *testing.T) {
|
||||
obj := oidtest.Address()
|
||||
ts := oidtest.Address()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
|
@ -264,6 +265,13 @@ func putUniqueIndexes(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ecHead := obj.GetECHeader(); ecHead != nil {
|
||||
err = putECInfo(tx, cnr, objKey, ecHead)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -571,3 +579,34 @@ func isLinkObject(obj *objectSDK.Object) bool {
|
|||
func isLastObject(obj *objectSDK.Object) bool {
|
||||
return len(obj.Children()) == 0 && obj.Parent() != nil
|
||||
}
|
||||
|
||||
func putECInfo(tx *bbolt.Tx,
|
||||
cnr cid.ID, objKey []byte,
|
||||
ecHead *objectSDK.ECHeader,
|
||||
) error {
|
||||
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
|
||||
bucketName := make([]byte, bucketKeySize)
|
||||
|
||||
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
|
||||
if len(val) == 0 {
|
||||
val = objKey
|
||||
} else {
|
||||
offset := 0
|
||||
found := false
|
||||
for offset < len(val) {
|
||||
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
offset += objectKeySize
|
||||
}
|
||||
if !found {
|
||||
val = append(val, objKey...)
|
||||
}
|
||||
}
|
||||
return putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
|
||||
key: parentID,
|
||||
val: val,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -119,6 +119,11 @@ const (
|
|||
// Key: container ID + type
|
||||
// Value: container size in bytes as little-endian uint64
|
||||
containerCountersPrefix
|
||||
|
||||
// ecInfoPrefix is used for storing relation between EC parent id and chunk id.
|
||||
// Key: container ID + type
|
||||
// Value: Object id
|
||||
ecInfoPrefix
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -190,6 +195,11 @@ func splitBucketName(cnr cid.ID, key []byte) []byte {
|
|||
return bucketName(cnr, splitPrefix, key)
|
||||
}
|
||||
|
||||
// ecInfoBucketName returns <CID>_ecinfo.
|
||||
func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, ecInfoPrefix, key)
|
||||
}
|
||||
|
||||
// addressKey returns key for K-V tables when key is a whole address.
|
||||
func addressKey(addr oid.Address, key []byte) []byte {
|
||||
addr.Container().Encode(key)
|
||||
|
|
25
pkg/local_object_storage/util/ecinfo.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
)
|
||||
|
||||
// MergeECInfo ignores conflicts and rewrites `to` with non empty values
|
||||
// from `from`.
|
||||
func MergeECInfo(from, to *objectSDK.ECInfo) *objectSDK.ECInfo {
|
||||
for _, fchunk := range from.Chunks {
|
||||
add := true
|
||||
for _, tchunk := range to.Chunks {
|
||||
if bytes.Equal(tchunk.ID.GetValue(), fchunk.ID.GetValue()) {
|
||||
add = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if add {
|
||||
to.AddChunk(*objectSDK.NewECChunkFromV2(&fchunk))
|
||||
}
|
||||
}
|
||||
return to
|
||||
}
|
56
pkg/local_object_storage/util/ecinfo_test.go
Normal file
|
@ -0,0 +1,56 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMergeECInfo(t *testing.T) {
|
||||
id := generateV2ID()
|
||||
target := objectSDK.NewECInfo()
|
||||
var chunk objectSDK.ECChunk
|
||||
chunk.Total = 2
|
||||
chunk.Index = 0
|
||||
chunk.SetID(id)
|
||||
target.AddChunk(chunk)
|
||||
|
||||
t.Run("merge empty", func(t *testing.T) {
|
||||
to := objectSDK.NewECInfo()
|
||||
|
||||
result := MergeECInfo(target, to)
|
||||
require.Equal(t, result, target)
|
||||
})
|
||||
|
||||
t.Run("merge existed", func(t *testing.T) {
|
||||
to := objectSDK.NewECInfo()
|
||||
to.AddChunk(chunk)
|
||||
|
||||
result := MergeECInfo(target, to)
|
||||
require.Equal(t, result, target)
|
||||
})
|
||||
t.Run("merge extend", func(t *testing.T) {
|
||||
to := objectSDK.NewECInfo()
|
||||
var chunk objectSDK.ECChunk
|
||||
chunk.Total = 2
|
||||
chunk.Index = 1
|
||||
chunk.SetID(generateV2ID())
|
||||
to.AddChunk(chunk)
|
||||
|
||||
result := MergeECInfo(target, to)
|
||||
require.Equal(t, len(result.Chunks), 2)
|
||||
})
|
||||
}
|
||||
|
||||
func generateV2ID() oid.ID {
|
||||
var buf [32]byte
|
||||
_, _ = rand.Read(buf[:])
|
||||
|
||||
var id oid.ID
|
||||
_ = id.Decode(buf[:])
|
||||
|
||||
return id
|
||||
}
|
6
pkg/network/cache/multi.go
vendored
|
@ -167,8 +167,9 @@ func (x *multiClient) iterateClients(ctx context.Context, f func(clientcore.Clie
|
|||
// from the SDK client; should not be considered
|
||||
// as a connection error
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
var eiErr *objectSDK.ECInfoError
|
||||
|
||||
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr)
|
||||
success := err == nil || errors.Is(err, context.Canceled) || errors.As(err, &siErr) || errors.As(err, &eiErr)
|
||||
if success || firstErr == nil || errors.Is(firstErr, errRecentlyFailed) {
|
||||
firstErr = err
|
||||
}
|
||||
|
@ -196,7 +197,8 @@ func (x *multiClient) ReportError(err error) {
|
|||
// from the SDK client; should not be considered
|
||||
// as a connection error
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
if errors.As(err, &siErr) {
|
||||
var eiErr *objectSDK.ECInfoError
|
||||
if errors.As(err, &siErr) || errors.As(err, &eiErr) {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -139,9 +139,11 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
|
|||
remoteStorageConstructor: r.remoteStorageConstructor,
|
||||
epochSource: r.epochSource,
|
||||
localStorage: r.localStorage,
|
||||
containerSource: r.containerSource,
|
||||
|
||||
prm: prm,
|
||||
infoSplit: objectSDK.NewSplitInfo(),
|
||||
infoEC: objectSDK.NewECInfo(),
|
||||
log: r.log,
|
||||
}
|
||||
|
||||
|
|
79
pkg/services/object/get/assembleec.go
Normal file
|
@ -0,0 +1,79 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (r *request) assembleEC(ctx context.Context) {
|
||||
if r.isRaw() {
|
||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
return
|
||||
}
|
||||
|
||||
// Any access tokens are not expected to be used in the assembly process:
|
||||
// - there is no requirement to specify child objects in session/bearer
|
||||
// token for `GET`/`GETRANGE`/`RANGEHASH` requests in the API protocol,
|
||||
// and, therefore, their missing in the original request should not be
|
||||
// considered as error; on the other hand, without session for every child
|
||||
// object, it is impossible to attach bearer token in the new generated
|
||||
// requests correctly because the token has not been issued for that node's
|
||||
// key;
|
||||
// - the assembly process is expected to be handled on a container node
|
||||
// only since the requests forwarding mechanism presentation; such the
|
||||
// node should have enough rights for getting any child object by design.
|
||||
r.prm.common.ForgetTokens()
|
||||
|
||||
// Do not use forwarding during assembly stage.
|
||||
// Request forwarding closure inherited in produced
|
||||
// `execCtx` so it should be disabled there.
|
||||
r.disableForwarding()
|
||||
|
||||
r.log.Debug(logs.GetTryingToAssembleTheECObject)
|
||||
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
|
||||
|
||||
r.log.Debug(logs.GetAssemblingECObject,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
defer r.log.Debug(logs.GetAssemblingECObjectCompleted,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
|
||||
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
|
||||
if err != nil {
|
||||
r.log.Warn(logs.GetFailedToAssembleECObject,
|
||||
zap.Error(err),
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
}
|
||||
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||
|
||||
switch {
|
||||
dstepanov-yadro
commented
Look like remote and local errors are the same. Look like remote and local errors are the same.
acid-ant
commented
Removed. Removed.
dstepanov-yadro
commented
Incompleted
Incompleted
```
var errOutOfRangeRemote *apistatus.ObjectOutOfRange
var errOutOfRangeLocal *apistatus.ObjectOutOfRange
```
acid-ant
commented
Sorry, fixed. Sorry, fixed.
|
||||
default:
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
r.collectedObject = obj
|
||||
case errors.As(err, &errRemoved):
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemoved
|
||||
case errors.As(err, &errOutOfRange):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
}
|
||||
}
|
117
pkg/services/object/get/assemblerec.go
Normal file
|
@ -0,0 +1,117 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type assemblerec struct {
|
||||
addr oid.Address
|
||||
ecInfo *objectSDK.ECInfo
|
||||
rng *objectSDK.Range
|
||||
objGetter objectGetter
|
||||
cs container.Source
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func newAssemblerEC(
|
||||
addr oid.Address,
|
||||
ecInfo *objectSDK.ECInfo,
|
||||
rng *objectSDK.Range,
|
||||
objGetter objectGetter,
|
||||
cs container.Source,
|
||||
log *logger.Logger,
|
||||
) *assemblerec {
|
||||
return &assemblerec{
|
||||
addr: addr,
|
||||
rng: rng,
|
||||
ecInfo: ecInfo,
|
||||
objGetter: objGetter,
|
||||
cs: cs,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
|
||||
// It returns parent object.
|
||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
When we are to add GET_RANGE do we need to alter this function or write code in another place? When we are to add GET_RANGE do we need to alter this function or write code in another place?
acid-ant
commented
We need to extend assembler for EC a bit, yes. We need to extend assembler for EC a bit, yes.
|
||||
parts := a.retrieveParts(ctx, headOnly)
|
||||
cnt, err := a.cs.Get(a.addr.Container())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c, err := erasurecode.NewConstructor(
|
||||
policy.ECDataCount(cnt.Value.PlacementPolicy()),
|
||||
policy.ECParityCount(cnt.Value.PlacementPolicy()),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if headOnly {
|
||||
obj, err := c.ReconstructHeader(parts)
|
||||
if err == nil {
|
||||
return obj, writer.WriteHeader(ctx, obj)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
obj, err := c.Reconstruct(parts)
|
||||
if err == nil {
|
||||
err = writer.WriteHeader(ctx, obj.CutPayload())
|
||||
if err == nil {
|
||||
err = writer.WriteChunk(ctx, obj.Payload())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return obj, err
|
||||
}
|
||||
|
||||
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
|
||||
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
|
||||
errGroup, ctx := errgroup.WithContext(mainCtx)
|
||||
|
||||
for i := range a.ecInfo.Chunks {
|
||||
chunk := a.ecInfo.Chunks[i]
|
||||
errGroup.Go(func() error {
|
||||
objID := new(oid.ID)
|
||||
err := objID.ReadFromV2(chunk.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid object ID: %w", err)
|
||||
}
|
||||
var obj *objectSDK.Object
|
||||
if headOnly {
|
||||
obj, err = a.objGetter.HeadObject(ctx, *objID)
|
||||
if err != nil {
|
||||
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
sow := NewSimpleObjectWriter()
|
||||
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
|
||||
if err != nil {
|
||||
dstepanov-yadro
commented
ctx -> mainCtx, or even better to move ctx -> mainCtx, or even better to move `errgroup` related code to separate func.
As far as I remember errgoup's ctx is cancelled after Wait.
acid-ant
commented
Thanks a lot! Split on two functions. Thanks a lot! Split on two functions.
|
||||
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
obj.SetPayload(sow.pld)
|
||||
}
|
||||
parts[chunk.Index] = obj
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err := errGroup.Wait(); err != nil {
|
||||
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
|
||||
}
|
||||
return parts
|
||||
}
|
|
@ -73,9 +73,12 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
|
|||
remoteStorageConstructor: s.remoteStorageConstructor,
|
||||
epochSource: s.epochSource,
|
||||
localStorage: s.localStorage,
|
||||
containerSource: s.containerSource,
|
||||
|
||||
prm: prm,
|
||||
infoSplit: objectSDK.NewSplitInfo(),
|
||||
infoEC: objectSDK.NewECInfo(),
|
||||
log: s.log,
|
||||
}
|
||||
|
||||
exec.setLogger(s.log)
|
||||
|
@ -106,6 +109,16 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
|||
exec.assemble(ctx)
|
||||
case statusOutOfRange:
|
||||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||
case statusEC:
|
||||
if !exec.isLocal() {
|
||||
if execCnr {
|
||||
exec.executeOnContainer(ctx)
|
||||
exec.analyzeStatus(ctx, false)
|
||||
} else {
|
||||
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||
exec.assembleEC(ctx)
|
||||
}
|
||||
}
|
||||
default:
|
||||
exec.log.Debug(logs.OperationFinishedWithError,
|
||||
zap.Error(exec.err),
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -22,6 +23,7 @@ func (r *request) executeLocal(ctx context.Context) {
|
|||
r.collectedObject, err = r.get(ctx)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||
|
||||
|
@ -42,6 +44,10 @@ func (r *request) executeLocal(ctx context.Context) {
|
|||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
|
||||
r.err = objectSDK.NewECInfoError(r.infoEC)
|
||||
case errors.As(err, &errOutOfRange):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -27,15 +29,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
obj, err := r.getRemote(ctx, rs, info)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||
|
||||
switch {
|
||||
default:
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
if r.status == statusEC {
|
||||
fyrchik
commented
What does this condition checks? What does this condition checks?
acid-ant
commented
We need to continue to process the nodes in case of getting error from the node. Updated this line a bit. We need to continue to process the nodes in case of getting error from the node. Updated this line a bit.
|
||||
// we need to continue getting another chunks from another nodes
|
||||
// in case of network issue
|
||||
return false
|
||||
}
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
|
@ -57,6 +64,20 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Each TODO should be accompanied by an issue, could you create one? Each TODO should be accompanied by an issue, could you create one?
fyrchik
commented
Anyway, the condition here is easy to fix, the number of chunks we must have is equal to Anyway, the condition here is easy to fix, the number of chunks we must have is equal to `placementpolicy.ECDataCount()`
acid-ant
commented
Fixed. Fixed.
|
||||
r.infoEC = errECInfo.ECInfo()
|
||||
r.err = objectSDK.NewECInfoError(r.infoEC)
|
||||
if r.isRaw() {
|
||||
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
||||
}
|
||||
cnt, err := r.containerSource.Get(r.address().Container())
|
||||
if err == nil {
|
||||
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
|
||||
}
|
||||
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
|
||||
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
||||
}
|
||||
|
||||
return r.status != statusUndefined
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
|
@ -22,6 +23,8 @@ type request struct {
|
|||
|
||||
infoSplit *objectSDK.SplitInfo
|
||||
|
||||
infoEC *objectSDK.ECInfo
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
collectedObject *objectSDK.Object
|
||||
|
@ -33,6 +36,7 @@ type request struct {
|
|||
traverserGenerator traverserGenerator
|
||||
remoteStorageConstructor remoteStorageConstructor
|
||||
localStorage localStorage
|
||||
containerSource container.Source
|
||||
}
|
||||
|
||||
func (r *request) setLogger(l *logger.Logger) {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -16,6 +17,7 @@ type Service struct {
|
|||
epochSource epochSource
|
||||
keyStore keyStorage
|
||||
remoteStorageConstructor remoteStorageConstructor
|
||||
containerSource container.Source
|
||||
}
|
||||
|
||||
// New creates, initializes and returns utility serving
|
||||
|
@ -26,6 +28,7 @@ func New(
|
|||
e localStorageEngine,
|
||||
tg traverserGenerator,
|
||||
cc clientConstructor,
|
||||
cs container.Source,
|
||||
opts ...Option,
|
||||
) *Service {
|
||||
result := &Service{
|
||||
|
@ -39,6 +42,7 @@ func New(
|
|||
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
|
||||
clientConstructor: cc,
|
||||
},
|
||||
containerSource: cs,
|
||||
}
|
||||
for _, option := range opts {
|
||||
option(result)
|
||||
|
|
|
@ -6,6 +6,7 @@ const (
|
|||
statusINHUMED
|
||||
statusVIRTUAL
|
||||
statusOutOfRange
|
||||
statusEC
|
||||
)
|
||||
|
||||
type statusError struct {
|
||||
|
|
|
@ -166,6 +166,9 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
|
|||
case *objectV2.SplitInfo:
|
||||
si := objectSDK.NewSplitInfoFromV2(v)
|
||||
return objectSDK.NewSplitInfoError(si)
|
||||
case *objectV2.ECInfo:
|
||||
ei := objectSDK.NewECInfoFromV2(v)
|
||||
return objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -84,6 +84,9 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
|
|||
case *objectV2.SplitInfo:
|
||||
si := objectSDK.NewSplitInfoFromV2(v)
|
||||
return nil, objectSDK.NewSplitInfoError(si)
|
||||
case *objectV2.ECInfo:
|
||||
ei := objectSDK.NewECInfoFromV2(v)
|
||||
return nil, objectSDK.NewECInfoError(ei)
|
||||
}
|
||||
|
||||
objv2 := new(objectV2.Object)
|
||||
|
|
|
@ -82,10 +82,13 @@ func (s *Service) Get(req *objectV2.GetRequest, stream objectSvc.GetObjectStream
|
|||
err = s.svc.Get(stream.Context(), *p)
|
||||
|
||||
var splitErr *objectSDK.SplitInfoError
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
|
||||
switch {
|
||||
case errors.As(err, &splitErr):
|
||||
return stream.Send(splitInfoResponse(splitErr.SplitInfo()))
|
||||
case errors.As(err, &ecErr):
|
||||
return stream.Send(ecInfoResponse(ecErr.ECInfo()))
|
||||
default:
|
||||
return err
|
||||
}
|
||||
|
@ -123,11 +126,16 @@ func (s *Service) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV
|
|||
err = s.svc.Head(ctx, *p)
|
||||
|
||||
var splitErr *objectSDK.SplitInfoError
|
||||
var ecErr *objectSDK.ECInfoError
|
||||
|
||||
if errors.As(err, &splitErr) {
|
||||
setSplitInfoHeadResponse(splitErr.SplitInfo(), resp)
|
||||
err = nil
|
||||
}
|
||||
if errors.As(err, &ecErr) {
|
||||
setECInfoHeadResponse(ecErr.ECInfo(), resp)
|
||||
err = nil
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
|
|
@ -270,6 +270,17 @@ func splitInfoResponse(info *objectSDK.SplitInfo) *objectV2.GetResponse {
|
|||
return resp
|
||||
}
|
||||
|
||||
func ecInfoResponse(info *objectSDK.ECInfo) *objectV2.GetResponse {
|
||||
resp := new(objectV2.GetResponse)
|
||||
|
||||
body := new(objectV2.GetResponseBody)
|
||||
resp.SetBody(body)
|
||||
|
||||
body.SetObjectPart(info.ToV2())
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeResponse {
|
||||
resp := new(objectV2.GetRangeResponse)
|
||||
|
||||
|
@ -285,6 +296,10 @@ func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResp
|
|||
resp.GetBody().SetHeaderPart(info.ToV2())
|
||||
}
|
||||
|
||||
func setECInfoHeadResponse(info *objectSDK.ECInfo, resp *objectV2.HeadResponse) {
|
||||
resp.GetBody().SetHeaderPart(info.ToV2())
|
||||
}
|
||||
|
||||
func toHashResponse(typ refs.ChecksumType, res *getsvc.RangeHashRes) *objectV2.GetRangeHashResponse {
|
||||
resp := new(objectV2.GetRangeHashResponse)
|
||||
|
||||
|
|
|
@ -19,9 +19,7 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
)
|
||||
|
||||
var (
|
||||
subjectNotFoundErrorMessage = "subject not found"
|
||||
)
|
||||
var subjectNotFoundErrorMessage = "subject not found"
|
||||
|
||||
func (s *Service) checkAPE(container *core.Container, cid cid.ID, operation acl.Op, role acl.Role, publicKey *keys.PublicKey) error {
|
||||
namespace := ""
|
||||
|
|
We have "erasure coded" here and "erasure-encoded" in CLI print.
Wikipedia uses "erasure-coded" https://en.wikipedia.org/wiki/Erasure_code, let's stick with it.
Agree, updated here in other places.