[#1418] shard: Do not use pointers as parameters
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
parent e265ce2d52
commit 6e752f36dc

39 changed files with 205 additions and 161 deletions
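Before the per-file hunks, here is a minimal, self-contained sketch (not code from the patch) of the calling convention this commit moves to: parameter structs such as shard.GetPrm become plain values, their With* setters mutate the receiver in place instead of returning it for chaining, and the shard methods take the struct by value. The type and function names below are illustrative stand-ins, not the real shard package API.

package main

import "fmt"

// getPrm mimics the shape the shard parameter structs take after this commit:
// a plain struct, setters on a pointer receiver that mutate in place and
// return nothing, and operations that accept the struct by value.
type getPrm struct {
	addr string
}

// WithAddress mutates the receiver; note there is no nil check and no return
// value, unlike the old chainable *getPrm style removed by this commit.
func (p *getPrm) WithAddress(addr string) {
	p.addr = addr
}

// get accepts the parameters by value, so the callee works on its own copy.
func get(prm getPrm) string {
	return "get " + prm.addr
}

func main() {
	var prm getPrm          // zero value is ready to use
	prm.WithAddress("addr") // setter mutates prm in place
	fmt.Println(get(prm))   // prm is copied into the call
}

The hunks that follow apply exactly this reshaping to every Prm type in the shard package and to all of its callers.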
@@ -47,8 +47,8 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) {
 defer elapsed(e.metrics.AddDeleteDuration)()
 }

- shPrm := new(shard.InhumePrm)
- existsPrm := new(shard.ExistsPrm)
+ var shPrm shard.InhumePrm
+ var existsPrm shard.ExistsPrm
 var locked struct {
 is bool
 err apistatus.ObjectLocked
@@ -56,7 +56,9 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) {

 for i := range prm.addr {
 e.iterateOverSortedShards(prm.addr[i], func(_ int, sh hashedShard) (stop bool) {
- resExists, err := sh.Exists(existsPrm.WithAddress(prm.addr[i]))
+ existsPrm.WithAddress(prm.addr[i])
+
+ resExists, err := sh.Exists(existsPrm)
 if err != nil {
 e.reportShardError(sh, "could not check object existence", err)
 return false
@@ -64,7 +66,9 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) {
 return false
 }

- _, err = sh.Inhume(shPrm.MarkAsGarbage(prm.addr[i]))
+ shPrm.MarkAsGarbage(prm.addr[i])
+
+ _, err = sh.Inhume(shPrm)
 if err != nil {
 e.reportShardError(sh, "could not inhume object in shard", err)


@@ -5,7 +5,7 @@ import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
 // DumpShard dumps objects from the shard with provided identifier.
 //
 // Returns an error if shard is not read-only.
- func (e *StorageEngine) DumpShard(id *shard.ID, prm *shard.DumpPrm) error {
+ func (e *StorageEngine) DumpShard(id *shard.ID, prm shard.DumpPrm) error {
 e.mtx.RLock()
 defer e.mtx.RUnlock()


@@ -64,7 +64,8 @@ func TestErrorReporting(t *testing.T) {
 obj := generateObjectWithCID(t, cidtest.ID())
 obj.SetPayload(make([]byte, errSmallSize))

- prm := new(shard.PutPrm).WithObject(obj)
+ var prm shard.PutPrm
+ prm.WithObject(obj)
 e.mtx.RLock()
 _, err := e.shards[id[0].String()].Shard.Put(prm)
 e.mtx.RUnlock()
@@ -93,7 +94,8 @@ func TestErrorReporting(t *testing.T) {
 obj := generateObjectWithCID(t, cidtest.ID())
 obj.SetPayload(make([]byte, errSmallSize))

- prm := new(shard.PutPrm).WithObject(obj)
+ var prm shard.PutPrm
+ prm.WithObject(obj)
 e.mtx.RLock()
 _, err := e.shards[id[0].String()].Put(prm)
 e.mtx.RUnlock()
@@ -142,7 +144,8 @@ func TestBlobstorFailback(t *testing.T) {
 obj := generateObjectWithCID(t, cidtest.ID())
 obj.SetPayload(make([]byte, size))

- prm := new(shard.PutPrm).WithObject(obj)
+ var prm shard.PutPrm
+ prm.WithObject(obj)
 e.mtx.RLock()
 _, err = e.shards[id[0].String()].Shard.Put(prm)
 e.mtx.RUnlock()

@@ -7,7 +7,8 @@ import (
 )

 func (e *StorageEngine) exists(addr oid.Address) (bool, error) {
- shPrm := new(shard.ExistsPrm).WithAddress(addr)
+ var shPrm shard.ExistsPrm
+ shPrm.WithAddress(addr)
 alreadyRemoved := false
 exists := false


@@ -73,8 +73,8 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
 metaError error
 )

- shPrm := new(shard.GetPrm).
- WithAddress(prm.addr)
+ var shPrm shard.GetPrm
+ shPrm.WithAddress(prm.addr)

 e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
 res, err := sh.Get(shPrm)
@@ -131,7 +131,7 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
 // If the object is not found but is present in metabase,
 // try to fetch it from blobstor directly. If it is found in any
 // blobstor, increase the error counter for the shard which contains the meta.
- shPrm = shPrm.WithIgnoreMeta(true)
+ shPrm.WithIgnoreMeta(true)

 e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
 res, err := sh.Get(shPrm)
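The two engine get hunks above reuse one value-typed prm for both lookup passes: the old chained reassignment shPrm = shPrm.WithIgnoreMeta(true) becomes a plain mutating call. A hedged sketch of why that works (pointer-receiver setters called on an addressable local value), using stand-in names rather than the real shard types:

package main

import "fmt"

// prm stands in for shard.GetPrm/RngPrm after the change: the engine keeps one
// value, tweaks it with a setter between the two shard passes, and hands a
// copy to every shard call. Hypothetical names, same shape as the diff.
type prm struct {
	addr     string
	skipMeta bool
}

func (p *prm) WithAddress(a string)     { p.addr = a }
func (p *prm) WithIgnoreMeta(skip bool) { p.skipMeta = skip }

func lookup(p prm) { fmt.Printf("lookup addr=%s skipMeta=%v\n", p.addr, p.skipMeta) }

func main() {
	var shPrm prm
	shPrm.WithAddress("oid")

	lookup(shPrm) // first pass: metabase consulted

	// Second pass: mutate the same value instead of reassigning the result of
	// a chained call (the old `shPrm = shPrm.WithIgnoreMeta(true)` form).
	shPrm.WithIgnoreMeta(true)
	lookup(shPrm) // fallback pass: blobstor checked directly
}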
@@ -83,9 +83,9 @@ func (e *StorageEngine) head(prm *HeadPrm) (*HeadRes, error) {
 outError error = errNotFound
 )

- shPrm := new(shard.HeadPrm).
- WithAddress(prm.addr).
- WithRaw(prm.raw)
+ var shPrm shard.HeadPrm
+ shPrm.WithAddress(prm.addr)
+ shPrm.WithRaw(prm.raw)

 e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
 res, err := sh.Head(shPrm)

@@ -46,8 +46,11 @@ func TestHeadRaw(t *testing.T) {
 e := testNewEngineWithShards(s1, s2)
 defer e.Close()

- putPrmLeft := new(shard.PutPrm).WithObject(child)
- putPrmLink := new(shard.PutPrm).WithObject(link)
+ var putPrmLeft shard.PutPrm
+ putPrmLeft.WithObject(child)
+
+ var putPrmLink shard.PutPrm
+ putPrmLink.WithObject(link)

 // put most left object in one shard
 _, err := s1.Put(putPrmLeft)

@@ -69,7 +69,7 @@ func (e *StorageEngine) inhume(prm *InhumePrm) (*InhumeRes, error) {
 defer elapsed(e.metrics.AddInhumeDuration)()
 }

- shPrm := new(shard.InhumePrm)
+ var shPrm shard.InhumePrm

 for i := range prm.addrs {
 if prm.tombstone != nil {
@@ -98,9 +98,10 @@ func (e *StorageEngine) inhume(prm *InhumePrm) (*InhumeRes, error) {
 // 0 - fail
 // 1 - object locked
 // 2 - ok
- func (e *StorageEngine) inhumeAddr(addr oid.Address, prm *shard.InhumePrm, checkExists bool) (status uint8) {
+ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkExists bool) (status uint8) {
 root := false
 var errLocked apistatus.ObjectLocked
+ var existPrm shard.ExistsPrm

 e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
 defer func() {
@@ -112,9 +113,8 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm *shard.InhumePrm, check
 }()

 if checkExists {
- exRes, err := sh.Exists(new(shard.ExistsPrm).
- WithAddress(addr),
- )
+ existPrm.WithAddress(addr)
+ exRes, err := sh.Exists(existPrm)
 if err != nil {
 if shard.IsErrRemoved(err) {
 // inhumed once - no need to be inhumed again

@@ -59,11 +59,13 @@ func TestStorageEngine_Inhume(t *testing.T) {
 e := testNewEngineWithShards(s1, s2)
 defer e.Close()

- putChild := new(shard.PutPrm).WithObject(child)
+ var putChild shard.PutPrm
+ putChild.WithObject(child)
 _, err := s1.Put(putChild)
 require.NoError(t, err)

- putLink := new(shard.PutPrm).WithObject(link)
+ var putLink shard.PutPrm
+ putLink.WithObject(link)
 _, err = s2.Put(putLink)
 require.NoError(t, err)


@@ -104,7 +104,8 @@ func (e *StorageEngine) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorR
 }

 count := uint32(int(prm.count) - len(result))
- shardPrm := new(shard.ListWithCursorPrm).WithCount(count)
+ var shardPrm shard.ListWithCursorPrm
+ shardPrm.WithCount(count)
 if shardIDs[i] == cursor.shardID {
 shardPrm.WithCursor(cursor.shardCursor)
 }

@@ -65,9 +65,10 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi
 }()

 if checkExists {
- exRes, err := sh.Exists(new(shard.ExistsPrm).
- WithAddress(addrLocked),
- )
+ var existsPrm shard.ExistsPrm
+ existsPrm.WithAddress(addrLocked)
+
+ exRes, err := sh.Exists(existsPrm)
 if err != nil {
 var siErr *objectSDK.SplitInfoError
 if !errors.As(err, &siErr) {

@@ -61,7 +61,7 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) {
 return nil, err
 }

- existPrm := new(shard.ExistsPrm)
+ var existPrm shard.ExistsPrm
 existPrm.WithAddress(addr)

 finished := false
@@ -83,7 +83,7 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) {

 if exists.Exists() {
 if ind != 0 {
- toMoveItPrm := new(shard.ToMoveItPrm)
+ var toMoveItPrm shard.ToMoveItPrm
 toMoveItPrm.WithAddress(addr)

 _, err = sh.ToMoveIt(toMoveItPrm)
@@ -100,7 +100,7 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) {
 return
 }

- putPrm := new(shard.PutPrm)
+ var putPrm shard.PutPrm
 putPrm.WithObject(prm.obj)

 _, err = sh.Put(putPrm)
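Several of the hunks above (exists, put) settle on a declare-once, set-per-use pattern: a single value is declared, a setter overwrites its fields before each call, and the callee receives a copy. A minimal sketch of that pattern with hypothetical names:

package main

import "fmt"

// existsPrm illustrates the declare-once / set-per-use pattern the engine code
// adopts (var existPrm shard.ExistsPrm, then existPrm.WithAddress(addr) before
// each call). Only the calling convention mirrors the diff.
type existsPrm struct{ addr string }

func (p *existsPrm) WithAddress(a string) { p.addr = a }

func exists(p existsPrm) bool {
	fmt.Println("checking", p.addr)
	return false
}

func main() {
	addrs := []string{"a", "b", "c"}

	var prm existsPrm // one value reused across iterations
	for _, a := range addrs {
		prm.WithAddress(a) // overwrite the address for this iteration
		_ = exists(prm)    // the callee receives a copy, so reuse is safe
	}
}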
@@ -91,9 +91,9 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
 metaError error
 )

- shPrm := new(shard.RngPrm).
- WithAddress(prm.addr).
- WithRange(prm.off, prm.ln)
+ var shPrm shard.RngPrm
+ shPrm.WithAddress(prm.addr)
+ shPrm.WithRange(prm.off, prm.ln)

 e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
 res, err := sh.GetRange(shPrm)
@@ -152,7 +152,7 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
 // If the object is not found but is present in metabase,
 // try to fetch it from blobstor directly. If it is found in any
 // blobstor, increase the error counter for the shard which contains the meta.
- shPrm = shPrm.WithIgnoreMeta(true)
+ shPrm.WithIgnoreMeta(true)

 e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
 res, err := sh.GetRange(shPrm)

@@ -5,7 +5,7 @@ import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
 // RestoreShard restores objects from dump to the shard with provided identifier.
 //
 // Returns an error if shard is not read-only.
- func (e *StorageEngine) RestoreShard(id *shard.ID, prm *shard.RestorePrm) error {
+ func (e *StorageEngine) RestoreShard(id *shard.ID, prm shard.RestorePrm) error {
 e.mtx.RLock()
 defer e.mtx.RUnlock()


@@ -65,9 +65,9 @@ func (e *StorageEngine) _select(prm *SelectPrm) (*SelectRes, error) {

 var outError error

- shPrm := new(shard.SelectPrm).
- WithContainerID(prm.cnr).
- WithFilters(prm.filters)
+ var shPrm shard.SelectPrm
+ shPrm.WithContainerID(prm.cnr)
+ shPrm.WithFilters(prm.filters)

 e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
 res, err := sh.Select(shPrm)

@@ -14,19 +14,17 @@ type ContainerSizeRes struct {
 size uint64
 }

- func (p *ContainerSizePrm) WithContainerID(cnr cid.ID) *ContainerSizePrm {
+ func (p *ContainerSizePrm) WithContainerID(cnr cid.ID) {
- if p != nil {
 p.cnr = cnr
 }

- return p
 }

 func (r *ContainerSizeRes) Size() uint64 {
 return r.size
 }

- func (s *Shard) ContainerSize(prm *ContainerSizePrm) (*ContainerSizeRes, error) {
+ func (s *Shard) ContainerSize(prm ContainerSizePrm) (*ContainerSizeRes, error) {
 size, err := s.metaBase.ContainerSize(prm.cnr)
 if err != nil {
 return nil, fmt.Errorf("could not get container size: %w", err)
@@ -38,7 +36,7 @@ func (s *Shard) ContainerSize(prm *ContainerSizePrm) (*ContainerSizeRes, error)
 }

 func ContainerSize(s *Shard, cnr cid.ID) (uint64, error) {
- res, err := s.ContainerSize(&ContainerSizePrm{cnr: cnr})
+ res, err := s.ContainerSize(ContainerSizePrm{cnr: cnr})
 if err != nil {
 return 0, err
 }
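The ContainerSizePrm hunk above is representative of how every setter in the shard package changes shape: the nil-receiver guard and the chained return value disappear, leaving a plain mutating method, while the operation itself now accepts the struct by value. A side-by-side sketch with a stand-in type; the old variant is kept only for contrast and is not part of the commit:

package main

import "fmt"

// cnrPrm is a stand-in for ContainerSizePrm, not the real type.
type cnrPrm struct{ cnr string }

// Old shape (removed by the commit): nil-tolerant, chainable.
func (p *cnrPrm) withContainerIDOld(cnr string) *cnrPrm {
	if p != nil {
		p.cnr = cnr
	}
	return p
}

// New shape (what the commit introduces): plain mutation, no return value.
// Callers are expected to own an addressable value, so the nil guard goes away.
func (p *cnrPrm) WithContainerID(cnr string) {
	p.cnr = cnr
}

// containerSize takes its parameters by value, like Shard.ContainerSize above.
func containerSize(prm cnrPrm) uint64 {
	fmt.Println("size of", prm.cnr)
	return 0
}

func main() {
	var prm cnrPrm
	prm.WithContainerID("cid")
	_ = containerSize(prm)

	// The old style allowed one-liners such as
	//   containerSize(*new(cnrPrm).withContainerIDOld("cid"))
	// at the cost of a nil check inside every setter.
}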
@@ -85,20 +85,29 @@ func TestRefillMetabase(t *testing.T) {
 var putPrm PutPrm

 for _, v := range mObjs {
- _, err := sh.Put(putPrm.WithObject(v.obj))
+ putPrm.WithObject(v.obj)
+
+ _, err := sh.Put(putPrm)
 require.NoError(t, err)
 }

- _, err = sh.Put(putPrm.WithObject(tombObj))
+ putPrm.WithObject(tombObj)
+
+ _, err = sh.Put(putPrm)
 require.NoError(t, err)

- _, err = sh.Inhume(new(InhumePrm).WithTarget(object.AddressOf(tombObj), tombMembers...))
+ var inhumePrm InhumePrm
+ inhumePrm.WithTarget(object.AddressOf(tombObj), tombMembers...)
+
+ _, err = sh.Inhume(inhumePrm)
 require.NoError(t, err)

 var headPrm HeadPrm

 checkObj := func(addr oid.Address, expObj *objectSDK.Object) {
- res, err := sh.Head(headPrm.WithAddress(addr))
+ headPrm.WithAddress(addr)
+
+ res, err := sh.Head(headPrm)

 if expObj == nil {
 require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
@@ -121,7 +130,9 @@ func TestRefillMetabase(t *testing.T) {

 checkTombMembers := func(exists bool) {
 for _, member := range tombMembers {
- _, err := sh.Head(headPrm.WithAddress(member))
+ headPrm.WithAddress(member)
+
+ _, err := sh.Head(headPrm)

 if exists {
 require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))

@@ -20,17 +20,15 @@ type DeleteRes struct{}
 // WithAddresses is a Delete option to set the addresses of the objects to delete.
 //
 // Option is required.
- func (p *DeletePrm) WithAddresses(addr ...oid.Address) *DeletePrm {
+ func (p *DeletePrm) WithAddresses(addr ...oid.Address) {
- if p != nil {
 p.addr = append(p.addr, addr...)
 }

- return p
 }

 // Delete removes data from the shard's writeCache, metaBase and
 // blobStor.
- func (s *Shard) Delete(prm *DeletePrm) (*DeleteRes, error) {
+ func (s *Shard) Delete(prm DeletePrm) (*DeleteRes, error) {
 if s.GetMode() != ModeReadWrite {
 return nil, ErrReadOnlyMode
 }

@@ -30,8 +30,8 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
 obj := generateObjectWithCID(t, cnr)
 addAttribute(obj, "foo", "bar")

- putPrm := new(shard.PutPrm)
- getPrm := new(shard.GetPrm)
+ var putPrm shard.PutPrm
+ var getPrm shard.GetPrm

 t.Run("big object", func(t *testing.T) {
 addPayload(obj, 1<<20)
@@ -39,7 +39,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
 putPrm.WithObject(obj)
 getPrm.WithAddress(object.AddressOf(obj))

- delPrm := new(shard.DeletePrm)
+ var delPrm shard.DeletePrm
 delPrm.WithAddresses(object.AddressOf(obj))

 _, err := sh.Put(putPrm)
@@ -62,7 +62,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
 putPrm.WithObject(obj)
 getPrm.WithAddress(object.AddressOf(obj))

- delPrm := new(shard.DeletePrm)
+ var delPrm shard.DeletePrm
 delPrm.WithAddresses(object.AddressOf(obj))

 _, err := sh.Put(putPrm)

@@ -20,23 +20,20 @@ type DumpPrm struct {
 }

 // WithPath is an Dump option to set the destination path.
- func (p *DumpPrm) WithPath(path string) *DumpPrm {
+ func (p *DumpPrm) WithPath(path string) {
 p.path = path
- return p
 }

 // WithStream is an Dump option to set the destination stream.
 // It takes priority over `path` option.
- func (p *DumpPrm) WithStream(r io.Writer) *DumpPrm {
+ func (p *DumpPrm) WithStream(r io.Writer) {
 p.stream = r
- return p
 }

 // WithIgnoreErrors is an Dump option to allow ignore all errors during iteration.
 // This includes invalid blobovniczas as well as corrupted objects.
- func (p *DumpPrm) WithIgnoreErrors(ignore bool) *DumpPrm {
+ func (p *DumpPrm) WithIgnoreErrors(ignore bool) {
 p.ignoreErrors = ignore
- return p
 }

 // DumpRes groups the result fields of Dump operation.
@@ -54,7 +51,7 @@ var ErrMustBeReadOnly = errors.New("shard must be in read-only mode")
 // Dump dumps all objects from the shard to a file or stream.
 //
 // Returns any error encountered.
- func (s *Shard) Dump(prm *DumpPrm) (*DumpRes, error) {
+ func (s *Shard) Dump(prm DumpPrm) (*DumpRes, error) {
 s.m.RLock()
 defer s.m.RUnlock()

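A usage sketch matching the new DumpPrm shape shown above: the caller declares the value, applies each setter separately, and passes the struct by value. The types and the dump function here are illustrative stand-ins, not the shard package itself.

package main

import (
	"fmt"
	"io"
	"os"
)

// dumpPrm mirrors the post-commit DumpPrm shape: value struct, mutating setters.
type dumpPrm struct {
	path         string
	stream       io.Writer
	ignoreErrors bool
}

func (p *dumpPrm) WithPath(path string)       { p.path = path }
func (p *dumpPrm) WithStream(w io.Writer)     { p.stream = w }
func (p *dumpPrm) WithIgnoreErrors(skip bool) { p.ignoreErrors = skip }

// dump stands in for Shard.Dump and only reports what it was asked to do.
func dump(prm dumpPrm) error {
	fmt.Printf("dump to %q (stream set: %v, ignore errors: %v)\n",
		prm.path, prm.stream != nil, prm.ignoreErrors)
	return nil
}

func main() {
	var prm dumpPrm
	prm.WithPath("/tmp/shard.dump")
	prm.WithIgnoreErrors(true)
	_ = dump(prm)

	// As the WithStream comment in the hunk states, a configured stream takes
	// priority over the path.
	var streamPrm dumpPrm
	streamPrm.WithStream(os.Stdout)
	_ = dump(streamPrm)
}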
@@ -54,7 +54,8 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
 defer releaseShard(sh, t)

 out := filepath.Join(t.TempDir(), "dump")
- prm := new(shard.DumpPrm).WithPath(out)
+ var prm shard.DumpPrm
+ prm.WithPath(out)

 t.Run("must be read-only", func(t *testing.T) {
 _, err := sh.Dump(prm)
@@ -63,7 +64,10 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {

 require.NoError(t, sh.SetMode(shard.ModeReadOnly))
 outEmpty := out + ".empty"
- res, err := sh.Dump(new(shard.DumpPrm).WithPath(outEmpty))
+ var dumpPrm shard.DumpPrm
+ dumpPrm.WithPath(outEmpty)
+
+ res, err := sh.Dump(dumpPrm)
 require.NoError(t, err)
 require.Equal(t, 0, res.Count())
 require.NoError(t, sh.SetMode(shard.ModeReadWrite))
@@ -90,7 +94,8 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
 obj := generateObjectWithPayload(cnr, data)
 objects[i] = obj

- prm := new(shard.PutPrm).WithObject(objects[i])
+ var prm shard.PutPrm
+ prm.WithObject(objects[i])
 _, err := sh.Put(prm)
 require.NoError(t, err)
 }
@@ -98,7 +103,10 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
 require.NoError(t, sh.SetMode(shard.ModeReadOnly))

 t.Run("invalid path", func(t *testing.T) {
- _, err := sh.Dump(new(shard.DumpPrm).WithPath("\x00"))
+ var dumpPrm shard.DumpPrm
+ dumpPrm.WithPath("\x00")
+
+ _, err := sh.Dump(dumpPrm)
 require.Error(t, err)
 })

@@ -111,13 +119,15 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
 defer releaseShard(sh, t)

 t.Run("empty dump", func(t *testing.T) {
- res, err := sh.Restore(new(shard.RestorePrm).WithPath(outEmpty))
+ var restorePrm shard.RestorePrm
+ restorePrm.WithPath(outEmpty)
+ res, err := sh.Restore(restorePrm)
 require.NoError(t, err)
 require.Equal(t, 0, res.Count())
 })

 t.Run("invalid path", func(t *testing.T) {
- _, err := sh.Restore(new(shard.RestorePrm))
+ _, err := sh.Restore(*new(shard.RestorePrm))
 require.ErrorIs(t, err, os.ErrNotExist)
 })

@@ -126,7 +136,10 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
 out := out + ".wrongmagic"
 require.NoError(t, os.WriteFile(out, []byte{0, 0, 0, 0}, os.ModePerm))

- _, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
+ var restorePrm shard.RestorePrm
+ restorePrm.WithPath(out)
+
+ _, err := sh.Restore(restorePrm)
 require.ErrorIs(t, err, shard.ErrInvalidMagic)
 })

@@ -138,7 +151,10 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
 fileData := append(fileData, 1)
 require.NoError(t, os.WriteFile(out, fileData, os.ModePerm))

- _, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
+ var restorePrm shard.RestorePrm
+ restorePrm.WithPath(out)
+
+ _, err := sh.Restore(restorePrm)
 require.ErrorIs(t, err, io.ErrUnexpectedEOF)
 })
 t.Run("incomplete object data", func(t *testing.T) {
@@ -146,7 +162,10 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
 fileData := append(fileData, 1, 0, 0, 0)
 require.NoError(t, os.WriteFile(out, fileData, os.ModePerm))

- _, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
+ var restorePrm shard.RestorePrm
+ restorePrm.WithPath(out)
+
+ _, err := sh.Restore(restorePrm)
 require.ErrorIs(t, err, io.EOF)
 })
 t.Run("invalid object", func(t *testing.T) {
@@ -154,14 +173,21 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
 fileData := append(fileData, 1, 0, 0, 0, 0xFF, 4, 0, 0, 0, 1, 2, 3, 4)
 require.NoError(t, os.WriteFile(out, fileData, os.ModePerm))

- _, err := sh.Restore(new(shard.RestorePrm).WithPath(out))
+ var restorePrm shard.RestorePrm
+ restorePrm.WithPath(out)
+
+ _, err := sh.Restore(restorePrm)
 require.Error(t, err)

 t.Run("skip errors", func(t *testing.T) {
 sh := newCustomShard(t, filepath.Join(t.TempDir(), "ignore"), false, nil, nil)
 defer releaseShard(sh, t)

- res, err := sh.Restore(new(shard.RestorePrm).WithPath(out).WithIgnoreErrors(true))
+ var restorePrm shard.RestorePrm
+ restorePrm.WithPath(out)
+ restorePrm.WithIgnoreErrors(true)
+
+ res, err := sh.Restore(restorePrm)
 require.NoError(t, err)
 require.Equal(t, objCount, res.Count())
 require.Equal(t, 2, res.FailCount())
@@ -169,7 +195,8 @@ func testDump(t *testing.T, objCount int, hasWriteCache bool) {
 })
 })

- prm := new(shard.RestorePrm).WithPath(out)
+ var prm shard.RestorePrm
+ prm.WithPath(out)
 t.Run("must allow write", func(t *testing.T) {
 require.NoError(t, sh.SetMode(shard.ModeReadOnly))

@@ -197,7 +224,8 @@ func TestStream(t *testing.T) {
 obj := generateObjectWithCID(t, cnr)
 objects[i] = obj

- prm := new(shard.PutPrm).WithObject(objects[i])
+ var prm shard.PutPrm
+ prm.WithObject(objects[i])
 _, err := sh1.Put(prm)
 require.NoError(t, err)
 }
@@ -208,14 +236,20 @@ func TestStream(t *testing.T) {
 finish := make(chan struct{})

 go func() {
- res, err := sh1.Dump(new(shard.DumpPrm).WithStream(w))
+ var dumpPrm shard.DumpPrm
+ dumpPrm.WithStream(w)
+
+ res, err := sh1.Dump(dumpPrm)
 require.NoError(t, err)
 require.Equal(t, objCount, res.Count())
 require.NoError(t, w.Close())
 close(finish)
 }()

- checkRestore(t, sh2, new(shard.RestorePrm).WithStream(r), objects)
+ var restorePrm shard.RestorePrm
+ restorePrm.WithStream(r)
+
+ checkRestore(t, sh2, restorePrm, objects)
 require.Eventually(t, func() bool {
 select {
 case <-finish:
@@ -226,13 +260,16 @@ func TestStream(t *testing.T) {
 }, time.Second, time.Millisecond)
 }

- func checkRestore(t *testing.T, sh *shard.Shard, prm *shard.RestorePrm, objects []*objectSDK.Object) {
+ func checkRestore(t *testing.T, sh *shard.Shard, prm shard.RestorePrm, objects []*objectSDK.Object) {
 res, err := sh.Restore(prm)
 require.NoError(t, err)
 require.Equal(t, len(objects), res.Count())

+ var getPrm shard.GetPrm
+
 for i := range objects {
- res, err := sh.Get(new(shard.GetPrm).WithAddress(object.AddressOf(objects[i])))
+ getPrm.WithAddress(object.AddressOf(objects[i]))
+ res, err := sh.Get(getPrm)
 require.NoError(t, err)
 require.Equal(t, objects[i], res.Object())
 }
@@ -273,7 +310,8 @@ func TestDumpIgnoreErrors(t *testing.T) {
 obj := generateObjectWithPayload(cidtest.ID(), make([]byte, size))
 objects[i] = obj

- prm := new(shard.PutPrm).WithObject(objects[i])
+ var prm shard.PutPrm
+ prm.WithObject(objects[i])
 _, err := sh.Put(prm)
 require.NoError(t, err)
 }
@@ -345,7 +383,10 @@ func TestDumpIgnoreErrors(t *testing.T) {
 }

 out := filepath.Join(t.TempDir(), "out.dump")
- res, err := sh.Dump(new(shard.DumpPrm).WithPath(out).WithIgnoreErrors(true))
+ var dumpPrm shard.DumpPrm
+ dumpPrm.WithPath(out)
+ dumpPrm.WithIgnoreErrors(true)
+ res, err := sh.Dump(dumpPrm)
 require.NoError(t, err)
 require.Equal(t, objCount, res.Count())
 }

@@ -37,7 +37,7 @@ func (p *ExistsRes) Exists() bool {
 // unambiguously determine the presence of an object.
 //
 // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed.
- func (s *Shard) Exists(prm *ExistsPrm) (*ExistsRes, error) {
+ func (s *Shard) Exists(prm ExistsPrm) (*ExistsRes, error) {
 exists, err := meta.Exists(s.metaBase, prm.addr)
 if err != nil {
 // If the shard is in degraded mode, try to consult blobstor directly.

@@ -209,10 +209,11 @@ func (s *Shard) removeGarbage() {
 return
 }

+ var deletePrm DeletePrm
+ deletePrm.WithAddresses(buf...)
+
 // delete accumulated objects
- _, err = s.Delete(new(DeletePrm).
- WithAddresses(buf...),
- )
+ _, err = s.Delete(deletePrm)
 if err != nil {
 s.log.Warn("could not delete the objects",
 zap.String("error", err.Error()),

@@ -34,19 +34,16 @@ type GetRes struct {
 // WithAddress is a Get option to set the address of the requested object.
 //
 // Option is required.
- func (p *GetPrm) WithAddress(addr oid.Address) *GetPrm {
+ func (p *GetPrm) WithAddress(addr oid.Address) {
- if p != nil {
 p.addr = addr
 }

- return p
 }

 // WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
 // without accessing metabase.
- func (p *GetPrm) WithIgnoreMeta(ignore bool) *GetPrm {
+ func (p *GetPrm) WithIgnoreMeta(ignore bool) {
 p.skipMeta = ignore
- return p
 }

 // Object returns the requested object.
@@ -66,7 +63,7 @@ func (r *GetRes) HasMeta() bool {
 //
 // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
 // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
- func (s *Shard) Get(prm *GetPrm) (*GetRes, error) {
+ func (s *Shard) Get(prm GetPrm) (*GetRes, error) {
 var big, small storFetcher

 big = func(stor *blobstor.BlobStor, _ *blobovnicza.ID) (*objectSDK.Object, error) {

@@ -28,8 +28,8 @@ func testShardGet(t *testing.T, hasWriteCache bool) {
 sh := newShard(t, hasWriteCache)
 defer releaseShard(sh, t)

- putPrm := new(shard.PutPrm)
- getPrm := new(shard.GetPrm)
+ var putPrm shard.PutPrm
+ var getPrm shard.GetPrm

 t.Run("small object", func(t *testing.T) {
 obj := generateObject(t)
@@ -111,7 +111,7 @@ func testShardGet(t *testing.T, hasWriteCache bool) {
 })
 }

- func testGet(t *testing.T, sh *shard.Shard, getPrm *shard.GetPrm, hasWriteCache bool) (*shard.GetRes, error) {
+ func testGet(t *testing.T, sh *shard.Shard, getPrm shard.GetPrm, hasWriteCache bool) (*shard.GetRes, error) {
 res, err := sh.Get(getPrm)
 if hasWriteCache {
 require.Eventually(t, func() bool {

@@ -23,23 +23,19 @@ type HeadRes struct {
 // WithAddress is a Head option to set the address of the requested object.
 //
 // Option is required.
- func (p *HeadPrm) WithAddress(addr oid.Address) *HeadPrm {
+ func (p *HeadPrm) WithAddress(addr oid.Address) {
- if p != nil {
 p.addr = addr
 }

- return p
 }

 // WithRaw is a Head option to set raw flag value. If flag is unset, then Head
 // returns header of virtual object, otherwise it returns SplitInfo of virtual
 // object.
- func (p *HeadPrm) WithRaw(raw bool) *HeadPrm {
+ func (p *HeadPrm) WithRaw(raw bool) {
- if p != nil {
 p.raw = raw
 }

- return p
 }

 // Object returns the requested object header.
@@ -53,7 +49,7 @@ func (r *HeadRes) Object() *objectSDK.Object {
 //
 // Returns an error of type apistatus.ObjectNotFound if object is missing in Shard.
 // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
- func (s *Shard) Head(prm *HeadPrm) (*HeadRes, error) {
+ func (s *Shard) Head(prm HeadPrm) (*HeadRes, error) {
 // object can be saved in write-cache (if enabled) or in metabase

 if s.hasWriteCache() {

@@ -26,8 +26,8 @@ func testShardHead(t *testing.T, hasWriteCache bool) {
 sh := newShard(t, hasWriteCache)
 defer releaseShard(sh, t)

- putPrm := new(shard.PutPrm)
- headPrm := new(shard.HeadPrm)
+ var putPrm shard.PutPrm
+ var headPrm shard.HeadPrm

 t.Run("regular object", func(t *testing.T) {
 obj := generateObject(t)
@@ -80,7 +80,7 @@ func testShardHead(t *testing.T, hasWriteCache bool) {
 })
 }

- func testHead(t *testing.T, sh *shard.Shard, headPrm *shard.HeadPrm, hasWriteCache bool) (*shard.HeadRes, error) {
+ func testHead(t *testing.T, sh *shard.Shard, headPrm shard.HeadPrm, hasWriteCache bool) (*shard.HeadRes, error) {
 res, err := sh.Head(headPrm)
 if hasWriteCache {
 require.Eventually(t, func() bool {

@@ -22,25 +22,21 @@ type InhumeRes struct{}
 //
 // tombstone should not be nil, addr should not be empty.
 // Should not be called along with MarkAsGarbage.
- func (p *InhumePrm) WithTarget(tombstone oid.Address, addrs ...oid.Address) *InhumePrm {
+ func (p *InhumePrm) WithTarget(tombstone oid.Address, addrs ...oid.Address) {
- if p != nil {
 p.target = addrs
 p.tombstone = &tombstone
 }

- return p
 }

 // MarkAsGarbage marks object to be physically removed from shard.
 //
 // Should not be called along with WithTarget.
- func (p *InhumePrm) MarkAsGarbage(addr ...oid.Address) *InhumePrm {
+ func (p *InhumePrm) MarkAsGarbage(addr ...oid.Address) {
- if p != nil {
 p.target = addr
 p.tombstone = nil
 }

- return p
 }

 // Inhume calls metabase. Inhume method to mark object as removed. It won't be
@@ -50,7 +46,7 @@ func (p *InhumePrm) MarkAsGarbage(addr ...oid.Address) {
 // if at least one object is locked.
 //
 // Returns ErrReadOnlyMode error if shard is in "read-only" mode.
- func (s *Shard) Inhume(prm *InhumePrm) (*InhumeRes, error) {
+ func (s *Shard) Inhume(prm InhumePrm) (*InhumeRes, error) {
 if s.GetMode() != ModeReadWrite {
 return nil, ErrReadOnlyMode
 }
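WithTarget and MarkAsGarbage above remain mutually exclusive after the change; both now mutate the same value, and whichever runs last determines whether a tombstone is recorded. A hedged sketch with stand-in types (strings in place of oid.Address):

package main

import "fmt"

// inhumePrm mirrors the new InhumePrm shape; the field and method names follow
// the hunk above, but the types are simplified stand-ins.
type inhumePrm struct {
	target    []string
	tombstone *string
}

// WithTarget records a tombstone for the listed addresses.
func (p *inhumePrm) WithTarget(tombstone string, addrs ...string) {
	p.target = addrs
	p.tombstone = &tombstone
}

// MarkAsGarbage clears any tombstone and marks the addresses for removal.
func (p *inhumePrm) MarkAsGarbage(addrs ...string) {
	p.target = addrs
	p.tombstone = nil
}

func inhume(prm inhumePrm) {
	if prm.tombstone != nil {
		fmt.Println("inhume with tombstone", *prm.tombstone, prm.target)
		return
	}
	fmt.Println("mark as garbage", prm.target)
}

func main() {
	var prm inhumePrm
	prm.WithTarget("tomb-addr", "obj-1", "obj-2")
	inhume(prm)

	prm.MarkAsGarbage("obj-3") // reuses the same value, dropping the tombstone
	inhume(prm)
}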
@@ -31,13 +31,13 @@ func testShardInhume(t *testing.T, hasWriteCache bool) {

 ts := generateObjectWithCID(t, cnr)

- putPrm := new(shard.PutPrm)
+ var putPrm shard.PutPrm
 putPrm.WithObject(obj)

- inhPrm := new(shard.InhumePrm)
+ var inhPrm shard.InhumePrm
 inhPrm.WithTarget(object.AddressOf(ts), object.AddressOf(obj))

- getPrm := new(shard.GetPrm)
+ var getPrm shard.GetPrm
 getPrm.WithAddress(object.AddressOf(obj))

 _, err := sh.Put(putPrm)

@@ -41,17 +41,15 @@ type ListWithCursorRes struct {
 }

 // WithCount sets maximum amount of addresses that ListWithCursor should return.
- func (p *ListWithCursorPrm) WithCount(count uint32) *ListWithCursorPrm {
+ func (p *ListWithCursorPrm) WithCount(count uint32) {
 p.count = count
- return p
 }

 // WithCursor sets cursor for ListWithCursor operation. For initial request,
 // ignore this param or use nil value. For consecutive requests, use value
 // from ListWithCursorRes.
- func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) *ListWithCursorPrm {
+ func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) {
 p.cursor = cursor
- return p
 }

 // AddressList returns addresses selected by ListWithCursor operation.
@@ -91,7 +89,7 @@ func (s *Shard) List() (*SelectRes, error) {
 return res, nil
 }

- func (s *Shard) ListContainers(_ *ListContainersPrm) (*ListContainersRes, error) {
+ func (s *Shard) ListContainers(_ ListContainersPrm) (*ListContainersRes, error) {
 containers, err := s.metaBase.Containers()
 if err != nil {
 return nil, fmt.Errorf("could not get list of containers: %w", err)
@@ -103,7 +101,7 @@ func (s *Shard) ListContainers(_ *ListContainersPrm) (*ListContainersRes, error)
 }

 func ListContainers(s *Shard) ([]cid.ID, error) {
- res, err := s.ListContainers(&ListContainersPrm{})
+ res, err := s.ListContainers(ListContainersPrm{})
 if err != nil {
 return nil, err
 }
@@ -117,7 +115,7 @@ func ListContainers(s *Shard) ([]cid.ID, error) {
 //
 // Returns ErrEndOfListing if there are no more objects to return or count
 // parameter set to zero.
- func (s *Shard) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, error) {
+ func (s *Shard) ListWithCursor(prm ListWithCursorPrm) (*ListWithCursorRes, error) {
 var metaPrm meta.ListPrm
 metaPrm.WithCount(prm.count)
 metaPrm.WithCursor(prm.cursor)
@@ -140,7 +138,9 @@ func (s *Shard) ListWithCursor(prm *ListWithCursorPrm) (*ListWithCursorRes, erro
 // Returns ErrEndOfListing if there are no more objects to return or count
 // parameter set to zero.
 func ListWithCursor(s *Shard, count uint32, cursor *Cursor) ([]oid.Address, *Cursor, error) {
- prm := new(ListWithCursorPrm).WithCount(count).WithCursor(cursor)
+ var prm ListWithCursorPrm
+ prm.WithCount(count)
+ prm.WithCursor(cursor)
 res, err := s.ListWithCursor(prm)
 if err != nil {
 return nil, nil, err
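The ListWithCursor helper above now builds its parameter value with two separate setter calls and feeds the returned cursor back in on the next request. A self-contained paging sketch in that spirit, using illustrative types rather than the real Cursor and ListWithCursorPrm:

package main

import "fmt"

// cursor and listPrm are stand-ins; only the value-plus-mutating-setters
// convention mirrors the shard package.
type cursor struct{ next int }

type listPrm struct {
	count  uint32
	cursor *cursor
}

func (p *listPrm) WithCount(c uint32)   { p.count = c }
func (p *listPrm) WithCursor(c *cursor) { p.cursor = c }

// listWithCursor returns up to prm.count items starting at the cursor, plus a
// cursor for the next call, or nil when listing is exhausted.
func listWithCursor(prm listPrm, items []string) ([]string, *cursor) {
	start := 0
	if prm.cursor != nil {
		start = prm.cursor.next
	}
	end := start + int(prm.count)
	if end > len(items) {
		end = len(items)
	}
	if start >= end {
		return nil, nil // nothing left to list
	}
	return items[start:end], &cursor{next: end}
}

func main() {
	items := []string{"a", "b", "c", "d", "e"}

	var prm listPrm
	prm.WithCount(2)

	var cur *cursor
	for {
		prm.WithCursor(cur) // nil on the first request, then the previous cursor
		page, next := listWithCursor(prm, items)
		if page == nil {
			break
		}
		fmt.Println(page)
		cur = next
	}
}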
@@ -32,7 +32,7 @@ func testShardList(t *testing.T, sh *shard.Shard) {
 const N = 5

 objs := make(map[string]int)
- putPrm := new(shard.PutPrm)
+ var putPrm shard.PutPrm

 for i := 0; i < C; i++ {
 cnr := cidtest.ID()

@@ -16,17 +16,15 @@ type ToMoveItRes struct{}

 // WithAddress sets object address that should be marked to move into another
 // shard.
- func (p *ToMoveItPrm) WithAddress(addr oid.Address) *ToMoveItPrm {
+ func (p *ToMoveItPrm) WithAddress(addr oid.Address) {
- if p != nil {
 p.addr = addr
 }

- return p
 }

 // ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
 // another shard.
- func (s *Shard) ToMoveIt(prm *ToMoveItPrm) (*ToMoveItRes, error) {
+ func (s *Shard) ToMoveIt(prm ToMoveItPrm) (*ToMoveItRes, error) {
 if s.GetMode() != ModeReadWrite {
 return nil, ErrReadOnlyMode
 }

@@ -18,12 +18,10 @@ type PutPrm struct {
 type PutRes struct{}

 // WithObject is a Put option to set object to save.
- func (p *PutPrm) WithObject(obj *object.Object) *PutPrm {
+ func (p *PutPrm) WithObject(obj *object.Object) {
- if p != nil {
 p.obj = obj
 }

- return p
 }

 // Put saves the object in shard.
@@ -32,7 +30,7 @@ func (p *PutPrm) WithObject(obj *object.Object) *PutPrm {
 // did not allow to completely save the object.
 //
 // Returns ErrReadOnlyMode error if shard is in "read-only" mode.
- func (s *Shard) Put(prm *PutPrm) (*PutRes, error) {
+ func (s *Shard) Put(prm PutPrm) (*PutRes, error) {
 if s.GetMode() != ModeReadWrite {
 return nil, ErrReadOnlyMode
 }

@@ -27,28 +27,23 @@ type RngRes struct {
 // WithAddress is a Rng option to set the address of the requested object.
 //
 // Option is required.
- func (p *RngPrm) WithAddress(addr oid.Address) *RngPrm {
+ func (p *RngPrm) WithAddress(addr oid.Address) {
- if p != nil {
 p.addr = addr
 }

- return p
 }

 // WithRange is a GetRange option to set range of requested payload data.
- func (p *RngPrm) WithRange(off uint64, ln uint64) *RngPrm {
+ func (p *RngPrm) WithRange(off uint64, ln uint64) {
- if p != nil {
 p.off, p.ln = off, ln
 }

- return p
 }

 // WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
 // without accessing metabase.
- func (p *RngPrm) WithIgnoreMeta(ignore bool) *RngPrm {
+ func (p *RngPrm) WithIgnoreMeta(ignore bool) {
 p.skipMeta = ignore
- return p
 }

 // Object returns the requested object part.
@@ -71,7 +66,7 @@ func (r *RngRes) HasMeta() bool {
 // Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
 // Returns an error of type apistatus.ObjectNotFound if the requested object is missing.
 // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
- func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) {
+ func (s *Shard) GetRange(prm RngPrm) (*RngRes, error) {
 var big, small storFetcher

 rng := object.NewRange()

@@ -21,23 +21,20 @@ type RestorePrm struct {
 }

 // WithPath is a Restore option to set the destination path.
- func (p *RestorePrm) WithPath(path string) *RestorePrm {
+ func (p *RestorePrm) WithPath(path string) {
 p.path = path
- return p
 }

 // WithStream is a Restore option to set the stream to read objects from.
 // It takes priority over `WithPath` option.
- func (p *RestorePrm) WithStream(r io.Reader) *RestorePrm {
+ func (p *RestorePrm) WithStream(r io.Reader) {
 p.stream = r
- return p
 }

 // WithIgnoreErrors is a Restore option which allows to ignore errors encountered during restore.
 // Corrupted objects will not be processed.
- func (p *RestorePrm) WithIgnoreErrors(ignore bool) *RestorePrm {
+ func (p *RestorePrm) WithIgnoreErrors(ignore bool) {
 p.ignoreErrors = ignore
- return p
 }

 // RestoreRes groups the result fields of Restore operation.
@@ -59,7 +56,7 @@ func (r *RestoreRes) FailCount() int {
 // Restore restores objects from the dump prepared by Dump.
 //
 // Returns any error encountered.
- func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
+ func (s *Shard) Restore(prm RestorePrm) (*RestoreRes, error) {
 // Disallow changing mode during restore.
 s.m.RLock()
 defer s.m.RUnlock()
@@ -85,6 +82,8 @@ func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
 return nil, ErrInvalidMagic
 }

+ var putPrm PutPrm
+
 var count, failCount int
 var data []byte
 var size [4]byte
@@ -121,7 +120,8 @@ func (s *Shard) Restore(prm *RestorePrm) (*RestoreRes, error) {
 return nil, err
 }

- _, err = s.Put(new(PutPrm).WithObject(obj))
+ putPrm.WithObject(obj)
+ _, err = s.Put(putPrm)
 if err != nil {
 return nil, err
 }

@@ -21,21 +21,17 @@ type SelectRes struct {
 }

 // WithContainerID is a Select option to set the container id to search in.
- func (p *SelectPrm) WithContainerID(cnr cid.ID) *SelectPrm {
+ func (p *SelectPrm) WithContainerID(cnr cid.ID) {
- if p != nil {
 p.cnr = cnr
 }

- return p
 }

 // WithFilters is a Select option to set the object filters.
- func (p *SelectPrm) WithFilters(fs object.SearchFilters) *SelectPrm {
+ func (p *SelectPrm) WithFilters(fs object.SearchFilters) {
- if p != nil {
 p.filters = fs
 }

- return p
 }

 // AddressList returns list of addresses of the selected objects.
@@ -47,7 +43,7 @@ func (r *SelectRes) AddressList() []oid.Address {
 //
 // Returns any error encountered that
 // did not allow to completely select the objects.
- func (s *Shard) Select(prm *SelectPrm) (*SelectRes, error) {
+ func (s *Shard) Select(prm SelectPrm) (*SelectRes, error) {
 addrList, err := meta.Select(s.metaBase, prm.cnr, prm.filters)
 if err != nil {
 return nil, fmt.Errorf("could not select objects from metabase: %w", err)

@@ -37,8 +37,11 @@ func TestWriteCacheObjectLoss(t *testing.T) {

 sh := newCustomShard(t, dir, true, wcOpts, nil)

+ var putPrm shard.PutPrm
+
 for i := range objects {
- _, err := sh.Put(new(shard.PutPrm).WithObject(objects[i]))
+ putPrm.WithObject(objects[i])
+ _, err := sh.Put(putPrm)
 require.NoError(t, err)
 }
 require.NoError(t, sh.Close())
@@ -46,8 +49,12 @@ func TestWriteCacheObjectLoss(t *testing.T) {
 sh = newCustomShard(t, dir, true, wcOpts, nil)
 defer releaseShard(sh, t)

+ var getPrm shard.GetPrm
+
 for i := range objects {
- _, err := sh.Get(new(shard.GetPrm).WithAddress(object.AddressOf(objects[i])))
+ getPrm.WithAddress(object.AddressOf(objects[i]))
+
+ _, err := sh.Get(getPrm)
 require.NoError(t, err, i)
 }
 }

@@ -17,7 +17,7 @@ func (s *Server) DumpShard(_ context.Context, req *control.DumpShardRequest) (*c

 shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID())

- prm := new(shard.DumpPrm)
+ var prm shard.DumpPrm
 prm.WithPath(req.GetBody().GetFilepath())
 prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())

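The control-server handler above follows the same recipe: declare the value, fill it from the request getters, and pass it by value to the shard layer. A stand-alone sketch with hypothetical request and prm types (the real control API types are not reproduced here):

package main

import "fmt"

// request imitates a control request with getter methods, as used above.
type request struct {
	filepath     string
	ignoreErrors bool
}

func (r request) GetFilepath() string   { return r.filepath }
func (r request) GetIgnoreErrors() bool { return r.ignoreErrors }

// dumpPrm and dumpShard are illustrative stand-ins for shard.DumpPrm and the
// engine call made by the handler.
type dumpPrm struct {
	path         string
	ignoreErrors bool
}

func (p *dumpPrm) WithPath(path string)      { p.path = path }
func (p *dumpPrm) WithIgnoreErrors(ign bool) { p.ignoreErrors = ign }

func dumpShard(prm dumpPrm) error {
	fmt.Printf("dumping shard to %q (ignore errors: %v)\n", prm.path, prm.ignoreErrors)
	return nil
}

func main() {
	req := request{filepath: "/tmp/shard.dump", ignoreErrors: true}

	var prm dumpPrm
	prm.WithPath(req.GetFilepath())
	prm.WithIgnoreErrors(req.GetIgnoreErrors())

	if err := dumpShard(prm); err != nil {
		fmt.Println("dump failed:", err)
	}
}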
@@ -17,7 +17,7 @@ func (s *Server) RestoreShard(_ context.Context, req *control.RestoreShardReques

 shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID())

- prm := new(shard.RestorePrm)
+ var prm shard.RestorePrm
 prm.WithPath(req.GetBody().GetFilepath())
 prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
