[#235] blobstor: Return object.ErrRangeOutOfBounds from shallow dir

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-12-08 19:53:01 +03:00 committed by Alex Vanin
parent 41b9fa5b45
commit 9dd83bdf0d
4 changed files with 27 additions and 12 deletions

View file

@ -473,14 +473,19 @@ func (b *blobovniczas) getRangeFromLevel(prm *GetRangeSmallPrm, blzPath string,
// try to read from blobovnicza if it is opened
v, ok := b.opened.Get(blzPath)
if ok {
if res, err := b.getObjectRange(v.(*blobovnicza.Blobovnicza), prm); err == nil {
res, err := b.getObjectRange(v.(*blobovnicza.Blobovnicza), prm)
switch {
case err == nil,
errors.Is(err, object.ErrRangeOutOfBounds):
return res, err
} else if !errors.Is(err, object.ErrNotFound) {
default:
if !errors.Is(err, object.ErrNotFound) {
log.Debug("could not read payload range from opened blobovnicza",
zap.String("error", err.Error()),
)
}
}
}
// therefore the object is possibly placed in a lighter blobovnicza
@ -492,14 +497,19 @@ func (b *blobovniczas) getRangeFromLevel(prm *GetRangeSmallPrm, blzPath string,
b.activeMtx.RUnlock()
if ok && tryActive {
if res, err := b.getObjectRange(active.blz, prm); err == nil {
res, err := b.getObjectRange(active.blz, prm)
switch {
case err == nil,
errors.Is(err, object.ErrRangeOutOfBounds):
return res, err
} else if !errors.Is(err, object.ErrNotFound) {
default:
if !errors.Is(err, object.ErrNotFound) {
log.Debug("could not read payload range from active blobovnicza",
zap.String("error", err.Error()),
)
}
}
}
// then object is possibly placed in closed blobovnicza

View file

@ -20,6 +20,8 @@ type GetRangeBigRes struct {
//
// Returns any error encountered that
// did not allow to completely read the object payload range.
//
// Returns ErrRangeOutOfBounds if requested object range is out of bounds.
func (b *BlobStor) GetRangeBig(prm *GetRangeBigPrm) (*GetRangeBigRes, error) {
// get compressed object data
data, err := b.fsTree.get(prm.addr)
@ -42,8 +44,7 @@ func (b *BlobStor) GetRangeBig(prm *GetRangeBigPrm) (*GetRangeBigRes, error) {
ln, off := prm.rng.GetLength(), prm.rng.GetOffset()
if pLen := uint64(len(payload)); pLen < ln+off {
return nil, errors.Errorf("range is out-of-bounds (payload %d, off %d, ln %d)",
pLen, off, ln)
return nil, object.ErrRangeOutOfBounds
}
return &GetRangeBigRes{

View file

@ -19,6 +19,8 @@ type GetRangeSmallRes struct {
//
// Returns any error encountered that
// did not allow to completely read the object payload range.
//
// Returns ErrRangeOutOfBounds if requested object range is out of bounds.
func (b *BlobStor) GetRangeSmall(prm *GetRangeSmallPrm) (*GetRangeSmallRes, error) {
return b.blobovniczas.getRange(prm)
}

View file

@ -52,6 +52,8 @@ func (r *RngRes) Object() *object.Object {
//
// Returns any error encountered that
// did not allow to completely read the object part.
//
// Returns ErrRangeOutOfBounds if requested object range is out of bounds.
func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) {
var big, small storFetcher