From c672f59ab8a2b80f0fd84dd9c22897f0a7324075 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 30 Aug 2023 23:36:48 +0300 Subject: [PATCH] [#536] blobovnicza: Drop cache Each blobovnicza instance is opened while is in use. Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 1 + .../blobovnicza/blobovnicza_test.go | 5 +- pkg/local_object_storage/blobovnicza/put.go | 8 - pkg/local_object_storage/blobovnicza/sizes.go | 2 +- .../blobstor/blobovniczatree/active.go | 213 +++++++++++++++ .../blobstor/blobovniczatree/blobovnicza.go | 143 +---------- .../blobovniczatree/concurrency_test.go | 59 +++++ .../blobstor/blobovniczatree/control.go | 111 ++------ .../blobstor/blobovniczatree/delete.go | 67 +---- .../blobstor/blobovniczatree/errors.go | 2 + .../blobstor/blobovniczatree/exists.go | 14 +- .../blobstor/blobovniczatree/get.go | 67 +---- .../blobstor/blobovniczatree/get_range.go | 76 +----- .../blobstor/blobovniczatree/iterate.go | 4 +- .../blobstor/blobovniczatree/manager.go | 243 ++++++++++++++++++ .../blobstor/blobovniczatree/put.go | 45 +--- 16 files changed, 586 insertions(+), 474 deletions(-) create mode 100644 pkg/local_object_storage/blobstor/blobovniczatree/active.go create mode 100644 pkg/local_object_storage/blobstor/blobovniczatree/concurrency_test.go create mode 100644 pkg/local_object_storage/blobstor/blobovniczatree/manager.go diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6ceee4f17..ed88e615b 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -509,4 +509,5 @@ const ( RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated" RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped" FailedToCountWritecacheItems = "failed to count writecache items" + AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza" ) diff --git a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go index 48a189c9d..8d701ae5c 100644 --- a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go +++ b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go @@ -3,7 +3,6 @@ package blobovnicza import ( "context" "crypto/rand" - "errors" "os" "testing" @@ -98,9 +97,9 @@ func TestBlobovnicza(t *testing.T) { testPutGet(t, blz, oidtest.Address(), objSizeLim, nil, nil) } - // from now objects should not be saved + // blobovnizca accepts object event if full testPutGet(t, blz, oidtest.Address(), 1024, func(err error) bool { - return errors.Is(err, ErrFull) + return err == nil }, nil) require.NoError(t, blz.Close()) diff --git a/pkg/local_object_storage/blobovnicza/put.go b/pkg/local_object_storage/blobovnicza/put.go index e43b89ec6..787372211 100644 --- a/pkg/local_object_storage/blobovnicza/put.go +++ b/pkg/local_object_storage/blobovnicza/put.go @@ -23,10 +23,6 @@ type PutPrm struct { type PutRes struct { } -// ErrFull is returned when trying to save an -// object to a filled blobovnicza. -var ErrFull = logicerr.New("blobovnicza is full") - // SetAddress sets the address of the saving object. func (p *PutPrm) SetAddress(addr oid.Address) { p.addr = addr @@ -65,10 +61,6 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) { key := addressKey(prm.addr) err := b.boltDB.Batch(func(tx *bbolt.Tx) error { - if b.full() { - return ErrFull - } - buck := tx.Bucket(bucketName) if buck == nil { // expected to happen: diff --git a/pkg/local_object_storage/blobovnicza/sizes.go b/pkg/local_object_storage/blobovnicza/sizes.go index 290df9c93..cd1f69725 100644 --- a/pkg/local_object_storage/blobovnicza/sizes.go +++ b/pkg/local_object_storage/blobovnicza/sizes.go @@ -55,6 +55,6 @@ func (b *Blobovnicza) itemDeleted(itemSize uint64) { b.metrics.SubOpenBlobovniczaItems(1) } -func (b *Blobovnicza) full() bool { +func (b *Blobovnicza) IsFull() bool { return b.dataSize.Load() >= b.fullSizeLimit } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/active.go b/pkg/local_object_storage/blobstor/blobovniczatree/active.go new file mode 100644 index 000000000..526699b45 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobovniczatree/active.go @@ -0,0 +1,213 @@ +package blobovniczatree + +import ( + "path/filepath" + "sync" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" +) + +type activeDB struct { + blz *blobovnicza.Blobovnicza + shDB *sharedDB +} + +func (db *activeDB) Blobovnicza() *blobovnicza.Blobovnicza { + return db.blz +} + +func (db *activeDB) Close() { + db.shDB.Close() +} + +func (db *activeDB) Path() string { + return db.shDB.Path() +} + +// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put). +// +// Uses dbManager for opening/closing sharedDB instances. +// Stores a reference to an open active sharedDB, so dbManager does not close it. +// When changing the active sharedDB, releases the reference to the previous active sharedDB. +type activeDBManager struct { + levelToActiveDBGuard *sync.RWMutex + levelToActiveDB map[string]*sharedDB + levelLock *utilSync.KeyLocker[string] + closed bool + + dbManager *dbManager + leafWidth uint64 +} + +func newActiveDBManager(dbManager *dbManager, leafWidth uint64) *activeDBManager { + return &activeDBManager{ + levelToActiveDBGuard: &sync.RWMutex{}, + levelToActiveDB: make(map[string]*sharedDB), + levelLock: utilSync.NewKeyLocker[string](), + + dbManager: dbManager, + leafWidth: leafWidth, + } +} + +// GetOpenedActiveDBForLevel returns active DB for level. +// DB must be closed after use. +func (m *activeDBManager) GetOpenedActiveDBForLevel(lvlPath string) (*activeDB, error) { + activeDB, err := m.getCurrentActiveIfOk(lvlPath) + if err != nil { + return nil, err + } + if activeDB != nil { + return activeDB, nil + } + + return m.updateAndGetActive(lvlPath) +} + +func (m *activeDBManager) Open() { + m.levelToActiveDBGuard.Lock() + defer m.levelToActiveDBGuard.Unlock() + + m.closed = false +} + +func (m *activeDBManager) Close() { + m.levelToActiveDBGuard.Lock() + defer m.levelToActiveDBGuard.Unlock() + + for _, db := range m.levelToActiveDB { + db.Close() + } + m.levelToActiveDB = make(map[string]*sharedDB) + m.closed = true +} + +func (m *activeDBManager) getCurrentActiveIfOk(lvlPath string) (*activeDB, error) { + m.levelToActiveDBGuard.RLock() + defer m.levelToActiveDBGuard.RUnlock() + + if m.closed { + return nil, errClosed + } + + db, ok := m.levelToActiveDB[lvlPath] + if !ok { + return nil, nil + } + + blz, err := db.Open() //open db for usage, will be closed on activeDB.Close() + if err != nil { + return nil, err + } + + if blz.IsFull() { + db.Close() + return nil, nil + } + + return &activeDB{ + blz: blz, + shDB: db, + }, nil +} + +func (m *activeDBManager) updateAndGetActive(lvlPath string) (*activeDB, error) { + m.levelLock.Lock(lvlPath) + defer m.levelLock.Unlock(lvlPath) + + current, err := m.getCurrentActiveIfOk(lvlPath) + if err != nil { + return nil, err + } + if current != nil { + return current, nil + } + + nextShDB, err := m.getNextSharedDB(lvlPath) + if err != nil { + return nil, err + } + + if nextShDB == nil { + return nil, nil + } + + blz, err := nextShDB.Open() // open db for client, client must call Close() after usage + if err != nil { + return nil, err + } + return &activeDB{ + blz: blz, + shDB: nextShDB, + }, nil +} + +func (m *activeDBManager) getNextSharedDB(lvlPath string) (*sharedDB, error) { + var idx uint64 + var iterCount uint64 + hasActive, currentIdx := m.hasActiveDB(lvlPath) + if hasActive { + idx = (currentIdx + 1) % m.leafWidth + } + + var next *sharedDB + + for iterCount < m.leafWidth { + path := filepath.Join(lvlPath, u64ToHexString(idx)) + shDB := m.dbManager.GetByPath(path) + db, err := shDB.Open() //open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close() + if err != nil { + return nil, err + } + if db.IsFull() { + shDB.Close() + } else { + next = shDB + break + } + idx = (idx + 1) % m.leafWidth + iterCount++ + } + + previous, updated := m.replace(lvlPath, next) + if !updated && next != nil { + next.Close() // manager is closed, so don't hold active DB open + } + if updated && previous != nil { + previous.Close() + } + return next, nil +} + +func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) { + m.levelToActiveDBGuard.RLock() + defer m.levelToActiveDBGuard.RUnlock() + + if m.closed { + return false, 0 + } + + db, ok := m.levelToActiveDB[lvlPath] + if !ok { + return false, 0 + } + return true, u64FromHexString(filepath.Base(db.Path())) +} + +func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) { + m.levelToActiveDBGuard.Lock() + defer m.levelToActiveDBGuard.Unlock() + + if m.closed { + return nil, false + } + + previous := m.levelToActiveDB[lvlPath] + if shDB == nil { + delete(m.levelToActiveDB, lvlPath) + } else { + m.levelToActiveDB[lvlPath] = shDB + } + return previous, true +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go index 80d064c80..46a8df685 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go @@ -3,19 +3,12 @@ package blobovniczatree import ( "errors" "fmt" - "path/filepath" "strconv" - "sync" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/hrw" - "github.com/hashicorp/golang-lru/v2/simplelru" - "go.uber.org/zap" ) // Blobovniczas represents the storage of the "small" objects. @@ -61,28 +54,8 @@ import ( type Blobovniczas struct { cfg - // cache of opened filled Blobovniczas - opened *simplelru.LRU[string, *blobovnicza.Blobovnicza] - // lruMtx protects opened cache. - // It isn't RWMutex because `Get` calls must - // lock this mutex on write, as LRU info is updated. - // It must be taken after activeMtx in case when eviction is possible - // i.e. `Add`, `Purge` and `Remove` calls. - lruMtx sync.Mutex - - // mutex to exclude parallel bbolt.Open() calls - // bbolt.Open() deadlocks if it tries to open already opened file - openMtx sync.Mutex - - // list of active (opened, non-filled) Blobovniczas - activeMtx sync.RWMutex - active map[string]blobovniczaWithIndex -} - -type blobovniczaWithIndex struct { - ind uint64 - - blz *blobovnicza.Blobovnicza + commondbManager *dbManager + activeDBManager *activeDBManager } var _ common.Storage = (*Blobovniczas)(nil) @@ -102,120 +75,12 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) { blz.blzLeafWidth = blz.blzShallowWidth } - cache, err := simplelru.NewLRU[string, *blobovnicza.Blobovnicza](blz.openedCacheSize, func(p string, value *blobovnicza.Blobovnicza) { - lvlPath := filepath.Dir(p) - if b, ok := blz.active[lvlPath]; ok && b.ind == u64FromHexString(filepath.Base(p)) { - // This branch is taken if we have recently updated active blobovnicza and remove - // it from opened cache. - return - } else if err := value.Close(); err != nil { - blz.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza, - zap.String("id", p), - zap.String("error", err.Error()), - ) - } else { - blz.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyClosedOnEvict, - zap.String("id", p), - ) - } - }) - if err != nil { - // occurs only if the size is not positive - panic(fmt.Errorf("could not create LRU cache of size %d: %w", blz.openedCacheSize, err)) - } - - activeMapCapacity := uint64(1) - for i := uint64(0); i < blz.blzShallowDepth; i++ { - if i+1 == blz.blzShallowDepth { - activeMapCapacity *= blz.blzLeafWidth - } else { - activeMapCapacity *= blz.blzShallowWidth - } - } - - blz.opened = cache - blz.active = make(map[string]blobovniczaWithIndex, activeMapCapacity) + blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.blzLeafWidth, blz.readOnly, blz.metrics.Blobovnicza(), blz.log) + blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth) return blz } -// activates and returns activated blobovnicza of p-level (dir). -// -// returns error if blobvnicza could not be activated. -func (b *Blobovniczas) getActivated(lvlPath string) (blobovniczaWithIndex, error) { - return b.updateAndGet(lvlPath, nil) -} - -// updates active blobovnicza of p-level (dir). -// -// if current active blobovnicza's index is not old, it remains unchanged. -func (b *Blobovniczas) updateActive(lvlPath string, old *uint64) error { - b.log.Debug(logs.BlobovniczatreeUpdatingActiveBlobovnicza, zap.String("path", lvlPath)) - - _, err := b.updateAndGet(lvlPath, old) - - b.log.Debug(logs.BlobovniczatreeActiveBlobovniczaSuccessfullyUpdated, zap.String("path", lvlPath)) - - return err -} - -// updates and returns active blobovnicza of p-level (dir). -// -// if current active blobovnicza's index is not old, it is returned unchanged. -func (b *Blobovniczas) updateAndGet(lvlPath string, old *uint64) (blobovniczaWithIndex, error) { - b.activeMtx.RLock() - active, ok := b.active[lvlPath] - b.activeMtx.RUnlock() - - if ok { - if old != nil { - if active.ind == b.blzLeafWidth-1 { - return active, logicerr.New("no more Blobovniczas") - } else if active.ind != *old { - // sort of CAS in order to control concurrent - // updateActive calls - return active, nil - } - } else { - return active, nil - } - - active.ind++ - } - - var err error - if active.blz, err = b.openBlobovnicza(filepath.Join(lvlPath, u64ToHexString(active.ind))); err != nil { - return active, err - } - - b.activeMtx.Lock() - defer b.activeMtx.Unlock() - - // check 2nd time to find out if it blobovnicza was activated while thread was locked - tryActive, ok := b.active[lvlPath] - if ok && tryActive.blz == active.blz { - return tryActive, nil - } - - // Remove from opened cache (active blobovnicza should always be opened). - // Because `onEvict` callback is called in `Remove`, we need to update - // active map beforehand. - b.active[lvlPath] = active - - activePath := filepath.Join(lvlPath, u64ToHexString(active.ind)) - b.lruMtx.Lock() - b.opened.Remove(activePath) - if ok { - b.opened.Add(filepath.Join(lvlPath, u64ToHexString(tryActive.ind)), tryActive.blz) - } - b.lruMtx.Unlock() - - b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyActivated, - zap.String("path", activePath)) - - return active, nil -} - // returns hash of the object address. func addressHash(addr *oid.Address, path string) uint64 { var a string diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/concurrency_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/concurrency_test.go new file mode 100644 index 000000000..018ecce3c --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobovniczatree/concurrency_test.go @@ -0,0 +1,59 @@ +package blobovniczatree + +import ( + "context" + "sync" + "sync/atomic" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + "github.com/stretchr/testify/require" +) + +func TestBlobovniczaTree_Concurrency(t *testing.T) { + t.Parallel() + const n = 1000 + + st := NewBlobovniczaTree( + WithLogger(test.NewLogger(t, true)), + WithObjectSizeLimit(1024), + WithBlobovniczaShallowWidth(10), + WithBlobovniczaShallowDepth(1), + WithRootPath(t.TempDir())) + require.NoError(t, st.Open(false)) + require.NoError(t, st.Init()) + t.Cleanup(func() { + require.NoError(t, st.Close()) + }) + + objGen := &testutil.SeqObjGenerator{ObjSize: 1} + + var cnt atomic.Int64 + var wg sync.WaitGroup + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for cnt.Add(1) <= n { + obj := objGen.Next() + addr := testutil.AddressFromObject(t, obj) + + raw, err := obj.Marshal() + require.NoError(t, err) + + _, err = st.Put(context.Background(), common.PutPrm{ + Address: addr, + RawData: raw, + }) + require.NoError(t, err) + + _, err = st.Get(context.Background(), common.GetPrm{Address: addr}) + require.NoError(t, err) // fails very often, correlated to how many goroutines are started + } + }() + } + + wg.Wait() +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/control.go b/pkg/local_object_storage/blobstor/blobovniczatree/control.go index d7b0799c1..c3d12088a 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/control.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/control.go @@ -2,11 +2,8 @@ package blobovniczatree import ( "context" - "fmt" - "path/filepath" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "go.uber.org/zap" ) @@ -14,6 +11,7 @@ import ( func (b *Blobovniczas) Open(readOnly bool) error { b.readOnly = readOnly b.metrics.SetMode(readOnly) + b.openManagers() return nil } @@ -23,59 +21,35 @@ func (b *Blobovniczas) Open(readOnly bool) error { func (b *Blobovniczas) Init() error { b.log.Debug(logs.BlobovniczatreeInitializingBlobovniczas) + b.openManagers() + if b.readOnly { b.log.Debug(logs.BlobovniczatreeReadonlyModeSkipBlobovniczasInitialization) return nil } return b.iterateLeaves(context.TODO(), func(p string) (bool, error) { - blz, err := b.openBlobovniczaNoCache(p) + shBlz := b.openBlobovniczaNoCache(p) + _, err := shBlz.Open() if err != nil { return true, err } - defer blz.Close() - - if err := blz.Init(); err != nil { - return true, fmt.Errorf("could not initialize blobovnicza structure %s: %w", p, err) - } + defer shBlz.Close() b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) return false, nil }) } +func (b *Blobovniczas) openManagers() { + b.commondbManager.Open() //order important + b.activeDBManager.Open() +} + // Close implements common.Storage. func (b *Blobovniczas) Close() error { - b.activeMtx.Lock() - - b.lruMtx.Lock() - - for p, v := range b.active { - if err := v.blz.Close(); err != nil { - b.log.Debug(logs.BlobovniczatreeCouldNotCloseActiveBlobovnicza, - zap.String("path", p), - zap.String("error", err.Error()), - ) - } - b.opened.Remove(p) - } - for _, k := range b.opened.Keys() { - blz, _ := b.opened.Get(k) - if err := blz.Close(); err != nil { - b.log.Debug(logs.BlobovniczatreeCouldNotCloseActiveBlobovnicza, - zap.String("path", k), - zap.String("error", err.Error()), - ) - } - b.opened.Remove(k) - } - - b.active = make(map[string]blobovniczaWithIndex) - b.metrics.Close() - - b.lruMtx.Unlock() - - b.activeMtx.Unlock() + b.activeDBManager.Close() //order important + b.commondbManager.Close() return nil } @@ -83,61 +57,10 @@ func (b *Blobovniczas) Close() error { // opens and returns blobovnicza with path p. // // If blobovnicza is already opened and cached, instance from cache is returned w/o changes. -func (b *Blobovniczas) openBlobovnicza(p string) (*blobovnicza.Blobovnicza, error) { - b.lruMtx.Lock() - v, ok := b.opened.Get(p) - b.lruMtx.Unlock() - if ok { - // blobovnicza should be opened in cache - return v, nil - } - - lvlPath := filepath.Dir(p) - curIndex := u64FromHexString(filepath.Base(p)) - - b.activeMtx.RLock() - defer b.activeMtx.RUnlock() - - active, ok := b.active[lvlPath] - if ok && active.ind == curIndex { - return active.blz, nil - } - - b.lruMtx.Lock() - defer b.lruMtx.Unlock() - - v, ok = b.opened.Get(p) - if ok { - return v, nil - } - - blz, err := b.openBlobovniczaNoCache(p) - if err != nil { - return nil, err - } - - b.opened.Add(p, blz) - - return blz, nil +func (b *Blobovniczas) openBlobovnicza(p string) *sharedDB { + return b.openBlobovniczaNoCache(p) } -func (b *Blobovniczas) openBlobovniczaNoCache(p string) (*blobovnicza.Blobovnicza, error) { - b.openMtx.Lock() - defer b.openMtx.Unlock() - - path := filepath.Join(b.rootPath, p) - - blz := blobovnicza.New(append(b.blzOpts, - blobovnicza.WithReadOnly(b.readOnly), - blobovnicza.WithPath(path), - blobovnicza.WithMetrics(b.metrics.Blobovnicza()), - )...) - - if err := blz.Open(); err != nil { - return nil, fmt.Errorf("could not open blobovnicza %s: %w", p, err) - } - if err := blz.Init(); err != nil { - return nil, fmt.Errorf("could not init blobovnicza %s: %w", p, err) - } - return blz, nil +func (b *Blobovniczas) openBlobovniczaNoCache(p string) *sharedDB { + return b.commondbManager.GetByPath(p) } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/delete.go b/pkg/local_object_storage/blobstor/blobovniczatree/delete.go index 0698c232d..bc7735e74 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/delete.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/delete.go @@ -3,7 +3,6 @@ package blobovniczatree import ( "context" "encoding/hex" - "path/filepath" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -48,10 +47,12 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co if prm.StorageID != nil { id := blobovnicza.NewIDFromBytes(prm.StorageID) - blz, err := b.openBlobovnicza(id.String()) + shBlz := b.openBlobovnicza(id.String()) + blz, err := shBlz.Open() if err != nil { return res, err } + defer shBlz.Close() if res, err = b.deleteObject(ctx, blz, bPrm); err == nil { success = true @@ -59,16 +60,10 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co return res, err } - activeCache := make(map[string]struct{}) objectFound := false err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { - dirPath := filepath.Dir(p) - - // don't process active blobovnicza of the level twice - _, ok := activeCache[dirPath] - - res, err = b.deleteObjectFromLevel(ctx, bPrm, p, !ok) + res, err = b.deleteObjectFromLevel(ctx, bPrm, p) if err != nil { if !client.IsErrObjectNotFound(err) { b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromLevel, @@ -78,8 +73,6 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co } } - activeCache[dirPath] = struct{}{} - if err == nil { objectFound = true } @@ -100,57 +93,13 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co // tries to delete object from particular blobovnicza. // // returns no error if object was removed from some blobovnicza of the same level. -func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicza.DeletePrm, blzPath string, tryActive bool) (common.DeleteRes, error) { - lvlPath := filepath.Dir(blzPath) - - // try to remove from blobovnicza if it is opened - b.lruMtx.Lock() - v, ok := b.opened.Get(blzPath) - b.lruMtx.Unlock() - if ok { - if res, err := b.deleteObject(ctx, v, prm); err == nil { - return res, err - } else if !client.IsErrObjectNotFound(err) { - b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromOpenedBlobovnicza, - zap.String("path", blzPath), - zap.String("error", err.Error()), - ) - } - } - - // therefore the object is possibly placed in a lighter blobovnicza - - // next we check in the active level blobobnicza: - // * the active blobovnicza is always opened. - b.activeMtx.RLock() - active, ok := b.active[lvlPath] - b.activeMtx.RUnlock() - - if ok && tryActive { - if res, err := b.deleteObject(ctx, active.blz, prm); err == nil { - return res, err - } else if !client.IsErrObjectNotFound(err) { - b.log.Debug(logs.BlobovniczatreeCouldNotRemoveObjectFromActiveBlobovnicza, - zap.String("path", blzPath), - zap.String("error", err.Error()), - ) - } - } - - // then object is possibly placed in closed blobovnicza - - // check if it makes sense to try to open the blob - // (Blobovniczas "after" the active one are empty anyway, - // and it's pointless to open them). - if u64FromHexString(filepath.Base(blzPath)) > active.ind { - return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - - // open blobovnicza (cached inside) - blz, err := b.openBlobovnicza(blzPath) +func (b *Blobovniczas) deleteObjectFromLevel(ctx context.Context, prm blobovnicza.DeletePrm, blzPath string) (common.DeleteRes, error) { + shBlz := b.openBlobovnicza(blzPath) + blz, err := shBlz.Open() if err != nil { return common.DeleteRes{}, err } + defer shBlz.Close() return b.deleteObject(ctx, blz, prm) } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/errors.go b/pkg/local_object_storage/blobstor/blobovniczatree/errors.go index 04247280a..be0fd81c3 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/errors.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/errors.go @@ -7,6 +7,8 @@ import ( apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" ) +var errClosed = logicerr.New("blobvnicza is closed") + func isErrOutOfRange(err error) bool { var target *apistatus.ObjectOutOfRange return errors.As(err, &target) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/exists.go b/pkg/local_object_storage/blobstor/blobovniczatree/exists.go index c1964c227..f7a192526 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/exists.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/exists.go @@ -3,7 +3,6 @@ package blobovniczatree import ( "context" "encoding/hex" - "path/filepath" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -37,26 +36,22 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common if prm.StorageID != nil { id := blobovnicza.NewIDFromBytes(prm.StorageID) - blz, err := b.openBlobovnicza(id.String()) + shBlz := b.openBlobovnicza(id.String()) + blz, err := shBlz.Open() if err != nil { return common.ExistsRes{}, err } + defer shBlz.Close() exists, err := blz.Exists(ctx, prm.Address) return common.ExistsRes{Exists: exists}, err } - activeCache := make(map[string]struct{}) - var gPrm blobovnicza.GetPrm gPrm.SetAddress(prm.Address) err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { - dirPath := filepath.Dir(p) - - _, ok := activeCache[dirPath] - - _, err := b.getObjectFromLevel(ctx, gPrm, p, !ok) + _, err := b.getObjectFromLevel(ctx, gPrm, p) if err != nil { if !client.IsErrObjectNotFound(err) { b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel, @@ -65,7 +60,6 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common } } - activeCache[dirPath] = struct{}{} found = err == nil return found, nil }) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/get.go b/pkg/local_object_storage/blobstor/blobovniczatree/get.go index 671d045b9..aca39bf3b 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/get.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/get.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "fmt" - "path/filepath" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -48,10 +47,12 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G if prm.StorageID != nil { id := blobovnicza.NewIDFromBytes(prm.StorageID) - blz, err := b.openBlobovnicza(id.String()) + shBlz := b.openBlobovnicza(id.String()) + blz, err := shBlz.Open() if err != nil { return res, err } + defer shBlz.Close() res, err = b.getObject(ctx, blz, bPrm) if err == nil { @@ -61,14 +62,8 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G return res, err } - activeCache := make(map[string]struct{}) - err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { - dirPath := filepath.Dir(p) - - _, ok := activeCache[dirPath] - - res, err = b.getObjectFromLevel(ctx, bPrm, p, !ok) + res, err = b.getObjectFromLevel(ctx, bPrm, p) if err != nil { if !client.IsErrObjectNotFound(err) { b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromLevel, @@ -78,8 +73,6 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G } } - activeCache[dirPath] = struct{}{} - // abort iterator if found, otherwise process all Blobovniczas return err == nil, nil }) @@ -98,58 +91,14 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G // tries to read object from particular blobovnicza. // // returns error if object could not be read from any blobovnicza of the same level. -func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string, tryActive bool) (common.GetRes, error) { - lvlPath := filepath.Dir(blzPath) - - // try to read from blobovnicza if it is opened - b.lruMtx.Lock() - v, ok := b.opened.Get(blzPath) - b.lruMtx.Unlock() - if ok { - if res, err := b.getObject(ctx, v, prm); err == nil { - return res, err - } else if !client.IsErrObjectNotFound(err) { - b.log.Debug(logs.BlobovniczatreeCouldNotReadObjectFromOpenedBlobovnicza, - zap.String("path", blzPath), - zap.String("error", err.Error()), - ) - } - } - - // therefore the object is possibly placed in a lighter blobovnicza - - // next we check in the active level blobobnicza: - // * the freshest objects are probably the most demanded; - // * the active blobovnicza is always opened. - b.activeMtx.RLock() - active, ok := b.active[lvlPath] - b.activeMtx.RUnlock() - - if ok && tryActive { - if res, err := b.getObject(ctx, active.blz, prm); err == nil { - return res, err - } else if !client.IsErrObjectNotFound(err) { - b.log.Debug(logs.BlobovniczatreeCouldNotGetObjectFromActiveBlobovnicza, - zap.String("path", blzPath), - zap.String("error", err.Error()), - ) - } - } - - // then object is possibly placed in closed blobovnicza - - // check if it makes sense to try to open the blob - // (Blobovniczas "after" the active one are empty anyway, - // and it's pointless to open them). - if u64FromHexString(filepath.Base(blzPath)) > active.ind { - return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - +func (b *Blobovniczas) getObjectFromLevel(ctx context.Context, prm blobovnicza.GetPrm, blzPath string) (common.GetRes, error) { // open blobovnicza (cached inside) - blz, err := b.openBlobovnicza(blzPath) + shBlz := b.openBlobovnicza(blzPath) + blz, err := shBlz.Open() if err != nil { return common.GetRes{}, err } + defer shBlz.Close() return b.getObject(ctx, blz, prm) } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go b/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go index 088ebe249..3d1a6b988 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "fmt" - "path/filepath" "strconv" "time" @@ -47,10 +46,12 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re if prm.StorageID != nil { id := blobovnicza.NewIDFromBytes(prm.StorageID) - blz, err := b.openBlobovnicza(id.String()) + shBlz := b.openBlobovnicza(id.String()) + blz, err := shBlz.Open() if err != nil { return common.GetRangeRes{}, err } + defer shBlz.Close() res, err := b.getObjectRange(ctx, blz, prm) if err == nil { @@ -60,15 +61,10 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re return res, err } - activeCache := make(map[string]struct{}) objectFound := false err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { - dirPath := filepath.Dir(p) - - _, ok := activeCache[dirPath] - - res, err = b.getRangeFromLevel(ctx, prm, p, !ok) + res, err = b.getRangeFromLevel(ctx, prm, p) if err != nil { outOfBounds := isErrOutOfRange(err) if !outOfBounds && !client.IsErrObjectNotFound(err) { @@ -82,8 +78,6 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re } } - activeCache[dirPath] = struct{}{} - objectFound = err == nil // abort iterator if found, otherwise process all Blobovniczas @@ -106,68 +100,14 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re // tries to read range of object payload data from particular blobovnicza. // // returns error if object could not be read from any blobovnicza of the same level. -func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string, tryActive bool) (common.GetRangeRes, error) { - lvlPath := filepath.Dir(blzPath) - - // try to read from blobovnicza if it is opened - b.lruMtx.Lock() - v, ok := b.opened.Get(blzPath) - b.lruMtx.Unlock() - if ok { - res, err := b.getObjectRange(ctx, v, prm) - switch { - case err == nil, - isErrOutOfRange(err): - return res, err - default: - if !client.IsErrObjectNotFound(err) { - b.log.Debug(logs.BlobovniczatreeCouldNotReadPayloadRangeFromOpenedBlobovnicza, - zap.String("path", blzPath), - zap.String("error", err.Error()), - ) - } - } - } - - // therefore the object is possibly placed in a lighter blobovnicza - - // next we check in the active level blobobnicza: - // * the freshest objects are probably the most demanded; - // * the active blobovnicza is always opened. - b.activeMtx.RLock() - active, ok := b.active[lvlPath] - b.activeMtx.RUnlock() - - if ok && tryActive { - res, err := b.getObjectRange(ctx, active.blz, prm) - switch { - case err == nil, - isErrOutOfRange(err): - return res, err - default: - if !client.IsErrObjectNotFound(err) { - b.log.Debug(logs.BlobovniczatreeCouldNotReadPayloadRangeFromActiveBlobovnicza, - zap.String("path", blzPath), - zap.String("error", err.Error()), - ) - } - } - } - - // then object is possibly placed in closed blobovnicza - - // check if it makes sense to try to open the blob - // (Blobovniczas "after" the active one are empty anyway, - // and it's pointless to open them). - if u64FromHexString(filepath.Base(blzPath)) > active.ind { - return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) - } - +func (b *Blobovniczas) getRangeFromLevel(ctx context.Context, prm common.GetRangePrm, blzPath string) (common.GetRangeRes, error) { // open blobovnicza (cached inside) - blz, err := b.openBlobovnicza(blzPath) + shBlz := b.openBlobovnicza(blzPath) + blz, err := shBlz.Open() if err != nil { return common.GetRangeRes{}, err } + defer shBlz.Close() return b.getObjectRange(ctx, blz, prm) } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go index aeac854f7..cc273839a 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go @@ -68,13 +68,15 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm // iterator over all Blobovniczas in unsorted order. Break on f's error return. func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error { return b.iterateLeaves(ctx, func(p string) (bool, error) { - blz, err := b.openBlobovnicza(p) + shBlz := b.openBlobovnicza(p) + blz, err := shBlz.Open() if err != nil { if ignoreErrors { return false, nil } return false, fmt.Errorf("could not open blobovnicza %s: %w", p, err) } + defer shBlz.Close() err = f(p, blz) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/manager.go b/pkg/local_object_storage/blobstor/blobovniczatree/manager.go new file mode 100644 index 000000000..941d9961d --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobovniczatree/manager.go @@ -0,0 +1,243 @@ +package blobovniczatree + +import ( + "fmt" + "path/filepath" + "sync" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// sharedDB is responsible for opening and closing a file of single blobovnicza. +type sharedDB struct { + guard *sync.RWMutex + blcza *blobovnicza.Blobovnicza + refCount uint32 + + openDBCounter *openDBCounter + closedFlag *atomic.Bool + options []blobovnicza.Option + path string + readOnly bool + metrics blobovnicza.Metrics + log *logger.Logger +} + +func newSharedDB(options []blobovnicza.Option, path string, readOnly bool, + metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger) *sharedDB { + return &sharedDB{ + guard: &sync.RWMutex{}, + + options: options, + path: path, + readOnly: readOnly, + metrics: metrics, + closedFlag: closedFlag, + log: log, + openDBCounter: openDBCounter, + } +} + +func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) { + if b.closedFlag.Load() { + return nil, errClosed + } + + b.guard.Lock() + defer b.guard.Unlock() + + if b.refCount > 0 { + b.refCount++ + return b.blcza, nil + } + + blz := blobovnicza.New(append(b.options, + blobovnicza.WithReadOnly(b.readOnly), + blobovnicza.WithPath(b.path), + blobovnicza.WithMetrics(b.metrics), + )...) + + if err := blz.Open(); err != nil { + return nil, fmt.Errorf("could not open blobovnicza %s: %w", b.path, err) + } + if err := blz.Init(); err != nil { + return nil, fmt.Errorf("could not init blobovnicza %s: %w", b.path, err) + } + + b.refCount++ + b.blcza = blz + b.openDBCounter.Inc() + + return blz, nil +} + +func (b *sharedDB) Close() { + b.guard.Lock() + defer b.guard.Unlock() + + if b.refCount == 0 { + b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path)) + return + } + + if b.refCount == 1 { + b.refCount = 0 + if err := b.blcza.Close(); err != nil { + b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza, + zap.String("id", b.path), + zap.String("error", err.Error()), + ) + } + b.blcza = nil + b.openDBCounter.Dec() + return + } + + b.refCount-- +} + +func (b *sharedDB) Path() string { + return b.path +} + +// levelDbManager stores pointers of the sharedDB's for the leaf directory of the blobovnicza tree. +type levelDbManager struct { + databases []*sharedDB +} + +func newLevelDBManager(width uint64, options []blobovnicza.Option, rootPath string, lvlPath string, + readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlog *atomic.Bool, log *logger.Logger) *levelDbManager { + result := &levelDbManager{ + databases: make([]*sharedDB, width), + } + var idx uint64 + for idx = 0; idx < width; idx++ { + result.databases[idx] = newSharedDB(options, filepath.Join(rootPath, lvlPath, u64ToHexString(idx)), readOnly, metrics, openDBCounter, closedFlog, log) + } + return result +} + +func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB { + return m.databases[idx] +} + +// dbManager manages the opening and closing of blobovnicza instances. +// +// The blobovnicza opens at the first request, closes after the last request. +type dbManager struct { + levelToManager map[string]*levelDbManager + levelToManagerGuard *sync.RWMutex + closedFlag *atomic.Bool + dbCounter *openDBCounter + + rootPath string + options []blobovnicza.Option + readOnly bool + metrics blobovnicza.Metrics + leafWidth uint64 + log *logger.Logger +} + +func newDBManager(rootPath string, options []blobovnicza.Option, leafWidth uint64, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager { + return &dbManager{ + rootPath: rootPath, + options: options, + readOnly: readOnly, + metrics: metrics, + leafWidth: leafWidth, + levelToManager: make(map[string]*levelDbManager), + levelToManagerGuard: &sync.RWMutex{}, + log: log, + closedFlag: &atomic.Bool{}, + dbCounter: newOpenDBCounter(), + } +} + +func (m *dbManager) GetByPath(path string) *sharedDB { + lvlPath := filepath.Dir(path) + curIndex := u64FromHexString(filepath.Base(path)) + levelManager := m.getLevelManager(lvlPath) + return levelManager.GetByIndex(curIndex) +} + +func (m *dbManager) Open() { + m.closedFlag.Store(false) +} + +func (m *dbManager) Close() { + m.closedFlag.Store(true) + m.dbCounter.WaitUntilAllClosed() +} + +func (m *dbManager) getLevelManager(lvlPath string) *levelDbManager { + result := m.getLevelManagerIfExists(lvlPath) + if result != nil { + return result + } + return m.getOrCreateLevelManager(lvlPath) +} + +func (m *dbManager) getLevelManagerIfExists(lvlPath string) *levelDbManager { + m.levelToManagerGuard.RLock() + defer m.levelToManagerGuard.RUnlock() + + return m.levelToManager[lvlPath] +} + +func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager { + m.levelToManagerGuard.Lock() + defer m.levelToManagerGuard.Unlock() + + if result, ok := m.levelToManager[lvlPath]; ok { + return result + } + + result := newLevelDBManager(m.leafWidth, m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log) + m.levelToManager[lvlPath] = result + return result +} + +type openDBCounter struct { + cond *sync.Cond + count uint64 +} + +func newOpenDBCounter() *openDBCounter { + return &openDBCounter{ + cond: &sync.Cond{ + L: &sync.Mutex{}, + }, + } +} + +func (c *openDBCounter) Inc() { + c.cond.L.Lock() + defer c.cond.L.Unlock() + + c.count++ +} + +func (c *openDBCounter) Dec() { + c.cond.L.Lock() + defer c.cond.L.Unlock() + + if c.count > 0 { + c.count-- + } + + if c.count == 0 { + c.cond.Broadcast() + } +} + +func (c *openDBCounter) WaitUntilAllClosed() { + c.cond.L.Lock() + for c.count > 0 { + c.cond.Wait() + } + c.cond.L.Unlock() +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/put.go b/pkg/local_object_storage/blobstor/blobovniczatree/put.go index 038d5244d..aae280e0b 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/put.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/put.go @@ -2,7 +2,6 @@ package blobovniczatree import ( "context" - "errors" "path/filepath" "time" @@ -10,7 +9,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -76,8 +74,8 @@ type putIterator struct { PutPrm blobovnicza.PutPrm } -func (i *putIterator) iterate(ctx context.Context, path string) (bool, error) { - active, err := i.B.getActivated(path) +func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error) { + active, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath) if err != nil { if !isLogical(err) { i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err) @@ -89,46 +87,29 @@ func (i *putIterator) iterate(ctx context.Context, path string) (bool, error) { return false, nil } - if _, err := active.blz.Put(ctx, i.PutPrm); err != nil { - // Check if blobovnicza is full. We could either receive `blobovnicza.ErrFull` error - // or update active blobovnicza in other thread. In the latter case the database will be closed - // and `updateActive` takes care of not updating the active blobovnicza twice. - if isFull := errors.Is(err, blobovnicza.ErrFull); isFull || errors.Is(err, bbolt.ErrDatabaseNotOpen) { - if isFull { - i.B.log.Debug(logs.BlobovniczatreeBlobovniczaOverflowed, - zap.String("path", filepath.Join(path, u64ToHexString(active.ind)))) - } + if active == nil { + i.B.log.Debug(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath)) + return false, nil + } + defer active.Close() - if err := i.B.updateActive(path, &active.ind); err != nil { - if !isLogical(err) { - i.B.reportError(logs.BlobovniczatreeCouldNotUpdateActiveBlobovnicza, err) - } else { - i.B.log.Debug(logs.BlobovniczatreeCouldNotUpdateActiveBlobovnicza, - zap.String("level", path), - zap.String("error", err.Error())) - } + i.AllFull = false - return false, nil - } - - return i.iterate(ctx, path) - } - - i.AllFull = false + _, err = active.Blobovnicza().Put(ctx, i.PutPrm) + if err != nil { if !isLogical(err) { i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err) } else { i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, - zap.String("path", filepath.Join(path, u64ToHexString(active.ind))), + zap.String("path", active.Path()), zap.String("error", err.Error())) } return false, nil } - path = filepath.Join(path, u64ToHexString(active.ind)) - - i.ID = blobovnicza.NewIDFromBytes([]byte(path)) + idx := u64FromHexString(filepath.Base(active.Path())) + i.ID = blobovnicza.NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx)))) return true, nil }