[#1818] metabase: Add UpdateStorageID operation

By default writecache puts the whole object to update storage ID.
This logic comes from the times when we needed to put objects
in the metabase by the writecache itself. Now this is done by the
blobstor at unmarshaling objects during flush only to update storage ID
is an overkill.

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
Evgenii Stratonikov 2022-10-20 15:09:05 +03:00 committed by fyrchik
parent b580846630
commit 681400eed8
2 changed files with 55 additions and 1 deletions

View file

@ -52,3 +52,40 @@ func (db *DB) storageID(tx *bbolt.Tx, addr oid.Address) ([]byte, error) {
return slice.Copy(storageID), nil return slice.Copy(storageID), nil
} }
// UpdateStorageIDPrm groups the parameters of UpdateStorageID operation.
type UpdateStorageIDPrm struct {
addr oid.Address
id []byte
}
// UpdateStorageIDRes groups the resulting values of UpdateStorageID operation.
type UpdateStorageIDRes struct{}
// SetAddress is an UpdateStorageID option to set the object address to check.
func (p *UpdateStorageIDPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// SetStorageID is an UpdateStorageID option to set the storage ID.
func (p *UpdateStorageIDPrm) SetStorageID(id []byte) {
p.id = id
}
// UpdateStorageID updates storage descriptor for objects from the blobstor.
func (db *DB) UpdateStorageID(prm UpdateStorageIDPrm) (res UpdateStorageIDRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
currEpoch := db.epochState.CurrentEpoch()
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
exists, err := db.exists(tx, prm.addr, currEpoch)
if !exists || err != nil {
return err
}
return updateStorageID(tx, prm.addr, prm.id)
})
return
}

View file

@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestDB_IsSmall(t *testing.T) { func TestDB_StorageID(t *testing.T) {
db := newDB(t) db := newDB(t)
raw1 := generateObject(t) raw1 := generateObject(t)
@ -39,6 +39,23 @@ func TestDB_IsSmall(t *testing.T) {
fetchedStorageID, err = metaStorageID(db, object.AddressOf(raw1)) fetchedStorageID, err = metaStorageID(db, object.AddressOf(raw1))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, storageID, fetchedStorageID) require.Equal(t, storageID, fetchedStorageID)
t.Run("update", func(t *testing.T) {
require.NoError(t, metaUpdateStorageID(db, object.AddressOf(raw2), storageID))
fetchedStorageID, err = metaStorageID(db, object.AddressOf(raw2))
require.NoError(t, err)
require.Equal(t, storageID, fetchedStorageID)
})
}
func metaUpdateStorageID(db *meta.DB, addr oid.Address, id []byte) error {
var sidPrm meta.UpdateStorageIDPrm
sidPrm.SetAddress(addr)
sidPrm.SetStorageID(id)
_, err := db.UpdateStorageID(sidPrm)
return err
} }
func metaStorageID(db *meta.DB, addr oid.Address) ([]byte, error) { func metaStorageID(db *meta.DB, addr oid.Address) ([]byte, error) {