forked from TrueCloudLab/frostfs-node
[#1418] blobstor: Do not use pointers as parameters
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
babd382ba5
commit
281befec67
26 changed files with 89 additions and 66 deletions
|
@ -85,7 +85,7 @@ func objectInspectCmd(cmd *cobra.Command, _ []string) {
|
||||||
|
|
||||||
defer blz.Close()
|
defer blz.Close()
|
||||||
|
|
||||||
prm := new(blobovnicza.GetPrm)
|
var prm blobovnicza.GetPrm
|
||||||
prm.SetAddress(addr)
|
prm.SetAddress(addr)
|
||||||
res, err := blz.Get(prm)
|
res, err := blz.Get(prm)
|
||||||
common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err))
|
common.ExitOnErr(cmd, common.Errf("could not fetch object: %w", err))
|
||||||
|
|
|
@ -36,7 +36,7 @@ func testPutGet(t *testing.T, blz *Blobovnicza, addr oid.Address, sz uint64, ass
|
||||||
}
|
}
|
||||||
|
|
||||||
func testGet(t *testing.T, blz *Blobovnicza, addr oid.Address, expObj []byte, assertErr func(error) bool) {
|
func testGet(t *testing.T, blz *Blobovnicza, addr oid.Address, expObj []byte, assertErr func(error) bool) {
|
||||||
pGet := new(GetPrm)
|
var pGet GetPrm
|
||||||
pGet.SetAddress(addr)
|
pGet.SetAddress(addr)
|
||||||
|
|
||||||
// try to read object from Blobovnicza
|
// try to read object from Blobovnicza
|
||||||
|
@ -85,7 +85,7 @@ func TestBlobovnicza(t *testing.T) {
|
||||||
addr := testPutGet(t, blz, oidtest.Address(), filled, nil, nil)
|
addr := testPutGet(t, blz, oidtest.Address(), filled, nil, nil)
|
||||||
|
|
||||||
// remove the object
|
// remove the object
|
||||||
dPrm := new(DeletePrm)
|
var dPrm DeletePrm
|
||||||
dPrm.SetAddress(addr)
|
dPrm.SetAddress(addr)
|
||||||
|
|
||||||
_, err := blz.Delete(dPrm)
|
_, err := blz.Delete(dPrm)
|
||||||
|
|
|
@ -29,7 +29,7 @@ func (p *DeletePrm) SetAddress(addr oid.Address) {
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the object to be deleted is not in blobovnicza.
|
// Returns an error of type apistatus.ObjectNotFound if the object to be deleted is not in blobovnicza.
|
||||||
//
|
//
|
||||||
// Should not be called in read-only configuration.
|
// Should not be called in read-only configuration.
|
||||||
func (b *Blobovnicza) Delete(prm *DeletePrm) (*DeleteRes, error) {
|
func (b *Blobovnicza) Delete(prm DeletePrm) (*DeleteRes, error) {
|
||||||
addrKey := addressKey(prm.addr)
|
addrKey := addressKey(prm.addr)
|
||||||
|
|
||||||
removed := false
|
removed := false
|
||||||
|
|
|
@ -24,7 +24,7 @@ func (p *GetPrm) SetAddress(addr oid.Address) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object returns binary representation of the requested object.
|
// Object returns binary representation of the requested object.
|
||||||
func (p *GetRes) Object() []byte {
|
func (p GetRes) Object() []byte {
|
||||||
return p.obj
|
return p.obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ func (p *GetRes) Object() []byte {
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
|
||||||
// presented in Blobovnicza.
|
// presented in Blobovnicza.
|
||||||
func (b *Blobovnicza) Get(prm *GetPrm) (*GetRes, error) {
|
func (b *Blobovnicza) Get(prm GetPrm) (*GetRes, error) {
|
||||||
var (
|
var (
|
||||||
data []byte
|
data []byte
|
||||||
addrKey = addressKey(prm.addr)
|
addrKey = addressKey(prm.addr)
|
||||||
|
|
|
@ -205,8 +205,8 @@ func (b *blobovniczas) put(addr oid.Address, data []byte) (*blobovnicza.ID, erro
|
||||||
//
|
//
|
||||||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||||
// Otherwise, all blobovniczas are processed descending weight.
|
// Otherwise, all blobovniczas are processed descending weight.
|
||||||
func (b *blobovniczas) get(prm *GetSmallPrm) (res *GetSmallRes, err error) {
|
func (b *blobovniczas) get(prm GetSmallPrm) (res *GetSmallRes, err error) {
|
||||||
bPrm := new(blobovnicza.GetPrm)
|
var bPrm blobovnicza.GetPrm
|
||||||
bPrm.SetAddress(prm.addr)
|
bPrm.SetAddress(prm.addr)
|
||||||
|
|
||||||
if prm.blobovniczaID != nil {
|
if prm.blobovniczaID != nil {
|
||||||
|
@ -255,8 +255,8 @@ func (b *blobovniczas) get(prm *GetSmallPrm) (res *GetSmallRes, err error) {
|
||||||
//
|
//
|
||||||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||||
// Otherwise, all blobovniczas are processed descending weight.
|
// Otherwise, all blobovniczas are processed descending weight.
|
||||||
func (b *blobovniczas) delete(prm *DeleteSmallPrm) (res *DeleteSmallRes, err error) {
|
func (b *blobovniczas) delete(prm DeleteSmallPrm) (res *DeleteSmallRes, err error) {
|
||||||
bPrm := new(blobovnicza.DeletePrm)
|
var bPrm blobovnicza.DeletePrm
|
||||||
bPrm.SetAddress(prm.addr)
|
bPrm.SetAddress(prm.addr)
|
||||||
|
|
||||||
if prm.blobovniczaID != nil {
|
if prm.blobovniczaID != nil {
|
||||||
|
@ -310,7 +310,7 @@ func (b *blobovniczas) delete(prm *DeleteSmallPrm) (res *DeleteSmallRes, err err
|
||||||
//
|
//
|
||||||
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
// If blobocvnicza ID is specified, only this blobovnicza is processed.
|
||||||
// Otherwise, all blobovniczas are processed descending weight.
|
// Otherwise, all blobovniczas are processed descending weight.
|
||||||
func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, err error) {
|
func (b *blobovniczas) getRange(prm GetRangeSmallPrm) (res *GetRangeSmallRes, err error) {
|
||||||
if prm.blobovniczaID != nil {
|
if prm.blobovniczaID != nil {
|
||||||
blz, err := b.openBlobovnicza(prm.blobovniczaID.String())
|
blz, err := b.openBlobovnicza(prm.blobovniczaID.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -360,7 +360,7 @@ func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, e
|
||||||
// tries to delete object from particular blobovnicza.
|
// tries to delete object from particular blobovnicza.
|
||||||
//
|
//
|
||||||
// returns no error if object was removed from some blobovnicza of the same level.
|
// returns no error if object was removed from some blobovnicza of the same level.
|
||||||
func (b *blobovniczas) deleteObjectFromLevel(prm *blobovnicza.DeletePrm, blzPath string, tryActive bool, dp *DeleteSmallPrm) (*DeleteSmallRes, error) {
|
func (b *blobovniczas) deleteObjectFromLevel(prm blobovnicza.DeletePrm, blzPath string, tryActive bool, dp DeleteSmallPrm) (*DeleteSmallRes, error) {
|
||||||
lvlPath := filepath.Dir(blzPath)
|
lvlPath := filepath.Dir(blzPath)
|
||||||
|
|
||||||
log := b.log.With(
|
log := b.log.With(
|
||||||
|
@ -423,7 +423,7 @@ func (b *blobovniczas) deleteObjectFromLevel(prm *blobovnicza.DeletePrm, blzPath
|
||||||
// tries to read object from particular blobovnicza.
|
// tries to read object from particular blobovnicza.
|
||||||
//
|
//
|
||||||
// returns error if object could not be read from any blobovnicza of the same level.
|
// returns error if object could not be read from any blobovnicza of the same level.
|
||||||
func (b *blobovniczas) getObjectFromLevel(prm *blobovnicza.GetPrm, blzPath string, tryActive bool) (*GetSmallRes, error) {
|
func (b *blobovniczas) getObjectFromLevel(prm blobovnicza.GetPrm, blzPath string, tryActive bool) (*GetSmallRes, error) {
|
||||||
lvlPath := filepath.Dir(blzPath)
|
lvlPath := filepath.Dir(blzPath)
|
||||||
|
|
||||||
log := b.log.With(
|
log := b.log.With(
|
||||||
|
@ -487,7 +487,7 @@ func (b *blobovniczas) getObjectFromLevel(prm *blobovnicza.GetPrm, blzPath strin
|
||||||
// tries to read range of object payload data from particular blobovnicza.
|
// tries to read range of object payload data from particular blobovnicza.
|
||||||
//
|
//
|
||||||
// returns error if object could not be read from any blobovnicza of the same level.
|
// returns error if object could not be read from any blobovnicza of the same level.
|
||||||
func (b *blobovniczas) getRangeFromLevel(prm *GetRangeSmallPrm, blzPath string, tryActive bool) (*GetRangeSmallRes, error) {
|
func (b *blobovniczas) getRangeFromLevel(prm GetRangeSmallPrm, blzPath string, tryActive bool) (*GetRangeSmallRes, error) {
|
||||||
lvlPath := filepath.Dir(blzPath)
|
lvlPath := filepath.Dir(blzPath)
|
||||||
|
|
||||||
log := b.log.With(
|
log := b.log.With(
|
||||||
|
@ -560,7 +560,7 @@ func (b *blobovniczas) getRangeFromLevel(prm *GetRangeSmallPrm, blzPath string,
|
||||||
}
|
}
|
||||||
|
|
||||||
// removes object from blobovnicza and returns DeleteSmallRes.
|
// removes object from blobovnicza and returns DeleteSmallRes.
|
||||||
func (b *blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm *blobovnicza.DeletePrm, dp *DeleteSmallPrm) (*DeleteSmallRes, error) {
|
func (b *blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.DeletePrm, dp DeleteSmallPrm) (*DeleteSmallRes, error) {
|
||||||
_, err := blz.Delete(prm)
|
_, err := blz.Delete(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -576,7 +576,7 @@ func (b *blobovniczas) deleteObject(blz *blobovnicza.Blobovnicza, prm *blobovnic
|
||||||
}
|
}
|
||||||
|
|
||||||
// reads object from blobovnicza and returns GetSmallRes.
|
// reads object from blobovnicza and returns GetSmallRes.
|
||||||
func (b *blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm *blobovnicza.GetPrm) (*GetSmallRes, error) {
|
func (b *blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm blobovnicza.GetPrm) (*GetSmallRes, error) {
|
||||||
res, err := blz.Get(prm)
|
res, err := blz.Get(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -602,8 +602,8 @@ func (b *blobovniczas) getObject(blz *blobovnicza.Blobovnicza, prm *blobovnicza.
|
||||||
}
|
}
|
||||||
|
|
||||||
// reads range of object payload data from blobovnicza and returns GetRangeSmallRes.
|
// reads range of object payload data from blobovnicza and returns GetRangeSmallRes.
|
||||||
func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm *GetRangeSmallPrm) (*GetRangeSmallRes, error) {
|
func (b *blobovniczas) getObjectRange(blz *blobovnicza.Blobovnicza, prm GetRangeSmallPrm) (*GetRangeSmallRes, error) {
|
||||||
gPrm := new(blobovnicza.GetPrm)
|
var gPrm blobovnicza.GetPrm
|
||||||
gPrm.SetAddress(prm.addr)
|
gPrm.SetAddress(prm.addr)
|
||||||
|
|
||||||
// we don't use GetRange call for now since blobovnicza
|
// we don't use GetRange call for now since blobovnicza
|
||||||
|
|
|
@ -80,7 +80,7 @@ func TestBlobovniczas(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// get w/ blobovnicza ID
|
// get w/ blobovnicza ID
|
||||||
prm := new(GetSmallPrm)
|
var prm GetSmallPrm
|
||||||
prm.SetBlobovniczaID(id)
|
prm.SetBlobovniczaID(id)
|
||||||
prm.SetAddress(addr)
|
prm.SetAddress(addr)
|
||||||
|
|
||||||
|
@ -96,7 +96,7 @@ func TestBlobovniczas(t *testing.T) {
|
||||||
require.Equal(t, obj, res.Object())
|
require.Equal(t, obj, res.Object())
|
||||||
|
|
||||||
// get range w/ blobovnicza ID
|
// get range w/ blobovnicza ID
|
||||||
rngPrm := new(GetRangeSmallPrm)
|
var rngPrm GetRangeSmallPrm
|
||||||
rngPrm.SetBlobovniczaID(id)
|
rngPrm.SetBlobovniczaID(id)
|
||||||
rngPrm.SetAddress(addr)
|
rngPrm.SetAddress(addr)
|
||||||
|
|
||||||
|
@ -122,8 +122,8 @@ func TestBlobovniczas(t *testing.T) {
|
||||||
require.Equal(t, payload[off:off+ln], rngRes.RangeData())
|
require.Equal(t, payload[off:off+ln], rngRes.RangeData())
|
||||||
}
|
}
|
||||||
|
|
||||||
dPrm := new(DeleteSmallPrm)
|
var dPrm DeleteSmallPrm
|
||||||
gPrm := new(GetSmallPrm)
|
var gPrm GetSmallPrm
|
||||||
|
|
||||||
for i := range addrList {
|
for i := range addrList {
|
||||||
dPrm.SetAddress(addrList[i])
|
dPrm.SetAddress(addrList[i])
|
||||||
|
|
|
@ -37,22 +37,22 @@ func TestCompression(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
testGet := func(t *testing.T, b *BlobStor, i int) {
|
testGet := func(t *testing.T, b *BlobStor, i int) {
|
||||||
res1, err := b.GetSmall(&GetSmallPrm{address: address{object.AddressOf(smallObj[i])}})
|
res1, err := b.GetSmall(GetSmallPrm{address: address{object.AddressOf(smallObj[i])}})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, smallObj[i], res1.Object())
|
require.Equal(t, smallObj[i], res1.Object())
|
||||||
|
|
||||||
res2, err := b.GetBig(&GetBigPrm{address: address{object.AddressOf(bigObj[i])}})
|
res2, err := b.GetBig(GetBigPrm{address: address{object.AddressOf(bigObj[i])}})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, bigObj[i], res2.Object())
|
require.Equal(t, bigObj[i], res2.Object())
|
||||||
}
|
}
|
||||||
|
|
||||||
testPut := func(t *testing.T, b *BlobStor, i int) {
|
testPut := func(t *testing.T, b *BlobStor, i int) {
|
||||||
prm := new(PutPrm)
|
var prm PutPrm
|
||||||
prm.SetObject(smallObj[i])
|
prm.SetObject(smallObj[i])
|
||||||
_, err = b.Put(prm)
|
_, err = b.Put(prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
prm = new(PutPrm)
|
prm = PutPrm{}
|
||||||
prm.SetObject(bigObj[i])
|
prm.SetObject(bigObj[i])
|
||||||
_, err = b.Put(prm)
|
_, err = b.Put(prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -22,7 +22,7 @@ type DeleteBigRes struct{}
|
||||||
// to completely remove the object.
|
// to completely remove the object.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
||||||
func (b *BlobStor) DeleteBig(prm *DeleteBigPrm) (*DeleteBigRes, error) {
|
func (b *BlobStor) DeleteBig(prm DeleteBigPrm) (*DeleteBigRes, error) {
|
||||||
err := b.fsTree.Delete(prm.addr)
|
err := b.fsTree.Delete(prm.addr)
|
||||||
if errors.Is(err, fstree.ErrFileNotFound) {
|
if errors.Is(err, fstree.ErrFileNotFound) {
|
||||||
var errNotFound apistatus.ObjectNotFound
|
var errNotFound apistatus.ObjectNotFound
|
||||||
|
|
|
@ -18,6 +18,6 @@ type DeleteSmallRes struct{}
|
||||||
// to completely remove the object.
|
// to completely remove the object.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
// Returns an error of type apistatus.ObjectNotFound if there is no object to delete.
|
||||||
func (b *BlobStor) DeleteSmall(prm *DeleteSmallPrm) (*DeleteSmallRes, error) {
|
func (b *BlobStor) DeleteSmall(prm DeleteSmallPrm) (*DeleteSmallRes, error) {
|
||||||
return b.blobovniczas.delete(prm)
|
return b.blobovniczas.delete(prm)
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ func (r ExistsRes) Exists() bool {
|
||||||
//
|
//
|
||||||
// Returns any error encountered that did not allow
|
// Returns any error encountered that did not allow
|
||||||
// to completely check object existence.
|
// to completely check object existence.
|
||||||
func (b *BlobStor) Exists(prm *ExistsPrm) (*ExistsRes, error) {
|
func (b *BlobStor) Exists(prm ExistsPrm) (*ExistsRes, error) {
|
||||||
// check presence in shallow dir first (cheaper)
|
// check presence in shallow dir first (cheaper)
|
||||||
exists, err := b.existsBig(prm.addr)
|
exists, err := b.existsBig(prm.addr)
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ func (b *BlobStor) existsSmall(addr oid.Address) (bool, error) {
|
||||||
func (b *blobovniczas) existsSmall(addr oid.Address) (bool, error) {
|
func (b *blobovniczas) existsSmall(addr oid.Address) (bool, error) {
|
||||||
activeCache := make(map[string]struct{})
|
activeCache := make(map[string]struct{})
|
||||||
|
|
||||||
prm := new(blobovnicza.GetPrm)
|
var prm blobovnicza.GetPrm
|
||||||
prm.SetAddress(addr)
|
prm.SetAddress(addr)
|
||||||
|
|
||||||
var found bool
|
var found bool
|
||||||
|
|
|
@ -30,13 +30,13 @@ func TestExists(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
prm := new(PutPrm)
|
var prm PutPrm
|
||||||
prm.SetObject(objects[i])
|
prm.SetObject(objects[i])
|
||||||
_, err = b.Put(prm)
|
_, err = b.Put(prm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
prm := new(ExistsPrm)
|
var prm ExistsPrm
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
prm.SetAddress(objectCore.AddressOf(objects[i]))
|
prm.SetAddress(objectCore.AddressOf(objects[i]))
|
||||||
|
|
||||||
|
|
|
@ -75,23 +75,21 @@ type IterationPrm struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithHandler sets a function to call on each object.
|
// WithHandler sets a function to call on each object.
|
||||||
func (p *IterationPrm) WithHandler(f func(addr oid.Address, data []byte) error) *IterationPrm {
|
func (p *IterationPrm) WithHandler(f func(addr oid.Address, data []byte) error) {
|
||||||
p.handler = f
|
p.handler = f
|
||||||
return p
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithIgnoreErrors sets a flag indicating whether errors should be ignored.
|
// WithIgnoreErrors sets a flag indicating whether errors should be ignored.
|
||||||
func (p *IterationPrm) WithIgnoreErrors(ignore bool) *IterationPrm {
|
func (p *IterationPrm) WithIgnoreErrors(ignore bool) {
|
||||||
p.ignoreErrors = ignore
|
p.ignoreErrors = ignore
|
||||||
return p
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate iterates over all stored objects.
|
// Iterate iterates over all stored objects.
|
||||||
func (t *FSTree) Iterate(prm *IterationPrm) error {
|
func (t *FSTree) Iterate(prm IterationPrm) error {
|
||||||
return t.iterate(0, []string{t.RootPath}, prm)
|
return t.iterate(0, []string{t.RootPath}, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *FSTree) iterate(depth int, curPath []string, prm *IterationPrm) error {
|
func (t *FSTree) iterate(depth int, curPath []string, prm IterationPrm) error {
|
||||||
curName := strings.Join(curPath[1:], "")
|
curName := strings.Join(curPath[1:], "")
|
||||||
des, err := os.ReadDir(filepath.Join(curPath...))
|
des, err := os.ReadDir(filepath.Join(curPath...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -73,13 +73,16 @@ func TestFSTree(t *testing.T) {
|
||||||
|
|
||||||
t.Run("iterate", func(t *testing.T) {
|
t.Run("iterate", func(t *testing.T) {
|
||||||
n := 0
|
n := 0
|
||||||
err := fs.Iterate(new(IterationPrm).WithHandler(func(addr oid.Address, data []byte) error {
|
var iterationPrm IterationPrm
|
||||||
|
iterationPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
||||||
n++
|
n++
|
||||||
expected, ok := store[addr.EncodeToString()]
|
expected, ok := store[addr.EncodeToString()]
|
||||||
require.True(t, ok, "object %s was not found", addr.EncodeToString())
|
require.True(t, ok, "object %s was not found", addr.EncodeToString())
|
||||||
require.Equal(t, data, expected)
|
require.Equal(t, data, expected)
|
||||||
return nil
|
return nil
|
||||||
}))
|
})
|
||||||
|
|
||||||
|
err := fs.Iterate(iterationPrm)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, count, n)
|
require.Equal(t, count, n)
|
||||||
|
@ -87,12 +90,15 @@ func TestFSTree(t *testing.T) {
|
||||||
t.Run("leave early", func(t *testing.T) {
|
t.Run("leave early", func(t *testing.T) {
|
||||||
n := 0
|
n := 0
|
||||||
errStop := errors.New("stop")
|
errStop := errors.New("stop")
|
||||||
err := fs.Iterate(new(IterationPrm).WithHandler(func(addr oid.Address, data []byte) error {
|
|
||||||
|
iterationPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
||||||
if n++; n == count-1 {
|
if n++; n == count-1 {
|
||||||
return errStop
|
return errStop
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}))
|
})
|
||||||
|
|
||||||
|
err := fs.Iterate(iterationPrm)
|
||||||
|
|
||||||
require.ErrorIs(t, err, errStop)
|
require.ErrorIs(t, err, errStop)
|
||||||
require.Equal(t, count-1, n)
|
require.Equal(t, count-1, n)
|
||||||
|
@ -114,23 +120,29 @@ func TestFSTree(t *testing.T) {
|
||||||
require.NoError(t, util.MkdirAllX(filepath.Dir(p), fs.Permissions))
|
require.NoError(t, util.MkdirAllX(filepath.Dir(p), fs.Permissions))
|
||||||
require.NoError(t, os.WriteFile(p, []byte{1, 2, 3}, fs.Permissions))
|
require.NoError(t, os.WriteFile(p, []byte{1, 2, 3}, fs.Permissions))
|
||||||
|
|
||||||
err := fs.Iterate(new(IterationPrm).WithHandler(func(addr oid.Address, data []byte) error {
|
iterationPrm.WithIgnoreErrors(true)
|
||||||
|
iterationPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
||||||
n++
|
n++
|
||||||
return nil
|
return nil
|
||||||
}).WithIgnoreErrors(true))
|
})
|
||||||
|
|
||||||
|
err := fs.Iterate(iterationPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, count, n)
|
require.Equal(t, count, n)
|
||||||
|
|
||||||
t.Run("error from handler is returned", func(t *testing.T) {
|
t.Run("error from handler is returned", func(t *testing.T) {
|
||||||
expectedErr := errors.New("expected error")
|
expectedErr := errors.New("expected error")
|
||||||
n := 0
|
n := 0
|
||||||
err := fs.Iterate(new(IterationPrm).WithHandler(func(addr oid.Address, data []byte) error {
|
|
||||||
|
iterationPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
||||||
n++
|
n++
|
||||||
if n == count/2 { // process some iterations
|
if n == count/2 { // process some iterations
|
||||||
return expectedErr
|
return expectedErr
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}).WithIgnoreErrors(true))
|
})
|
||||||
|
|
||||||
|
err := fs.Iterate(iterationPrm)
|
||||||
require.ErrorIs(t, err, expectedErr)
|
require.ErrorIs(t, err, expectedErr)
|
||||||
require.Equal(t, count/2, n)
|
require.Equal(t, count/2, n)
|
||||||
})
|
})
|
||||||
|
|
|
@ -26,7 +26,7 @@ type GetBigRes struct {
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is not
|
||||||
// presented in shallow dir.
|
// presented in shallow dir.
|
||||||
func (b *BlobStor) GetBig(prm *GetBigPrm) (*GetBigRes, error) {
|
func (b *BlobStor) GetBig(prm GetBigPrm) (*GetBigRes, error) {
|
||||||
// get compressed object data
|
// get compressed object data
|
||||||
data, err := b.fsTree.Get(prm.addr)
|
data, err := b.fsTree.Get(prm.addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -28,7 +28,7 @@ type GetRangeBigRes struct {
|
||||||
//
|
//
|
||||||
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
||||||
// Returns an error of type apistatus.ObjectNotFound if object is missing.
|
// Returns an error of type apistatus.ObjectNotFound if object is missing.
|
||||||
func (b *BlobStor) GetRangeBig(prm *GetRangeBigPrm) (*GetRangeBigRes, error) {
|
func (b *BlobStor) GetRangeBig(prm GetRangeBigPrm) (*GetRangeBigRes, error) {
|
||||||
// get compressed object data
|
// get compressed object data
|
||||||
data, err := b.fsTree.Get(prm.addr)
|
data, err := b.fsTree.Get(prm.addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -22,6 +22,6 @@ type GetRangeSmallRes struct {
|
||||||
//
|
//
|
||||||
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in blobovnicza(s).
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in blobovnicza(s).
|
||||||
func (b *BlobStor) GetRangeSmall(prm *GetRangeSmallPrm) (*GetRangeSmallRes, error) {
|
func (b *BlobStor) GetRangeSmall(prm GetRangeSmallPrm) (*GetRangeSmallRes, error) {
|
||||||
return b.blobovniczas.getRange(prm)
|
return b.blobovniczas.getRange(prm)
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,6 @@ type GetSmallRes struct {
|
||||||
// did not allow to completely read the object.
|
// did not allow to completely read the object.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in blobovnicza(s).
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in blobovnicza(s).
|
||||||
func (b *BlobStor) GetSmall(prm *GetSmallPrm) (*GetSmallRes, error) {
|
func (b *BlobStor) GetSmall(prm GetSmallPrm) (*GetSmallRes, error) {
|
||||||
return b.blobovniczas.get(prm)
|
return b.blobovniczas.get(prm)
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,9 @@ func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
|
||||||
|
|
||||||
elem.blzID = nil
|
elem.blzID = nil
|
||||||
|
|
||||||
err = b.fsTree.Iterate(new(fstree.IterationPrm).WithHandler(func(_ oid.Address, data []byte) error {
|
var fsPrm fstree.IterationPrm
|
||||||
|
fsPrm.WithIgnoreErrors(prm.ignoreErrors)
|
||||||
|
fsPrm.WithHandler(func(_ oid.Address, data []byte) error {
|
||||||
// decompress the data
|
// decompress the data
|
||||||
elem.data, err = b.decompressor(data)
|
elem.data, err = b.decompressor(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -99,7 +101,9 @@ func (b *BlobStor) Iterate(prm IteratePrm) (*IterateRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return prm.handler(elem)
|
return prm.handler(elem)
|
||||||
}).WithIgnoreErrors(prm.ignoreErrors))
|
})
|
||||||
|
|
||||||
|
err = b.fsTree.Iterate(fsPrm)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("fs tree iterator failure: %w", err)
|
return nil, fmt.Errorf("fs tree iterator failure: %w", err)
|
||||||
|
|
|
@ -30,7 +30,7 @@ type PutRes struct {
|
||||||
//
|
//
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
// did not allow to completely save the object.
|
// did not allow to completely save the object.
|
||||||
func (b *BlobStor) Put(prm *PutPrm) (*PutRes, error) {
|
func (b *BlobStor) Put(prm PutPrm) (*PutRes, error) {
|
||||||
// marshal object
|
// marshal object
|
||||||
data, err := prm.obj.Marshal()
|
data, err := prm.obj.Marshal()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -34,8 +34,8 @@ func (s *Shard) Delete(prm DeletePrm) (*DeleteRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ln := len(prm.addr)
|
ln := len(prm.addr)
|
||||||
delSmallPrm := new(blobstor.DeleteSmallPrm)
|
var delSmallPrm blobstor.DeleteSmallPrm
|
||||||
delBigPrm := new(blobstor.DeleteBigPrm)
|
var delBigPrm blobstor.DeleteBigPrm
|
||||||
|
|
||||||
smalls := make(map[oid.Address]*blobovnicza.ID, ln)
|
smalls := make(map[oid.Address]*blobovnicza.ID, ln)
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ func (s *Shard) Exists(prm ExistsPrm) (*ExistsRes, error) {
|
||||||
// If the shard is in degraded mode, try to consult blobstor directly.
|
// If the shard is in degraded mode, try to consult blobstor directly.
|
||||||
// Otherwise, just return an error.
|
// Otherwise, just return an error.
|
||||||
if s.GetMode() == ModeDegraded {
|
if s.GetMode() == ModeDegraded {
|
||||||
p := new(blobstor.ExistsPrm)
|
var p blobstor.ExistsPrm
|
||||||
p.SetAddress(prm.addr)
|
p.SetAddress(prm.addr)
|
||||||
|
|
||||||
res, bErr := s.blobStor.Exists(p)
|
res, bErr := s.blobStor.Exists(p)
|
||||||
|
|
|
@ -67,7 +67,7 @@ func (s *Shard) Get(prm GetPrm) (*GetRes, error) {
|
||||||
var big, small storFetcher
|
var big, small storFetcher
|
||||||
|
|
||||||
big = func(stor *blobstor.BlobStor, _ *blobovnicza.ID) (*objectSDK.Object, error) {
|
big = func(stor *blobstor.BlobStor, _ *blobovnicza.ID) (*objectSDK.Object, error) {
|
||||||
getBigPrm := new(blobstor.GetBigPrm)
|
var getBigPrm blobstor.GetBigPrm
|
||||||
getBigPrm.SetAddress(prm.addr)
|
getBigPrm.SetAddress(prm.addr)
|
||||||
|
|
||||||
res, err := stor.GetBig(getBigPrm)
|
res, err := stor.GetBig(getBigPrm)
|
||||||
|
@ -79,7 +79,7 @@ func (s *Shard) Get(prm GetPrm) (*GetRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
small = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*objectSDK.Object, error) {
|
small = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*objectSDK.Object, error) {
|
||||||
getSmallPrm := new(blobstor.GetSmallPrm)
|
var getSmallPrm blobstor.GetSmallPrm
|
||||||
getSmallPrm.SetAddress(prm.addr)
|
getSmallPrm.SetAddress(prm.addr)
|
||||||
getSmallPrm.SetBlobovniczaID(id)
|
getSmallPrm.SetBlobovniczaID(id)
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ func (s *Shard) Put(prm PutPrm) (*PutRes, error) {
|
||||||
return nil, ErrReadOnlyMode
|
return nil, ErrReadOnlyMode
|
||||||
}
|
}
|
||||||
|
|
||||||
putPrm := new(blobstor.PutPrm) // form Put parameters
|
var putPrm blobstor.PutPrm // form Put parameters
|
||||||
putPrm.SetObject(prm.obj)
|
putPrm.SetObject(prm.obj)
|
||||||
|
|
||||||
// exist check are not performed there, these checks should be executed
|
// exist check are not performed there, these checks should be executed
|
||||||
|
|
|
@ -74,7 +74,7 @@ func (s *Shard) GetRange(prm RngPrm) (*RngRes, error) {
|
||||||
rng.SetLength(prm.ln)
|
rng.SetLength(prm.ln)
|
||||||
|
|
||||||
big = func(stor *blobstor.BlobStor, _ *blobovnicza.ID) (*object.Object, error) {
|
big = func(stor *blobstor.BlobStor, _ *blobovnicza.ID) (*object.Object, error) {
|
||||||
getRngBigPrm := new(blobstor.GetRangeBigPrm)
|
var getRngBigPrm blobstor.GetRangeBigPrm
|
||||||
getRngBigPrm.SetAddress(prm.addr)
|
getRngBigPrm.SetAddress(prm.addr)
|
||||||
getRngBigPrm.SetRange(rng)
|
getRngBigPrm.SetRange(rng)
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ func (s *Shard) GetRange(prm RngPrm) (*RngRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
small = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Object, error) {
|
small = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Object, error) {
|
||||||
getRngSmallPrm := new(blobstor.GetRangeSmallPrm)
|
var getRngSmallPrm blobstor.GetRangeSmallPrm
|
||||||
getRngSmallPrm.SetAddress(prm.addr)
|
getRngSmallPrm.SetAddress(prm.addr)
|
||||||
getRngSmallPrm.SetRange(rng)
|
getRngSmallPrm.SetRange(rng)
|
||||||
getRngSmallPrm.SetBlobovniczaID(id)
|
getRngSmallPrm.SetBlobovniczaID(id)
|
||||||
|
|
|
@ -133,7 +133,9 @@ func (c *cache) flushBigObjects() {
|
||||||
}
|
}
|
||||||
|
|
||||||
evictNum := 0
|
evictNum := 0
|
||||||
_ = c.fsTree.Iterate(new(fstree.IterationPrm).WithHandler(func(addr oid.Address, data []byte) error {
|
|
||||||
|
var prm fstree.IterationPrm
|
||||||
|
prm.WithHandler(func(addr oid.Address, data []byte) error {
|
||||||
sAddr := addr.EncodeToString()
|
sAddr := addr.EncodeToString()
|
||||||
|
|
||||||
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
if _, ok := c.store.flushed.Peek(sAddr); ok {
|
||||||
|
@ -161,7 +163,9 @@ func (c *cache) flushBigObjects() {
|
||||||
evictNum++
|
evictNum++
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}))
|
})
|
||||||
|
|
||||||
|
_ = c.fsTree.Iterate(prm)
|
||||||
|
|
||||||
// evict objects which were successfully written to BlobStor
|
// evict objects which were successfully written to BlobStor
|
||||||
c.evictObjects(evictNum)
|
c.evictObjects(evictNum)
|
||||||
|
@ -215,8 +219,9 @@ func (c *cache) writeObject(obj *object.Object, metaOnly bool) error {
|
||||||
var id *blobovnicza.ID
|
var id *blobovnicza.ID
|
||||||
|
|
||||||
if !metaOnly {
|
if !metaOnly {
|
||||||
prm := new(blobstor.PutPrm)
|
var prm blobstor.PutPrm
|
||||||
prm.SetObject(obj)
|
prm.SetObject(obj)
|
||||||
|
|
||||||
res, err := c.blobstor.Put(prm)
|
res, err := c.blobstor.Put(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -51,12 +51,16 @@ func (c *cache) Iterate(prm IterationPrm) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.fsTree.Iterate(new(fstree.IterationPrm).WithHandler(func(addr oid.Address, data []byte) error {
|
var fsPrm fstree.IterationPrm
|
||||||
|
fsPrm.WithIgnoreErrors(prm.ignoreErrors)
|
||||||
|
fsPrm.WithHandler(func(addr oid.Address, data []byte) error {
|
||||||
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
|
if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return prm.handler(data)
|
return prm.handler(data)
|
||||||
}).WithIgnoreErrors(prm.ignoreErrors))
|
})
|
||||||
|
|
||||||
|
return c.fsTree.Iterate(fsPrm)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
|
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
|
||||||
|
|
Loading…
Reference in a new issue