73f8bb3e5f
Allow to extend blobstor with more storage sub-systems. Currently objects stored in the FSTree have empty byte slice descriptor and object from blobovnicza tree have the same id as earlier. Each such change in the identifier formation should be accompanied with metabase version increase. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
242 lines
5.3 KiB
Go
242 lines
5.3 KiB
Go
package shard
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
|
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
|
s.log.Error("metabase failure, switching mode",
|
|
zap.String("stage", stage),
|
|
zap.Stringer("mode", mode.ReadOnly),
|
|
zap.Error(err))
|
|
|
|
err = s.SetMode(mode.ReadOnly)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
s.log.Error("can't move shard to readonly, switch mode",
|
|
zap.String("stage", stage),
|
|
zap.Stringer("mode", mode.DegradedReadOnly),
|
|
zap.Error(err))
|
|
|
|
err = s.SetMode(mode.DegradedReadOnly)
|
|
if err != nil {
|
|
return fmt.Errorf("could not switch to mode %s", mode.DegradedReadOnly)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Open opens all Shard's components.
|
|
func (s *Shard) Open() error {
|
|
components := []interface{ Open(bool) error }{
|
|
s.blobStor, s.metaBase,
|
|
}
|
|
|
|
if s.hasWriteCache() {
|
|
components = append(components, s.writeCache)
|
|
}
|
|
|
|
if s.pilorama != nil {
|
|
components = append(components, s.pilorama)
|
|
}
|
|
|
|
for _, component := range components {
|
|
if err := component.Open(false); err != nil {
|
|
if component == s.metaBase {
|
|
err = s.handleMetabaseFailure("open", err)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
break
|
|
}
|
|
|
|
return fmt.Errorf("could not open %T: %w", component, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type metabaseSynchronizer Shard
|
|
|
|
func (x *metabaseSynchronizer) Init() error {
|
|
return (*Shard)(x).refillMetabase()
|
|
}
|
|
|
|
// Init initializes all Shard's components.
|
|
func (s *Shard) Init() error {
|
|
type initializer interface {
|
|
Init() error
|
|
}
|
|
|
|
var components []initializer
|
|
|
|
if !s.GetMode().NoMetabase() {
|
|
var initMetabase initializer
|
|
|
|
if s.needRefillMetabase() {
|
|
initMetabase = (*metabaseSynchronizer)(s)
|
|
} else {
|
|
initMetabase = s.metaBase
|
|
}
|
|
|
|
components = []initializer{
|
|
s.blobStor, initMetabase,
|
|
}
|
|
} else {
|
|
components = []initializer{s.blobStor}
|
|
}
|
|
|
|
if s.hasWriteCache() {
|
|
components = append(components, s.writeCache)
|
|
}
|
|
|
|
if s.pilorama != nil {
|
|
components = append(components, s.pilorama)
|
|
}
|
|
|
|
for _, component := range components {
|
|
if err := component.Init(); err != nil {
|
|
if component == s.metaBase {
|
|
err = s.handleMetabaseFailure("init", err)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
break
|
|
}
|
|
|
|
return fmt.Errorf("could not initialize %T: %w", component, err)
|
|
}
|
|
}
|
|
|
|
s.gc = &gc{
|
|
gcCfg: s.gcCfg,
|
|
remover: s.removeGarbage,
|
|
stopChannel: make(chan struct{}),
|
|
mEventHandler: map[eventType]*eventHandlers{
|
|
eventNewEpoch: {
|
|
cancelFunc: func() {},
|
|
handlers: []eventHandler{
|
|
s.collectExpiredObjects,
|
|
s.collectExpiredTombstones,
|
|
s.collectExpiredLocks,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
s.gc.init()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Shard) refillMetabase() error {
|
|
err := s.metaBase.Reset()
|
|
if err != nil {
|
|
return fmt.Errorf("could not reset metabase: %w", err)
|
|
}
|
|
|
|
obj := objectSDK.New()
|
|
|
|
return blobstor.IterateBinaryObjects(s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
|
if err := obj.Unmarshal(data); err != nil {
|
|
s.log.Warn("could not unmarshal object",
|
|
zap.Stringer("address", addr),
|
|
zap.String("err", err.Error()))
|
|
return nil
|
|
}
|
|
|
|
//nolint: exhaustive
|
|
switch obj.Type() {
|
|
case objectSDK.TypeTombstone:
|
|
tombstone := objectSDK.NewTombstone()
|
|
|
|
if err := tombstone.Unmarshal(obj.Payload()); err != nil {
|
|
return fmt.Errorf("could not unmarshal tombstone content: %w", err)
|
|
}
|
|
|
|
tombAddr := object.AddressOf(obj)
|
|
memberIDs := tombstone.Members()
|
|
tombMembers := make([]oid.Address, 0, len(memberIDs))
|
|
|
|
for i := range memberIDs {
|
|
a := tombAddr
|
|
a.SetObject(memberIDs[i])
|
|
|
|
tombMembers = append(tombMembers, a)
|
|
}
|
|
|
|
var inhumePrm meta.InhumePrm
|
|
|
|
inhumePrm.SetTombstoneAddress(tombAddr)
|
|
inhumePrm.SetAddresses(tombMembers...)
|
|
|
|
_, err = s.metaBase.Inhume(inhumePrm)
|
|
if err != nil {
|
|
return fmt.Errorf("could not inhume objects: %w", err)
|
|
}
|
|
case objectSDK.TypeLock:
|
|
var lock objectSDK.Lock
|
|
if err := lock.Unmarshal(obj.Payload()); err != nil {
|
|
return fmt.Errorf("could not unmarshal lock content: %w", err)
|
|
}
|
|
|
|
locked := make([]oid.ID, lock.NumberOfMembers())
|
|
lock.ReadMembers(locked)
|
|
|
|
cnr, _ := obj.ContainerID()
|
|
id, _ := obj.ID()
|
|
err = s.metaBase.Lock(cnr, id, locked)
|
|
if err != nil {
|
|
return fmt.Errorf("could not lock objects: %w", err)
|
|
}
|
|
}
|
|
|
|
var mPrm meta.PutPrm
|
|
mPrm.SetObject(obj)
|
|
mPrm.SetStorageID(descriptor)
|
|
|
|
_, err := s.metaBase.Put(mPrm)
|
|
if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, object.ErrObjectIsExpired) {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Close releases all Shard's components.
|
|
func (s *Shard) Close() error {
|
|
components := []interface{ Close() error }{}
|
|
|
|
if s.pilorama != nil {
|
|
components = append(components, s.pilorama)
|
|
}
|
|
|
|
if s.hasWriteCache() {
|
|
components = append(components, s.writeCache)
|
|
}
|
|
|
|
components = append(components, s.blobStor, s.metaBase)
|
|
|
|
for _, component := range components {
|
|
if err := component.Close(); err != nil {
|
|
return fmt.Errorf("could not close %s: %w", component, err)
|
|
}
|
|
}
|
|
|
|
s.gc.stop()
|
|
|
|
return nil
|
|
}
|