node: Implement Range\RangeHash
requests for EC object #1112
1 changed files with 182 additions and 0 deletions
182
pkg/services/object/get/getrangeec_test.go
Normal file
182
pkg/services/object/get/getrangeec_test.go
Normal file
|
@ -0,0 +1,182 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type containerStorage struct {
|
||||
cnt *container.Container
|
||||
}
|
||||
|
||||
func (cs *containerStorage) Get(cid.ID) (*coreContainer.Container, error) {
|
||||
coreCnt := coreContainer.Container{
|
||||
Value: *cs.cnt,
|
||||
}
|
||||
return &coreCnt, nil
|
||||
}
|
||||
|
||||
func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestGetRangeEC(t *testing.T) {
|
||||
var dataCount uint32 = 3
|
||||
var parityCount uint32 = 1
|
||||
cnr := container.Container{}
|
||||
p := netmap.PlacementPolicy{}
|
||||
p.SetContainerBackupFactor(1)
|
||||
x := netmap.ReplicaDescriptor{}
|
||||
x.SetECDataCount(dataCount)
|
||||
x.SetECParityCount(parityCount)
|
||||
p.AddReplicas(x)
|
||||
cnr.SetPlacementPolicy(p)
|
||||
|
||||
var idCnr cid.ID
|
||||
container.CalculateID(&idCnr, cnr)
|
||||
|
||||
ns, as := testNodeMatrix(t, []int{4})
|
||||
|
||||
testGetRange := func(t *testing.T, svc *Service, addr oid.Address, from, to uint64, payload []byte) {
|
||||
w := NewSimpleObjectWriter()
|
||||
rngPrm := newRngPrm(false, w, from, to-from)
|
||||
rngPrm.WithAddress(addr)
|
||||
|
||||
err := svc.GetRange(context.Background(), rngPrm)
|
||||
require.NoError(t, err)
|
||||
if from == to {
|
||||
require.Nil(t, w.Object().Payload())
|
||||
} else {
|
||||
require.Equal(t, payload[from:to], w.Object().Payload())
|
||||
}
|
||||
}
|
||||
|
||||
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
||||
const curEpoch = 13
|
||||
|
||||
return &Service{
|
||||
log: test.NewLogger(t),
|
||||
localStorage: newTestStorage(),
|
||||
traverserGenerator: &testTraverserGenerator{
|
||||
c: cnr,
|
||||
b: map[uint64]placement.Builder{
|
||||
curEpoch: b,
|
||||
},
|
||||
},
|
||||
epochSource: testEpochReceiver(curEpoch),
|
||||
remoteStorageConstructor: c,
|
||||
keyStore: &testKeyStorage{},
|
||||
containerSource: &containerStorage{
|
||||
cnt: &cnr,
|
||||
},
|
||||
}
|
||||
}
|
||||
const totalSize = 5
|
||||
obj, parts := objectECChain(t, &idCnr, &cnr, totalSize, totalSize)
|
||||
require.Len(t, parts, int(dataCount+parityCount))
|
||||
require.Len(t, obj.Payload(), totalSize)
|
||||
|
||||
addr := object.AddressOf(obj)
|
||||
builder := &testPlacementBuilder{
|
||||
vectors: map[string][][]netmap.NodeInfo{
|
||||
addr.EncodeToString(): ns,
|
||||
},
|
||||
}
|
||||
|
||||
clients := map[string]*testClient{}
|
||||
for i, part := range parts {
|
||||
builder.vectors[object.AddressOf(part).EncodeToString()] = ns
|
||||
|
||||
tc := newTestClient()
|
||||
|
||||
ecInfo := objectSDK.NewECInfo()
|
||||
|
||||
chunk := objectSDK.ECChunk{}
|
||||
chunk.Total = uint32(len(parts))
|
||||
chunk.Index = uint32(i)
|
||||
id, _ := part.ID()
|
||||
idv2 := refs.ObjectID{}
|
||||
id.WriteToV2(&idv2)
|
||||
chunk.ID = idv2
|
||||
|
||||
ecInfo.AddChunk(chunk)
|
||||
errECInfo := objectSDK.NewECInfoError(ecInfo)
|
||||
|
||||
tc.addResult(addr, nil, errECInfo)
|
||||
tc.addResult(object.AddressOf(part), part, nil)
|
||||
|
||||
clients[as[0][i]] = tc
|
||||
}
|
||||
|
||||
svc := newSvc(builder, &testClientCache{
|
||||
clients: clients,
|
||||
})
|
||||
|
||||
for from := 0; from < totalSize-1; from++ {
|
||||
for to := from; to < totalSize; to++ {
|
||||
t.Run(fmt.Sprintf("from=%d,to=%d", from, to), func(t *testing.T) {
|
||||
testGetRange(t, svc, addr, uint64(from), uint64(to), obj.Payload())
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func objectECChain(t *testing.T, cnrId *cid.ID, cnr *container.Container, singleSize, totalSize uint64) (*objectSDK.Object, []*objectSDK.Object) {
|
||||
pk, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
tt := new(testTarget)
|
||||
p := transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: &pk.PrivateKey,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return tt },
|
||||
NetworkState: testEpochReceiver(1),
|
||||
MaxSize: singleSize,
|
||||
})
|
||||
|
||||
payload := make([]byte, totalSize)
|
||||
_, err = rand.Read(payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
ver := version.Current()
|
||||
hdr := objectSDK.New()
|
||||
hdr.SetContainerID(*cnrId)
|
||||
hdr.SetType(objectSDK.TypeRegular)
|
||||
hdr.SetVersion(&ver)
|
||||
|
||||
ctx := context.Background()
|
||||
require.NoError(t, p.WriteHeader(ctx, hdr))
|
||||
|
||||
_, err = p.Write(ctx, payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = p.Close(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, tt.objects, 1)
|
||||
|
||||
c, err := erasurecode.NewConstructor(policy.ECDataCount(cnr.PlacementPolicy()), policy.ECParityCount(cnr.PlacementPolicy()))
|
||||
require.NoError(t, err)
|
||||
parts, err := c.Split(tt.objects[0], &pk.PrivateKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
return tt.objects[0], parts
|
||||
}
|
Loading…
Reference in a new issue