Estimate compression #766
11 changed files with 153 additions and 12 deletions
|
@ -105,7 +105,10 @@ type applicationConfiguration struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type shardCfg struct {
|
type shardCfg struct {
|
||||||
compress bool
|
compress bool
|
||||||
|
estimateCompressibility bool
|
||||||
|
estimateCompressibilityThreshold float64
|
||||||
|
|
||||||
smallSizeObjectLimit uint64
|
smallSizeObjectLimit uint64
|
||||||
uncompressableContentType []string
|
uncompressableContentType []string
|
||||||
refillMetabase bool
|
refillMetabase bool
|
||||||
|
@ -217,6 +220,8 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
|
||||||
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
||||||
newConfig.mode = oldConfig.Mode()
|
newConfig.mode = oldConfig.Mode()
|
||||||
newConfig.compress = oldConfig.Compress()
|
newConfig.compress = oldConfig.Compress()
|
||||||
|
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
||||||
|
newConfig.estimateCompressibilityThreshold = oldConfig.EstimateCompressibilityThreshold()
|
||||||
newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
|
newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
|
||||||
newConfig.smallSizeObjectLimit = oldConfig.SmallSizeLimit()
|
newConfig.smallSizeObjectLimit = oldConfig.SmallSizeLimit()
|
||||||
|
|
||||||
|
@ -830,6 +835,8 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
|
||||||
blobstoreOpts := []blobstor.Option{
|
blobstoreOpts := []blobstor.Option{
|
||||||
blobstor.WithCompressObjects(shCfg.compress),
|
blobstor.WithCompressObjects(shCfg.compress),
|
||||||
blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
|
blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
|
||||||
|
blobstor.WithCompressibilityEstimate(shCfg.estimateCompressibility),
|
||||||
|
blobstor.WithCompressibilityEstimateThreshold(shCfg.estimateCompressibilityThreshold),
|
||||||
blobstor.WithStorages(ss),
|
blobstor.WithStorages(ss),
|
||||||
blobstor.WithLogger(c.log),
|
blobstor.WithLogger(c.log),
|
||||||
}
|
}
|
||||||
|
|
|
@ -223,3 +223,15 @@ func parseSizeInBytes(sizeStr string) uint64 {
|
||||||
size := cast.ToFloat64(sizeStr)
|
size := cast.ToFloat64(sizeStr)
|
||||||
return safeMul(size, multiplier)
|
return safeMul(size, multiplier)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FloatOrDefault reads a configuration value
|
||||||
|
// from c by name and casts it to float64.
|
||||||
|
//
|
||||||
|
// Returns defaultValue if the value cannot be cast to float64.
|
||||||
|
func FloatOrDefault(c *Config, name string, defaultValue float64) float64 {
|
||||||
|
v, err := cast.ToFloat64E(c.Value(name))
|
||||||
|
if err != nil {
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
|
@ -84,6 +84,8 @@ func TestEngineSection(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, true, sc.Compress())
|
require.Equal(t, true, sc.Compress())
|
||||||
require.Equal(t, []string{"audio/*", "video/*"}, sc.UncompressableContentTypes())
|
require.Equal(t, []string{"audio/*", "video/*"}, sc.UncompressableContentTypes())
|
||||||
|
require.Equal(t, true, sc.EstimateCompressibility())
|
||||||
|
require.Equal(t, float64(0.7), sc.EstimateCompressibilityThreshold())
|
||||||
require.EqualValues(t, 102400, sc.SmallSizeLimit())
|
require.EqualValues(t, 102400, sc.SmallSizeLimit())
|
||||||
|
|
||||||
require.Equal(t, 2, len(ss))
|
require.Equal(t, 2, len(ss))
|
||||||
|
|
|
@ -16,8 +16,11 @@ import (
|
||||||
// which provides access to Shard configurations.
|
// which provides access to Shard configurations.
|
||||||
type Config config.Config
|
type Config config.Config
|
||||||
|
|
||||||
// SmallSizeLimitDefault is a default limit of small objects payload in bytes.
|
const (
|
||||||
const SmallSizeLimitDefault = 1 << 20
|
// SmallSizeLimitDefault is a default limit of small objects payload in bytes.
|
||||||
|
SmallSizeLimitDefault = 1 << 20
|
||||||
|
EstimateCompressibilityThresholdDefault = 0.1
|
||||||
|
)
|
||||||
|
|
||||||
// From wraps config section into Config.
|
// From wraps config section into Config.
|
||||||
func From(c *config.Config) *Config {
|
func From(c *config.Config) *Config {
|
||||||
|
@ -43,6 +46,30 @@ func (x *Config) UncompressableContentTypes() []string {
|
||||||
"compression_exclude_content_types")
|
"compression_exclude_content_types")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EstimateCompressibility returns the value of "compression_estimate_compressibility" config parameter.
|
||||||
|
//
|
||||||
|
// Returns false if the value is not a valid bool.
|
||||||
|
func (x *Config) EstimateCompressibility() bool {
|
||||||
|
return config.BoolSafe(
|
||||||
|
(*config.Config)(x),
|
||||||
|
"compression_estimate_compressibility",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// EstimateCompressibilityThreshold returns the value of "compression_estimate_compressibility_threshold" config parameter.
|
||||||
|
//
|
||||||
|
// Returns EstimateCompressibilityThresholdDefault if the value is not defined, not valid float or not in range [0.0; 1.0].
|
||||||
|
func (x *Config) EstimateCompressibilityThreshold() float64 {
|
||||||
|
v := config.FloatOrDefault(
|
||||||
|
(*config.Config)(x),
|
||||||
|
"compression_estimate_compressibility_threshold",
|
||||||
|
EstimateCompressibilityThresholdDefault)
|
||||||
|
if v < 0.0 || v > 1.0 {
|
||||||
|
return EstimateCompressibilityThresholdDefault
|
||||||
|
}
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
|
||||||
// SmallSizeLimit returns the value of "small_object_size" config parameter.
|
// SmallSizeLimit returns the value of "small_object_size" config parameter.
|
||||||
//
|
//
|
||||||
// Returns SmallSizeLimitDefault if the value is not a positive number.
|
// Returns SmallSizeLimitDefault if the value is not a positive number.
|
||||||
|
|
|
@ -113,6 +113,8 @@ FROSTFS_STORAGE_SHARD_0_METABASE_MAX_BATCH_DELAY=10ms
|
||||||
### Blobstor config
|
### Blobstor config
|
||||||
FROSTFS_STORAGE_SHARD_0_COMPRESS=true
|
FROSTFS_STORAGE_SHARD_0_COMPRESS=true
|
||||||
FROSTFS_STORAGE_SHARD_0_COMPRESSION_EXCLUDE_CONTENT_TYPES="audio/* video/*"
|
FROSTFS_STORAGE_SHARD_0_COMPRESSION_EXCLUDE_CONTENT_TYPES="audio/* video/*"
|
||||||
|
FROSTFS_STORAGE_SHARD_0_COMPRESSION_ESTIMATE_COMPRESSIBILITY=true
|
||||||
|
FROSTFS_STORAGE_SHARD_0_COMPRESSION_ESTIMATE_COMPRESSIBILITY_THRESHOLD=0.7
|
||||||
FROSTFS_STORAGE_SHARD_0_SMALL_OBJECT_SIZE=102400
|
FROSTFS_STORAGE_SHARD_0_SMALL_OBJECT_SIZE=102400
|
||||||
### Blobovnicza config
|
### Blobovnicza config
|
||||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH=tmp/0/blob/blobovnicza
|
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_PATH=tmp/0/blob/blobovnicza
|
||||||
|
|
|
@ -160,6 +160,8 @@
|
||||||
"compression_exclude_content_types": [
|
"compression_exclude_content_types": [
|
||||||
"audio/*", "video/*"
|
"audio/*", "video/*"
|
||||||
],
|
],
|
||||||
|
"compression_estimate_compressibility": true,
|
||||||
|
"compression_estimate_compressibility_threshold": 0.7,
|
||||||
"small_object_size": 102400,
|
"small_object_size": 102400,
|
||||||
"blobstor": [
|
"blobstor": [
|
||||||
{
|
{
|
||||||
|
|
|
@ -178,6 +178,8 @@ storage:
|
||||||
compression_exclude_content_types:
|
compression_exclude_content_types:
|
||||||
- audio/*
|
- audio/*
|
||||||
- video/*
|
- video/*
|
||||||
|
compression_estimate_compressibility: true
|
||||||
|
compression_estimate_compressibility_threshold: 0.7
|
||||||
|
|
||||||
blobstor:
|
blobstor:
|
||||||
- type: blobovnicza
|
- type: blobovnicza
|
||||||
|
|
|
@ -179,15 +179,17 @@ The following table describes configuration for each shard.
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|-------------------------------------|---------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
|-------------------------------------|---------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||||
| `compress` | `bool` | `false` | Flag to enable compression. |
|
| `compress` | `bool` | `false` | Flag to enable compression. |
|
||||||
| `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). |
|
| `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). |
|
||||||
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then normalized compressibility estimation is used to decide whether to compress data or not. |
|
||||||
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normalized compressibility estimate threshold: data will be compressed if the estimate is greater than or equal to this value. |
|
||||||
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
||||||
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
||||||
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
||||||
| `small_object_size` | `size` | `1M` | Maximum size of an object stored in blobovnicza tree. |
|
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
||||||
| `gc` | [GC config](#gc-subsection) | | GC configuration. |
|
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
||||||
|
| `small_object_size` | `size` | `1M` | Maximum size of an object stored in blobovnicza tree. |
|
||||||
|
| `gc` | [GC config](#gc-subsection) | | GC configuration. |
|
||||||
|
|
||||||
### `blobstor` subsection
|
### `blobstor` subsection
|
||||||
|
|
||||||
|
|
|
@ -107,6 +107,27 @@ func WithCompressObjects(comp bool) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithCompressibilityEstimate returns an option to use
|
||||||
|
// normalized compressibility estimate to decide whether to compress
|
||||||
|
// data or not.
|
||||||
|
//
|
||||||
|
// See https://github.com/klauspost/compress/blob/v1.17.2/compressible.go#L5
|
||||||
|
func WithCompressibilityEstimate(v bool) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.compression.UseCompressEstimation = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithCompressibilityEstimateThreshold returns an option to set
|
||||||
|
// normalized compressibility estimate threshold.
|
||||||
|
//
|
||||||
|
// See https://github.com/klauspost/compress/blob/v1.17.2/compressible.go#L5
|
||||||
|
func WithCompressibilityEstimateThreshold(threshold float64) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.compression.CompressEstimationThreshold = threshold
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithUncompressableContentTypes returns option to disable decompression
|
// WithUncompressableContentTypes returns option to disable decompression
|
||||||
// for specific content types as seen by object.AttributeContentType attribute.
|
// for specific content types as seen by object.AttributeContentType attribute.
|
||||||
func WithUncompressableContentTypes(values []string) Option {
|
func WithUncompressableContentTypes(values []string) Option {
|
||||||
|
|
|
@ -3,8 +3,10 @@ package compression
|
||||||
import (
|
import (
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/klauspost/compress"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -47,3 +49,50 @@ func notSoRandomSlice(size, blockSize int) []byte {
|
||||||
}
|
}
|
||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkCompressionRealVSEstimate(b *testing.B) {
|
||||||
|
var total float64 // to prevent from compiler optimizations
|
||||||
|
maxSize := 60 * 1024 * 1024
|
||||||
|
b.Run("estimate", func(b *testing.B) {
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
c := &Config{
|
||||||
|
Enabled: true,
|
||||||
|
}
|
||||||
|
require.NoError(b, c.Init())
|
||||||
|
|
||||||
|
for size := 1024; size <= maxSize; size *= 2 {
|
||||||
|
data := make([]byte, size)
|
||||||
|
_, err := rand.Reader.Read(data)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
b.StartTimer()
|
||||||
|
estimation := compress.Estimate(data)
|
||||||
|
total += estimation
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
b.Run("compress", func(b *testing.B) {
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
c := &Config{
|
||||||
|
Enabled: true,
|
||||||
|
}
|
||||||
|
require.NoError(b, c.Init())
|
||||||
|
|
||||||
|
for size := 1024; size <= maxSize; size *= 2 {
|
||||||
|
data := make([]byte, size)
|
||||||
|
_, err := rand.Reader.Read(data)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
b.StartTimer()
|
||||||
|
maxSize := c.encoder.MaxEncodedSize(len(data))
|
||||||
|
compressed := c.encoder.EncodeAll(data, make([]byte, 0, maxSize))
|
||||||
|
total += float64(len(compressed)) / float64(len(data))
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
log.Println(total)
|
||||||
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"github.com/klauspost/compress"
|
||||||
"github.com/klauspost/compress/zstd"
|
"github.com/klauspost/compress/zstd"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -13,6 +14,9 @@ type Config struct {
|
||||||
Enabled bool
|
Enabled bool
|
||||||
UncompressableContentTypes []string
|
UncompressableContentTypes []string
|
||||||
|
|
||||||
|
UseCompressEstimation bool
|
||||||
|
CompressEstimationThreshold float64
|
||||||
|
|
||||||
encoder *zstd.Encoder
|
encoder *zstd.Encoder
|
||||||
decoder *zstd.Decoder
|
decoder *zstd.Decoder
|
||||||
}
|
}
|
||||||
|
@ -82,6 +86,17 @@ func (c *Config) Compress(data []byte) []byte {
|
||||||
if c == nil || !c.Enabled {
|
if c == nil || !c.Enabled {
|
||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
|
if c.UseCompressEstimation {
|
||||||
|
estimated := compress.Estimate(data)
|
||||||
|
if estimated >= c.CompressEstimationThreshold {
|
||||||
|
return c.compress(data)
|
||||||
|
}
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
return c.compress(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Config) compress(data []byte) []byte {
|
||||||
maxSize := c.encoder.MaxEncodedSize(len(data))
|
maxSize := c.encoder.MaxEncodedSize(len(data))
|
||||||
compressed := c.encoder.EncodeAll(data, make([]byte, 0, maxSize))
|
compressed := c.encoder.EncodeAll(data, make([]byte, 0, maxSize))
|
||||||
if len(data) < len(compressed) {
|
if len(data) < len(compressed) {
|
||||||
|
|
Loading…
Reference in a new issue