forked from TrueCloudLab/frostfs-node
[#156] shard: Make refillMetabase() pass linter checks
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
3f6b962349
commit
fb13902db9
1 changed files with 56 additions and 43 deletions
|
@ -155,7 +155,6 @@ func (s *Shard) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// nolint: funlen
|
||||
func (s *Shard) refillMetabase() error {
|
||||
err := s.metaBase.Reset()
|
||||
if err != nil {
|
||||
|
@ -172,57 +171,23 @@ func (s *Shard) refillMetabase() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// nolint: exhaustive
|
||||
var err error
|
||||
switch obj.Type() {
|
||||
case objectSDK.TypeTombstone:
|
||||
tombstone := objectSDK.NewTombstone()
|
||||
|
||||
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
|
||||
}
|
||||
|
||||
tombAddr := object.AddressOf(obj)
|
||||
memberIDs := tombstone.Members()
|
||||
tombMembers := make([]oid.Address, 0, len(memberIDs))
|
||||
|
||||
for i := range memberIDs {
|
||||
a := tombAddr
|
||||
a.SetObject(memberIDs[i])
|
||||
|
||||
tombMembers = append(tombMembers, a)
|
||||
}
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
|
||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
||||
inhumePrm.SetAddresses(tombMembers...)
|
||||
|
||||
_, err = s.metaBase.Inhume(inhumePrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not inhume objects: %w", err)
|
||||
}
|
||||
err = s.refillTombstoneObject(obj)
|
||||
case objectSDK.TypeLock:
|
||||
var lock objectSDK.Lock
|
||||
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
||||
}
|
||||
|
||||
locked := make([]oid.ID, lock.NumberOfMembers())
|
||||
lock.ReadMembers(locked)
|
||||
|
||||
cnr, _ := obj.ContainerID()
|
||||
id, _ := obj.ID()
|
||||
err = s.metaBase.Lock(cnr, id, locked)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not lock objects: %w", err)
|
||||
}
|
||||
err = s.refillLockObject(obj)
|
||||
default:
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var mPrm meta.PutPrm
|
||||
mPrm.SetObject(obj)
|
||||
mPrm.SetStorageID(descriptor)
|
||||
|
||||
_, err := s.metaBase.Put(mPrm)
|
||||
_, err = s.metaBase.Put(mPrm)
|
||||
if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
||||
return err
|
||||
}
|
||||
|
@ -241,6 +206,54 @@ func (s *Shard) refillMetabase() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Shard) refillLockObject(obj *objectSDK.Object) error {
|
||||
var lock objectSDK.Lock
|
||||
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
||||
}
|
||||
|
||||
locked := make([]oid.ID, lock.NumberOfMembers())
|
||||
lock.ReadMembers(locked)
|
||||
|
||||
cnr, _ := obj.ContainerID()
|
||||
id, _ := obj.ID()
|
||||
err := s.metaBase.Lock(cnr, id, locked)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not lock objects: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Shard) refillTombstoneObject(obj *objectSDK.Object) error {
|
||||
tombstone := objectSDK.NewTombstone()
|
||||
|
||||
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
||||
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
|
||||
}
|
||||
|
||||
tombAddr := object.AddressOf(obj)
|
||||
memberIDs := tombstone.Members()
|
||||
tombMembers := make([]oid.Address, 0, len(memberIDs))
|
||||
|
||||
for i := range memberIDs {
|
||||
a := tombAddr
|
||||
a.SetObject(memberIDs[i])
|
||||
|
||||
tombMembers = append(tombMembers, a)
|
||||
}
|
||||
|
||||
var inhumePrm meta.InhumePrm
|
||||
|
||||
inhumePrm.SetTombstoneAddress(tombAddr)
|
||||
inhumePrm.SetAddresses(tombMembers...)
|
||||
|
||||
_, err := s.metaBase.Inhume(inhumePrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not inhume objects: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close releases all Shard's components.
|
||||
func (s *Shard) Close() error {
|
||||
components := []interface{ Close() error }{}
|
||||
|
|
Loading…
Reference in a new issue