diff --git a/pkg/services/object/get/assembler.go b/pkg/services/object/get/assembler.go index f10c67e5..025296ec 100644 --- a/pkg/services/object/get/assembler.go +++ b/pkg/services/object/get/assembler.go @@ -114,7 +114,7 @@ func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID) } to := uint64(0) - if seekOff+seekLen > a.currentOffset+from { + if seekOff+seekLen >= a.currentOffset+from { to = seekOff + seekLen - a.currentOffset } diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 64614d8d..be1e96c2 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -1,6 +1,7 @@ package getsvc import ( + "bytes" "context" "crypto/ecdsa" "crypto/rand" @@ -25,6 +26,9 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/stretchr/testify/require" ) @@ -61,6 +65,9 @@ type testEpochReceiver uint64 func (e testEpochReceiver) Epoch() (uint64, error) { return uint64(e), nil } +func (e testEpochReceiver) CurrentEpoch() uint64 { + return uint64(e) +} func newTestStorage() *testStorage { return &testStorage{ @@ -555,21 +562,6 @@ func TestGetRemoteSmall(t *testing.T) { return p } - newRngPrm := func(raw bool, w ChunkWriter, off, ln uint64) RangePrm { - p := RangePrm{} - p.SetChunkWriter(w) - p.WithRawFlag(raw) - p.common = new(util.CommonPrm).WithLocalOnly(false) - - r := objectSDK.NewRange() - r.SetOffset(off) - r.SetLength(ln) - - p.SetRange(r) - - return p - } - newHeadPrm := func(raw bool, w ObjectWriter) HeadPrm { p := HeadPrm{} p.SetHeaderWriter(w) @@ -1628,6 +1620,204 @@ func TestGetRemoteSmall(t *testing.T) { }) } +type testTarget struct { + objects []*objectSDK.Object +} + +func (tt *testTarget) WriteObject(_ context.Context, obj *objectSDK.Object) error { + tt.objects = append(tt.objects, obj) + return nil +} + +func objectChain(t *testing.T, cnr cid.ID, singleSize, totalSize uint64) (oid.ID, []*objectSDK.Object, *objectSDK.Object, []byte) { + pk, err := keys.NewPrivateKey() + require.NoError(t, err) + + tt := new(testTarget) + p := transformer.NewPayloadSizeLimiter(transformer.Params{ + Key: &pk.PrivateKey, + NextTargetInit: func() transformer.ObjectWriter { return tt }, + NetworkState: testEpochReceiver(1), + MaxSize: singleSize, + }) + + payload := make([]byte, totalSize) + _, err = rand.Read(payload) + require.NoError(t, err) + + ver := version.Current() + hdr := objectSDK.New() + hdr.SetContainerID(cnr) + hdr.SetType(objectSDK.TypeRegular) + hdr.SetVersion(&ver) + + ctx := context.Background() + require.NoError(t, p.WriteHeader(ctx, hdr)) + + _, err = p.Write(ctx, payload) + require.NoError(t, err) + + res, err := p.Close(ctx) + require.NoError(t, err) + + if totalSize <= singleSize { + // Small object, no linking. + require.Len(t, tt.objects, 1) + return res.SelfID, tt.objects, nil, payload + } + + return *res.ParentID, tt.objects[:len(tt.objects)-1], tt.objects[len(tt.objects)-1], bytes.Clone(payload) +} + +func newRngPrm(raw bool, w ChunkWriter, off, ln uint64) RangePrm { + p := RangePrm{} + p.SetChunkWriter(w) + p.WithRawFlag(raw) + p.common = new(util.CommonPrm) + + r := objectSDK.NewRange() + r.SetOffset(off) + r.SetLength(ln) + + p.SetRange(r) + return p +} + +func TestGetRange(t *testing.T) { + var cnr container.Container + cnr.SetPlacementPolicy(netmaptest.PlacementPolicy()) + + var idCnr cid.ID + container.CalculateID(&idCnr, cnr) + + ns, as := testNodeMatrix(t, []int{2}) + + testGetRange := func(t *testing.T, svc *Service, addr oid.Address, from, to uint64, payload []byte) { + w := NewSimpleObjectWriter() + rngPrm := newRngPrm(false, w, from, to-from) + rngPrm.WithAddress(addr) + + err := svc.GetRange(context.Background(), rngPrm) + require.NoError(t, err) + if from == to { + require.Nil(t, w.Object().Payload()) + } else { + require.Equal(t, payload[from:to], w.Object().Payload()) + } + } + + newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service { + const curEpoch = 13 + + return &Service{ + log: test.NewLogger(t), + localStorage: newTestStorage(), + traverserGenerator: &testTraverserGenerator{ + c: cnr, + b: map[uint64]placement.Builder{ + curEpoch: b, + }, + }, + epochSource: testEpochReceiver(curEpoch), + remoteStorageConstructor: c, + keyStore: &testKeyStorage{}, + } + } + + t.Run("small", func(t *testing.T) { + const totalSize = 5 + _, objs, _, payload := objectChain(t, idCnr, totalSize, totalSize) + require.Len(t, objs, 1) + require.Len(t, payload, totalSize) + + obj := objs[0] + addr := object.AddressOf(obj) + builder := &testPlacementBuilder{vectors: map[string][][]netmap.NodeInfo{addr.EncodeToString(): ns}} + + c1 := newTestClient() + c1.addResult(addr, obj, nil) + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c1, + }, + }) + + for from := 0; from < totalSize-1; from++ { + for to := from; to < totalSize; to++ { + t.Run(fmt.Sprintf("from=%d,to=%d", from, to), func(t *testing.T) { + testGetRange(t, svc, addr, uint64(from), uint64(to), payload) + }) + } + } + + }) + t.Run("big", func(t *testing.T) { + const totalSize = 9 + id, objs, link, payload := objectChain(t, idCnr, 3, totalSize) // 3 parts + require.Equal(t, totalSize, len(payload)) + + builder := &testPlacementBuilder{vectors: map[string][][]netmap.NodeInfo{}} + builder.vectors[idCnr.EncodeToString()+"/"+id.EncodeToString()] = ns + builder.vectors[object.AddressOf(link).EncodeToString()] = ns + for i := range objs { + builder.vectors[object.AddressOf(objs[i]).EncodeToString()] = ns + } + + var addr oid.Address + addr.SetContainer(idCnr) + addr.SetObject(id) + + const ( + linkingLast = "splitinfo=last" + linkingChildren = "splitinfo=children" + linkingBoth = "splitinfo=both" + ) + + lastID, _ := objs[len(objs)-1].ID() + linkID, _ := link.ID() + + for _, kind := range []string{linkingLast, linkingChildren, linkingBoth} { + t.Run(kind, func(t *testing.T) { + c1 := newTestClient() + for i := range objs { + c1.addResult(object.AddressOf(objs[i]), objs[i], nil) + } + + c1.addResult(object.AddressOf(link), link, nil) + + si := objectSDK.NewSplitInfo() + switch kind { + case linkingLast: + si.SetLastPart(lastID) + case linkingChildren: + si.SetLink(linkID) + case linkingBoth: + si.SetLastPart(lastID) + si.SetLink(linkID) + } + c1.addResult(addr, nil, objectSDK.NewSplitInfoError(si)) + + svc := newSvc(builder, &testClientCache{ + clients: map[string]*testClient{ + as[0][0]: c1, + as[0][1]: c1, + }, + }) + + for from := 0; from < totalSize-1; from++ { + for to := from; to < totalSize; to++ { + t.Run(fmt.Sprintf("from=%d,to=%d", from, to), func(t *testing.T) { + testGetRange(t, svc, addr, uint64(from), uint64(to), payload) + }) + } + } + }) + } + }) +} + func TestGetFromPastEpoch(t *testing.T) { ctx := context.Background()