[#1523] blobstor: Unify parameters for Put
operation
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
7d6df543d7
commit
9eb018672c
13 changed files with 91 additions and 73 deletions
|
@ -6,21 +6,11 @@ import (
|
|||
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
|
||||
storagelog "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/internal/log"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
// PutPrm groups the parameters of Put operation.
|
||||
type PutPrm struct {
|
||||
rwObject
|
||||
}
|
||||
|
||||
// PutRes groups the resulting values of Put operation.
|
||||
type PutRes struct {
|
||||
roBlobovniczaID
|
||||
}
|
||||
|
||||
// Put saves the object in BLOB storage.
|
||||
//
|
||||
// If object is "big", BlobStor saves the object in shallow dir.
|
||||
|
@ -29,14 +19,18 @@ type PutRes struct {
|
|||
//
|
||||
// Returns any error encountered that
|
||||
// did not allow to completely save the object.
|
||||
func (b *BlobStor) Put(prm PutPrm) (PutRes, error) {
|
||||
// marshal object
|
||||
data, err := prm.obj.Marshal()
|
||||
if err != nil {
|
||||
return PutRes{}, fmt.Errorf("could not marshal the object: %w", err)
|
||||
func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) {
|
||||
prm.Address = object.AddressOf(prm.Object)
|
||||
if prm.RawData == nil {
|
||||
// marshal object
|
||||
data, err := prm.Object.Marshal()
|
||||
if err != nil {
|
||||
return common.PutRes{}, fmt.Errorf("could not marshal the object: %w", err)
|
||||
}
|
||||
prm.RawData = data
|
||||
}
|
||||
|
||||
return b.PutRaw(object.AddressOf(prm.obj), data, b.NeedsCompression(prm.obj))
|
||||
return b.PutRaw(prm, b.NeedsCompression(prm.Object))
|
||||
}
|
||||
|
||||
// NeedsCompression returns true if the object should be compressed.
|
||||
|
@ -48,46 +42,37 @@ func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool {
|
|||
}
|
||||
|
||||
// PutRaw saves an already marshaled object in BLOB storage.
|
||||
func (b *BlobStor) PutRaw(addr oid.Address, data []byte, compress bool) (PutRes, error) {
|
||||
big := b.isBig(data)
|
||||
func (b *BlobStor) PutRaw(prm common.PutPrm, compress bool) (common.PutRes, error) {
|
||||
big := b.isBig(prm.RawData)
|
||||
|
||||
if big {
|
||||
var err error
|
||||
if compress {
|
||||
err = b.fsTree.PutStream(addr, func(f *os.File) error {
|
||||
err = b.fsTree.PutStream(prm.Address, func(f *os.File) error {
|
||||
enc, _ := zstd.NewWriter(f) // nil error if no options are provided
|
||||
if _, err := enc.Write(data); err != nil {
|
||||
if _, err := enc.Write(prm.RawData); err != nil {
|
||||
return err
|
||||
}
|
||||
return enc.Close()
|
||||
})
|
||||
} else {
|
||||
err = b.fsTree.Put(addr, data)
|
||||
_, err = b.fsTree.Put(prm)
|
||||
}
|
||||
if err != nil {
|
||||
return PutRes{}, err
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
|
||||
storagelog.Write(b.log, storagelog.AddressField(addr), storagelog.OpField("fstree PUT"))
|
||||
storagelog.Write(b.log, storagelog.AddressField(prm.Address), storagelog.OpField("fstree PUT"))
|
||||
|
||||
return PutRes{}, nil
|
||||
return common.PutRes{}, nil
|
||||
}
|
||||
|
||||
if compress {
|
||||
data = b.CConfig.Compress(data)
|
||||
prm.RawData = b.CConfig.Compress(prm.RawData)
|
||||
}
|
||||
|
||||
// save object in blobovnicza
|
||||
res, err := b.blobovniczas.Put(addr, data)
|
||||
if err != nil {
|
||||
return PutRes{}, err
|
||||
}
|
||||
|
||||
return PutRes{
|
||||
roBlobovniczaID: roBlobovniczaID{
|
||||
blobovniczaID: res,
|
||||
},
|
||||
}, nil
|
||||
return b.blobovniczas.Put(prm)
|
||||
}
|
||||
|
||||
// checks if object is "big".
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue