frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go

275 lines
7.9 KiB
Go
Raw Normal View History

package blobovniczatree
import (
"errors"
"fmt"
"path/filepath"
"strconv"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/hashicorp/golang-lru/v2/simplelru"
"go.uber.org/zap"
)
// Blobovniczas represents the storage of the "small" objects.
//
// Each object is stored in Blobovnicza's (B-s).
// B-s are structured in a multilevel directory hierarchy
// with fixed depth and width (configured by BlobStor).
//
// Example (width = 4, depth = 3):
//
// x===============================x
// |[0] [1] [2] [3]|
// | \ / |
// | \ / |
// | \ / |
// | \ / |
// |[0] [1] [2] [3]|
// | | / |
// | | / |
// | | / |
// | | / |
// |[0](F) [1](A) [X] [X]|
// x===============================x
//
// Elements of the deepest level are B-s.
// B-s are allocated dynamically. At each moment of the time there is
// an active B (ex. A), set of already filled B-s (ex. F) and
// a list of not yet initialized B-s (ex. X). After filling the active B
// it becomes full, and next B becomes initialized and active.
//
// Active B and some of the full B-s are cached (LRU). All cached
// B-s are initialized and opened.
//
// Object is saved as follows:
// 1. at each level, the next directory is selected according to HRW and
// descended into, until the deepest level is reached;
// 2. at the B-s level object is saved to the active B. If active B
// is full, next B is opened, initialized and cached. If there
// is no more X candidates, goto 1 and process next level.
//
// After the object is saved in B, path concatenation is returned
// in system path format as B identifier (ex. "0/1/1" or "3/2/1").
type Blobovniczas struct {
// Embedded tree configuration. NewBlobovniczaTree fills it with initConfig
// and then applies the caller-supplied options to it.
cfg
// LRU cache of opened Blobovniczas that are not the active one of their level.
// Keys are Blobovnicza paths: the level path joined with the hex-encoded index.
// The eviction callback installed by NewBlobovniczaTree closes an evicted
// Blobovnicza unless it is the currently active one of its level.
opened *simplelru.LRU[string, *blobovnicza.Blobovnicza]
// lruMtx protects opened cache.
// It isn't RWMutex because `Get` calls must
// lock this mutex on write, as LRU info is updated.
// It must be taken after activeMtx in case when eviction is possible
// i.e. `Add`, `Purge` and `Remove` calls.
lruMtx sync.Mutex
// openMtx excludes parallel bbolt.Open() calls:
// bbolt.Open() deadlocks if it tries to open an already opened file.
openMtx sync.Mutex
// activeMtx protects the active map.
activeMtx sync.RWMutex
// active maps a level path (dir) to the active (opened, non-filled)
// Blobovnicza of that level together with its index.
active map[string]blobovniczaWithIndex
}
// blobovniczaWithIndex pairs a Blobovnicza with its index inside its level (dir).
type blobovniczaWithIndex struct {
// ind is the index of the Blobovnicza within its level; its hex form
// (u64ToHexString) is the last element of the Blobovnicza path.
ind uint64
// blz is the Blobovnicza instance itself (as returned by openBlobovnicza
// in updateAndGet).
blz *blobovnicza.Blobovnicza
}
var (
	// Compile-time assertion that *Blobovniczas satisfies common.Storage.
	_ common.Storage = (*Blobovniczas)(nil)

	// errPutFailed is the error value for the case when the object
	// could not be saved in any blobovnicza.
	errPutFailed = errors.New("could not save the object in any blobovnicza")
)
// NewBlobovniczaTree returns new instance of blobovniczas tree.
//
// The configuration is initialized with initConfig and then adjusted by opts
// in the order they are given. A zero leaf width falls back to the shallow
// width. Panics if the LRU cache of opened blobovniczas cannot be created.
func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
	blz = &Blobovniczas{}
	initConfig(&blz.cfg)
	for _, apply := range opts {
		apply(&blz.cfg)
	}

	if blz.blzLeafWidth == 0 {
		blz.blzLeafWidth = blz.blzShallowWidth
	}

	// onEvict is invoked by the LRU cache for every entry that leaves it.
	onEvict := func(id string, evicted *blobovnicza.Blobovnicza) {
		// The entry has just become the active blobovnicza of its level and
		// is merely being taken out of the opened cache: keep it open.
		if cur, isActive := blz.active[filepath.Dir(id)]; isActive && cur.ind == u64FromHexString(filepath.Base(id)) {
			return
		}

		closeErr := evicted.Close()
		if closeErr == nil {
			blz.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyClosedOnEvict,
				zap.String("id", id),
			)
			return
		}

		blz.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
			zap.String("id", id),
			zap.String("error", closeErr.Error()),
		)
	}

	lru, err := simplelru.NewLRU[string, *blobovnicza.Blobovnicza](blz.openedCacheSize, onEvict)
	if err != nil {
		// occurs only if the size is not positive
		panic(fmt.Errorf("could not create LRU cache of size %d: %w", blz.openedCacheSize, err))
	}
	blz.opened = lru

	// Capacity hint for the active map: one factor per tree level, the leaf
	// width for exactly one level and the shallow width for all the others.
	capacity := uint64(1)
	for lvl := blz.blzShallowDepth; lvl > 0; lvl-- {
		width := blz.blzShallowWidth
		if lvl == 1 {
			width = blz.blzLeafWidth
		}
		capacity *= width
	}
	blz.active = make(map[string]blobovniczaWithIndex, capacity)

	return blz
}
// getActivated returns the active blobovnicza of the lvlPath level (dir),
// activating one first if the level has no active blobovnicza yet.
//
// It delegates to updateAndGet with a nil old index, so an already active
// blobovnicza is returned as is and is never advanced to the next index.
//
// Returns an error if the blobovnicza could not be activated.
func (b *Blobovniczas) getActivated(lvlPath string) (blobovniczaWithIndex, error) {
return b.updateAndGet(lvlPath, nil)
}
// updateActive updates the active blobovnicza of the lvlPath level (dir).
//
// old is the index the caller considers exhausted. If the current active
// blobovnicza's index is not old, it remains unchanged.
//
// Returns the error of updateAndGet, if any. The "successfully updated"
// debug record is written only when no error occurred.
func (b *Blobovniczas) updateActive(lvlPath string, old *uint64) error {
	b.log.Debug(logs.BlobovniczatreeUpdatingActiveBlobovnicza, zap.String("path", lvlPath))

	if _, err := b.updateAndGet(lvlPath, old); err != nil {
		// Do not report success for a failed update; the error is
		// handed to the caller instead.
		return err
	}

	b.log.Debug(logs.BlobovniczatreeActiveBlobovniczaSuccessfullyUpdated, zap.String("path", lvlPath))

	return nil
}
// updateAndGet updates and returns the active blobovnicza of the lvlPath level (dir).
//
// Behavior depends on the current state of the level and on old:
//   - no active blobovnicza yet: the one with index 0 is opened and activated;
//   - old == nil: the current active blobovnicza is returned unchanged;
//   - old != nil and the active index differs from *old: the current active
//     blobovnicza is returned unchanged;
//   - old != nil and the active index equals *old: the blobovnicza with the
//     next index is opened and activated.
//
// If the active index is already the last one of the level (blzLeafWidth-1)
// and old != nil, a logical "no more Blobovniczas" error is returned.
// An error of openBlobovnicza is returned as is; the accompanying value is
// not a valid active blobovnicza in that case.
func (b *Blobovniczas) updateAndGet(lvlPath string, old *uint64) (blobovniczaWithIndex, error) {
// Take a snapshot of the current active entry under the read lock only;
// the blobovnicza is opened below without holding activeMtx.
b.activeMtx.RLock()
active, ok := b.active[lvlPath]
b.activeMtx.RUnlock()
if ok {
if old != nil {
// NOTE(review): the exhaustion check runs before the CAS check, so the
// error is returned even when *old is stale — confirm this is intended.
if active.ind == b.blzLeafWidth-1 {
return active, logicerr.New("no more Blobovniczas")
} else if active.ind != *old {
// sort of CAS in order to control concurrent
// updateActive calls
return active, nil
}
} else {
return active, nil
}
// The caller's old index is still the active one: move on to the next index.
active.ind++
}
// When the level had no active entry, active is the zero value here,
// so the blobovnicza with index 0 is opened.
var err error
if active.blz, err = b.openBlobovnicza(filepath.Join(lvlPath, u64ToHexString(active.ind))); err != nil {
return active, err
}
b.activeMtx.Lock()
defer b.activeMtx.Unlock()
// check a 2nd time to find out whether this blobovnicza was already
// activated while this thread was waiting for the lock
tryActive, ok := b.active[lvlPath]
if ok && tryActive.blz == active.blz {
return tryActive, nil
}
// Remove from opened cache (active blobovnicza should always be opened).
// Because `onEvict` callback is called in `Remove`, we need to update
// active map beforehand.
b.active[lvlPath] = active
activePath := filepath.Join(lvlPath, u64ToHexString(active.ind))
// lruMtx is taken while activeMtx is held, as required for calls that may evict.
b.lruMtx.Lock()
b.opened.Remove(activePath)
if ok {
// The previously active blobovnicza stays opened: move it to the opened cache.
b.opened.Add(filepath.Join(lvlPath, u64ToHexString(tryActive.ind)), tryActive.blz)
}
b.lruMtx.Unlock()
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyActivated,
zap.String("path", activePath))
return active, nil
}
// addressHash returns the HRW string hash of the encoded object address
// concatenated with path. A nil addr contributes an empty string, i.e.
// only path is hashed.
func addressHash(addr *oid.Address, path string) uint64 {
	if addr == nil {
		return hrw.StringHash(path)
	}

	return hrw.StringHash(addr.EncodeToString() + path)
}
// u64ToHexString renders ind in base 16 using lowercase digits,
// without any prefix or zero padding.
func u64ToHexString(ind uint64) string {
	// A uint64 never needs more than 16 hexadecimal digits.
	var digits [16]byte

	return string(strconv.AppendUint(digits[:0], ind, 16))
}
// u64FromHexString parses str as a base-16 unsigned 64-bit integer.
//
// Panics if str is not a valid hexadecimal uint64.
func u64FromHexString(str string) uint64 {
	ind, parseErr := strconv.ParseUint(str, 16, 64)
	if parseErr == nil {
		return ind
	}

	panic(fmt.Sprintf("blobovnicza name is not an index %s", str))
}
// Type is the blobovniczatree storage type used in logs and configuration.
// It is also the value returned by (*Blobovniczas).Type.
const Type = "blobovnicza"
// Type implements common.Storage.
//
// It returns the package-level Type constant.
func (b *Blobovniczas) Type() string {
return Type
}
// Path implements common.Storage.
//
// It returns the rootPath value of the tree configuration.
func (b *Blobovniczas) Path() string {
return b.rootPath
}
// SetCompressor implements common.Storage.
//
// It stores cc as the compression config of the tree; the same pointer
// is returned by Compressor afterwards.
func (b *Blobovniczas) SetCompressor(cc *compression.Config) {
b.compression = cc
}
// Compressor returns the compression config currently held by the tree
// (the value last stored by SetCompressor, if it was called).
func (b *Blobovniczas) Compressor() *compression.Config {
return b.compression
}
// SetReportErrorFunc implements common.Storage.
//
// It stores f as the reportError callback of the tree.
func (b *Blobovniczas) SetReportErrorFunc(f func(string, error)) {
b.reportError = f
}
// SetParentID passes parentID on to the tree's metrics via metrics.SetParentID.
func (b *Blobovniczas) SetParentID(parentID string) {
b.metrics.SetParentID(parentID)
}