forked from TrueCloudLab/frostfs-node
[#868] blobstor: allow to decompress objects on-the-fly
We should be able to read whatever we have written earlier. Compression setting applies only to the new objects. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
cc377b34d2
commit
dd678cd976
3 changed files with 105 additions and 10 deletions
|
@ -1,6 +1,7 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
|
@ -781,7 +782,6 @@ func (b *blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex
|
||||||
func (b *blobovniczas) init() error {
|
func (b *blobovniczas) init() error {
|
||||||
b.log.Debug("initializing Blobovnicza's")
|
b.log.Debug("initializing Blobovnicza's")
|
||||||
|
|
||||||
if b.compressionEnabled {
|
|
||||||
zstdC, err := zstdCompressor()
|
zstdC, err := zstdCompressor()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not create zstd compressor: %v", err)
|
return fmt.Errorf("could not create zstd compressor: %v", err)
|
||||||
|
@ -790,11 +790,25 @@ func (b *blobovniczas) init() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not create zstd decompressor: %v", err)
|
return fmt.Errorf("could not create zstd decompressor: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Compression is always done based on config settings.
|
||||||
|
if b.compressionEnabled {
|
||||||
b.compressor = zstdC
|
b.compressor = zstdC
|
||||||
b.decompressor = zstdD
|
|
||||||
} else {
|
} else {
|
||||||
b.compressor = noOpCompressor
|
b.compressor = noOpCompressor
|
||||||
b.decompressor = noOpDecompressor
|
}
|
||||||
|
|
||||||
|
// However we should be able to read any object
|
||||||
|
// we have previously written.
|
||||||
|
b.decompressor = func(data []byte) ([]byte, error) {
|
||||||
|
// Fallback to reading decompressed objects.
|
||||||
|
// For normal objects data is always bigger than 4 bytes, the first check is here
|
||||||
|
// because function interface is rather generic (Go compiler inserts bound
|
||||||
|
// checks anyway).
|
||||||
|
if len(data) < 4 || !bytes.Equal(data[:4], zstdFrameMagic) {
|
||||||
|
return noOpDecompressor(data)
|
||||||
|
}
|
||||||
|
return zstdD(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error {
|
return b.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||||
|
|
77
pkg/local_object_storage/blobstor/blobstor_test.go
Normal file
77
pkg/local_object_storage/blobstor/blobstor_test.go
Normal file
|
@ -0,0 +1,77 @@
|
||||||
|
package blobstor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCompression(t *testing.T) {
|
||||||
|
dir, err := os.MkdirTemp("", "neofs*")
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||||
|
|
||||||
|
const (
|
||||||
|
smallSizeLimit = 512
|
||||||
|
objCount = 4
|
||||||
|
)
|
||||||
|
|
||||||
|
newBlobStor := func(t *testing.T, compress bool) *BlobStor {
|
||||||
|
bs := New(WithCompressObjects(compress),
|
||||||
|
WithRootPath(dir),
|
||||||
|
WithSmallSizeLimit(smallSizeLimit))
|
||||||
|
require.NoError(t, bs.Open())
|
||||||
|
require.NoError(t, bs.Init())
|
||||||
|
return bs
|
||||||
|
}
|
||||||
|
|
||||||
|
bigObj := make([]*object.Object, objCount)
|
||||||
|
smallObj := make([]*object.Object, objCount)
|
||||||
|
for i := 0; i < objCount; i++ {
|
||||||
|
bigObj[i] = testObject(smallSizeLimit * 2)
|
||||||
|
smallObj[i] = testObject(smallSizeLimit / 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
testGet := func(t *testing.T, b *BlobStor, i int) {
|
||||||
|
res1, err := b.GetSmall(&GetSmallPrm{address: address{smallObj[i].Address()}})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, smallObj[i], res1.Object())
|
||||||
|
|
||||||
|
res2, err := b.GetBig(&GetBigPrm{address: address{bigObj[i].Address()}})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, bigObj[i], res2.Object())
|
||||||
|
}
|
||||||
|
|
||||||
|
testPut := func(t *testing.T, b *BlobStor, i int) {
|
||||||
|
prm := new(PutPrm)
|
||||||
|
prm.SetObject(smallObj[i])
|
||||||
|
_, err = b.Put(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
prm = new(PutPrm)
|
||||||
|
prm.SetObject(bigObj[i])
|
||||||
|
_, err = b.Put(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put and Get uncompressed object
|
||||||
|
blobStor := newBlobStor(t, false)
|
||||||
|
testPut(t, blobStor, 0)
|
||||||
|
testGet(t, blobStor, 0)
|
||||||
|
require.NoError(t, blobStor.Close())
|
||||||
|
|
||||||
|
blobStor = newBlobStor(t, true)
|
||||||
|
testGet(t, blobStor, 0) // get uncompressed object with compress enabled
|
||||||
|
testPut(t, blobStor, 1)
|
||||||
|
testGet(t, blobStor, 1)
|
||||||
|
require.NoError(t, blobStor.Close())
|
||||||
|
|
||||||
|
blobStor = newBlobStor(t, false)
|
||||||
|
testGet(t, blobStor, 0) // get old uncompressed object
|
||||||
|
testGet(t, blobStor, 1) // get compressed object with compression disabled
|
||||||
|
testPut(t, blobStor, 2)
|
||||||
|
testGet(t, blobStor, 2)
|
||||||
|
require.NoError(t, blobStor.Close())
|
||||||
|
}
|
|
@ -4,6 +4,10 @@ import (
|
||||||
"github.com/klauspost/compress/zstd"
|
"github.com/klauspost/compress/zstd"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// zstdFrameMagic contains first 4 bytes of any compressed object
|
||||||
|
// https://github.com/klauspost/compress/blob/master/zstd/framedec.go#L58 .
|
||||||
|
var zstdFrameMagic = []byte{0x28, 0xb5, 0x2f, 0xfd}
|
||||||
|
|
||||||
func noOpCompressor(data []byte) []byte {
|
func noOpCompressor(data []byte) []byte {
|
||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue