WIP: metabase: Add missing expiration epoch for object stored with EC #1585
2 changed files with 70 additions and 1 deletions
|
@ -2,12 +2,18 @@ package meta_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -92,3 +98,59 @@ func getAddressSafe(t *testing.T, o *objectSDK.Object) oid.Address {
|
||||||
addr.SetObject(id)
|
addr.SetObject(id)
|
||||||
return addr
|
return addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPutExpiredComplexObjectWithEC(t *testing.T) {
|
||||||
|
const (
|
||||||
|
currEpoch = 100
|
||||||
|
expEpoch = currEpoch - 1
|
||||||
|
partSize = 1 << 10
|
||||||
|
partCount = 5
|
||||||
|
dataCount = 2
|
||||||
|
parityCount = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
||||||
|
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||||
|
|
||||||
|
cntr := cidtest.ID()
|
||||||
|
|
||||||
|
parent := testutil.GenerateObjectWithCID(cntr)
|
||||||
|
testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
|
||||||
|
|
||||||
|
key, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var target testTarget
|
||||||
|
ids := cutObject(t, transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||||
|
Key: &key.PrivateKey,
|
||||||
|
NextTargetInit: func() transformer.ObjectWriter { return &target },
|
||||||
|
NetworkState: epochState{currEpoch},
|
||||||
|
MaxSize: partSize,
|
||||||
|
}), parent, partCount*partSize)
|
||||||
|
|
||||||
|
n := len(target.objects)
|
||||||
|
objects, link := target.objects[:n-1], target.objects[n-1]
|
||||||
|
|
||||||
|
ecc, err := erasurecode.NewConstructor(dataCount, parityCount)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var chunks []*objectSDK.Object
|
||||||
|
for _, object := range objects {
|
||||||
|
objectChunks, err := ecc.Split(object, &key.PrivateKey)
|
||||||
|
require.NoError(t, err)
|
||||||
|
chunks = append(chunks, objectChunks...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// link mustn't be first, it's needed to reproduces the bug
|
||||||
|
for _, chunk := range chunks {
|
||||||
|
require.NoError(t, metaPut(db, chunk, nil))
|
||||||
|
}
|
||||||
|
require.NoError(t, metaPut(db, link, nil))
|
||||||
|
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(cntr)
|
||||||
|
addr.SetObject(*ids.ParentID)
|
||||||
|
|
||||||
|
_, err = metaGet(db, addr, false)
|
||||||
|
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
|
||||||
|
}
|
||||||
|
|
|
@ -159,7 +159,14 @@ func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *obje
|
||||||
// a linking object to put (or vice versa), we should update split info
|
// a linking object to put (or vice versa), we should update split info
|
||||||
// with object ids of these objects
|
// with object ids of these objects
|
||||||
if isParent {
|
if isParent {
|
||||||
return updateSplitInfo(tx, objectCore.AddressOf(obj), si)
|
addr := objectCore.AddressOf(obj)
|
||||||
|
|
||||||
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||||
|
if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return updateSplitInfo(tx, addr, si)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Add table
Reference in a new issue