forked from TrueCloudLab/frostfs-node
[#1319] blobstor: Compress big objects in a streaming fashion
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
1219ff89d4
commit
3c39e6df11
2 changed files with 35 additions and 6 deletions
|
@ -194,6 +194,23 @@ func (t *FSTree) Put(addr *addressSDK.Address, data []byte) error {
|
|||
return os.WriteFile(p, data, t.Permissions)
|
||||
}
|
||||
|
||||
// PutStream puts executes handler on a file opened for write.
|
||||
func (t *FSTree) PutStream(addr *addressSDK.Address, handler func(*os.File) error) error {
|
||||
p := t.treePath(addr)
|
||||
|
||||
if err := util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, t.Permissions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
return handler(f)
|
||||
}
|
||||
|
||||
// Get returns object from storage by address.
|
||||
func (t *FSTree) Get(addr *addressSDK.Address) ([]byte, error) {
|
||||
p := t.treePath(addr)
|
||||
|
|
|
@ -2,8 +2,10 @@ package blobstor
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -73,13 +75,19 @@ func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool {
|
|||
func (b *BlobStor) PutRaw(addr *addressSDK.Address, data []byte, compress bool) (*PutRes, error) {
|
||||
big := b.isBig(data)
|
||||
|
||||
if compress {
|
||||
data = b.compressor(data)
|
||||
}
|
||||
|
||||
if big {
|
||||
// save object in shallow dir
|
||||
err := b.fsTree.Put(addr, data)
|
||||
var err error
|
||||
if compress {
|
||||
err = b.fsTree.PutStream(addr, func(f *os.File) error {
|
||||
enc, _ := zstd.NewWriter(f) // nil error if no options are provided
|
||||
if _, err := enc.Write(data); err != nil {
|
||||
return err
|
||||
}
|
||||
return enc.Close()
|
||||
})
|
||||
} else {
|
||||
err = b.fsTree.Put(addr, data)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -89,6 +97,10 @@ func (b *BlobStor) PutRaw(addr *addressSDK.Address, data []byte, compress bool)
|
|||
return new(PutRes), nil
|
||||
}
|
||||
|
||||
if compress {
|
||||
data = b.compressor(data)
|
||||
}
|
||||
|
||||
// save object in blobovnicza
|
||||
res, err := b.blobovniczas.put(addr, data)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in a new issue