From dd678cd9761b7c1da5a1068e3e01e7625241929d Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 7 Oct 2021 17:50:36 +0300 Subject: [PATCH] [#868] blobstor: allow to decompress objects on-the-fly We should be able to read whatever we have written earlier. Compression setting applies only to the new objects. Signed-off-by: Evgenii Stratonikov --- .../blobstor/blobovnicza.go | 34 +++++--- .../blobstor/blobstor_test.go | 77 +++++++++++++++++++ pkg/local_object_storage/blobstor/compress.go | 4 + 3 files changed, 105 insertions(+), 10 deletions(-) create mode 100644 pkg/local_object_storage/blobstor/blobstor_test.go diff --git a/pkg/local_object_storage/blobstor/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovnicza.go index 355445ba..4808a0db 100644 --- a/pkg/local_object_storage/blobstor/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovnicza.go @@ -1,6 +1,7 @@ package blobstor import ( + "bytes" "errors" "fmt" "path" @@ -781,20 +782,33 @@ func (b *blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex func (b *blobovniczas) init() error { b.log.Debug("initializing Blobovnicza's") + zstdC, err := zstdCompressor() + if err != nil { + return fmt.Errorf("could not create zstd compressor: %v", err) + } + zstdD, err := zstdDecompressor() + if err != nil { + return fmt.Errorf("could not create zstd decompressor: %v", err) + } + + // Compression is always done based on config settings. if b.compressionEnabled { - zstdC, err := zstdCompressor() - if err != nil { - return fmt.Errorf("could not create zstd compressor: %v", err) - } - zstdD, err := zstdDecompressor() - if err != nil { - return fmt.Errorf("could not create zstd decompressor: %v", err) - } b.compressor = zstdC - b.decompressor = zstdD } else { b.compressor = noOpCompressor - b.decompressor = noOpDecompressor + } + + // However we should be able to read any object + // we have previously written. + b.decompressor = func(data []byte) ([]byte, error) { + // Fallback to reading decompressed objects. + // For normal objects data is always bigger than 4 bytes, the first check is here + // because function interface is rather generic (Go compiler inserts bound + // checks anyway). + if len(data) < 4 || !bytes.Equal(data[:4], zstdFrameMagic) { + return noOpDecompressor(data) + } + return zstdD(data) } return b.iterateBlobovniczas(func(p string, blz *blobovnicza.Blobovnicza) error { diff --git a/pkg/local_object_storage/blobstor/blobstor_test.go b/pkg/local_object_storage/blobstor/blobstor_test.go new file mode 100644 index 00000000..ef67a86a --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobstor_test.go @@ -0,0 +1,77 @@ +package blobstor + +import ( + "os" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/stretchr/testify/require" +) + +func TestCompression(t *testing.T) { + dir, err := os.MkdirTemp("", "neofs*") + require.NoError(t, err) + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + + const ( + smallSizeLimit = 512 + objCount = 4 + ) + + newBlobStor := func(t *testing.T, compress bool) *BlobStor { + bs := New(WithCompressObjects(compress), + WithRootPath(dir), + WithSmallSizeLimit(smallSizeLimit)) + require.NoError(t, bs.Open()) + require.NoError(t, bs.Init()) + return bs + } + + bigObj := make([]*object.Object, objCount) + smallObj := make([]*object.Object, objCount) + for i := 0; i < objCount; i++ { + bigObj[i] = testObject(smallSizeLimit * 2) + smallObj[i] = testObject(smallSizeLimit / 2) + } + + testGet := func(t *testing.T, b *BlobStor, i int) { + res1, err := b.GetSmall(&GetSmallPrm{address: address{smallObj[i].Address()}}) + require.NoError(t, err) + require.Equal(t, smallObj[i], res1.Object()) + + res2, err := b.GetBig(&GetBigPrm{address: address{bigObj[i].Address()}}) + require.NoError(t, err) + require.Equal(t, bigObj[i], res2.Object()) + } + + testPut := func(t *testing.T, b *BlobStor, i int) { + prm := new(PutPrm) + prm.SetObject(smallObj[i]) + _, err = b.Put(prm) + require.NoError(t, err) + + prm = new(PutPrm) + prm.SetObject(bigObj[i]) + _, err = b.Put(prm) + require.NoError(t, err) + } + + // Put and Get uncompressed object + blobStor := newBlobStor(t, false) + testPut(t, blobStor, 0) + testGet(t, blobStor, 0) + require.NoError(t, blobStor.Close()) + + blobStor = newBlobStor(t, true) + testGet(t, blobStor, 0) // get uncompressed object with compress enabled + testPut(t, blobStor, 1) + testGet(t, blobStor, 1) + require.NoError(t, blobStor.Close()) + + blobStor = newBlobStor(t, false) + testGet(t, blobStor, 0) // get old uncompressed object + testGet(t, blobStor, 1) // get compressed object with compression disabled + testPut(t, blobStor, 2) + testGet(t, blobStor, 2) + require.NoError(t, blobStor.Close()) +} diff --git a/pkg/local_object_storage/blobstor/compress.go b/pkg/local_object_storage/blobstor/compress.go index 414ae356..e9ca45d3 100644 --- a/pkg/local_object_storage/blobstor/compress.go +++ b/pkg/local_object_storage/blobstor/compress.go @@ -4,6 +4,10 @@ import ( "github.com/klauspost/compress/zstd" ) +// zstdFrameMagic contains first 4 bytes of any compressed object +// https://github.com/klauspost/compress/blob/master/zstd/framedec.go#L58 . +var zstdFrameMagic = []byte{0x28, 0xb5, 0x2f, 0xfd} + func noOpCompressor(data []byte) []byte { return data }