[#9999] writecache: Add count limit
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
6e92cf68e3
commit
725d3f8b02
9 changed files with 53 additions and 13 deletions
|
@ -149,6 +149,7 @@ type shardCfg struct {
|
||||||
maxObjSize uint64
|
maxObjSize uint64
|
||||||
flushWorkerCount int
|
flushWorkerCount int
|
||||||
sizeLimit uint64
|
sizeLimit uint64
|
||||||
|
countLimit uint64
|
||||||
noSync bool
|
noSync bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,6 +270,7 @@ func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg,
|
||||||
wc.maxObjSize = writeCacheCfg.MaxObjectSize()
|
wc.maxObjSize = writeCacheCfg.MaxObjectSize()
|
||||||
wc.flushWorkerCount = writeCacheCfg.WorkerCount()
|
wc.flushWorkerCount = writeCacheCfg.WorkerCount()
|
||||||
wc.sizeLimit = writeCacheCfg.SizeLimit()
|
wc.sizeLimit = writeCacheCfg.SizeLimit()
|
||||||
|
wc.countLimit = writeCacheCfg.CountLimit()
|
||||||
wc.noSync = writeCacheCfg.NoSync()
|
wc.noSync = writeCacheCfg.NoSync()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -858,6 +860,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
|
||||||
writecache.WithMaxObjectSize(wcRead.maxObjSize),
|
writecache.WithMaxObjectSize(wcRead.maxObjSize),
|
||||||
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
|
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
|
||||||
writecache.WithMaxCacheSize(wcRead.sizeLimit),
|
writecache.WithMaxCacheSize(wcRead.sizeLimit),
|
||||||
|
writecache.WithMaxCacheCount(wcRead.countLimit),
|
||||||
writecache.WithNoSync(wcRead.noSync),
|
writecache.WithNoSync(wcRead.noSync),
|
||||||
writecache.WithLogger(c.log),
|
writecache.WithLogger(c.log),
|
||||||
)
|
)
|
||||||
|
|
|
@ -77,6 +77,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
require.EqualValues(t, 134217728, wc.MaxObjectSize())
|
require.EqualValues(t, 134217728, wc.MaxObjectSize())
|
||||||
require.EqualValues(t, 30, wc.WorkerCount())
|
require.EqualValues(t, 30, wc.WorkerCount())
|
||||||
require.EqualValues(t, 3221225472, wc.SizeLimit())
|
require.EqualValues(t, 3221225472, wc.SizeLimit())
|
||||||
|
require.EqualValues(t, 0, wc.CountLimit())
|
||||||
|
|
||||||
require.Equal(t, "tmp/0/meta", meta.Path())
|
require.Equal(t, "tmp/0/meta", meta.Path())
|
||||||
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
|
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
|
||||||
|
@ -131,6 +132,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
require.EqualValues(t, 134217728, wc.MaxObjectSize())
|
require.EqualValues(t, 134217728, wc.MaxObjectSize())
|
||||||
require.EqualValues(t, 30, wc.WorkerCount())
|
require.EqualValues(t, 30, wc.WorkerCount())
|
||||||
require.EqualValues(t, 4294967296, wc.SizeLimit())
|
require.EqualValues(t, 4294967296, wc.SizeLimit())
|
||||||
|
require.EqualValues(t, 10000, wc.CountLimit())
|
||||||
|
|
||||||
require.Equal(t, "tmp/1/meta", meta.Path())
|
require.Equal(t, "tmp/1/meta", meta.Path())
|
||||||
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
|
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
|
||||||
|
|
|
@ -80,11 +80,20 @@ func (x *Config) WorkerCount() int {
|
||||||
return WorkersNumberDefault
|
return WorkersNumberDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
// SizeLimit returns the value of "capacity" config parameter.
|
// SizeLimit returns the value of "capacity_size" or "capacity" config parameter.
|
||||||
//
|
//
|
||||||
// Returns SizeLimitDefault if the value is not a positive number.
|
// Returns SizeLimitDefault if the value is not a positive number.
|
||||||
func (x *Config) SizeLimit() uint64 {
|
func (x *Config) SizeLimit() uint64 {
|
||||||
c := config.SizeInBytesSafe(
|
c := config.SizeInBytesSafe(
|
||||||
|
(*config.Config)(x),
|
||||||
|
"capacity_size",
|
||||||
|
)
|
||||||
|
|
||||||
|
if c > 0 {
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
c = config.SizeInBytesSafe(
|
||||||
(*config.Config)(x),
|
(*config.Config)(x),
|
||||||
"capacity",
|
"capacity",
|
||||||
)
|
)
|
||||||
|
@ -96,6 +105,16 @@ func (x *Config) SizeLimit() uint64 {
|
||||||
return SizeLimitDefault
|
return SizeLimitDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CountLimit returns the value of "capacity_count" config parameter.
|
||||||
|
//
|
||||||
|
// Returns 0 (means no limit) if the value is not a positive number.
|
||||||
|
func (x *Config) CountLimit() uint64 {
|
||||||
|
return config.SizeInBytesSafe(
|
||||||
|
(*config.Config)(x),
|
||||||
|
"capacity_count",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// NoSync returns the value of "no_sync" config parameter.
|
// NoSync returns the value of "no_sync" config parameter.
|
||||||
//
|
//
|
||||||
// Returns false if the value is not a boolean.
|
// Returns false if the value is not a boolean.
|
||||||
|
|
|
@ -156,7 +156,8 @@ FROSTFS_STORAGE_SHARD_1_WRITECACHE_ENABLED=true
|
||||||
FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH=tmp/1/cache
|
FROSTFS_STORAGE_SHARD_1_WRITECACHE_PATH=tmp/1/cache
|
||||||
FROSTFS_STORAGE_SHARD_1_WRITECACHE_MAX_OBJECT_SIZE=134217728
|
FROSTFS_STORAGE_SHARD_1_WRITECACHE_MAX_OBJECT_SIZE=134217728
|
||||||
FROSTFS_STORAGE_SHARD_1_WRITECACHE_FLUSH_WORKER_COUNT=30
|
FROSTFS_STORAGE_SHARD_1_WRITECACHE_FLUSH_WORKER_COUNT=30
|
||||||
FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY=4294967296
|
FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY_SIZE=4294967296
|
||||||
|
FROSTFS_STORAGE_SHARD_1_WRITECACHE_CAPACITY_COUNT=10000
|
||||||
### Metabase config
|
### Metabase config
|
||||||
FROSTFS_STORAGE_SHARD_1_METABASE_PATH=tmp/1/meta
|
FROSTFS_STORAGE_SHARD_1_METABASE_PATH=tmp/1/meta
|
||||||
FROSTFS_STORAGE_SHARD_1_METABASE_PERM=0644
|
FROSTFS_STORAGE_SHARD_1_METABASE_PERM=0644
|
||||||
|
|
|
@ -204,7 +204,8 @@
|
||||||
"memcache_capacity": 2147483648,
|
"memcache_capacity": 2147483648,
|
||||||
"max_object_size": 134217728,
|
"max_object_size": 134217728,
|
||||||
"flush_worker_count": 30,
|
"flush_worker_count": 30,
|
||||||
"capacity": 4294967296
|
"capacity_size": 4294967296,
|
||||||
|
"capacity_count": 10000
|
||||||
},
|
},
|
||||||
"metabase": {
|
"metabase": {
|
||||||
"path": "tmp/1/meta",
|
"path": "tmp/1/meta",
|
||||||
|
|
|
@ -207,7 +207,8 @@ storage:
|
||||||
1:
|
1:
|
||||||
writecache:
|
writecache:
|
||||||
path: tmp/1/cache # write-cache root directory
|
path: tmp/1/cache # write-cache root directory
|
||||||
capacity: 4 G # approximate write-cache total size, bytes
|
capacity_size: 4 G # approximate write-cache total size, bytes
|
||||||
|
capacity_count: 10000
|
||||||
|
|
||||||
metabase:
|
metabase:
|
||||||
path: tmp/1/meta # metabase path
|
path: tmp/1/meta # metabase path
|
||||||
|
|
|
@ -286,17 +286,20 @@ metabase:
|
||||||
writecache:
|
writecache:
|
||||||
enabled: true
|
enabled: true
|
||||||
path: /path/to/writecache
|
path: /path/to/writecache
|
||||||
capacity: 4294967296
|
capacity_size: 4294967296
|
||||||
|
capacity_count: 100000
|
||||||
max_object_size: 134217728
|
max_object_size: 134217728
|
||||||
flush_worker_count: 30
|
flush_worker_count: 30
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------|
|
|----------------------|------------|---------------|----------------------------------------------------------------------------------------------------------------------------------|
|
||||||
| `path` | `string` | | Path to the metabase file. |
|
| `path` | `string` | | Path to the metabase file. |
|
||||||
| `capacity` | `size` | unrestricted | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. |
|
| `capacity_size` | `size` | `1G` | Approximate maximum size of the writecache. If the writecache is full, objects are written to the blobstor directly. |
|
||||||
| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. |
|
| `capacity` | `size` | `1G` | The same as for `capacity`. Deprecated, use `capacity_size`. |
|
||||||
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
|
| `capacity_count` | `int` | unrestricted | Approximate maximum count of objects in the writecache. If the writecache is full, objects are written to the blobstor directly. |
|
||||||
|
| `max_object_size` | `size` | `64M` | Maximum object size allowed to be stored in the writecache. |
|
||||||
|
| `flush_worker_count` | `int` | `20` | Amount of background workers that move data from the writecache to the blobstor. |
|
||||||
|
|
||||||
|
|
||||||
# `node` section
|
# `node` section
|
||||||
|
|
|
@ -27,6 +27,8 @@ type options struct {
|
||||||
// maxCacheSize is the maximum total size of all objects saved in cache.
|
// maxCacheSize is the maximum total size of all objects saved in cache.
|
||||||
// 1 GiB by default.
|
// 1 GiB by default.
|
||||||
maxCacheSize uint64
|
maxCacheSize uint64
|
||||||
|
// maxCacheCount is the maximum total count of all objects saved in cache.
|
||||||
|
maxCacheCount uint64
|
||||||
// counter contains atomic counters for the number of objects stored in cache.
|
// counter contains atomic counters for the number of objects stored in cache.
|
||||||
counter *fstree.SimpleCounter
|
counter *fstree.SimpleCounter
|
||||||
// noSync is true iff FSTree allows unsynchronized writes.
|
// noSync is true iff FSTree allows unsynchronized writes.
|
||||||
|
@ -93,6 +95,13 @@ func WithMaxCacheSize(sz uint64) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithMaxCacheCount sets maximum write-cache count of objects.
|
||||||
|
func WithMaxCacheCount(cnt uint64) Option {
|
||||||
|
return func(o *options) {
|
||||||
|
o.maxCacheCount = cnt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithNoSync sets an option to allow returning to caller on PUT before write is persisted.
|
// WithNoSync sets an option to allow returning to caller on PUT before write is persisted.
|
||||||
// Note, that we use this flag for FSTree only and DO NOT use it for a bolt DB because
|
// Note, that we use this flag for FSTree only and DO NOT use it for a bolt DB because
|
||||||
// we cannot yet properly handle the corrupted database during the startup. This SHOULD NOT
|
// we cannot yet properly handle the corrupted database during the startup. This SHOULD NOT
|
||||||
|
|
|
@ -14,6 +14,7 @@ func (c *cache) estimateCacheSize() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) hasFreeSpace(sz uint64) bool {
|
func (c *cache) hasFreeSpace(sz uint64) bool {
|
||||||
_, size := c.counter.Value()
|
count, size := c.counter.Value()
|
||||||
return size+int64(sz) <= int64(c.maxCacheSize)
|
return (size+int64(sz) <= int64(c.maxCacheSize)) &&
|
||||||
|
(c.maxCacheCount == 0 || count+1 <= int64(c.maxCacheCount))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue