[#1460] blobstor: Do not use pointers as the results

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-05-31 15:18:32 +03:00 committed by Pavel Karpy
parent 08bf8a68f1
commit 0e4a1beecf
11 changed files with 64 additions and 63 deletions

View file

@ -205,7 +205,7 @@ func (b *blobovniczas) put(addr oid.Address, data []byte) (*blobovnicza.ID, erro
//
// If blobocvnicza ID is specified, only this blobovnicza is processed.
// Otherwise, all blobovniczas are processed descending weight.
func (b *blobovniczas) get(prm GetSmallPrm) (res *GetSmallRes, err error) {
func (b *blobovniczas) get(prm GetSmallPrm) (res GetSmallRes, err error) {
var bPrm blobovnicza.GetPrm
bPrm.SetAddress(prm.addr)
@ -241,7 +241,7 @@ func (b *blobovniczas) get(prm GetSmallPrm) (res *GetSmallRes, err error) {
return err == nil, nil
})
if err == nil && res == nil {
if err == nil && res.Object() == nil {
// not found in any blobovnicza
var errNotFound apistatus.ObjectNotFound
@ -255,7 +255,7 @@ func (b *blobovniczas) get(prm GetSmallPrm) (res *GetSmallRes, err error) {
//
// If blobocvnicza ID is specified, only this blobovnicza is processed.
// Otherwise, all blobovniczas are processed descending weight.
func (b *blobovniczas) delete(prm DeleteSmallPrm) (res *DeleteSmallRes, err error) {
func (b *blobovniczas) delete(prm DeleteSmallPrm) (res DeleteSmallRes, err error) {
var bPrm blobovnicza.DeletePrm
bPrm.SetAddress(prm.addr)
@ -301,7 +301,7 @@ func (b *blobovniczas) delete(prm DeleteSmallPrm) (res *DeleteSmallRes, err erro
// not found in any blobovnicza
var errNotFound apistatus.ObjectNotFound
return nil, errNotFound
return DeleteSmallRes{}, errNotFound
}
return
@ -311,11 +311,11 @@ func (b *blobovniczas) delete(prm DeleteSmallPrm) (res *DeleteSmallRes, err erro
//
// If blobocvnicza ID is specified, only this blobovnicza is processed.
// Otherwise, all blobovniczas are processed descending weight.
func (b *blobovniczas) getRange(prm GetRangeSmallPrm) (res *GetRangeSmallRes, err error) {
func (b *blobovniczas) getRange(prm GetRangeSmallPrm) (res GetRangeSmallRes, err error) {
if prm.blobovniczaID != nil {
blz, err := b.openBlobovnicza(prm.blobovniczaID.String())
if err != nil {
return nil, err
return GetRangeSmallRes{}, err
}
return b.getObjectRange(blz, prm)
@ -355,7 +355,7 @@ func (b *blobovniczas) getRange(prm GetRangeSmallPrm) (res *GetRangeSmallRes, er
// not found in any blobovnicza
var errNotFound apistatus.ObjectNotFound
return nil, errNotFound
return GetRangeSmallRes{}, errNotFound
}
return
@ -364,7 +364,7 @@ func (b *blobovniczas) getRange(prm GetRangeSmallPrm) (res *GetRangeSmallRes, er
// tries to delete object from particular blobovnicza.
//
// returns no error if object was removed from some blobovnicza of the same level.
func (b *blobovniczas) deleteObjectFromLevel(prm blobovnicza.DeletePrm, blzPath string, tryActive bool, dp DeleteSmallPrm) (*DeleteSmallRes, error) {
func (b *blobovniczas) deleteObjectFromLevel(prm blobovnicza.DeletePrm, blzPath string, tryActive bool, dp DeleteSmallPrm) (DeleteSmallRes, error) {
lvlPath := filepath.Dir(blzPath)
// try to remove from blobovnicza if it is opened
@ -410,13 +410,13 @@ func (b *blobovniczas) deleteObjectFromLevel(prm blobovnicza.DeletePrm, blzPath
b.log.Debug("index is too big", zap.String("path", blzPath))
var errNotFound apistatus.ObjectNotFound
return nil, errNotFound
return DeleteSmallRes{}, errNotFound
}
// open blobovnicza (cached inside)
blz, err := b.openBlobovnicza(blzPath)
if err != nil {
return nil, err
return DeleteSmallRes{}, err
}
return b.deleteObject(blz, prm, dp)
@ -425,7 +425,7 @@ func (b *blobovniczas) deleteObjectFromLevel(prm blobovnicza.DeletePrm, blzPath
// tries to read object from particular blobovnicza.
//
// returns error if object could not be read from any blobovnicza of the same level.
func (b *blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string, tryActive bool) (*GetSmallRes, error) {
func (b *blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string, tryActive bool) (GetSmallRes, error) {
lvlPath := filepath.Dir(blzPath)
// try to read from blobovnicza if it is opened
@ -472,13 +472,13 @@ func (b *blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string
b.log.Debug("index is too big", zap.String("path", blzPath))
var errNotFound apistatus.ObjectNotFound
return nil, errNotFound
return GetSmallRes{}, errNotFound
}
// open blobovnicza (cached inside)
blz, err := b.openBlobovnicza(blzPath)
if err != nil {
return nil, err
return GetSmallRes{}, err
}
return b.getObject(blz, prm)
@ -487,7 +487,7 @@ func (b *blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string
// tries to read range of object payload data from particular blobovnicza.
//
// returns error if object could not be read from any blobovnicza of the same level.
func (b *blobovniczas) getRangeFromLevel(prm GetRangeSmallPrm, blzPath string, tryActive bool) (*GetRangeSmallRes, error) {
func (b *blobovniczas) getRangeFromLevel(prm GetRangeSmallPrm, blzPath string, tryActive bool) (GetRangeSmallRes, error) {
lvlPath := filepath.Dir(blzPath)
// try to read from blobovnicza if it is opened
@ -545,23 +545,23 @@ func (b *blobovniczas) getRangeFromLevel(prm GetRangeSmallPrm, blzPath string, t
var errNotFound apistatus.ObjectNotFound
return nil, errNotFound
return GetRangeSmallRes{}, errNotFound
}
// open blobovnicza (cached inside)
blz, err := b.openBlobovnicza(blzPath)
if err != nil {
return nil, err
return GetRangeSmallRes{}, err
}
return b.getObjectRange(blz, prm)
}
// removes object from blobovnicza and returns DeleteSmallRes.
func (b *blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm, dp DeleteSmallPrm) (*DeleteSmallRes, error) {
func (b *blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm, dp DeleteSmallPrm) (DeleteSmallRes, error) {
_, err := blz.Delete(prm)
if err != nil {
return nil, err
return DeleteSmallRes{}, err
}
storagelog.Write(b.log,
@ -570,29 +570,29 @@ func (b *blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm blobovnicz
zap.Stringer("blobovnicza ID", dp.blobovniczaID),
)
return nil, nil
return DeleteSmallRes{}, nil
}
// reads object from blobovnicza and returns GetSmallRes.
func (b *blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (*GetSmallRes, error) {
func (b *blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (GetSmallRes, error) {
res, err := blz.Get(prm)
if err != nil {
return nil, err
return GetSmallRes{}, err
}
// decompress the data
data, err := b.decompressor(res.Object())
if err != nil {
return nil, fmt.Errorf("could not decompress object data: %w", err)
return GetSmallRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
// unmarshal the object
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return nil, fmt.Errorf("could not unmarshal the object: %w", err)
return GetSmallRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
return &GetSmallRes{
return GetSmallRes{
roObject: roObject{
obj: obj,
},
@ -600,7 +600,7 @@ func (b *blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.G
}
// reads range of object payload data from blobovnicza and returns GetRangeSmallRes.
func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm GetRangeSmallPrm) (*GetRangeSmallRes, error) {
func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm GetRangeSmallPrm) (GetRangeSmallRes, error) {
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(prm.addr)
@ -610,19 +610,19 @@ func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm GetRange
// we can start using GetRange.
res, err := blz.Get(gPrm)
if err != nil {
return nil, err
return GetRangeSmallRes{}, err
}
// decompress the data
data, err := b.decompressor(res.Object())
if err != nil {
return nil, fmt.Errorf("could not decompress object data: %w", err)
return GetRangeSmallRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
// unmarshal the object
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return nil, fmt.Errorf("could not unmarshal the object: %w", err)
return GetRangeSmallRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
from := prm.rng.GetOffset()
@ -630,10 +630,10 @@ func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm GetRange
payload := obj.Payload()
if uint64(len(payload)) < to {
return nil, object.ErrRangeOutOfBounds
return GetRangeSmallRes{}, object.ErrRangeOutOfBounds
}
return &GetRangeSmallRes{
return GetRangeSmallRes{
rangeData: rangeData{
data: payload[from:to],
},

View file

@ -22,7 +22,7 @@ type DeleteBigRes struct{}
// to completely remove the object.
//
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
func (b *BlobStor) DeleteBig(prm DeleteBigPrm) (*DeleteBigRes, error) {
func (b *BlobStor) DeleteBig(prm DeleteBigPrm) (DeleteBigRes, error) {
err := b.fsTree.Delete(prm.addr)
if errors.Is(err, fstree.ErrFileNotFound) {
var errNotFound apistatus.ObjectNotFound
@ -34,5 +34,5 @@ func (b *BlobStor) DeleteBig(prm DeleteBigPrm) (*DeleteBigRes, error) {
storagelog.Write(b.log, storagelog.AddressField(prm.addr), storagelog.OpField("fstree DELETE"))
}
return nil, err
return DeleteBigRes{}, err
}

View file

@ -18,6 +18,6 @@ type DeleteSmallRes struct{}
// to completely remove the object.
//
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
func (b *BlobStor) DeleteSmall(prm DeleteSmallPrm) (*DeleteSmallRes, error) {
func (b *BlobStor) DeleteSmall(prm DeleteSmallPrm) (DeleteSmallRes, error) {
return b.blobovniczas.delete(prm)
}

View file

@ -29,7 +29,7 @@ func (r ExistsRes) Exists() bool {
//
// Returns any error encountered that did not allow
// to completely check object existence.
func (b *BlobStor) Exists(prm ExistsPrm) (*ExistsRes, error) {
func (b *BlobStor) Exists(prm ExistsPrm) (ExistsRes, error) {
// check presence in shallow dir first (cheaper)
exists, err := b.existsBig(prm.addr)
@ -58,9 +58,10 @@ func (b *BlobStor) Exists(prm ExistsPrm) (*ExistsRes, error) {
}
if err != nil {
return nil, err
return ExistsRes{}, err
}
return &ExistsRes{exists: exists}, err
return ExistsRes{exists: exists}, err
}
// checks if object is presented in shallow dir.

View file

@ -26,31 +26,31 @@ type GetBigRes struct {
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
// presented in shallow dir.
func (b *BlobStor) GetBig(prm GetBigPrm) (*GetBigRes, error) {
func (b *BlobStor) GetBig(prm GetBigPrm) (GetBigRes, error) {
// get compressed object data
data, err := b.fsTree.Get(prm.addr)
if err != nil {
if errors.Is(err, fstree.ErrFileNotFound) {
var errNotFound apistatus.ObjectNotFound
return nil, errNotFound
return GetBigRes{}, errNotFound
}
return nil, fmt.Errorf("could not read object from fs tree: %w", err)
return GetBigRes{}, fmt.Errorf("could not read object from fs tree: %w", err)
}
data, err = b.decompressor(data)
if err != nil {
return nil, fmt.Errorf("could not decompress object data: %w", err)
return GetBigRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
// unmarshal the object
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return nil, fmt.Errorf("could not unmarshal the object: %w", err)
return GetBigRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
return &GetBigRes{
return GetBigRes{
roObject: roObject{
obj: obj,
},

View file

@ -28,38 +28,38 @@ type GetRangeBigRes struct {
//
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
// Returns an error of type apistatus.ObjectNotFound if object is missing.
func (b *BlobStor) GetRangeBig(prm GetRangeBigPrm) (*GetRangeBigRes, error) {
func (b *BlobStor) GetRangeBig(prm GetRangeBigPrm) (GetRangeBigRes, error) {
// get compressed object data
data, err := b.fsTree.Get(prm.addr)
if err != nil {
if errors.Is(err, fstree.ErrFileNotFound) {
var errNotFound apistatus.ObjectNotFound
return nil, errNotFound
return GetRangeBigRes{}, errNotFound
}
return nil, fmt.Errorf("could not read object from fs tree: %w", err)
return GetRangeBigRes{}, fmt.Errorf("could not read object from fs tree: %w", err)
}
data, err = b.decompressor(data)
if err != nil {
return nil, fmt.Errorf("could not decompress object data: %w", err)
return GetRangeBigRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
// unmarshal the object
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return nil, fmt.Errorf("could not unmarshal the object: %w", err)
return GetRangeBigRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
payload := obj.Payload()
ln, off := prm.rng.GetLength(), prm.rng.GetOffset()
if pLen := uint64(len(payload)); pLen < ln+off {
return nil, object.ErrRangeOutOfBounds
return GetRangeBigRes{}, object.ErrRangeOutOfBounds
}
return &GetRangeBigRes{
return GetRangeBigRes{
rangeData: rangeData{
data: payload[off : off+ln],
},

View file

@ -22,6 +22,6 @@ type GetRangeSmallRes struct {
//
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in blobovnicza(s).
func (b *BlobStor) GetRangeSmall(prm GetRangeSmallPrm) (*GetRangeSmallRes, error) {
func (b *BlobStor) GetRangeSmall(prm GetRangeSmallPrm) (GetRangeSmallRes, error) {
return b.blobovniczas.getRange(prm)
}

View file

@ -20,6 +20,6 @@ type GetSmallRes struct {
// did not allow to completely read the object.
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in blobovnicza(s).
func (b *BlobStor) GetSmall(prm GetSmallPrm) (*GetSmallRes, error) {
func (b *BlobStor) GetSmall(prm GetSmallPrm) (GetSmallRes, error) {
return b.blobovniczas.get(prm)
}

View file

@ -56,7 +56,7 @@ func (i *IteratePrm) IgnoreErrors() {
// did not allow to completely iterate over the storage.
//
// If handler returns an error, method wraps and returns it immediately.
func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
func (b *BlobStor) Iterate(prm IteratePrm) (IterateRes, error) {
var elem IterationElement
err := b.blobovniczas.iterateBlobovniczas(prm.ignoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
@ -83,7 +83,7 @@ func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
return nil
})
if err != nil {
return nil, fmt.Errorf("blobovniczas iterator failure: %w", err)
return IterateRes{}, fmt.Errorf("blobovniczas iterator failure: %w", err)
}
elem.blzID = nil
@ -106,10 +106,10 @@ func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
err = b.fsTree.Iterate(fsPrm)
if err != nil {
return nil, fmt.Errorf("fs tree iterator failure: %w", err)
return IterateRes{}, fmt.Errorf("fs tree iterator failure: %w", err)
}
return new(IterateRes), nil
return IterateRes{}, nil
}
// IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f.

View file

@ -30,11 +30,11 @@ type PutRes struct {
//
// Returns any error encountered that
// did not allow to completely save the object.
func (b *BlobStor) Put(prm PutPrm) (*PutRes, error) {
func (b *BlobStor) Put(prm PutPrm) (PutRes, error) {
// marshal object
data, err := prm.obj.Marshal()
if err != nil {
return nil, fmt.Errorf("could not marshal the object: %w", err)
return PutRes{}, fmt.Errorf("could not marshal the object: %w", err)
}
return b.PutRaw(object.AddressOf(prm.obj), data, b.NeedsCompression(prm.obj))
@ -72,7 +72,7 @@ func (b *BlobStor) NeedsCompression(obj *objectSDK.Object) bool {
}
// PutRaw saves an already marshaled object in BLOB storage.
func (b *BlobStor) PutRaw(addr oid.Address, data []byte, compress bool) (*PutRes, error) {
func (b *BlobStor) PutRaw(addr oid.Address, data []byte, compress bool) (PutRes, error) {
big := b.isBig(data)
if big {
@ -89,12 +89,12 @@ func (b *BlobStor) PutRaw(addr oid.Address, data []byte, compress bool) (*PutRes
err = b.fsTree.Put(addr, data)
}
if err != nil {
return nil, err
return PutRes{}, err
}
storagelog.Write(b.log, storagelog.AddressField(addr), storagelog.OpField("fstree PUT"))
return new(PutRes), nil
return PutRes{}, nil
}
if compress {
@ -104,10 +104,10 @@ func (b *BlobStor) PutRaw(addr oid.Address, data []byte, compress bool) (*PutRes
// save object in blobovnicza
res, err := b.blobovniczas.put(addr, data)
if err != nil {
return nil, err
return PutRes{}, err
}
return &PutRes{
return PutRes{
roBlobovniczaID: roBlobovniczaID{
blobovniczaID: res,
},

View file

@ -52,7 +52,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
var (
err error
res *blobstor.PutRes
res blobstor.PutRes
)
if res, err = s.blobStor.Put(putPrm); err != nil {