[#1715] blobstor: Allow to specify custom compression level
Change-Id: I140c39b9dceaaeb58767061b131777af22242b19
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>

parent 2d1232ce6d
commit 98308d0cad

13 changed files with 74 additions and 4 deletions
@@ -13,6 +13,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@@ -135,6 +136,7 @@ func getMetabaseOpts(sh *shardconfig.Config) []meta.Option {
 func getBlobstorOpts(ctx context.Context, sh *shardconfig.Config) []blobstor.Option {
 	result := []blobstor.Option{
 		blobstor.WithCompressObjects(sh.Compress()),
+		blobstor.WithCompressionLevel(compression.Level(sh.CompressionLevel())),
 		blobstor.WithUncompressableContentTypes(sh.UncompressableContentTypes()),
 		blobstor.WithCompressibilityEstimate(sh.EstimateCompressibility()),
 		blobstor.WithCompressibilityEstimateThreshold(sh.EstimateCompressibilityThreshold()),
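The blobstor options built here follow Go's functional-options pattern: each constructor returns a closure that mutates a shared config. Below is a minimal, self-contained sketch of how such an option slice is applied; the `Level`, `WithCompressObjects`, and `WithCompressionLevel` names mirror the diff, but the surrounding harness is illustrative and not the actual frostfs-node code.

package main

import "fmt"

// Level mirrors compression.Level from this commit.
type Level string

// cfg stands in for the blobstor configuration being assembled.
type cfg struct {
    compress bool
    level    Level
}

// Option mutates the configuration, as blobstor.Option does.
type Option func(*cfg)

func WithCompressObjects(comp bool) Option {
    return func(c *cfg) { c.compress = comp }
}

func WithCompressionLevel(level Level) Option {
    return func(c *cfg) { c.level = level }
}

func main() {
    opts := []Option{
        WithCompressObjects(true),
        WithCompressionLevel("fastest"),
    }
    c := &cfg{}
    for _, o := range opts {
        o(c) // options are applied in order to the same config
    }
    fmt.Printf("compress=%v level=%s\n", c.compress, c.level)
}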
@@ -40,6 +40,7 @@ import (
 	netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@@ -129,6 +130,7 @@ type applicationConfiguration struct {
 
 type shardCfg struct {
 	compress                         bool
+	compressionLevel                 compression.Level
 	estimateCompressibility          bool
 	estimateCompressibilityThreshold float64
 
@@ -273,6 +275,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, source *s
 	target.refillMetabaseWorkersCount = source.RefillMetabaseWorkersCount()
 	target.mode = source.Mode()
 	target.compress = source.Compress()
+	target.compressionLevel = compression.Level(source.CompressionLevel())
 	target.estimateCompressibility = source.EstimateCompressibility()
 	target.estimateCompressibilityThreshold = source.EstimateCompressibilityThreshold()
 	target.uncompressableContentType = source.UncompressableContentTypes()
@@ -1027,6 +1030,7 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
 
 	blobstoreOpts := []blobstor.Option{
 		blobstor.WithCompressObjects(shCfg.compress),
+		blobstor.WithCompressionLevel(shCfg.compressionLevel),
 		blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
 		blobstor.WithCompressibilityEstimate(shCfg.estimateCompressibility),
 		blobstor.WithCompressibilityEstimateThreshold(shCfg.estimateCompressibilityThreshold),
@@ -101,6 +101,7 @@ func TestEngineSection(t *testing.T) {
 			require.Equal(t, 10*time.Millisecond, meta.BoltDB().MaxBatchDelay())
 
 			require.Equal(t, true, sc.Compress())
+			require.Equal(t, "fastest", sc.CompressionLevel())
 			require.Equal(t, []string{"audio/*", "video/*"}, sc.UncompressableContentTypes())
 			require.Equal(t, true, sc.EstimateCompressibility())
 			require.Equal(t, float64(0.7), sc.EstimateCompressibilityThreshold())
@@ -237,6 +238,7 @@ func TestEngineSection(t *testing.T) {
 			require.Equal(t, 20*time.Millisecond, meta.BoltDB().MaxBatchDelay())
 
 			require.Equal(t, false, sc.Compress())
+			require.Equal(t, "", sc.CompressionLevel())
 			require.Equal(t, []string(nil), sc.UncompressableContentTypes())
 			require.EqualValues(t, 102400, sc.SmallSizeLimit())
 
@@ -37,6 +37,16 @@ func (x *Config) Compress() bool {
 	)
 }
 
+// CompressionLevel returns the value of "compression_level" config parameter.
+//
+// Returns empty string if the value is not a valid string.
+func (x *Config) CompressionLevel() string {
+	return config.StringSafe(
+		(*config.Config)(x),
+		"compression_level",
+	)
+}
+
 // UncompressableContentTypes returns the value of "compress_skip_content_types" config parameter.
 //
 // Returns nil if the value is missing or is invalid.
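Per its doc comment, the new getter relies on config.StringSafe to yield an empty string whenever the parameter is absent or not a string. A rough, self-contained sketch of that "safe getter" behavior, using a plain map in place of the frostfs config tree — the helper below is hypothetical and only illustrates the documented semantics:

package main

import "fmt"

// stringSafe mimics the documented behavior: return the string value,
// or "" when the key is missing or holds a non-string value.
func stringSafe(values map[string]any, key string) string {
    v, ok := values[key]
    if !ok {
        return ""
    }
    s, ok := v.(string)
    if !ok {
        return ""
    }
    return s
}

func main() {
    shard := map[string]any{"compress": true, "compression_level": "fastest"}
    fmt.Println(stringSafe(shard, "compression_level")) // "fastest"
    fmt.Println(stringSafe(shard, "missing_key"))       // "" (absent)
    fmt.Println(stringSafe(shard, "compress"))          // "" (not a string)
}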
@@ -123,6 +123,7 @@ FROSTFS_STORAGE_SHARD_0_METABASE_MAX_BATCH_SIZE=100
 FROSTFS_STORAGE_SHARD_0_METABASE_MAX_BATCH_DELAY=10ms
 ### Blobstor config
 FROSTFS_STORAGE_SHARD_0_COMPRESS=true
+FROSTFS_STORAGE_SHARD_0_COMPRESSION_LEVEL=fastest
 FROSTFS_STORAGE_SHARD_0_COMPRESSION_EXCLUDE_CONTENT_TYPES="audio/* video/*"
 FROSTFS_STORAGE_SHARD_0_COMPRESSION_ESTIMATE_COMPRESSIBILITY=true
 FROSTFS_STORAGE_SHARD_0_COMPRESSION_ESTIMATE_COMPRESSIBILITY_THRESHOLD=0.7
@@ -184,6 +184,7 @@
         "max_batch_delay": "10ms"
       },
       "compress": true,
+      "compression_level": "fastest",
       "compression_exclude_content_types": [
         "audio/*", "video/*"
       ],
@@ -160,7 +160,7 @@ storage:
       max_batch_delay: 5ms # maximum delay for a batch of operations to be executed
       max_batch_size: 100 # maximum amount of operations in a single batch
 
-    compress: false # turn on/off zstd(level 3) compression of stored objects
+    compress: false # turn on/off zstd compression of stored objects
     small_object_size: 100 kb # size threshold for "small" objects which are cached in key-value DB, not in FS, bytes
 
     blobstor:
@@ -202,7 +202,8 @@ storage:
         max_batch_size: 100
         max_batch_delay: 10ms
 
-      compress: true # turn on/off zstd(level 3) compression of stored objects
+      compress: true # turn on/off zstd compression of stored objects
+      compression_level: fastest
       compression_exclude_content_types:
         - audio/*
         - video/*
@@ -188,6 +188,7 @@ The following table describes configuration for each shard.
 | Parameter | Type | Default value | Description |
 | --------- | ---- | ------------- | ----------- |
 | `compress` | `bool` | `false` | Flag to enable compression. |
+| `compression_level` | `string` | `optimal` | Compression level. Available values are `optimal`, `fastest`, `smallest_size`. |
 | `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). |
 | `compression_estimate_compressibility` | `bool` | `false` | If `true`, then normalized compressibility estimation is used to decide whether to compress data. |
 | `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normalized compressibility estimate threshold: data will be compressed if the estimation is greater than this value. |
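The compressibility-estimate rows describe a pre-existing knob that this commit leaves in place: data is compressed only when a normalized 0..1 estimate exceeds the threshold. The compress.go hunk below already imports github.com/klauspost/compress, whose Estimate function produces such a score; the sketch here only illustrates that decision and uses the documented default threshold of 0.1, not frostfs-node's actual call site.

package main

import (
    "bytes"
    crand "crypto/rand"
    "fmt"

    "github.com/klauspost/compress"
)

// shouldCompress applies the documented rule: compress only if the
// normalized compressibility estimate is greater than the threshold.
func shouldCompress(data []byte, threshold float64) bool {
    return compress.Estimate(data) > threshold
}

func main() {
    threshold := 0.1 // documented default for compression_estimate_compressibility_threshold

    repetitive := bytes.Repeat([]byte("frostfs "), 512) // highly compressible payload
    fmt.Println(shouldCompress(repetitive, threshold))  // expected: true

    random := make([]byte, 4096)
    _, _ = crand.Read(random)                      // random bytes compress poorly
    fmt.Println(shouldCompress(random, threshold)) // expected: false
}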
@@ -199,7 +200,7 @@ The following table describes configuration for each shard.
 | `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
 | `small_object_size` | `size` | `1M` | Maximum size of an object stored in blobovnicza tree. |
 | `gc` | [GC config](#gc-subsection) | | GC configuration. |
-| `limits` | [Shard limits config](#limits-subsection) | | Shard limits configuration. |
+| `limits` | [Shard limits config](#limits-subsection) | | Shard limits configuration. |
 
 ### `blobstor` subsection
 
@@ -23,3 +23,7 @@ func NoError(err error, details ...string) {
 		panic(content)
 	}
 }
+
+func Fail(details ...string) {
+	panic(strings.Join(details, " "))
+}
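The new Fail helper simply panics with its details joined by spaces; later in this commit it guards the supposedly unreachable default branch of the level switch. A tiny self-contained sketch of that behavior (the recover is only there to print the panic value):

package main

import (
    "fmt"
    "strings"
)

// Fail mirrors the helper added in this commit: panic with the details joined by spaces.
func Fail(details ...string) {
    panic(strings.Join(details, " "))
}

func main() {
    defer func() {
        // Recover only to show the panic value; real callers let the process crash.
        fmt.Println("panic:", recover()) // panic: unknown compression level bogus
    }()
    Fail("unknown compression level", "bogus")
}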
@@ -516,4 +516,5 @@ const (
 	FailedToValidateIncomingIOTag            = "failed to validate incoming IO tag, replaced with `client`"
 	WriteCacheFailedToAcquireRPSQuota        = "writecache failed to acquire RPS quota to flush object"
 	FailedToUpdateNetmapCandidates           = "update netmap candidates failed"
+	UnknownCompressionLevelDefaultWillBeUsed = "unknown compression level, 'optimal' will be used"
 )
@@ -109,6 +109,12 @@ func WithCompressObjects(comp bool) Option {
 	}
 }
 
+func WithCompressionLevel(level compression.Level) Option {
+	return func(c *cfg) {
+		c.compression.Level = level
+	}
+}
+
 // WithCompressibilityEstimate returns an option to use
 // normalized compressibility estimate to decide whether
 // to compress data or not.
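A short usage sketch of the new option: the two option constructors and the import paths below appear in this commit, while how the resulting slice is consumed (the blobstor constructor) is outside this change and therefore omitted here.

package main

import (
    "fmt"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
)

func main() {
    // Mirrors how getBlobstorOpts/getShardOpts assemble options in this commit.
    opts := []blobstor.Option{
        blobstor.WithCompressObjects(true),
        blobstor.WithCompressionLevel(compression.LevelFastest),
    }
    fmt.Println("configured", len(opts), "blobstor options")
}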
@@ -4,15 +4,26 @@ import (
 	"bytes"
 	"strings"
 
+	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	"github.com/klauspost/compress"
 	"github.com/klauspost/compress/zstd"
 )
 
+type Level string
+
+const (
+	LevelDefault      Level = ""
+	LevelOptimal      Level = "optimal"
+	LevelFastest      Level = "fastest"
+	LevelSmallestSize Level = "smallest_size"
+)
+
 // Config represents common compression-related configuration.
 type Config struct {
 	Enabled                    bool
 	UncompressableContentTypes []string
+	Level                      Level
 
 	UseCompressEstimation       bool
 	CompressEstimationThreshold float64
@@ -30,7 +41,7 @@ func (c *Config) Init() error {
 	var err error
 
 	if c.Enabled {
-		c.encoder, err = zstd.NewWriter(nil)
+		c.encoder, err = zstd.NewWriter(nil, zstd.WithEncoderLevel(c.compressionLevel()))
 		if err != nil {
 			return err
 		}
@@ -116,3 +127,24 @@ func (c *Config) Close() error {
 	}
 	return err
 }
+
+func (c *Config) HasValidCompressionLevel() bool {
+	return c.Level == LevelDefault ||
+		c.Level == LevelOptimal ||
+		c.Level == LevelFastest ||
+		c.Level == LevelSmallestSize
+}
+
+func (c *Config) compressionLevel() zstd.EncoderLevel {
+	switch c.Level {
+	case LevelDefault, LevelOptimal:
+		return zstd.SpeedDefault
+	case LevelFastest:
+		return zstd.SpeedFastest
+	case LevelSmallestSize:
+		return zstd.SpeedBestCompression
+	default:
+		assert.Fail("unknown compression level", string(c.Level))
+		return zstd.SpeedDefault
+	}
+}
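The mapping above boils down to three zstd encoder speeds passed through zstd.WithEncoderLevel. A standalone sketch, independent of the frostfs types, that builds a one-shot encoder per level and shows the size trade-off; the level strings and the switch mirror the commit, the benchmark payload is made up:

package main

import (
    "bytes"
    "fmt"

    "github.com/klauspost/compress/zstd"
)

// encoderLevel mirrors the switch in compression.Config.compressionLevel.
func encoderLevel(level string) zstd.EncoderLevel {
    switch level {
    case "", "optimal":
        return zstd.SpeedDefault
    case "fastest":
        return zstd.SpeedFastest
    case "smallest_size":
        return zstd.SpeedBestCompression
    default:
        return zstd.SpeedDefault // callers are expected to validate the level first
    }
}

func main() {
    payload := bytes.Repeat([]byte("frostfs object payload "), 1024)
    for _, lvl := range []string{"fastest", "optimal", "smallest_size"} {
        enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(encoderLevel(lvl)))
        if err != nil {
            panic(err)
        }
        compressed := enc.EncodeAll(payload, nil) // one-shot compression into a fresh buffer
        _ = enc.Close()
        fmt.Printf("%-14s -> %d bytes\n", lvl, len(compressed))
    }
}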
@@ -6,6 +6,7 @@ import (
 	"fmt"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"go.uber.org/zap"
 )
@@ -53,6 +54,10 @@ var ErrInitBlobovniczas = errors.New("failure on blobovnicza initialization stag
 func (b *BlobStor) Init(ctx context.Context) error {
 	b.log.Debug(ctx, logs.BlobstorInitializing)
 
+	if !b.compression.HasValidCompressionLevel() {
+		b.log.Warn(ctx, logs.UnknownCompressionLevelDefaultWillBeUsed, zap.String("level", string(b.compression.Level)))
+		b.compression.Level = compression.LevelDefault
+	}
 	if err := b.compression.Init(); err != nil {
 		return err
 	}
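Note that Init does not fail on an unrecognized level: it logs a warning and falls back to the default ('optimal'). A self-contained sketch of that fallback flow, with the logger and message reduced to the standard library — the real code uses the frostfs logger and the UnknownCompressionLevelDefaultWillBeUsed constant added above:

package main

import (
    "fmt"
    "log"
)

type Level string

const (
    LevelDefault      Level = ""
    LevelOptimal      Level = "optimal"
    LevelFastest      Level = "fastest"
    LevelSmallestSize Level = "smallest_size"
)

func hasValidCompressionLevel(l Level) bool {
    return l == LevelDefault || l == LevelOptimal || l == LevelFastest || l == LevelSmallestSize
}

// normalizeLevel mirrors the Init check: warn on an unknown value and fall back to the default.
func normalizeLevel(l Level) Level {
    if !hasValidCompressionLevel(l) {
        log.Printf("unknown compression level, 'optimal' will be used (level=%q)", l)
        return LevelDefault
    }
    return l
}

func main() {
    fmt.Println(normalizeLevel("fastest"))  // fastest
    fmt.Println(normalizeLevel("ultimate")) // logs a warning, prints "" (default)
}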