WIP: metabase: Add missing expiration epoch for object stored with EC #1585
2 changed files with 70 additions and 1 deletions
|
@ -2,12 +2,18 @@ package meta_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -92,3 +98,59 @@ func getAddressSafe(t *testing.T, o *objectSDK.Object) oid.Address {
|
|||
addr.SetObject(id)
|
||||
return addr
|
||||
}
|
||||
|
||||
func TestPutExpiredComplexObjectWithEC(t *testing.T) {
|
||||
const (
|
||||
currEpoch = 100
|
||||
expEpoch = currEpoch - 1
|
||||
partSize = 1 << 10
|
||||
partCount = 5
|
||||
dataCount = 2
|
||||
parityCount = 1
|
||||
)
|
||||
|
||||
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
||||
defer func() { require.NoError(t, db.Close(context.Background())) }()
|
||||
|
||||
cntr := cidtest.ID()
|
||||
|
||||
parent := testutil.GenerateObjectWithCID(cntr)
|
||||
testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
|
||||
|
||||
key, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
var target testTarget
|
||||
ids := cutObject(t, transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: &key.PrivateKey,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return &target },
|
||||
NetworkState: epochState{currEpoch},
|
||||
MaxSize: partSize,
|
||||
}), parent, partCount*partSize)
|
||||
|
||||
n := len(target.objects)
|
||||
objects, link := target.objects[:n-1], target.objects[n-1]
|
||||
|
||||
ecc, err := erasurecode.NewConstructor(dataCount, parityCount)
|
||||
require.NoError(t, err)
|
||||
|
||||
var chunks []*objectSDK.Object
|
||||
for _, object := range objects {
|
||||
objectChunks, err := ecc.Split(object, &key.PrivateKey)
|
||||
require.NoError(t, err)
|
||||
chunks = append(chunks, objectChunks...)
|
||||
}
|
||||
|
||||
// link mustn't be first, it's needed to reproduces the bug
|
||||
for _, chunk := range chunks {
|
||||
require.NoError(t, metaPut(db, chunk, nil))
|
||||
}
|
||||
require.NoError(t, metaPut(db, link, nil))
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cntr)
|
||||
addr.SetObject(*ids.ParentID)
|
||||
|
||||
_, err = metaGet(db, addr, false)
|
||||
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
|
||||
}
|
||||
|
|
|
@ -159,7 +159,14 @@ func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *obje
|
|||
// a linking object to put (or vice versa), we should update split info
|
||||
// with object ids of these objects
|
||||
if isParent {
|
||||
return updateSplitInfo(tx, objectCore.AddressOf(obj), si)
|
||||
addr := objectCore.AddressOf(obj)
|
||||
|
||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||
if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return updateSplitInfo(tx, addr, si)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Add table
Reference in a new issue