From ad92493b86463329f47c7dff9e8f024441c71b3b Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 25 Mar 2022 10:59:32 +0300 Subject: [PATCH] [#1268] blobstor: Cleanup zstd encoders/decoders Signed-off-by: Evgenii Stratonikov --- .../blobstor/blobovnicza.go | 17 +++++++++++++++-- pkg/local_object_storage/blobstor/compress.go | 12 ++++++------ .../blobstor/iterate_test.go | 4 ++++ 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/pkg/local_object_storage/blobstor/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovnicza.go index af4b4705..94f5610e 100644 --- a/pkg/local_object_storage/blobstor/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovnicza.go @@ -78,6 +78,8 @@ type blobovniczas struct { // list of active (opened, non-filled) blobovniczas activeMtx sync.RWMutex active map[string]blobovniczaWithIndex + + onClose []func() } type blobovniczaWithIndex struct { @@ -800,14 +802,21 @@ func (b *blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex func (b *blobovniczas) init() error { b.log.Debug("initializing Blobovnicza's") - zstdC, err := zstdCompressor() + enc, zstdC, err := zstdCompressor() if err != nil { return fmt.Errorf("could not create zstd compressor: %v", err) } - zstdD, err := zstdDecompressor() + b.onClose = append(b.onClose, func() { + if err := enc.Close(); err != nil { + b.log.Debug("can't close zstd compressor", zap.String("err", err.Error())) + } + }) + + dec, zstdD, err := zstdDecompressor() if err != nil { return fmt.Errorf("could not create zstd decompressor: %v", err) } + b.onClose = append(b.onClose, dec.Close) // Compression is always done based on config settings. if b.compressionEnabled { @@ -875,6 +884,10 @@ func (b *blobovniczas) close() error { b.activeMtx.Unlock() + for i := range b.onClose { + b.onClose[i]() + } + return nil } diff --git a/pkg/local_object_storage/blobstor/compress.go b/pkg/local_object_storage/blobstor/compress.go index e9ca45d3..ad348e4f 100644 --- a/pkg/local_object_storage/blobstor/compress.go +++ b/pkg/local_object_storage/blobstor/compress.go @@ -16,24 +16,24 @@ func noOpDecompressor(data []byte) ([]byte, error) { return data, nil } -func zstdCompressor() (func([]byte) []byte, error) { +func zstdCompressor() (*zstd.Encoder, func([]byte) []byte, error) { enc, err := zstd.NewWriter(nil) if err != nil { - return nil, err + return nil, nil, err } - return func(data []byte) []byte { + return enc, func(data []byte) []byte { return enc.EncodeAll(data, make([]byte, 0, len(data))) }, nil } -func zstdDecompressor() (func([]byte) ([]byte, error), error) { +func zstdDecompressor() (*zstd.Decoder, func([]byte) ([]byte, error), error) { dec, err := zstd.NewReader(nil) if err != nil { - return nil, err + return nil, nil, err } - return func(data []byte) ([]byte, error) { + return dec, func(data []byte) ([]byte, error) { return dec.DecodeAll(data, nil) }, nil } diff --git a/pkg/local_object_storage/blobstor/iterate_test.go b/pkg/local_object_storage/blobstor/iterate_test.go index aafb24a6..e51c1af9 100644 --- a/pkg/local_object_storage/blobstor/iterate_test.go +++ b/pkg/local_object_storage/blobstor/iterate_test.go @@ -161,6 +161,10 @@ func TestIterate_IgnoreErrors(t *testing.T) { require.NotEqual(t, "", p, "expected to not have at least 1 blobovnicza in cache") require.NoError(t, os.Chmod(p, 0)) + require.NoError(t, b.Close()) + require.NoError(t, bs.Open()) + require.NoError(t, bs.Init()) + var prm IteratePrm prm.SetIterationHandler(func(e IterationElement) error { return nil