blobovnicza: Use TTL for blobovnicza tree cache #1017
26 changed files with 201 additions and 65 deletions
|
@ -61,6 +61,8 @@ storage:
|
|||
depth: 1 # max depth of object tree storage in key-value DB
|
||||
width: 4 # max width of object tree storage in key-value DB
|
||||
opened_cache_capacity: 50 # maximum number of opened database files
|
||||
opened_cache_ttl: 5m # ttl for opened database file
|
||||
opened_cache_exp_interval: 15s # cache cleanup interval for expired blobovnicza's
|
||||
|
||||
gc:
|
||||
remover_batch_size: 200 # number of objects to be removed by the garbage collector
|
||||
|
|
|
@ -185,13 +185,15 @@ type subStorageCfg struct {
|
|||
noSync bool
|
||||
|
||||
// blobovnicza-specific
|
||||
size uint64
|
||||
width uint64
|
||||
leafWidth uint64
|
||||
openedCacheSize int
|
||||
initWorkerCount int
|
||||
initInAdvance bool
|
||||
rebuildDropTimeout time.Duration
|
||||
size uint64
|
||||
width uint64
|
||||
leafWidth uint64
|
||||
openedCacheSize int
|
||||
initWorkerCount int
|
||||
initInAdvance bool
|
||||
rebuildDropTimeout time.Duration
|
||||
openedCacheTTL time.Duration
|
||||
openedCacheExpInterval time.Duration
|
||||
}
|
||||
|
||||
// readConfig fills applicationConfiguration with raw configuration values
|
||||
|
@ -313,6 +315,8 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
|
|||
sCfg.width = sub.ShallowWidth()
|
||||
sCfg.leafWidth = sub.LeafWidth()
|
||||
sCfg.openedCacheSize = sub.OpenedCacheSize()
|
||||
sCfg.openedCacheTTL = sub.OpenedCacheTTL()
|
||||
sCfg.openedCacheExpInterval = sub.OpenedCacheExpInterval()
|
||||
sCfg.initWorkerCount = sub.InitWorkerCount()
|
||||
sCfg.initInAdvance = sub.InitInAdvance()
|
||||
sCfg.rebuildDropTimeout = sub.RebuildDropTimeout()
|
||||
|
@ -846,11 +850,11 @@ type shardOptsWithID struct {
|
|||
shOpts []shard.Option
|
||||
}
|
||||
|
||||
func (c *cfg) shardOpts() []shardOptsWithID {
|
||||
func (c *cfg) shardOpts(ctx context.Context) []shardOptsWithID {
|
||||
shards := make([]shardOptsWithID, 0, len(c.EngineCfg.shards))
|
||||
|
||||
for _, shCfg := range c.EngineCfg.shards {
|
||||
shards = append(shards, c.getShardOpts(shCfg))
|
||||
shards = append(shards, c.getShardOpts(ctx, shCfg))
|
||||
}
|
||||
|
||||
return shards
|
||||
|
@ -891,7 +895,7 @@ func (c *cfg) getPiloramaOpts(shCfg shardCfg) []pilorama.Option {
|
|||
return piloramaOpts
|
||||
}
|
||||
|
||||
func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
||||
func (c *cfg) getSubstorageOpts(ctx context.Context, shCfg shardCfg) []blobstor.SubStorage {
|
||||
var ss []blobstor.SubStorage
|
||||
for _, sRead := range shCfg.subStorages {
|
||||
switch sRead.typ {
|
||||
|
@ -904,6 +908,8 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
|||
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
|
||||
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
|
||||
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
|
||||
blobovniczatree.WithOpenedCacheTTL(sRead.openedCacheTTL),
|
||||
blobovniczatree.WithOpenedCacheExpInterval(sRead.openedCacheExpInterval),
|
||||
blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
|
||||
blobovniczatree.WithInitInAdvance(sRead.initInAdvance),
|
||||
blobovniczatree.WithWaitBeforeDropDB(sRead.rebuildDropTimeout),
|
||||
|
@ -919,7 +925,7 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
|||
)
|
||||
}
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(blobTreeOpts...),
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(ctx, blobTreeOpts...),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
||||
},
|
||||
|
@ -954,10 +960,10 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
|||
return ss
|
||||
}
|
||||
|
||||
func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
|
||||
func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID {
|
||||
writeCacheOpts := c.getWriteCacheOpts(shCfg)
|
||||
piloramaOpts := c.getPiloramaOpts(shCfg)
|
||||
ss := c.getSubstorageOpts(shCfg)
|
||||
ss := c.getSubstorageOpts(ctx, shCfg)
|
||||
|
||||
blobstoreOpts := []blobstor.Option{
|
||||
blobstor.WithCompressObjects(shCfg.compress),
|
||||
|
@ -1049,7 +1055,7 @@ func initLocalStorage(ctx context.Context, c *cfg) {
|
|||
c.cfgObject.getSvc = new(getsvc.Service)
|
||||
|
||||
var shardsAttached int
|
||||
for _, optsWithMeta := range c.shardOpts() {
|
||||
for _, optsWithMeta := range c.shardOpts(ctx) {
|
||||
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
||||
if err != nil {
|
||||
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
||||
|
@ -1289,7 +1295,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
|||
// Storage Engine
|
||||
|
||||
var rcfg engine.ReConfiguration
|
||||
for _, optsWithID := range c.shardOpts() {
|
||||
for _, optsWithID := range c.shardOpts(ctx) {
|
||||
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource())))
|
||||
}
|
||||
|
||||
|
|
|
@ -25,6 +25,12 @@ const (
|
|||
// OpenedCacheSizeDefault is a default cache size of opened Blobovnicza's.
|
||||
OpenedCacheSizeDefault = 16
|
||||
|
||||
// OpenedCacheTTLDefault is a default cache ttl of opened Blobovnicza's.
|
||||
OpenedCacheTTLDefault = 0 // means expiring is off
|
||||
|
||||
// OpenedCacheExpIntervalDefault is a default cache cleanup interval for expired Blobovnicza's.
|
||||
OpenedCacheExpIntervalDefault = 15 * time.Second
|
||||
|
||||
// InitWorkerCountDefault is a default workers count to initialize Blobovnicza's.
|
||||
InitWorkerCountDefault = 5
|
||||
|
||||
|
@ -106,6 +112,38 @@ func (x *Config) OpenedCacheSize() int {
|
|||
return OpenedCacheSizeDefault
|
||||
}
|
||||
|
||||
// OpenedCacheTTL returns the value of "opened_cache_ttl" config parameter.
|
||||
//
|
||||
// Returns OpenedCacheTTLDefault if the value is not a positive number.
|
||||
func (x *Config) OpenedCacheTTL() time.Duration {
|
||||
d := config.DurationSafe(
|
||||
(*config.Config)(x),
|
||||
"opened_cache_ttl",
|
||||
)
|
||||
|
||||
if d > 0 {
|
||||
return d
|
||||
}
|
||||
|
||||
return OpenedCacheTTLDefault
|
||||
}
|
||||
|
||||
// OpenedCacheExpInterval returns the value of "opened_cache_exp_interval" config parameter.
|
||||
//
|
||||
// Returns OpenedCacheExpIntervalDefault if the value is not a positive number.
|
||||
func (x *Config) OpenedCacheExpInterval() time.Duration {
|
||||
d := config.DurationSafe(
|
||||
(*config.Config)(x),
|
||||
"opened_cache_exp_interval",
|
||||
)
|
||||
|
||||
if d > 0 {
|
||||
return d
|
||||
}
|
||||
|
||||
return OpenedCacheExpIntervalDefault
|
||||
}
|
||||
|
||||
// BoltDB returns config instance for querying bolt db specific parameters.
|
||||
func (x *Config) BoltDB() *boltdbconfig.Config {
|
||||
return (*boltdbconfig.Config)(x)
|
||||
|
|
|
@ -128,6 +128,8 @@ FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_SIZE=4194304
|
|||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH=1
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH=4
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_TTL=5m
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_EXP_INTERVAL=15s
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_LEAF_WIDTH=10
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_WORKER_COUNT=10
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_IN_ADVANCE=TRUE
|
||||
|
@ -178,6 +180,8 @@ FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_SIZE=4194304
|
|||
FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_DEPTH=1
|
||||
FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_WIDTH=4
|
||||
FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50
|
||||
FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_OPENED_CACHE_TTL=5m
|
||||
FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_OPENED_CACHE_EXP_INTERVAL=15s
|
||||
FROSTFS_STORAGE_SHARD_1_BLOBSTOR_0_LEAF_WIDTH=10
|
||||
### FSTree config
|
||||
FROSTFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE=fstree
|
||||
|
|
|
@ -176,6 +176,8 @@
|
|||
"depth": 1,
|
||||
"width": 4,
|
||||
"opened_cache_capacity": 50,
|
||||
"opened_cache_ttl": "5m",
|
||||
"opened_cache_exp_interval": "15s",
|
||||
"leaf_width": 10,
|
||||
"init_worker_count": 10,
|
||||
"init_in_advance": true,
|
||||
|
@ -229,6 +231,8 @@
|
|||
"depth": 1,
|
||||
"width": 4,
|
||||
"opened_cache_capacity": 50,
|
||||
"opened_cache_ttl": "5m",
|
||||
"opened_cache_exp_interval": "15s",
|
||||
"leaf_width": 10
|
||||
},
|
||||
{
|
||||
|
|
|
@ -151,6 +151,8 @@ storage:
|
|||
depth: 1 # max depth of object tree storage in key-value DB
|
||||
width: 4 # max width of object tree storage in key-value DB
|
||||
opened_cache_capacity: 50 # maximum number of opened database files
|
||||
opened_cache_ttl: 5m # ttl for opened database file
|
||||
opened_cache_exp_interval: 15s # cache cleanup interval for expired blobovnicza's
|
||||
leaf_width: 10 # max count of key-value DB on leafs of object tree storage
|
||||
- perm: 0644 # permissions for blobstor files(directories: +x for current user and group)
|
||||
depth: 5 # max depth of object tree storage in FS
|
||||
|
|
|
@ -212,6 +212,8 @@ blobstor:
|
|||
depth: 1
|
||||
width: 4
|
||||
opened_cache_capacity: 50
|
||||
opened_cache_ttl: 5m
|
||||
opened_cache_exp_interval: 15s
|
||||
```
|
||||
|
||||
#### Common options for sub-storages
|
||||
|
@ -228,17 +230,19 @@ blobstor:
|
|||
| `depth` | `int` | `4` | File-system tree depth. |
|
||||
|
||||
#### `blobovnicza` type options
|
||||
| Parameter | Type | Default value | Description |
|
||||
| ----------------------- | ---------- | ------------- | --------------------------------------------------------------------- |
|
||||
| `path` | `string` | | Path to the root of the blobstor. |
|
||||
| `perm` | file mode | `0660` | Default permission for created files and directories. |
|
||||
| `size` | `size` | `1 G` | Maximum size of a single blobovnicza |
|
||||
| `depth` | `int` | `2` | Blobovnicza tree depth. |
|
||||
| `width` | `int` | `16` | Blobovnicza tree width. |
|
||||
| `opened_cache_capacity` | `int` | `16` | Maximum number of simultaneously opened blobovniczas. |
|
||||
| `init_worker_count` | `int` | `5` | Maximum number of concurrent initialization workers. |
|
||||
| `init_in_advance` | `bool` | `false` | If `true`, than all the blobovnicza files will be created on startup. |
|
||||
| `rebuild_drop_timeout` | `duration` | `10s` | Timeout before drop empty blobovnicza file during rebuild. |
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------------------| ---------- |---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `path` | `string` | | Path to the root of the blobstor. |
|
||||
| `perm` | file mode | `0660` | Default permission for created files and directories. |
|
||||
| `size` | `size` | `1 G` | Maximum size of a single blobovnicza |
|
||||
| `depth` | `int` | `2` | Blobovnicza tree depth. |
|
||||
| `width` | `int` | `16` | Blobovnicza tree width. |
|
||||
| `opened_cache_capacity` | `int` | `16` | Maximum number of simultaneously opened blobovniczas. |
|
||||
| `opened_cache_ttl` | `duration` | `0` | TTL in cache for opened blobovniczas(disabled by default). In case of heavy random-read and 10 shards each with 10_000 databases and accessing 400 objects per-second we will access each db approximately once per ((10 * 10_000 / 400) = 250 seconds <= 300 seconds = 5 min). Also take in mind that in this scenario they will probably be closed earlier because of the cache capacity, so bigger values are likely to be of no use. |
|
||||
| `opened_cache_exp_interval` | `duration` | `15s` | Cache cleanup interval for expired blobovnicza's. |
|
||||
| `init_worker_count` | `int` | `5` | Maximum number of concurrent initialization workers. |
|
||||
| `init_in_advance` | `bool` | `false` | If `true`, than all the blobovnicza files will be created on startup. |
|
||||
| `rebuild_drop_timeout` | `duration` | `10s` | Timeout before drop empty blobovnicza file during rebuild. |
|
||||
|
||||
### `gc` subsection
|
||||
|
||||
|
|
1
go.mod
1
go.mod
|
@ -16,6 +16,7 @@ require (
|
|||
github.com/chzyer/readline v1.5.1
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
|
||||
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
|
||||
github.com/go-pkgz/expirable-cache/v3 v3.0.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7
|
||||
github.com/klauspost/compress v1.17.4
|
||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -1,6 +1,7 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -72,7 +73,7 @@ const (
|
|||
)
|
||||
|
||||
// NewBlobovniczaTree returns new instance of blobovniczas tree.
|
||||
func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
|
||||
func NewBlobovniczaTree(ctx context.Context, opts ...Option) (blz *Blobovniczas) {
|
||||
blz = new(Blobovniczas)
|
||||
initConfig(&blz.cfg)
|
||||
|
||||
|
@ -86,7 +87,8 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
|
|||
|
||||
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
|
||||
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
|
||||
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
|
||||
blz.dbCache = newDBCache(ctx, blz.openedCacheSize,
|
||||
blz.openedCacheTTL, blz.openedCacheExpInterval, blz.commondbManager)
|
||||
blz.deleteProtectedObjects = newAddressMap()
|
||||
blz.dbFilesGuard = &sync.RWMutex{}
|
||||
blz.rebuildGuard = &sync.RWMutex{}
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||
cache "github.com/go-pkgz/expirable-cache/v3"
|
||||
)
|
||||
|
||||
// dbCache caches sharedDB instances that are NOT open for Put.
|
||||
|
@ -13,30 +14,55 @@ import (
|
|||
// Uses dbManager for opening/closing sharedDB instances.
|
||||
// Stores a reference to an cached sharedDB, so dbManager does not close it.
|
||||
type dbCache struct {
|
||||
cacheGuard *sync.RWMutex
|
||||
cache simplelru.LRUCache[string, *sharedDB]
|
||||
cacheGuard *sync.Mutex
|
||||
cache cache.Cache[string, *sharedDB]
|
||||
pathLock *utilSync.KeyLocker[string] // the order of locks is important: pathLock first, cacheGuard second
|
||||
closed bool
|
||||
nonCached map[string]struct{}
|
||||
wg sync.WaitGroup
|
||||
cancel context.CancelFunc
|
||||
|
||||
dbManager *dbManager
|
||||
}
|
||||
|
||||
func newDBCache(size int, dbManager *dbManager) *dbCache {
|
||||
cache, err := simplelru.NewLRU(size, func(_ string, evictedDB *sharedDB) {
|
||||
evictedDB.Close()
|
||||
})
|
||||
if err != nil {
|
||||
// occurs only if the size is not positive
|
||||
panic(fmt.Errorf("could not create LRU cache of size %d: %w", size, err))
|
||||
}
|
||||
return &dbCache{
|
||||
cacheGuard: &sync.RWMutex{},
|
||||
cache: cache,
|
||||
func newDBCache(parentCtx context.Context, size int,
|
||||
ttl time.Duration, expInterval time.Duration,
|
||||
dbManager *dbManager,
|
||||
) *dbCache {
|
||||
ch := cache.NewCache[string, *sharedDB]().
|
||||
WithTTL(ttl).WithLRU().WithMaxKeys(size).
|
||||
WithOnEvicted(func(_ string, db *sharedDB) {
|
||||
db.Close()
|
||||
})
|
||||
ctx, cancel := context.WithCancel(parentCtx)
|
||||
res := &dbCache{
|
||||
dstepanov-yadro marked this conversation as resolved
Outdated
|
||||
cacheGuard: &sync.Mutex{},
|
||||
wg: sync.WaitGroup{},
|
||||
cancel: cancel,
|
||||
cache: ch,
|
||||
dbManager: dbManager,
|
||||
pathLock: utilSync.NewKeyLocker[string](),
|
||||
nonCached: make(map[string]struct{}),
|
||||
}
|
||||
if ttl > 0 {
|
||||
fyrchik
commented
Magic constant Magic constant
Also, why 100?
fyrchik
commented
IMO some fixed interval is ok, so that we can say blobovnicza is closed after the time specified in config + e.g. 15 seconds. IMO some fixed interval is ok, so that we can say blobovnicza is closed after the time specified in config + e.g. 15 seconds.
fyrchik
commented
@acid-ant
acid-ant
commented
Thanks for the reminder, updated. Thanks for the reminder, updated.
|
||||
res.wg.Add(1)
|
||||
go func() {
|
||||
ticker := time.NewTicker(expInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
res.wg.Done()
|
||||
return
|
||||
case <-ticker.C:
|
||||
res.cacheGuard.Lock()
|
||||
res.cache.DeleteExpired()
|
||||
res.cacheGuard.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func (c *dbCache) Open() {
|
||||
|
@ -49,6 +75,8 @@ func (c *dbCache) Open() {
|
|||
func (c *dbCache) Close() {
|
||||
c.cacheGuard.Lock()
|
||||
defer c.cacheGuard.Unlock()
|
||||
c.cancel()
|
||||
c.wg.Wait()
|
||||
c.cache.Purge()
|
||||
c.closed = true
|
||||
}
|
||||
|
@ -88,6 +116,8 @@ func (c *dbCache) getExisted(path string) *sharedDB {
|
|||
|
||||
if value, ok := c.cache.Get(path); ok {
|
||||
return value
|
||||
} else if value != nil {
|
||||
c.cache.Invalidate(path)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -119,10 +149,9 @@ func (c *dbCache) put(path string, db *sharedDB) bool {
|
|||
|
||||
dstepanov-yadro
commented
It must be It must be `if isNonCached || c.closed {`
acid-ant
commented
Thanks, fixed. Thanks, fixed.
|
||||
_, isNonCached := c.nonCached[path]
|
||||
|
||||
if !isNonCached && !c.closed {
|
||||
c.cache.Add(path, db)
|
||||
return true
|
||||
if isNonCached || c.closed {
|
||||
return false
|
||||
}
|
||||
|
||||
return false
|
||||
c.cache.Add(path, db)
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ func TestBlobovniczaTree_Concurrency(t *testing.T) {
|
|||
const n = 1000
|
||||
|
||||
st := NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(1024),
|
||||
WithBlobovniczaShallowWidth(10),
|
||||
|
|
|
@ -49,6 +49,7 @@ func createTestTree(t *testing.T, currentDepth, depth, width uint64, path string
|
|||
|
||||
func openAndCloseTestTree(t *testing.T, depth, width uint64, path string) {
|
||||
blz := NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithBlobovniczaShallowDepth(depth),
|
||||
WithBlobovniczaShallowWidth(width),
|
||||
WithRootPath(path),
|
||||
|
@ -78,6 +79,7 @@ func TestObjectsAvailableAfterDepthAndWidthEdit(t *testing.T) {
|
|||
rootDir := t.TempDir()
|
||||
|
||||
blz := NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithBlobovniczaShallowDepth(3),
|
||||
WithBlobovniczaShallowWidth(5),
|
||||
WithRootPath(rootDir),
|
||||
|
@ -115,6 +117,7 @@ func TestObjectsAvailableAfterDepthAndWidthEdit(t *testing.T) {
|
|||
|
||||
// change depth and width
|
||||
blz = NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithBlobovniczaShallowDepth(5),
|
||||
WithBlobovniczaShallowWidth(2),
|
||||
WithRootPath(rootDir),
|
||||
|
@ -152,6 +155,7 @@ func TestObjectsAvailableAfterDepthAndWidthEdit(t *testing.T) {
|
|||
|
||||
// change depth and width back
|
||||
blz = NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithBlobovniczaShallowDepth(3),
|
||||
WithBlobovniczaShallowWidth(5),
|
||||
WithRootPath(rootDir),
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
func TestExistsInvalidStorageID(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
b := NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(1024),
|
||||
WithBlobovniczaShallowWidth(2),
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -13,6 +14,7 @@ func TestGeneric(t *testing.T) {
|
|||
|
||||
helper := func(t *testing.T, dir string) common.Storage {
|
||||
return NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(maxObjectSize),
|
||||
WithBlobovniczaShallowWidth(2),
|
||||
|
@ -40,6 +42,7 @@ func TestControl(t *testing.T) {
|
|||
|
||||
newTree := func(t *testing.T) common.Storage {
|
||||
return NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(maxObjectSize),
|
||||
WithBlobovniczaShallowWidth(2),
|
||||
|
|
|
@ -12,6 +12,7 @@ func TestIterateSortedLeavesAndDBPathsAreSame(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
blz := NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithBlobovniczaShallowDepth(3),
|
||||
WithBlobovniczaShallowWidth(5),
|
||||
WithRootPath(t.TempDir()),
|
||||
|
|
|
@ -27,32 +27,40 @@ type cfg struct {
|
|||
blzInitWorkerCount int
|
||||
blzMoveBatchSize int
|
||||
createDBInAdvance bool
|
||||
// TTL for blobovnicza's cache
|
||||
openedCacheTTL time.Duration
|
||||
// Interval for deletion expired blobovnicza's
|
||||
openedCacheExpInterval time.Duration
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
const (
|
||||
defaultPerm = 0o700
|
||||
defaultOpenedCacheSize = 50
|
||||
defaultBlzShallowDepth = 2
|
||||
defaultBlzShallowWidth = 16
|
||||
defaultWaitBeforeDropDB = 10 * time.Second
|
||||
defaultBlzInitWorkerCount = 5
|
||||
defaulBlzMoveBatchSize = 10000
|
||||
defaultPerm = 0o700
|
||||
defaultOpenedCacheSize = 50
|
||||
defaultOpenedCacheTTL = 0 // means expiring is off
|
||||
defaultOpenedCacheInterval = 15 * time.Second
|
||||
defaultBlzShallowDepth = 2
|
||||
defaultBlzShallowWidth = 16
|
||||
defaultWaitBeforeDropDB = 10 * time.Second
|
||||
defaultBlzInitWorkerCount = 5
|
||||
defaulBlzMoveBatchSize = 10000
|
||||
)
|
||||
|
||||
func initConfig(c *cfg) {
|
||||
*c = cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
perm: defaultPerm,
|
||||
openedCacheSize: defaultOpenedCacheSize,
|
||||
blzShallowDepth: defaultBlzShallowDepth,
|
||||
blzShallowWidth: defaultBlzShallowWidth,
|
||||
reportError: func(string, error) {},
|
||||
metrics: &noopMetrics{},
|
||||
waitBeforeDropDB: defaultWaitBeforeDropDB,
|
||||
blzInitWorkerCount: defaultBlzInitWorkerCount,
|
||||
blzMoveBatchSize: defaulBlzMoveBatchSize,
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
perm: defaultPerm,
|
||||
openedCacheSize: defaultOpenedCacheSize,
|
||||
openedCacheTTL: defaultOpenedCacheTTL,
|
||||
openedCacheExpInterval: defaultOpenedCacheInterval,
|
||||
blzShallowDepth: defaultBlzShallowDepth,
|
||||
blzShallowWidth: defaultBlzShallowWidth,
|
||||
reportError: func(string, error) {},
|
||||
metrics: &noopMetrics{},
|
||||
waitBeforeDropDB: defaultWaitBeforeDropDB,
|
||||
blzInitWorkerCount: defaultBlzInitWorkerCount,
|
||||
blzMoveBatchSize: defaulBlzMoveBatchSize,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,6 +113,18 @@ func WithOpenedCacheSize(sz int) Option {
|
|||
}
|
||||
}
|
||||
|
||||
func WithOpenedCacheTTL(ttl time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.openedCacheTTL = ttl
|
||||
}
|
||||
}
|
||||
|
||||
func WithOpenedCacheExpInterval(expInterval time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.openedCacheExpInterval = expInterval
|
||||
}
|
||||
}
|
||||
|
||||
func WithObjectSizeLimit(sz uint64) Option {
|
||||
return func(c *cfg) {
|
||||
c.blzOpts = append(c.blzOpts, blobovnicza.WithObjectSizeLimit(sz))
|
||||
|
|
|
@ -129,6 +129,7 @@ func testRebuildFailoverObjectDeletedFromSource(t *testing.T) {
|
|||
|
||||
func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object, mustUpdateStorageID bool) {
|
||||
b := NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(2048),
|
||||
WithBlobovniczaShallowWidth(2),
|
||||
|
|
|
@ -43,6 +43,7 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
|
|||
|
||||
dir := t.TempDir()
|
||||
b := NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(64*1024), // 64KB object size limit
|
||||
WithBlobovniczaShallowWidth(5),
|
||||
|
@ -70,6 +71,7 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
|
|||
require.NoError(t, b.Close())
|
||||
|
||||
b = NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(32*1024), // 32KB object size limit
|
||||
WithBlobovniczaShallowWidth(5),
|
||||
|
@ -108,6 +110,7 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
|
|||
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
|
||||
dir := t.TempDir()
|
||||
b := NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(2048),
|
||||
WithBlobovniczaShallowWidth(sourceWidth),
|
||||
|
@ -148,6 +151,7 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
|
|||
require.NoError(t, b.Close())
|
||||
|
||||
b = NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(2048),
|
||||
WithBlobovniczaShallowWidth(targetWidth),
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
|
||||
func defaultTestStorages(p string, smallSizeLimit uint64) ([]SubStorage, *teststore.TestStore, *teststore.TestStore) {
|
||||
smallFileStorage := teststore.New(teststore.WithSubstorage(blobovniczatree.NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
|
||||
))
|
||||
|
@ -117,6 +118,7 @@ func TestBlobstor_needsCompression(t *testing.T) {
|
|||
WithStorages([]SubStorage{
|
||||
{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
blobovniczatree.WithRootPath(filepath.Join(dir, "blobovnicza")),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
|
|
|
@ -73,6 +73,7 @@ var storages = []storage{
|
|||
desc: "blobovniczatree",
|
||||
create: func(dir string) common.Storage {
|
||||
return blobovniczatree.NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
blobovniczatree.WithRootPath(dir),
|
||||
)
|
||||
},
|
||||
|
|
|
@ -137,6 +137,7 @@ func newStorages(t testing.TB, root string, smallSize uint64) []blobstor.SubStor
|
|||
return []blobstor.SubStorage{
|
||||
{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
blobovniczatree.WithRootPath(filepath.Join(root, "blobovnicza")),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(1),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(1),
|
||||
|
@ -158,6 +159,7 @@ func newStorages(t testing.TB, root string, smallSize uint64) []blobstor.SubStor
|
|||
func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *teststore.TestStore, *teststore.TestStore) {
|
||||
smallFileStorage := teststore.New(
|
||||
teststore.WithSubstorage(blobovniczatree.NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
blobovniczatree.WithRootPath(filepath.Join(root, "blobovnicza")),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(1),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(1),
|
||||
|
|
|
@ -36,6 +36,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
|||
blobstor.WithStorages([]blobstor.SubStorage{
|
||||
{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
blobovniczatree.WithLogger(test.NewLogger(t)),
|
||||
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(1),
|
||||
|
|
|
@ -35,6 +35,7 @@ func TestShard_Lock(t *testing.T) {
|
|||
blobstor.WithStorages([]blobstor.SubStorage{
|
||||
{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(2),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(2)),
|
||||
|
|
|
@ -78,6 +78,7 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
|
|||
blobstor.WithStorages([]blobstor.SubStorage{
|
||||
{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
blobovniczatree.WithLogger(test.NewLogger(t)),
|
||||
blobovniczatree.WithRootPath(filepath.Join(t.TempDir(), "blob", "blobovnicza")),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(1),
|
||||
|
|
|
@ -59,6 +59,7 @@ func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard
|
|||
blobstor.WithStorages([]blobstor.SubStorage{
|
||||
{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||
context.Background(),
|
||||
blobovniczatree.WithLogger(test.NewLogger(t)),
|
||||
blobovniczatree.WithRootPath(filepath.Join(o.rootPath, "blob", "blobovnicza")),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(1),
|
||||
|
|
Loading…
Reference in a new issue
I suppose to not to pass parentCtx but use
done
channel. This will reduce changes count.Disagree. We spent a lot of time to exclude using of the
close channel
approach. Routine for eviction may stop earlier thanblobstor
closed, but don't see any obstacles. When parent context cancelled, we need to stop service as faster as we can, because we have only 1m and a half until SIGKILL.