diff --git a/cmd/neofs-node/config/engine/config_test.go b/cmd/neofs-node/config/engine/config_test.go index e3a73766..fd4df9af 100644 --- a/cmd/neofs-node/config/engine/config_test.go +++ b/cmd/neofs-node/config/engine/config_test.go @@ -70,6 +70,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, "tmp/0/blob", blob.Path()) require.EqualValues(t, 0644, blob.Perm()) require.Equal(t, true, blob.Compress()) + require.Equal(t, []string{"audio/*", "video/*"}, blob.UncompressableContentTypes()) require.EqualValues(t, 5, blob.ShallowDepth()) require.EqualValues(t, 102400, blob.SmallSizeLimit()) @@ -99,6 +100,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, "tmp/1/blob", blob.Path()) require.EqualValues(t, 0644, blob.Perm()) require.Equal(t, false, blob.Compress()) + require.Equal(t, []string(nil), blob.UncompressableContentTypes()) require.EqualValues(t, 5, blob.ShallowDepth()) require.EqualValues(t, 102400, blob.SmallSizeLimit()) diff --git a/cmd/neofs-node/config/engine/shard/blobstor/config.go b/cmd/neofs-node/config/engine/shard/blobstor/config.go index 51c380ce..f3e3924b 100644 --- a/cmd/neofs-node/config/engine/shard/blobstor/config.go +++ b/cmd/neofs-node/config/engine/shard/blobstor/config.go @@ -88,6 +88,15 @@ func (x *Config) Compress() bool { ) } +// UncompressableContentTypes returns value of "compress_skip_content_types" config parameter. +// +// Returns nil if a value is missing or is invalid. +func (x *Config) UncompressableContentTypes() []string { + return config.StringSliceSafe( + (*config.Config)(x), + "compression_exclude_content_types") +} + // SmallSizeLimit returns value of "small_object_size" config parameter. // // Returns SmallSizeLimitDefault if value is not a positive number. diff --git a/config/example/node.env b/config/example/node.env index 0ee9b286..f241817c 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -90,6 +90,7 @@ NEOFS_STORAGE_SHARD_0_METABASE_PERM=0644 NEOFS_STORAGE_SHARD_0_BLOBSTOR_PATH=tmp/0/blob NEOFS_STORAGE_SHARD_0_BLOBSTOR_PERM=0644 NEOFS_STORAGE_SHARD_0_BLOBSTOR_COMPRESS=true +NEOFS_STORAGE_SHARD_0_BLOBSTOR_COMPRESSION_EXCLUDE_CONTENT_TYPES="audio/* video/*" NEOFS_STORAGE_SHARD_0_BLOBSTOR_DEPTH=5 NEOFS_STORAGE_SHARD_0_BLOBSTOR_SMALL_OBJECT_SIZE=102400 ### Blobovnicza config diff --git a/config/example/node.json b/config/example/node.json index ca8b8112..7c288739 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -136,6 +136,9 @@ "path": "tmp/0/blob", "perm": "0644", "compress": true, + "compression_exclude_content_types": [ + "audio/*", "video/*" + ], "depth": 5, "small_object_size": 102400, "blobovnicza": { diff --git a/config/example/node.yaml b/config/example/node.yaml index 69e5f846..217d81b0 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -146,6 +146,9 @@ storage: blobstor: path: tmp/0/blob # blobstor path compress: true # turn on/off zstd(level 3) compression of stored objects + compression_exclude_content_types: + - audio/* + - video/* gc: remover_batch_size: 150 # number of objects to be removed by the garbage collector diff --git a/pkg/local_object_storage/blobstor/blobovnicza_test.go b/pkg/local_object_storage/blobstor/blobovnicza_test.go index 0b2d9af7..76a96b80 100644 --- a/pkg/local_object_storage/blobstor/blobovnicza_test.go +++ b/pkg/local_object_storage/blobstor/blobovnicza_test.go @@ -31,7 +31,7 @@ func testAddress() *objectSDK.Address { return addr } -func testObject(sz uint64) *object.Object { +func testObjectRaw(sz uint64) *object.RawObject { raw := object.NewRaw() addr := testAddress() @@ -46,7 +46,11 @@ func testObject(sz uint64) *object.Object { raw.SetPayload(raw.Payload()[:sz-(ln-sz)]) } - return raw.Object() + return raw +} + +func testObject(sz uint64) *object.Object { + return testObjectRaw(sz).Object() } func TestBlobovniczas(t *testing.T) { diff --git a/pkg/local_object_storage/blobstor/blobstor.go b/pkg/local_object_storage/blobstor/blobstor.go index 395c85a8..5c927b07 100644 --- a/pkg/local_object_storage/blobstor/blobstor.go +++ b/pkg/local_object_storage/blobstor/blobstor.go @@ -28,6 +28,8 @@ type cfg struct { compressionEnabled bool + uncompressableContentTypes []string + compressor func([]byte) []byte decompressor func([]byte) ([]byte, error) @@ -117,6 +119,14 @@ func WithCompressObjects(comp bool) Option { } } +// WithUncompressableContentTypes returns option to disable decompression +// for specific content types as seen by object.AttributeContentType attribute. +func WithUncompressableContentTypes(values []string) Option { + return func(c *cfg) { + c.uncompressableContentTypes = values + } +} + // WithRootPath returns option to set path to root directory // of the fs tree to write the objects. func WithRootPath(rootDir string) Option { diff --git a/pkg/local_object_storage/blobstor/blobstor_test.go b/pkg/local_object_storage/blobstor/blobstor_test.go index 9af7956e..df614e4e 100644 --- a/pkg/local_object_storage/blobstor/blobstor_test.go +++ b/pkg/local_object_storage/blobstor/blobstor_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/nspcc-dev/neofs-node/pkg/core/object" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/stretchr/testify/require" ) @@ -76,3 +77,65 @@ func TestCompression(t *testing.T) { testGet(t, blobStor, 2) require.NoError(t, blobStor.Close()) } + +func TestBlobstor_needsCompression(t *testing.T) { + const smallSizeLimit = 512 + newBlobStor := func(t *testing.T, compress bool, ct ...string) *BlobStor { + dir, err := os.MkdirTemp("", "neofs*") + require.NoError(t, err) + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + + bs := New(WithCompressObjects(compress), + WithRootPath(dir), + WithSmallSizeLimit(smallSizeLimit), + WithBlobovniczaShallowWidth(1), + WithUncompressableContentTypes(ct)) + require.NoError(t, bs.Open()) + require.NoError(t, bs.Init()) + return bs + } + + newObjectWithCt := func(contentType string) *object.Object { + obj := testObjectRaw(smallSizeLimit + 1) + if contentType != "" { + a := objectSDK.NewAttribute() + a.SetKey(objectSDK.AttributeContentType) + a.SetValue(contentType) + obj.SetAttributes(a) + } + return obj.Object() + } + + t.Run("content-types specified", func(t *testing.T) { + b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi") + + obj := newObjectWithCt("video/mpeg") + require.False(t, b.needsCompression(obj)) + + obj = newObjectWithCt("audio/aiff") + require.False(t, b.needsCompression(obj)) + + obj = newObjectWithCt("application/x-midi") + require.False(t, b.needsCompression(obj)) + + obj = newObjectWithCt("text/plain") + require.True(t, b.needsCompression(obj)) + + obj = newObjectWithCt("") + require.True(t, b.needsCompression(obj)) + }) + t.Run("content-types omitted", func(t *testing.T) { + b := newBlobStor(t, true) + obj := newObjectWithCt("video/mpeg") + require.True(t, b.needsCompression(obj)) + }) + t.Run("compress disabled", func(t *testing.T) { + b := newBlobStor(t, false, "video/mpeg") + + obj := newObjectWithCt("video/mpeg") + require.False(t, b.needsCompression(obj)) + + obj = newObjectWithCt("text/plain") + require.False(t, b.needsCompression(obj)) + }) +} diff --git a/pkg/local_object_storage/blobstor/iterate_test.go b/pkg/local_object_storage/blobstor/iterate_test.go index e300f1b7..49bd0fca 100644 --- a/pkg/local_object_storage/blobstor/iterate_test.go +++ b/pkg/local_object_storage/blobstor/iterate_test.go @@ -66,7 +66,7 @@ func TestIterateObjects(t *testing.T) { } for _, v := range mObjs { - _, err := blobStor.PutRaw(v.addr, v.data) + _, err := blobStor.PutRaw(v.addr, v.data, true) require.NoError(t, err) } diff --git a/pkg/local_object_storage/blobstor/put.go b/pkg/local_object_storage/blobstor/put.go index 50757344..c72b4455 100644 --- a/pkg/local_object_storage/blobstor/put.go +++ b/pkg/local_object_storage/blobstor/put.go @@ -2,7 +2,9 @@ package blobstor import ( "fmt" + "strings" + "github.com/nspcc-dev/neofs-node/pkg/core/object" storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" ) @@ -32,15 +34,43 @@ func (b *BlobStor) Put(prm *PutPrm) (*PutRes, error) { return nil, fmt.Errorf("could not marshal the object: %w", err) } - return b.PutRaw(prm.obj.Address(), data) + return b.PutRaw(prm.obj.Address(), data, b.needsCompression(prm.obj)) +} + +func (b *BlobStor) needsCompression(obj *object.Object) bool { + if !b.compressionEnabled || len(b.uncompressableContentTypes) == 0 { + return b.compressionEnabled + } + + for _, attr := range obj.Attributes() { + if attr.Key() == objectSDK.AttributeContentType { + for _, value := range b.uncompressableContentTypes { + match := false + switch { + case len(value) > 0 && value[len(value)-1] == '*': + match = strings.HasPrefix(attr.Value(), value[:len(value)-1]) + case len(value) > 0 && value[0] == '*': + match = strings.HasSuffix(attr.Value(), value[1:]) + default: + match = attr.Value() == value + } + if match { + return false + } + } + } + } + + return b.compressionEnabled } // PutRaw saves already marshaled object in BLOB storage. -func (b *BlobStor) PutRaw(addr *objectSDK.Address, data []byte) (*PutRes, error) { +func (b *BlobStor) PutRaw(addr *objectSDK.Address, data []byte, compress bool) (*PutRes, error) { big := b.isBig(data) - // compress object data - data = b.compressor(data) + if compress { + data = b.compressor(data) + } if big { // save object in shallow dir diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index b2faf7a1..1684dfcc 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -125,7 +125,7 @@ func (c *cache) flushBigObjects() { return nil } - if _, err := c.blobstor.PutRaw(addr, data); err != nil { + if _, err := c.blobstor.PutRaw(addr, data, true); err != nil { c.log.Error("cant flush object to blobstor", zap.Error(err)) return nil }