From 3f1961157ec794af9a36df858f2cb32f2a42907f Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 6 Jun 2024 10:52:17 +0300 Subject: [PATCH] [#1163] metabase: Handle multiple splitInfos for EC For REP updating split info is handled explicitly by a high-level PUT logic. For EC it is trickier, because the address of an object we put is only distantly related to a split info. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/metabase/put.go | 115 +++++++--------- .../metabase/select_test.go | 124 +++++++++++++++++- 2 files changed, 168 insertions(+), 71 deletions(-) diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 05922d1bd..ceb79758f 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -244,52 +244,49 @@ func putUniqueIndexes( // index root object if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() { - var ( - err error - splitInfo []byte - ) - - if isParent { - splitInfo, err = si.Marshal() - if err != nil { - return fmt.Errorf("can't marshal split info: %w", err) - } - } - - isObjKeySet := true if ecHead := obj.ECHeader(); ecHead != nil { - objKey, isObjKeySet = objectKeyByECHeader(ecHead) - } - if isObjKeySet { - err = putUniqueIndexItem(tx, namedBucketItem{ - name: rootBucketName(cnr, bucketName), - key: objKey, - val: splitInfo, - }) - if err != nil { - return err + parentID := ecHead.Parent() + if ecHead.ParentSplitID() != nil { + parentSplitParentID := ecHead.ParentSplitParentID() + if parentSplitParentID == nil { + return nil + } + + si = objectSDK.NewSplitInfo() + si.SetSplitID(ecHead.ParentSplitID()) + si.SetLastPart(ecHead.Parent()) + + parentID = *parentSplitParentID } + objKey = objectKey(parentID, objKey) } + return updateSplitInfoIndex(tx, objKey, cnr, bucketName, si) } return nil } -// objectKeyByECHeader returns objectKey for an object that has EC Header. -// If object's parent is in Split, then parent's non-nil Split parent ID is set to object key. -// If object's parent is not in Split, then its ID is set to object key. -// Otherwise, such object keys should be ignored -- they are not put to the root bucket. -func objectKeyByECHeader(ech *objectSDK.ECHeader) (objKey []byte, isSet bool) { - if ech.ParentSplitID() != nil { - if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil { - isSet = true - objKey = objectKey(*parentSplitParentID, make([]byte, objectKeySize)) +func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error { + return updateUniqueIndexItem(tx, namedBucketItem{ + name: rootBucketName(cnr, bucketName), + key: objKey, + }, func(old, _ []byte) ([]byte, error) { + switch { + case si == nil && old == nil: + return []byte{}, nil + case si == nil: + return old, nil + case old == nil: + return si.Marshal() + default: + oldSI := objectSDK.NewSplitInfo() + if err := oldSI.Unmarshal(old); err != nil { + return nil, err + } + si = util.MergeSplitInfo(si, oldSI) + return si.Marshal() } - return - } - isSet = true - objKey = objectKey(ech.Parent(), make([]byte, objectKeySize)) - return + }) } type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error @@ -416,13 +413,21 @@ func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Buck return tx.CreateBucket(name) } -func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error { +func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error { bkt, err := createBucketLikelyExists(tx, item.name) if err != nil { return fmt.Errorf("can't create index %v: %w", item.name, err) } - return bkt.Put(item.key, item.val) + data, err := update(bkt.Get(item.key), item.val) + if err != nil { + return err + } + return bkt.Put(item.key, data) +} + +func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error { + return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil }) } func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error { @@ -548,36 +553,8 @@ func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) erro // updateSpliInfo for existing objects if storage filled with extra information // about last object in split hierarchy or linking object. func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error { - key := make([]byte, bucketKeySize) - bkt := tx.Bucket(rootBucketName(addr.Container(), key)) - if bkt == nil { - // if object doesn't exists and we want to update split info on it - // then ignore, this should never happen - return ErrIncorrectSplitInfoUpdate - } - - objectKey := objectKey(addr.Object(), key) - - rawSplitInfo := bkt.Get(objectKey) - if len(rawSplitInfo) == 0 { - return ErrIncorrectSplitInfoUpdate - } - - to := objectSDK.NewSplitInfo() - - err := to.Unmarshal(rawSplitInfo) - if err != nil { - return fmt.Errorf("can't unmarshal split info from root index: %w", err) - } - - result := util.MergeSplitInfo(from, to) - - rawSplitInfo, err = result.Marshal() - if err != nil { - return fmt.Errorf("can't marhsal merged split info: %w", err) - } - - return bkt.Put(objectKey, rawSplitInfo) + objKey := objectKey(addr.Object(), make([]byte, bucketKeySize)) + return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from) } // splitInfoFromObject returns split info based on last or linkin object. diff --git a/pkg/local_object_storage/metabase/select_test.go b/pkg/local_object_storage/metabase/select_test.go index 8951a2dbe..8f9294d07 100644 --- a/pkg/local_object_storage/metabase/select_test.go +++ b/pkg/local_object_storage/metabase/select_test.go @@ -642,7 +642,7 @@ func (tt *testTarget) WriteObject(_ context.Context, obj *objectSDK.Object) erro return nil } -func cutObject(t *testing.T, p transformer.ChunkedObjectWriter, hdr *objectSDK.Object, size int) { +func cutObject(t *testing.T, p transformer.ChunkedObjectWriter, hdr *objectSDK.Object, size int) *transformer.AccessIdentifiers { ctx := context.Background() require.NoError(t, p.WriteHeader(ctx, hdr)) @@ -652,8 +652,128 @@ func cutObject(t *testing.T, p transformer.ChunkedObjectWriter, hdr *objectSDK.O _, err := p.Write(ctx, payload) require.NoError(t, err) - _, err = p.Close(ctx) + ids, err := p.Close(ctx) require.NoError(t, err) + return ids +} + +func TestDB_RawHead_SplitInfo(t *testing.T) { + t.Parallel() + + const ( + partSize = 10 + partCount = 2 + dataCount = 2 + parityCount = 1 + ) + + db := newDB(t) + defer func() { require.NoError(t, db.Close()) }() + + cnr := cidtest.ID() + + pk, err := keys.NewPrivateKey() + require.NoError(t, err) + + tt := new(testTarget) + p := transformer.NewPayloadSizeLimiter(transformer.Params{ + Key: &pk.PrivateKey, + NextTargetInit: func() transformer.ObjectWriter { return tt }, + NetworkState: epochState{e: 1}, + MaxSize: partSize, + }) + + hdr := objectSDK.New() + hdr.SetContainerID(cnr) + hdr.SetOwnerID(usertest.ID()) + ids := cutObject(t, p, hdr, partSize*partCount) + require.Equal(t, len(tt.objects), partCount+1) + + t.Run("rep", func(t *testing.T) { + testGetRawSplitInfo(t, cnr, ids, tt.objects[partCount], tt.objects[partCount-1]) + }) + t.Run("with ec", func(t *testing.T) { + ec, err := erasurecode.NewConstructor(dataCount, parityCount) + require.NoError(t, err) + + cs, err := ec.Split(tt.objects[partCount-1], &pk.PrivateKey) + require.NoError(t, err) + + testGetRawSplitInfo(t, cnr, ids, tt.objects[partCount], cs[0]) + }) +} + +func testGetRawSplitInfo(t *testing.T, cnr cidSDK.ID, ids *transformer.AccessIdentifiers, linking, lastPart *objectSDK.Object) { + expectedLinkID, ok := linking.ID() + require.True(t, ok) + + t.Run("first last, then linking", func(t *testing.T) { + db := newDB(t) + defer func() { require.NoError(t, db.Close()) }() + + require.NoError(t, metaPut(db, lastPart, nil)) + require.NoError(t, metaPut(db, linking, nil)) + + var addr oid.Address + addr.SetContainer(cnr) + addr.SetObject(*ids.ParentID) + + _, err := metaGet(db, addr, true) + + var siErr *objectSDK.SplitInfoError + require.ErrorAs(t, err, &siErr) + + lastID, ok := siErr.SplitInfo().LastPart() + require.True(t, ok) + require.Equal(t, ids.SelfID, lastID) + + linkID, ok := siErr.SplitInfo().Link() + require.True(t, ok) + require.Equal(t, expectedLinkID, linkID) + }) + t.Run("first linking, then last", func(t *testing.T) { + db := newDB(t) + defer func() { require.NoError(t, db.Close()) }() + + require.NoError(t, metaPut(db, linking, nil)) + require.NoError(t, metaPut(db, lastPart, nil)) + + var addr oid.Address + addr.SetContainer(cnr) + addr.SetObject(*ids.ParentID) + + _, err := metaGet(db, addr, true) + + var siErr *objectSDK.SplitInfoError + require.ErrorAs(t, err, &siErr) + + lastID, ok := siErr.SplitInfo().LastPart() + require.True(t, ok) + require.Equal(t, ids.SelfID, lastID) + + linkID, ok := siErr.SplitInfo().Link() + require.True(t, ok) + require.Equal(t, expectedLinkID, linkID) + }) + t.Run("only last part", func(t *testing.T) { + db := newDB(t) + defer func() { require.NoError(t, db.Close()) }() + + require.NoError(t, metaPut(db, lastPart, nil)) + + var addr oid.Address + addr.SetContainer(cnr) + addr.SetObject(*ids.ParentID) + + _, err := metaGet(db, addr, true) + + var siErr *objectSDK.SplitInfoError + require.ErrorAs(t, err, &siErr) + + lastPart, ok := siErr.SplitInfo().LastPart() + require.True(t, ok) + require.Equal(t, ids.SelfID, lastPart) + }) } func TestDB_SelectSplitID_EC(t *testing.T) {