From 3c39e6df110d3140c9209ab37e4f57fdcf805362 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 28 Apr 2022 13:19:03 +0300 Subject: [PATCH] [#1319] blobstor: Compress big objects in a streaming fashion Signed-off-by: Evgenii Stratonikov --- .../blobstor/fstree/fstree.go | 17 +++++++++++++ pkg/local_object_storage/blobstor/put.go | 24 ++++++++++++++----- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index a70ea3e00..062373b6d 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -194,6 +194,23 @@ func (t *FSTree) Put(addr *addressSDK.Address, data []byte) error { return os.WriteFile(p, data, t.Permissions) } +// PutStream puts executes handler on a file opened for write. +func (t *FSTree) PutStream(addr *addressSDK.Address, handler func(*os.File) error) error { + p := t.treePath(addr) + + if err := util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil { + return err + } + + f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, t.Permissions) + if err != nil { + return err + } + defer f.Close() + + return handler(f) +} + // Get returns object from storage by address. func (t *FSTree) Get(addr *addressSDK.Address) ([]byte, error) { p := t.treePath(addr) diff --git a/pkg/local_object_storage/blobstor/put.go b/pkg/local_object_storage/blobstor/put.go index 29de825a9..86fcdad50 100644 --- a/pkg/local_object_storage/blobstor/put.go +++ b/pkg/local_object_storage/blobstor/put.go @@ -2,8 +2,10 @@ package blobstor import ( "fmt" + "os" "strings" + "github.com/klauspost/compress/zstd" "github.com/nspcc-dev/neofs-node/pkg/core/object" storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" @@ -73,13 +75,19 @@ func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool { func (b *BlobStor) PutRaw(addr *addressSDK.Address, data []byte, compress bool) (*PutRes, error) { big := b.isBig(data) - if compress { - data = b.compressor(data) - } - if big { - // save object in shallow dir - err := b.fsTree.Put(addr, data) + var err error + if compress { + err = b.fsTree.PutStream(addr, func(f *os.File) error { + enc, _ := zstd.NewWriter(f) // nil error if no options are provided + if _, err := enc.Write(data); err != nil { + return err + } + return enc.Close() + }) + } else { + err = b.fsTree.Put(addr, data) + } if err != nil { return nil, err } @@ -89,6 +97,10 @@ func (b *BlobStor) PutRaw(addr *addressSDK.Address, data []byte, compress bool) return new(PutRes), nil } + if compress { + data = b.compressor(data) + } + // save object in blobovnicza res, err := b.blobovniczas.put(addr, data) if err != nil {