forked from TrueCloudLab/frostfs-node
[#1715] compression: Decouple Config and Compressor
Refactoring. Change-Id: Ide2e1378f30c39045d4bacd13a902331bd4f764f Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
98308d0cad
commit
8c746a914a
12 changed files with 49 additions and 41 deletions
|
@ -158,11 +158,11 @@ func (b *Blobovniczas) Path() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetCompressor implements common.Storage.
|
// SetCompressor implements common.Storage.
|
||||||
func (b *Blobovniczas) SetCompressor(cc *compression.Config) {
|
func (b *Blobovniczas) SetCompressor(cc *compression.Compressor) {
|
||||||
b.compression = cc
|
b.compression = cc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) Compressor() *compression.Config {
|
func (b *Blobovniczas) Compressor() *compression.Compressor {
|
||||||
return b.compression
|
return b.compression
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,7 @@ type cfg struct {
|
||||||
openedCacheSize int
|
openedCacheSize int
|
||||||
blzShallowDepth uint64
|
blzShallowDepth uint64
|
||||||
blzShallowWidth uint64
|
blzShallowWidth uint64
|
||||||
compression *compression.Config
|
compression *compression.Compressor
|
||||||
blzOpts []blobovnicza.Option
|
blzOpts []blobovnicza.Option
|
||||||
reportError func(context.Context, string, error) // reportError is the function called when encountering disk errors.
|
reportError func(context.Context, string, error) // reportError is the function called when encountering disk errors.
|
||||||
metrics Metrics
|
metrics Metrics
|
||||||
|
|
|
@ -41,7 +41,7 @@ type SubStorageInfo struct {
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
compression compression.Config
|
compression compression.Compressor
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
storage []SubStorage
|
storage []SubStorage
|
||||||
metrics Metrics
|
metrics Metrics
|
||||||
|
@ -158,6 +158,6 @@ func WithMetrics(m Metrics) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BlobStor) Compressor() *compression.Config {
|
func (b *BlobStor) Compressor() *compression.Compressor {
|
||||||
return &b.compression
|
return &b.compression
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,8 +18,8 @@ type Storage interface {
|
||||||
Path() string
|
Path() string
|
||||||
ObjectsCount(ctx context.Context) (uint64, error)
|
ObjectsCount(ctx context.Context) (uint64, error)
|
||||||
|
|
||||||
SetCompressor(cc *compression.Config)
|
SetCompressor(cc *compression.Compressor)
|
||||||
Compressor() *compression.Config
|
Compressor() *compression.Compressor
|
||||||
|
|
||||||
// SetReportErrorFunc allows to provide a function to be called on disk errors.
|
// SetReportErrorFunc allows to provide a function to be called on disk errors.
|
||||||
// This function MUST be called before Open.
|
// This function MUST be called before Open.
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func BenchmarkCompression(b *testing.B) {
|
func BenchmarkCompression(b *testing.B) {
|
||||||
c := Config{Enabled: true}
|
c := Compressor{Config: Config{Enabled: true}}
|
||||||
require.NoError(b, c.Init())
|
require.NoError(b, c.Init())
|
||||||
|
|
||||||
for _, size := range []int{128, 1024, 32 * 1024, 32 * 1024 * 1024} {
|
for _, size := range []int{128, 1024, 32 * 1024, 32 * 1024 * 1024} {
|
||||||
|
@ -33,7 +33,7 @@ func BenchmarkCompression(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func benchWith(b *testing.B, c Config, data []byte) {
|
func benchWith(b *testing.B, c Compressor, data []byte) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
for range b.N {
|
for range b.N {
|
||||||
|
@ -56,8 +56,10 @@ func BenchmarkCompressionRealVSEstimate(b *testing.B) {
|
||||||
b.Run("estimate", func(b *testing.B) {
|
b.Run("estimate", func(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
c := &Config{
|
c := &Compressor{
|
||||||
|
Config: Config{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
require.NoError(b, c.Init())
|
require.NoError(b, c.Init())
|
||||||
|
|
||||||
|
@ -76,8 +78,10 @@ func BenchmarkCompressionRealVSEstimate(b *testing.B) {
|
||||||
b.Run("compress", func(b *testing.B) {
|
b.Run("compress", func(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
c := &Config{
|
c := &Compressor{
|
||||||
|
Config: Config{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
require.NoError(b, c.Init())
|
require.NoError(b, c.Init())
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,13 @@ const (
|
||||||
LevelSmallestSize Level = "smallest_size"
|
LevelSmallestSize Level = "smallest_size"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Compressor struct {
|
||||||
|
Config
|
||||||
|
|
||||||
|
encoder *zstd.Encoder
|
||||||
|
decoder *zstd.Decoder
|
||||||
|
}
|
||||||
|
|
||||||
// Config represents common compression-related configuration.
|
// Config represents common compression-related configuration.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Enabled bool
|
Enabled bool
|
||||||
|
@ -27,9 +34,6 @@ type Config struct {
|
||||||
|
|
||||||
UseCompressEstimation bool
|
UseCompressEstimation bool
|
||||||
CompressEstimationThreshold float64
|
CompressEstimationThreshold float64
|
||||||
|
|
||||||
encoder *zstd.Encoder
|
|
||||||
decoder *zstd.Decoder
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// zstdFrameMagic contains first 4 bytes of any compressed object
|
// zstdFrameMagic contains first 4 bytes of any compressed object
|
||||||
|
@ -37,7 +41,7 @@ type Config struct {
|
||||||
var zstdFrameMagic = []byte{0x28, 0xb5, 0x2f, 0xfd}
|
var zstdFrameMagic = []byte{0x28, 0xb5, 0x2f, 0xfd}
|
||||||
|
|
||||||
// Init initializes compression routines.
|
// Init initializes compression routines.
|
||||||
func (c *Config) Init() error {
|
func (c *Compressor) Init() error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if c.Enabled {
|
if c.Enabled {
|
||||||
|
@ -84,7 +88,7 @@ func (c *Config) NeedsCompression(obj *objectSDK.Object) bool {
|
||||||
|
|
||||||
// Decompress decompresses data if it starts with the magic
|
// Decompress decompresses data if it starts with the magic
|
||||||
// and returns data untouched otherwise.
|
// and returns data untouched otherwise.
|
||||||
func (c *Config) Decompress(data []byte) ([]byte, error) {
|
func (c *Compressor) Decompress(data []byte) ([]byte, error) {
|
||||||
if len(data) < 4 || !bytes.Equal(data[:4], zstdFrameMagic) {
|
if len(data) < 4 || !bytes.Equal(data[:4], zstdFrameMagic) {
|
||||||
return data, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
@ -93,7 +97,7 @@ func (c *Config) Decompress(data []byte) ([]byte, error) {
|
||||||
|
|
||||||
// Compress compresses data if compression is enabled
|
// Compress compresses data if compression is enabled
|
||||||
// and returns data untouched otherwise.
|
// and returns data untouched otherwise.
|
||||||
func (c *Config) Compress(data []byte) []byte {
|
func (c *Compressor) Compress(data []byte) []byte {
|
||||||
if c == nil || !c.Enabled {
|
if c == nil || !c.Enabled {
|
||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
|
@ -107,7 +111,7 @@ func (c *Config) Compress(data []byte) []byte {
|
||||||
return c.compress(data)
|
return c.compress(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) compress(data []byte) []byte {
|
func (c *Compressor) compress(data []byte) []byte {
|
||||||
maxSize := c.encoder.MaxEncodedSize(len(data))
|
maxSize := c.encoder.MaxEncodedSize(len(data))
|
||||||
compressed := c.encoder.EncodeAll(data, make([]byte, 0, maxSize))
|
compressed := c.encoder.EncodeAll(data, make([]byte, 0, maxSize))
|
||||||
if len(data) < len(compressed) {
|
if len(data) < len(compressed) {
|
||||||
|
@ -117,7 +121,7 @@ func (c *Config) compress(data []byte) []byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes encoder and decoder, returns any error occurred.
|
// Close closes encoder and decoder, returns any error occurred.
|
||||||
func (c *Config) Close() error {
|
func (c *Compressor) Close() error {
|
||||||
var err error
|
var err error
|
||||||
if c.encoder != nil {
|
if c.encoder != nil {
|
||||||
err = c.encoder.Close()
|
err = c.encoder.Close()
|
||||||
|
@ -135,7 +139,7 @@ func (c *Config) HasValidCompressionLevel() bool {
|
||||||
c.Level == LevelSmallestSize
|
c.Level == LevelSmallestSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) compressionLevel() zstd.EncoderLevel {
|
func (c *Compressor) compressionLevel() zstd.EncoderLevel {
|
||||||
switch c.Level {
|
switch c.Level {
|
||||||
case LevelDefault, LevelOptimal:
|
case LevelDefault, LevelOptimal:
|
||||||
return zstd.SpeedDefault
|
return zstd.SpeedDefault
|
||||||
|
|
|
@ -45,7 +45,7 @@ type FSTree struct {
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
|
||||||
*compression.Config
|
compressor *compression.Compressor
|
||||||
Depth uint64
|
Depth uint64
|
||||||
DirNameLen int
|
DirNameLen int
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ func New(opts ...Option) *FSTree {
|
||||||
Permissions: 0o700,
|
Permissions: 0o700,
|
||||||
RootPath: "./",
|
RootPath: "./",
|
||||||
},
|
},
|
||||||
Config: nil,
|
compressor: nil,
|
||||||
Depth: 4,
|
Depth: 4,
|
||||||
DirNameLen: DirNameLen,
|
DirNameLen: DirNameLen,
|
||||||
metrics: &noopMetrics{},
|
metrics: &noopMetrics{},
|
||||||
|
@ -196,7 +196,7 @@ func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, pr
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
data, err = t.Decompress(data)
|
data, err = t.compressor.Decompress(data)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if prm.IgnoreErrors {
|
if prm.IgnoreErrors {
|
||||||
|
@ -405,7 +405,7 @@ func (t *FSTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, err
|
||||||
return common.PutRes{}, err
|
return common.PutRes{}, err
|
||||||
}
|
}
|
||||||
if !prm.DontCompress {
|
if !prm.DontCompress {
|
||||||
prm.RawData = t.Compress(prm.RawData)
|
prm.RawData = t.compressor.Compress(prm.RawData)
|
||||||
}
|
}
|
||||||
|
|
||||||
size = len(prm.RawData)
|
size = len(prm.RawData)
|
||||||
|
@ -448,7 +448,7 @@ func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err = t.Decompress(data)
|
data, err = t.compressor.Decompress(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return common.GetRes{}, err
|
return common.GetRes{}, err
|
||||||
}
|
}
|
||||||
|
@ -597,12 +597,12 @@ func (t *FSTree) Path() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetCompressor implements common.Storage.
|
// SetCompressor implements common.Storage.
|
||||||
func (t *FSTree) SetCompressor(cc *compression.Config) {
|
func (t *FSTree) SetCompressor(cc *compression.Compressor) {
|
||||||
t.Config = cc
|
t.compressor = cc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *FSTree) Compressor() *compression.Config {
|
func (t *FSTree) Compressor() *compression.Compressor {
|
||||||
return t.Config
|
return t.compressor
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetReportErrorFunc implements common.Storage.
|
// SetReportErrorFunc implements common.Storage.
|
||||||
|
|
|
@ -16,7 +16,7 @@ func (s *memstoreImpl) Init() error
|
||||||
func (s *memstoreImpl) Close(context.Context) error { return nil }
|
func (s *memstoreImpl) Close(context.Context) error { return nil }
|
||||||
func (s *memstoreImpl) Type() string { return Type }
|
func (s *memstoreImpl) Type() string { return Type }
|
||||||
func (s *memstoreImpl) Path() string { return s.rootPath }
|
func (s *memstoreImpl) Path() string { return s.rootPath }
|
||||||
func (s *memstoreImpl) SetCompressor(cc *compression.Config) { s.compression = cc }
|
func (s *memstoreImpl) SetCompressor(cc *compression.Compressor) { s.compression = cc }
|
||||||
func (s *memstoreImpl) Compressor() *compression.Config { return s.compression }
|
func (s *memstoreImpl) Compressor() *compression.Compressor { return s.compression }
|
||||||
func (s *memstoreImpl) SetReportErrorFunc(func(context.Context, string, error)) {}
|
func (s *memstoreImpl) SetReportErrorFunc(func(context.Context, string, error)) {}
|
||||||
func (s *memstoreImpl) SetParentID(string) {}
|
func (s *memstoreImpl) SetParentID(string) {}
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
rootPath string
|
rootPath string
|
||||||
readOnly bool
|
readOnly bool
|
||||||
compression *compression.Config
|
compression *compression.Compressor
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultConfig() *cfg {
|
func defaultConfig() *cfg {
|
||||||
|
|
|
@ -17,8 +17,8 @@ type cfg struct {
|
||||||
|
|
||||||
Type func() string
|
Type func() string
|
||||||
Path func() string
|
Path func() string
|
||||||
SetCompressor func(cc *compression.Config)
|
SetCompressor func(cc *compression.Compressor)
|
||||||
Compressor func() *compression.Config
|
Compressor func() *compression.Compressor
|
||||||
SetReportErrorFunc func(f func(context.Context, string, error))
|
SetReportErrorFunc func(f func(context.Context, string, error))
|
||||||
|
|
||||||
Get func(common.GetPrm) (common.GetRes, error)
|
Get func(common.GetPrm) (common.GetRes, error)
|
||||||
|
@ -45,11 +45,11 @@ func WithClose(f func() error) Option { return func(c *cfg) { c
|
||||||
func WithType(f func() string) Option { return func(c *cfg) { c.overrides.Type = f } }
|
func WithType(f func() string) Option { return func(c *cfg) { c.overrides.Type = f } }
|
||||||
func WithPath(f func() string) Option { return func(c *cfg) { c.overrides.Path = f } }
|
func WithPath(f func() string) Option { return func(c *cfg) { c.overrides.Path = f } }
|
||||||
|
|
||||||
func WithSetCompressor(f func(*compression.Config)) Option {
|
func WithSetCompressor(f func(*compression.Compressor)) Option {
|
||||||
return func(c *cfg) { c.overrides.SetCompressor = f }
|
return func(c *cfg) { c.overrides.SetCompressor = f }
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithCompressor(f func() *compression.Config) Option {
|
func WithCompressor(f func() *compression.Compressor) Option {
|
||||||
return func(c *cfg) { c.overrides.Compressor = f }
|
return func(c *cfg) { c.overrides.Compressor = f }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -116,7 +116,7 @@ func (s *TestStore) Path() string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *TestStore) SetCompressor(cc *compression.Config) {
|
func (s *TestStore) SetCompressor(cc *compression.Compressor) {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
switch {
|
switch {
|
||||||
|
@ -129,7 +129,7 @@ func (s *TestStore) SetCompressor(cc *compression.Config) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *TestStore) Compressor() *compression.Config {
|
func (s *TestStore) Compressor() *compression.Compressor {
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
defer s.mu.RUnlock()
|
defer s.mu.RUnlock()
|
||||||
switch {
|
switch {
|
||||||
|
|
|
@ -52,7 +52,7 @@ type Cache interface {
|
||||||
|
|
||||||
// MainStorage is the interface of the underlying storage of Cache implementations.
|
// MainStorage is the interface of the underlying storage of Cache implementations.
|
||||||
type MainStorage interface {
|
type MainStorage interface {
|
||||||
Compressor() *compression.Config
|
Compressor() *compression.Compressor
|
||||||
Exists(context.Context, common.ExistsPrm) (common.ExistsRes, error)
|
Exists(context.Context, common.ExistsPrm) (common.ExistsRes, error)
|
||||||
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue