diff --git a/pkg/local_object_storage/blobstor/blobstor_test.go b/pkg/local_object_storage/blobstor/blobstor_test.go index df614e4ecc..3b32fe777a 100644 --- a/pkg/local_object_storage/blobstor/blobstor_test.go +++ b/pkg/local_object_storage/blobstor/blobstor_test.go @@ -110,32 +110,32 @@ func TestBlobstor_needsCompression(t *testing.T) { b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi") obj := newObjectWithCt("video/mpeg") - require.False(t, b.needsCompression(obj)) + require.False(t, b.NeedsCompression(obj)) obj = newObjectWithCt("audio/aiff") - require.False(t, b.needsCompression(obj)) + require.False(t, b.NeedsCompression(obj)) obj = newObjectWithCt("application/x-midi") - require.False(t, b.needsCompression(obj)) + require.False(t, b.NeedsCompression(obj)) obj = newObjectWithCt("text/plain") - require.True(t, b.needsCompression(obj)) + require.True(t, b.NeedsCompression(obj)) obj = newObjectWithCt("") - require.True(t, b.needsCompression(obj)) + require.True(t, b.NeedsCompression(obj)) }) t.Run("content-types omitted", func(t *testing.T) { b := newBlobStor(t, true) obj := newObjectWithCt("video/mpeg") - require.True(t, b.needsCompression(obj)) + require.True(t, b.NeedsCompression(obj)) }) t.Run("compress disabled", func(t *testing.T) { b := newBlobStor(t, false, "video/mpeg") obj := newObjectWithCt("video/mpeg") - require.False(t, b.needsCompression(obj)) + require.False(t, b.NeedsCompression(obj)) obj = newObjectWithCt("text/plain") - require.False(t, b.needsCompression(obj)) + require.False(t, b.NeedsCompression(obj)) }) } diff --git a/pkg/local_object_storage/blobstor/put.go b/pkg/local_object_storage/blobstor/put.go index c72b445517..dd30089eed 100644 --- a/pkg/local_object_storage/blobstor/put.go +++ b/pkg/local_object_storage/blobstor/put.go @@ -34,10 +34,14 @@ func (b *BlobStor) Put(prm *PutPrm) (*PutRes, error) { return nil, fmt.Errorf("could not marshal the object: %w", err) } - return b.PutRaw(prm.obj.Address(), data, b.needsCompression(prm.obj)) + return b.PutRaw(prm.obj.Address(), data, b.NeedsCompression(prm.obj)) } -func (b *BlobStor) needsCompression(obj *object.Object) bool { +// NeedsCompression returns true if object should be compressed. +// For object to be compressed 2 conditions must hold: +// 1. Compression is enabled in settings. +// 2. Object MIME Content-Type is allowed for compression. +func (b *BlobStor) NeedsCompression(obj *object.Object) bool { if !b.compressionEnabled || len(b.uncompressableContentTypes) == 0 { return b.compressionEnabled } diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 1684dfcc66..8ceb8e3ed3 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -125,11 +125,21 @@ func (c *cache) flushBigObjects() { return nil } - if _, err := c.blobstor.PutRaw(addr, data, true); err != nil { + c.mtx.Lock() + _, compress := c.compressFlags[sAddr] + c.mtx.Unlock() + + if _, err := c.blobstor.PutRaw(addr, data, compress); err != nil { c.log.Error("cant flush object to blobstor", zap.Error(err)) return nil } + if compress { + c.mtx.Lock() + delete(c.compressFlags, sAddr) + c.mtx.Unlock() + } + // mark object as flushed c.store.flushed.Add(sAddr, false) diff --git a/pkg/local_object_storage/writecache/persist.go b/pkg/local_object_storage/writecache/persist.go index b810c77449..035e8e8ddc 100644 --- a/pkg/local_object_storage/writecache/persist.go +++ b/pkg/local_object_storage/writecache/persist.go @@ -99,6 +99,11 @@ func (c *cache) persistBigObject(objInfo objectInfo) { err := c.fsTree.Put(objInfo.obj.Address(), objInfo.data) if err == nil { metaIndex = 1 + if c.blobstor.NeedsCompression(objInfo.obj) { + c.mtx.Lock() + c.compressFlags[objInfo.addr] = struct{}{} + c.mtx.Unlock() + } c.objCounters.IncFS() storagelog.Write(c.log, storagelog.AddressField(objInfo.addr), storagelog.OpField("fstree PUT")) } diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index fa95141311..2addf23900 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -31,10 +31,14 @@ type Cache interface { type cache struct { options - // mtx protects mem field, statistics and counters. + // mtx protects mem field, statistics, counters and compressFlags. mtx sync.RWMutex mem []objectInfo + // compressFlags maps address of a big object to boolean value indicating + // whether object should be compressed. + compressFlags map[string]struct{} + // curMemSize is the current size of all objects cached in memory. curMemSize uint64 @@ -80,6 +84,7 @@ func New(opts ...Option) Cache { closeCh: make(chan struct{}), evictCh: make(chan []byte), + compressFlags: make(map[string]struct{}), options: options{ log: zap.NewNop(), maxMemSize: maxInMemorySizeBytes,