[#1418] engine: Do not use pointers as parameters
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
6e752f36dc
commit
babd382ba5
20 changed files with 134 additions and 119 deletions
|
@ -34,7 +34,8 @@ func initControlService(c *cfg) {
|
|||
controlSvc.WithNetMapSource(c.netMapSource),
|
||||
controlSvc.WithNodeState(c),
|
||||
controlSvc.WithDeletedObjectHandler(func(addrList []oid.Address) error {
|
||||
prm := new(engine.DeletePrm).WithAddresses(addrList...)
|
||||
var prm engine.DeletePrm
|
||||
prm.WithAddresses(addrList...)
|
||||
|
||||
_, err := c.cfgObject.cfgLocalStorage.localStorage.Delete(prm)
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ func (n *notificationSource) Iterate(epoch uint64, handler func(topic string, ad
|
|||
filters := objectSDK.NewSearchFilters()
|
||||
filters.AddNotificationEpochFilter(epoch)
|
||||
|
||||
selectPrm := new(engine.SelectPrm)
|
||||
var selectPrm engine.SelectPrm
|
||||
selectPrm.WithFilters(filters)
|
||||
|
||||
for _, c := range listRes.Containers() {
|
||||
|
@ -67,7 +67,7 @@ func (n *notificationSource) processAddress(
|
|||
a oid.Address,
|
||||
h func(topic string, addr oid.Address),
|
||||
) error {
|
||||
prm := new(engine.HeadPrm)
|
||||
var prm engine.HeadPrm
|
||||
prm.WithAddress(a)
|
||||
|
||||
res, err := n.e.Head(prm)
|
||||
|
|
|
@ -103,7 +103,7 @@ type localObjectInhumer struct {
|
|||
}
|
||||
|
||||
func (r *localObjectInhumer) DeleteObjects(ts oid.Address, addr ...oid.Address) error {
|
||||
prm := new(engine.InhumePrm)
|
||||
var prm engine.InhumePrm
|
||||
prm.WithTarget(ts, addr...)
|
||||
|
||||
_, err := r.storage.Inhume(prm)
|
||||
|
@ -235,7 +235,10 @@ func initObjectService(c *cfg) {
|
|||
),
|
||||
policer.WithReplicator(repl),
|
||||
policer.WithRedundantCopyCallback(func(addr oid.Address) {
|
||||
_, err := ls.Inhume(new(engine.InhumePrm).MarkAsGarbage(addr))
|
||||
var inhumePrm engine.InhumePrm
|
||||
inhumePrm.MarkAsGarbage(addr)
|
||||
|
||||
_, err := ls.Inhume(inhumePrm)
|
||||
if err != nil {
|
||||
c.log.Warn("could not inhume mark redundant copy as garbage",
|
||||
zap.String("error", err.Error()),
|
||||
|
|
|
@ -19,12 +19,10 @@ type DeleteRes struct{}
|
|||
// WithAddresses is a Delete option to set the addresses of the objects to delete.
|
||||
//
|
||||
// Option is required.
|
||||
func (p *DeletePrm) WithAddresses(addr ...oid.Address) *DeletePrm {
|
||||
func (p *DeletePrm) WithAddresses(addr ...oid.Address) {
|
||||
if p != nil {
|
||||
p.addr = append(p.addr, addr...)
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// Delete marks the objects to be removed.
|
||||
|
@ -33,7 +31,7 @@ func (p *DeletePrm) WithAddresses(addr ...oid.Address) *DeletePrm {
|
|||
//
|
||||
// Returns apistatus.ObjectLocked if at least one object is locked.
|
||||
// In this case no object from the list is marked to be deleted.
|
||||
func (e *StorageEngine) Delete(prm *DeletePrm) (res *DeleteRes, err error) {
|
||||
func (e *StorageEngine) Delete(prm DeletePrm) (res *DeleteRes, err error) {
|
||||
err = e.execIfNotBlocked(func() error {
|
||||
res, err = e.delete(prm)
|
||||
return err
|
||||
|
@ -42,13 +40,11 @@ func (e *StorageEngine) Delete(prm *DeletePrm) (res *DeleteRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) {
|
||||
func (e *StorageEngine) delete(prm DeletePrm) (*DeleteRes, error) {
|
||||
if e.metrics != nil {
|
||||
defer elapsed(e.metrics.AddDeleteDuration)()
|
||||
}
|
||||
|
||||
var shPrm shard.InhumePrm
|
||||
var existsPrm shard.ExistsPrm
|
||||
var locked struct {
|
||||
is bool
|
||||
err apistatus.ObjectLocked
|
||||
|
@ -56,6 +52,7 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) {
|
|||
|
||||
for i := range prm.addr {
|
||||
e.iterateOverSortedShards(prm.addr[i], func(_ int, sh hashedShard) (stop bool) {
|
||||
var existsPrm shard.ExistsPrm
|
||||
existsPrm.WithAddress(prm.addr[i])
|
||||
|
||||
resExists, err := sh.Exists(existsPrm)
|
||||
|
@ -66,6 +63,7 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) {
|
|||
return false
|
||||
}
|
||||
|
||||
var shPrm shard.InhumePrm
|
||||
shPrm.MarkAsGarbage(prm.addr[i])
|
||||
|
||||
_, err = sh.Inhume(shPrm)
|
||||
|
|
|
@ -71,7 +71,7 @@ func TestErrorReporting(t *testing.T) {
|
|||
e.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = e.Get(&GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.NoError(t, err)
|
||||
|
||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
|
@ -80,7 +80,7 @@ func TestErrorReporting(t *testing.T) {
|
|||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
|
||||
for i := uint32(1); i < 3; i++ {
|
||||
_, err = e.Get(&GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
|
@ -101,7 +101,7 @@ func TestErrorReporting(t *testing.T) {
|
|||
e.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = e.Get(&GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.NoError(t, err)
|
||||
|
||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||
|
@ -110,14 +110,14 @@ func TestErrorReporting(t *testing.T) {
|
|||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
|
||||
for i := uint32(1); i < errThreshold; i++ {
|
||||
_, err = e.Get(&GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, shard.ModeReadWrite)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
}
|
||||
|
||||
for i := uint32(0); i < 2; i++ {
|
||||
_, err = e.Get(&GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], errThreshold+i, shard.ModeDegraded)
|
||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||
|
@ -155,9 +155,9 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
|
||||
for i := range objs {
|
||||
addr := object.AddressOf(objs[i])
|
||||
_, err = e.Get(&GetPrm{addr: addr})
|
||||
_, err = e.Get(GetPrm{addr: addr})
|
||||
require.NoError(t, err)
|
||||
_, err = e.GetRange(&RngPrm{addr: addr})
|
||||
_, err = e.GetRange(RngPrm{addr: addr})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -175,15 +175,15 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
|
||||
for i := range objs {
|
||||
addr := object.AddressOf(objs[i])
|
||||
getRes, err := e.Get(&GetPrm{addr: addr})
|
||||
getRes, err := e.Get(GetPrm{addr: addr})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i], getRes.Object())
|
||||
|
||||
rngRes, err := e.GetRange(&RngPrm{addr: addr, off: 1, ln: 10})
|
||||
rngRes, err := e.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
|
||||
|
||||
_, err = e.GetRange(&RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
|
||||
_, err = e.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
|
||||
require.ErrorIs(t, err, object.ErrRangeOutOfBounds)
|
||||
}
|
||||
|
||||
|
|
|
@ -24,12 +24,10 @@ type GetRes struct {
|
|||
// WithAddress is a Get option to set the address of the requested object.
|
||||
//
|
||||
// Option is required.
|
||||
func (p *GetPrm) WithAddress(addr oid.Address) *GetPrm {
|
||||
func (p *GetPrm) WithAddress(addr oid.Address) {
|
||||
if p != nil {
|
||||
p.addr = addr
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// Object returns the requested object.
|
||||
|
@ -46,7 +44,7 @@ func (r *GetRes) Object() *objectSDK.Object {
|
|||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed.
|
||||
//
|
||||
// Returns an error if executions are blocked (see BlockExecution).
|
||||
func (e *StorageEngine) Get(prm *GetPrm) (res *GetRes, err error) {
|
||||
func (e *StorageEngine) Get(prm GetPrm) (res *GetRes, err error) {
|
||||
err = e.execIfNotBlocked(func() error {
|
||||
res, err = e.get(prm)
|
||||
return err
|
||||
|
@ -55,7 +53,7 @@ func (e *StorageEngine) Get(prm *GetPrm) (res *GetRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
||||
func (e *StorageEngine) get(prm GetPrm) (*GetRes, error) {
|
||||
if e.metrics != nil {
|
||||
defer elapsed(e.metrics.AddGetDuration)()
|
||||
}
|
||||
|
@ -152,9 +150,10 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
|||
|
||||
// Get reads object from local storage by provided address.
|
||||
func Get(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
|
||||
res, err := storage.Get(new(GetPrm).
|
||||
WithAddress(addr),
|
||||
)
|
||||
var getPrm GetPrm
|
||||
getPrm.WithAddress(addr)
|
||||
|
||||
res, err := storage.Get(getPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -24,23 +24,19 @@ type HeadRes struct {
|
|||
// WithAddress is a Head option to set the address of the requested object.
|
||||
//
|
||||
// Option is required.
|
||||
func (p *HeadPrm) WithAddress(addr oid.Address) *HeadPrm {
|
||||
func (p *HeadPrm) WithAddress(addr oid.Address) {
|
||||
if p != nil {
|
||||
p.addr = addr
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// WithRaw is a Head option to set raw flag value. If flag is unset, then Head
|
||||
// returns the header of the virtual object, otherwise it returns SplitInfo of the virtual
|
||||
// object.
|
||||
func (p *HeadPrm) WithRaw(raw bool) *HeadPrm {
|
||||
func (p *HeadPrm) WithRaw(raw bool) {
|
||||
if p != nil {
|
||||
p.raw = raw
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// Header returns the requested object header.
|
||||
|
@ -59,7 +55,7 @@ func (r *HeadRes) Header() *objectSDK.Object {
|
|||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object was inhumed.
|
||||
//
|
||||
// Returns an error if executions are blocked (see BlockExecution).
|
||||
func (e *StorageEngine) Head(prm *HeadPrm) (res *HeadRes, err error) {
|
||||
func (e *StorageEngine) Head(prm HeadPrm) (res *HeadRes, err error) {
|
||||
err = e.execIfNotBlocked(func() error {
|
||||
res, err = e.head(prm)
|
||||
return err
|
||||
|
@ -68,7 +64,7 @@ func (e *StorageEngine) Head(prm *HeadPrm) (res *HeadRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (e *StorageEngine) head(prm *HeadPrm) (*HeadRes, error) {
|
||||
func (e *StorageEngine) head(prm HeadPrm) (*HeadRes, error) {
|
||||
if e.metrics != nil {
|
||||
defer elapsed(e.metrics.AddHeadDuration)()
|
||||
}
|
||||
|
@ -141,9 +137,10 @@ func (e *StorageEngine) head(prm *HeadPrm) (*HeadRes, error) {
|
|||
|
||||
// Head reads object header from local storage by provided address.
|
||||
func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
|
||||
res, err := storage.Head(new(HeadPrm).
|
||||
WithAddress(addr),
|
||||
)
|
||||
var headPrm HeadPrm
|
||||
headPrm.WithAddress(addr)
|
||||
|
||||
res, err := storage.Head(headPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -154,10 +151,11 @@ func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
|
|||
// HeadRaw reads object header from local storage by provided address and raw
|
||||
// flag.
|
||||
func HeadRaw(storage *StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) {
|
||||
res, err := storage.Head(new(HeadPrm).
|
||||
WithAddress(addr).
|
||||
WithRaw(raw),
|
||||
)
|
||||
var headPrm HeadPrm
|
||||
headPrm.WithAddress(addr)
|
||||
headPrm.WithRaw(raw)
|
||||
|
||||
res, err := storage.Head(headPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -61,7 +61,10 @@ func TestHeadRaw(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
// head with raw flag should return SplitInfoError
|
||||
headPrm := new(HeadPrm).WithAddress(parentAddr).WithRaw(true)
|
||||
var headPrm HeadPrm
|
||||
headPrm.WithAddress(parentAddr)
|
||||
headPrm.WithRaw(true)
|
||||
|
||||
_, err = e.Head(headPrm)
|
||||
require.Error(t, err)
|
||||
|
||||
|
|
|
@ -25,25 +25,21 @@ type InhumeRes struct{}
|
|||
//
|
||||
// tombstone should not be nil, addr should not be empty.
|
||||
// Should not be called along with MarkAsGarbage.
|
||||
func (p *InhumePrm) WithTarget(tombstone oid.Address, addrs ...oid.Address) *InhumePrm {
|
||||
func (p *InhumePrm) WithTarget(tombstone oid.Address, addrs ...oid.Address) {
|
||||
if p != nil {
|
||||
p.addrs = addrs
|
||||
p.tombstone = &tombstone
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// MarkAsGarbage marks an object to be physically removed from local storage.
|
||||
//
|
||||
// Should not be called along with WithTarget.
|
||||
func (p *InhumePrm) MarkAsGarbage(addrs ...oid.Address) *InhumePrm {
|
||||
func (p *InhumePrm) MarkAsGarbage(addrs ...oid.Address) {
|
||||
if p != nil {
|
||||
p.addrs = addrs
|
||||
p.tombstone = nil
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
var errInhumeFailure = errors.New("inhume operation failed")
|
||||
|
@ -55,7 +51,7 @@ var errInhumeFailure = errors.New("inhume operation failed")
|
|||
// if at least one object is locked.
|
||||
//
|
||||
// Returns an error if executions are blocked (see BlockExecution).
|
||||
func (e *StorageEngine) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
||||
func (e *StorageEngine) Inhume(prm InhumePrm) (res *InhumeRes, err error) {
|
||||
err = e.execIfNotBlocked(func() error {
|
||||
res, err = e.inhume(prm)
|
||||
return err
|
||||
|
@ -64,7 +60,7 @@ func (e *StorageEngine) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (e *StorageEngine) inhume(prm *InhumePrm) (*InhumeRes, error) {
|
||||
func (e *StorageEngine) inhume(prm InhumePrm) (*InhumeRes, error) {
|
||||
if e.metrics != nil {
|
||||
defer elapsed(e.metrics.AddInhumeDuration)()
|
||||
}
|
||||
|
|
|
@ -43,7 +43,9 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
err := Put(e, parent)
|
||||
require.NoError(t, err)
|
||||
|
||||
inhumePrm := new(InhumePrm).WithTarget(tombstoneID, object.AddressOf(parent))
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
|
||||
|
||||
_, err = e.Inhume(inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -69,7 +71,9 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
_, err = s2.Put(putLink)
|
||||
require.NoError(t, err)
|
||||
|
||||
inhumePrm := new(InhumePrm).WithTarget(tombstoneID, object.AddressOf(parent))
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))
|
||||
|
||||
_, err = e.Inhume(inhumePrm)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
|
@ -25,17 +25,15 @@ type ListWithCursorPrm struct {
|
|||
}
|
||||
|
||||
// WithCount sets the maximum amount of addresses that ListWithCursor should return.
|
||||
func (p *ListWithCursorPrm) WithCount(count uint32) *ListWithCursorPrm {
|
||||
func (p *ListWithCursorPrm) WithCount(count uint32) {
|
||||
p.count = count
|
||||
return p
|
||||
}
|
||||
|
||||
// WithCursor sets a cursor for ListWithCursor operation. For initial request
|
||||
// ignore this param or use nil value. For consecutive requests, use value
|
||||
// from ListWithCursorRes.
|
||||
func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) *ListWithCursorPrm {
|
||||
func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
|
||||
p.cursor = cursor
|
||||
return p
|
||||
}
|
||||
|
||||
// ListWithCursorRes contains values returned from ListWithCursor operation.
|
||||
|
@ -61,7 +59,7 @@ func (l ListWithCursorRes) Cursor() *Cursor {
|
|||
//
|
||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||
// parameter set to zero.
|
||||
func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, error) {
|
||||
func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (*ListWithCursorRes, error) {
|
||||
result := make([]oid.Address, 0, prm.count)
|
||||
|
||||
// 1. Get available shards and sort them.
|
||||
|
|
|
@ -30,7 +30,10 @@ func TestListWithCursor(t *testing.T) {
|
|||
for i := 0; i < total; i++ {
|
||||
containerID := cidtest.ID()
|
||||
obj := generateObjectWithCID(t, containerID)
|
||||
prm := new(PutPrm).WithObject(obj)
|
||||
|
||||
var prm PutPrm
|
||||
prm.WithObject(obj)
|
||||
|
||||
_, err := e.Put(prm)
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, object.AddressOf(obj))
|
||||
|
@ -38,21 +41,27 @@ func TestListWithCursor(t *testing.T) {
|
|||
|
||||
expected = sortAddresses(expected)
|
||||
|
||||
prm := new(ListWithCursorPrm).WithCount(1)
|
||||
var prm ListWithCursorPrm
|
||||
prm.WithCount(1)
|
||||
|
||||
res, err := e.ListWithCursor(prm)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, res.AddressList())
|
||||
got = append(got, res.AddressList()...)
|
||||
|
||||
for i := 0; i < total-1; i++ {
|
||||
res, err = e.ListWithCursor(prm.WithCursor(res.Cursor()))
|
||||
prm.WithCursor(res.Cursor())
|
||||
|
||||
res, err = e.ListWithCursor(prm)
|
||||
if errors.Is(err, ErrEndOfListing) {
|
||||
break
|
||||
}
|
||||
got = append(got, res.AddressList()...)
|
||||
}
|
||||
|
||||
_, err = e.ListWithCursor(prm.WithCursor(res.Cursor()))
|
||||
prm.WithCursor(res.Cursor())
|
||||
|
||||
_, err = e.ListWithCursor(prm)
|
||||
require.ErrorIs(t, err, ErrEndOfListing)
|
||||
|
||||
got = sortAddresses(got)
|
||||
|
|
|
@ -88,7 +88,10 @@ func TestLockUserScenario(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
// 3.
|
||||
_, err = e.Inhume(new(InhumePrm).WithTarget(tombAddr, objAddr))
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(tombAddr, objAddr)
|
||||
|
||||
_, err = e.Inhume(inhumePrm)
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
|
||||
|
||||
// 4.
|
||||
|
@ -104,7 +107,8 @@ func TestLockUserScenario(t *testing.T) {
|
|||
err = Put(e, tombObj)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = e.Inhume(new(InhumePrm).WithTarget(tombForLockAddr, lockerAddr))
|
||||
inhumePrm.WithTarget(tombForLockAddr, lockerAddr)
|
||||
_, err = e.Inhume(inhumePrm)
|
||||
require.NoError(t, err, new(apistatus.ObjectLocked))
|
||||
|
||||
// 5.
|
||||
|
@ -115,7 +119,8 @@ func TestLockUserScenario(t *testing.T) {
|
|||
// delay for GC
|
||||
time.Sleep(time.Second)
|
||||
|
||||
_, err = e.Inhume(new(InhumePrm).WithTarget(tombAddr, objAddr))
|
||||
inhumePrm.WithTarget(tombAddr, objAddr)
|
||||
_, err = e.Inhume(inhumePrm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -177,7 +182,10 @@ func TestLockExpiration(t *testing.T) {
|
|||
err = e.Lock(cnr, idLock, []oid.ID{id})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = e.Inhume(new(InhumePrm).WithTarget(oidtest.Address(), objectcore.AddressOf(obj)))
|
||||
var inhumePrm InhumePrm
|
||||
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
|
||||
|
||||
_, err = e.Inhume(inhumePrm)
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectLocked))
|
||||
|
||||
// 3.
|
||||
|
@ -190,6 +198,7 @@ func TestLockExpiration(t *testing.T) {
|
|||
time.Sleep(time.Second)
|
||||
|
||||
// 4.
|
||||
_, err = e.Inhume(new(InhumePrm).WithTarget(oidtest.Address(), objectcore.AddressOf(obj)))
|
||||
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
|
||||
_, err = e.Inhume(inhumePrm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
@ -22,12 +22,10 @@ var errPutShard = errors.New("could not put object to any shard")
|
|||
// WithObject is a Put option to set object to save.
|
||||
//
|
||||
// Option is required.
|
||||
func (p *PutPrm) WithObject(obj *objectSDK.Object) *PutPrm {
|
||||
func (p *PutPrm) WithObject(obj *objectSDK.Object) {
|
||||
if p != nil {
|
||||
p.obj = obj
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// Put saves the object to local storage.
|
||||
|
@ -38,7 +36,7 @@ func (p *PutPrm) WithObject(obj *objectSDK.Object) *PutPrm {
|
|||
// Returns an error if executions are blocked (see BlockExecution).
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed.
|
||||
func (e *StorageEngine) Put(prm *PutPrm) (res *PutRes, err error) {
|
||||
func (e *StorageEngine) Put(prm PutPrm) (res *PutRes, err error) {
|
||||
err = e.execIfNotBlocked(func() error {
|
||||
res, err = e.put(prm)
|
||||
return err
|
||||
|
@ -47,7 +45,7 @@ func (e *StorageEngine) Put(prm *PutPrm) (res *PutRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) {
|
||||
func (e *StorageEngine) put(prm PutPrm) (*PutRes, error) {
|
||||
if e.metrics != nil {
|
||||
defer elapsed(e.metrics.AddPutDuration)()
|
||||
}
|
||||
|
@ -132,9 +130,10 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) {
|
|||
|
||||
// Put writes provided object to local storage.
|
||||
func Put(storage *StorageEngine, obj *objectSDK.Object) error {
|
||||
_, err := storage.Put(new(PutPrm).
|
||||
WithObject(obj),
|
||||
)
|
||||
var putPrm PutPrm
|
||||
putPrm.WithObject(obj)
|
||||
|
||||
_, err := storage.Put(putPrm)
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -27,24 +27,20 @@ type RngRes struct {
|
|||
// WithAddress is a GetRng option to set the address of the requested object.
|
||||
//
|
||||
// Option is required.
|
||||
func (p *RngPrm) WithAddress(addr oid.Address) *RngPrm {
|
||||
func (p *RngPrm) WithAddress(addr oid.Address) {
|
||||
if p != nil {
|
||||
p.addr = addr
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// WithPayloadRange is a GetRange option to set range of requested payload data.
|
||||
//
|
||||
// Missing an option or calling with zero length is equivalent
|
||||
// to getting the full payload range.
|
||||
func (p *RngPrm) WithPayloadRange(rng *objectSDK.Range) *RngPrm {
|
||||
func (p *RngPrm) WithPayloadRange(rng *objectSDK.Range) {
|
||||
if p != nil {
|
||||
p.off, p.ln = rng.GetOffset(), rng.GetLength()
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// Object returns the requested object part.
|
||||
|
@ -64,7 +60,7 @@ func (r *RngRes) Object() *objectSDK.Object {
|
|||
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
||||
//
|
||||
// Returns an error if executions are blocked (see BlockExecution).
|
||||
func (e *StorageEngine) GetRange(prm *RngPrm) (res *RngRes, err error) {
|
||||
func (e *StorageEngine) GetRange(prm RngPrm) (res *RngRes, err error) {
|
||||
err = e.execIfNotBlocked(func() error {
|
||||
res, err = e.getRange(prm)
|
||||
return err
|
||||
|
@ -73,7 +69,7 @@ func (e *StorageEngine) GetRange(prm *RngPrm) (res *RngRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
||||
func (e *StorageEngine) getRange(prm RngPrm) (*RngRes, error) {
|
||||
if e.metrics != nil {
|
||||
defer elapsed(e.metrics.AddRangeDuration)()
|
||||
}
|
||||
|
@ -179,10 +175,11 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
|||
|
||||
// GetRange reads object payload range from local storage by provided address.
|
||||
func GetRange(storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) {
|
||||
res, err := storage.GetRange(new(RngPrm).
|
||||
WithAddress(addr).
|
||||
WithPayloadRange(rng),
|
||||
)
|
||||
var rangePrm RngPrm
|
||||
rangePrm.WithAddress(addr)
|
||||
rangePrm.WithPayloadRange(rng)
|
||||
|
||||
res, err := storage.GetRange(rangePrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -19,21 +19,17 @@ type SelectRes struct {
|
|||
}
|
||||
|
||||
// WithContainerID is a Select option to set the container id to search in.
|
||||
func (p *SelectPrm) WithContainerID(cnr cid.ID) *SelectPrm {
|
||||
func (p *SelectPrm) WithContainerID(cnr cid.ID) {
|
||||
if p != nil {
|
||||
p.cnr = cnr
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// WithFilters is a Select option to set the object filters.
|
||||
func (p *SelectPrm) WithFilters(fs object.SearchFilters) *SelectPrm {
|
||||
func (p *SelectPrm) WithFilters(fs object.SearchFilters) {
|
||||
if p != nil {
|
||||
p.filters = fs
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
// AddressList returns list of addresses of the selected objects.
|
||||
|
@ -46,7 +42,7 @@ func (r *SelectRes) AddressList() []oid.Address {
|
|||
// Returns any error encountered that did not allow to completely select the objects.
|
||||
//
|
||||
// Returns an error if executions are blocked (see BlockExecution).
|
||||
func (e *StorageEngine) Select(prm *SelectPrm) (res *SelectRes, err error) {
|
||||
func (e *StorageEngine) Select(prm SelectPrm) (res *SelectRes, err error) {
|
||||
err = e.execIfNotBlocked(func() error {
|
||||
res, err = e._select(prm)
|
||||
return err
|
||||
|
@ -55,7 +51,7 @@ func (e *StorageEngine) Select(prm *SelectPrm) (res *SelectRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (e *StorageEngine) _select(prm *SelectPrm) (*SelectRes, error) {
|
||||
func (e *StorageEngine) _select(prm SelectPrm) (*SelectRes, error) {
|
||||
if e.metrics != nil {
|
||||
defer elapsed(e.metrics.AddSearchDuration)()
|
||||
}
|
||||
|
@ -142,10 +138,11 @@ func (e *StorageEngine) list(limit uint64) (*SelectRes, error) {
|
|||
|
||||
// Select selects objects from local storage using provided filters.
|
||||
func Select(storage *StorageEngine, cnr cid.ID, fs object.SearchFilters) ([]oid.Address, error) {
|
||||
res, err := storage.Select(new(SelectPrm).
|
||||
WithContainerID(cnr).
|
||||
WithFilters(fs),
|
||||
)
|
||||
var selectPrm SelectPrm
|
||||
selectPrm.WithContainerID(cnr)
|
||||
selectPrm.WithFilters(fs)
|
||||
|
||||
res, err := storage.Select(selectPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -171,29 +171,32 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
|
|||
|
||||
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
|
||||
if exec.headOnly() {
|
||||
r, err := e.engine.Head(new(engine.HeadPrm).
|
||||
WithAddress(exec.address()).
|
||||
WithRaw(exec.isRaw()),
|
||||
)
|
||||
var headPrm engine.HeadPrm
|
||||
headPrm.WithAddress(exec.address())
|
||||
headPrm.WithRaw(exec.isRaw())
|
||||
|
||||
r, err := e.engine.Head(headPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Header(), nil
|
||||
} else if rng := exec.ctxRange(); rng != nil {
|
||||
r, err := e.engine.GetRange(new(engine.RngPrm).
|
||||
WithAddress(exec.address()).
|
||||
WithPayloadRange(rng),
|
||||
)
|
||||
var getRange engine.RngPrm
|
||||
getRange.WithAddress(exec.address())
|
||||
getRange.WithPayloadRange(rng)
|
||||
|
||||
r, err := e.engine.GetRange(getRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Object(), nil
|
||||
} else {
|
||||
r, err := e.engine.Get(new(engine.GetPrm).
|
||||
WithAddress(exec.address()),
|
||||
)
|
||||
var getPrm engine.GetPrm
|
||||
getPrm.WithAddress(exec.address())
|
||||
|
||||
r, err := e.engine.Get(getPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -120,10 +120,11 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
|
|||
}
|
||||
|
||||
func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) {
|
||||
r, err := (*engine.StorageEngine)(e).Select(new(engine.SelectPrm).
|
||||
WithFilters(exec.searchFilters()).
|
||||
WithContainerID(exec.containerID()),
|
||||
)
|
||||
var selectPrm engine.SelectPrm
|
||||
selectPrm.WithFilters(exec.searchFilters())
|
||||
selectPrm.WithContainerID(exec.containerID())
|
||||
|
||||
r, err := (*engine.StorageEngine)(e).Select(selectPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ func (p *Policer) processObject(ctx context.Context, addr oid.Address) {
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
if container.IsErrNotFound(err) {
|
||||
prm := new(engine.InhumePrm)
|
||||
var prm engine.InhumePrm
|
||||
prm.MarkAsGarbage(addr)
|
||||
_, err := p.jobQueue.localStorage.Inhume(prm)
|
||||
if err != nil {
|
||||
|
|
|
@ -12,7 +12,7 @@ type jobQueue struct {
|
|||
}
|
||||
|
||||
func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]oid.Address, *engine.Cursor, error) {
|
||||
prm := new(engine.ListWithCursorPrm)
|
||||
var prm engine.ListWithCursorPrm
|
||||
prm.WithCursor(cursor)
|
||||
prm.WithCount(count)
|
||||
|
||||
|
|
Loading…
Reference in a new issue