forked from TrueCloudLab/frostfs-node
[#1060] writecache: compress big object if needed
Small objects use `blobstor.Put`, so no changes are required. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
0d969d7a06
commit
486d5c2e86
5 changed files with 36 additions and 12 deletions
|
@ -110,32 +110,32 @@ func TestBlobstor_needsCompression(t *testing.T) {
|
|||
b := newBlobStor(t, true, "audio/*", "*/x-mpeg", "*/mpeg", "application/x-midi")
|
||||
|
||||
obj := newObjectWithCt("video/mpeg")
|
||||
require.False(t, b.needsCompression(obj))
|
||||
require.False(t, b.NeedsCompression(obj))
|
||||
|
||||
obj = newObjectWithCt("audio/aiff")
|
||||
require.False(t, b.needsCompression(obj))
|
||||
require.False(t, b.NeedsCompression(obj))
|
||||
|
||||
obj = newObjectWithCt("application/x-midi")
|
||||
require.False(t, b.needsCompression(obj))
|
||||
require.False(t, b.NeedsCompression(obj))
|
||||
|
||||
obj = newObjectWithCt("text/plain")
|
||||
require.True(t, b.needsCompression(obj))
|
||||
require.True(t, b.NeedsCompression(obj))
|
||||
|
||||
obj = newObjectWithCt("")
|
||||
require.True(t, b.needsCompression(obj))
|
||||
require.True(t, b.NeedsCompression(obj))
|
||||
})
|
||||
t.Run("content-types omitted", func(t *testing.T) {
|
||||
b := newBlobStor(t, true)
|
||||
obj := newObjectWithCt("video/mpeg")
|
||||
require.True(t, b.needsCompression(obj))
|
||||
require.True(t, b.NeedsCompression(obj))
|
||||
})
|
||||
t.Run("compress disabled", func(t *testing.T) {
|
||||
b := newBlobStor(t, false, "video/mpeg")
|
||||
|
||||
obj := newObjectWithCt("video/mpeg")
|
||||
require.False(t, b.needsCompression(obj))
|
||||
require.False(t, b.NeedsCompression(obj))
|
||||
|
||||
obj = newObjectWithCt("text/plain")
|
||||
require.False(t, b.needsCompression(obj))
|
||||
require.False(t, b.NeedsCompression(obj))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -34,10 +34,14 @@ func (b *BlobStor) Put(prm *PutPrm) (*PutRes, error) {
|
|||
return nil, fmt.Errorf("could not marshal the object: %w", err)
|
||||
}
|
||||
|
||||
return b.PutRaw(prm.obj.Address(), data, b.needsCompression(prm.obj))
|
||||
return b.PutRaw(prm.obj.Address(), data, b.NeedsCompression(prm.obj))
|
||||
}
|
||||
|
||||
func (b *BlobStor) needsCompression(obj *object.Object) bool {
|
||||
// NeedsCompression returns true if object should be compressed.
|
||||
// For object to be compressed 2 conditions must hold:
|
||||
// 1. Compression is enabled in settings.
|
||||
// 2. Object MIME Content-Type is allowed for compression.
|
||||
func (b *BlobStor) NeedsCompression(obj *object.Object) bool {
|
||||
if !b.compressionEnabled || len(b.uncompressableContentTypes) == 0 {
|
||||
return b.compressionEnabled
|
||||
}
|
||||
|
|
|
@ -125,11 +125,21 @@ func (c *cache) flushBigObjects() {
|
|||
return nil
|
||||
}
|
||||
|
||||
if _, err := c.blobstor.PutRaw(addr, data, true); err != nil {
|
||||
c.mtx.Lock()
|
||||
_, compress := c.compressFlags[sAddr]
|
||||
c.mtx.Unlock()
|
||||
|
||||
if _, err := c.blobstor.PutRaw(addr, data, compress); err != nil {
|
||||
c.log.Error("cant flush object to blobstor", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
if compress {
|
||||
c.mtx.Lock()
|
||||
delete(c.compressFlags, sAddr)
|
||||
c.mtx.Unlock()
|
||||
}
|
||||
|
||||
// mark object as flushed
|
||||
c.store.flushed.Add(sAddr, false)
|
||||
|
||||
|
|
|
@ -99,6 +99,11 @@ func (c *cache) persistBigObject(objInfo objectInfo) {
|
|||
err := c.fsTree.Put(objInfo.obj.Address(), objInfo.data)
|
||||
if err == nil {
|
||||
metaIndex = 1
|
||||
if c.blobstor.NeedsCompression(objInfo.obj) {
|
||||
c.mtx.Lock()
|
||||
c.compressFlags[objInfo.addr] = struct{}{}
|
||||
c.mtx.Unlock()
|
||||
}
|
||||
c.objCounters.IncFS()
|
||||
storagelog.Write(c.log, storagelog.AddressField(objInfo.addr), storagelog.OpField("fstree PUT"))
|
||||
}
|
||||
|
|
|
@ -31,10 +31,14 @@ type Cache interface {
|
|||
type cache struct {
|
||||
options
|
||||
|
||||
// mtx protects mem field, statistics and counters.
|
||||
// mtx protects mem field, statistics, counters and compressFlags.
|
||||
mtx sync.RWMutex
|
||||
mem []objectInfo
|
||||
|
||||
// compressFlags maps address of a big object to boolean value indicating
|
||||
// whether object should be compressed.
|
||||
compressFlags map[string]struct{}
|
||||
|
||||
// curMemSize is the current size of all objects cached in memory.
|
||||
curMemSize uint64
|
||||
|
||||
|
@ -80,6 +84,7 @@ func New(opts ...Option) Cache {
|
|||
closeCh: make(chan struct{}),
|
||||
evictCh: make(chan []byte),
|
||||
|
||||
compressFlags: make(map[string]struct{}),
|
||||
options: options{
|
||||
log: zap.NewNop(),
|
||||
maxMemSize: maxInMemorySizeBytes,
|
||||
|
|
Loading…
Reference in a new issue