forked from TrueCloudLab/frostfs-node
[#1060] blobstor: allow to disable compression based on content-type
For some data compression makes little sense, as it is already compressed. This commit allows to leave such data unchanged based on `Content-Type` attribute. Currently exact, prefix and suffix matching are supported. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
0f1eb743af
commit
0d969d7a06
11 changed files with 133 additions and 8 deletions
|
@ -70,6 +70,7 @@ func TestEngineSection(t *testing.T) {
|
|||
require.Equal(t, "tmp/0/blob", blob.Path())
|
||||
require.EqualValues(t, 0644, blob.Perm())
|
||||
require.Equal(t, true, blob.Compress())
|
||||
require.Equal(t, []string{"audio/*", "video/*"}, blob.UncompressableContentTypes())
|
||||
require.EqualValues(t, 5, blob.ShallowDepth())
|
||||
require.EqualValues(t, 102400, blob.SmallSizeLimit())
|
||||
|
||||
|
@ -99,6 +100,7 @@ func TestEngineSection(t *testing.T) {
|
|||
require.Equal(t, "tmp/1/blob", blob.Path())
|
||||
require.EqualValues(t, 0644, blob.Perm())
|
||||
require.Equal(t, false, blob.Compress())
|
||||
require.Equal(t, []string(nil), blob.UncompressableContentTypes())
|
||||
require.EqualValues(t, 5, blob.ShallowDepth())
|
||||
require.EqualValues(t, 102400, blob.SmallSizeLimit())
|
||||
|
||||
|
|
|
@ -88,6 +88,15 @@ func (x *Config) Compress() bool {
|
|||
)
|
||||
}
|
||||
|
||||
// UncompressableContentTypes returns value of "compress_skip_content_types" config parameter.
|
||||
//
|
||||
// Returns nil if a value is missing or is invalid.
|
||||
func (x *Config) UncompressableContentTypes() []string {
|
||||
return config.StringSliceSafe(
|
||||
(*config.Config)(x),
|
||||
"compression_exclude_content_types")
|
||||
}
|
||||
|
||||
// SmallSizeLimit returns value of "small_object_size" config parameter.
|
||||
//
|
||||
// Returns SmallSizeLimitDefault if value is not a positive number.
|
||||
|
|
|
@ -90,6 +90,7 @@ NEOFS_STORAGE_SHARD_0_METABASE_PERM=0644
|
|||
NEOFS_STORAGE_SHARD_0_BLOBSTOR_PATH=tmp/0/blob
|
||||
NEOFS_STORAGE_SHARD_0_BLOBSTOR_PERM=0644
|
||||
NEOFS_STORAGE_SHARD_0_BLOBSTOR_COMPRESS=true
|
||||
NEOFS_STORAGE_SHARD_0_BLOBSTOR_COMPRESSION_EXCLUDE_CONTENT_TYPES="audio/* video/*"
|
||||
NEOFS_STORAGE_SHARD_0_BLOBSTOR_DEPTH=5
|
||||
NEOFS_STORAGE_SHARD_0_BLOBSTOR_SMALL_OBJECT_SIZE=102400
|
||||
### Blobovnicza config
|
||||
|
|
|
@ -136,6 +136,9 @@
|
|||
"path": "tmp/0/blob",
|
||||
"perm": "0644",
|
||||
"compress": true,
|
||||
"compression_exclude_content_types": [
|
||||
"audio/*", "video/*"
|
||||
],
|
||||
"depth": 5,
|
||||
"small_object_size": 102400,
|
||||
"blobovnicza": {
|
||||
|
|
|
@ -146,6 +146,9 @@ storage:
|
|||
blobstor:
|
||||
path: tmp/0/blob # blobstor path
|
||||
compress: true # turn on/off zstd(level 3) compression of stored objects
|
||||
compression_exclude_content_types:
|
||||
- audio/*
|
||||
- video/*
|
||||
|
||||
gc:
|
||||
remover_batch_size: 150 # number of objects to be removed by the garbage collector
|
||||
|
|
|
@ -31,7 +31,7 @@ func testAddress() *objectSDK.Address {
|
|||
return addr
|
||||
}
|
||||
|
||||
func testObject(sz uint64) *object.Object {
|
||||
func testObjectRaw(sz uint64) *object.RawObject {
|
||||
raw := object.NewRaw()
|
||||
|
||||
addr := testAddress()
|
||||
|
@ -46,7 +46,11 @@ func testObject(sz uint64) *object.Object {
|
|||
raw.SetPayload(raw.Payload()[:sz-(ln-sz)])
|
||||
}
|
||||
|
||||
return raw.Object()
|
||||
return raw
|
||||
}
|
||||
|
||||
func testObject(sz uint64) *object.Object {
|
||||
return testObjectRaw(sz).Object()
|
||||
}
|
||||
|
||||
func TestBlobovniczas(t *testing.T) {
|
||||
|
|
|
@ -28,6 +28,8 @@ type cfg struct {
|
|||
|
||||
compressionEnabled bool
|
||||
|
||||
uncompressableContentTypes []string
|
||||
|
||||
compressor func([]byte) []byte
|
||||
|
||||
decompressor func([]byte) ([]byte, error)
|
||||
|
@ -117,6 +119,14 @@ func WithCompressObjects(comp bool) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithUncompressableContentTypes returns option to disable decompression
|
||||
// for specific content types as seen by object.AttributeContentType attribute.
|
||||
func WithUncompressableContentTypes(values []string) Option {
|
||||
return func(c *cfg) {
|
||||
c.uncompressableContentTypes = values
|
||||
}
|
||||
}
|
||||
|
||||
// WithRootPath returns option to set path to root directory
|
||||
// of the fs tree to write the objects.
|
||||
func WithRootPath(rootDir string) Option {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -76,3 +77,65 @@ func TestCompression(t *testing.T) {
|
|||
testGet(t, blobStor, 2)
|
||||
require.NoError(t, blobStor.Close())
|
||||
}
|
||||
|
||||
func TestBlobstor_needsCompression(t *testing.T) {
|
||||
const smallSizeLimit = 512
|
||||
newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor {
|
||||
dir, err := os.MkdirTemp("", "neofs*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||
|
||||
bs := New(WithCompressObjects(compress),
|
||||
WithRootPath(dir),
|
||||
WithSmallSizeLimit(smallSizeLimit),
|
||||
WithBlobovniczaShallowWidth(1),
|
||||
WithUncompressableContentTypes(ct))
|
||||
require.NoError(t, bs.Open())
|
||||
require.NoError(t, bs.Init())
|
||||
return bs
|
||||
}
|
||||
|
||||
newObjectWithCt := func(contentType string) *object.Object {
|
||||
obj := testObjectRaw(smallSizeLimit + 1)
|
||||
if contentType != "" {
|
||||
a := objectSDK.NewAttribute()
|
||||
a.SetKey(objectSDK.AttributeContentType)
|
||||
a.SetValue(contentType)
|
||||
obj.SetAttributes(a)
|
||||
}
|
||||
return obj.Object()
|
||||
}
|
||||
|
||||
t.Run("content-types specified", func(t *testing.T) {
|
||||
b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi")
|
||||
|
||||
obj := newObjectWithCt("video/mpeg")
|
||||
require.False(t, b.needsCompression(obj))
|
||||
|
||||
obj = newObjectWithCt("audio/aiff")
|
||||
require.False(t, b.needsCompression(obj))
|
||||
|
||||
obj = newObjectWithCt("application/x-midi")
|
||||
require.False(t, b.needsCompression(obj))
|
||||
|
||||
obj = newObjectWithCt("text/plain")
|
||||
require.True(t, b.needsCompression(obj))
|
||||
|
||||
obj = newObjectWithCt("")
|
||||
require.True(t, b.needsCompression(obj))
|
||||
})
|
||||
t.Run("content-types omitted", func(t *testing.T) {
|
||||
b := newBlobStor(t, true)
|
||||
obj := newObjectWithCt("video/mpeg")
|
||||
require.True(t, b.needsCompression(obj))
|
||||
})
|
||||
t.Run("compress disabled", func(t *testing.T) {
|
||||
b := newBlobStor(t, false, "video/mpeg")
|
||||
|
||||
obj := newObjectWithCt("video/mpeg")
|
||||
require.False(t, b.needsCompression(obj))
|
||||
|
||||
obj = newObjectWithCt("text/plain")
|
||||
require.False(t, b.needsCompression(obj))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ func TestIterateObjects(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, v := range mObjs {
|
||||
_, err := blobStor.PutRaw(v.addr, v.data)
|
||||
_, err := blobStor.PutRaw(v.addr, v.data, true)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,9 @@ package blobstor
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
)
|
||||
|
@ -32,15 +34,43 @@ func (b *BlobStor) Put(prm *PutPrm) (*PutRes, error) {
|
|||
return nil, fmt.Errorf("could not marshal the object: %w", err)
|
||||
}
|
||||
|
||||
return b.PutRaw(prm.obj.Address(), data)
|
||||
return b.PutRaw(prm.obj.Address(), data, b.needsCompression(prm.obj))
|
||||
}
|
||||
|
||||
func (b *BlobStor) needsCompression(obj *object.Object) bool {
|
||||
if !b.compressionEnabled || len(b.uncompressableContentTypes) == 0 {
|
||||
return b.compressionEnabled
|
||||
}
|
||||
|
||||
for _, attr := range obj.Attributes() {
|
||||
if attr.Key() == objectSDK.AttributeContentType {
|
||||
for _, value := range b.uncompressableContentTypes {
|
||||
match := false
|
||||
switch {
|
||||
case len(value) > 0 && value[len(value)-1] == '*':
|
||||
match = strings.HasPrefix(attr.Value(), value[:len(value)-1])
|
||||
case len(value) > 0 && value[0] == '*':
|
||||
match = strings.HasSuffix(attr.Value(), value[1:])
|
||||
default:
|
||||
match = attr.Value() == value
|
||||
}
|
||||
if match {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return b.compressionEnabled
|
||||
}
|
||||
|
||||
// PutRaw saves already marshaled object in BLOB storage.
|
||||
func (b *BlobStor) PutRaw(addr *objectSDK.Address, data []byte) (*PutRes, error) {
|
||||
func (b *BlobStor) PutRaw(addr *objectSDK.Address, data []byte, compress bool) (*PutRes, error) {
|
||||
big := b.isBig(data)
|
||||
|
||||
// compress object data
|
||||
data = b.compressor(data)
|
||||
if compress {
|
||||
data = b.compressor(data)
|
||||
}
|
||||
|
||||
if big {
|
||||
// save object in shallow dir
|
||||
|
|
|
@ -125,7 +125,7 @@ func (c *cache) flushBigObjects() {
|
|||
return nil
|
||||
}
|
||||
|
||||
if _, err := c.blobstor.PutRaw(addr, data); err != nil {
|
||||
if _, err := c.blobstor.PutRaw(addr, data, true); err != nil {
|
||||
c.log.Error("cant flush object to blobstor", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue