[#1418] meta: Do not use pointers as parameters

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-05-20 19:48:14 +03:00 committed by fyrchik
parent 1c100fb4b0
commit e265ce2d52
20 changed files with 252 additions and 223 deletions

View file

@ -24,17 +24,18 @@ type DeleteRes struct{}
// WithAddresses is a Delete option to set the addresses of the objects to delete.
//
// Option is required.
func (p *DeletePrm) WithAddresses(addrs ...oid.Address) *DeletePrm {
func (p *DeletePrm) WithAddresses(addrs ...oid.Address) {
if p != nil {
p.addrs = addrs
}
return p
}
// Delete removes objects from DB.
func Delete(db *DB, addrs ...oid.Address) error {
_, err := db.Delete(new(DeletePrm).WithAddresses(addrs...))
var deletePrm DeletePrm
deletePrm.WithAddresses(addrs...)
_, err := db.Delete(deletePrm)
return err
}
@ -49,7 +50,7 @@ type referenceNumber struct {
type referenceCounter map[string]*referenceNumber
// Delete removed object records from metabase indexes.
func (db *DB) Delete(prm *DeletePrm) (*DeleteRes, error) {
func (db *DB) Delete(prm DeletePrm) (*DeleteRes, error) {
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
return db.deleteGroup(tx, prm.addrs)
})

View file

@ -24,12 +24,10 @@ type ExistsRes struct {
var ErrLackSplitInfo = errors.New("no split info on parent object")
// WithAddress is an Exists option to set object checked for existence.
func (p *ExistsPrm) WithAddress(addr oid.Address) *ExistsPrm {
func (p *ExistsPrm) WithAddress(addr oid.Address) {
if p != nil {
p.addr = addr
}
return p
}
// Exists returns the fact that the object is in the metabase.
@ -41,7 +39,10 @@ func (p *ExistsRes) Exists() bool {
//
// See DB.Exists docs.
func Exists(db *DB, addr oid.Address) (bool, error) {
r, err := db.Exists(new(ExistsPrm).WithAddress(addr))
var existsPrm ExistsPrm
existsPrm.WithAddress(addr)
r, err := db.Exists(existsPrm)
if err != nil {
return false, err
}
@ -53,7 +54,7 @@ func Exists(db *DB, addr oid.Address) (bool, error) {
// returns true if addr is in primary index or false if it is not.
//
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
func (db *DB) Exists(prm *ExistsPrm) (res *ExistsRes, err error) {
func (db *DB) Exists(prm ExistsPrm) (res *ExistsRes, err error) {
res = new(ExistsRes)
err = db.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -24,23 +24,19 @@ type GetRes struct {
// WithAddress is a Get option to set the address of the requested object.
//
// Option is required.
func (p *GetPrm) WithAddress(addr oid.Address) *GetPrm {
func (p *GetPrm) WithAddress(addr oid.Address) {
if p != nil {
p.addr = addr
}
return p
}
// WithRaw is a Get option to set raw flag value. If flag is unset, then Get
// returns header of virtual object, otherwise it returns SplitInfo of virtual
// object.
func (p *GetPrm) WithRaw(raw bool) *GetPrm {
func (p *GetPrm) WithRaw(raw bool) {
if p != nil {
p.raw = raw
}
return p
}
// Header returns the requested object header.
@ -50,7 +46,10 @@ func (r *GetRes) Header() *objectSDK.Object {
// Get reads the object from DB.
func Get(db *DB, addr oid.Address) (*objectSDK.Object, error) {
r, err := db.Get(new(GetPrm).WithAddress(addr))
var getPrm GetPrm
getPrm.WithAddress(addr)
r, err := db.Get(getPrm)
if err != nil {
return nil, err
}
@ -60,7 +59,11 @@ func Get(db *DB, addr oid.Address) (*objectSDK.Object, error) {
// GetRaw reads physically stored object from DB.
func GetRaw(db *DB, addr oid.Address, raw bool) (*objectSDK.Object, error) {
r, err := db.Get(new(GetPrm).WithAddress(addr).WithRaw(raw))
var getPrm GetPrm
getPrm.WithAddress(addr)
getPrm.WithRaw(raw)
r, err := db.Get(getPrm)
if err != nil {
return nil, err
}
@ -72,7 +75,7 @@ func GetRaw(db *DB, addr oid.Address, raw bool) (*objectSDK.Object, error) {
//
// Returns an error of type apistatus.ObjectNotFound if object is missing in DB.
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
func (db *DB) Get(prm *GetPrm) (res *GetRes, err error) {
func (db *DB) Get(prm GetPrm) (res *GetRes, err error) {
res = new(GetRes)
err = db.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -123,7 +123,11 @@ func TestDB_Get(t *testing.T) {
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))
obj = oidtest.Address()
_, err = db.Inhume(new(meta.InhumePrm).WithAddresses(obj))
var prm meta.InhumePrm
prm.WithAddresses(obj)
_, err = db.Inhume(prm)
require.NoError(t, err)
_, err = meta.Get(db, obj)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

View file

@ -32,9 +32,8 @@ type GarbageIterationPrm struct {
// SetHandler sets a handler that will be called on every
// GarbageObject.
func (g *GarbageIterationPrm) SetHandler(h GarbageHandler) *GarbageIterationPrm {
func (g *GarbageIterationPrm) SetHandler(h GarbageHandler) {
g.h = h
return g
}
// SetOffset sets an offset of the iteration operation.
@ -49,9 +48,8 @@ func (g *GarbageIterationPrm) SetHandler(h GarbageHandler) *GarbageIterationPrm
// next element.
//
// Nil offset means start an integration from the beginning.
func (g *GarbageIterationPrm) SetOffset(offset oid.Address) *GarbageIterationPrm {
func (g *GarbageIterationPrm) SetOffset(offset oid.Address) {
g.offset = &offset
return g
}
// IterateOverGarbage iterates over all objects
@ -59,7 +57,7 @@ func (g *GarbageIterationPrm) SetOffset(offset oid.Address) *GarbageIterationPrm
//
// If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly.
func (db *DB) IterateOverGarbage(p *GarbageIterationPrm) error {
func (db *DB) IterateOverGarbage(p GarbageIterationPrm) error {
return db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset)
})
@ -95,9 +93,8 @@ type GraveyardIterationPrm struct {
// SetHandler sets a handler that will be called on every
// TombstonedObject.
func (g *GraveyardIterationPrm) SetHandler(h TombstonedHandler) *GraveyardIterationPrm {
func (g *GraveyardIterationPrm) SetHandler(h TombstonedHandler) {
g.h = h
return g
}
// SetOffset sets an offset of the iteration operation.
@ -112,16 +109,15 @@ func (g *GraveyardIterationPrm) SetHandler(h TombstonedHandler) *GraveyardIterat
// next element.
//
// Nil offset means start an integration from the beginning.
func (g *GraveyardIterationPrm) SetOffset(offset oid.Address) *GraveyardIterationPrm {
func (g *GraveyardIterationPrm) SetOffset(offset oid.Address) {
g.offset = &offset
return g
}
// IterateOverGraveyard iterates over all graves in DB.
//
// If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly.
func (db *DB) IterateOverGraveyard(p *GraveyardIterationPrm) error {
func (db *DB) IterateOverGraveyard(p GraveyardIterationPrm) error {
return db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateDeletedObj(tx, graveyardHandler{p.h}, p.offset)
})

View file

@ -14,21 +14,24 @@ func TestDB_IterateDeletedObjects_EmptyDB(t *testing.T) {
db := newDB(t)
var counter int
iterGravePRM := new(meta.GraveyardIterationPrm)
var iterGravePRM meta.GraveyardIterationPrm
err := db.IterateOverGraveyard(iterGravePRM.SetHandler(func(garbage meta.TombstonedObject) error {
iterGravePRM.SetHandler(func(garbage meta.TombstonedObject) error {
counter++
return nil
}))
})
err := db.IterateOverGraveyard(iterGravePRM)
require.NoError(t, err)
require.Zero(t, counter)
iterGCPRM := new(meta.GarbageIterationPrm)
err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
var iterGCPRM meta.GarbageIterationPrm
iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
counter++
return nil
}))
})
err = db.IterateOverGarbage(iterGCPRM)
require.NoError(t, err)
require.Zero(t, counter)
}
@ -60,29 +63,25 @@ func TestDB_Iterate_OffsetNotFound(t *testing.T) {
err = putBig(db, obj1)
require.NoError(t, err)
_, err = db.Inhume(new(meta.InhumePrm).
WithAddresses(object.AddressOf(obj1)).
WithGCMark(),
)
var inhumePrm meta.InhumePrm
inhumePrm.WithAddresses(object.AddressOf(obj1))
inhumePrm.WithGCMark()
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
var counter int
iterGCPRM := new(meta.GarbageIterationPrm).
SetOffset(object.AddressOf(obj2)).
SetHandler(func(garbage meta.GarbageObject) error {
require.Equal(t, garbage.Address(), addr1)
counter++
return nil
})
err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
var iterGCPRM meta.GarbageIterationPrm
iterGCPRM.SetOffset(object.AddressOf(obj2))
iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
require.Equal(t, garbage.Address(), addr1)
counter++
return nil
}))
})
err = db.IterateOverGarbage(iterGCPRM)
require.NoError(t, err)
// the second object would be put after the
@ -91,12 +90,14 @@ func TestDB_Iterate_OffsetNotFound(t *testing.T) {
require.Equal(t, 0, counter)
iterGCPRM.SetOffset(addr3)
err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
require.Equal(t, garbage.Address(), addr1)
counter++
return nil
}))
})
err = db.IterateOverGarbage(iterGCPRM)
require.NoError(t, err)
// the third object would be put before the
@ -128,22 +129,22 @@ func TestDB_IterateDeletedObjects(t *testing.T) {
err = putBig(db, obj4)
require.NoError(t, err)
inhumePrm := new(meta.InhumePrm)
var inhumePrm meta.InhumePrm
// inhume with tombstone
addrTombstone := oidtest.Address()
_, err = db.Inhume(inhumePrm.
WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2)).
WithTombstoneAddress(addrTombstone),
)
inhumePrm.WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
inhumePrm.WithTombstoneAddress(addrTombstone)
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
inhumePrm.WithAddresses(object.AddressOf(obj3), object.AddressOf(obj4))
inhumePrm.WithGCMark()
// inhume with GC mark
_, err = db.Inhume(inhumePrm.
WithAddresses(object.AddressOf(obj3), object.AddressOf(obj4)).
WithGCMark(),
)
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
var (
@ -151,26 +152,28 @@ func TestDB_IterateDeletedObjects(t *testing.T) {
buriedTS, buriedGC []oid.Address
)
iterGravePRM := new(meta.GraveyardIterationPrm)
err = db.IterateOverGraveyard(iterGravePRM.SetHandler(func(tomstoned meta.TombstonedObject) error {
var iterGravePRM meta.GraveyardIterationPrm
iterGravePRM.SetHandler(func(tomstoned meta.TombstonedObject) error {
require.Equal(t, addrTombstone, tomstoned.Tombstone())
buriedTS = append(buriedTS, tomstoned.Address())
counterAll++
return nil
}))
})
err = db.IterateOverGraveyard(iterGravePRM)
require.NoError(t, err)
iterGCPRM := new(meta.GarbageIterationPrm)
err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
var iterGCPRM meta.GarbageIterationPrm
iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
buriedGC = append(buriedGC, garbage.Address())
counterAll++
return nil
}))
})
err = db.IterateOverGarbage(iterGCPRM)
require.NoError(t, err)
// objects covered with a tombstone
@ -212,15 +215,16 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
err = putBig(db, obj4)
require.NoError(t, err)
inhumePrm := new(meta.InhumePrm)
// inhume with tombstone
addrTombstone := oidtest.Address()
_, err = db.Inhume(inhumePrm.
WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2), object.AddressOf(obj3), object.AddressOf(obj4)).
WithTombstoneAddress(addrTombstone),
)
var inhumePrm meta.InhumePrm
inhumePrm.WithAddresses(
object.AddressOf(obj1), object.AddressOf(obj2),
object.AddressOf(obj3), object.AddressOf(obj4))
inhumePrm.WithTombstoneAddress(addrTombstone)
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
expectedGraveyard := []oid.Address{
@ -235,9 +239,8 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
gotGraveyard []oid.Address
)
iterGraveyardPrm := new(meta.GraveyardIterationPrm)
err = db.IterateOverGraveyard(iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
var iterGraveyardPrm meta.GraveyardIterationPrm
iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
require.Equal(t, addrTombstone, tombstoned.Tombstone())
gotGraveyard = append(gotGraveyard, tombstoned.Address())
@ -248,7 +251,9 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
}
return nil
}))
})
err = db.IterateOverGraveyard(iterGraveyardPrm)
require.NoError(t, err)
require.Equal(t, firstIterationSize, counter)
require.Equal(t, firstIterationSize, len(gotGraveyard))
@ -256,15 +261,16 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
// last received address is an offset
offset := gotGraveyard[len(gotGraveyard)-1]
iterGraveyardPrm.SetOffset(offset)
err = db.IterateOverGraveyard(iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
require.Equal(t, addrTombstone, tombstoned.Tombstone())
gotGraveyard = append(gotGraveyard, tombstoned.Address())
counter++
return nil
}))
})
err = db.IterateOverGraveyard(iterGraveyardPrm)
require.NoError(t, err)
require.Equal(t, len(expectedGraveyard), counter)
require.ElementsMatch(t, gotGraveyard, expectedGraveyard)
@ -273,13 +279,13 @@ func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
// should lead to no iteration at all
offset = gotGraveyard[len(gotGraveyard)-1]
iterGraveyardPrm.SetOffset(offset)
iWasCalled := false
err = db.IterateOverGraveyard(iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
iWasCalled = true
return nil
}))
})
err = db.IterateOverGraveyard(iterGraveyardPrm)
require.NoError(t, err)
require.False(t, iWasCalled)
}
@ -307,12 +313,13 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
err = putBig(db, obj4)
require.NoError(t, err)
inhumePrm := new(meta.InhumePrm)
var inhumePrm meta.InhumePrm
inhumePrm.WithAddresses(
object.AddressOf(obj1), object.AddressOf(obj2),
object.AddressOf(obj3), object.AddressOf(obj4))
inhumePrm.WithGCMark()
_, err = db.Inhume(inhumePrm.
WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2), object.AddressOf(obj3), object.AddressOf(obj4)).
WithGCMark(),
)
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
expectedGarbage := []oid.Address{
@ -327,9 +334,8 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
gotGarbage []oid.Address
)
iterGarbagePrm := new(meta.GarbageIterationPrm)
err = db.IterateOverGarbage(iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
var iterGarbagePrm meta.GarbageIterationPrm
iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
gotGarbage = append(gotGarbage, garbage.Address())
counter++
@ -338,7 +344,9 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
}
return nil
}))
})
err = db.IterateOverGarbage(iterGarbagePrm)
require.NoError(t, err)
require.Equal(t, firstIterationSize, counter)
require.Equal(t, firstIterationSize, len(gotGarbage))
@ -346,13 +354,14 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
// last received address is an offset
offset := gotGarbage[len(gotGarbage)-1]
iterGarbagePrm.SetOffset(offset)
err = db.IterateOverGarbage(iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
gotGarbage = append(gotGarbage, garbage.Address())
counter++
return nil
}))
})
err = db.IterateOverGarbage(iterGarbagePrm)
require.NoError(t, err)
require.Equal(t, len(expectedGarbage), counter)
require.ElementsMatch(t, gotGarbage, expectedGarbage)
@ -361,13 +370,13 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
// should lead to no iteration at all
offset = gotGarbage[len(gotGarbage)-1]
iterGarbagePrm.SetOffset(offset)
iWasCalled := false
err = db.IterateOverGarbage(iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
iWasCalled = true
return nil
}))
})
err = db.IterateOverGarbage(iterGarbagePrm)
require.NoError(t, err)
require.False(t, iWasCalled)
}
@ -387,27 +396,27 @@ func TestDB_DropGraves(t *testing.T) {
err = putBig(db, obj2)
require.NoError(t, err)
inhumePrm := new(meta.InhumePrm)
// inhume with tombstone
addrTombstone := oidtest.Address()
_, err = db.Inhume(inhumePrm.
WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2)).
WithTombstoneAddress(addrTombstone),
)
var inhumePrm meta.InhumePrm
inhumePrm.WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
inhumePrm.WithTombstoneAddress(addrTombstone)
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
buriedTS := make([]meta.TombstonedObject, 0)
iterGravePRM := new(meta.GraveyardIterationPrm)
var iterGravePRM meta.GraveyardIterationPrm
var counter int
err = db.IterateOverGraveyard(iterGravePRM.SetHandler(func(tomstoned meta.TombstonedObject) error {
iterGravePRM.SetHandler(func(tomstoned meta.TombstonedObject) error {
buriedTS = append(buriedTS, tomstoned)
counter++
return nil
}))
})
err = db.IterateOverGraveyard(iterGravePRM)
require.NoError(t, err)
require.Equal(t, 2, counter)
@ -415,11 +424,12 @@ func TestDB_DropGraves(t *testing.T) {
require.NoError(t, err)
counter = 0
err = db.IterateOverGraveyard(iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {
iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {
counter++
return nil
}))
})
err = db.IterateOverGraveyard(iterGravePRM)
require.NoError(t, err)
require.Zero(t, counter)
}

View file

@ -22,45 +22,40 @@ type InhumePrm struct {
type InhumeRes struct{}
// WithAddresses sets a list of object addresses that should be inhumed.
func (p *InhumePrm) WithAddresses(addrs ...oid.Address) *InhumePrm {
func (p *InhumePrm) WithAddresses(addrs ...oid.Address) {
if p != nil {
p.target = addrs
}
return p
}
// WithTombstoneAddress sets tombstone address as the reason for inhume operation.
//
// addr should not be nil.
// Should not be called along with WithGCMark.
func (p *InhumePrm) WithTombstoneAddress(addr oid.Address) *InhumePrm {
func (p *InhumePrm) WithTombstoneAddress(addr oid.Address) {
if p != nil {
p.tomb = &addr
}
return p
}
// WithGCMark marks the object to be physically removed.
//
// Should not be called along with WithTombstoneAddress.
func (p *InhumePrm) WithGCMark() *InhumePrm {
func (p *InhumePrm) WithGCMark() {
if p != nil {
p.tomb = nil
}
return p
}
// Inhume inhumes the object by specified address.
//
// tomb should not be nil.
func Inhume(db *DB, target, tomb oid.Address) error {
_, err := db.Inhume(new(InhumePrm).
WithAddresses(target).
WithTombstoneAddress(tomb),
)
var inhumePrm InhumePrm
inhumePrm.WithAddresses(target)
inhumePrm.WithTombstoneAddress(tomb)
_, err := db.Inhume(inhumePrm)
return err
}
@ -71,7 +66,7 @@ var errBreakBucketForEach = errors.New("bucket ForEach break")
//
// Allows inhuming non-locked objects only. Returns apistatus.ObjectLocked
// if at least one object is locked.
func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
func (db *DB) Inhume(prm InhumePrm) (res *InhumeRes, err error) {
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
garbageBKT := tx.Bucket(garbageBucketName)

View file

@ -41,50 +41,56 @@ func TestInhumeTombOnTomb(t *testing.T) {
addr1 = oidtest.Address()
addr2 = oidtest.Address()
addr3 = oidtest.Address()
inhumePrm = new(meta.InhumePrm)
existsPrm = new(meta.ExistsPrm)
inhumePrm meta.InhumePrm
existsPrm meta.ExistsPrm
)
inhumePrm.WithAddresses(addr1)
inhumePrm.WithTombstoneAddress(addr2)
// inhume addr1 via addr2
_, err = db.Inhume(inhumePrm.
WithAddresses(addr1).
WithTombstoneAddress(addr2),
)
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
existsPrm.WithAddress(addr1)
// addr1 should become inhumed {addr1:addr2}
_, err = db.Exists(existsPrm.WithAddress(addr1))
_, err = db.Exists(existsPrm)
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))
inhumePrm.WithAddresses(addr3)
inhumePrm.WithTombstoneAddress(addr1)
// try to inhume addr3 via addr1
_, err = db.Inhume(inhumePrm.
WithAddresses(addr3).
WithTombstoneAddress(addr1),
)
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
// record with {addr1:addr2} should be removed from graveyard
// as a tomb-on-tomb; metabase should return ObjectNotFound
// NOT ObjectAlreadyRemoved since that record has been removed
// from graveyard but addr1 is still marked with GC
_, err = db.Exists(existsPrm.WithAddress(addr1))
_, err = db.Exists(existsPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
existsPrm.WithAddress(addr3)
// addr3 should be inhumed {addr3: addr1}
_, err = db.Exists(existsPrm.WithAddress(addr3))
_, err = db.Exists(existsPrm)
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))
inhumePrm.WithAddresses(addr1)
inhumePrm.WithTombstoneAddress(oidtest.Address())
// try to inhume addr1 (which is already a tombstone in graveyard)
_, err = db.Inhume(inhumePrm.
WithAddresses(addr1).
WithTombstoneAddress(oidtest.Address()),
)
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
existsPrm.WithAddress(addr1)
// record with addr1 key should not appear in graveyard
// (tomb can not be inhumed) but should be kept as object
// with GC mark
_, err = db.Exists(existsPrm.WithAddress(addr1))
_, err = db.Exists(existsPrm)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
}
@ -99,7 +105,7 @@ func TestInhumeLocked(t *testing.T) {
var prm meta.InhumePrm
prm.WithAddresses(locked)
_, err = db.Inhume(&prm)
_, err = db.Inhume(prm)
var e apistatus.ObjectLocked
require.ErrorAs(t, err, &e)

View file

@ -74,20 +74,19 @@ func TestDB_IterateCoveredByTombstones(t *testing.T) {
protectedLocked := oidtest.Address()
garbage := oidtest.Address()
prm := new(meta.InhumePrm)
var prm meta.InhumePrm
var err error
_, err = db.Inhume(prm.
WithTombstoneAddress(ts).
WithAddresses(protected1, protected2, protectedLocked),
)
prm.WithAddresses(protected1, protected2, protectedLocked)
prm.WithTombstoneAddress(ts)
_, err = db.Inhume(prm)
require.NoError(t, err)
_, err = db.Inhume(prm.
WithAddresses(garbage).
WithGCMark(),
)
prm.WithAddresses(garbage)
prm.WithGCMark()
_, err = db.Inhume(prm)
require.NoError(t, err)
var handled []oid.Address

View file

@ -27,17 +27,15 @@ type ListPrm struct {
}
// WithCount sets maximum amount of addresses that ListWithCursor should return.
func (l *ListPrm) WithCount(count uint32) *ListPrm {
func (l *ListPrm) WithCount(count uint32) {
l.count = int(count)
return l
}
// WithCursor sets cursor for ListWithCursor operation. For initial request
// ignore this param or use nil value. For consecutive requests, use value
// from ListRes.
func (l *ListPrm) WithCursor(cursor *Cursor) *ListPrm {
func (l *ListPrm) WithCursor(cursor *Cursor) {
l.cursor = cursor
return l
}
// ListRes contains values returned from ListWithCursor operation.
@ -63,7 +61,11 @@ func (l ListRes) Cursor() *Cursor {
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func ListWithCursor(db *DB, count uint32, cursor *Cursor) ([]oid.Address, *Cursor, error) {
r, err := db.ListWithCursor(new(ListPrm).WithCount(count).WithCursor(cursor))
var listPrm ListPrm
listPrm.WithCount(count)
listPrm.WithCursor(cursor)
r, err := db.ListWithCursor(listPrm)
if err != nil {
return nil, nil, err
}
@ -77,7 +79,7 @@ func ListWithCursor(db *DB, count uint32, cursor *Cursor) ([]oid.Address, *Curso
//
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func (db *DB) ListWithCursor(prm *ListPrm) (res *ListRes, err error) {
func (db *DB) ListWithCursor(prm ListPrm) (res *ListRes, err error) {
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res = new(ListRes)
res.addrList, res.cursor, err = db.listWithCursor(tx, prm.count, prm.cursor)

View file

@ -78,8 +78,12 @@ func TestDB_Lock(t *testing.T) {
err = meta.Inhume(db, objectcore.AddressOf(obj), tombAddr)
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
var inhumePrm meta.InhumePrm
inhumePrm.WithAddresses(tombAddr)
inhumePrm.WithGCMark()
// inhume the tombstone
_, err = db.Inhume(new(meta.InhumePrm).WithAddresses(tombAddr).WithGCMark())
_, err = db.Inhume(inhumePrm)
require.NoError(t, err)
// now we can inhume the object

View file

@ -16,12 +16,10 @@ type ToMoveItPrm struct {
type ToMoveItRes struct{}
// WithAddress sets address of the object to move into another shard.
func (p *ToMoveItPrm) WithAddress(addr oid.Address) *ToMoveItPrm {
func (p *ToMoveItPrm) WithAddress(addr oid.Address) {
if p != nil {
p.addr = addr
}
return p
}
// DoNotMovePrm groups the parameters of DoNotMove operation.
@ -33,12 +31,10 @@ type DoNotMovePrm struct {
type DoNotMoveRes struct{}
// WithAddress sets address of the object to prevent moving into another shard.
func (p *DoNotMovePrm) WithAddress(addr oid.Address) *DoNotMovePrm {
func (p *DoNotMovePrm) WithAddress(addr oid.Address) {
if p != nil {
p.addr = addr
}
return p
}
// MovablePrm groups the parameters of Movable operation.
@ -50,19 +46,22 @@ type MovableRes struct {
}
// AddressList returns resulting addresses of Movable operation.
func (p *MovableRes) AddressList() []oid.Address {
func (p MovableRes) AddressList() []oid.Address {
return p.addrList
}
// ToMoveIt marks object to move it into another shard.
func ToMoveIt(db *DB, addr oid.Address) error {
_, err := db.ToMoveIt(new(ToMoveItPrm).WithAddress(addr))
var toMovePrm ToMoveItPrm
toMovePrm.WithAddress(addr)
_, err := db.ToMoveIt(toMovePrm)
return err
}
// ToMoveIt marks objects to move it into another shard. This useful for
// faster HRW fetching.
func (db *DB) ToMoveIt(prm *ToMoveItPrm) (res *ToMoveItRes, err error) {
func (db *DB) ToMoveIt(prm ToMoveItPrm) (res *ToMoveItRes, err error) {
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
toMoveIt, err := tx.CreateBucketIfNotExists(toMoveItBucketName)
if err != nil {
@ -77,12 +76,15 @@ func (db *DB) ToMoveIt(prm *ToMoveItPrm) (res *ToMoveItRes, err error) {
// DoNotMove prevents the object to be moved into another shard.
func DoNotMove(db *DB, addr oid.Address) error {
_, err := db.DoNotMove(new(DoNotMovePrm).WithAddress(addr))
var doNotMovePrm DoNotMovePrm
doNotMovePrm.WithAddress(addr)
_, err := db.DoNotMove(doNotMovePrm)
return err
}
// DoNotMove removes `MoveIt` mark from the object.
func (db *DB) DoNotMove(prm *DoNotMovePrm) (res *DoNotMoveRes, err error) {
func (db *DB) DoNotMove(prm DoNotMovePrm) (res *DoNotMoveRes, err error) {
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
toMoveIt := tx.Bucket(toMoveItBucketName)
if toMoveIt == nil {
@ -97,7 +99,7 @@ func (db *DB) DoNotMove(prm *DoNotMovePrm) (res *DoNotMoveRes, err error) {
// Movable returns all movable objects of DB.
func Movable(db *DB) ([]oid.Address, error) {
r, err := db.Movable(new(MovablePrm))
r, err := db.Movable(MovablePrm{})
if err != nil {
return nil, err
}
@ -106,7 +108,7 @@ func Movable(db *DB) ([]oid.Address, error) {
}
// Movable returns list of marked objects to move into other shard.
func (db *DB) Movable(prm *MovablePrm) (*MovableRes, error) {
func (db *DB) Movable(_ MovablePrm) (*MovableRes, error) {
var strAddrs []string
err := db.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -34,21 +34,17 @@ type PutPrm struct {
type PutRes struct{}
// WithObject is a Put option to set object to save.
func (p *PutPrm) WithObject(obj *objectSDK.Object) *PutPrm {
func (p *PutPrm) WithObject(obj *objectSDK.Object) {
if p != nil {
p.obj = obj
}
return p
}
// WithBlobovniczaID is a Put option to set blobovnicza ID to save.
func (p *PutPrm) WithBlobovniczaID(id *blobovnicza.ID) *PutPrm {
func (p *PutPrm) WithBlobovniczaID(id *blobovnicza.ID) {
if p != nil {
p.id = id
}
return p
}
var (
@ -61,10 +57,11 @@ var (
//
// See DB.Put docs.
func Put(db *DB, obj *objectSDK.Object, id *blobovnicza.ID) error {
_, err := db.Put(new(PutPrm).
WithObject(obj).
WithBlobovniczaID(id),
)
var putPrm PutPrm
putPrm.WithObject(obj)
putPrm.WithBlobovniczaID(id)
_, err := db.Put(putPrm)
return err
}
@ -73,7 +70,7 @@ func Put(db *DB, obj *objectSDK.Object, id *blobovnicza.ID) error {
// Big objects have nil blobovniczaID.
//
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
func (db *DB) Put(prm *PutPrm) (res *PutRes, err error) {
func (db *DB) Put(prm PutPrm) (res *PutRes, err error) {
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
return db.put(tx, prm.obj, prm.id, nil)
})

View file

@ -40,21 +40,17 @@ type SelectRes struct {
}
// WithContainerID is a Select option to set the container id to search in.
func (p *SelectPrm) WithContainerID(cnr cid.ID) *SelectPrm {
func (p *SelectPrm) WithContainerID(cnr cid.ID) {
if p != nil {
p.cnr = cnr
}
return p
}
// WithFilters is a Select option to set the object filters.
func (p *SelectPrm) WithFilters(fs object.SearchFilters) *SelectPrm {
func (p *SelectPrm) WithFilters(fs object.SearchFilters) {
if p != nil {
p.filters = fs
}
return p
}
// AddressList returns list of addresses of the selected objects.
@ -64,7 +60,11 @@ func (r *SelectRes) AddressList() []oid.Address {
// Select selects the objects from DB with filtering.
func Select(db *DB, cnr cid.ID, fs object.SearchFilters) ([]oid.Address, error) {
r, err := db.Select(new(SelectPrm).WithFilters(fs).WithContainerID(cnr))
var selectPrm SelectPrm
selectPrm.WithFilters(fs)
selectPrm.WithContainerID(cnr)
r, err := db.Select(selectPrm)
if err != nil {
return nil, err
}
@ -73,7 +73,7 @@ func Select(db *DB, cnr cid.ID, fs object.SearchFilters) ([]oid.Address, error)
}
// Select returns list of addresses of objects that match search filters.
func (db *DB) Select(prm *SelectPrm) (res *SelectRes, err error) {
func (db *DB) Select(prm SelectPrm) (res *SelectRes, err error) {
res = new(SelectRes)
if blindlyProcess(prm.filters) {

View file

@ -18,12 +18,10 @@ type IsSmallRes struct {
}
// WithAddress is a IsSmall option to set the object address to check.
func (p *IsSmallPrm) WithAddress(addr oid.Address) *IsSmallPrm {
func (p *IsSmallPrm) WithAddress(addr oid.Address) {
if p != nil {
p.addr = addr
}
return p
}
// BlobovniczaID returns blobovnicza identifier.
@ -35,7 +33,10 @@ func (r *IsSmallRes) BlobovniczaID() *blobovnicza.ID {
// address and other parameters by default. Returns only
// the blobovnicza identifier.
func IsSmall(db *DB, addr oid.Address) (*blobovnicza.ID, error) {
r, err := db.IsSmall(new(IsSmallPrm).WithAddress(addr))
var isSmallPrm IsSmallPrm
isSmallPrm.WithAddress(addr)
r, err := db.IsSmall(isSmallPrm)
if err != nil {
return nil, err
}
@ -47,7 +48,7 @@ func IsSmall(db *DB, addr oid.Address) (*blobovnicza.ID, error) {
// Small objects stored in blobovnicza instances. Big objects stored in FS by
// shallow path which is calculated from address and therefore it is not
// indexed in metabase.
func (db *DB) IsSmall(prm *IsSmallPrm) (res *IsSmallRes, err error) {
func (db *DB) IsSmall(prm IsSmallPrm) (res *IsSmallRes, err error) {
res = new(IsSmallRes)
err = db.boltDB.View(func(tx *bbolt.Tx) error {

View file

@ -104,7 +104,7 @@ func (s *Shard) refillMetabase() error {
inhumePrm.WithTombstoneAddress(tombAddr)
inhumePrm.WithAddresses(tombMembers...)
_, err = s.metaBase.Inhume(&inhumePrm)
_, err = s.metaBase.Inhume(inhumePrm)
if err != nil {
return fmt.Errorf("could not inhume objects: %w", err)
}

View file

@ -185,11 +185,8 @@ func (s *Shard) removeGarbage() {
buf := make([]oid.Address, 0, s.rmBatchSize)
iterPrm := new(meta.GarbageIterationPrm)
// iterate over metabase's objects with GC mark
// (no more than s.rmBatchSize objects)
err := s.metaBase.IterateOverGarbage(iterPrm.SetHandler(func(g meta.GarbageObject) error {
var iterPrm meta.GarbageIterationPrm
iterPrm.SetHandler(func(g meta.GarbageObject) error {
buf = append(buf, g.Address())
if len(buf) == s.rmBatchSize {
@ -197,7 +194,11 @@ func (s *Shard) removeGarbage() {
}
return nil
}))
})
// iterate over metabase's objects with GC mark
// (no more than s.rmBatchSize objects)
err := s.metaBase.IterateOverGarbage(iterPrm)
if err != nil {
s.log.Warn("iterator over metabase graveyard failed",
zap.String("error", err.Error()),
@ -232,11 +233,13 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
return
}
var inhumePrm meta.InhumePrm
inhumePrm.WithAddresses(expired...)
inhumePrm.WithGCMark()
// inhume the collected objects
_, err = s.metaBase.Inhume(new(meta.InhumePrm).
WithAddresses(expired...).
WithGCMark(),
)
_, err = s.metaBase.Inhume(inhumePrm)
if err != nil {
s.log.Warn("could not inhume the objects",
zap.String("error", err.Error()),
@ -256,7 +259,8 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
tss := make([]meta.TombstonedObject, 0, tssDeleteBatch)
tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch)
iterPrm := new(meta.GraveyardIterationPrm).SetHandler(func(deletedObject meta.TombstonedObject) error {
var iterPrm meta.GraveyardIterationPrm
iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error {
tss = append(tss, deletedObject)
if len(tss) == tssDeleteBatch {
@ -348,7 +352,7 @@ func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) {
pInhume.WithAddresses(tsAddrs...)
// inhume tombstones
_, err := s.metaBase.Inhume(&pInhume)
_, err := s.metaBase.Inhume(pInhume)
if err != nil {
s.log.Warn("could not mark tombstones as garbage",
zap.String("error", err.Error()),
@ -381,7 +385,7 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {
pInhume.WithAddresses(lockers...)
pInhume.WithGCMark()
_, err = s.metaBase.Inhume(&pInhume)
_, err = s.metaBase.Inhume(pInhume)
if err != nil {
s.log.Warn("failure to mark lockers as garbage",
zap.String("error", err.Error()),

View file

@ -71,9 +71,9 @@ func (s *Shard) Head(prm *HeadPrm) (*HeadRes, error) {
// otherwise object seems to be flushed to metabase
}
headParams := new(meta.GetPrm).
WithAddress(prm.addr).
WithRaw(prm.raw)
var headParams meta.GetPrm
headParams.WithAddress(prm.addr)
headParams.WithRaw(prm.raw)
res, err := s.metaBase.Get(headParams)
if err != nil {

View file

@ -61,7 +61,8 @@ func (s *Shard) Inhume(prm *InhumePrm) (*InhumeRes, error) {
}
}
metaPrm := new(meta.InhumePrm).WithAddresses(prm.target...)
var metaPrm meta.InhumePrm
metaPrm.WithAddresses(prm.target...)
if prm.tombstone != nil {
metaPrm.WithTombstoneAddress(*prm.tombstone)

View file

@ -118,7 +118,10 @@ func ListContainers(s *Shard) ([]cid.ID, error) {
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func (s *Shard) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, error) {
metaPrm := new(meta.ListPrm).WithCount(prm.count).WithCursor(prm.cursor)
var metaPrm meta.ListPrm
metaPrm.WithCount(prm.count)
metaPrm.WithCursor(prm.cursor)
res, err := s.metaBase.ListWithCursor(metaPrm)
if err != nil {
return nil, fmt.Errorf("could not get list of objects: %w", err)