forked from TrueCloudLab/frostfs-node
[#1460] shard: Do not use pointers as the results
Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
9b2932609b
commit
7b6363f4c6
15 changed files with 64 additions and 65 deletions
|
@ -24,13 +24,13 @@ func (r ContainerSizeRes) Size() uint64 {
|
|||
return r.size
|
||||
}
|
||||
|
||||
func (s *Shard) ContainerSize(prm ContainerSizePrm) (*ContainerSizeRes, error) {
|
||||
func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
|
||||
size, err := s.metaBase.ContainerSize(prm.cnr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get container size: %w", err)
|
||||
return ContainerSizeRes{}, fmt.Errorf("could not get container size: %w", err)
|
||||
}
|
||||
|
||||
return &ContainerSizeRes{
|
||||
return ContainerSizeRes{
|
||||
size: size,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -28,9 +28,9 @@ func (p *DeletePrm) WithAddresses(addr ...oid.Address) {
|
|||
|
||||
// Delete removes data from the shard's writeCache, metaBase and
|
||||
// blobStor.
|
||||
func (s *Shard) Delete(prm DeletePrm) (*DeleteRes, error) {
|
||||
func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
return nil, ErrReadOnlyMode
|
||||
return DeleteRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
ln := len(prm.addr)
|
||||
|
@ -63,7 +63,7 @@ func (s *Shard) Delete(prm DeletePrm) (*DeleteRes, error) {
|
|||
|
||||
err := meta.Delete(s.metaBase, prm.addr...)
|
||||
if err != nil {
|
||||
return nil, err // stop on metabase error ?
|
||||
return DeleteRes{}, err // stop on metabase error ?
|
||||
}
|
||||
|
||||
for i := range prm.addr { // delete small object
|
||||
|
@ -93,5 +93,5 @@ func (s *Shard) Delete(prm DeletePrm) (*DeleteRes, error) {
|
|||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return DeleteRes{}, nil
|
||||
}
|
||||
|
|
|
@ -51,19 +51,19 @@ var ErrMustBeReadOnly = errors.New("shard must be in read-only mode")
|
|||
// Dump dumps all objects from the shard to a file or stream.
|
||||
//
|
||||
// Returns any error encountered.
|
||||
func (s *Shard) Dump(prm DumpPrm) (*DumpRes, error) {
|
||||
func (s *Shard) Dump(prm DumpPrm) (DumpRes, error) {
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode != ModeReadOnly {
|
||||
return nil, ErrMustBeReadOnly
|
||||
return DumpRes{}, ErrMustBeReadOnly
|
||||
}
|
||||
|
||||
w := prm.stream
|
||||
if w == nil {
|
||||
f, err := os.OpenFile(prm.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return DumpRes{}, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
|
@ -72,7 +72,7 @@ func (s *Shard) Dump(prm DumpPrm) (*DumpRes, error) {
|
|||
|
||||
_, err := w.Write(dumpMagic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return DumpRes{}, err
|
||||
}
|
||||
|
||||
var count int
|
||||
|
@ -98,7 +98,7 @@ func (s *Shard) Dump(prm DumpPrm) (*DumpRes, error) {
|
|||
|
||||
err := s.writeCache.Iterate(iterPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return DumpRes{}, err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -125,8 +125,8 @@ func (s *Shard) Dump(prm DumpPrm) (*DumpRes, error) {
|
|||
})
|
||||
|
||||
if _, err := s.blobStor.Iterate(pi); err != nil {
|
||||
return nil, err
|
||||
return DumpRes{}, err
|
||||
}
|
||||
|
||||
return &DumpRes{count: count}, nil
|
||||
return DumpRes{count: count}, nil
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ func (p ExistsRes) Exists() bool {
|
|||
// unambiguously determine the presence of an object.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed.
|
||||
func (s *Shard) Exists(prm ExistsPrm) (*ExistsRes, error) {
|
||||
func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
||||
exists, err := meta.Exists(s.metaBase, prm.addr)
|
||||
if err != nil {
|
||||
// If the shard is in degraded mode, try to consult blobstor directly.
|
||||
|
@ -57,7 +57,7 @@ func (s *Shard) Exists(prm ExistsPrm) (*ExistsRes, error) {
|
|||
}
|
||||
}
|
||||
|
||||
return &ExistsRes{
|
||||
return ExistsRes{
|
||||
ex: exists,
|
||||
}, err
|
||||
}
|
||||
|
|
|
@ -47,12 +47,12 @@ func (p *GetPrm) WithIgnoreMeta(ignore bool) {
|
|||
}
|
||||
|
||||
// Object returns the requested object.
|
||||
func (r *GetRes) Object() *objectSDK.Object {
|
||||
func (r GetRes) Object() *objectSDK.Object {
|
||||
return r.obj
|
||||
}
|
||||
|
||||
// HasMeta returns true if info about the object was found in the metabase.
|
||||
func (r *GetRes) HasMeta() bool {
|
||||
func (r GetRes) HasMeta() bool {
|
||||
return r.hasMeta
|
||||
}
|
||||
|
||||
|
@ -63,7 +63,7 @@ func (r *GetRes) HasMeta() bool {
|
|||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
||||
func (s *Shard) Get(prm GetPrm) (*GetRes, error) {
|
||||
func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
||||
var big, small storFetcher
|
||||
|
||||
big = func(stor *blobstor.BlobStor, _ *blobovnicza.ID) (*objectSDK.Object, error) {
|
||||
|
@ -93,7 +93,7 @@ func (s *Shard) Get(prm GetPrm) (*GetRes, error) {
|
|||
|
||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
|
||||
|
||||
return &GetRes{
|
||||
return GetRes{
|
||||
obj: obj,
|
||||
hasMeta: hasMeta,
|
||||
}, err
|
||||
|
|
|
@ -111,7 +111,7 @@ func testShardGet(t *testing.T, hasWriteCache bool) {
|
|||
})
|
||||
}
|
||||
|
||||
func testGet(t *testing.T, sh *shard.Shard, getPrm shard.GetPrm, hasWriteCache bool) (*shard.GetRes, error) {
|
||||
func testGet(t *testing.T, sh *shard.Shard, getPrm shard.GetPrm, hasWriteCache bool) (shard.GetRes, error) {
|
||||
res, err := sh.Get(getPrm)
|
||||
if hasWriteCache {
|
||||
require.Eventually(t, func() bool {
|
||||
|
|
|
@ -39,7 +39,7 @@ func (p *HeadPrm) WithRaw(raw bool) {
|
|||
}
|
||||
|
||||
// Object returns the requested object header.
|
||||
func (r *HeadRes) Object() *objectSDK.Object {
|
||||
func (r HeadRes) Object() *objectSDK.Object {
|
||||
return r.obj
|
||||
}
|
||||
|
||||
|
@ -49,19 +49,19 @@ func (r *HeadRes) Object() *objectSDK.Object {
|
|||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in Shard.
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
||||
func (s *Shard) Head(prm HeadPrm) (*HeadRes, error) {
|
||||
func (s *Shard) Head(prm HeadPrm) (HeadRes, error) {
|
||||
// object can be saved in write-cache (if enabled) or in metabase
|
||||
|
||||
if s.hasWriteCache() {
|
||||
// try to read header from write-cache
|
||||
header, err := s.writeCache.Head(prm.addr)
|
||||
if err == nil {
|
||||
return &HeadRes{
|
||||
return HeadRes{
|
||||
obj: header,
|
||||
}, nil
|
||||
} else if !writecache.IsErrNotFound(err) {
|
||||
// in this case we think that object is presented in write-cache, but corrupted
|
||||
return nil, fmt.Errorf("could not read header from write-cache: %w", err)
|
||||
return HeadRes{}, fmt.Errorf("could not read header from write-cache: %w", err)
|
||||
}
|
||||
|
||||
// otherwise object seems to be flushed to metabase
|
||||
|
@ -73,10 +73,10 @@ func (s *Shard) Head(prm HeadPrm) (*HeadRes, error) {
|
|||
|
||||
res, err := s.metaBase.Get(headParams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return HeadRes{}, err
|
||||
}
|
||||
|
||||
return &HeadRes{
|
||||
return HeadRes{
|
||||
obj: res.Header(),
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ func testShardHead(t *testing.T, hasWriteCache bool) {
|
|||
})
|
||||
}
|
||||
|
||||
func testHead(t *testing.T, sh *shard.Shard, headPrm shard.HeadPrm, hasWriteCache bool) (*shard.HeadRes, error) {
|
||||
func testHead(t *testing.T, sh *shard.Shard, headPrm shard.HeadPrm, hasWriteCache bool) (shard.HeadRes, error) {
|
||||
res, err := sh.Head(headPrm)
|
||||
if hasWriteCache {
|
||||
require.Eventually(t, func() bool {
|
||||
|
|
|
@ -46,9 +46,9 @@ func (p *InhumePrm) MarkAsGarbage(addr ...oid.Address) {
|
|||
// if at least one object is locked.
|
||||
//
|
||||
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
||||
func (s *Shard) Inhume(prm InhumePrm) (*InhumeRes, error) {
|
||||
func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
return nil, ErrReadOnlyMode
|
||||
return InhumeRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
if s.hasWriteCache() {
|
||||
|
@ -72,8 +72,8 @@ func (s *Shard) Inhume(prm InhumePrm) (*InhumeRes, error) {
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return nil, fmt.Errorf("metabase inhume: %w", err)
|
||||
return InhumeRes{}, fmt.Errorf("metabase inhume: %w", err)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return InhumeRes{}, nil
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ type ListContainersRes struct {
|
|||
containers []cid.ID
|
||||
}
|
||||
|
||||
func (r *ListContainersRes) Containers() []cid.ID {
|
||||
func (r ListContainersRes) Containers() []cid.ID {
|
||||
return r.containers
|
||||
}
|
||||
|
||||
|
@ -63,13 +63,12 @@ func (r ListWithCursorRes) Cursor() *Cursor {
|
|||
}
|
||||
|
||||
// List returns all objects physically stored in the Shard.
|
||||
func (s *Shard) List() (*SelectRes, error) {
|
||||
func (s *Shard) List() (res SelectRes, err error) {
|
||||
lst, err := s.metaBase.Containers()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't list stored containers: %w", err)
|
||||
return res, fmt.Errorf("can't list stored containers: %w", err)
|
||||
}
|
||||
|
||||
res := new(SelectRes)
|
||||
filters := object.NewSearchFilters()
|
||||
filters.AddPhyFilter()
|
||||
|
||||
|
@ -89,13 +88,13 @@ func (s *Shard) List() (*SelectRes, error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (s *Shard) ListContainers(_ ListContainersPrm) (*ListContainersRes, error) {
|
||||
func (s *Shard) ListContainers(_ ListContainersPrm) (ListContainersRes, error) {
|
||||
containers, err := s.metaBase.Containers()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get list of containers: %w", err)
|
||||
return ListContainersRes{}, fmt.Errorf("could not get list of containers: %w", err)
|
||||
}
|
||||
|
||||
return &ListContainersRes{
|
||||
return ListContainersRes{
|
||||
containers: containers,
|
||||
}, nil
|
||||
}
|
||||
|
@ -115,16 +114,16 @@ func ListContainers(s *Shard) ([]cid.ID, error) {
|
|||
//
|
||||
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||
// parameter set to zero.
|
||||
func (s *Shard) ListWithCursor(prm ListWithCursorPrm) (*ListWithCursorRes, error) {
|
||||
func (s *Shard) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) {
|
||||
var metaPrm meta.ListPrm
|
||||
metaPrm.WithCount(prm.count)
|
||||
metaPrm.WithCursor(prm.cursor)
|
||||
res, err := s.metaBase.ListWithCursor(metaPrm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get list of objects: %w", err)
|
||||
return ListWithCursorRes{}, fmt.Errorf("could not get list of objects: %w", err)
|
||||
}
|
||||
|
||||
return &ListWithCursorRes{
|
||||
return ListWithCursorRes{
|
||||
addrList: res.AddressList(),
|
||||
cursor: res.Cursor(),
|
||||
}, nil
|
||||
|
|
|
@ -24,9 +24,9 @@ func (p *ToMoveItPrm) WithAddress(addr oid.Address) {
|
|||
|
||||
// ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
|
||||
// another shard.
|
||||
func (s *Shard) ToMoveIt(prm ToMoveItPrm) (*ToMoveItRes, error) {
|
||||
func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
return nil, ErrReadOnlyMode
|
||||
return ToMoveItRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
err := meta.ToMoveIt(s.metaBase, prm.addr)
|
||||
|
@ -36,5 +36,5 @@ func (s *Shard) ToMoveIt(prm ToMoveItPrm) (*ToMoveItRes, error) {
|
|||
)
|
||||
}
|
||||
|
||||
return new(ToMoveItRes), nil
|
||||
return ToMoveItRes{}, nil
|
||||
}
|
||||
|
|
|
@ -30,9 +30,9 @@ func (p *PutPrm) WithObject(obj *object.Object) {
|
|||
// did not allow to completely save the object.
|
||||
//
|
||||
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
||||
func (s *Shard) Put(prm PutPrm) (*PutRes, error) {
|
||||
func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
||||
if s.GetMode() != ModeReadWrite {
|
||||
return nil, ErrReadOnlyMode
|
||||
return PutRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
var putPrm blobstor.PutPrm // form Put parameters
|
||||
|
@ -43,7 +43,7 @@ func (s *Shard) Put(prm PutPrm) (*PutRes, error) {
|
|||
if s.hasWriteCache() {
|
||||
err := s.writeCache.Put(prm.obj)
|
||||
if err == nil {
|
||||
return nil, nil
|
||||
return PutRes{}, nil
|
||||
}
|
||||
|
||||
s.log.Debug("can't put message to writeCache, trying to blobStor",
|
||||
|
@ -56,15 +56,15 @@ func (s *Shard) Put(prm PutPrm) (*PutRes, error) {
|
|||
)
|
||||
|
||||
if res, err = s.blobStor.Put(putPrm); err != nil {
|
||||
return nil, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
||||
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
||||
}
|
||||
|
||||
// put to metabase
|
||||
if err := meta.Put(s.metaBase, prm.obj, res.BlobovniczaID()); err != nil {
|
||||
// may we need to handle this case in a special way
|
||||
// since the object has been successfully written to BlobStor
|
||||
return nil, fmt.Errorf("could not put object to metabase: %w", err)
|
||||
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return PutRes{}, nil
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ func (r RngRes) HasMeta() bool {
|
|||
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing.
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
||||
func (s *Shard) GetRange(prm RngPrm) (*RngRes, error) {
|
||||
func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
|
||||
var big, small storFetcher
|
||||
|
||||
rng := object.NewRange()
|
||||
|
@ -108,7 +108,7 @@ func (s *Shard) GetRange(prm RngPrm) (*RngRes, error) {
|
|||
|
||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
|
||||
|
||||
return &RngRes{
|
||||
return RngRes{
|
||||
obj: obj,
|
||||
hasMeta: hasMeta,
|
||||
}, err
|
||||
|
|
|
@ -56,20 +56,20 @@ func (r RestoreRes) FailCount() int {
|
|||
// Restore restores objects from the dump prepared by Dump.
|
||||
//
|
||||
// Returns any error encountered.
|
||||
func (s *Shard) Restore(prm RestorePrm) (*RestoreRes, error) {
|
||||
func (s *Shard) Restore(prm RestorePrm) (RestoreRes, error) {
|
||||
// Disallow changing mode during restore.
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode != ModeReadWrite {
|
||||
return nil, ErrReadOnlyMode
|
||||
return RestoreRes{}, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
r := prm.stream
|
||||
if r == nil {
|
||||
f, err := os.OpenFile(prm.path, os.O_RDONLY, os.ModeExclusive)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return RestoreRes{}, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
|
@ -79,7 +79,7 @@ func (s *Shard) Restore(prm RestorePrm) (*RestoreRes, error) {
|
|||
var m [4]byte
|
||||
_, _ = io.ReadFull(r, m[:])
|
||||
if !bytes.Equal(m[:], dumpMagic) {
|
||||
return nil, ErrInvalidMagic
|
||||
return RestoreRes{}, ErrInvalidMagic
|
||||
}
|
||||
|
||||
var putPrm PutPrm
|
||||
|
@ -95,7 +95,7 @@ func (s *Shard) Restore(prm RestorePrm) (*RestoreRes, error) {
|
|||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
return nil, err
|
||||
return RestoreRes{}, err
|
||||
}
|
||||
|
||||
sz := binary.LittleEndian.Uint32(size[:])
|
||||
|
@ -107,7 +107,7 @@ func (s *Shard) Restore(prm RestorePrm) (*RestoreRes, error) {
|
|||
|
||||
_, err = r.Read(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return RestoreRes{}, err
|
||||
}
|
||||
|
||||
obj := object.New()
|
||||
|
@ -117,17 +117,17 @@ func (s *Shard) Restore(prm RestorePrm) (*RestoreRes, error) {
|
|||
failCount++
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
return RestoreRes{}, err
|
||||
}
|
||||
|
||||
putPrm.WithObject(obj)
|
||||
_, err = s.Put(putPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return RestoreRes{}, err
|
||||
}
|
||||
|
||||
count++
|
||||
}
|
||||
|
||||
return &RestoreRes{count: count, failed: failCount}, nil
|
||||
return RestoreRes{count: count, failed: failCount}, nil
|
||||
}
|
||||
|
|
|
@ -43,13 +43,13 @@ func (r SelectRes) AddressList() []oid.Address {
|
|||
//
|
||||
// Returns any error encountered that
|
||||
// did not allow to completely select the objects.
|
||||
func (s *Shard) Select(prm SelectPrm) (*SelectRes, error) {
|
||||
func (s *Shard) Select(prm SelectPrm) (SelectRes, error) {
|
||||
addrList, err := meta.Select(s.metaBase, prm.cnr, prm.filters)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not select objects from metabase: %w", err)
|
||||
return SelectRes{}, fmt.Errorf("could not select objects from metabase: %w", err)
|
||||
}
|
||||
|
||||
return &SelectRes{
|
||||
return SelectRes{
|
||||
addrList: addrList,
|
||||
}, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue