[#1268] blobstor: Cleanup zstd encoders/decoders

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-03-25 10:59:32 +03:00 committed by fyrchik
parent 4253931699
commit ad92493b86
3 changed files with 25 additions and 8 deletions

View file

@ -78,6 +78,8 @@ type blobovniczas struct {
// list of active (opened, non-filled) blobovniczas // list of active (opened, non-filled) blobovniczas
activeMtx sync.RWMutex activeMtx sync.RWMutex
active map[string]blobovniczaWithIndex active map[string]blobovniczaWithIndex
onClose []func()
} }
type blobovniczaWithIndex struct { type blobovniczaWithIndex struct {
@ -800,14 +802,21 @@ func (b *blobovniczas) updateAndGet(p string, old *uint64) (blobovniczaWithIndex
func (b *blobovniczas) init() error { func (b *blobovniczas) init() error {
b.log.Debug("initializing Blobovnicza's") b.log.Debug("initializing Blobovnicza's")
zstdC, err := zstdCompressor() enc, zstdC, err := zstdCompressor()
if err != nil { if err != nil {
return fmt.Errorf("could not create zstd compressor: %v", err) return fmt.Errorf("could not create zstd compressor: %v", err)
} }
zstdD, err := zstdDecompressor() b.onClose = append(b.onClose, func() {
if err := enc.Close(); err != nil {
b.log.Debug("can't close zstd compressor", zap.String("err", err.Error()))
}
})
dec, zstdD, err := zstdDecompressor()
if err != nil { if err != nil {
return fmt.Errorf("could not create zstd decompressor: %v", err) return fmt.Errorf("could not create zstd decompressor: %v", err)
} }
b.onClose = append(b.onClose, dec.Close)
// Compression is always done based on config settings. // Compression is always done based on config settings.
if b.compressionEnabled { if b.compressionEnabled {
@ -875,6 +884,10 @@ func (b *blobovniczas) close() error {
b.activeMtx.Unlock() b.activeMtx.Unlock()
for i := range b.onClose {
b.onClose[i]()
}
return nil return nil
} }

View file

@ -16,24 +16,24 @@ func noOpDecompressor(data []byte) ([]byte, error) {
return data, nil return data, nil
} }
func zstdCompressor() (func([]byte) []byte, error) { func zstdCompressor() (*zstd.Encoder, func([]byte) []byte, error) {
enc, err := zstd.NewWriter(nil) enc, err := zstd.NewWriter(nil)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return func(data []byte) []byte { return enc, func(data []byte) []byte {
return enc.EncodeAll(data, make([]byte, 0, len(data))) return enc.EncodeAll(data, make([]byte, 0, len(data)))
}, nil }, nil
} }
func zstdDecompressor() (func([]byte) ([]byte, error), error) { func zstdDecompressor() (*zstd.Decoder, func([]byte) ([]byte, error), error) {
dec, err := zstd.NewReader(nil) dec, err := zstd.NewReader(nil)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return func(data []byte) ([]byte, error) { return dec, func(data []byte) ([]byte, error) {
return dec.DecodeAll(data, nil) return dec.DecodeAll(data, nil)
}, nil }, nil
} }

View file

@ -161,6 +161,10 @@ func TestIterate_IgnoreErrors(t *testing.T) {
require.NotEqual(t, "", p, "expected to not have at least 1 blobovnicza in cache") require.NotEqual(t, "", p, "expected to not have at least 1 blobovnicza in cache")
require.NoError(t, os.Chmod(p, 0)) require.NoError(t, os.Chmod(p, 0))
require.NoError(t, b.Close())
require.NoError(t, bs.Open())
require.NoError(t, bs.Init())
var prm IteratePrm var prm IteratePrm
prm.SetIterationHandler(func(e IterationElement) error { prm.SetIterationHandler(func(e IterationElement) error {
return nil return nil