objectsvc: Fix possible panic in GetRange() #1078
2 changed files with 206 additions and 16 deletions
|
@ -114,7 +114,7 @@ func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID)
|
|||
}
|
||||
|
||||
to := uint64(0)
|
||||
if seekOff+seekLen > a.currentOffset+from {
|
||||
if seekOff+seekLen >= a.currentOffset+from {
|
||||
to = seekOff + seekLen - a.currentOffset
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/rand"
|
||||
|
@ -25,6 +26,9 @@ import (
|
|||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -61,6 +65,9 @@ type testEpochReceiver uint64
|
|||
func (e testEpochReceiver) Epoch() (uint64, error) {
|
||||
return uint64(e), nil
|
||||
}
|
||||
func (e testEpochReceiver) CurrentEpoch() uint64 {
|
||||
return uint64(e)
|
||||
}
|
||||
|
||||
func newTestStorage() *testStorage {
|
||||
return &testStorage{
|
||||
|
@ -555,21 +562,6 @@ func TestGetRemoteSmall(t *testing.T) {
|
|||
return p
|
||||
}
|
||||
|
||||
newRngPrm := func(raw bool, w ChunkWriter, off, ln uint64) RangePrm {
|
||||
p := RangePrm{}
|
||||
p.SetChunkWriter(w)
|
||||
p.WithRawFlag(raw)
|
||||
p.common = new(util.CommonPrm).WithLocalOnly(false)
|
||||
|
||||
r := objectSDK.NewRange()
|
||||
r.SetOffset(off)
|
||||
r.SetLength(ln)
|
||||
|
||||
p.SetRange(r)
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
newHeadPrm := func(raw bool, w ObjectWriter) HeadPrm {
|
||||
p := HeadPrm{}
|
||||
p.SetHeaderWriter(w)
|
||||
|
@ -1628,6 +1620,204 @@ func TestGetRemoteSmall(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
type testTarget struct {
|
||||
objects []*objectSDK.Object
|
||||
}
|
||||
|
||||
func (tt *testTarget) WriteObject(_ context.Context, obj *objectSDK.Object) error {
|
||||
tt.objects = append(tt.objects, obj)
|
||||
return nil
|
||||
}
|
||||
|
||||
func objectChain(t *testing.T, cnr cid.ID, singleSize, totalSize uint64) (oid.ID, []*objectSDK.Object, *objectSDK.Object, []byte) {
|
||||
pk, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
tt := new(testTarget)
|
||||
p := transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: &pk.PrivateKey,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return tt },
|
||||
NetworkState: testEpochReceiver(1),
|
||||
MaxSize: singleSize,
|
||||
})
|
||||
|
||||
payload := make([]byte, totalSize)
|
||||
_, err = rand.Read(payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
ver := version.Current()
|
||||
hdr := objectSDK.New()
|
||||
hdr.SetContainerID(cnr)
|
||||
hdr.SetType(objectSDK.TypeRegular)
|
||||
hdr.SetVersion(&ver)
|
||||
|
||||
ctx := context.Background()
|
||||
require.NoError(t, p.WriteHeader(ctx, hdr))
|
||||
|
||||
_, err = p.Write(ctx, payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := p.Close(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
if totalSize <= singleSize {
|
||||
// Small object, no linking.
|
||||
require.Len(t, tt.objects, 1)
|
||||
return res.SelfID, tt.objects, nil, payload
|
||||
}
|
||||
|
||||
return *res.ParentID, tt.objects[:len(tt.objects)-1], tt.objects[len(tt.objects)-1], bytes.Clone(payload)
|
||||
}
|
||||
|
||||
func newRngPrm(raw bool, w ChunkWriter, off, ln uint64) RangePrm {
|
||||
p := RangePrm{}
|
||||
p.SetChunkWriter(w)
|
||||
p.WithRawFlag(raw)
|
||||
p.common = new(util.CommonPrm)
|
||||
|
||||
r := objectSDK.NewRange()
|
||||
r.SetOffset(off)
|
||||
r.SetLength(ln)
|
||||
|
||||
p.SetRange(r)
|
||||
return p
|
||||
}
|
||||
|
||||
func TestGetRange(t *testing.T) {
|
||||
var cnr container.Container
|
||||
cnr.SetPlacementPolicy(netmaptest.PlacementPolicy())
|
||||
|
||||
var idCnr cid.ID
|
||||
container.CalculateID(&idCnr, cnr)
|
||||
|
||||
ns, as := testNodeMatrix(t, []int{2})
|
||||
|
||||
testGetRange := func(t *testing.T, svc *Service, addr oid.Address, from, to uint64, payload []byte) {
|
||||
w := NewSimpleObjectWriter()
|
||||
rngPrm := newRngPrm(false, w, from, to-from)
|
||||
rngPrm.WithAddress(addr)
|
||||
|
||||
err := svc.GetRange(context.Background(), rngPrm)
|
||||
require.NoError(t, err)
|
||||
if from == to {
|
||||
require.Nil(t, w.Object().Payload())
|
||||
} else {
|
||||
require.Equal(t, payload[from:to], w.Object().Payload())
|
||||
}
|
||||
}
|
||||
|
||||
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
||||
const curEpoch = 13
|
||||
|
||||
return &Service{
|
||||
log: test.NewLogger(t),
|
||||
localStorage: newTestStorage(),
|
||||
traverserGenerator: &testTraverserGenerator{
|
||||
c: cnr,
|
||||
b: map[uint64]placement.Builder{
|
||||
curEpoch: b,
|
||||
},
|
||||
},
|
||||
epochSource: testEpochReceiver(curEpoch),
|
||||
remoteStorageConstructor: c,
|
||||
keyStore: &testKeyStorage{},
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("small", func(t *testing.T) {
|
||||
const totalSize = 5
|
||||
_, objs, _, payload := objectChain(t, idCnr, totalSize, totalSize)
|
||||
require.Len(t, objs, 1)
|
||||
require.Len(t, payload, totalSize)
|
||||
|
||||
obj := objs[0]
|
||||
addr := object.AddressOf(obj)
|
||||
builder := &testPlacementBuilder{vectors: map[string][][]netmap.NodeInfo{addr.EncodeToString(): ns}}
|
||||
|
||||
c1 := newTestClient()
|
||||
c1.addResult(addr, obj, nil)
|
||||
|
||||
svc := newSvc(builder, &testClientCache{
|
||||
clients: map[string]*testClient{
|
||||
as[0][0]: c1,
|
||||
as[0][1]: c1,
|
||||
},
|
||||
})
|
||||
|
||||
for from := 0; from < totalSize-1; from++ {
|
||||
for to := from; to < totalSize; to++ {
|
||||
t.Run(fmt.Sprintf("from=%d,to=%d", from, to), func(t *testing.T) {
|
||||
testGetRange(t, svc, addr, uint64(from), uint64(to), payload)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
t.Run("big", func(t *testing.T) {
|
||||
const totalSize = 9
|
||||
id, objs, link, payload := objectChain(t, idCnr, 3, totalSize) // 3 parts
|
||||
require.Equal(t, totalSize, len(payload))
|
||||
|
||||
builder := &testPlacementBuilder{vectors: map[string][][]netmap.NodeInfo{}}
|
||||
builder.vectors[idCnr.EncodeToString()+"/"+id.EncodeToString()] = ns
|
||||
builder.vectors[object.AddressOf(link).EncodeToString()] = ns
|
||||
for i := range objs {
|
||||
builder.vectors[object.AddressOf(objs[i]).EncodeToString()] = ns
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(idCnr)
|
||||
addr.SetObject(id)
|
||||
|
||||
const (
|
||||
linkingLast = "splitinfo=last"
|
||||
linkingChildren = "splitinfo=children"
|
||||
linkingBoth = "splitinfo=both"
|
||||
)
|
||||
|
||||
lastID, _ := objs[len(objs)-1].ID()
|
||||
linkID, _ := link.ID()
|
||||
|
||||
for _, kind := range []string{linkingLast, linkingChildren, linkingBoth} {
|
||||
t.Run(kind, func(t *testing.T) {
|
||||
c1 := newTestClient()
|
||||
for i := range objs {
|
||||
c1.addResult(object.AddressOf(objs[i]), objs[i], nil)
|
||||
}
|
||||
|
||||
c1.addResult(object.AddressOf(link), link, nil)
|
||||
|
||||
si := objectSDK.NewSplitInfo()
|
||||
switch kind {
|
||||
case linkingLast:
|
||||
si.SetLastPart(lastID)
|
||||
case linkingChildren:
|
||||
si.SetLink(linkID)
|
||||
case linkingBoth:
|
||||
si.SetLastPart(lastID)
|
||||
si.SetLink(linkID)
|
||||
}
|
||||
c1.addResult(addr, nil, objectSDK.NewSplitInfoError(si))
|
||||
|
||||
svc := newSvc(builder, &testClientCache{
|
||||
clients: map[string]*testClient{
|
||||
as[0][0]: c1,
|
||||
as[0][1]: c1,
|
||||
},
|
||||
})
|
||||
|
||||
for from := 0; from < totalSize-1; from++ {
|
||||
for to := from; to < totalSize; to++ {
|
||||
t.Run(fmt.Sprintf("from=%d,to=%d", from, to), func(t *testing.T) {
|
||||
testGetRange(t, svc, addr, uint64(from), uint64(to), payload)
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetFromPastEpoch(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
|
|
Loading…
Reference in a new issue