forked from TrueCloudLab/frostfs-node
[#1008] metabase: Do not update storageID on put
There may be a race condition between put an object and flushing the writecache: 1. Put object to the writecache 2. Writecache flushes object to the blobstore and sets blobstore's storageID 3. Put object to the metabase, set writecache's storageID Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
2ad433dbcb
commit
918613546f
3 changed files with 36 additions and 23 deletions
|
@ -137,7 +137,7 @@ func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *obje
|
||||||
// When storage engine moves objects between different sub-storages,
|
// When storage engine moves objects between different sub-storages,
|
||||||
// it calls metabase.Put method with new storage ID, thus triggering this code.
|
// it calls metabase.Put method with new storage ID, thus triggering this code.
|
||||||
if !isParent && id != nil {
|
if !isParent && id != nil {
|
||||||
return updateStorageID(tx, objectCore.AddressOf(obj), id)
|
return setStorageID(tx, objectCore.AddressOf(obj), id, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// when storage already has last object in split hierarchy and there is
|
// when storage already has last object in split hierarchy and there is
|
||||||
|
@ -236,12 +236,7 @@ func putUniqueIndexes(
|
||||||
|
|
||||||
// index storageID if it is present
|
// index storageID if it is present
|
||||||
if id != nil {
|
if id != nil {
|
||||||
err = putUniqueIndexItem(tx, namedBucketItem{
|
if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
|
||||||
name: smallBucketName(cnr, bucketName),
|
|
||||||
key: objKey,
|
|
||||||
val: id,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -483,16 +478,19 @@ func getVarUint(data []byte) (uint64, int, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateStorageID for existing objects if they were moved from one
|
// setStorageID for existing objects if they were moved from one
|
||||||
// storage location to another.
|
// storage location to another.
|
||||||
func updateStorageID(tx *bbolt.Tx, addr oid.Address, id []byte) error {
|
func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error {
|
||||||
key := make([]byte, bucketKeySize)
|
key := make([]byte, bucketKeySize)
|
||||||
bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key))
|
bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
key = objectKey(addr.Object(), key)
|
||||||
return bkt.Put(objectKey(addr.Object(), key), id)
|
if override || bkt.Get(key) == nil {
|
||||||
|
return bkt.Put(key, id)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateSpliInfo for existing objects if storage filled with extra information
|
// updateSpliInfo for existing objects if storage filled with extra information
|
||||||
|
|
|
@ -3,11 +3,9 @@ package meta
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
|
@ -120,17 +118,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
|
||||||
return res, ErrReadOnlyMode
|
return res, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
|
||||||
|
|
||||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||||
exists, err := db.exists(tx, prm.addr, currEpoch)
|
return setStorageID(tx, prm.addr, prm.id, true)
|
||||||
if err == nil && exists {
|
|
||||||
err = updateStorageID(tx, prm.addr, prm.id)
|
|
||||||
} else if errors.As(err, new(logicerr.Logical)) {
|
|
||||||
err = updateStorageID(tx, prm.addr, prm.id)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
})
|
})
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
|
|
|
@ -75,6 +75,32 @@ func TestDB_StorageID(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPutWritecacheDataRace(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
db := newDB(t)
|
||||||
|
defer func() { require.NoError(t, db.Close()) }()
|
||||||
|
|
||||||
|
putStorageID := []byte{1, 2, 3}
|
||||||
|
wcStorageID := []byte{1, 2, 3, 4, 5}
|
||||||
|
o := testutil.GenerateObject()
|
||||||
|
|
||||||
|
fetchedStorageID, err := metaStorageID(db, object.AddressOf(o))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Nil(t, fetchedStorageID)
|
||||||
|
|
||||||
|
// writecache flushes object and updates storageID before object actually saved to the metabase
|
||||||
|
metaUpdateStorageID(db, object.AddressOf(o), wcStorageID)
|
||||||
|
|
||||||
|
// put object completes with writecache's storageID
|
||||||
|
err = metaPut(db, o, putStorageID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fetchedStorageID, err = metaStorageID(db, object.AddressOf(o))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, wcStorageID, fetchedStorageID)
|
||||||
|
}
|
||||||
|
|
||||||
func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error {
|
func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error {
|
||||||
var sidPrm meta.UpdateStorageIDPrm
|
var sidPrm meta.UpdateStorageIDPrm
|
||||||
sidPrm.SetAddress(addr)
|
sidPrm.SetAddress(addr)
|
||||||
|
|
Loading…
Reference in a new issue