blobovnicza: Prevent concurrent Put/Close #1313

Merged
fyrchik merged 1 commit from fyrchik/frostfs-node:fix-blobovnicza-rc into support/v0.42 2024-08-15 13:42:21 +00:00
7 changed files with 19 additions and 1 deletions

View file

@ -22,7 +22,7 @@ type Blobovnicza struct {
boltDB *bbolt.DB boltDB *bbolt.DB
opened bool opened bool
controlMtx sync.Mutex controlMtx sync.RWMutex
} }
// Option is an option of Blobovnicza's constructor. // Option is an option of Blobovnicza's constructor.

View file

@ -45,6 +45,9 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
addrKey := addressKey(prm.addr) addrKey := addressKey(prm.addr)
found := false found := false

View file

@ -21,6 +21,9 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
addrKey := addressKey(addr) addrKey := addressKey(addr)
err := b.boltDB.View(func(tx *bbolt.Tx) error { err := b.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -51,6 +51,9 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
var ( var (
data []byte data []byte
addrKey = addressKey(prm.addr) addrKey = addressKey(prm.addr)

View file

@ -128,6 +128,9 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
var elem IterationElement var elem IterationElement
if err := b.boltDB.View(func(tx *bbolt.Tx) error { if err := b.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -29,6 +29,9 @@ func (b *Blobovnicza) PutMoveInfo(ctx context.Context, prm MoveInfo) error {
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
key := addressKey(prm.Address) key := addressKey(prm.Address)
err := b.boltDB.Update(func(tx *bbolt.Tx) error { err := b.boltDB.Update(func(tx *bbolt.Tx) error {

View file

@ -64,6 +64,9 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
)) ))
defer span.End() defer span.End()
b.controlMtx.RLock()
defer b.controlMtx.RUnlock()
sz := uint64(len(prm.objData)) sz := uint64(len(prm.objData))
bucketName := bucketForSize(sz) bucketName := bucketForSize(sz)
key := addressKey(prm.addr) key := addressKey(prm.addr)