forked from TrueCloudLab/frostfs-node
[#536] blobovnicza: Drop cache
Each blobovnicza instance is opened while is in use. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
b9b86d2ec8
commit
c672f59ab8
16 changed files with 586 additions and 474 deletions
|
@ -509,4 +509,5 @@ const (
|
|||
RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated"
|
||||
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
||||
FailedToCountWritecacheItems = "failed to count writecache items"
|
||||
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
|
||||
)
|
||||
|
|
|
@ -3,7 +3,6 @@ package blobovnicza
|
|||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
|
@ -98,9 +97,9 @@ func TestBlobovnicza(t *testing.T) {
|
|||
testPutGet(t, blz, oidtest.Address(), objSizeLim, nil, nil)
|
||||
}
|
||||
|
||||
// from now objects should not be saved
|
||||
// blobovnizca accepts object event if full
|
||||
testPutGet(t, blz, oidtest.Address(), 1024, func(err error) bool {
|
||||
return errors.Is(err, ErrFull)
|
||||
return err == nil
|
||||
}, nil)
|
||||
|
||||
require.NoError(t, blz.Close())
|
||||
|
|
|
@ -23,10 +23,6 @@ type PutPrm struct {
|
|||
type PutRes struct {
|
||||
}
|
||||
|
||||
// ErrFull is returned when trying to save an
|
||||
// object to a filled blobovnicza.
|
||||
var ErrFull = logicerr.New("blobovnicza is full")
|
||||
|
||||
// SetAddress sets the address of the saving object.
|
||||
func (p *PutPrm) SetAddress(addr oid.Address) {
|
||||
p.addr = addr
|
||||
|
@ -65,10 +61,6 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
|||
key := addressKey(prm.addr)
|
||||
|
||||
err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
if b.full() {
|
||||
return ErrFull
|
||||
}
|
||||
|
||||
buck := tx.Bucket(bucketName)
|
||||
if buck == nil {
|
||||
// expected to happen:
|
||||
|
|
|
@ -55,6 +55,6 @@ func (b *Blobovnicza) itemDeleted(itemSize uint64) {
|
|||
b.metrics.SubOpenBlobovniczaItems(1)
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) full() bool {
|
||||
func (b *Blobovnicza) IsFull() bool {
|
||||
return b.dataSize.Load() >= b.fullSizeLimit
|
||||
}
|
||||
|
|
213
pkg/local_object_storage/blobstor/blobovniczatree/active.go
Normal file
213
pkg/local_object_storage/blobstor/blobovniczatree/active.go
Normal file
|
@ -0,0 +1,213 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
)
|
||||
|
||||
type activeDB struct {
|
||||
blz *blobovnicza.Blobovnicza
|
||||
shDB *sharedDB
|
||||
}
|
||||
|
||||
func (db *activeDB) Blobovnicza() *blobovnicza.Blobovnicza {
|
||||
return db.blz
|
||||
}
|
||||
|
||||
func (db *activeDB) Close() {
|
||||
db.shDB.Close()
|
||||
}
|
||||
|
||||
func (db *activeDB) Path() string {
|
||||
return db.shDB.Path()
|
||||
}
|
||||
|
||||
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
|
||||
//
|
||||
// Uses dbManager for opening/closing sharedDB instances.
|
||||
// Stores a reference to an open active sharedDB, so dbManager does not close it.
|
||||
// When changing the active sharedDB, releases the reference to the previous active sharedDB.
|
||||
type activeDBManager struct {
|
||||
levelToActiveDBGuard *sync.RWMutex
|
||||
levelToActiveDB map[string]*sharedDB
|
||||
levelLock *utilSync.KeyLocker[string]
|
||||
closed bool
|
||||
|
||||
dbManager *dbManager
|
||||
leafWidth uint64
|
||||
}
|
||||
|
||||
func newActiveDBManager(dbManager *dbManager, leafWidth uint64) *activeDBManager {
|
||||
return &activeDBManager{
|
||||
levelToActiveDBGuard: &sync.RWMutex{},
|
||||
levelToActiveDB: make(map[string]*sharedDB),
|
||||
levelLock: utilSync.NewKeyLocker[string](),
|
||||
|
||||
dbManager: dbManager,
|
||||
leafWidth: leafWidth,
|
||||
}
|
||||
}
|
||||
|
||||
// GetOpenedActiveDBForLevel returns active DB for level.
|
||||
// DB must be closed after use.
|
||||
func (m *activeDBManager) GetOpenedActiveDBForLevel(lvlPath string) (*activeDB, error) {
|
||||
activeDB, err := m.getCurrentActiveIfOk(lvlPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if activeDB != nil {
|
||||
return activeDB, nil
|
||||
}
|
||||
|
||||
return m.updateAndGetActive(lvlPath)
|
||||
}
|
||||
|
||||
func (m *activeDBManager) Open() {
|
||||
m.levelToActiveDBGuard.Lock()
|
||||
defer m.levelToActiveDBGuard.Unlock()
|
||||
|
||||
m.closed = false
|
||||
}
|
||||
|
||||
func (m *activeDBManager) Close() {
|
||||
m.levelToActiveDBGuard.Lock()
|
||||
defer m.levelToActiveDBGuard.Unlock()
|
||||
|
||||
for _, db := range m.levelToActiveDB {
|
||||
db.Close()
|
||||
}
|
||||
m.levelToActiveDB = make(map[string]*sharedDB)
|
||||
m.closed = true
|
||||
}
|
||||
|
||||
func (m *activeDBManager) getCurrentActiveIfOk(lvlPath string) (*activeDB, error) {
|
||||
m.levelToActiveDBGuard.RLock()
|
||||
defer m.levelToActiveDBGuard.RUnlock()
|
||||
|
||||
if m.closed {
|
||||
return nil, errClosed
|
||||
}
|
||||
|
||||
db, ok := m.levelToActiveDB[lvlPath]
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
blz, err := db.Open() //open db for usage, will be closed on activeDB.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if blz.IsFull() {
|
||||
db.Close()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return &activeDB{
|
||||
blz: blz,
|
||||
shDB: db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *activeDBManager) updateAndGetActive(lvlPath string) (*activeDB, error) {
|
||||
m.levelLock.Lock(lvlPath)
|
||||
defer m.levelLock.Unlock(lvlPath)
|
||||
|
||||
current, err := m.getCurrentActiveIfOk(lvlPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if current != nil {
|
||||
return current, nil
|
||||
}
|
||||
|
||||
nextShDB, err := m.getNextSharedDB(lvlPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if nextShDB == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
blz, err := nextShDB.Open() // open db for client, client must call Close() after usage
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &activeDB{
|
||||
blz: blz,
|
||||
shDB: nextShDB,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *activeDBManager) getNextSharedDB(lvlPath string) (*sharedDB, error) {
|
||||
var idx uint64
|
||||
var iterCount uint64
|
||||
hasActive, currentIdx := m.hasActiveDB(lvlPath)
|
||||
if hasActive {
|
||||
idx = (currentIdx + 1) % m.leafWidth
|
||||
}
|
||||
|
||||
var next *sharedDB
|
||||
|
||||
for iterCount < m.leafWidth {
|
||||
path := filepath.Join(lvlPath, u64ToHexString(idx))
|
||||
shDB := m.dbManager.GetByPath(path)
|
||||
db, err := shDB.Open() //open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if db.IsFull() {
|
||||
shDB.Close()
|
||||
} else {
|
||||
next = shDB
|
||||
break
|
||||
}
|
||||
idx = (idx + 1) % m.leafWidth
|
||||
iterCount++
|
||||
}
|
||||
|
||||
previous, updated := m.replace(lvlPath, next)
|
||||
if !updated && next != nil {
|
||||
next.Close() // manager is closed, so don't hold active DB open
|
||||
}
|
||||
if updated && previous != nil {
|
||||
previous.Close()
|
||||
}
|
||||
return next, nil
|
||||
}
|
||||
|
||||
func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
|
||||
m.levelToActiveDBGuard.RLock()
|
||||
defer m.levelToActiveDBGuard.RUnlock()
|
||||
|
||||
if m.closed {
|
||||
return false, 0
|
||||
}
|
||||
|
||||
db, ok := m.levelToActiveDB[lvlPath]
|
||||
if !ok {
|
||||
return false, 0
|
||||
}
|
||||
return true, u64FromHexString(filepath.Base(db.Path()))
|
||||
}
|
||||
|
||||
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {
|
||||
m.levelToActiveDBGuard.Lock()
|
||||
defer m.levelToActiveDBGuard.Unlock()
|
||||
|
||||
if m.closed {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
previous := m.levelToActiveDB[lvlPath]
|
||||
if shDB == nil {
|
||||
delete(m.levelToActiveDB, lvlPath)
|
||||
} else {
|
||||
m.levelToActiveDB[lvlPath] = shDB
|
||||
}
|
||||
return previous, true
|
||||
}
|
|
@ -3,19 +3,12 @@ package blobovniczatree
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/hrw"
|
||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Blobovniczas represents the storage of the "small" objects.
|
||||
|
@ -61,28 +54,8 @@ import (
|
|||
type Blobovniczas struct {
|
||||
cfg
|
||||
|
||||
// cache of opened filled Blobovniczas
|
||||
opened *simplelru.LRU[string, *blobovnicza.Blobovnicza]
|
||||
// lruMtx protects opened cache.
|
||||
// It isn't RWMutex because `Get` calls must
|
||||
// lock this mutex on write, as LRU info is updated.
|
||||
// It must be taken after activeMtx in case when eviction is possible
|
||||
// i.e. `Add`, `Purge` and `Remove` calls.
|
||||
lruMtx sync.Mutex
|
||||
|
||||
// mutex to exclude parallel bbolt.Open() calls
|
||||
// bbolt.Open() deadlocks if it tries to open already opened file
|
||||
openMtx sync.Mutex
|
||||
|
||||
// list of active (opened, non-filled) Blobovniczas
|
||||
activeMtx sync.RWMutex
|
||||
active map[string]blobovniczaWithIndex
|
||||
}
|
||||
|
||||
type blobovniczaWithIndex struct {
|
||||
ind uint64
|
||||
|
||||
blz *blobovnicza.Blobovnicza
|
||||
commondbManager *dbManager
|
||||
activeDBManager *activeDBManager
|
||||
}
|
||||
|
||||
var _ common.Storage = (*Blobovniczas)(nil)
|
||||
|
@ -102,120 +75,12 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
|
|||
blz.blzLeafWidth = blz.blzShallowWidth
|
||||
}
|
||||
|
||||
cache, err := simplelru.NewLRU[string, *blobovnicza.Blobovnicza](blz.openedCacheSize, func(p string, value *blobovnicza.Blobovnicza) {
|
||||
lvlPath := filepath.Dir(p)
|
||||
if b, ok := blz.active[lvlPath]; ok && b.ind == u64FromHexString(filepath.Base(p)) {
|
||||
// This branch is taken if we have recently updated active blobovnicza and remove
|
||||
// it from opened cache.
|
||||
return
|
||||
} else if err := value.Close(); err != nil {
|
||||
blz.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
|
||||
zap.String("id", p),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
} else {
|
||||
blz.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyClosedOnEvict,
|
||||
zap.String("id", p),
|
||||
)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
// occurs only if the size is not positive
|
||||
panic(fmt.Errorf("could not create LRU cache of size %d: %w", blz.openedCacheSize, err))
|
||||
}
|
||||
|
||||
activeMapCapacity := uint64(1)
|
||||
for i := uint64(0); i < blz.blzShallowDepth; i++ {
|
||||
if i+1 == blz.blzShallowDepth {
|
||||
activeMapCapacity *= blz.blzLeafWidth
|
||||
} else {
|
||||
activeMapCapacity *= blz.blzShallowWidth
|
||||
}
|
||||
}
|
||||
|
||||
blz.opened = cache
|
||||
blz.active = make(map[string]blobovniczaWithIndex, activeMapCapacity)
|
||||
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.blzLeafWidth, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
|
||||
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
|
||||
|
||||
return blz
|
||||
}
|
||||
|
||||
// activates and returns activated blobovnicza of p-level (dir).
|
||||
//
|
||||
// returns error if blobvnicza could not be activated.
|
||||
func (b *Blobovniczas) getActivated(lvlPath string) (blobovniczaWithIndex, error) {
|
||||
return b.updateAndGet(lvlPath, nil)
|
||||
}
|
||||
|
||||
// updates active blobovnicza of p-level (dir).
|
||||
//
|
||||
// if current active blobovnicza's index is not old, it remains unchanged.
|
||||
func (b *Blobovniczas) updateActive(lvlPath string, old *uint64) error {
|
||||
b.log.Debug(logs.BlobovniczatreeUpdatingActiveBlobovnicza, zap.String("path", lvlPath))
|
||||
|
||||
_, err := b.updateAndGet(lvlPath, old)
|
||||
|
||||
b.log.Debug(logs.BlobovniczatreeActiveBlobovniczaSuccessfullyUpdated, zap.String("path", lvlPath))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// updates and returns active blobovnicza of p-level (dir).
|
||||
//
|
||||
// if current active blobovnicza's index is not old, it is returned unchanged.
|
||||
func (b *Blobovniczas) updateAndGet(lvlPath string, old *uint64) (blobovniczaWithIndex, error) {
|
||||
b.activeMtx.RLock()
|
||||
active, ok := b.active[lvlPath]
|
||||
b.activeMtx.RUnlock()
|
||||
|
||||
if ok {
|
||||
if old != nil {
|
||||
if active.ind == b.blzLeafWidth-1 {
|
||||
return active, logicerr.New("no more Blobovniczas")
|
||||
} else if active.ind != *old {
|
||||
// sort of CAS in order to control concurrent
|
||||
// updateActive calls
|
||||
return active, nil
|
||||
}
|
||||
} else {
|
||||
return active, nil
|
||||
}
|
||||
|
||||
active.ind++
|
||||
}
|
||||
|
||||
var err error
|
||||
if active.blz, err = b.openBlobovnicza(filepath.Join(lvlPath, u64ToHexString(active.ind))); err != nil {
|
||||
return active, err
|
||||
}
|
||||
|
||||
b.activeMtx.Lock()
|
||||
defer b.activeMtx.Unlock()
|
||||
|
||||
// check 2nd time to find out if it blobovnicza was activated while thread was locked
|
||||
tryActive, ok := b.active[lvlPath]
|
||||
if ok && tryActive.blz == active.blz {
|
||||
return tryActive, nil
|
||||
}
|
||||
|
||||
// Remove from opened cache (active blobovnicza should always be opened).
|
||||
// Because `onEvict` callback is called in `Remove`, we need to update
|
||||
// active map beforehand.
|
||||
b.active[lvlPath] = active
|
||||
|
||||
activePath := filepath.Join(lvlPath, u64ToHexString(active.ind))
|
||||
b.lruMtx.Lock()
|
||||
b.opened.Remove(activePath)
|
||||
if ok {
|
||||
b.opened.Add(filepath.Join(lvlPath, u64ToHexString(tryActive.ind)), tryActive.blz)
|
||||
}
|
||||
b.lruMtx.Unlock()
|
||||
|
||||
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyActivated,
|
||||
zap.String("path", activePath))
|
||||
|
||||
return active, nil
|
||||
}
|
||||
|
||||
// returns hash of the object address.
|
||||
func addressHash(addr *oid.Address, path string) uint64 {
|
||||
var a string
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBlobovniczaTree_Concurrency(t *testing.T) {
|
||||
t.Parallel()
|
||||
const n = 1000
|
||||
|
||||
st := NewBlobovniczaTree(
|
||||
WithLogger(test.NewLogger(t, true)),
|
||||
WithObjectSizeLimit(1024),
|
||||
WithBlobovniczaShallowWidth(10),
|
||||
WithBlobovniczaShallowDepth(1),
|
||||
WithRootPath(t.TempDir()))
|
||||
require.NoError(t, st.Open(false))
|
||||
require.NoError(t, st.Init())
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, st.Close())
|
||||
})
|
||||
|
||||
objGen := &testutil.SeqObjGenerator{ObjSize: 1}
|
||||
|
||||
var cnt atomic.Int64
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 1000; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for cnt.Add(1) <= n {
|
||||
obj := objGen.Next()
|
||||
addr := testutil.AddressFromObject(t, obj)
|
||||
|
||||
raw, err := obj.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = st.Put(context.Background(), common.PutPrm{
|
||||
Address: addr,
|
||||
RawData: raw,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = st.Get(context.Background(), common.GetPrm{Address: addr})
|
||||
require.NoError(t, err) // fails very often, correlated to how many goroutines are started
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
|
@ -2,11 +2,8 @@ package blobovniczatree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -14,6 +11,7 @@ import (
|
|||
func (b *Blobovniczas) Open(readOnly bool) error {
|
||||
b.readOnly = readOnly
|
||||
b.metrics.SetMode(readOnly)
|
||||
b.openManagers()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -23,59 +21,35 @@ func (b *Blobovniczas) Open(readOnly bool) error {
|
|||
func (b *Blobovniczas) Init() error {
|
||||
b.log.Debug(logs.BlobovniczatreeInitializingBlobovniczas)
|
||||
|
||||
b.openManagers()
|
||||
|
||||
if b.readOnly {
|
||||
b.log.Debug(logs.BlobovniczatreeReadonlyModeSkipBlobovniczasInitialization)
|
||||
return nil
|
||||
}
|
||||
|
||||
return b.iterateLeaves(context.TODO(), func(p string) (bool, error) {
|
||||
blz, err := b.openBlobovniczaNoCache(p)
|
||||
shBlz := b.openBlobovniczaNoCache(p)
|
||||
_, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
defer blz.Close()
|
||||
|
||||
if err := blz.Init(); err != nil {
|
||||
return true, fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err)
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) openManagers() {
|
||||
b.commondbManager.Open() //order important
|
||||
b.activeDBManager.Open()
|
||||
}
|
||||
|
||||
// Close implements common.Storage.
|
||||
func (b *Blobovniczas) Close() error {
|
||||
b.activeMtx.Lock()
|
||||
|
||||
b.lruMtx.Lock()
|
||||
|
||||
for p, v := range b.active {
|
||||
if err := v.blz.Close(); err != nil {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotCloseActiveBlobovnicza,
|
||||
zap.String("path", p),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
b.opened.Remove(p)
|
||||
}
|
||||
for _, k := range b.opened.Keys() {
|
||||
blz, _ := b.opened.Get(k)
|
||||
if err := blz.Close(); err != nil {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotCloseActiveBlobovnicza,
|
||||
zap.String("path", k),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
b.opened.Remove(k)
|
||||
}
|
||||
|
||||
b.active = make(map[string]blobovniczaWithIndex)
|
||||
b.metrics.Close()
|
||||
|
||||
b.lruMtx.Unlock()
|
||||
|
||||
b.activeMtx.Unlock()
|
||||
b.activeDBManager.Close() //order important
|
||||
b.commondbManager.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -83,61 +57,10 @@ func (b *Blobovniczas) Close() error {
|
|||
// opens and returns blobovnicza with path p.
|
||||
//
|
||||
// If blobovnicza is already opened and cached, instance from cache is returned w/o changes.
|
||||
func (b *Blobovniczas) openBlobovnicza(p string) (*blobovnicza.Blobovnicza, error) {
|
||||
b.lruMtx.Lock()
|
||||
v, ok := b.opened.Get(p)
|
||||
b.lruMtx.Unlock()
|
||||
if ok {
|
||||
// blobovnicza should be opened in cache
|
||||
return v, nil
|
||||
func (b *Blobovniczas) openBlobovnicza(p string) *sharedDB {
|
||||
return b.openBlobovniczaNoCache(p)
|
||||
}
|
||||
|
||||
lvlPath := filepath.Dir(p)
|
||||
curIndex := u64FromHexString(filepath.Base(p))
|
||||
|
||||
b.activeMtx.RLock()
|
||||
defer b.activeMtx.RUnlock()
|
||||
|
||||
active, ok := b.active[lvlPath]
|
||||
if ok && active.ind == curIndex {
|
||||
return active.blz, nil
|
||||
}
|
||||
|
||||
b.lruMtx.Lock()
|
||||
defer b.lruMtx.Unlock()
|
||||
|
||||
v, ok = b.opened.Get(p)
|
||||
if ok {
|
||||
return v, nil
|
||||
}
|
||||
|
||||
blz, err := b.openBlobovniczaNoCache(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b.opened.Add(p, blz)
|
||||
|
||||
return blz, nil
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) openBlobovniczaNoCache(p string) (*blobovnicza.Blobovnicza, error) {
|
||||
b.openMtx.Lock()
|
||||
defer b.openMtx.Unlock()
|
||||
|
||||
path := filepath.Join(b.rootPath, p)
|
||||
|
||||
blz := blobovnicza.New(append(b.blzOpts,
|
||||
blobovnicza.WithReadOnly(b.readOnly),
|
||||
blobovnicza.WithPath(path),
|
||||
blobovnicza.WithMetrics(b.metrics.Blobovnicza()),
|
||||
)...)
|
||||
|
||||
if err := blz.Open(); err != nil {
|
||||
return nil, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
||||
}
|
||||
if err := blz.Init(); err != nil {
|
||||
return nil, fmt.Errorf("could not init blobovnicza %s: %w", p, err)
|
||||
}
|
||||
return blz, nil
|
||||
func (b *Blobovniczas) openBlobovniczaNoCache(p string) *sharedDB {
|
||||
return b.commondbManager.GetByPath(p)
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package blobovniczatree
|
|||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -48,10 +47,12 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
|||
|
||||
if prm.StorageID != nil {
|
||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||
blz, err := b.openBlobovnicza(id.String())
|
||||
shBlz := b.openBlobovnicza(id.String())
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
if res, err = b.deleteObject(ctx, blz, bPrm); err == nil {
|
||||
success = true
|
||||
|
@ -59,16 +60,10 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
|||
return res, err
|
||||
}
|
||||
|
||||
activeCache := make(map[string]struct{})
|
||||
objectFound := false
|
||||
|
||||
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||
dirPath := filepath.Dir(p)
|
||||
|
||||
// don't process active blobovnicza of the level twice
|
||||
_, ok := activeCache[dirPath]
|
||||
|
||||
res, err = b.deleteObjectFromLevel(ctx, bPrm, p, !ok)
|
||||
res, err = b.deleteObjectFromLevel(ctx, bPrm, p)
|
||||
if err != nil {
|
||||
if !client.IsErrObjectNotFound(err) {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromLevel,
|
||||
|
@ -78,8 +73,6 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
|||
}
|
||||
}
|
||||
|
||||
activeCache[dirPath] = struct{}{}
|
||||
|
||||
if err == nil {
|
||||
objectFound = true
|
||||
}
|
||||
|
@ -100,57 +93,13 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
|||
// tries to delete object from particular blobovnicza.
|
||||
//
|
||||
// returns no error if object was removed from some blobovnicza of the same level.
|
||||
func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicza.DeletePrm, blzPath string, tryActive bool) (common.DeleteRes, error) {
|
||||
lvlPath := filepath.Dir(blzPath)
|
||||
|
||||
// try to remove from blobovnicza if it is opened
|
||||
b.lruMtx.Lock()
|
||||
v, ok := b.opened.Get(blzPath)
|
||||
b.lruMtx.Unlock()
|
||||
if ok {
|
||||
if res, err := b.deleteObject(ctx, v, prm); err == nil {
|
||||
return res, err
|
||||
} else if !client.IsErrObjectNotFound(err) {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromOpenedBlobovnicza,
|
||||
zap.String("path", blzPath),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// therefore the object is possibly placed in a lighter blobovnicza
|
||||
|
||||
// next we check in the active level blobobnicza:
|
||||
// * the active blobovnicza is always opened.
|
||||
b.activeMtx.RLock()
|
||||
active, ok := b.active[lvlPath]
|
||||
b.activeMtx.RUnlock()
|
||||
|
||||
if ok && tryActive {
|
||||
if res, err := b.deleteObject(ctx, active.blz, prm); err == nil {
|
||||
return res, err
|
||||
} else if !client.IsErrObjectNotFound(err) {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromActiveBlobovnicza,
|
||||
zap.String("path", blzPath),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// then object is possibly placed in closed blobovnicza
|
||||
|
||||
// check if it makes sense to try to open the blob
|
||||
// (Blobovniczas "after" the active one are empty anyway,
|
||||
// and it's pointless to open them).
|
||||
if u64FromHexString(filepath.Base(blzPath)) > active.ind {
|
||||
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
// open blobovnicza (cached inside)
|
||||
blz, err := b.openBlobovnicza(blzPath)
|
||||
func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicza.DeletePrm, blzPath string) (common.DeleteRes, error) {
|
||||
shBlz := b.openBlobovnicza(blzPath)
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
return b.deleteObject(ctx, blz, prm)
|
||||
}
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
)
|
||||
|
||||
var errClosed = logicerr.New("blobvnicza is closed")
|
||||
|
||||
func isErrOutOfRange(err error) bool {
|
||||
var target *apistatus.ObjectOutOfRange
|
||||
return errors.As(err, &target)
|
||||
|
|
|
@ -3,7 +3,6 @@ package blobovniczatree
|
|||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -37,26 +36,22 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
|
|||
|
||||
if prm.StorageID != nil {
|
||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||
blz, err := b.openBlobovnicza(id.String())
|
||||
shBlz := b.openBlobovnicza(id.String())
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return common.ExistsRes{}, err
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
exists, err := blz.Exists(ctx, prm.Address)
|
||||
return common.ExistsRes{Exists: exists}, err
|
||||
}
|
||||
|
||||
activeCache := make(map[string]struct{})
|
||||
|
||||
var gPrm blobovnicza.GetPrm
|
||||
gPrm.SetAddress(prm.Address)
|
||||
|
||||
err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||
dirPath := filepath.Dir(p)
|
||||
|
||||
_, ok := activeCache[dirPath]
|
||||
|
||||
_, err := b.getObjectFromLevel(ctx, gPrm, p, !ok)
|
||||
_, err := b.getObjectFromLevel(ctx, gPrm, p)
|
||||
if err != nil {
|
||||
if !client.IsErrObjectNotFound(err) {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
|
||||
|
@ -65,7 +60,6 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
|
|||
}
|
||||
}
|
||||
|
||||
activeCache[dirPath] = struct{}{}
|
||||
found = err == nil
|
||||
return found, nil
|
||||
})
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -48,10 +47,12 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
|||
|
||||
if prm.StorageID != nil {
|
||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||
blz, err := b.openBlobovnicza(id.String())
|
||||
shBlz := b.openBlobovnicza(id.String())
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
res, err = b.getObject(ctx, blz, bPrm)
|
||||
if err == nil {
|
||||
|
@ -61,14 +62,8 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
|||
return res, err
|
||||
}
|
||||
|
||||
activeCache := make(map[string]struct{})
|
||||
|
||||
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||
dirPath := filepath.Dir(p)
|
||||
|
||||
_, ok := activeCache[dirPath]
|
||||
|
||||
res, err = b.getObjectFromLevel(ctx, bPrm, p, !ok)
|
||||
res, err = b.getObjectFromLevel(ctx, bPrm, p)
|
||||
if err != nil {
|
||||
if !client.IsErrObjectNotFound(err) {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel,
|
||||
|
@ -78,8 +73,6 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
|||
}
|
||||
}
|
||||
|
||||
activeCache[dirPath] = struct{}{}
|
||||
|
||||
// abort iterator if found, otherwise process all Blobovniczas
|
||||
return err == nil, nil
|
||||
})
|
||||
|
@ -98,58 +91,14 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
|||
// tries to read object from particular blobovnicza.
|
||||
//
|
||||
// returns error if object could not be read from any blobovnicza of the same level.
|
||||
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) {
|
||||
lvlPath := filepath.Dir(blzPath)
|
||||
|
||||
// try to read from blobovnicza if it is opened
|
||||
b.lruMtx.Lock()
|
||||
v, ok := b.opened.Get(blzPath)
|
||||
b.lruMtx.Unlock()
|
||||
if ok {
|
||||
if res, err := b.getObject(ctx, v, prm); err == nil {
|
||||
return res, err
|
||||
} else if !client.IsErrObjectNotFound(err) {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotReadObjectFromOpenedBlobovnicza,
|
||||
zap.String("path", blzPath),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// therefore the object is possibly placed in a lighter blobovnicza
|
||||
|
||||
// next we check in the active level blobobnicza:
|
||||
// * the freshest objects are probably the most demanded;
|
||||
// * the active blobovnicza is always opened.
|
||||
b.activeMtx.RLock()
|
||||
active, ok := b.active[lvlPath]
|
||||
b.activeMtx.RUnlock()
|
||||
|
||||
if ok && tryActive {
|
||||
if res, err := b.getObject(ctx, active.blz, prm); err == nil {
|
||||
return res, err
|
||||
} else if !client.IsErrObjectNotFound(err) {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromActiveBlobovnicza,
|
||||
zap.String("path", blzPath),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// then object is possibly placed in closed blobovnicza
|
||||
|
||||
// check if it makes sense to try to open the blob
|
||||
// (Blobovniczas "after" the active one are empty anyway,
|
||||
// and it's pointless to open them).
|
||||
if u64FromHexString(filepath.Base(blzPath)) > active.ind {
|
||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) {
|
||||
// open blobovnicza (cached inside)
|
||||
blz, err := b.openBlobovnicza(blzPath)
|
||||
shBlz := b.openBlobovnicza(blzPath)
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
return b.getObject(ctx, blz, prm)
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
|
@ -47,10 +46,12 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
|||
|
||||
if prm.StorageID != nil {
|
||||
id := blobovnicza.NewIDFromBytes(prm.StorageID)
|
||||
blz, err := b.openBlobovnicza(id.String())
|
||||
shBlz := b.openBlobovnicza(id.String())
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, err
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
res, err := b.getObjectRange(ctx, blz, prm)
|
||||
if err == nil {
|
||||
|
@ -60,15 +61,10 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
|||
return res, err
|
||||
}
|
||||
|
||||
activeCache := make(map[string]struct{})
|
||||
objectFound := false
|
||||
|
||||
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||
dirPath := filepath.Dir(p)
|
||||
|
||||
_, ok := activeCache[dirPath]
|
||||
|
||||
res, err = b.getRangeFromLevel(ctx, prm, p, !ok)
|
||||
res, err = b.getRangeFromLevel(ctx, prm, p)
|
||||
if err != nil {
|
||||
outOfBounds := isErrOutOfRange(err)
|
||||
if !outOfBounds && !client.IsErrObjectNotFound(err) {
|
||||
|
@ -82,8 +78,6 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
|||
}
|
||||
}
|
||||
|
||||
activeCache[dirPath] = struct{}{}
|
||||
|
||||
objectFound = err == nil
|
||||
|
||||
// abort iterator if found, otherwise process all Blobovniczas
|
||||
|
@ -106,68 +100,14 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
|||
// tries to read range of object payload data from particular blobovnicza.
|
||||
//
|
||||
// returns error if object could not be read from any blobovnicza of the same level.
|
||||
func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string, tryActive bool) (common.GetRangeRes, error) {
|
||||
lvlPath := filepath.Dir(blzPath)
|
||||
|
||||
// try to read from blobovnicza if it is opened
|
||||
b.lruMtx.Lock()
|
||||
v, ok := b.opened.Get(blzPath)
|
||||
b.lruMtx.Unlock()
|
||||
if ok {
|
||||
res, err := b.getObjectRange(ctx, v, prm)
|
||||
switch {
|
||||
case err == nil,
|
||||
isErrOutOfRange(err):
|
||||
return res, err
|
||||
default:
|
||||
if !client.IsErrObjectNotFound(err) {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza,
|
||||
zap.String("path", blzPath),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// therefore the object is possibly placed in a lighter blobovnicza
|
||||
|
||||
// next we check in the active level blobobnicza:
|
||||
// * the freshest objects are probably the most demanded;
|
||||
// * the active blobovnicza is always opened.
|
||||
b.activeMtx.RLock()
|
||||
active, ok := b.active[lvlPath]
|
||||
b.activeMtx.RUnlock()
|
||||
|
||||
if ok && tryActive {
|
||||
res, err := b.getObjectRange(ctx, active.blz, prm)
|
||||
switch {
|
||||
case err == nil,
|
||||
isErrOutOfRange(err):
|
||||
return res, err
|
||||
default:
|
||||
if !client.IsErrObjectNotFound(err) {
|
||||
b.log.Debug(logs.BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza,
|
||||
zap.String("path", blzPath),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// then object is possibly placed in closed blobovnicza
|
||||
|
||||
// check if it makes sense to try to open the blob
|
||||
// (Blobovniczas "after" the active one are empty anyway,
|
||||
// and it's pointless to open them).
|
||||
if u64FromHexString(filepath.Base(blzPath)) > active.ind {
|
||||
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string) (common.GetRangeRes, error) {
|
||||
// open blobovnicza (cached inside)
|
||||
blz, err := b.openBlobovnicza(blzPath)
|
||||
shBlz := b.openBlobovnicza(blzPath)
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, err
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
return b.getObjectRange(ctx, blz, prm)
|
||||
}
|
||||
|
|
|
@ -68,13 +68,15 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
|
|||
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
|
||||
func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
|
||||
return b.iterateLeaves(ctx, func(p string) (bool, error) {
|
||||
blz, err := b.openBlobovnicza(p)
|
||||
shBlz := b.openBlobovnicza(p)
|
||||
blz, err := shBlz.Open()
|
||||
if err != nil {
|
||||
if ignoreErrors {
|
||||
return false, nil
|
||||
}
|
||||
return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err)
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
err = f(p, blz)
|
||||
|
||||
|
|
243
pkg/local_object_storage/blobstor/blobovniczatree/manager.go
Normal file
243
pkg/local_object_storage/blobstor/blobovniczatree/manager.go
Normal file
|
@ -0,0 +1,243 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// sharedDB is responsible for opening and closing a file of single blobovnicza.
|
||||
type sharedDB struct {
|
||||
guard *sync.RWMutex
|
||||
blcza *blobovnicza.Blobovnicza
|
||||
refCount uint32
|
||||
|
||||
openDBCounter *openDBCounter
|
||||
closedFlag *atomic.Bool
|
||||
options []blobovnicza.Option
|
||||
path string
|
||||
readOnly bool
|
||||
metrics blobovnicza.Metrics
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
|
||||
metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger) *sharedDB {
|
||||
return &sharedDB{
|
||||
guard: &sync.RWMutex{},
|
||||
|
||||
options: options,
|
||||
path: path,
|
||||
readOnly: readOnly,
|
||||
metrics: metrics,
|
||||
closedFlag: closedFlag,
|
||||
log: log,
|
||||
openDBCounter: openDBCounter,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
|
||||
if b.closedFlag.Load() {
|
||||
return nil, errClosed
|
||||
}
|
||||
|
||||
b.guard.Lock()
|
||||
defer b.guard.Unlock()
|
||||
|
||||
if b.refCount > 0 {
|
||||
b.refCount++
|
||||
return b.blcza, nil
|
||||
}
|
||||
|
||||
blz := blobovnicza.New(append(b.options,
|
||||
blobovnicza.WithReadOnly(b.readOnly),
|
||||
blobovnicza.WithPath(b.path),
|
||||
blobovnicza.WithMetrics(b.metrics),
|
||||
)...)
|
||||
|
||||
if err := blz.Open(); err != nil {
|
||||
return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err)
|
||||
}
|
||||
if err := blz.Init(); err != nil {
|
||||
return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err)
|
||||
}
|
||||
|
||||
b.refCount++
|
||||
b.blcza = blz
|
||||
b.openDBCounter.Inc()
|
||||
|
||||
return blz, nil
|
||||
}
|
||||
|
||||
func (b *sharedDB) Close() {
|
||||
b.guard.Lock()
|
||||
defer b.guard.Unlock()
|
||||
|
||||
if b.refCount == 0 {
|
||||
b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
|
||||
return
|
||||
}
|
||||
|
||||
if b.refCount == 1 {
|
||||
b.refCount = 0
|
||||
if err := b.blcza.Close(); err != nil {
|
||||
b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
|
||||
zap.String("id", b.path),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
b.blcza = nil
|
||||
b.openDBCounter.Dec()
|
||||
return
|
||||
}
|
||||
|
||||
b.refCount--
|
||||
}
|
||||
|
||||
func (b *sharedDB) Path() string {
|
||||
return b.path
|
||||
}
|
||||
|
||||
// levelDbManager stores pointers of the sharedDB's for the leaf directory of the blobovnicza tree.
|
||||
type levelDbManager struct {
|
||||
databases []*sharedDB
|
||||
}
|
||||
|
||||
func newLevelDBManager(width uint64, options []blobovnicza.Option, rootPath string, lvlPath string,
|
||||
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlog *atomic.Bool, log *logger.Logger) *levelDbManager {
|
||||
result := &levelDbManager{
|
||||
databases: make([]*sharedDB, width),
|
||||
}
|
||||
var idx uint64
|
||||
for idx = 0; idx < width; idx++ {
|
||||
result.databases[idx] = newSharedDB(options, filepath.Join(rootPath, lvlPath, u64ToHexString(idx)), readOnly, metrics, openDBCounter, closedFlog, log)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
|
||||
return m.databases[idx]
|
||||
}
|
||||
|
||||
// dbManager manages the opening and closing of blobovnicza instances.
|
||||
//
|
||||
// The blobovnicza opens at the first request, closes after the last request.
|
||||
type dbManager struct {
|
||||
levelToManager map[string]*levelDbManager
|
||||
levelToManagerGuard *sync.RWMutex
|
||||
closedFlag *atomic.Bool
|
||||
dbCounter *openDBCounter
|
||||
|
||||
rootPath string
|
||||
options []blobovnicza.Option
|
||||
readOnly bool
|
||||
metrics blobovnicza.Metrics
|
||||
leafWidth uint64
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func newDBManager(rootPath string, options []blobovnicza.Option, leafWidth uint64, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
|
||||
return &dbManager{
|
||||
rootPath: rootPath,
|
||||
options: options,
|
||||
readOnly: readOnly,
|
||||
metrics: metrics,
|
||||
leafWidth: leafWidth,
|
||||
levelToManager: make(map[string]*levelDbManager),
|
||||
levelToManagerGuard: &sync.RWMutex{},
|
||||
log: log,
|
||||
closedFlag: &atomic.Bool{},
|
||||
dbCounter: newOpenDBCounter(),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *dbManager) GetByPath(path string) *sharedDB {
|
||||
lvlPath := filepath.Dir(path)
|
||||
curIndex := u64FromHexString(filepath.Base(path))
|
||||
levelManager := m.getLevelManager(lvlPath)
|
||||
return levelManager.GetByIndex(curIndex)
|
||||
}
|
||||
|
||||
func (m *dbManager) Open() {
|
||||
m.closedFlag.Store(false)
|
||||
}
|
||||
|
||||
func (m *dbManager) Close() {
|
||||
m.closedFlag.Store(true)
|
||||
m.dbCounter.WaitUntilAllClosed()
|
||||
}
|
||||
|
||||
func (m *dbManager) getLevelManager(lvlPath string) *levelDbManager {
|
||||
result := m.getLevelManagerIfExists(lvlPath)
|
||||
if result != nil {
|
||||
return result
|
||||
}
|
||||
return m.getOrCreateLevelManager(lvlPath)
|
||||
}
|
||||
|
||||
func (m *dbManager) getLevelManagerIfExists(lvlPath string) *levelDbManager {
|
||||
m.levelToManagerGuard.RLock()
|
||||
defer m.levelToManagerGuard.RUnlock()
|
||||
|
||||
return m.levelToManager[lvlPath]
|
||||
}
|
||||
|
||||
func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager {
|
||||
m.levelToManagerGuard.Lock()
|
||||
defer m.levelToManagerGuard.Unlock()
|
||||
|
||||
if result, ok := m.levelToManager[lvlPath]; ok {
|
||||
return result
|
||||
}
|
||||
|
||||
result := newLevelDBManager(m.leafWidth, m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
|
||||
m.levelToManager[lvlPath] = result
|
||||
return result
|
||||
}
|
||||
|
||||
type openDBCounter struct {
|
||||
cond *sync.Cond
|
||||
count uint64
|
||||
}
|
||||
|
||||
func newOpenDBCounter() *openDBCounter {
|
||||
return &openDBCounter{
|
||||
cond: &sync.Cond{
|
||||
L: &sync.Mutex{},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *openDBCounter) Inc() {
|
||||
c.cond.L.Lock()
|
||||
defer c.cond.L.Unlock()
|
||||
|
||||
c.count++
|
||||
}
|
||||
|
||||
func (c *openDBCounter) Dec() {
|
||||
c.cond.L.Lock()
|
||||
defer c.cond.L.Unlock()
|
||||
|
||||
if c.count > 0 {
|
||||
c.count--
|
||||
}
|
||||
|
||||
if c.count == 0 {
|
||||
c.cond.Broadcast()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *openDBCounter) WaitUntilAllClosed() {
|
||||
c.cond.L.Lock()
|
||||
for c.count > 0 {
|
||||
c.cond.Wait()
|
||||
}
|
||||
c.cond.L.Unlock()
|
||||
}
|
|
@ -2,7 +2,6 @@ package blobovniczatree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
|
@ -10,7 +9,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
|
@ -76,8 +74,8 @@ type putIterator struct {
|
|||
PutPrm blobovnicza.PutPrm
|
||||
}
|
||||
|
||||
func (i *putIterator) iterate(ctx context.Context, path string) (bool, error) {
|
||||
active, err := i.B.getActivated(path)
|
||||
func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error) {
|
||||
active, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
|
||||
if err != nil {
|
||||
if !isLogical(err) {
|
||||
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
|
||||
|
@ -89,46 +87,29 @@ func (i *putIterator) iterate(ctx context.Context, path string) (bool, error) {
|
|||
return false, nil
|
||||
}
|
||||
|
||||
if _, err := active.blz.Put(ctx, i.PutPrm); err != nil {
|
||||
// Check if blobovnicza is full. We could either receive `blobovnicza.ErrFull` error
|
||||
// or update active blobovnicza in other thread. In the latter case the database will be closed
|
||||
// and `updateActive` takes care of not updating the active blobovnicza twice.
|
||||
if isFull := errors.Is(err, blobovnicza.ErrFull); isFull || errors.Is(err, bbolt.ErrDatabaseNotOpen) {
|
||||
if isFull {
|
||||
i.B.log.Debug(logs.BlobovniczatreeBlobovniczaOverflowed,
|
||||
zap.String("path", filepath.Join(path, u64ToHexString(active.ind))))
|
||||
}
|
||||
|
||||
if err := i.B.updateActive(path, &active.ind); err != nil {
|
||||
if !isLogical(err) {
|
||||
i.B.reportError(logs.BlobovniczatreeCouldNotUpdateActiveBlobovnicza, err)
|
||||
} else {
|
||||
i.B.log.Debug(logs.BlobovniczatreeCouldNotUpdateActiveBlobovnicza,
|
||||
zap.String("level", path),
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
if active == nil {
|
||||
i.B.log.Debug(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return i.iterate(ctx, path)
|
||||
}
|
||||
defer active.Close()
|
||||
|
||||
i.AllFull = false
|
||||
|
||||
_, err = active.Blobovnicza().Put(ctx, i.PutPrm)
|
||||
if err != nil {
|
||||
if !isLogical(err) {
|
||||
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
|
||||
} else {
|
||||
i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
|
||||
zap.String("path", filepath.Join(path, u64ToHexString(active.ind))),
|
||||
zap.String("path", active.Path()),
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
path = filepath.Join(path, u64ToHexString(active.ind))
|
||||
|
||||
i.ID = blobovnicza.NewIDFromBytes([]byte(path))
|
||||
idx := u64FromHexString(filepath.Base(active.Path()))
|
||||
i.ID = blobovnicza.NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue