[#1163] metabase: Properly save EC parent split ID

Search by SplitID should return all parts of a complex object.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2024-06-05 11:14:06 +03:00
parent 8fcd0f8f8d
commit 2e074d3846
2 changed files with 115 additions and 0 deletions

View file

@ -347,6 +347,18 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun
if err != nil {
return err
}
if ech.ParentSplitID() != nil {
objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
err := f(tx, namedBucketItem{
name: splitBucketName(cnr, bucketName),
key: ech.ParentSplitID().ToV2(),
val: objKey,
})
if err != nil {
return err
}
}
}
return nil

View file

@ -3,6 +3,7 @@ package meta_test
import (
"context"
"encoding/hex"
"math/rand"
"strconv"
"testing"
@ -13,9 +14,13 @@ import (
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
)
@ -628,6 +633,104 @@ func TestDB_SelectObjectID(t *testing.T) {
})
}
type testTarget struct {
objects []*objectSDK.Object
}
func (tt *testTarget) WriteObject(_ context.Context, obj *objectSDK.Object) error {
tt.objects = append(tt.objects, obj)
return nil
}
func cutObject(t *testing.T, p transformer.ChunkedObjectWriter, hdr *objectSDK.Object, size int) {
ctx := context.Background()
require.NoError(t, p.WriteHeader(ctx, hdr))
payload := make([]byte, size)
rand.New(rand.NewSource(0)).Read(payload)
_, err := p.Write(ctx, payload)
require.NoError(t, err)
_, err = p.Close(ctx)
require.NoError(t, err)
}
func TestDB_SelectSplitID_EC(t *testing.T) {
t.Parallel()
const (
partSize = 10
partCount = 2
dataCount = 2
parityCount = 1
)
db := newDB(t)
defer func() { require.NoError(t, db.Close()) }()
cnr := cidtest.ID()
pk, err := keys.NewPrivateKey()
require.NoError(t, err)
tt := new(testTarget)
p := transformer.NewPayloadSizeLimiter(transformer.Params{
Key: &pk.PrivateKey,
NextTargetInit: func() transformer.ObjectWriter { return tt },
NetworkState: epochState{e: 1},
MaxSize: partSize,
})
hdr := objectSDK.New()
hdr.SetContainerID(cnr)
hdr.SetOwnerID(usertest.ID())
cutObject(t, p, hdr, partSize*partCount)
require.Equal(t, len(tt.objects), partCount+1)
split := tt.objects[0].SplitID()
require.NotNil(t, split)
ec, err := erasurecode.NewConstructor(dataCount, parityCount)
require.NoError(t, err)
for i := 0; i < partCount; i++ {
cs, err := ec.Split(tt.objects[i], &pk.PrivateKey)
require.NoError(t, err)
require.NoError(t, putBig(db, cs[0]))
}
t.Run("not present", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID, "", objectSDK.MatchNotPresent)
testSelect(t, db, cnr, fs)
})
t.Run("split id", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID, split.String(), objectSDK.MatchStringEqual)
testSelect(t, db, cnr, fs,
object.AddressOf(tt.objects[0]),
object.AddressOf(tt.objects[1]),
)
})
t.Run("empty split", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID, "", objectSDK.MatchStringEqual)
testSelect(t, db, cnr, fs)
})
t.Run("unknown split id", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID,
objectSDK.NewSplitID().String(),
objectSDK.MatchStringEqual)
testSelect(t, db, cnr, fs)
})
}
func TestDB_SelectSplitID(t *testing.T) {
t.Parallel()