forked from TrueCloudLab/frostfs-node
[#229] blobovnicza: Store objects in a binary format
In previous implementation Blobovnicza's stored objects in protocol format which did not allow working with externally compressed objects. To achieve this goal, operations Get and Put no longer work with the structure of the object, but only with abstract binary data. Operation GetRange has become incorrect in its original purpose to receive the payload range. In this regard, BlobStor receives the payload range of the object through Get operation. In the future either Blobovnicza will learn to compress objects by itself, or the GetRange operation will be eliminated. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
3d77fdb347
commit
eaae5a5dd7
6 changed files with 73 additions and 58 deletions
|
@ -55,12 +55,15 @@ func testObject(sz uint64) *object.Object {
|
||||||
}
|
}
|
||||||
|
|
||||||
func testPutGet(t *testing.T, blz *Blobovnicza, sz uint64, expPut, expGet error) *objectSDK.Address {
|
func testPutGet(t *testing.T, blz *Blobovnicza, sz uint64, expPut, expGet error) *objectSDK.Address {
|
||||||
// create new object
|
// create binary object
|
||||||
obj := testObject(sz)
|
data := make([]byte, sz)
|
||||||
|
|
||||||
|
addr := testAddress()
|
||||||
|
|
||||||
// try to save object in Blobovnicza
|
// try to save object in Blobovnicza
|
||||||
pPut := new(PutPrm)
|
pPut := new(PutPrm)
|
||||||
pPut.SetObject(obj)
|
pPut.SetAddress(addr)
|
||||||
|
pPut.SetMarshaledObject(data)
|
||||||
_, err := blz.Put(pPut)
|
_, err := blz.Put(pPut)
|
||||||
require.True(t, errors.Is(err, expPut))
|
require.True(t, errors.Is(err, expPut))
|
||||||
|
|
||||||
|
@ -68,12 +71,12 @@ func testPutGet(t *testing.T, blz *Blobovnicza, sz uint64, expPut, expGet error)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
testGet(t, blz, obj.Address(), obj, expGet)
|
testGet(t, blz, addr, data, expGet)
|
||||||
|
|
||||||
return obj.Address()
|
return addr
|
||||||
}
|
}
|
||||||
|
|
||||||
func testGet(t *testing.T, blz *Blobovnicza, addr *objectSDK.Address, expObj *object.Object, expErr error) {
|
func testGet(t *testing.T, blz *Blobovnicza, addr *objectSDK.Address, expObj []byte, expErr error) {
|
||||||
pGet := new(GetPrm)
|
pGet := new(GetPrm)
|
||||||
pGet.SetAddress(addr)
|
pGet.SetAddress(addr)
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,6 @@ package blobovnicza
|
||||||
import (
|
import (
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/pkg/errors"
|
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -15,7 +14,7 @@ type GetPrm struct {
|
||||||
|
|
||||||
// GetRes groups resulting values of Get operation.
|
// GetRes groups resulting values of Get operation.
|
||||||
type GetRes struct {
|
type GetRes struct {
|
||||||
obj *object.Object
|
obj []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetAddress sets address of the requested object.
|
// SetAddress sets address of the requested object.
|
||||||
|
@ -23,8 +22,8 @@ func (p *GetPrm) SetAddress(addr *objectSDK.Address) {
|
||||||
p.addr = addr
|
p.addr = addr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object returns the requested object.
|
// Object returns binary representation of the requested object.
|
||||||
func (p *GetRes) Object() *object.Object {
|
func (p *GetRes) Object() []byte {
|
||||||
return p.obj
|
return p.obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,15 +63,7 @@ func (b *Blobovnicza) Get(prm *GetPrm) (*GetRes, error) {
|
||||||
return nil, object.ErrNotFound
|
return nil, object.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add decompression step
|
|
||||||
|
|
||||||
// unmarshal the object
|
|
||||||
obj := object.New()
|
|
||||||
if err := obj.Unmarshal(data); err != nil {
|
|
||||||
return nil, errors.Wrap(err, "could not unmarshal the object")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &GetRes{
|
return &GetRes{
|
||||||
obj: obj,
|
obj: data,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,9 +49,14 @@ func (b *Blobovnicza) GetRange(prm *GetRangePrm) (*GetRangeRes, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME: code below is incorrect because Get returns raw object data
|
||||||
|
// so we should unmarshal payload from it before. If blobovnicza
|
||||||
|
// stores objects in non-protocol format (e.g. compressed)
|
||||||
|
// then it should not provide GetRange method.
|
||||||
|
|
||||||
from := prm.rng.GetOffset()
|
from := prm.rng.GetOffset()
|
||||||
to := from + prm.rng.GetLength()
|
to := from + prm.rng.GetLength()
|
||||||
payload := res.obj.Payload()
|
payload := res.obj
|
||||||
|
|
||||||
if from > to {
|
if from > to {
|
||||||
return nil, errors.Errorf("invalid range [%d:%d]", from, to)
|
return nil, errors.Errorf("invalid range [%d:%d]", from, to)
|
||||||
|
|
|
@ -2,7 +2,6 @@ package blobovnicza
|
||||||
|
|
||||||
import (
|
import (
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
@ -11,8 +10,6 @@ import (
|
||||||
type PutPrm struct {
|
type PutPrm struct {
|
||||||
addr *objectSDK.Address
|
addr *objectSDK.Address
|
||||||
|
|
||||||
obj *object.Object
|
|
||||||
|
|
||||||
objData []byte
|
objData []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,11 +28,6 @@ func (p *PutPrm) SetAddress(addr *objectSDK.Address) {
|
||||||
p.addr = addr
|
p.addr = addr
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetObject sets the object.
|
|
||||||
func (p *PutPrm) SetObject(obj *object.Object) {
|
|
||||||
p.obj = obj
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetMarshaledObject sets binary representation of the object.
|
// SetMarshaledObject sets binary representation of the object.
|
||||||
func (p *PutPrm) SetMarshaledObject(data []byte) {
|
func (p *PutPrm) SetMarshaledObject(data []byte) {
|
||||||
p.objData = data
|
p.objData = data
|
||||||
|
@ -56,9 +48,7 @@ func (p *PutPrm) SetMarshaledObject(data []byte) {
|
||||||
func (b *Blobovnicza) Put(prm *PutPrm) (*PutRes, error) {
|
func (b *Blobovnicza) Put(prm *PutPrm) (*PutRes, error) {
|
||||||
addr := prm.addr
|
addr := prm.addr
|
||||||
if addr == nil {
|
if addr == nil {
|
||||||
if addr = prm.obj.Address(); addr == nil {
|
return nil, errNilAddress
|
||||||
return nil, errNilAddress
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
@ -66,19 +56,8 @@ func (b *Blobovnicza) Put(prm *PutPrm) (*PutRes, error) {
|
||||||
return ErrFull
|
return ErrFull
|
||||||
}
|
}
|
||||||
|
|
||||||
// marshal the object
|
|
||||||
data := prm.objData
|
|
||||||
if data == nil {
|
|
||||||
var err error
|
|
||||||
|
|
||||||
if data, err = prm.obj.Marshal(); err != nil {
|
|
||||||
return errors.Wrapf(err, "(%T) could not marshal the object", b)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// TODO: add compression step
|
|
||||||
|
|
||||||
// calculate size
|
// calculate size
|
||||||
sz := uint64(len(data))
|
sz := uint64(len(prm.objData))
|
||||||
|
|
||||||
// get bucket for size
|
// get bucket for size
|
||||||
buck := tx.Bucket(bucketForSize(sz))
|
buck := tx.Bucket(bucketForSize(sz))
|
||||||
|
@ -90,7 +69,7 @@ func (b *Blobovnicza) Put(prm *PutPrm) (*PutRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// save the object in bucket
|
// save the object in bucket
|
||||||
if err := buck.Put(addressKey(addr), data); err != nil {
|
if err := buck.Put(addressKey(addr), prm.objData); err != nil {
|
||||||
return errors.Wrapf(err, "(%T) could not save object in bucket", b)
|
return errors.Wrapf(err, "(%T) could not save object in bucket", b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -301,17 +301,13 @@ func (b *blobovniczas) delete(prm *DeleteSmallPrm) (res *DeleteSmallRes, err err
|
||||||
//
|
//
|
||||||
// TODO:quite similar to GET, can be unified
|
// TODO:quite similar to GET, can be unified
|
||||||
func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, err error) {
|
func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, err error) {
|
||||||
bPrm := new(blobovnicza.GetRangePrm)
|
|
||||||
bPrm.SetAddress(prm.addr)
|
|
||||||
bPrm.SetRange(prm.rng)
|
|
||||||
|
|
||||||
if prm.blobovniczaID != nil {
|
if prm.blobovniczaID != nil {
|
||||||
blz, err := b.openBlobovnicza(prm.blobovniczaID.String())
|
blz, err := b.openBlobovnicza(prm.blobovniczaID.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.getObjectRange(blz, bPrm)
|
return b.getObjectRange(blz, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache := make(map[string]struct{})
|
activeCache := make(map[string]struct{})
|
||||||
|
@ -321,7 +317,7 @@ func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, e
|
||||||
|
|
||||||
_, ok := activeCache[dirPath]
|
_, ok := activeCache[dirPath]
|
||||||
|
|
||||||
res, err = b.getRangeFromLevel(bPrm, p, !ok)
|
res, err = b.getRangeFromLevel(prm, p, !ok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, object.ErrNotFound) {
|
if !errors.Is(err, object.ErrNotFound) {
|
||||||
b.log.Debug("could not get object from level",
|
b.log.Debug("could not get object from level",
|
||||||
|
@ -467,7 +463,7 @@ func (b *blobovniczas) getObjectFromLevel(prm *blobovnicza.GetPrm, blzPath strin
|
||||||
// tries to read range of object payload data from particular blobovnicza.
|
// tries to read range of object payload data from particular blobovnicza.
|
||||||
//
|
//
|
||||||
// returns error if object could not be read from any blobovnicza of the same level.
|
// returns error if object could not be read from any blobovnicza of the same level.
|
||||||
func (b *blobovniczas) getRangeFromLevel(prm *blobovnicza.GetRangePrm, blzPath string, tryActive bool) (*GetRangeSmallRes, error) {
|
func (b *blobovniczas) getRangeFromLevel(prm *GetRangeSmallPrm, blzPath string, tryActive bool) (*GetRangeSmallRes, error) {
|
||||||
lvlPath := path.Dir(blzPath)
|
lvlPath := path.Dir(blzPath)
|
||||||
|
|
||||||
log := b.log.With(
|
log := b.log.With(
|
||||||
|
@ -541,23 +537,62 @@ func (b *blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm *blobovnicza.
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decompress the data
|
||||||
|
data, err := b.decompressor(res.Object())
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not decompress object data")
|
||||||
|
}
|
||||||
|
|
||||||
|
// unmarshal the object
|
||||||
|
obj := object.New()
|
||||||
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not unmarshal the object")
|
||||||
|
}
|
||||||
|
|
||||||
return &GetSmallRes{
|
return &GetSmallRes{
|
||||||
roObject: roObject{
|
roObject: roObject{
|
||||||
obj: res.Object(),
|
obj: obj,
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// reads range of object payload data from blobovnicza and returns GetRangeSmallRes.
|
// reads range of object payload data from blobovnicza and returns GetRangeSmallRes.
|
||||||
func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm *blobovnicza.GetRangePrm) (*GetRangeSmallRes, error) {
|
func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm *GetRangeSmallPrm) (*GetRangeSmallRes, error) {
|
||||||
res, err := blz.GetRange(prm)
|
gPrm := new(blobovnicza.GetPrm)
|
||||||
|
gPrm.SetAddress(prm.addr)
|
||||||
|
|
||||||
|
// we don't use GetRange call for now since blobovnicza
|
||||||
|
// stores data that is compressed on BlobStor side.
|
||||||
|
// If blobovnicza learns to do the compression itself,
|
||||||
|
// wecan start using GetRange.
|
||||||
|
res, err := blz.Get(gPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decompress the data
|
||||||
|
data, err := b.decompressor(res.Object())
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not decompress object data")
|
||||||
|
}
|
||||||
|
|
||||||
|
// unmarshal the object
|
||||||
|
obj := object.New()
|
||||||
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not unmarshal the object")
|
||||||
|
}
|
||||||
|
|
||||||
|
from := prm.rng.GetOffset()
|
||||||
|
to := from + prm.rng.GetLength()
|
||||||
|
payload := obj.Payload()
|
||||||
|
|
||||||
|
if uint64(len(payload)) < to {
|
||||||
|
return nil, object.ErrRangeOutOfBounds
|
||||||
|
}
|
||||||
|
|
||||||
return &GetRangeSmallRes{
|
return &GetRangeSmallRes{
|
||||||
rangeData: rangeData{
|
rangeData: rangeData{
|
||||||
data: res.RangeData(),
|
data: payload[from:to],
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,10 +34,12 @@ func (b *BlobStor) Put(prm *PutPrm) (*PutRes, error) {
|
||||||
return nil, errors.Wrap(err, "could not marshal the object")
|
return nil, errors.Wrap(err, "could not marshal the object")
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.isBig(data) {
|
big := b.isBig(data)
|
||||||
// compress object data
|
|
||||||
data = b.compressor(data)
|
|
||||||
|
|
||||||
|
// compress object data
|
||||||
|
data = b.compressor(data)
|
||||||
|
|
||||||
|
if big {
|
||||||
// save object in shallow dir
|
// save object in shallow dir
|
||||||
return new(PutRes), b.fsTree.put(prm.obj.Address(), data)
|
return new(PutRes), b.fsTree.put(prm.obj.Address(), data)
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in a new issue