WIP: metabase: Add missing expiration epoch for object stored with EC #1585

Closed
a-savchuk wants to merge 1 commit from a-savchuk:exp-epoch-for-object-with-ec into master
2 changed files with 70 additions and 1 deletions

View file

@ -2,12 +2,18 @@ package meta_test
import ( import (
"context" "context"
"strconv"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -92,3 +98,59 @@ func getAddressSafe(t *testing.T, o *objectSDK.Object) oid.Address {
addr.SetObject(id) addr.SetObject(id)
return addr return addr
} }
func TestPutExpiredComplexObjectWithEC(t *testing.T) {
const (
currEpoch = 100
expEpoch = currEpoch - 1
partSize = 1 << 10
partCount = 5
dataCount = 2
parityCount = 1
)
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
defer func() { require.NoError(t, db.Close(context.Background())) }()
cntr := cidtest.ID()
parent := testutil.GenerateObjectWithCID(cntr)
testutil.AddAttribute(parent, objectV2.SysAttributeExpEpoch, strconv.Itoa(expEpoch))
key, err := keys.NewPrivateKey()
require.NoError(t, err)
var target testTarget
ids := cutObject(t, transformer.NewPayloadSizeLimiter(transformer.Params{
Key: &key.PrivateKey,
NextTargetInit: func() transformer.ObjectWriter { return &target },
NetworkState: epochState{currEpoch},
MaxSize: partSize,
}), parent, partCount*partSize)
n := len(target.objects)
objects, link := target.objects[:n-1], target.objects[n-1]
ecc, err := erasurecode.NewConstructor(dataCount, parityCount)
require.NoError(t, err)
var chunks []*objectSDK.Object
for _, object := range objects {
objectChunks, err := ecc.Split(object, &key.PrivateKey)
require.NoError(t, err)
chunks = append(chunks, objectChunks...)
}
// link mustn't be first, it's needed to reproduces the bug
for _, chunk := range chunks {
require.NoError(t, metaPut(db, chunk, nil))
}
require.NoError(t, metaPut(db, link, nil))
var addr oid.Address
addr.SetContainer(cntr)
addr.SetObject(*ids.ParentID)
_, err = metaGet(db, addr, false)
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
}

View file

@ -159,7 +159,14 @@ func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *obje
// a linking object to put (or vice versa), we should update split info // a linking object to put (or vice versa), we should update split info
// with object ids of these objects // with object ids of these objects
if isParent { if isParent {
return updateSplitInfo(tx, objectCore.AddressOf(obj), si) addr := objectCore.AddressOf(obj)
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil {
return err
}
return updateSplitInfo(tx, addr, si)
} }
return nil return nil