metabase: Set storageID even if there is no object in metabase #1008
3 changed files with 36 additions and 23 deletions
|
@ -137,7 +137,7 @@ func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *obje
|
|||
// When storage engine moves objects between different sub-storages,
|
||||
// it calls metabase.Put method with new storage ID, thus triggering this code.
|
||||
if !isParent && id != nil {
|
||||
return updateStorageID(tx, objectCore.AddressOf(obj), id)
|
||||
return setStorageID(tx, objectCore.AddressOf(obj), id, true)
|
||||
}
|
||||
|
||||
// when storage already has last object in split hierarchy and there is
|
||||
|
@ -236,12 +236,7 @@ func putUniqueIndexes(
|
|||
|
||||
// index storageID if it is present
|
||||
if id != nil {
|
||||
err = putUniqueIndexItem(tx, namedBucketItem{
|
||||
name: smallBucketName(cnr, bucketName),
|
||||
key: objKey,
|
||||
val: id,
|
||||
})
|
||||
if err != nil {
|
||||
if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -483,16 +478,19 @@ func getVarUint(data []byte) (uint64, int, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// updateStorageID for existing objects if they were moved from one
|
||||
// setStorageID for existing objects if they were moved from one
|
||||
// storage location to another.
|
||||
func updateStorageID(tx *bbolt.Tx, addr oid.Address, id []byte) error {
|
||||
func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error {
|
||||
key := make([]byte, bucketKeySize)
|
||||
bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bkt.Put(objectKey(addr.Object(), key), id)
|
||||
key = objectKey(addr.Object(), key)
|
||||
if override || bkt.Get(key) == nil {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`&& insertOnly` is not needed, as it is always true in this branch
dstepanov-yadro
commented
done done
|
||||
return bkt.Put(key, id)
|
||||
aarifullin
commented
Just for curiosity: if it happens that a shard inserts storageID for an object and, at the same time, updated Blobstor causes updating storageID for the object, may we expect some error like Just for curiosity: if it happens that a shard *inserts* storageID for an object and, at the same time, *updated* Blobstor causes *updating* storageID for the object, may we expect some error like `g2 cannot Put because g1 is Getting`?
dstepanov-yadro
commented
Only one concurrent write tx is allowed. So it can be only Only one concurrent write tx is allowed. So it can be only `put->update` or `update->put`.
fyrchik
commented
You can see the mechanics of it inside bbolt You can see the mechanics of it inside bbolt `Batch` function -- we amortize only fsyncs, the transactions themselves are executed sequentially.
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateSpliInfo for existing objects if storage filled with extra information
|
||||
|
|
|
@ -3,11 +3,9 @@ package meta
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
|
@ -120,17 +118,8 @@ func (db *DB) UpdateStorageID(ctx context.Context, prm UpdateStorageIDPrm) (res
|
|||
return res, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
exists, err := db.exists(tx, prm.addr, currEpoch)
|
||||
if err == nil && exists {
|
||||
err = updateStorageID(tx, prm.addr, prm.id)
|
||||
} else if errors.As(err, new(logicerr.Logical)) {
|
||||
err = updateStorageID(tx, prm.addr, prm.id)
|
||||
}
|
||||
|
||||
return err
|
||||
return setStorageID(tx, prm.addr, prm.id, true)
|
||||
})
|
||||
success = err == nil
|
||||
return res, metaerr.Wrap(err)
|
||||
|
|
|
@ -75,6 +75,32 @@ func TestDB_StorageID(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestPutWritecacheDataRace(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db := newDB(t)
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
putStorageID := []byte{1, 2, 3}
|
||||
wcStorageID := []byte{1, 2, 3, 4, 5}
|
||||
o := testutil.GenerateObject()
|
||||
|
||||
fetchedStorageID, err := metaStorageID(db, object.AddressOf(o))
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, fetchedStorageID)
|
||||
|
||||
// writecache flushes object and updates storageID before object actually saved to the metabase
|
||||
metaUpdateStorageID(db, object.AddressOf(o), wcStorageID)
|
||||
|
||||
// put object completes with writecache's storageID
|
||||
err = metaPut(db, o, putStorageID)
|
||||
require.NoError(t, err)
|
||||
|
||||
fetchedStorageID, err = metaStorageID(db, object.AddressOf(o))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, wcStorageID, fetchedStorageID)
|
||||
}
|
||||
|
||||
func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error {
|
||||
var sidPrm meta.UpdateStorageIDPrm
|
||||
sidPrm.SetAddress(addr)
|
||||
|
|
Loading…
Reference in a new issue
Correct me if I am wrong:
Now we will overwrite this ID, so if flush happened before PUT, the stored id will be invalid and won't change because the object will be removed from the writecache?
No. Flush doesnt set storageID if there is no object in metabase (exist = false).
Then I don't understand, what is the problem now (on master)?
If PUT happened before the
UpdateStorageID()
everything is ok.If PUT happened after the
UpdateStorageID()
everything is ok too, as you described in the comment.No.
I described so:
This means that after
Shard.Put
metabase hasstorageID = writecache
, but there is already no object in writecache (already flushed). Correct storageID is blobovnicza path, not writecache.