node: Implement Range\RangeHash
requests for EC object #1112
12 changed files with 474 additions and 29 deletions
4
go.mod
4
go.mod
|
@ -4,10 +4,10 @@ go 1.21
|
||||||
|
|
||||||
require (
|
require (
|
||||||
code.gitea.io/sdk/gitea v0.17.1
|
code.gitea.io/sdk/gitea v0.17.1
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240422151450-df9b65324a4c
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240427200446-67c6f305b21f
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240424080726-20ab57bf7ec3
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240507063414-99e02858af12
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240412130734-0e69e485115a
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -102,6 +102,9 @@ func (e *StorageEngine) getRange(ctx context.Context, prm RngPrm) (RngRes, error
|
||||||
if it.SplitInfo != nil {
|
if it.SplitInfo != nil {
|
||||||
return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
|
return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(it.SplitInfo))
|
||||||
}
|
}
|
||||||
|
if it.ECInfo != nil {
|
||||||
|
return RngRes{}, logicerr.Wrap(objectSDK.NewECInfoError(it.ECInfo))
|
||||||
|
}
|
||||||
|
|
||||||
if it.Object == nil {
|
if it.Object == nil {
|
||||||
// If any shard is in a degraded mode, we should assume that metabase could store
|
// If any shard is in a degraded mode, we should assume that metabase could store
|
||||||
|
@ -147,6 +150,8 @@ type getRangeShardIterator struct {
|
||||||
Object *objectSDK.Object
|
Object *objectSDK.Object
|
||||||
SplitInfoError *objectSDK.SplitInfoError
|
SplitInfoError *objectSDK.SplitInfoError
|
||||||
SplitInfo *objectSDK.SplitInfo
|
SplitInfo *objectSDK.SplitInfo
|
||||||
|
ECInfoError *objectSDK.ECInfoError
|
||||||
|
ECInfo *objectSDK.ECInfo
|
||||||
OutError error
|
OutError error
|
||||||
ShardWithMeta hashedShard
|
ShardWithMeta hashedShard
|
||||||
MetaError error
|
MetaError error
|
||||||
|
@ -188,6 +193,14 @@ func (i *getRangeShardIterator) tryGetWithMeta(ctx context.Context) {
|
||||||
|
|
||||||
// stop iterating over shards if SplitInfo structure is complete
|
// stop iterating over shards if SplitInfo structure is complete
|
||||||
return withLink && withLast
|
return withLink && withLast
|
||||||
|
case errors.As(err, &i.ECInfoError):
|
||||||
|
if i.ECInfo == nil {
|
||||||
|
i.ECInfo = objectSDK.NewECInfo()
|
||||||
|
}
|
||||||
|
|
||||||
|
util.MergeECInfo(i.ECInfoError.ECInfo(), i.ECInfo)
|
||||||
|
// stop iterating over shards if ECInfo structure is complete
|
||||||
|
return len(i.ECInfo.Chunks) == int(i.ECInfo.Chunks[0].Total)
|
||||||
case
|
case
|
||||||
client.IsErrObjectAlreadyRemoved(err),
|
client.IsErrObjectAlreadyRemoved(err),
|
||||||
shard.IsErrOutOfRange(err):
|
shard.IsErrOutOfRange(err):
|
||||||
|
|
|
@ -37,6 +37,7 @@ func (r *request) assemble(ctx context.Context) {
|
||||||
|
|
||||||
r.log.Debug(logs.GetTryingToAssembleTheObject)
|
r.log.Debug(logs.GetTryingToAssembleTheObject)
|
||||||
|
|
||||||
|
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||||
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
|
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
|
||||||
|
|
||||||
r.log.Debug(logs.GetAssemblingSplittedObject,
|
r.log.Debug(logs.GetAssemblingSplittedObject,
|
||||||
|
@ -119,7 +120,6 @@ func (r *request) GetObjectAndWritePayload(ctx context.Context, id oid.ID, rng *
|
||||||
}
|
}
|
||||||
|
|
||||||
p := r.prm
|
p := r.prm
|
||||||
p.common = p.common.WithLocalOnly(false)
|
|
||||||
p.objWriter = w
|
p.objWriter = w
|
||||||
p.rng = rng
|
p.rng = rng
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ func (r *request) assembleEC(ctx context.Context) {
|
||||||
|
|
||||||
r.log.Debug(logs.GetTryingToAssembleTheECObject)
|
r.log.Debug(logs.GetTryingToAssembleTheECObject)
|
||||||
|
|
||||||
|
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
|
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
|
||||||
|
|
||||||
r.log.Debug(logs.GetAssemblingECObject,
|
r.log.Debug(logs.GetAssemblingECObject,
|
||||||
|
|
|
@ -114,7 +114,7 @@ func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
to := uint64(0)
|
to := uint64(0)
|
||||||
if seekOff+seekLen > a.currentOffset+from {
|
if seekOff+seekLen >= a.currentOffset+from {
|
||||||
to = seekOff + seekLen - a.currentOffset
|
to = seekOff + seekLen - a.currentOffset
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -45,24 +46,65 @@ func newAssemblerEC(
|
||||||
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
|
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
|
||||||
// It returns parent object.
|
// It returns parent object.
|
||||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
|
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
|
||||||
parts := a.retrieveParts(ctx, headOnly)
|
if headOnly {
|
||||||
|
return a.reconstructHeader(ctx, writer)
|
||||||
|
} else if a.rng != nil {
|
||||||
|
return a.reconstructRange(ctx, writer)
|
||||||
|
}
|
||||||
|
return a.reconstructObject(ctx, writer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
|
||||||
cnt, err := a.cs.Get(a.addr.Container())
|
cnt, err := a.cs.Get(a.addr.Container())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c, err := erasurecode.NewConstructor(
|
dataCount := policy.ECDataCount(cnt.Value.PlacementPolicy())
|
||||||
policy.ECDataCount(cnt.Value.PlacementPolicy()),
|
parityCount := policy.ECParityCount(cnt.Value.PlacementPolicy())
|
||||||
policy.ECParityCount(cnt.Value.PlacementPolicy()),
|
return erasurecode.NewConstructor(dataCount, parityCount)
|
||||||
)
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
|
parts := a.retrieveParts(ctx, true)
|
||||||
|
c, err := a.getConstructor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if headOnly {
|
|
||||||
obj, err := c.ReconstructHeader(parts)
|
obj, err := c.ReconstructHeader(parts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return obj, writer.WriteHeader(ctx, obj)
|
return obj, writer.WriteHeader(ctx, obj)
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
|
parts := a.retrieveParts(ctx, false)
|
||||||
|
c, err := a.getConstructor()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
obj, err := c.Reconstruct(parts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
from := a.rng.GetOffset()
|
||||||
|
to := from + a.rng.GetLength()
|
||||||
|
if pLen := uint64(len(obj.Payload())); to < from || pLen < from || pLen < to {
|
||||||
|
return nil, &apistatus.ObjectOutOfRange{}
|
||||||
|
}
|
||||||
|
err = writer.WriteChunk(ctx, obj.Payload()[from:to])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return obj, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
|
parts := a.retrieveParts(ctx, false)
|
||||||
|
c, err := a.getConstructor()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
obj, err := c.Reconstruct(parts)
|
obj, err := c.Reconstruct(parts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package getsvc
|
package getsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
|
@ -25,6 +26,9 @@ import (
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -62,6 +66,10 @@ func (e testEpochReceiver) Epoch() (uint64, error) {
|
||||||
return uint64(e), nil
|
return uint64(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e testEpochReceiver) CurrentEpoch() uint64 {
|
||||||
|
return uint64(e)
|
||||||
|
}
|
||||||
|
|
||||||
func newTestStorage() *testStorage {
|
func newTestStorage() *testStorage {
|
||||||
return &testStorage{
|
return &testStorage{
|
||||||
inhumed: make(map[string]struct{}),
|
inhumed: make(map[string]struct{}),
|
||||||
|
@ -555,21 +563,6 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
newRngPrm := func(raw bool, w ChunkWriter, off, ln uint64) RangePrm {
|
|
||||||
p := RangePrm{}
|
|
||||||
p.SetChunkWriter(w)
|
|
||||||
p.WithRawFlag(raw)
|
|
||||||
p.common = new(util.CommonPrm).WithLocalOnly(false)
|
|
||||||
|
|
||||||
r := objectSDK.NewRange()
|
|
||||||
r.SetOffset(off)
|
|
||||||
r.SetLength(ln)
|
|
||||||
|
|
||||||
p.SetRange(r)
|
|
||||||
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
|
|
||||||
newHeadPrm := func(raw bool, w ObjectWriter) HeadPrm {
|
newHeadPrm := func(raw bool, w ObjectWriter) HeadPrm {
|
||||||
p := HeadPrm{}
|
p := HeadPrm{}
|
||||||
p.SetHeaderWriter(w)
|
p.SetHeaderWriter(w)
|
||||||
|
@ -1628,6 +1621,203 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testTarget struct {
|
||||||
|
objects []*objectSDK.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tt *testTarget) WriteObject(_ context.Context, obj *objectSDK.Object) error {
|
||||||
|
tt.objects = append(tt.objects, obj)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func objectChain(t *testing.T, cnr cid.ID, singleSize, totalSize uint64) (oid.ID, []*objectSDK.Object, *objectSDK.Object, []byte) {
|
||||||
|
pk, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tt := new(testTarget)
|
||||||
|
p := transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||||
|
Key: &pk.PrivateKey,
|
||||||
|
NextTargetInit: func() transformer.ObjectWriter { return tt },
|
||||||
|
NetworkState: testEpochReceiver(1),
|
||||||
|
MaxSize: singleSize,
|
||||||
|
})
|
||||||
|
|
||||||
|
payload := make([]byte, totalSize)
|
||||||
|
_, err = rand.Read(payload)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ver := version.Current()
|
||||||
|
hdr := objectSDK.New()
|
||||||
|
hdr.SetContainerID(cnr)
|
||||||
|
hdr.SetType(objectSDK.TypeRegular)
|
||||||
|
hdr.SetVersion(&ver)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
require.NoError(t, p.WriteHeader(ctx, hdr))
|
||||||
|
|
||||||
|
_, err = p.Write(ctx, payload)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
res, err := p.Close(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
if totalSize <= singleSize {
|
||||||
|
// Small object, no linking.
|
||||||
|
require.Len(t, tt.objects, 1)
|
||||||
|
return res.SelfID, tt.objects, nil, payload
|
||||||
|
}
|
||||||
|
|
||||||
|
return *res.ParentID, tt.objects[:len(tt.objects)-1], tt.objects[len(tt.objects)-1], bytes.Clone(payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRngPrm(raw bool, w ChunkWriter, off, ln uint64) RangePrm {
|
||||||
|
p := RangePrm{}
|
||||||
|
p.SetChunkWriter(w)
|
||||||
|
p.WithRawFlag(raw)
|
||||||
|
p.common = new(util.CommonPrm)
|
||||||
|
|
||||||
|
r := objectSDK.NewRange()
|
||||||
|
r.SetOffset(off)
|
||||||
|
r.SetLength(ln)
|
||||||
|
|
||||||
|
p.SetRange(r)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetRange(t *testing.T) {
|
||||||
|
var cnr container.Container
|
||||||
|
cnr.SetPlacementPolicy(netmaptest.PlacementPolicy())
|
||||||
|
|
||||||
|
var idCnr cid.ID
|
||||||
|
container.CalculateID(&idCnr, cnr)
|
||||||
|
|
||||||
|
ns, as := testNodeMatrix(t, []int{2})
|
||||||
|
|
||||||
|
testGetRange := func(t *testing.T, svc *Service, addr oid.Address, from, to uint64, payload []byte) {
|
||||||
|
w := NewSimpleObjectWriter()
|
||||||
|
rngPrm := newRngPrm(false, w, from, to-from)
|
||||||
|
rngPrm.WithAddress(addr)
|
||||||
|
|
||||||
|
err := svc.GetRange(context.Background(), rngPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
if from == to {
|
||||||
|
require.Nil(t, w.Object().Payload())
|
||||||
|
} else {
|
||||||
|
require.Equal(t, payload[from:to], w.Object().Payload())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
||||||
|
const curEpoch = 13
|
||||||
|
|
||||||
|
return &Service{
|
||||||
|
log: test.NewLogger(t),
|
||||||
|
localStorage: newTestStorage(),
|
||||||
|
traverserGenerator: &testTraverserGenerator{
|
||||||
|
c: cnr,
|
||||||
|
b: map[uint64]placement.Builder{
|
||||||
|
curEpoch: b,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
epochSource: testEpochReceiver(curEpoch),
|
||||||
|
remoteStorageConstructor: c,
|
||||||
|
keyStore: &testKeyStorage{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("small", func(t *testing.T) {
|
||||||
|
const totalSize = 5
|
||||||
|
_, objs, _, payload := objectChain(t, idCnr, totalSize, totalSize)
|
||||||
|
require.Len(t, objs, 1)
|
||||||
|
require.Len(t, payload, totalSize)
|
||||||
|
|
||||||
|
obj := objs[0]
|
||||||
|
addr := object.AddressOf(obj)
|
||||||
|
builder := &testPlacementBuilder{vectors: map[string][][]netmap.NodeInfo{addr.EncodeToString(): ns}}
|
||||||
|
|
||||||
|
c1 := newTestClient()
|
||||||
|
c1.addResult(addr, obj, nil)
|
||||||
|
|
||||||
|
svc := newSvc(builder, &testClientCache{
|
||||||
|
clients: map[string]*testClient{
|
||||||
|
as[0][0]: c1,
|
||||||
|
as[0][1]: c1,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
for from := 0; from < totalSize-1; from++ {
|
||||||
|
for to := from; to < totalSize; to++ {
|
||||||
|
t.Run(fmt.Sprintf("from=%d,to=%d", from, to), func(t *testing.T) {
|
||||||
|
testGetRange(t, svc, addr, uint64(from), uint64(to), payload)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("big", func(t *testing.T) {
|
||||||
|
const totalSize = 9
|
||||||
|
id, objs, link, payload := objectChain(t, idCnr, 3, totalSize) // 3 parts
|
||||||
|
require.Equal(t, totalSize, len(payload))
|
||||||
|
|
||||||
|
builder := &testPlacementBuilder{vectors: map[string][][]netmap.NodeInfo{}}
|
||||||
|
builder.vectors[idCnr.EncodeToString()+"/"+id.EncodeToString()] = ns
|
||||||
|
builder.vectors[object.AddressOf(link).EncodeToString()] = ns
|
||||||
|
for i := range objs {
|
||||||
|
builder.vectors[object.AddressOf(objs[i]).EncodeToString()] = ns
|
||||||
|
}
|
||||||
|
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(idCnr)
|
||||||
|
addr.SetObject(id)
|
||||||
|
|
||||||
|
const (
|
||||||
|
linkingLast = "splitinfo=last"
|
||||||
|
linkingChildren = "splitinfo=children"
|
||||||
|
linkingBoth = "splitinfo=both"
|
||||||
|
)
|
||||||
|
|
||||||
|
lastID, _ := objs[len(objs)-1].ID()
|
||||||
|
linkID, _ := link.ID()
|
||||||
|
|
||||||
|
for _, kind := range []string{linkingLast, linkingChildren, linkingBoth} {
|
||||||
|
t.Run(kind, func(t *testing.T) {
|
||||||
|
c1 := newTestClient()
|
||||||
|
for i := range objs {
|
||||||
|
c1.addResult(object.AddressOf(objs[i]), objs[i], nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
c1.addResult(object.AddressOf(link), link, nil)
|
||||||
|
|
||||||
|
si := objectSDK.NewSplitInfo()
|
||||||
|
switch kind {
|
||||||
|
case linkingLast:
|
||||||
|
si.SetLastPart(lastID)
|
||||||
|
case linkingChildren:
|
||||||
|
si.SetLink(linkID)
|
||||||
|
case linkingBoth:
|
||||||
|
si.SetLastPart(lastID)
|
||||||
|
si.SetLink(linkID)
|
||||||
|
}
|
||||||
|
c1.addResult(addr, nil, objectSDK.NewSplitInfoError(si))
|
||||||
|
|
||||||
|
svc := newSvc(builder, &testClientCache{
|
||||||
|
clients: map[string]*testClient{
|
||||||
|
as[0][0]: c1,
|
||||||
|
as[0][1]: c1,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
for from := 0; from < totalSize-1; from++ {
|
||||||
|
for to := from; to < totalSize; to++ {
|
||||||
|
t.Run(fmt.Sprintf("from=%d,to=%d", from, to), func(t *testing.T) {
|
||||||
|
testGetRange(t, svc, addr, uint64(from), uint64(to), payload)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetFromPastEpoch(t *testing.T) {
|
func TestGetFromPastEpoch(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
|
182
pkg/services/object/get/getrangeec_test.go
Normal file
182
pkg/services/object/get/getrangeec_test.go
Normal file
|
@ -0,0 +1,182 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||||
|
coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type containerStorage struct {
|
||||||
|
cnt *container.Container
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *containerStorage) Get(cid.ID) (*coreContainer.Container, error) {
|
||||||
|
coreCnt := coreContainer.Container{
|
||||||
|
Value: *cs.cnt,
|
||||||
|
}
|
||||||
|
return &coreCnt, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetRangeEC(t *testing.T) {
|
||||||
|
var dataCount uint32 = 3
|
||||||
|
var parityCount uint32 = 1
|
||||||
|
cnr := container.Container{}
|
||||||
|
p := netmap.PlacementPolicy{}
|
||||||
|
p.SetContainerBackupFactor(1)
|
||||||
|
x := netmap.ReplicaDescriptor{}
|
||||||
|
x.SetECDataCount(dataCount)
|
||||||
|
x.SetECParityCount(parityCount)
|
||||||
|
p.AddReplicas(x)
|
||||||
|
cnr.SetPlacementPolicy(p)
|
||||||
|
|
||||||
|
var idCnr cid.ID
|
||||||
|
container.CalculateID(&idCnr, cnr)
|
||||||
|
|
||||||
|
ns, as := testNodeMatrix(t, []int{4})
|
||||||
|
|
||||||
|
testGetRange := func(t *testing.T, svc *Service, addr oid.Address, from, to uint64, payload []byte) {
|
||||||
|
w := NewSimpleObjectWriter()
|
||||||
|
rngPrm := newRngPrm(false, w, from, to-from)
|
||||||
|
rngPrm.WithAddress(addr)
|
||||||
|
|
||||||
|
err := svc.GetRange(context.Background(), rngPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
if from == to {
|
||||||
|
require.Nil(t, w.Object().Payload())
|
||||||
|
} else {
|
||||||
|
require.Equal(t, payload[from:to], w.Object().Payload())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
||||||
|
const curEpoch = 13
|
||||||
|
|
||||||
|
return &Service{
|
||||||
|
log: test.NewLogger(t),
|
||||||
|
localStorage: newTestStorage(),
|
||||||
|
traverserGenerator: &testTraverserGenerator{
|
||||||
|
c: cnr,
|
||||||
|
b: map[uint64]placement.Builder{
|
||||||
|
curEpoch: b,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
epochSource: testEpochReceiver(curEpoch),
|
||||||
|
remoteStorageConstructor: c,
|
||||||
|
keyStore: &testKeyStorage{},
|
||||||
|
containerSource: &containerStorage{
|
||||||
|
cnt: &cnr,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const totalSize = 5
|
||||||
|
obj, parts := objectECChain(t, &idCnr, &cnr, totalSize, totalSize)
|
||||||
|
require.Len(t, parts, int(dataCount+parityCount))
|
||||||
|
require.Len(t, obj.Payload(), totalSize)
|
||||||
|
|
||||||
|
addr := object.AddressOf(obj)
|
||||||
|
builder := &testPlacementBuilder{
|
||||||
|
vectors: map[string][][]netmap.NodeInfo{
|
||||||
|
addr.EncodeToString(): ns,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
clients := map[string]*testClient{}
|
||||||
|
for i, part := range parts {
|
||||||
|
builder.vectors[object.AddressOf(part).EncodeToString()] = ns
|
||||||
|
|
||||||
|
tc := newTestClient()
|
||||||
|
|
||||||
|
ecInfo := objectSDK.NewECInfo()
|
||||||
|
|
||||||
|
chunk := objectSDK.ECChunk{}
|
||||||
|
chunk.Total = uint32(len(parts))
|
||||||
|
chunk.Index = uint32(i)
|
||||||
|
id, _ := part.ID()
|
||||||
|
idv2 := refs.ObjectID{}
|
||||||
|
id.WriteToV2(&idv2)
|
||||||
|
chunk.ID = idv2
|
||||||
|
|
||||||
|
ecInfo.AddChunk(chunk)
|
||||||
|
errECInfo := objectSDK.NewECInfoError(ecInfo)
|
||||||
|
|
||||||
|
tc.addResult(addr, nil, errECInfo)
|
||||||
|
tc.addResult(object.AddressOf(part), part, nil)
|
||||||
|
|
||||||
|
clients[as[0][i]] = tc
|
||||||
|
}
|
||||||
|
|
||||||
|
svc := newSvc(builder, &testClientCache{
|
||||||
|
clients: clients,
|
||||||
|
})
|
||||||
|
|
||||||
|
for from := 0; from < totalSize-1; from++ {
|
||||||
|
for to := from; to < totalSize; to++ {
|
||||||
|
t.Run(fmt.Sprintf("from=%d,to=%d", from, to), func(t *testing.T) {
|
||||||
|
testGetRange(t, svc, addr, uint64(from), uint64(to), obj.Payload())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func objectECChain(t *testing.T, cnrId *cid.ID, cnr *container.Container, singleSize, totalSize uint64) (*objectSDK.Object, []*objectSDK.Object) {
|
||||||
|
pk, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tt := new(testTarget)
|
||||||
|
p := transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||||
|
Key: &pk.PrivateKey,
|
||||||
|
NextTargetInit: func() transformer.ObjectWriter { return tt },
|
||||||
|
NetworkState: testEpochReceiver(1),
|
||||||
|
MaxSize: singleSize,
|
||||||
|
})
|
||||||
|
|
||||||
|
payload := make([]byte, totalSize)
|
||||||
|
_, err = rand.Read(payload)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ver := version.Current()
|
||||||
|
hdr := objectSDK.New()
|
||||||
|
hdr.SetContainerID(*cnrId)
|
||||||
|
hdr.SetType(objectSDK.TypeRegular)
|
||||||
|
hdr.SetVersion(&ver)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
require.NoError(t, p.WriteHeader(ctx, hdr))
|
||||||
|
|
||||||
|
_, err = p.Write(ctx, payload)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = p.Close(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Len(t, tt.objects, 1)
|
||||||
|
|
||||||
|
c, err := erasurecode.NewConstructor(policy.ECDataCount(cnr.PlacementPolicy()), policy.ECParityCount(cnr.PlacementPolicy()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
parts, err := c.Split(tt.objects[0], &pk.PrivateKey)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return tt.objects[0], parts
|
||||||
|
}
|
|
@ -132,6 +132,9 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
|
||||||
case *objectV2.SplitInfo:
|
case *objectV2.SplitInfo:
|
||||||
si := objectSDK.NewSplitInfoFromV2(v)
|
si := objectSDK.NewSplitInfoFromV2(v)
|
||||||
return objectSDK.NewSplitInfoError(si)
|
return objectSDK.NewSplitInfoError(si)
|
||||||
|
case *objectV2.ECInfo:
|
||||||
|
ei := objectSDK.NewECInfoFromV2(v)
|
||||||
|
return objectSDK.NewECInfoError(ei)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -104,10 +104,13 @@ func (s *Service) GetRange(req *objectV2.GetRangeRequest, stream objectSvc.GetOb
|
||||||
err = s.svc.GetRange(stream.Context(), *p)
|
err = s.svc.GetRange(stream.Context(), *p)
|
||||||
|
|
||||||
var splitErr *objectSDK.SplitInfoError
|
var splitErr *objectSDK.SplitInfoError
|
||||||
|
var ecErr *objectSDK.ECInfoError
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case errors.As(err, &splitErr):
|
case errors.As(err, &splitErr):
|
||||||
return stream.Send(splitInfoRangeResponse(splitErr.SplitInfo()))
|
return stream.Send(splitInfoRangeResponse(splitErr.SplitInfo()))
|
||||||
|
case errors.As(err, &ecErr):
|
||||||
|
return stream.Send(ecInfoRangeResponse(ecErr.ECInfo()))
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -292,6 +292,17 @@ func splitInfoRangeResponse(info *objectSDK.SplitInfo) *objectV2.GetRangeRespons
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ecInfoRangeResponse(info *objectSDK.ECInfo) *objectV2.GetRangeResponse {
|
||||||
|
resp := new(objectV2.GetRangeResponse)
|
||||||
|
|
||||||
|
body := new(objectV2.GetRangeResponseBody)
|
||||||
|
resp.SetBody(body)
|
||||||
|
|
||||||
|
body.SetRangePart(info.ToV2())
|
||||||
|
|
||||||
|
return resp
|
||||||
|
}
|
||||||
|
|
||||||
func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResponse) {
|
func setSplitInfoHeadResponse(info *objectSDK.SplitInfo, resp *objectV2.HeadResponse) {
|
||||||
resp.GetBody().SetHeaderPart(info.ToV2())
|
resp.GetBody().SetHeaderPart(info.ToV2())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue