forked from TrueCloudLab/frostfs-node
[#1523] blobstor: Export single Get
, GetRange
and Delete
methods
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
ca15083a50
commit
7d6df543d7
7 changed files with 91 additions and 72 deletions
|
@ -38,11 +38,11 @@ func TestCompression(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
testGet := func(t *testing.T, b *BlobStor, i int) {
|
testGet := func(t *testing.T, b *BlobStor, i int) {
|
||||||
res1, err := b.GetSmall(common.GetPrm{Address: object.AddressOf(smallObj[i])})
|
res1, err := b.getSmall(common.GetPrm{Address: object.AddressOf(smallObj[i])})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, smallObj[i], res1.Object)
|
require.Equal(t, smallObj[i], res1.Object)
|
||||||
|
|
||||||
res2, err := b.GetBig(common.GetPrm{Address: object.AddressOf(bigObj[i])})
|
res2, err := b.getBig(common.GetPrm{Address: object.AddressOf(bigObj[i])})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, bigObj[i], res2.Object)
|
require.Equal(t, bigObj[i], res2.Object)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,13 +9,28 @@ import (
|
||||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DeleteBig removes an object from shallow dir of BLOB storage.
|
func (b *BlobStor) Delete(prm common.DeletePrm) (common.DeleteRes, error) {
|
||||||
|
if prm.BlobovniczaID == nil {
|
||||||
|
// Nothing specified, try everything.
|
||||||
|
res, err := b.deleteBig(prm)
|
||||||
|
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
return b.deleteSmall(prm)
|
||||||
|
}
|
||||||
|
if *prm.BlobovniczaID == nil {
|
||||||
|
return b.deleteBig(prm)
|
||||||
|
}
|
||||||
|
return b.deleteSmall(prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteBig removes an object from shallow dir of BLOB storage.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that did not allow
|
// Returns any error encountered that did not allow
|
||||||
// to completely remove the object.
|
// to completely remove the object.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
||||||
func (b *BlobStor) DeleteBig(prm common.DeletePrm) (common.DeleteRes, error) {
|
func (b *BlobStor) deleteBig(prm common.DeletePrm) (common.DeleteRes, error) {
|
||||||
err := b.fsTree.Delete(prm.Address)
|
err := b.fsTree.Delete(prm.Address)
|
||||||
if errors.Is(err, fstree.ErrFileNotFound) {
|
if errors.Is(err, fstree.ErrFileNotFound) {
|
||||||
var errNotFound apistatus.ObjectNotFound
|
var errNotFound apistatus.ObjectNotFound
|
||||||
|
@ -30,7 +45,7 @@ func (b *BlobStor) DeleteBig(prm common.DeletePrm) (common.DeleteRes, error) {
|
||||||
return common.DeleteRes{}, err
|
return common.DeleteRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteSmall removes an object from blobovnicza of BLOB storage.
|
// deleteSmall removes an object from blobovnicza of BLOB storage.
|
||||||
//
|
//
|
||||||
// If blobovnicza ID is not set or set to nil, BlobStor tries to
|
// If blobovnicza ID is not set or set to nil, BlobStor tries to
|
||||||
// find and remove object from any blobovnicza.
|
// find and remove object from any blobovnicza.
|
||||||
|
@ -39,6 +54,6 @@ func (b *BlobStor) DeleteBig(prm common.DeletePrm) (common.DeleteRes, error) {
|
||||||
// to completely remove the object.
|
// to completely remove the object.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
||||||
func (b *BlobStor) DeleteSmall(prm common.DeletePrm) (common.DeleteRes, error) {
|
func (b *BlobStor) deleteSmall(prm common.DeletePrm) (common.DeleteRes, error) {
|
||||||
return b.blobovniczas.Delete(prm)
|
return b.blobovniczas.Delete(prm)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,14 +10,32 @@ import (
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetBig reads the object from shallow dir of BLOB storage by address.
|
// Get reads the object from b.
|
||||||
|
// If the descriptor is present, only one sub-storage is tried,
|
||||||
|
// Otherwise, each sub-storage is tried in order.
|
||||||
|
func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
|
||||||
|
if prm.BlobovniczaID == nil {
|
||||||
|
// Nothing specified, try everything.
|
||||||
|
res, err := b.getBig(prm)
|
||||||
|
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
return b.getSmall(prm)
|
||||||
|
}
|
||||||
|
if *prm.BlobovniczaID == nil {
|
||||||
|
return b.getBig(prm)
|
||||||
|
}
|
||||||
|
return b.getSmall(prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getBig reads the object from shallow dir of BLOB storage by address.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
// did not allow to completely read the object.
|
// did not allow to completely read the object.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
|
||||||
// presented in shallow dir.
|
// presented in shallow dir.
|
||||||
func (b *BlobStor) GetBig(prm common.GetPrm) (common.GetRes, error) {
|
func (b *BlobStor) getBig(prm common.GetPrm) (common.GetRes, error) {
|
||||||
// get compressed object data
|
// get compressed object data
|
||||||
data, err := b.fsTree.Get(prm)
|
data, err := b.fsTree.Get(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -44,6 +62,6 @@ func (b *BlobStor) GetBig(prm common.GetPrm) (common.GetRes, error) {
|
||||||
return common.GetRes{Object: obj}, nil
|
return common.GetRes{Object: obj}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BlobStor) GetSmall(prm common.GetPrm) (common.GetRes, error) {
|
func (b *BlobStor) getSmall(prm common.GetPrm) (common.GetRes, error) {
|
||||||
return b.blobovniczas.Get(prm)
|
return b.blobovniczas.Get(prm)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,14 +10,32 @@ import (
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetRangeBig reads data of object payload range from shallow dir of BLOB storage.
|
// GetRange reads object payload data from b.
|
||||||
|
// If the descriptor is present, only one sub-storage is tried,
|
||||||
|
// Otherwise, each sub-storage is tried in order.
|
||||||
|
func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||||
|
if prm.BlobovniczaID == nil {
|
||||||
|
// Nothing specified, try everything.
|
||||||
|
res, err := b.getRangeBig(prm)
|
||||||
|
if err == nil || !errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
return b.getRangeSmall(prm)
|
||||||
|
}
|
||||||
|
if *prm.BlobovniczaID == nil {
|
||||||
|
return b.getRangeBig(prm)
|
||||||
|
}
|
||||||
|
return b.getRangeSmall(prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getRangeBig reads data of object payload range from shallow dir of BLOB storage.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
// did not allow to completely read the object payload range.
|
// did not allow to completely read the object payload range.
|
||||||
//
|
//
|
||||||
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
||||||
// Returns an error of type apistatus.ObjectNotFound if object is missing.
|
// Returns an error of type apistatus.ObjectNotFound if object is missing.
|
||||||
func (b *BlobStor) GetRangeBig(prm common.GetRangePrm) (common.GetRangeRes, error) {
|
func (b *BlobStor) getRangeBig(prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||||
// get compressed object data
|
// get compressed object data
|
||||||
data, err := b.fsTree.Get(common.GetPrm{Address: prm.Address})
|
data, err := b.fsTree.Get(common.GetPrm{Address: prm.Address})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -55,7 +73,7 @@ func (b *BlobStor) GetRangeBig(prm common.GetRangePrm) (common.GetRangeRes, erro
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRangeSmall reads data of object payload range from blobovnicza of BLOB storage.
|
// getRangeSmall reads data of object payload range from blobovnicza of BLOB storage.
|
||||||
//
|
//
|
||||||
// If blobovnicza ID is not set or set to nil, BlobStor tries to get payload range
|
// If blobovnicza ID is not set or set to nil, BlobStor tries to get payload range
|
||||||
// from any blobovnicza.
|
// from any blobovnicza.
|
||||||
|
@ -65,6 +83,6 @@ func (b *BlobStor) GetRangeBig(prm common.GetRangePrm) (common.GetRangeRes, erro
|
||||||
//
|
//
|
||||||
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in blobovnicza(s).
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in blobovnicza(s).
|
||||||
func (b *BlobStor) GetRangeSmall(prm common.GetRangePrm) (common.GetRangeRes, error) {
|
func (b *BlobStor) getRangeSmall(prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||||
return b.blobovniczas.GetRange(prm)
|
return b.blobovniczas.GetRange(prm)
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,7 +77,7 @@ func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||||
delSmallPrm.Address = prm.addr[i]
|
delSmallPrm.Address = prm.addr[i]
|
||||||
delSmallPrm.BlobovniczaID = id
|
delSmallPrm.BlobovniczaID = id
|
||||||
|
|
||||||
_, err = s.blobStor.DeleteSmall(delSmallPrm)
|
_, err = s.blobStor.Delete(delSmallPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Debug("can't remove small object from blobStor",
|
s.log.Debug("can't remove small object from blobStor",
|
||||||
zap.Stringer("object_address", prm.addr[i]),
|
zap.Stringer("object_address", prm.addr[i]),
|
||||||
|
@ -87,11 +87,14 @@ func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var id blobovnicza.ID
|
||||||
|
|
||||||
// delete big object
|
// delete big object
|
||||||
var delBigPrm common.DeletePrm
|
var delBigPrm common.DeletePrm
|
||||||
delBigPrm.Address = prm.addr[i]
|
delBigPrm.Address = prm.addr[i]
|
||||||
|
delBigPrm.BlobovniczaID = &id
|
||||||
|
|
||||||
_, err = s.blobStor.DeleteBig(delBigPrm)
|
_, err = s.blobStor.Delete(delBigPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Debug("can't remove big object from blobStor",
|
s.log.Debug("can't remove big object from blobStor",
|
||||||
zap.Stringer("object_address", prm.addr[i]),
|
zap.Stringer("object_address", prm.addr[i]),
|
||||||
|
|
|
@ -62,26 +62,12 @@ func (r GetRes) HasMeta() bool {
|
||||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
||||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||||
func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
||||||
var big, small storFetcher
|
cb := func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*objectSDK.Object, error) {
|
||||||
|
var getPrm common.GetPrm
|
||||||
|
getPrm.Address = prm.addr
|
||||||
|
getPrm.BlobovniczaID = id
|
||||||
|
|
||||||
big = func(stor *blobstor.BlobStor, _ *blobovnicza.ID) (*objectSDK.Object, error) {
|
res, err := stor.Get(getPrm)
|
||||||
var getBigPrm common.GetPrm
|
|
||||||
getBigPrm.Address = prm.addr
|
|
||||||
|
|
||||||
res, err := stor.GetBig(getBigPrm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return res.Object, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
small = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*objectSDK.Object, error) {
|
|
||||||
var getSmallPrm common.GetPrm
|
|
||||||
getSmallPrm.Address = prm.addr
|
|
||||||
getSmallPrm.BlobovniczaID = id
|
|
||||||
|
|
||||||
res, err := stor.GetSmall(getSmallPrm)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -94,7 +80,7 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
skipMeta := prm.skipMeta || s.GetMode().NoMetabase()
|
skipMeta := prm.skipMeta || s.GetMode().NoMetabase()
|
||||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, big, small, wc)
|
obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, cb, wc)
|
||||||
|
|
||||||
return GetRes{
|
return GetRes{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
|
@ -103,7 +89,7 @@ func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchObjectData looks through writeCache and blobStor to find object.
|
// fetchObjectData looks through writeCache and blobStor to find object.
|
||||||
func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, big, small storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
|
func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
res *objectSDK.Object
|
res *objectSDK.Object
|
||||||
|
@ -135,11 +121,7 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, big, small stor
|
||||||
}
|
}
|
||||||
|
|
||||||
if skipMeta || err != nil {
|
if skipMeta || err != nil {
|
||||||
res, err = small(s.blobStor, nil)
|
res, err = cb(s.blobStor, nil)
|
||||||
if err == nil || IsErrOutOfRange(err) {
|
|
||||||
return res, false, err
|
|
||||||
}
|
|
||||||
res, err = big(s.blobStor, nil)
|
|
||||||
return res, false, err
|
return res, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,11 +139,13 @@ func (s *Shard) fetchObjectData(addr oid.Address, skipMeta bool, big, small stor
|
||||||
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mRes.BlobovniczaID() != nil {
|
blobovniczaID := mRes.BlobovniczaID()
|
||||||
res, err = small(s.blobStor, mRes.BlobovniczaID())
|
if blobovniczaID == nil {
|
||||||
} else {
|
var id blobovnicza.ID
|
||||||
res, err = big(s.blobStor, nil)
|
blobovniczaID = &id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
res, err = cb(s.blobStor, blobovniczaID)
|
||||||
|
|
||||||
return res, true, err
|
return res, true, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,33 +67,14 @@ func (r RngRes) HasMeta() bool {
|
||||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
||||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||||
func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
|
func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
|
||||||
var big, small storFetcher
|
cb := func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Object, error) {
|
||||||
|
var getRngPrm common.GetRangePrm
|
||||||
|
getRngPrm.Address = prm.addr
|
||||||
|
getRngPrm.Range.SetOffset(prm.off)
|
||||||
|
getRngPrm.Range.SetLength(prm.ln)
|
||||||
|
getRngPrm.BlobovniczaID = id
|
||||||
|
|
||||||
big = func(stor *blobstor.BlobStor, _ *blobovnicza.ID) (*object.Object, error) {
|
res, err := stor.GetRange(getRngPrm)
|
||||||
var getRngBigPrm common.GetRangePrm
|
|
||||||
getRngBigPrm.Address = prm.addr
|
|
||||||
getRngBigPrm.Range.SetOffset(prm.off)
|
|
||||||
getRngBigPrm.Range.SetLength(prm.ln)
|
|
||||||
|
|
||||||
res, err := stor.GetRangeBig(getRngBigPrm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
obj := object.New()
|
|
||||||
obj.SetPayload(res.Data)
|
|
||||||
|
|
||||||
return obj, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
small = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Object, error) {
|
|
||||||
var getRngSmallPrm common.GetRangePrm
|
|
||||||
getRngSmallPrm.Address = prm.addr
|
|
||||||
getRngSmallPrm.Range.SetOffset(prm.off)
|
|
||||||
getRngSmallPrm.Range.SetLength(prm.ln)
|
|
||||||
getRngSmallPrm.BlobovniczaID = id
|
|
||||||
|
|
||||||
res, err := stor.GetRangeSmall(getRngSmallPrm)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -123,7 +104,7 @@ func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
skipMeta := prm.skipMeta || s.GetMode().NoMetabase()
|
skipMeta := prm.skipMeta || s.GetMode().NoMetabase()
|
||||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, big, small, wc)
|
obj, hasMeta, err := s.fetchObjectData(prm.addr, skipMeta, cb, wc)
|
||||||
|
|
||||||
return RngRes{
|
return RngRes{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
|
|
Loading…
Reference in a new issue