frostfs-node/pkg/local_object_storage/shard/shard.go
Dmitrii Stepanov b01f05c951
[#1689] shard: Use single qos.Limiter instance for shard and writecache
Before the fix, shard and writecache used the same instance of qos.Limiter.
On a SIGHUP signal, shard closed the qos.Limiter but didn't
update writecache's pointer.

Now shard uses atomic pointer to qos.Limiter and shares it with writecache.
On SIGHUP shard updates atomic pointer value and closes old qos.Limiter.

Change-Id: Ic2ab62441d3872e71c5771f54d070e0ca48fe375
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-05-06 12:46:48 +03:00

562 lines
14 KiB
Go

package shard
import (
"context"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
// Shard represents single shard of FrostFS Local Storage Engine.
//
// It embeds the shard configuration (*cfg) and holds the storage
// components created by New together with runtime state.
type Shard struct {
*cfg
gc *gc
// writeCache is created by New only when cfg.useWriteCache is set.
writeCache writecache.Cache
blobStor *blobstor.BlobStor
// pilorama is created by New only when pilorama options are non-nil.
pilorama pilorama.ForestStorage
metaBase *meta.DB
tsSource TombstoneSource
rb *rebuilder
// opsLimiter wraps cfg.configOpsLimiter behind an atomic pointer;
// New passes this same wrapper to the write cache.
opsLimiter *atomicOpsLimiter
gcCancel atomic.Value
setModeRequested atomic.Bool
// writecacheSealCancel is initialized by New with notInitializedCancel.
writecacheSealCancel atomic.Pointer[writecacheSealCanceler]
}
// Option represents Shard's constructor option.
// It is a function mutating the shard configuration; New applies
// options in the order they are passed.
type Option func(*cfg)
// ExpiredTombstonesCallback is a callback handling list of expired tombstones.
// It receives a context and the tombstoned objects to process.
type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
// ExpiredObjectsCallback is a callback handling list of expired objects.
// It receives a context, a uint64 value and the addresses of the objects.
// NOTE(review): the uint64 argument is presumably an epoch number — confirm against callers.
type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address)
// DeletedLockCallback is a callback handling list of deleted LOCK objects.
// It receives a context and the addresses of the deleted LOCK objects.
type DeletedLockCallback func(context.Context, []oid.Address)
// EmptyContainersCallback is a callback handling list of zero-size and zero-count containers.
// It receives a context and the identifiers of the containers.
type EmptyContainersCallback func(context.Context, []cid.ID)
// cfg is the shard configuration filled by defaultCfg and then
// adjusted by the Option values passed to New.
type cfg struct {
// m is locked by SetEvacuationInProgress while it updates info.
m sync.RWMutex
refillMetabase bool
refillMetabaseWorkersCount int
// rmBatchSize defaults to 100 (see defaultCfg).
rmBatchSize int
useWriteCache bool
info Info
blobOpts []blobstor.Option
metaOpts []meta.Option
writeCacheOpts []writecache.Option
piloramaOpts []pilorama.Option
log *logger.Logger
gcCfg gcCfg
expiredTombstonesCallback ExpiredTombstonesCallback
expiredLocksCallback ExpiredObjectsCallback
deletedLockCallBack DeletedLockCallback
// zeroSizeContainersCallback and zeroCountContainersCallback
// default to no-op functions (see defaultCfg).
zeroSizeContainersCallback EmptyContainersCallback
zeroCountContainersCallback EmptyContainersCallback
tsSource TombstoneSource
// metricsWriter defaults to noopMetrics{}.
metricsWriter MetricsWriter
// reportErrorFunc defaults to a no-op function.
reportErrorFunc func(ctx context.Context, selfID string, message string, err error)
containerInfo container.InfoProvider
// configOpsLimiter is the limiter supplied via WithLimiter (no-op limiter
// by default); New wraps it into the shard's atomicOpsLimiter.
configOpsLimiter qos.Limiter
}
// defaultCfg returns the configuration New starts from before applying
// options: remover batch size of 100, a logger wrapping the global zap
// logger, default GC settings, no-op metrics, a no-op QoS limiter and
// no-op error-report and empty-container callbacks.
func defaultCfg() *cfg {
	c := new(cfg)

	c.rmBatchSize = 100
	c.log = logger.NewLoggerWrapper(zap.L())
	c.gcCfg = defaultGCCfg()
	c.metricsWriter = noopMetrics{}
	c.configOpsLimiter = qos.NewNoopLimiter()

	// Callbacks default to no-ops so they can be called unconditionally.
	c.reportErrorFunc = func(context.Context, string, string, error) {}
	c.zeroSizeContainersCallback = func(context.Context, []cid.ID) {}
	c.zeroCountContainersCallback = func(context.Context, []cid.ID) {}

	return c
}
// New creates, initializes and returns new Shard instance.
//
// Options are applied in order on top of defaultCfg. The blobstor and
// metabase are always created. The write cache is created only when enabled
// via WithWriteCache, and pilorama only when pilorama options are non-nil.
// The configured QoS limiter is wrapped into an atomicOpsLimiter that is
// shared between the shard and the write cache.
func New(opts ...Option) *Shard {
	c := defaultCfg()

	for i := range opts {
		opts[i](c)
	}

	bs := blobstor.New(c.blobOpts...)
	mb := meta.New(c.metaOpts...)

	s := &Shard{
		cfg:        c,
		blobStor:   bs,
		metaBase:   mb,
		tsSource:   c.tsSource,
		opsLimiter: newAtomicOpsLimiter(c.configOpsLimiter),
	}

	// reportFunc binds the shard ID to the configured error reporter.
	reportFunc := func(ctx context.Context, msg string, err error) {
		s.reportErrorFunc(ctx, s.ID().String(), msg, err)
	}

	s.blobStor.SetReportErrorFunc(reportFunc)

	if c.useWriteCache {
		// Build the final option list in a fresh slice: appending directly to
		// c.writeCacheOpts could write into spare capacity of a backing array
		// that is still shared with the caller's slice.
		wcOpts := make([]writecache.Option, 0, len(c.writeCacheOpts)+4)
		wcOpts = append(wcOpts, c.writeCacheOpts...)
		wcOpts = append(wcOpts,
			writecache.WithReportErrorFunc(reportFunc),
			writecache.WithBlobstor(bs),
			writecache.WithMetabase(mb),
			writecache.WithQoSLimiter(s.opsLimiter))

		s.writeCache = writecache.New(wcOpts...)
		s.writeCache.GetMetrics().SetPath(s.writeCache.DumpInfo().Path)
	}

	if s.piloramaOpts != nil {
		s.pilorama = pilorama.NewBoltForest(c.piloramaOpts...)
	}

	s.fillInfo()
	s.writecacheSealCancel.Store(notInitializedCancel)

	return s
}
// WithID returns an option that stores id as the shard identifier
// in the shard info.
func WithID(id *ID) Option {
	return func(conf *cfg) {
		conf.info.ID = id
	}
}
// WithBlobStorOptions returns an option that sets the options passed
// to the internal BlobStor constructor. Previously set BlobStor options
// are replaced.
func WithBlobStorOptions(opts ...blobstor.Option) Option {
	return func(conf *cfg) {
		conf.blobOpts = opts
	}
}
// WithMetaBaseOptions returns an option that sets the options passed
// to the internal metabase constructor. Previously set metabase options
// are replaced.
func WithMetaBaseOptions(opts ...meta.Option) Option {
	return func(conf *cfg) {
		conf.metaOpts = opts
	}
}
// WithWriteCacheOptions returns an option that sets the internal write
// cache options. The stored slice is replaced as a whole, so anything
// appended earlier (e.g. by WithWriteCacheMetrics) is dropped.
func WithWriteCacheOptions(opts []writecache.Option) Option {
	return func(conf *cfg) {
		conf.writeCacheOpts = opts
	}
}
// WithWriteCacheMetrics returns an option that appends a metrics option
// for wcMetrics to the already configured write cache options.
func WithWriteCacheMetrics(wcMetrics writecache.Metrics) Option {
	return func(conf *cfg) {
		metricsOpt := writecache.WithMetrics(wcMetrics)
		conf.writeCacheOpts = append(conf.writeCacheOpts, metricsOpt)
	}
}
// WithPiloramaOptions returns an option that sets the internal pilorama
// options. New creates the pilorama storage only when these options are
// non-nil.
func WithPiloramaOptions(opts ...pilorama.Option) Option {
	return func(conf *cfg) {
		conf.piloramaOpts = opts
	}
}
// WithLogger returns an option that sets the Shard's logger. The GC
// configuration receives the same logger tagged with logger.TagGC.
func WithLogger(l *logger.Logger) Option {
	return func(conf *cfg) {
		conf.log = l

		gcLog := l.WithTag(logger.TagGC)
		conf.gcCfg.log = gcLog
	}
}
// WithWriteCache returns an option that enables (use == true) or
// disables the write cache of the shard.
func WithWriteCache(use bool) Option {
	return func(conf *cfg) {
		conf.useWriteCache = use
	}
}
// hasWriteCache reports whether the write cache is enabled in the
// shard configuration.
func (s *Shard) hasWriteCache() bool {
	return s.cfg.useWriteCache
}
// NeedRefillMetabase reports whether the shard is configured to refill
// the metabase (see WithRefillMetabase).
func (s *Shard) NeedRefillMetabase() bool {
	return s.refillMetabase
}
// WithRemoverBatchSize returns an option that sets the batch size
// of a single removal operation. The default is 100.
func WithRemoverBatchSize(sz int) Option {
	return func(conf *cfg) {
		conf.rmBatchSize = sz
	}
}
// WithGCWorkerPoolInitializer returns an option that sets the function
// used by GC to initialize a worker pool with the specified worker number.
func WithGCWorkerPoolInitializer(wpInit func(int) util.WorkerPool) Option {
	return func(conf *cfg) {
		conf.gcCfg.workerPoolInit = wpInit
	}
}
// WithGCRemoverSleepInterval returns an option that sets the sleep
// interval between object remover executions.
func WithGCRemoverSleepInterval(dur time.Duration) Option {
	return func(conf *cfg) {
		conf.gcCfg.removerInterval = dur
	}
}
// WithExpiredTombstonesCallback returns an option that sets the
// callback handling expired tombstones.
func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option {
	return func(conf *cfg) {
		conf.expiredTombstonesCallback = cb
	}
}
// WithExpiredLocksCallback returns an option that sets the callback
// handling expired LOCK objects.
func WithExpiredLocksCallback(cb ExpiredObjectsCallback) Option {
	return func(conf *cfg) {
		conf.expiredLocksCallback = cb
	}
}
// WithRefillMetabase returns an option that sets the flag requesting
// a metabase refill on the Shard's initialization step.
func WithRefillMetabase(v bool) Option {
	return func(conf *cfg) {
		conf.refillMetabase = v
	}
}
// WithRefillMetabaseWorkersCount returns an option that sets the number
// of workers used to refill the metabase on the Shard's initialization step.
func WithRefillMetabaseWorkersCount(v int) Option {
	return func(conf *cfg) {
		conf.refillMetabaseWorkersCount = v
	}
}
// WithMode returns an option that sets the shard's mode in the shard info.
// Mode must be one of the predefined:
//   - mode.ReadWrite;
//   - mode.ReadOnly.
func WithMode(v mode.Mode) Option {
	return func(conf *cfg) {
		conf.info.Mode = v
	}
}
// WithTombstoneSource returns an option that sets the TombstoneSource
// New copies into the created Shard.
func WithTombstoneSource(v TombstoneSource) Option {
	return func(conf *cfg) {
		conf.tsSource = v
	}
}
// WithDeletedLockCallback returns an option that sets the callback
// handling deleted LOCK objects.
func WithDeletedLockCallback(v DeletedLockCallback) Option {
	return func(conf *cfg) {
		conf.deletedLockCallBack = v
	}
}
// WithMetricsWriter returns an option that sets the storage of the
// shard's metrics. Without it a no-op writer is used.
func WithMetricsWriter(v MetricsWriter) Option {
	return func(conf *cfg) {
		conf.metricsWriter = v
	}
}
// WithGCMetrics returns an option that sets the storage of the GC metrics.
func WithGCMetrics(v GCMectrics) Option {
	return func(conf *cfg) {
		conf.gcCfg.metrics = v
	}
}
// WithReportErrorFunc returns an option that sets the callback handling
// storage-related errors in the background workers. New invokes it with
// the shard's own ID as selfID.
func WithReportErrorFunc(f func(ctx context.Context, selfID string, message string, err error)) Option {
	return func(conf *cfg) {
		conf.reportErrorFunc = f
	}
}
// WithExpiredCollectorBatchSize returns an option that sets the batch
// size of the expired object collection operation.
func WithExpiredCollectorBatchSize(size int) Option {
	return func(conf *cfg) {
		conf.gcCfg.expiredCollectorBatchSize = size
	}
}
// WithExpiredCollectorWorkerCount returns an option that sets the number
// of concurrent workers of the expired object collection operation.
func WithExpiredCollectorWorkerCount(count int) Option {
	return func(conf *cfg) {
		conf.gcCfg.expiredCollectorWorkerCount = count
	}
}
// WithDisabledGC disables GC by installing a test remover hook that
// does nothing and returns an empty gcRunResult.
// For testing purposes only.
func WithDisabledGC() Option {
	return func(conf *cfg) {
		conf.gcCfg.testHookRemover = func(context.Context) gcRunResult {
			var empty gcRunResult
			return empty
		}
	}
}
// WithZeroSizeCallback returns an option that sets the callback for
// zero-size containers. The default callback is a no-op.
func WithZeroSizeCallback(cb EmptyContainersCallback) Option {
	return func(conf *cfg) {
		conf.zeroSizeContainersCallback = cb
	}
}
// WithZeroCountCallback returns an option that sets the callback for
// zero-count containers. The default callback is a no-op.
func WithZeroCountCallback(cb EmptyContainersCallback) Option {
	return func(conf *cfg) {
		conf.zeroCountContainersCallback = cb
	}
}
// WithContainerInfoProvider returns an option that sets the provider
// of container info used by the shard.
func WithContainerInfoProvider(containerInfo container.InfoProvider) Option {
	return func(conf *cfg) {
		conf.containerInfo = containerInfo
	}
}
// WithLimiter returns an option that sets the QoS limiter of the shard.
// New wraps the limiter into an atomicOpsLimiter that is also handed to
// the write cache. Without this option a no-op limiter is used.
func WithLimiter(l qos.Limiter) Option {
	return func(conf *cfg) {
		conf.configOpsLimiter = l
	}
}
// fillInfo populates the shard info with the dump info of the metabase
// and blobstor, the current mode and, when present, the dump info of the
// write cache and pilorama.
func (s *Shard) fillInfo() {
	info := &s.info

	info.MetaBaseInfo = s.metaBase.DumpInfo()
	info.BlobStorInfo = s.blobStor.DumpInfo()
	info.Mode = s.GetMode()

	if s.useWriteCache {
		info.WriteCacheInfo = s.writeCache.DumpInfo()
	}

	if s.pilorama != nil {
		info.PiloramaInfo = s.pilorama.DumpInfo()
	}
}
// Object counter type labels passed to the metrics writer.
const (
// physical is a physically stored object
// counter type.
physical = "phy"
// logical is a logically stored object
// counter type (excludes objects that are
// stored but unavailable).
logical = "logic"
// user is an available small or big regular object.
user = "user"
)
// updateMetrics reads object counters, container sizes and per-container
// object counters from the metabase and pushes them to the metrics writer,
// then reports the shard mode.
//
// It does nothing when the current mode has no metabase. A failure to read
// the object counters, the container list or the container counters is
// logged as a warning and aborts the update; a failure to read the size of
// a single container is logged and that container is skipped.
func (s *Shard) updateMetrics(ctx context.Context) {
	if s.GetMode().NoMetabase() {
		return
	}

	cc, err := s.metaBase.ObjectCounters()
	if err != nil {
		s.log.Warn(ctx, logs.ShardMetaObjectCounterRead,
			zap.Error(err),
		)

		return
	}

	s.setObjectCounterBy(physical, cc.Phy)
	s.setObjectCounterBy(logical, cc.Logic)
	s.setObjectCounterBy(user, cc.User)

	cnrList, err := s.metaBase.Containers(ctx)
	if err != nil {
		s.log.Warn(ctx, logs.ShardMetaCantReadContainerList, zap.Error(err))
		return
	}

	var totalPayload uint64

	for i := range cnrList {
		size, err := s.metaBase.ContainerSize(cnrList[i])
		if err != nil {
			s.log.Warn(ctx, logs.ShardMetaCantReadContainerSize,
				zap.String("cid", cnrList[i].EncodeToString()),
				zap.Error(err))
			continue
		}
		s.addToContainerSize(cnrList[i].EncodeToString(), int64(size))
		totalPayload += size
	}

	s.addToPayloadSize(int64(totalPayload))

	contCount, err := s.metaBase.ContainerCounters(ctx)
	if err != nil {
		s.log.Warn(ctx, logs.FailedToGetContainerCounters, zap.Error(err))
		return
	}

	for contID, count := range contCount.Counts {
		// Encode the container ID once per container instead of once per
		// counter type.
		cnr := contID.EncodeToString()

		s.setContainerObjectsCount(cnr, physical, count.Phy)
		s.setContainerObjectsCount(cnr, logical, count.Logic)
		s.setContainerObjectsCount(cnr, user, count.User)
	}

	s.metricsWriter.SetMode(s.info.Mode)
}
// incObjectCounter increments both physical and logical object
// counters, globally and for the container cnrID. When isUser is true
// the user object counters are incremented as well.
func (s *Shard) incObjectCounter(cnrID cid.ID, isUser bool) {
	// Encode the container ID once and reuse it for every counter type.
	cnr := cnrID.EncodeToString()

	s.metricsWriter.IncObjectCounter(physical)
	s.metricsWriter.IncObjectCounter(logical)
	s.metricsWriter.IncContainerObjectsCount(cnr, physical)
	s.metricsWriter.IncContainerObjectsCount(cnr, logical)

	if isUser {
		s.metricsWriter.IncObjectCounter(user)
		s.metricsWriter.IncContainerObjectsCount(cnr, user)
	}
}
// decObjectCounterBy subtracts v from the object counter of type typ.
// A zero v is ignored.
func (s *Shard) decObjectCounterBy(typ string, v uint64) {
	if v == 0 {
		return
	}

	s.metricsWriter.AddToObjectCounter(typ, -int(v))
}
// setObjectCounterBy sets the object counter of type typ to v.
// A zero v is ignored, leaving the counter untouched.
func (s *Shard) setObjectCounterBy(typ string, v uint64) {
	if v == 0 {
		return
	}

	s.metricsWriter.SetObjectCounter(typ, v)
}
// decContainerObjectCounter subtracts, for every container in byCnr, the
// given physical, logical and user object counts from the corresponding
// per-container metrics. Zero counts are skipped.
func (s *Shard) decContainerObjectCounter(byCnr map[cid.ID]meta.ObjectCounters) {
	for id, counters := range byCnr {
		if phy := counters.Phy; phy > 0 {
			s.metricsWriter.SubContainerObjectsCount(id.EncodeToString(), physical, phy)
		}

		if logic := counters.Logic; logic > 0 {
			s.metricsWriter.SubContainerObjectsCount(id.EncodeToString(), logical, logic)
		}

		if usr := counters.User; usr > 0 {
			s.metricsWriter.SubContainerObjectsCount(id.EncodeToString(), user, usr)
		}
	}
}
// addToContainerSize adds size (which may be negative) to the size metric
// of container cnr. A zero size is ignored.
func (s *Shard) addToContainerSize(cnr string, size int64) {
	if size == 0 {
		return
	}

	s.metricsWriter.AddToContainerSize(cnr, size)
}
// addToPayloadSize adds size (which may be negative) to the payload size
// metric. A zero size is ignored.
func (s *Shard) addToPayloadSize(size int64) {
	if size == 0 {
		return
	}

	s.metricsWriter.AddToPayloadSize(size)
}
// setContainerObjectsCount sets the object counter of type typ for
// container cnr to v. A zero v is ignored.
func (s *Shard) setContainerObjectsCount(cnr string, typ string, v uint64) {
	if v == 0 {
		return
	}

	s.metricsWriter.SetContainerObjectsCount(cnr, typ, v)
}
// SetEvacuationInProgress records val as the evacuation-in-progress flag
// in the shard info and reports it to the metrics writer. Both updates
// are performed while holding the configuration write lock.
func (s *Shard) SetEvacuationInProgress(val bool) {
	conf := s.cfg

	conf.m.Lock()
	defer conf.m.Unlock()

	conf.info.EvacuationInProgress = val
	conf.metricsWriter.SetEvacuationInProgress(val)
}
// Compile-time check that *atomicOpsLimiter implements qos.Limiter.
var _ qos.Limiter = (*atomicOpsLimiter)(nil)
// newAtomicOpsLimiter returns an atomicOpsLimiter whose pointer is
// initialized with a holder wrapping l.
func newAtomicOpsLimiter(l qos.Limiter) *atomicOpsLimiter {
	a := new(atomicOpsLimiter)
	holder := &qosLimiterHolder{Limiter: l}
	a.ptr.Store(holder)

	return a
}
// atomicOpsLimiter is a qos.Limiter that forwards every call to the
// limiter currently stored in ptr. Each method loads the pointer
// atomically at call time.
type atomicOpsLimiter struct {
ptr atomic.Pointer[qosLimiterHolder]
}
// Close closes the limiter currently stored in the atomic pointer.
func (a *atomicOpsLimiter) Close() {
	current := a.ptr.Load()
	current.Close()
}
// ReadRequest forwards the read request to the limiter currently stored
// in the atomic pointer and returns its release function and error.
func (a *atomicOpsLimiter) ReadRequest(ctx context.Context) (qos.ReleaseFunc, error) {
	current := a.ptr.Load()
	return current.ReadRequest(ctx)
}
// SetMetrics passes m to the limiter currently stored in the atomic pointer.
func (a *atomicOpsLimiter) SetMetrics(m qos.Metrics) {
	current := a.ptr.Load()
	current.SetMetrics(m)
}
// SetParentID passes id to the limiter currently stored in the atomic pointer.
func (a *atomicOpsLimiter) SetParentID(id string) {
	current := a.ptr.Load()
	current.SetParentID(id)
}
// WriteRequest forwards the write request to the limiter currently stored
// in the atomic pointer and returns its release function and error.
func (a *atomicOpsLimiter) WriteRequest(ctx context.Context) (qos.ReleaseFunc, error) {
	current := a.ptr.Load()
	return current.WriteRequest(ctx)
}
// qosLimiterHolder embeds a qos.Limiter; atomicOpsLimiter stores
// a pointer to it in its atomic.Pointer field.
type qosLimiterHolder struct {
qos.Limiter
}