[#2057] meta: Fix concurrent mode changes
Includes: 1. mode change read lock operation in every exported method that r/w the underlying database; 2. returning `ErrDegradedMode` logical error if any exported method is called in degraded (without a metabase) mode. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
3d6defd3e8
commit
fdeea1dfac
16 changed files with 142 additions and 0 deletions
|
@ -33,6 +33,7 @@ Changelog for NeoFS Node
|
|||
- Assembly process triggered by a request with a bearer token (#2040)
|
||||
- Losing locking context after metabase resync (#1502)
|
||||
- Removing all trees by container ID if tree ID is empty in `pilorama.Forest.TreeDrop` (#1940)
|
||||
- Concurrent mode changes in the metabase and blobstor (#2057)
|
||||
|
||||
### Removed
|
||||
### Updated
|
||||
|
|
|
@ -42,6 +42,13 @@ func (db *DB) containers(tx *bbolt.Tx) ([]cid.ID, error) {
|
|||
}
|
||||
|
||||
func (db *DB) ContainerSize(id cid.ID) (size uint64, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return 0, ErrDegradedMode
|
||||
}
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
size, err = db.containerSize(tx, id)
|
||||
|
||||
|
|
|
@ -81,6 +81,13 @@ func (db *DB) Init() error {
|
|||
// Reset resets metabase. Works similar to Init but cleans up all static buckets and
|
||||
// removes all dynamic (CID-dependent) ones in non-blank BoltDB instances.
|
||||
func (db *DB) Reset() error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return db.init(true)
|
||||
}
|
||||
|
||||
|
@ -147,6 +154,13 @@ func (db *DB) init(reset bool) error {
|
|||
|
||||
// SyncCounters forces to synchronize the object counters.
|
||||
func (db *DB) SyncCounters() error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return syncCounter(tx, true)
|
||||
})
|
||||
|
|
|
@ -43,6 +43,13 @@ func (o ObjectCounters) Phy() uint64 {
|
|||
// Returns only the errors that do not allow reading counter
|
||||
// in Bolt database.
|
||||
func (db *DB) ObjectCounters() (cc ObjectCounters, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ObjectCounters{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
if b != nil {
|
||||
|
|
|
@ -57,6 +57,10 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return DeleteRes{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
var rawRemoved uint64
|
||||
var availableRemoved uint64
|
||||
var err error
|
||||
|
|
|
@ -44,6 +44,10 @@ func (db *DB) Exists(prm ExistsPrm) (res ExistsRes, err error) {
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
|
|
|
@ -58,6 +58,13 @@ func (g *GarbageIterationPrm) SetOffset(offset oid.Address) {
|
|||
// If h returns ErrInterruptIterator, nil returns immediately.
|
||||
// Returns other errors of h directly.
|
||||
func (db *DB) IterateOverGarbage(p GarbageIterationPrm) error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset)
|
||||
})
|
||||
|
@ -118,6 +125,13 @@ func (g *GraveyardIterationPrm) SetOffset(offset oid.Address) {
|
|||
// If h returns ErrInterruptIterator, nil returns immediately.
|
||||
// Returns other errors of h directly.
|
||||
func (db *DB) IterateOverGraveyard(p GraveyardIterationPrm) error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateDeletedObj(tx, graveyardHandler{p.h}, p.offset)
|
||||
})
|
||||
|
@ -218,6 +232,13 @@ func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
|
|||
//
|
||||
// Returns any error appeared during deletion process.
|
||||
func (db *DB) DropGraves(tss []TombstonedObject) error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
buf := make([]byte, addressKeySize)
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
|
|
|
@ -15,5 +15,8 @@ type Info struct {
|
|||
|
||||
// DumpInfo returns information about the DB.
|
||||
func (db *DB) DumpInfo() Info {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
return db.info
|
||||
}
|
||||
|
|
|
@ -44,6 +44,13 @@ var ErrInterruptIterator = logicerr.New("iterator is interrupted")
|
|||
// If h returns ErrInterruptIterator, nil returns immediately.
|
||||
// Returns other errors of h directly.
|
||||
func (db *DB) IterateExpired(epoch uint64, h ExpiredObjectHandler) error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateExpired(tx, epoch, h)
|
||||
})
|
||||
|
@ -119,6 +126,13 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
|
|||
//
|
||||
// Does not modify tss.
|
||||
func (db *DB) IterateCoveredByTombstones(tss map[string]oid.Address, h func(oid.Address) error) error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateCoveredByTombstones(tx, tss, h)
|
||||
})
|
||||
|
|
|
@ -64,6 +64,10 @@ func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) {
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
result := make([]objectcore.AddressWithType, 0, prm.count)
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
|
|
|
@ -29,6 +29,10 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
if len(locked) == 0 {
|
||||
panic("empty locked list")
|
||||
}
|
||||
|
@ -91,6 +95,13 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
|||
|
||||
// FreeLockedBy unlocks all objects in DB which are locked by lockers.
|
||||
func (db *DB) FreeLockedBy(lockers []oid.Address) error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
var err error
|
||||
|
||||
|
@ -202,6 +213,13 @@ func (i IsLockedRes) Locked() bool {
|
|||
//
|
||||
// Returns only non-logical errors related to underlying database.
|
||||
func (db *DB) IsLocked(prm IsLockedPrm) (res IsLockedRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
return res, db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.locked = objectLocked(tx, prm.addr.Container(), prm.addr.Object())
|
||||
return nil
|
||||
|
|
|
@ -52,6 +52,10 @@ func (db *DB) ToMoveIt(prm ToMoveItPrm) (res ToMoveItRes, err error) {
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
key := make([]byte, addressKeySize)
|
||||
key = addressKey(prm.addr, key)
|
||||
|
||||
|
@ -68,6 +72,10 @@ func (db *DB) DoNotMove(prm DoNotMovePrm) (res DoNotMoveRes, err error) {
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
key := make([]byte, addressKeySize)
|
||||
key = addressKey(prm.addr, key)
|
||||
|
||||
|
@ -84,6 +92,10 @@ func (db *DB) Movable(_ MovablePrm) (MovableRes, error) {
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return MovableRes{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
var strAddrs []string
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
|
|
|
@ -56,6 +56,10 @@ func (db *DB) Put(prm PutPrm) (res PutRes, err error) {
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
|
|
|
@ -59,6 +59,10 @@ func (db *DB) Select(prm SelectPrm) (res SelectRes, err error) {
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
if blindlyProcess(prm.filters) {
|
||||
return res, nil
|
||||
}
|
||||
|
|
|
@ -13,6 +13,13 @@ var (
|
|||
// ReadShardID reads shard id from db.
|
||||
// If id is missing, returns nil, nil.
|
||||
func (db *DB) ReadShardID() ([]byte, error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return nil, ErrDegradedMode
|
||||
}
|
||||
|
||||
var id []byte
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(shardInfoBucket)
|
||||
|
@ -26,6 +33,13 @@ func (db *DB) ReadShardID() ([]byte, error) {
|
|||
|
||||
// WriteShardID writes shard it to db.
|
||||
func (db *DB) WriteShardID(id []byte) error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists(shardInfoBucket)
|
||||
if err != nil {
|
||||
|
|
|
@ -29,6 +29,13 @@ func (r StorageIDRes) StorageID() []byte {
|
|||
// StorageID returns storage descriptor for objects from the blobstor.
|
||||
// It is put together with the object can makes get/delete operation faster.
|
||||
func (db *DB) StorageID(prm StorageIDPrm) (res StorageIDRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.id, err = db.storageID(tx, prm.addr)
|
||||
|
||||
|
@ -77,6 +84,10 @@ func (db *DB) UpdateStorageID(prm UpdateStorageIDPrm) (res UpdateStorageIDRes, e
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
}
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
|
|
Loading…
Reference in a new issue