diff --git a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go index 158eba49..85214810 100644 --- a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go +++ b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go @@ -55,12 +55,15 @@ func testObject(sz uint64) *object.Object { } func testPutGet(t *testing.T, blz *Blobovnicza, sz uint64, expPut, expGet error) *objectSDK.Address { - // create new object - obj := testObject(sz) + // create binary object + data := make([]byte, sz) + + addr := testAddress() // try to save object in Blobovnicza pPut := new(PutPrm) - pPut.SetObject(obj) + pPut.SetAddress(addr) + pPut.SetMarshaledObject(data) _, err := blz.Put(pPut) require.True(t, errors.Is(err, expPut)) @@ -68,12 +71,12 @@ func testPutGet(t *testing.T, blz *Blobovnicza, sz uint64, expPut, expGet error) return nil } - testGet(t, blz, obj.Address(), obj, expGet) + testGet(t, blz, addr, data, expGet) - return obj.Address() + return addr } -func testGet(t *testing.T, blz *Blobovnicza, addr *objectSDK.Address, expObj *object.Object, expErr error) { +func testGet(t *testing.T, blz *Blobovnicza, addr *objectSDK.Address, expObj []byte, expErr error) { pGet := new(GetPrm) pGet.SetAddress(addr) diff --git a/pkg/local_object_storage/blobovnicza/get.go b/pkg/local_object_storage/blobovnicza/get.go index c2350cec..66f55806 100644 --- a/pkg/local_object_storage/blobovnicza/get.go +++ b/pkg/local_object_storage/blobovnicza/get.go @@ -3,7 +3,6 @@ package blobovnicza import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/pkg/errors" "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -15,7 +14,7 @@ type GetPrm struct { // GetRes groups resulting values of Get operation. type GetRes struct { - obj *object.Object + obj []byte } // SetAddress sets address of the requested object. @@ -23,8 +22,8 @@ func (p *GetPrm) SetAddress(addr *objectSDK.Address) { p.addr = addr } -// Object returns the requested object. -func (p *GetRes) Object() *object.Object { +// Object returns binary representation of the requested object. +func (p *GetRes) Object() []byte { return p.obj } @@ -64,15 +63,7 @@ func (b *Blobovnicza) Get(prm *GetPrm) (*GetRes, error) { return nil, object.ErrNotFound } - // TODO: add decompression step - - // unmarshal the object - obj := object.New() - if err := obj.Unmarshal(data); err != nil { - return nil, errors.Wrap(err, "could not unmarshal the object") - } - return &GetRes{ - obj: obj, + obj: data, }, nil } diff --git a/pkg/local_object_storage/blobovnicza/get_range.go b/pkg/local_object_storage/blobovnicza/get_range.go index 986de8b5..43b2282d 100644 --- a/pkg/local_object_storage/blobovnicza/get_range.go +++ b/pkg/local_object_storage/blobovnicza/get_range.go @@ -49,9 +49,14 @@ func (b *Blobovnicza) GetRange(prm *GetRangePrm) (*GetRangeRes, error) { return nil, err } + // FIXME: code below is incorrect because Get returns raw object data + // so we should unmarshal payload from it before. If blobovnicza + // stores objects in non-protocol format (e.g. compressed) + // then it should not provide GetRange method. + from := prm.rng.GetOffset() to := from + prm.rng.GetLength() - payload := res.obj.Payload() + payload := res.obj if from > to { return nil, errors.Errorf("invalid range [%d:%d]", from, to) diff --git a/pkg/local_object_storage/blobovnicza/put.go b/pkg/local_object_storage/blobovnicza/put.go index 461fb87a..ef178275 100644 --- a/pkg/local_object_storage/blobovnicza/put.go +++ b/pkg/local_object_storage/blobovnicza/put.go @@ -2,7 +2,6 @@ package blobovnicza import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/pkg/errors" "go.etcd.io/bbolt" ) @@ -11,8 +10,6 @@ import ( type PutPrm struct { addr *objectSDK.Address - obj *object.Object - objData []byte } @@ -31,11 +28,6 @@ func (p *PutPrm) SetAddress(addr *objectSDK.Address) { p.addr = addr } -// SetObject sets the object. -func (p *PutPrm) SetObject(obj *object.Object) { - p.obj = obj -} - // SetMarshaledObject sets binary representation of the object. func (p *PutPrm) SetMarshaledObject(data []byte) { p.objData = data @@ -56,9 +48,7 @@ func (p *PutPrm) SetMarshaledObject(data []byte) { func (b *Blobovnicza) Put(prm *PutPrm) (*PutRes, error) { addr := prm.addr if addr == nil { - if addr = prm.obj.Address(); addr == nil { - return nil, errNilAddress - } + return nil, errNilAddress } err := b.boltDB.Update(func(tx *bbolt.Tx) error { @@ -66,19 +56,8 @@ func (b *Blobovnicza) Put(prm *PutPrm) (*PutRes, error) { return ErrFull } - // marshal the object - data := prm.objData - if data == nil { - var err error - - if data, err = prm.obj.Marshal(); err != nil { - return errors.Wrapf(err, "(%T) could not marshal the object", b) - } - } - // TODO: add compression step - // calculate size - sz := uint64(len(data)) + sz := uint64(len(prm.objData)) // get bucket for size buck := tx.Bucket(bucketForSize(sz)) @@ -90,7 +69,7 @@ func (b *Blobovnicza) Put(prm *PutPrm) (*PutRes, error) { } // save the object in bucket - if err := buck.Put(addressKey(addr), data); err != nil { + if err := buck.Put(addressKey(addr), prm.objData); err != nil { return errors.Wrapf(err, "(%T) could not save object in bucket", b) } diff --git a/pkg/local_object_storage/blobstor/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovnicza.go index 175af2cb..ebc21489 100644 --- a/pkg/local_object_storage/blobstor/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovnicza.go @@ -301,17 +301,13 @@ func (b *blobovniczas) delete(prm *DeleteSmallPrm) (res *DeleteSmallRes, err err // // TODO:quite similar to GET, can be unified func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, err error) { - bPrm := new(blobovnicza.GetRangePrm) - bPrm.SetAddress(prm.addr) - bPrm.SetRange(prm.rng) - if prm.blobovniczaID != nil { blz, err := b.openBlobovnicza(prm.blobovniczaID.String()) if err != nil { return nil, err } - return b.getObjectRange(blz, bPrm) + return b.getObjectRange(blz, prm) } activeCache := make(map[string]struct{}) @@ -321,7 +317,7 @@ func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, e _, ok := activeCache[dirPath] - res, err = b.getRangeFromLevel(bPrm, p, !ok) + res, err = b.getRangeFromLevel(prm, p, !ok) if err != nil { if !errors.Is(err, object.ErrNotFound) { b.log.Debug("could not get object from level", @@ -467,7 +463,7 @@ func (b *blobovniczas) getObjectFromLevel(prm *blobovnicza.GetPrm, blzPath strin // tries to read range of object payload data from particular blobovnicza. // // returns error if object could not be read from any blobovnicza of the same level. -func (b *blobovniczas) getRangeFromLevel(prm *blobovnicza.GetRangePrm, blzPath string, tryActive bool) (*GetRangeSmallRes, error) { +func (b *blobovniczas) getRangeFromLevel(prm *GetRangeSmallPrm, blzPath string, tryActive bool) (*GetRangeSmallRes, error) { lvlPath := path.Dir(blzPath) log := b.log.With( @@ -541,23 +537,62 @@ func (b *blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm *blobovnicza. return nil, err } + // decompress the data + data, err := b.decompressor(res.Object()) + if err != nil { + return nil, errors.Wrap(err, "could not decompress object data") + } + + // unmarshal the object + obj := object.New() + if err := obj.Unmarshal(data); err != nil { + return nil, errors.Wrap(err, "could not unmarshal the object") + } + return &GetSmallRes{ roObject: roObject{ - obj: res.Object(), + obj: obj, }, }, nil } // reads range of object payload data from blobovnicza and returns GetRangeSmallRes. -func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm *blobovnicza.GetRangePrm) (*GetRangeSmallRes, error) { - res, err := blz.GetRange(prm) +func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm *GetRangeSmallPrm) (*GetRangeSmallRes, error) { + gPrm := new(blobovnicza.GetPrm) + gPrm.SetAddress(prm.addr) + + // we don't use GetRange call for now since blobovnicza + // stores data that is compressed on BlobStor side. + // If blobovnicza learns to do the compression itself, + // wecan start using GetRange. + res, err := blz.Get(gPrm) if err != nil { return nil, err } + // decompress the data + data, err := b.decompressor(res.Object()) + if err != nil { + return nil, errors.Wrap(err, "could not decompress object data") + } + + // unmarshal the object + obj := object.New() + if err := obj.Unmarshal(data); err != nil { + return nil, errors.Wrap(err, "could not unmarshal the object") + } + + from := prm.rng.GetOffset() + to := from + prm.rng.GetLength() + payload := obj.Payload() + + if uint64(len(payload)) < to { + return nil, object.ErrRangeOutOfBounds + } + return &GetRangeSmallRes{ rangeData: rangeData{ - data: res.RangeData(), + data: payload[from:to], }, }, nil } diff --git a/pkg/local_object_storage/blobstor/put.go b/pkg/local_object_storage/blobstor/put.go index 7557a3d8..53e80187 100644 --- a/pkg/local_object_storage/blobstor/put.go +++ b/pkg/local_object_storage/blobstor/put.go @@ -34,10 +34,12 @@ func (b *BlobStor) Put(prm *PutPrm) (*PutRes, error) { return nil, errors.Wrap(err, "could not marshal the object") } - if b.isBig(data) { - // compress object data - data = b.compressor(data) + big := b.isBig(data) + // compress object data + data = b.compressor(data) + + if big { // save object in shallow dir return new(PutRes), b.fsTree.put(prm.obj.Address(), data) } else {