Fixes for split-related EC queries #1163
2 changed files with 168 additions and 71 deletions
|
@ -244,52 +244,49 @@ func putUniqueIndexes(
|
|||
|
||||
// index root object
|
||||
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
||||
var (
|
||||
err error
|
||||
splitInfo []byte
|
||||
)
|
||||
|
||||
if isParent {
|
||||
splitInfo, err = si.Marshal()
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't marshal split info: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
isObjKeySet := true
|
||||
if ecHead := obj.ECHeader(); ecHead != nil {
|
||||
objKey, isObjKeySet = objectKeyByECHeader(ecHead)
|
||||
}
|
||||
if isObjKeySet {
|
||||
err = putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: rootBucketName(cnr, bucketName),
|
||||
key: objKey,
|
||||
val: splitInfo,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
parentID := ecHead.Parent()
|
||||
if ecHead.ParentSplitID() != nil {
|
||||
parentSplitParentID := ecHead.ParentSplitParentID()
|
||||
if parentSplitParentID == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
si = objectSDK.NewSplitInfo()
|
||||
si.SetSplitID(ecHead.ParentSplitID())
|
||||
si.SetLastPart(ecHead.Parent())
|
||||
|
||||
parentID = *parentSplitParentID
|
||||
}
|
||||
objKey = objectKey(parentID, objKey)
|
||||
}
|
||||
return updateSplitInfoIndex(tx, objKey, cnr, bucketName, si)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// objectKeyByECHeader returns objectKey for an object that has EC Header.
|
||||
// If object's parent is in Split, then parent's non-nil Split parent ID is set to object key.
|
||||
// If object's parent is not in Split, then its ID is set to object key.
|
||||
// Otherwise, such object keys should be ignored -- they are not put to the root bucket.
|
||||
func objectKeyByECHeader(ech *objectSDK.ECHeader) (objKey []byte, isSet bool) {
|
||||
if ech.ParentSplitID() != nil {
|
||||
if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil {
|
||||
isSet = true
|
||||
objKey = objectKey(*parentSplitParentID, make([]byte, objectKeySize))
|
||||
func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
|
||||
return updateUniqueIndexItem(tx, namedBucketItem{
|
||||
name: rootBucketName(cnr, bucketName),
|
||||
key: objKey,
|
||||
}, func(old, _ []byte) ([]byte, error) {
|
||||
switch {
|
||||
case si == nil && old == nil:
|
||||
return []byte{}, nil
|
||||
case si == nil:
|
||||
return old, nil
|
||||
case old == nil:
|
||||
return si.Marshal()
|
||||
default:
|
||||
oldSI := objectSDK.NewSplitInfo()
|
||||
if err := oldSI.Unmarshal(old); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
si = util.MergeSplitInfo(si, oldSI)
|
||||
return si.Marshal()
|
||||
}
|
||||
return
|
||||
}
|
||||
isSet = true
|
||||
objKey = objectKey(ech.Parent(), make([]byte, objectKeySize))
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
// updateIndexItemFunc is the common signature of the index-writing helpers
// (e.g. putUniqueIndexItem) — presumably used by callers to select an index
// writer; confirm against the call sites.
type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error
|
||||
|
@ -416,13 +413,21 @@ func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Buck
|
|||
return tx.CreateBucket(name)
|
||||
}
|
||||
|
||||
func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
|
||||
|
||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
||||
}
|
||||
|
||||
return bkt.Put(item.key, item.val)
|
||||
data, err := update(bkt.Get(item.key), item.val)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return bkt.Put(item.key, data)
|
||||
}
|
||||
|
||||
// putUniqueIndexItem unconditionally stores item.val under item.key in the
// unique index item.name, overwriting any previously stored value.
func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
	return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
}
|
||||
|
||||
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
|
@ -548,36 +553,8 @@ func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) erro
|
|||
// updateSpliInfo for existing objects if storage filled with extra information
|
||||
// about last object in split hierarchy or linking object.
|
||||
func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error {
|
||||
key := make([]byte, bucketKeySize)
|
||||
bkt := tx.Bucket(rootBucketName(addr.Container(), key))
|
||||
if bkt == nil {
|
||||
// if object doesn't exists and we want to update split info on it
|
||||
// then ignore, this should never happen
|
||||
return ErrIncorrectSplitInfoUpdate
|
||||
}
|
||||
|
||||
objectKey := objectKey(addr.Object(), key)
|
||||
|
||||
rawSplitInfo := bkt.Get(objectKey)
|
||||
if len(rawSplitInfo) == 0 {
|
||||
return ErrIncorrectSplitInfoUpdate
|
||||
}
|
||||
|
||||
to := objectSDK.NewSplitInfo()
|
||||
|
||||
err := to.Unmarshal(rawSplitInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't unmarshal split info from root index: %w", err)
|
||||
}
|
||||
|
||||
result := util.MergeSplitInfo(from, to)
|
||||
|
||||
rawSplitInfo, err = result.Marshal()
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't marhsal merged split info: %w", err)
|
||||
}
|
||||
|
||||
return bkt.Put(objectKey, rawSplitInfo)
|
||||
objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
|
||||
return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from)
|
||||
}
|
||||
|
||||
// splitInfoFromObject returns split info based on last or linking object.
|
||||
|
|
|
@ -642,7 +642,7 @@ func (tt *testTarget) WriteObject(_ context.Context, obj *objectSDK.Object) erro
|
|||
return nil
|
||||
}
|
||||
|
||||
func cutObject(t *testing.T, p transformer.ChunkedObjectWriter, hdr *objectSDK.Object, size int) {
|
||||
func cutObject(t *testing.T, p transformer.ChunkedObjectWriter, hdr *objectSDK.Object, size int) *transformer.AccessIdentifiers {
|
||||
ctx := context.Background()
|
||||
require.NoError(t, p.WriteHeader(ctx, hdr))
|
||||
|
||||
|
@ -652,8 +652,128 @@ func cutObject(t *testing.T, p transformer.ChunkedObjectWriter, hdr *objectSDK.O
|
|||
_, err := p.Write(ctx, payload)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = p.Close(ctx)
|
||||
ids, err := p.Close(ctx)
|
||||
require.NoError(t, err)
|
||||
return ids
|
||||
}
|
||||
|
||||
func TestDB_RawHead_SplitInfo(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const (
|
||||
partSize = 10
|
||||
partCount = 2
|
||||
dataCount = 2
|
||||
parityCount = 1
|
||||
)
|
||||
|
||||
db := newDB(t)
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
cnr := cidtest.ID()
|
||||
|
||||
pk, err := keys.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
tt := new(testTarget)
|
||||
p := transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
Key: &pk.PrivateKey,
|
||||
NextTargetInit: func() transformer.ObjectWriter { return tt },
|
||||
NetworkState: epochState{e: 1},
|
||||
MaxSize: partSize,
|
||||
})
|
||||
|
||||
hdr := objectSDK.New()
|
||||
hdr.SetContainerID(cnr)
|
||||
hdr.SetOwnerID(usertest.ID())
|
||||
ids := cutObject(t, p, hdr, partSize*partCount)
|
||||
require.Equal(t, len(tt.objects), partCount+1)
|
||||
|
||||
t.Run("rep", func(t *testing.T) {
|
||||
testGetRawSplitInfo(t, cnr, ids, tt.objects[partCount], tt.objects[partCount-1])
|
||||
})
|
||||
t.Run("with ec", func(t *testing.T) {
|
||||
ec, err := erasurecode.NewConstructor(dataCount, parityCount)
|
||||
require.NoError(t, err)
|
||||
|
||||
cs, err := ec.Split(tt.objects[partCount-1], &pk.PrivateKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
testGetRawSplitInfo(t, cnr, ids, tt.objects[partCount], cs[0])
|
||||
})
|
||||
}
|
||||
|
||||
// testGetRawSplitInfo puts the linking and last-part objects into a fresh
// metabase in different orders and checks that a raw get on the parent
// address returns a SplitInfoError carrying the last-part ID and (when the
// linking object was stored) the link ID, regardless of insertion order.
func testGetRawSplitInfo(t *testing.T, cnr cidSDK.ID, ids *transformer.AccessIdentifiers, linking, lastPart *objectSDK.Object) {
	expectedLinkID, ok := linking.ID()
	require.True(t, ok)

	t.Run("first last, then linking", func(t *testing.T) {
		db := newDB(t)
		defer func() { require.NoError(t, db.Close()) }()

		require.NoError(t, metaPut(db, lastPart, nil))
		require.NoError(t, metaPut(db, linking, nil))

		var addr oid.Address
		addr.SetContainer(cnr)
		addr.SetObject(*ids.ParentID)

		// Raw get on the parent address must fail with split info.
		_, err := metaGet(db, addr, true)

		var siErr *objectSDK.SplitInfoError
		require.ErrorAs(t, err, &siErr)

		lastID, ok := siErr.SplitInfo().LastPart()
		require.True(t, ok)
		require.Equal(t, ids.SelfID, lastID)

		linkID, ok := siErr.SplitInfo().Link()
		require.True(t, ok)
		require.Equal(t, expectedLinkID, linkID)
	})
	t.Run("first linking, then last", func(t *testing.T) {
		db := newDB(t)
		defer func() { require.NoError(t, db.Close()) }()

		require.NoError(t, metaPut(db, linking, nil))
		require.NoError(t, metaPut(db, lastPart, nil))

		var addr oid.Address
		addr.SetContainer(cnr)
		addr.SetObject(*ids.ParentID)

		// Same expectations with the insertion order reversed.
		_, err := metaGet(db, addr, true)

		var siErr *objectSDK.SplitInfoError
		require.ErrorAs(t, err, &siErr)

		lastID, ok := siErr.SplitInfo().LastPart()
		require.True(t, ok)
		require.Equal(t, ids.SelfID, lastID)

		linkID, ok := siErr.SplitInfo().Link()
		require.True(t, ok)
		require.Equal(t, expectedLinkID, linkID)
	})
	t.Run("only last part", func(t *testing.T) {
		db := newDB(t)
		defer func() { require.NoError(t, db.Close()) }()

		require.NoError(t, metaPut(db, lastPart, nil))

		var addr oid.Address
		addr.SetContainer(cnr)
		addr.SetObject(*ids.ParentID)

		// Without the linking object only the last-part ID is known.
		_, err := metaGet(db, addr, true)

		var siErr *objectSDK.SplitInfoError
		require.ErrorAs(t, err, &siErr)

		lastPart, ok := siErr.SplitInfo().LastPart()
		require.True(t, ok)
		require.Equal(t, ids.SelfID, lastPart)
	})
}
|
||||
|
||||
func TestDB_SelectSplitID_EC(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue
update func(oldSI, newSI []byte)
->update func(oldData, newData []byte)
or something else, but not SI
Fixed