Fix blobovnicza cache #669
|
@ -509,4 +509,5 @@ const (
|
||||||
RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated"
|
RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated"
|
||||||
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
||||||
FailedToCountWritecacheItems = "failed to count writecache items"
|
FailedToCountWritecacheItems = "failed to count writecache items"
|
||||||
|
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
|
||||||
)
|
)
|
||||||
|
|
|
@ -3,7 +3,6 @@ package blobovnicza
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"errors"
|
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -98,9 +97,9 @@ func TestBlobovnicza(t *testing.T) {
|
||||||
testPutGet(t, blz, oidtest.Address(), objSizeLim, nil, nil)
|
testPutGet(t, blz, oidtest.Address(), objSizeLim, nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// from now objects should not be saved
|
// blobovnizca accepts object event if full
|
||||||
|
|||||||
testPutGet(t, blz, oidtest.Address(), 1024, func(err error) bool {
|
testPutGet(t, blz, oidtest.Address(), 1024, func(err error) bool {
|
||||||
return errors.Is(err, ErrFull)
|
return err == nil
|
||||||
}, nil)
|
}, nil)
|
||||||
|
|
||||||
require.NoError(t, blz.Close())
|
require.NoError(t, blz.Close())
|
||||||
|
|
|
@ -23,10 +23,6 @@ type PutPrm struct {
|
||||||
type PutRes struct {
|
type PutRes struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ErrFull is returned when trying to save an
|
|
||||||
// object to a filled blobovnicza.
|
|
||||||
var ErrFull = logicerr.New("blobovnicza is full")
|
|
||||||
|
|
||||||
// SetAddress sets the address of the saving object.
|
// SetAddress sets the address of the saving object.
|
||||||
func (p *PutPrm) SetAddress(addr oid.Address) {
|
func (p *PutPrm) SetAddress(addr oid.Address) {
|
||||||
p.addr = addr
|
p.addr = addr
|
||||||
|
@ -65,10 +61,6 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
||||||
key := addressKey(prm.addr)
|
key := addressKey(prm.addr)
|
||||||
|
|
||||||
err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
|
err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||||
if b.full() {
|
|
||||||
return ErrFull
|
|
||||||
}
|
|
||||||
|
|
||||||
buck := tx.Bucket(bucketName)
|
buck := tx.Bucket(bucketName)
|
||||||
if buck == nil {
|
if buck == nil {
|
||||||
// expected to happen:
|
// expected to happen:
|
||||||
|
|
|
@ -55,6 +55,6 @@ func (b *Blobovnicza) itemDeleted(itemSize uint64) {
|
||||||
b.metrics.SubOpenBlobovniczaItems(1)
|
b.metrics.SubOpenBlobovniczaItems(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovnicza) full() bool {
|
func (b *Blobovnicza) IsFull() bool {
|
||||||
return b.dataSize.Load() >= b.fullSizeLimit
|
return b.dataSize.Load() >= b.fullSizeLimit
|
||||||
}
|
}
|
||||||
|
|
213
pkg/local_object_storage/blobstor/blobovniczatree/active.go
Normal file
|
@ -0,0 +1,213 @@
|
||||||
|
package blobovniczatree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||||
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type activeDB struct {
|
||||||
|
blz *blobovnicza.Blobovnicza
|
||||||
|
shDB *sharedDB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *activeDB) Blobovnicza() *blobovnicza.Blobovnicza {
|
||||||
|
return db.blz
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *activeDB) Close() {
|
||||||
|
db.shDB.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *activeDB) Path() string {
|
||||||
|
return db.shDB.Path()
|
||||||
|
}
|
||||||
|
|
||||||
|
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
|
||||||
|
//
|
||||||
|
// Uses dbManager for opening/closing sharedDB instances.
|
||||||
|
// Stores a reference to an open active sharedDB, so dbManager does not close it.
|
||||||
|
// When changing the active sharedDB, releases the reference to the previous active sharedDB.
|
||||||
|
type activeDBManager struct {
|
||||||
|
levelToActiveDBGuard *sync.RWMutex
|
||||||
|
levelToActiveDB map[string]*sharedDB
|
||||||
|
levelLock *utilSync.KeyLocker[string]
|
||||||
|
closed bool
|
||||||
|
|
||||||
|
dbManager *dbManager
|
||||||
|
leafWidth uint64
|
||||||
|
}
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`levelToActive` or `levelToShared`?
dstepanov-yadro
commented
Active DB of type Active DB of type `sharedDB`.
fyrchik
commented
Ok, for some reason I've though shared/active is a pair of mutually exclusive roles. Ok, for some reason I've though shared/active is a pair of mutually exclusive roles.
|
|||||||
|
|
||||||
|
func newActiveDBManager(dbManager *dbManager, leafWidth uint64) *activeDBManager {
|
||||||
|
return &activeDBManager{
|
||||||
|
levelToActiveDBGuard: &sync.RWMutex{},
|
||||||
|
levelToActiveDB: make(map[string]*sharedDB),
|
||||||
|
levelLock: utilSync.NewKeyLocker[string](),
|
||||||
|
|
||||||
|
dbManager: dbManager,
|
||||||
|
leafWidth: leafWidth,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOpenedActiveDBForLevel returns active DB for level.
|
||||||
|
// DB must be closed after use.
|
||||||
|
func (m *activeDBManager) GetOpenedActiveDBForLevel(lvlPath string) (*activeDB, error) {
|
||||||
|
activeDB, err := m.getCurrentActiveIfOk(lvlPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if activeDB != nil {
|
||||||
|
return activeDB, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return m.updateAndGetActive(lvlPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *activeDBManager) Open() {
|
||||||
|
m.levelToActiveDBGuard.Lock()
|
||||||
|
defer m.levelToActiveDBGuard.Unlock()
|
||||||
|
|
||||||
|
m.closed = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *activeDBManager) Close() {
|
||||||
fyrchik
commented
Could you leave a comment about error handling here? We may found ourselves in a situation when the DB was not closed and later would not be opened because open FD leaked. Could you leave a comment about error handling here? We may found ourselves in a situation when the DB was not closed and later would not be opened because open FD leaked.
dstepanov-yadro
commented
It is existed behaviour: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go#L111 When closing DB we just log error: see It is existed behaviour: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go#L111
When closing DB we just log error: see `sharedDB.Close()`
fyrchik
commented
I don't argue, I think it is just something worth thinking about if you touch the code or investigate problems. I don't argue, I think it is just something worth thinking about if you touch the code or investigate problems.
|
|||||||
|
m.levelToActiveDBGuard.Lock()
|
||||||
|
defer m.levelToActiveDBGuard.Unlock()
|
||||||
|
|
||||||
|
for _, db := range m.levelToActiveDB {
|
||||||
|
db.Close()
|
||||||
|
}
|
||||||
|
m.levelToActiveDB = make(map[string]*sharedDB)
|
||||||
|
m.closed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *activeDBManager) getCurrentActiveIfOk(lvlPath string) (*activeDB, error) {
|
||||||
|
m.levelToActiveDBGuard.RLock()
|
||||||
|
defer m.levelToActiveDBGuard.RUnlock()
|
||||||
|
|
||||||
|
if m.closed {
|
||||||
|
return nil, errClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
db, ok := m.levelToActiveDB[lvlPath]
|
||||||
dstepanov-yadro
commented
db 0 is active (refCount = 1) To prevent this situation active db manager returns active db opened, so user has to close it. db 0 is active (refCount = 1)
goroutine 1: got and opened db 0 (refCount = 2)
goroutine 2: got db 0
goroutine 1: saved object to db 0, db 0 is full, closed (refCount = 1)
goroutine 3: replaced active db from db 0 to db 1, so db 0 was physically closed (refCount = 0)
goroutine 2: physically opens db 0 (refCount = 1)
To prevent this situation active db manager returns active db opened, so user has to close it.
|
|||||||
|
if !ok {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
blz, err := db.Open() //open db for usage, will be closed on activeDB.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if blz.IsFull() {
|
||||||
|
db.Close()
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &activeDB{
|
||||||
|
blz: blz,
|
||||||
|
shDB: db,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *activeDBManager) updateAndGetActive(lvlPath string) (*activeDB, error) {
|
||||||
|
m.levelLock.Lock(lvlPath)
|
||||||
|
defer m.levelLock.Unlock(lvlPath)
|
||||||
|
|
||||||
|
current, err := m.getCurrentActiveIfOk(lvlPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if current != nil {
|
||||||
|
return current, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
nextShDB, err := m.getNextSharedDB(lvlPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if nextShDB == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
blz, err := nextShDB.Open() // open db for client, client must call Close() after usage
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &activeDB{
|
||||||
|
blz: blz,
|
||||||
|
shDB: nextShDB,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *activeDBManager) getNextSharedDB(lvlPath string) (*sharedDB, error) {
|
||||||
|
var idx uint64
|
||||||
|
var iterCount uint64
|
||||||
|
hasActive, currentIdx := m.hasActiveDB(lvlPath)
|
||||||
|
if hasActive {
|
||||||
|
idx = (currentIdx + 1) % m.leafWidth
|
||||||
|
}
|
||||||
|
|
||||||
|
var next *sharedDB
|
||||||
|
|
||||||
|
for iterCount < m.leafWidth {
|
||||||
|
path := filepath.Join(lvlPath, u64ToHexString(idx))
|
||||||
|
shDB := m.dbManager.GetByPath(path)
|
||||||
|
db, err := shDB.Open() //open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if db.IsFull() {
|
||||||
|
shDB.Close()
|
||||||
|
} else {
|
||||||
|
next = shDB
|
||||||
|
break
|
||||||
|
}
|
||||||
|
idx = (idx + 1) % m.leafWidth
|
||||||
|
iterCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
previous, updated := m.replace(lvlPath, next)
|
||||||
|
if !updated && next != nil {
|
||||||
|
next.Close() // manager is closed, so don't hold active DB open
|
||||||
|
}
|
||||||
|
if updated && previous != nil {
|
||||||
|
previous.Close()
|
||||||
|
}
|
||||||
|
return next, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
|
||||||
|
m.levelToActiveDBGuard.RLock()
|
||||||
|
defer m.levelToActiveDBGuard.RUnlock()
|
||||||
|
|
||||||
|
if m.closed {
|
||||||
|
return false, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
db, ok := m.levelToActiveDB[lvlPath]
|
||||||
|
if !ok {
|
||||||
|
return false, 0
|
||||||
|
}
|
||||||
|
return true, u64FromHexString(filepath.Base(db.Path()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {
|
||||||
|
m.levelToActiveDBGuard.Lock()
|
||||||
|
defer m.levelToActiveDBGuard.Unlock()
|
||||||
|
|
||||||
|
if m.closed {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
previous := m.levelToActiveDB[lvlPath]
|
||||||
|
if shDB == nil {
|
||||||
|
delete(m.levelToActiveDB, lvlPath)
|
||||||
|
} else {
|
||||||
|
m.levelToActiveDB[lvlPath] = shDB
|
||||||
|
}
|
||||||
|
return previous, true
|
||||||
|
}
|
|
@ -3,19 +3,12 @@ package blobovniczatree
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/hrw"
|
"git.frostfs.info/TrueCloudLab/hrw"
|
||||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Blobovniczas represents the storage of the "small" objects.
|
// Blobovniczas represents the storage of the "small" objects.
|
||||||
|
@ -61,28 +54,9 @@ import (
|
||||||
type Blobovniczas struct {
|
type Blobovniczas struct {
|
||||||
cfg
|
cfg
|
||||||
|
|
||||||
// cache of opened filled Blobovniczas
|
commondbManager *dbManager
|
||||||
opened *simplelru.LRU[string, *blobovnicza.Blobovnicza]
|
activeDBManager *activeDBManager
|
||||||
// lruMtx protects opened cache.
|
dbCache *dbCache
|
||||||
// It isn't RWMutex because `Get` calls must
|
|
||||||
// lock this mutex on write, as LRU info is updated.
|
|
||||||
// It must be taken after activeMtx in case when eviction is possible
|
|
||||||
// i.e. `Add`, `Purge` and `Remove` calls.
|
|
||||||
lruMtx sync.Mutex
|
|
||||||
|
|
||||||
// mutex to exclude parallel bbolt.Open() calls
|
|
||||||
// bbolt.Open() deadlocks if it tries to open already opened file
|
|
||||||
openMtx sync.Mutex
|
|
||||||
|
|
||||||
// list of active (opened, non-filled) Blobovniczas
|
|
||||||
activeMtx sync.RWMutex
|
|
||||||
active map[string]blobovniczaWithIndex
|
|
||||||
}
|
|
||||||
|
|
||||||
type blobovniczaWithIndex struct {
|
|
||||||
ind uint64
|
|
||||||
|
|
||||||
blz *blobovnicza.Blobovnicza
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ common.Storage = (*Blobovniczas)(nil)
|
var _ common.Storage = (*Blobovniczas)(nil)
|
||||||
|
@ -102,120 +76,13 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
|
||||||
blz.blzLeafWidth = blz.blzShallowWidth
|
blz.blzLeafWidth = blz.blzShallowWidth
|
||||||
}
|
}
|
||||||
|
|
||||||
cache, err := simplelru.NewLRU[string, *blobovnicza.Blobovnicza](blz.openedCacheSize, func(p string, value *blobovnicza.Blobovnicza) {
|
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.blzLeafWidth, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
|
||||||
lvlPath := filepath.Dir(p)
|
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
|
||||||
if b, ok := blz.active[lvlPath]; ok && b.ind == u64FromHexString(filepath.Base(p)) {
|
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
|
||||||
// This branch is taken if we have recently updated active blobovnicza and remove
|
|
||||||
// it from opened cache.
|
|
||||||
return
|
|
||||||
} else if err := value.Close(); err != nil {
|
|
||||||
blz.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
|
|
||||||
zap.String("id", p),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
blz.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyClosedOnEvict,
|
|
||||||
zap.String("id", p),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
// occurs only if the size is not positive
|
|
||||||
panic(fmt.Errorf("could not create LRU cache of size %d: %w", blz.openedCacheSize, err))
|
|
||||||
}
|
|
||||||
|
|
||||||
activeMapCapacity := uint64(1)
|
|
||||||
for i := uint64(0); i < blz.blzShallowDepth; i++ {
|
|
||||||
if i+1 == blz.blzShallowDepth {
|
|
||||||
activeMapCapacity *= blz.blzLeafWidth
|
|
||||||
} else {
|
|
||||||
activeMapCapacity *= blz.blzShallowWidth
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
blz.opened = cache
|
|
||||||
blz.active = make(map[string]blobovniczaWithIndex, activeMapCapacity)
|
|
||||||
|
|
||||||
return blz
|
return blz
|
||||||
}
|
}
|
||||||
|
|
||||||
// activates and returns activated blobovnicza of p-level (dir).
|
|
||||||
//
|
|
||||||
// returns error if blobvnicza could not be activated.
|
|
||||||
func (b *Blobovniczas) getActivated(lvlPath string) (blobovniczaWithIndex, error) {
|
|
||||||
return b.updateAndGet(lvlPath, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// updates active blobovnicza of p-level (dir).
|
|
||||||
//
|
|
||||||
// if current active blobovnicza's index is not old, it remains unchanged.
|
|
||||||
func (b *Blobovniczas) updateActive(lvlPath string, old *uint64) error {
|
|
||||||
b.log.Debug(logs.BlobovniczatreeUpdatingActiveBlobovnicza, zap.String("path", lvlPath))
|
|
||||||
|
|
||||||
_, err := b.updateAndGet(lvlPath, old)
|
|
||||||
|
|
||||||
b.log.Debug(logs.BlobovniczatreeActiveBlobovniczaSuccessfullyUpdated, zap.String("path", lvlPath))
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// updates and returns active blobovnicza of p-level (dir).
|
|
||||||
//
|
|
||||||
// if current active blobovnicza's index is not old, it is returned unchanged.
|
|
||||||
func (b *Blobovniczas) updateAndGet(lvlPath string, old *uint64) (blobovniczaWithIndex, error) {
|
|
||||||
b.activeMtx.RLock()
|
|
||||||
active, ok := b.active[lvlPath]
|
|
||||||
b.activeMtx.RUnlock()
|
|
||||||
|
|
||||||
if ok {
|
|
||||||
if old != nil {
|
|
||||||
if active.ind == b.blzLeafWidth-1 {
|
|
||||||
return active, logicerr.New("no more Blobovniczas")
|
|
||||||
dstepanov-yadro
commented
In general, this is an invalid statement. Imagine the situation: the width of the leaf level is 3, the current active database has index = 2, but all objects were deleted from the database with index = 0. So database with index = 0 is not full and can be next active database. In general, this is an invalid statement. Imagine the situation: the width of the leaf level is 3, the current active database has index = 2, but all objects were deleted from the database with index = 0. So database with index = 0 is not full and can be next active database.
|
|||||||
} else if active.ind != *old {
|
|
||||||
// sort of CAS in order to control concurrent
|
|
||||||
// updateActive calls
|
|
||||||
return active, nil
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return active, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
active.ind++
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
if active.blz, err = b.openBlobovnicza(filepath.Join(lvlPath, u64ToHexString(active.ind))); err != nil {
|
|
||||||
return active, err
|
|
||||||
}
|
|
||||||
|
|
||||||
b.activeMtx.Lock()
|
|
||||||
defer b.activeMtx.Unlock()
|
|
||||||
|
|
||||||
// check 2nd time to find out if it blobovnicza was activated while thread was locked
|
|
||||||
tryActive, ok := b.active[lvlPath]
|
|
||||||
if ok && tryActive.blz == active.blz {
|
|
||||||
return tryActive, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove from opened cache (active blobovnicza should always be opened).
|
|
||||||
// Because `onEvict` callback is called in `Remove`, we need to update
|
|
||||||
// active map beforehand.
|
|
||||||
b.active[lvlPath] = active
|
|
||||||
|
|
||||||
activePath := filepath.Join(lvlPath, u64ToHexString(active.ind))
|
|
||||||
b.lruMtx.Lock()
|
|
||||||
b.opened.Remove(activePath)
|
|
||||||
if ok {
|
|
||||||
b.opened.Add(filepath.Join(lvlPath, u64ToHexString(tryActive.ind)), tryActive.blz)
|
|
||||||
}
|
|
||||||
b.lruMtx.Unlock()
|
|
||||||
|
|
||||||
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyActivated,
|
|
||||||
zap.String("path", activePath))
|
|
||||||
|
|
||||||
return active, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns hash of the object address.
|
// returns hash of the object address.
|
||||||
func addressHash(addr *oid.Address, path string) uint64 {
|
func addressHash(addr *oid.Address, path string) uint64 {
|
||||||
var a string
|
var a string
|
||||||
|
|
103
pkg/local_object_storage/blobstor/blobovniczatree/cache.go
Normal file
|
@ -0,0 +1,103 @@
|
||||||
|
package blobovniczatree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
|
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||||
|
)
|
||||||
|
|
||||||
|
// dbCache caches sharedDB instances that are NOT open for Put.
|
||||||
|
//
|
||||||
|
// Uses dbManager for opening/closing sharedDB instances.
|
||||||
|
// Stores a reference to an cached sharedDB, so dbManager does not close it.
|
||||||
|
type dbCache struct {
|
||||||
|
cacheGuard *sync.RWMutex
|
||||||
|
cache simplelru.LRUCache[string, *sharedDB]
|
||||||
|
pathLock *utilSync.KeyLocker[string]
|
||||||
|
closed bool
|
||||||
|
|
||||||
|
dbManager *dbManager
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDBCache(size int, dbManager *dbManager) *dbCache {
|
||||||
|
cache, err := simplelru.NewLRU[string, *sharedDB](size, func(_ string, evictedDB *sharedDB) {
|
||||||
|
evictedDB.Close()
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// occurs only if the size is not positive
|
||||||
|
panic(fmt.Errorf("could not create LRU cache of size %d: %w", size, err))
|
||||||
|
}
|
||||||
|
return &dbCache{
|
||||||
|
cacheGuard: &sync.RWMutex{},
|
||||||
|
cache: cache,
|
||||||
|
dbManager: dbManager,
|
||||||
|
pathLock: utilSync.NewKeyLocker[string](),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dbCache) Open() {
|
||||||
|
c.cacheGuard.Lock()
|
||||||
|
defer c.cacheGuard.Unlock()
|
||||||
|
|
||||||
|
c.closed = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dbCache) Close() {
|
||||||
|
c.cacheGuard.Lock()
|
||||||
|
defer c.cacheGuard.Unlock()
|
||||||
|
c.cache.Purge()
|
||||||
|
c.closed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dbCache) GetOrCreate(path string) *sharedDB {
|
||||||
|
value := c.getExisted(path)
|
||||||
|
if value != nil {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return c.create(path)
|
||||||
|
}
|
||||||
dstepanov-yadro
commented
`cache.Get` can rebuild cache, so exclusive lock.
|
|||||||
|
|
||||||
|
func (c *dbCache) getExisted(path string) *sharedDB {
|
||||||
|
c.cacheGuard.Lock()
|
||||||
|
defer c.cacheGuard.Unlock()
|
||||||
|
|
||||||
|
if value, ok := c.cache.Get(path); ok {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dbCache) create(path string) *sharedDB {
|
||||||
|
c.pathLock.Lock(path)
|
||||||
|
defer c.pathLock.Unlock(path)
|
||||||
|
|
||||||
|
value := c.getExisted(path)
|
||||||
|
if value != nil {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
|
value = c.dbManager.GetByPath(path)
|
||||||
|
|
||||||
|
_, err := value.Open() //open db to hold reference, closed by evictedDB.Close() or if cache closed
|
||||||
|
if err != nil {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
if added := c.put(path, value); !added {
|
||||||
|
value.Close()
|
||||||
|
}
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dbCache) put(path string, db *sharedDB) bool {
|
||||||
|
c.cacheGuard.Lock()
|
||||||
|
defer c.cacheGuard.Unlock()
|
||||||
|
|
||||||
|
if !c.closed {
|
||||||
|
c.cache.Add(path, db)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
package blobovniczatree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBlobovniczaTree_Concurrency(t *testing.T) {
|
||||||
dstepanov-yadro
commented
Test from task. Test from task.
|
|||||||
|
t.Parallel()
|
||||||
|
const n = 1000
|
||||||
|
|
||||||
|
st := NewBlobovniczaTree(
|
||||||
|
WithLogger(test.NewLogger(t, true)),
|
||||||
|
WithObjectSizeLimit(1024),
|
||||||
|
WithBlobovniczaShallowWidth(10),
|
||||||
|
WithBlobovniczaShallowDepth(1),
|
||||||
|
WithRootPath(t.TempDir()))
|
||||||
|
require.NoError(t, st.Open(false))
|
||||||
|
require.NoError(t, st.Init())
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, st.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
objGen := &testutil.SeqObjGenerator{ObjSize: 1}
|
||||||
|
|
||||||
|
var cnt atomic.Int64
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for cnt.Add(1) <= n {
|
||||||
|
obj := objGen.Next()
|
||||||
|
addr := testutil.AddressFromObject(t, obj)
|
||||||
|
|
||||||
|
raw, err := obj.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = st.Put(context.Background(), common.PutPrm{
|
||||||
|
Address: addr,
|
||||||
|
RawData: raw,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = st.Get(context.Background(), common.GetPrm{Address: addr})
|
||||||
|
require.NoError(t, err)
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Still fails? Still fails?
dstepanov-yadro
commented
Ups, comment removed Ups, comment removed
|
|||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
|
@ -2,11 +2,8 @@ package blobovniczatree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"path/filepath"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -14,6 +11,7 @@ import (
|
||||||
func (b *Blobovniczas) Open(readOnly bool) error {
|
func (b *Blobovniczas) Open(readOnly bool) error {
|
||||||
b.readOnly = readOnly
|
b.readOnly = readOnly
|
||||||
b.metrics.SetMode(readOnly)
|
b.metrics.SetMode(readOnly)
|
||||||
|
b.openManagers()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,115 +27,40 @@ func (b *Blobovniczas) Init() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.iterateLeaves(context.TODO(), func(p string) (bool, error) {
|
return b.iterateLeaves(context.TODO(), func(p string) (bool, error) {
|
||||||
blz, err := b.openBlobovniczaNoCache(p)
|
shBlz := b.getBlobovniczaWithoutCaching(p)
|
||||||
|
_, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
defer blz.Close()
|
defer shBlz.Close()
|
||||||
|
|
||||||
if err := blz.Init(); err != nil {
|
|
||||||
return true, fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
|
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
|
||||||
return false, nil
|
return false, nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) openManagers() {
|
||||||
|
b.commondbManager.Open() //order important
|
||||||
|
b.activeDBManager.Open()
|
||||||
|
b.dbCache.Open()
|
||||||
|
}
|
||||||
|
|
||||||
// Close implements common.Storage.
|
// Close implements common.Storage.
|
||||||
func (b *Blobovniczas) Close() error {
|
func (b *Blobovniczas) Close() error {
|
||||||
b.activeMtx.Lock()
|
b.dbCache.Close() //order important
|
||||||
|
b.activeDBManager.Close()
|
||||||
b.lruMtx.Lock()
|
b.commondbManager.Close()
|
||||||
|
|
||||||
for p, v := range b.active {
|
|
||||||
if err := v.blz.Close(); err != nil {
|
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotCloseActiveBlobovnicza,
|
|
||||||
zap.String("path", p),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
b.opened.Remove(p)
|
|
||||||
}
|
|
||||||
for _, k := range b.opened.Keys() {
|
|
||||||
blz, _ := b.opened.Get(k)
|
|
||||||
if err := blz.Close(); err != nil {
|
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotCloseActiveBlobovnicza,
|
|
||||||
zap.String("path", k),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
b.opened.Remove(k)
|
|
||||||
}
|
|
||||||
|
|
||||||
b.active = make(map[string]blobovniczaWithIndex)
|
|
||||||
b.metrics.Close()
|
|
||||||
|
|
||||||
b.lruMtx.Unlock()
|
|
||||||
|
|
||||||
b.activeMtx.Unlock()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// opens and returns blobovnicza with path p.
|
// returns blobovnicza with path p
|
||||||
//
|
//
|
||||||
// If blobovnicza is already opened and cached, instance from cache is returned w/o changes.
|
// If blobovnicza is already cached, instance from cache is returned w/o changes.
|
||||||
func (b *Blobovniczas) openBlobovnicza(p string) (*blobovnicza.Blobovnicza, error) {
|
func (b *Blobovniczas) getBlobovnicza(p string) *sharedDB {
|
||||||
b.lruMtx.Lock()
|
return b.dbCache.GetOrCreate(p)
|
||||||
v, ok := b.opened.Get(p)
|
|
||||||
b.lruMtx.Unlock()
|
|
||||||
if ok {
|
|
||||||
// blobovnicza should be opened in cache
|
|
||||||
return v, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
lvlPath := filepath.Dir(p)
|
|
||||||
curIndex := u64FromHexString(filepath.Base(p))
|
|
||||||
|
|
||||||
b.activeMtx.RLock()
|
|
||||||
defer b.activeMtx.RUnlock()
|
|
||||||
|
|
||||||
active, ok := b.active[lvlPath]
|
|
||||||
if ok && active.ind == curIndex {
|
|
||||||
return active.blz, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
b.lruMtx.Lock()
|
|
||||||
defer b.lruMtx.Unlock()
|
|
||||||
|
|
||||||
v, ok = b.opened.Get(p)
|
|
||||||
if ok {
|
|
||||||
return v, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
blz, err := b.openBlobovniczaNoCache(p)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
b.opened.Add(p, blz)
|
|
||||||
|
|
||||||
return blz, nil
|
|
||||||
}
|
}
|
||||||
fyrchik
commented
Is the name and the comment still valid? Is the name and the comment still valid?
dstepanov-yadro
commented
Yes, they are still valid. Yes, they are still valid.
fyrchik
commented
Then why do we call Then why do we call `Open()` on the result at the callsites?
dstepanov-yadro
commented
Ok, then just return. Fixed. Ok, then just return. Fixed.
|
|||||||
|
|
||||||
func (b *Blobovniczas) openBlobovniczaNoCache(p string) (*blobovnicza.Blobovnicza, error) {
|
func (b *Blobovniczas) getBlobovniczaWithoutCaching(p string) *sharedDB {
|
||||||
b.openMtx.Lock()
|
return b.commondbManager.GetByPath(p)
|
||||||
defer b.openMtx.Unlock()
|
|
||||||
|
|
||||||
path := filepath.Join(b.rootPath, p)
|
|
||||||
|
|
||||||
blz := blobovnicza.New(append(b.blzOpts,
|
|
||||||
blobovnicza.WithReadOnly(b.readOnly),
|
|
||||||
blobovnicza.WithPath(path),
|
|
||||||
blobovnicza.WithMetrics(b.metrics.Blobovnicza()),
|
|
||||||
)...)
|
|
||||||
|
|
||||||
if err := blz.Open(); err != nil {
|
|
||||||
return nil, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
|
||||||
}
|
|
||||||
if err := blz.Init(); err != nil {
|
|
||||||
return nil, fmt.Errorf("could not init blobovnicza %s: %w", p, err)
|
|
||||||
}
|
|
||||||
return blz, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ package blobovniczatree
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"path/filepath"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -48,10 +47,12 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
||||||
|
|
||||||
if prm.StorageID != nil {
|
if prm.StorageID != nil {
|
||||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||||
blz, err := b.openBlobovnicza(id.String())
|
shBlz := b.getBlobovnicza(id.String())
|
||||||
|
blz, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
defer shBlz.Close()
|
||||||
|
|
||||||
if res, err = b.deleteObject(ctx, blz, bPrm); err == nil {
|
if res, err = b.deleteObject(ctx, blz, bPrm); err == nil {
|
||||||
success = true
|
success = true
|
||||||
|
@ -59,16 +60,10 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache := make(map[string]struct{})
|
|
||||||
dstepanov-yadro
commented
It was a protection in case the active database was changed during the iteration. Now changing the active database does not affect the opening of the database. It was a protection in case the active database was changed during the iteration. Now changing the active database does not affect the opening of the database.
|
|||||||
objectFound := false
|
objectFound := false
|
||||||
|
|
||||||
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||||
dirPath := filepath.Dir(p)
|
res, err = b.deleteObjectFromLevel(ctx, bPrm, p)
|
||||||
|
|
||||||
// don't process active blobovnicza of the level twice
|
|
||||||
_, ok := activeCache[dirPath]
|
|
||||||
|
|
||||||
res, err = b.deleteObjectFromLevel(ctx, bPrm, p, !ok)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !client.IsErrObjectNotFound(err) {
|
if !client.IsErrObjectNotFound(err) {
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromLevel,
|
b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromLevel,
|
||||||
|
@ -78,8 +73,6 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache[dirPath] = struct{}{}
|
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
objectFound = true
|
objectFound = true
|
||||||
}
|
}
|
||||||
|
@ -100,57 +93,13 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
||||||
// tries to delete object from particular blobovnicza.
|
// tries to delete object from particular blobovnicza.
|
||||||
//
|
//
|
||||||
// returns no error if object was removed from some blobovnicza of the same level.
|
// returns no error if object was removed from some blobovnicza of the same level.
|
||||||
func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicza.DeletePrm, blzPath string, tryActive bool) (common.DeleteRes, error) {
|
func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicza.DeletePrm, blzPath string) (common.DeleteRes, error) {
|
||||||
lvlPath := filepath.Dir(blzPath)
|
shBlz := b.getBlobovnicza(blzPath)
|
||||||
|
blz, err := shBlz.Open()
|
||||||
// try to remove from blobovnicza if it is opened
|
|
||||||
b.lruMtx.Lock()
|
|
||||||
v, ok := b.opened.Get(blzPath)
|
|
||||||
b.lruMtx.Unlock()
|
|
||||||
if ok {
|
|
||||||
if res, err := b.deleteObject(ctx, v, prm); err == nil {
|
|
||||||
return res, err
|
|
||||||
} else if !client.IsErrObjectNotFound(err) {
|
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromOpenedBlobovnicza,
|
|
||||||
zap.String("path", blzPath),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// therefore the object is possibly placed in a lighter blobovnicza
|
|
||||||
|
|
||||||
// next we check in the active level blobobnicza:
|
|
||||||
// * the active blobovnicza is always opened.
|
|
||||||
b.activeMtx.RLock()
|
|
||||||
active, ok := b.active[lvlPath]
|
|
||||||
b.activeMtx.RUnlock()
|
|
||||||
|
|
||||||
if ok && tryActive {
|
|
||||||
if res, err := b.deleteObject(ctx, active.blz, prm); err == nil {
|
|
||||||
return res, err
|
|
||||||
} else if !client.IsErrObjectNotFound(err) {
|
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromActiveBlobovnicza,
|
|
||||||
zap.String("path", blzPath),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// then object is possibly placed in closed blobovnicza
|
|
||||||
|
|
||||||
// check if it makes sense to try to open the blob
|
|
||||||
// (Blobovniczas "after" the active one are empty anyway,
|
|
||||||
// and it's pointless to open them).
|
|
||||||
if u64FromHexString(filepath.Base(blzPath)) > active.ind {
|
|
||||||
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
|
||||||
}
|
|
||||||
|
|
||||||
// open blobovnicza (cached inside)
|
|
||||||
blz, err := b.openBlobovnicza(blzPath)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.DeleteRes{}, err
|
return common.DeleteRes{}, err
|
||||||
}
|
}
|
||||||
|
defer shBlz.Close()
|
||||||
|
|
||||||
return b.deleteObject(ctx, blz, prm)
|
return b.deleteObject(ctx, blz, prm)
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,8 @@ import (
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errClosed = logicerr.New("blobvnicza is closed")
|
||||||
dstepanov-yadro
commented
Error is logical to not to count as shard error. Error is logical to not to count as shard error.
|
|||||||
|
|
||||||
func isErrOutOfRange(err error) bool {
|
func isErrOutOfRange(err error) bool {
|
||||||
var target *apistatus.ObjectOutOfRange
|
var target *apistatus.ObjectOutOfRange
|
||||||
return errors.As(err, &target)
|
return errors.As(err, &target)
|
||||||
|
|
|
@ -3,7 +3,6 @@ package blobovniczatree
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"path/filepath"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -37,26 +36,22 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
|
||||||
|
|
||||||
if prm.StorageID != nil {
|
if prm.StorageID != nil {
|
||||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||||
blz, err := b.openBlobovnicza(id.String())
|
shBlz := b.getBlobovnicza(id.String())
|
||||||
|
blz, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.ExistsRes{}, err
|
return common.ExistsRes{}, err
|
||||||
}
|
}
|
||||||
|
defer shBlz.Close()
|
||||||
|
|
||||||
exists, err := blz.Exists(ctx, prm.Address)
|
exists, err := blz.Exists(ctx, prm.Address)
|
||||||
return common.ExistsRes{Exists: exists}, err
|
return common.ExistsRes{Exists: exists}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache := make(map[string]struct{})
|
|
||||||
|
|
||||||
var gPrm blobovnicza.GetPrm
|
var gPrm blobovnicza.GetPrm
|
||||||
gPrm.SetAddress(prm.Address)
|
gPrm.SetAddress(prm.Address)
|
||||||
|
|
||||||
err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||||
dirPath := filepath.Dir(p)
|
_, err := b.getObjectFromLevel(ctx, gPrm, p)
|
||||||
|
|
||||||
_, ok := activeCache[dirPath]
|
|
||||||
|
|
||||||
_, err := b.getObjectFromLevel(ctx, gPrm, p, !ok)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !client.IsErrObjectNotFound(err) {
|
if !client.IsErrObjectNotFound(err) {
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
|
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
|
||||||
|
@ -65,7 +60,6 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache[dirPath] = struct{}{}
|
|
||||||
found = err == nil
|
found = err == nil
|
||||||
return found, nil
|
return found, nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -48,10 +47,12 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
||||||
|
|
||||||
if prm.StorageID != nil {
|
if prm.StorageID != nil {
|
||||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||||
blz, err := b.openBlobovnicza(id.String())
|
shBlz := b.getBlobovnicza(id.String())
|
||||||
|
blz, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
defer shBlz.Close()
|
||||||
|
|
||||||
res, err = b.getObject(ctx, blz, bPrm)
|
res, err = b.getObject(ctx, blz, bPrm)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -61,14 +62,8 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache := make(map[string]struct{})
|
|
||||||
|
|
||||||
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||||
dirPath := filepath.Dir(p)
|
res, err = b.getObjectFromLevel(ctx, bPrm, p)
|
||||||
|
|
||||||
_, ok := activeCache[dirPath]
|
|
||||||
|
|
||||||
res, err = b.getObjectFromLevel(ctx, bPrm, p, !ok)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !client.IsErrObjectNotFound(err) {
|
if !client.IsErrObjectNotFound(err) {
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
|
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
|
||||||
|
@ -78,8 +73,6 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache[dirPath] = struct{}{}
|
|
||||||
|
|
||||||
// abort iterator if found, otherwise process all Blobovniczas
|
// abort iterator if found, otherwise process all Blobovniczas
|
||||||
return err == nil, nil
|
return err == nil, nil
|
||||||
})
|
})
|
||||||
|
@ -98,58 +91,14 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
||||||
// tries to read object from particular blobovnicza.
|
// tries to read object from particular blobovnicza.
|
||||||
//
|
//
|
||||||
// returns error if object could not be read from any blobovnicza of the same level.
|
// returns error if object could not be read from any blobovnicza of the same level.
|
||||||
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) {
|
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) {
|
||||||
lvlPath := filepath.Dir(blzPath)
|
|
||||||
|
|
||||||
// try to read from blobovnicza if it is opened
|
|
||||||
b.lruMtx.Lock()
|
|
||||||
v, ok := b.opened.Get(blzPath)
|
|
||||||
b.lruMtx.Unlock()
|
|
||||||
if ok {
|
|
||||||
if res, err := b.getObject(ctx, v, prm); err == nil {
|
|
||||||
return res, err
|
|
||||||
} else if !client.IsErrObjectNotFound(err) {
|
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotReadObjectFromOpenedBlobovnicza,
|
|
||||||
zap.String("path", blzPath),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// therefore the object is possibly placed in a lighter blobovnicza
|
|
||||||
|
|
||||||
// next we check in the active level blobobnicza:
|
|
||||||
// * the freshest objects are probably the most demanded;
|
|
||||||
// * the active blobovnicza is always opened.
|
|
||||||
b.activeMtx.RLock()
|
|
||||||
active, ok := b.active[lvlPath]
|
|
||||||
b.activeMtx.RUnlock()
|
|
||||||
|
|
||||||
if ok && tryActive {
|
|
||||||
if res, err := b.getObject(ctx, active.blz, prm); err == nil {
|
|
||||||
return res, err
|
|
||||||
} else if !client.IsErrObjectNotFound(err) {
|
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromActiveBlobovnicza,
|
|
||||||
zap.String("path", blzPath),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// then object is possibly placed in closed blobovnicza
|
|
||||||
|
|
||||||
// check if it makes sense to try to open the blob
|
|
||||||
// (Blobovniczas "after" the active one are empty anyway,
|
|
||||||
// and it's pointless to open them).
|
|
||||||
if u64FromHexString(filepath.Base(blzPath)) > active.ind {
|
|
||||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
|
||||||
}
|
|
||||||
|
|
||||||
// open blobovnicza (cached inside)
|
// open blobovnicza (cached inside)
|
||||||
blz, err := b.openBlobovnicza(blzPath)
|
shBlz := b.getBlobovnicza(blzPath)
|
||||||
|
blz, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.GetRes{}, err
|
return common.GetRes{}, err
|
||||||
}
|
}
|
||||||
|
defer shBlz.Close()
|
||||||
|
|
||||||
return b.getObject(ctx, blz, prm)
|
return b.getObject(ctx, blz, prm)
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -47,10 +46,12 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
||||||
|
|
||||||
if prm.StorageID != nil {
|
if prm.StorageID != nil {
|
||||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||||
blz, err := b.openBlobovnicza(id.String())
|
shBlz := b.getBlobovnicza(id.String())
|
||||||
|
blz, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.GetRangeRes{}, err
|
return common.GetRangeRes{}, err
|
||||||
}
|
}
|
||||||
|
defer shBlz.Close()
|
||||||
|
|
||||||
res, err := b.getObjectRange(ctx, blz, prm)
|
res, err := b.getObjectRange(ctx, blz, prm)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -60,15 +61,10 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache := make(map[string]struct{})
|
|
||||||
objectFound := false
|
objectFound := false
|
||||||
|
|
||||||
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||||
dirPath := filepath.Dir(p)
|
res, err = b.getRangeFromLevel(ctx, prm, p)
|
||||||
|
|
||||||
_, ok := activeCache[dirPath]
|
|
||||||
|
|
||||||
res, err = b.getRangeFromLevel(ctx, prm, p, !ok)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
outOfBounds := isErrOutOfRange(err)
|
outOfBounds := isErrOutOfRange(err)
|
||||||
if !outOfBounds && !client.IsErrObjectNotFound(err) {
|
if !outOfBounds && !client.IsErrObjectNotFound(err) {
|
||||||
|
@ -82,8 +78,6 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache[dirPath] = struct{}{}
|
|
||||||
|
|
||||||
objectFound = err == nil
|
objectFound = err == nil
|
||||||
|
|
||||||
// abort iterator if found, otherwise process all Blobovniczas
|
// abort iterator if found, otherwise process all Blobovniczas
|
||||||
|
@ -106,68 +100,14 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
||||||
// tries to read range of object payload data from particular blobovnicza.
|
// tries to read range of object payload data from particular blobovnicza.
|
||||||
//
|
//
|
||||||
// returns error if object could not be read from any blobovnicza of the same level.
|
// returns error if object could not be read from any blobovnicza of the same level.
|
||||||
func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string, tryActive bool) (common.GetRangeRes, error) {
|
func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string) (common.GetRangeRes, error) {
|
||||||
lvlPath := filepath.Dir(blzPath)
|
|
||||||
|
|
||||||
// try to read from blobovnicza if it is opened
|
|
||||||
b.lruMtx.Lock()
|
|
||||||
v, ok := b.opened.Get(blzPath)
|
|
||||||
b.lruMtx.Unlock()
|
|
||||||
if ok {
|
|
||||||
res, err := b.getObjectRange(ctx, v, prm)
|
|
||||||
switch {
|
|
||||||
case err == nil,
|
|
||||||
isErrOutOfRange(err):
|
|
||||||
return res, err
|
|
||||||
default:
|
|
||||||
if !client.IsErrObjectNotFound(err) {
|
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza,
|
|
||||||
zap.String("path", blzPath),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// therefore the object is possibly placed in a lighter blobovnicza
|
|
||||||
|
|
||||||
// next we check in the active level blobobnicza:
|
|
||||||
// * the freshest objects are probably the most demanded;
|
|
||||||
// * the active blobovnicza is always opened.
|
|
||||||
b.activeMtx.RLock()
|
|
||||||
active, ok := b.active[lvlPath]
|
|
||||||
b.activeMtx.RUnlock()
|
|
||||||
|
|
||||||
if ok && tryActive {
|
|
||||||
res, err := b.getObjectRange(ctx, active.blz, prm)
|
|
||||||
switch {
|
|
||||||
case err == nil,
|
|
||||||
isErrOutOfRange(err):
|
|
||||||
return res, err
|
|
||||||
default:
|
|
||||||
if !client.IsErrObjectNotFound(err) {
|
|
||||||
b.log.Debug(logs.BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza,
|
|
||||||
zap.String("path", blzPath),
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// then object is possibly placed in closed blobovnicza
|
|
||||||
|
|
||||||
// check if it makes sense to try to open the blob
|
|
||||||
// (Blobovniczas "after" the active one are empty anyway,
|
|
||||||
// and it's pointless to open them).
|
|
||||||
if u64FromHexString(filepath.Base(blzPath)) > active.ind {
|
|
||||||
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
|
||||||
}
|
|
||||||
|
|
||||||
// open blobovnicza (cached inside)
|
// open blobovnicza (cached inside)
|
||||||
blz, err := b.openBlobovnicza(blzPath)
|
shBlz := b.getBlobovnicza(blzPath)
|
||||||
|
blz, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.GetRangeRes{}, err
|
return common.GetRangeRes{}, err
|
||||||
}
|
}
|
||||||
|
defer shBlz.Close()
|
||||||
|
|
||||||
return b.getObjectRange(ctx, blz, prm)
|
return b.getObjectRange(ctx, blz, prm)
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,13 +68,15 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
|
||||||
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
|
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
|
||||||
func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
|
func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
|
||||||
return b.iterateLeaves(ctx, func(p string) (bool, error) {
|
return b.iterateLeaves(ctx, func(p string) (bool, error) {
|
||||||
blz, err := b.openBlobovnicza(p)
|
shBlz := b.getBlobovnicza(p)
|
||||||
|
blz, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
||||||
}
|
}
|
||||||
|
defer shBlz.Close()
|
||||||
|
|
||||||
err = f(p, blz)
|
err = f(p, blz)
|
||||||
|
|
||||||
|
|
242
pkg/local_object_storage/blobstor/blobovniczatree/manager.go
Normal file
|
@ -0,0 +1,242 @@
|
||||||
|
package blobovniczatree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// sharedDB is responsible for opening and closing a file of single blobovnicza.
|
||||||
|
type sharedDB struct {
|
||||||
|
guard *sync.RWMutex
|
||||||
|
blcza *blobovnicza.Blobovnicza
|
||||||
|
refCount uint32
|
||||||
|
|
||||||
|
openDBCounter *openDBCounter
|
||||||
|
closedFlag *atomic.Bool
|
||||||
|
options []blobovnicza.Option
|
||||||
|
path string
|
||||||
|
readOnly bool
|
||||||
|
metrics blobovnicza.Metrics
|
||||||
|
log *logger.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
|
||||||
|
metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger) *sharedDB {
|
||||||
|
return &sharedDB{
|
||||||
|
guard: &sync.RWMutex{},
|
||||||
|
|
||||||
|
options: options,
|
||||||
|
path: path,
|
||||||
|
readOnly: readOnly,
|
||||||
|
metrics: metrics,
|
||||||
|
closedFlag: closedFlag,
|
||||||
|
log: log,
|
||||||
|
openDBCounter: openDBCounter,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
|
||||||
|
if b.closedFlag.Load() {
|
||||||
|
return nil, errClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
b.guard.Lock()
|
||||||
|
defer b.guard.Unlock()
|
||||||
|
|
||||||
|
if b.refCount > 0 {
|
||||||
|
b.refCount++
|
||||||
|
return b.blcza, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
blz := blobovnicza.New(append(b.options,
|
||||||
|
blobovnicza.WithReadOnly(b.readOnly),
|
||||||
|
blobovnicza.WithPath(b.path),
|
||||||
|
blobovnicza.WithMetrics(b.metrics),
|
||||||
|
)...)
|
||||||
|
|
||||||
|
if err := blz.Open(); err != nil {
|
||||||
|
return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err)
|
||||||
|
}
|
||||||
|
if err := blz.Init(); err != nil {
|
||||||
|
return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.refCount++
|
||||||
|
b.blcza = blz
|
||||||
|
b.openDBCounter.Inc()
|
||||||
|
|
||||||
|
return blz, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *sharedDB) Close() {
|
||||||
|
b.guard.Lock()
|
||||||
|
defer b.guard.Unlock()
|
||||||
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
What is the expected scenario for this to happen? What is the expected scenario for this to happen?
dstepanov-yadro
commented
Developer mistake. Developer mistake.
fyrchik
commented
It's worth some high-level logs then (I would panic, but it is not obvious why it couldn't happen) It's worth some high-level logs then (I would panic, but it is not obvious why it couldn't happen)
dstepanov-yadro
commented
Added error log Added error log
|
|||||||
|
if b.refCount == 0 {
|
||||||
|
b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.refCount == 1 {
|
||||||
|
b.refCount = 0
|
||||||
|
if err := b.blcza.Close(); err != nil {
|
||||||
|
b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
|
||||||
|
zap.String("id", b.path),
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
b.blcza = nil
|
||||||
|
b.openDBCounter.Dec()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
b.refCount--
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *sharedDB) Path() string {
|
||||||
|
return b.path
|
||||||
|
}
|
||||||
|
|
||||||
|
// levelDbManager stores pointers of the sharedDB's for the leaf directory of the blobovnicza tree.
|
||||||
|
type levelDbManager struct {
|
||||||
|
databases []*sharedDB
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLevelDBManager(width uint64, options []blobovnicza.Option, rootPath string, lvlPath string,
|
||||||
|
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlog *atomic.Bool, log *logger.Logger) *levelDbManager {
|
||||||
|
result := &levelDbManager{
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It is not used outside the loop, why not go with It is not used outside the loop, why not go with `idx := 0`?
dstepanov-yadro
commented
Fixed Fixed
|
|||||||
|
databases: make([]*sharedDB, width),
|
||||||
|
}
|
||||||
|
for idx := uint64(0); idx < width; idx++ {
|
||||||
|
result.databases[idx] = newSharedDB(options, filepath.Join(rootPath, lvlPath, u64ToHexString(idx)), readOnly, metrics, openDBCounter, closedFlog, log)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
|
||||||
|
return m.databases[idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
// dbManager manages the opening and closing of blobovnicza instances.
|
||||||
|
//
|
||||||
|
// The blobovnicza opens at the first request, closes after the last request.
|
||||||
|
type dbManager struct {
|
||||||
|
levelToManager map[string]*levelDbManager
|
||||||
|
levelToManagerGuard *sync.RWMutex
|
||||||
|
closedFlag *atomic.Bool
|
||||||
|
dbCounter *openDBCounter
|
||||||
|
|
||||||
|
rootPath string
|
||||||
|
options []blobovnicza.Option
|
||||||
|
readOnly bool
|
||||||
|
metrics blobovnicza.Metrics
|
||||||
|
leafWidth uint64
|
||||||
|
log *logger.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDBManager(rootPath string, options []blobovnicza.Option, leafWidth uint64, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
|
||||||
|
return &dbManager{
|
||||||
|
rootPath: rootPath,
|
||||||
|
options: options,
|
||||||
|
readOnly: readOnly,
|
||||||
|
metrics: metrics,
|
||||||
|
leafWidth: leafWidth,
|
||||||
|
levelToManager: make(map[string]*levelDbManager),
|
||||||
|
levelToManagerGuard: &sync.RWMutex{},
|
||||||
|
log: log,
|
||||||
|
closedFlag: &atomic.Bool{},
|
||||||
|
dbCounter: newOpenDBCounter(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *dbManager) GetByPath(path string) *sharedDB {
|
||||||
|
lvlPath := filepath.Dir(path)
|
||||||
|
curIndex := u64FromHexString(filepath.Base(path))
|
||||||
|
levelManager := m.getLevelManager(lvlPath)
|
||||||
|
return levelManager.GetByIndex(curIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *dbManager) Open() {
|
||||||
|
m.closedFlag.Store(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *dbManager) Close() {
|
||||||
|
m.closedFlag.Store(true)
|
||||||
|
m.dbCounter.WaitUntilAllClosed()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *dbManager) getLevelManager(lvlPath string) *levelDbManager {
|
||||||
|
result := m.getLevelManagerIfExists(lvlPath)
|
||||||
|
if result != nil {
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
return m.getOrCreateLevelManager(lvlPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *dbManager) getLevelManagerIfExists(lvlPath string) *levelDbManager {
|
||||||
|
m.levelToManagerGuard.RLock()
|
||||||
|
defer m.levelToManagerGuard.RUnlock()
|
||||||
|
|
||||||
|
return m.levelToManager[lvlPath]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager {
|
||||||
|
m.levelToManagerGuard.Lock()
|
||||||
|
defer m.levelToManagerGuard.Unlock()
|
||||||
|
|
||||||
|
if result, ok := m.levelToManager[lvlPath]; ok {
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
result := newLevelDBManager(m.leafWidth, m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
|
||||||
|
m.levelToManager[lvlPath] = result
|
||||||
dstepanov-yadro
commented
Yes, it's not Go way. But it looks simpler than mutex-protected channel. Yes, it's not Go way. But it looks simpler than mutex-protected channel.
|
|||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
type openDBCounter struct {
|
||||||
|
cond *sync.Cond
|
||||||
|
count uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func newOpenDBCounter() *openDBCounter {
|
||||||
|
return &openDBCounter{
|
||||||
|
cond: &sync.Cond{
|
||||||
|
L: &sync.Mutex{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *openDBCounter) Inc() {
|
||||||
|
c.cond.L.Lock()
|
||||||
|
defer c.cond.L.Unlock()
|
||||||
|
|
||||||
|
c.count++
|
||||||
|
}
|
||||||
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It there an expected situation when It there an expected situation when `c.count == 0` here?
dstepanov-yadro
commented
Only if there is a developer error. Only if there is a developer error.
|
|||||||
|
func (c *openDBCounter) Dec() {
|
||||||
|
c.cond.L.Lock()
|
||||||
|
defer c.cond.L.Unlock()
|
||||||
|
|
||||||
|
if c.count > 0 {
|
||||||
|
c.count--
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.count == 0 {
|
||||||
|
c.cond.Broadcast()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *openDBCounter) WaitUntilAllClosed() {
|
||||||
|
c.cond.L.Lock()
|
||||||
|
for c.count > 0 {
|
||||||
|
c.cond.Wait()
|
||||||
|
}
|
||||||
|
c.cond.L.Unlock()
|
||||||
|
}
|
|
@ -2,7 +2,6 @@ package blobovniczatree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -10,7 +9,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -76,8 +74,8 @@ type putIterator struct {
|
||||||
PutPrm blobovnicza.PutPrm
|
PutPrm blobovnicza.PutPrm
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *putIterator) iterate(ctx context.Context, path string) (bool, error) {
|
func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error) {
|
||||||
active, err := i.B.getActivated(path)
|
active, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !isLogical(err) {
|
if !isLogical(err) {
|
||||||
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
|
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
|
||||||
|
@ -89,46 +87,29 @@ func (i *putIterator) iterate(ctx context.Context, path string) (bool, error) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := active.blz.Put(ctx, i.PutPrm); err != nil {
|
if active == nil {
|
||||||
// Check if blobovnicza is full. We could either receive `blobovnicza.ErrFull` error
|
i.B.log.Debug(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
|
||||||
// or update active blobovnicza in other thread. In the latter case the database will be closed
|
return false, nil
|
||||||
// and `updateActive` takes care of not updating the active blobovnicza twice.
|
}
|
||||||
if isFull := errors.Is(err, blobovnicza.ErrFull); isFull || errors.Is(err, bbolt.ErrDatabaseNotOpen) {
|
defer active.Close()
|
||||||
if isFull {
|
|
||||||
i.B.log.Debug(logs.BlobovniczatreeBlobovniczaOverflowed,
|
|
||||||
zap.String("path", filepath.Join(path, u64ToHexString(active.ind))))
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := i.B.updateActive(path, &active.ind); err != nil {
|
i.AllFull = false
|
||||||
if !isLogical(err) {
|
|
||||||
i.B.reportError(logs.BlobovniczatreeCouldNotUpdateActiveBlobovnicza, err)
|
|
||||||
} else {
|
|
||||||
i.B.log.Debug(logs.BlobovniczatreeCouldNotUpdateActiveBlobovnicza,
|
|
||||||
zap.String("level", path),
|
|
||||||
zap.String("error", err.Error()))
|
|
||||||
}
|
|
||||||
|
|
||||||
return false, nil
|
_, err = active.Blobovnicza().Put(ctx, i.PutPrm)
|
||||||
}
|
if err != nil {
|
||||||
|
|
||||||
return i.iterate(ctx, path)
|
|
||||||
}
|
|
||||||
|
|
||||||
i.AllFull = false
|
|
||||||
if !isLogical(err) {
|
if !isLogical(err) {
|
||||||
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
|
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
|
||||||
} else {
|
} else {
|
||||||
i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
|
i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
|
||||||
zap.String("path", filepath.Join(path, u64ToHexString(active.ind))),
|
zap.String("path", active.Path()),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
path = filepath.Join(path, u64ToHexString(active.ind))
|
idx := u64FromHexString(filepath.Base(active.Path()))
|
||||||
|
i.ID = blobovnicza.NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))
|
||||||
i.ID = blobovnicza.NewIDFromBytes([]byte(path))
|
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
Now the check for the fullness of the database is performed at the blobovnicza tree level.