From 918613546f0812234869b4f6906bb74adbba9e15 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 26 Feb 2024 16:30:03 +0300 Subject: [PATCH] [#1008] metabase: Do not update storageID on put There may be a race condition between putting an object and flushing the writecache: 1. Put object to the writecache 2. Writecache flushes object to the blobstore and sets blobstore's storageID 3. Put object to the metabase, set writecache's storageID (overwriting the blobstore's storageID set at step 2) Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/metabase/put.go | 20 +++++++------- .../metabase/storage_id.go | 13 +--------- .../metabase/storage_id_test.go | 26 +++++++++++++++++++ 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 5d8e4d263..429d981fe 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -137,7 +137,7 @@ func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *obje // When storage engine moves objects between different sub-storages, // it calls metabase.Put method with new storage ID, thus triggering this code. if !isParent && id != nil { - return updateStorageID(tx, objectCore.AddressOf(obj), id) + return setStorageID(tx, objectCore.AddressOf(obj), id, true) } // when storage already has last object in split hierarchy and there is @@ -236,12 +236,7 @@ func putUniqueIndexes( // index storageID if it is present if id != nil { - err = putUniqueIndexItem(tx, namedBucketItem{ - name: smallBucketName(cnr, bucketName), - key: objKey, - val: id, - }) - if err != nil { + if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil { return err } } @@ -483,16 +478,19 @@ func getVarUint(data []byte) (uint64, int, error) { } } -// updateStorageID for existing objects if they were moved from one +// setStorageID for existing objects if they were moved from one // storage location to another. 
-func updateStorageID(tx *bbolt.Tx, addr oid.Address, id []byte) error { +func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error { key := make([]byte, bucketKeySize) bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key)) if err != nil { return err } - - return bkt.Put(objectKey(addr.Object(), key), id) + key = objectKey(addr.Object(), key) + if override || bkt.Get(key) == nil { + return bkt.Put(key, id) + } + return nil } // updateSpliInfo for existing objects if storage filled with extra information diff --git a/pkg/local_object_storage/metabase/storage_id.go b/pkg/local_object_storage/metabase/storage_id.go index ef6b44dfd..f9767935c 100644 --- a/pkg/local_object_storage/metabase/storage_id.go +++ b/pkg/local_object_storage/metabase/storage_id.go @@ -3,11 +3,9 @@ package meta import ( "bytes" "context" - "errors" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" @@ -120,17 +118,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res return res, ErrReadOnlyMode } - currEpoch := db.epochState.CurrentEpoch() - err = db.boltDB.Batch(func(tx *bbolt.Tx) error { - exists, err := db.exists(tx, prm.addr, currEpoch) - if err == nil && exists { - err = updateStorageID(tx, prm.addr, prm.id) - } else if errors.As(err, new(logicerr.Logical)) { - err = updateStorageID(tx, prm.addr, prm.id) - } - - return err + return setStorageID(tx, prm.addr, prm.id, true) }) success = err == nil return res, metaerr.Wrap(err) diff --git a/pkg/local_object_storage/metabase/storage_id_test.go b/pkg/local_object_storage/metabase/storage_id_test.go index 063dfb270..aaf6480ab 100644 --- a/pkg/local_object_storage/metabase/storage_id_test.go +++ 
b/pkg/local_object_storage/metabase/storage_id_test.go @@ -75,6 +75,32 @@ func TestDB_StorageID(t *testing.T) { }) } +func TestPutWritecacheDataRace(t *testing.T) { + t.Parallel() + + db := newDB(t) + defer func() { require.NoError(t, db.Close()) }() + + putStorageID := []byte{1, 2, 3} + wcStorageID := []byte{1, 2, 3, 4, 5} + o := testutil.GenerateObject() + + fetchedStorageID, err := metaStorageID(db, object.AddressOf(o)) + require.NoError(t, err) + require.Nil(t, fetchedStorageID) + + // writecache flushes object and updates storageID before the object is actually saved to the metabase + require.NoError(t, metaUpdateStorageID(db, object.AddressOf(o), wcStorageID)) + + // put completes afterwards; it must not override the storageID set by the flush + err = metaPut(db, o, putStorageID) + require.NoError(t, err) + + fetchedStorageID, err = metaStorageID(db, object.AddressOf(o)) + require.NoError(t, err) + require.Equal(t, wcStorageID, fetchedStorageID) +} + func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error { var sidPrm meta.UpdateStorageIDPrm sidPrm.SetAddress(addr)