frostfs-node/pkg/local_object_storage/shard/control.go
Dmitrii Stepanov 80ce7c3a00
[#1284] shard: Resolve funlen linter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-08-09 13:23:35 +03:00

package shard

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)
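
// handleMetabaseFailure tries to switch the shard to read-only mode after a
// metabase failure at the given stage and falls back to degraded-read-only
// mode if that also fails.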
func (s *Shard) handleMetabaseFailure(stage string, err error) error {
	s.log.Error(logs.ShardMetabaseFailureSwitchingMode,
		zap.String("stage", stage),
		zap.Stringer("mode", mode.ReadOnly),
		zap.Error(err))

	err = s.SetMode(mode.ReadOnly)
	if err == nil {
		return nil
	}

	s.log.Error(logs.ShardCantMoveShardToReadonlySwitchMode,
		zap.String("stage", stage),
		zap.Stringer("mode", mode.DegradedReadOnly),
		zap.Error(err))

	err = s.SetMode(mode.DegradedReadOnly)
	if err != nil {
		return fmt.Errorf("could not switch to mode %s", mode.Mode(mode.DegradedReadOnly))
	}
	return nil
}

// Open opens all Shard's components.
func (s *Shard) Open(ctx context.Context) error {
	components := []interface {
		Open(context.Context, mode.Mode) error
	}{
		s.blobStor,
	}
	m := s.GetMode()

	if !m.NoMetabase() {
		components = append(components, s.metaBase)
	}
	if s.hasWriteCache() && !m.NoMetabase() {
		components = append(components, s.writeCache)
	}
	if s.pilorama != nil {
		components = append(components, s.pilorama)
	}

	for i, component := range components {
		if err := component.Open(ctx, m); err != nil {
			if component == s.metaBase {
				// We must first open all other components to avoid
				// opening non-existent DB in read-only mode.
				for j := i + 1; j < len(components); j++ {
					if err := components[j].Open(ctx, m); err != nil {
						// Other components must be opened, fail.
						return fmt.Errorf("could not open %T: %w", components[j], err)
					}
				}
				err = s.handleMetabaseFailure("open", err)
				if err != nil {
					return err
				}
				break
			}
			return fmt.Errorf("could not open %T: %w", component, err)
		}
	}
	return nil
}
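
// metabaseSynchronizer is a Shard wrapper whose Init refills the metabase
// from the blobstor instead of performing a regular metabase initialization.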
type metabaseSynchronizer Shard

func (x *metabaseSynchronizer) Init() error {
	ctx, span := tracing.StartSpanFromContext(context.TODO(), "metabaseSynchronizer.Init")
	defer span.End()

	return (*Shard)(x).refillMetabase(ctx)
}

// Init initializes all Shard's components.
func (s *Shard) Init(ctx context.Context) error {
	m := s.GetMode()
	if err := s.initializeComponents(m); err != nil {
		return err
	}

	s.updateMetrics(ctx)

	s.gc = &gc{
		gcCfg:       &s.gcCfg,
		remover:     s.removeGarbage,
		stopChannel: make(chan struct{}),
		eventChan:   make(chan Event),
		mEventHandler: map[eventType]*eventHandlers{
			eventNewEpoch: {
				cancelFunc: func() {},
				handlers: []eventHandler{
					s.collectExpiredLocks,
					s.collectExpiredObjects,
					s.collectExpiredTombstones,
					s.collectExpiredMetrics,
				},
			},
		},
	}
	if s.gc.metrics != nil {
		s.gc.metrics.SetShardID(s.info.ID.String())
	}

	s.gc.init(ctx)

	s.rb = newRebuilder(s.rebuildLimiter)
	if !m.NoMetabase() {
		s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
	}
	s.writecacheSealCancel.Store(dummyCancel)

	return nil
}
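
// initializeComponents initializes the blobstor, the metabase (or its refill
// via metabaseSynchronizer), the writecache and the pilorama according to the
// shard mode. Metabase failures other than an outdated version are delegated
// to handleMetabaseFailure.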
func (s *Shard) initializeComponents(m mode.Mode) error {
	type initializer interface {
		Init() error
	}

	var components []initializer
	if !m.NoMetabase() {
		var initMetabase initializer
		if s.NeedRefillMetabase() {
			initMetabase = (*metabaseSynchronizer)(s)
		} else {
			initMetabase = s.metaBase
		}
		components = []initializer{
			s.blobStor, initMetabase,
		}
	} else {
		components = []initializer{s.blobStor}
	}

	if s.hasWriteCache() && !m.NoMetabase() {
		components = append(components, s.writeCache)
	}
	if s.pilorama != nil {
		components = append(components, s.pilorama)
	}

	for _, component := range components {
		if err := component.Init(); err != nil {
			if component == s.metaBase {
				if errors.Is(err, meta.ErrOutdatedVersion) {
					return fmt.Errorf("metabase initialization: %w", err)
				}
				err = s.handleMetabaseFailure("init", err)
				if err != nil {
					return err
				}
				break
			}
			return fmt.Errorf("could not initialize %T: %w", component, err)
		}
	}
	return nil
}
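
// refillMetabase resets the metabase and repopulates it by iterating over all
// objects stored in the blobstor, reporting status and progress through the
// metrics writer.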
func (s *Shard) refillMetabase(ctx context.Context) error {
	path := s.metaBase.DumpInfo().Path
	s.metricsWriter.SetRefillStatus(path, "running")
	s.metricsWriter.SetRefillPercent(path, 0)
	var success bool
	defer func() {
		if success {
			s.metricsWriter.SetRefillStatus(path, "completed")
		} else {
			s.metricsWriter.SetRefillStatus(path, "failed")
		}
	}()

	err := s.metaBase.Reset()
	if err != nil {
		return fmt.Errorf("could not reset metabase: %w", err)
	}

	withCount := true
	totalObjects, err := s.blobStor.ObjectsCount(ctx)
	if err != nil {
		s.log.Warn(logs.EngineRefillFailedToGetObjectsCount, zap.Error(err))
		withCount = false
	}

	eg, egCtx := errgroup.WithContext(ctx)
	if s.cfg.refillMetabaseWorkersCount > 0 {
		eg.SetLimit(s.cfg.refillMetabaseWorkersCount)
	}

	var completedCount uint64
	var metricGuard sync.Mutex
	itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
		eg.Go(func() error {
			var success bool
			defer func() {
				s.metricsWriter.IncRefillObjectsCount(path, len(data), success)
				if withCount {
					metricGuard.Lock()
					completedCount++
					s.metricsWriter.SetRefillPercent(path, uint32(completedCount*100/totalObjects))
					metricGuard.Unlock()
				}
			}()

			if err := s.refillObject(egCtx, data, addr, descriptor); err != nil {
				return err
			}
			success = true
			return nil
		})

		select {
		case <-egCtx.Done():
			return egCtx.Err()
		default:
			return nil
		}
	})

	egErr := eg.Wait()

	err = errors.Join(egErr, itErr)
	if err != nil {
		return fmt.Errorf("could not put objects to the meta: %w", err)
	}

	err = s.metaBase.SyncCounters()
	if err != nil {
		return fmt.Errorf("could not sync object counters: %w", err)
	}

	success = true
	s.metricsWriter.SetRefillPercent(path, 100)
	return nil
}
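
// refillObject restores the metabase record for a single object read from the
// blobstor; tombstone and lock objects additionally restore their inhume and
// lock marks.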
func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, descriptor []byte) error {
	obj := objectSDK.New()
	if err := obj.Unmarshal(data); err != nil {
		s.log.Warn(logs.ShardCouldNotUnmarshalObject,
			zap.Stringer("address", addr),
			zap.String("err", err.Error()))
		return nil
	}

	var err error
	switch obj.Type() {
	case objectSDK.TypeTombstone:
		err = s.refillTombstoneObject(ctx, obj)
	case objectSDK.TypeLock:
		err = s.refillLockObject(ctx, obj)
	default:
	}
	if err != nil {
		return err
	}

	var mPrm meta.PutPrm
	mPrm.SetObject(obj)
	mPrm.SetStorageID(descriptor)

	_, err = s.metaBase.Put(ctx, mPrm)
	if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
		return err
	}
	return nil
}
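
// refillLockObject restores lock marks in the metabase from the lock object's
// payload.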
func (s *Shard) refillLockObject(ctx context.Context, obj *objectSDK.Object) error {
	var lock objectSDK.Lock
	if err := lock.Unmarshal(obj.Payload()); err != nil {
		return fmt.Errorf("could not unmarshal lock content: %w", err)
	}

	locked := make([]oid.ID, lock.NumberOfMembers())
	lock.ReadMembers(locked)

	cnr, _ := obj.ContainerID()
	id, _ := obj.ID()
	err := s.metaBase.Lock(ctx, cnr, id, locked)
	if err != nil {
		return fmt.Errorf("could not lock objects: %w", err)
	}
	return nil
}
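
// refillTombstoneObject restores inhume marks in the metabase for all members
// of the tombstone object.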
func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object) error {
	tombstone := objectSDK.NewTombstone()
	if err := tombstone.Unmarshal(obj.Payload()); err != nil {
		return fmt.Errorf("could not unmarshal tombstone content: %w", err)
	}

	tombAddr := object.AddressOf(obj)
	memberIDs := tombstone.Members()
	tombMembers := make([]oid.Address, 0, len(memberIDs))

	for i := range memberIDs {
		a := tombAddr
		a.SetObject(memberIDs[i])
		tombMembers = append(tombMembers, a)
	}

	var inhumePrm meta.InhumePrm
	inhumePrm.SetTombstoneAddress(tombAddr)
	inhumePrm.SetAddresses(tombMembers...)

	_, err := s.metaBase.Inhume(ctx, inhumePrm)
	if err != nil {
		return fmt.Errorf("could not inhume objects: %w", err)
	}
	return nil
}

// Close releases all Shard's components.
func (s *Shard) Close() error {
	if s.rb != nil {
		s.rb.Stop(s.log)
	}

	components := []interface{ Close() error }{}
	if s.pilorama != nil {
		components = append(components, s.pilorama)
	}
	if s.hasWriteCache() {
		prev := s.writecacheSealCancel.Swap(notInitializedCancel)
		prev.cancel() // no need to wait: writecache.Seal and writecache.Close lock the same mutex
		components = append(components, s.writeCache)
	}
	components = append(components, s.blobStor, s.metaBase)

	var lastErr error
	for _, component := range components {
		if err := component.Close(); err != nil {
			lastErr = err
			s.log.Error(logs.ShardCouldNotCloseShardComponent, zap.Error(err))
		}
	}

	// If Init/Open was unsuccessful, gc can be nil.
	if s.gc != nil {
		s.gc.stop()
	}

	return lastErr
}

// Reload reloads configuration portions that are necessary.
// If a config option is invalid, it logs an error and returns nil.
// If there was a problem with applying new configuration, an error is returned.
func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Reload")
	defer span.End()

	// Do not use defaultCfg here: missing options need not be reloaded.
	var c cfg
	for i := range opts {
		opts[i](&c)
	}

	unlock := s.lockExclusive()
	defer unlock()

	s.rb.Stop(s.log)
	if !s.info.Mode.NoMetabase() {
		defer func() {
			s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
		}()
	}

	ok, err := s.metaBase.Reload(c.metaOpts...)
	if err != nil {
		if errors.Is(err, meta.ErrDegradedMode) {
			s.log.Error(logs.ShardCantOpenMetabaseMoveToADegradedMode, zap.Error(err))
			_ = s.setMode(mode.DegradedReadOnly)
		}
		return err
	}
	if ok {
		var err error
		if c.refillMetabase {
			// Here we refill the metabase only if a new instance was opened. This is a feature:
			// we don't want to hang for some time just because we forgot to change the
			// config after the node was updated.
			err = s.refillMetabase(ctx)
		} else {
			err = s.metaBase.Init()
		}
		if err != nil {
			s.log.Error(logs.ShardCantInitializeMetabaseMoveToADegradedreadonlyMode, zap.Error(err))
			_ = s.setMode(mode.DegradedReadOnly)
			return err
		}
	}
	return s.setMode(c.info.Mode)
}
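
// lockExclusive signals that a mode change is requested, cancels background GC
// and write-cache sealing, then acquires the shard mutex and returns its
// unlock function.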
func (s *Shard) lockExclusive() func() {
	s.setModeRequested.Store(true)
	val := s.gcCancel.Load()
	if val != nil {
		cancelGC := val.(context.CancelFunc)
		cancelGC()
	}
	if c := s.writecacheSealCancel.Load(); c != nil {
		c.cancel()
	}
	s.m.Lock()
	s.setModeRequested.Store(false)
	return s.m.Unlock
}