package shard import ( "context" "sync" "sync/atomic" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) // Shard represents single shard of FrostFS Local Storage Engine. type Shard struct { *cfg gc *gc writeCache writecache.Cache blobStor *blobstor.BlobStor pilorama pilorama.ForestStorage metaBase *meta.DB tsSource TombstoneSource gcCancel atomic.Value setModeRequested atomic.Bool } // Option represents Shard's constructor option. type Option func(*cfg) // ExpiredTombstonesCallback is a callback handling list of expired tombstones. type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject) // ExpiredObjectsCallback is a callback handling list of expired objects. type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address) // DeletedLockCallback is a callback handling list of deleted LOCK objects. type DeletedLockCallback func(context.Context, []oid.Address) // MetricsWriter is an interface that must store shard's metrics. type MetricsWriter interface { // SetObjectCounter must set object counter taking into account object type. SetObjectCounter(objectType string, v uint64) // AddToObjectCounter must update object counter taking into account object // type. // Negative parameter must decrease the counter. AddToObjectCounter(objectType string, delta int) // AddToContainerSize must add a value to the container size. // Value can be negative. AddToContainerSize(cnr string, value int64) // AddToPayloadSize must add a value to the payload size. // Value can be negative. AddToPayloadSize(value int64) // IncObjectCounter must increment shard's object counter taking into account // object type. IncObjectCounter(objectType string) // DecObjectCounter must decrement shard's object counter taking into account // object type. DecObjectCounter(objectType string) // SetShardID must set (update) the shard identifier that will be used in // metrics. SetShardID(id string) // SetReadonly must set shard mode. SetMode(mode mode.Mode) // IncErrorCounter increment error counter. IncErrorCounter() // ClearErrorCounter clear error counter. ClearErrorCounter() // DeleteShardMetrics deletes shard metrics from registry. DeleteShardMetrics() } type cfg struct { m sync.RWMutex refillMetabase bool rmBatchSize int useWriteCache bool info Info blobOpts []blobstor.Option metaOpts []meta.Option writeCacheOpts []writecache.Option piloramaOpts []pilorama.Option log *logger.Logger gcCfg gcCfg expiredTombstonesCallback ExpiredTombstonesCallback expiredLocksCallback ExpiredObjectsCallback deletedLockCallBack DeletedLockCallback tsSource TombstoneSource metricsWriter MetricsWriter reportErrorFunc func(selfID string, message string, err error) } func defaultCfg() *cfg { return &cfg{ rmBatchSize: 100, log: &logger.Logger{Logger: zap.L()}, gcCfg: defaultGCCfg(), reportErrorFunc: func(string, string, error) {}, } } // New creates, initializes and returns new Shard instance. func New(opts ...Option) *Shard { c := defaultCfg() for i := range opts { opts[i](c) } bs := blobstor.New(c.blobOpts...) mb := meta.New(c.metaOpts...) s := &Shard{ cfg: c, blobStor: bs, metaBase: mb, tsSource: c.tsSource, } reportFunc := func(msg string, err error) { s.reportErrorFunc(s.ID().String(), msg, err) } s.blobStor.SetReportErrorFunc(reportFunc) if c.useWriteCache { s.writeCache = writecache.New( append(c.writeCacheOpts, writecache.WithReportErrorFunc(reportFunc), writecache.WithBlobstor(bs), writecache.WithMetabase(mb))...) } if s.piloramaOpts != nil { s.pilorama = pilorama.NewBoltForest(c.piloramaOpts...) } s.fillInfo() return s } // WithID returns option to set the default shard identifier. func WithID(id *ID) Option { return func(c *cfg) { c.info.ID = id } } // WithBlobStorOptions returns option to set internal BlobStor options. func WithBlobStorOptions(opts ...blobstor.Option) Option { return func(c *cfg) { c.blobOpts = opts } } // WithMetaBaseOptions returns option to set internal metabase options. func WithMetaBaseOptions(opts ...meta.Option) Option { return func(c *cfg) { c.metaOpts = opts } } // WithWriteCacheOptions returns option to set internal write cache options. func WithWriteCacheOptions(opts ...writecache.Option) Option { return func(c *cfg) { c.writeCacheOpts = opts } } // WithExtraWriteCacheOptions returns option to add extra write cache options. func WithExtraWriteCacheOptions(opts ...writecache.Option) Option { return func(c *cfg) { c.writeCacheOpts = append(c.writeCacheOpts, opts...) } } // WithPiloramaOptions returns option to set internal write cache options. func WithPiloramaOptions(opts ...pilorama.Option) Option { return func(c *cfg) { c.piloramaOpts = opts } } // WithLogger returns option to set Shard's logger. func WithLogger(l *logger.Logger) Option { return func(c *cfg) { c.log = l c.gcCfg.log = l } } // WithWriteCache returns option to toggle write cache usage. func WithWriteCache(use bool) Option { return func(c *cfg) { c.useWriteCache = use } } // hasWriteCache returns bool if write cache exists on shards. func (s *Shard) hasWriteCache() bool { return s.cfg.useWriteCache } // needRefillMetabase returns true if metabase is needed to be refilled. func (s *Shard) needRefillMetabase() bool { return s.cfg.refillMetabase } // WithRemoverBatchSize returns option to set batch size // of single removal operation. func WithRemoverBatchSize(sz int) Option { return func(c *cfg) { c.rmBatchSize = sz } } // WithGCWorkerPoolInitializer returns option to set initializer of // worker pool with specified worker number. func WithGCWorkerPoolInitializer(wpInit func(int) util.WorkerPool) Option { return func(c *cfg) { c.gcCfg.workerPoolInit = wpInit } } // WithGCRemoverSleepInterval returns option to specify sleep // interval between object remover executions. func WithGCRemoverSleepInterval(dur time.Duration) Option { return func(c *cfg) { c.gcCfg.removerInterval = dur } } // WithExpiredTombstonesCallback returns option to specify callback // of the expired tombstones handler. func WithExpiredTombstonesCallback(cb ExpiredTombstonesCallback) Option { return func(c *cfg) { c.expiredTombstonesCallback = cb } } // WithExpiredLocksCallback returns option to specify callback // of the expired LOCK objects handler. func WithExpiredLocksCallback(cb ExpiredObjectsCallback) Option { return func(c *cfg) { c.expiredLocksCallback = cb } } // WithRefillMetabase returns option to set flag to refill the Metabase on Shard's initialization step. func WithRefillMetabase(v bool) Option { return func(c *cfg) { c.refillMetabase = v } } // WithMode returns option to set shard's mode. Mode must be one of the predefined: // - mode.ReadWrite; // - mode.ReadOnly. func WithMode(v mode.Mode) Option { return func(c *cfg) { c.info.Mode = v } } // WithTombstoneSource returns option to set TombstoneSource. func WithTombstoneSource(v TombstoneSource) Option { return func(c *cfg) { c.tsSource = v } } // WithDeletedLockCallback returns option to specify callback // of the deleted LOCK objects handler. func WithDeletedLockCallback(v DeletedLockCallback) Option { return func(c *cfg) { c.deletedLockCallBack = v } } // WithMetricsWriter returns option to specify storage of the // shard's metrics. func WithMetricsWriter(v MetricsWriter) Option { return func(c *cfg) { c.metricsWriter = v } } // WithGCMetrics returns option to specify storage of the GC metrics. func WithGCMetrics(v GCMectrics) Option { return func(c *cfg) { c.gcCfg.metrics = v } } // WithReportErrorFunc returns option to specify callback for handling storage-related errors // in the background workers. func WithReportErrorFunc(f func(selfID string, message string, err error)) Option { return func(c *cfg) { c.reportErrorFunc = f } } // WithExpiredCollectorBatchSize returns option to set batch size // of expired object collection operation. func WithExpiredCollectorBatchSize(size int) Option { return func(c *cfg) { c.gcCfg.expiredCollectorBatchSize = size } } // WithExpiredCollectorWorkersCount returns option to set concurrent // workers count of expired object collection operation. func WithExpiredCollectorWorkersCount(count int) Option { return func(c *cfg) { c.gcCfg.expiredCollectorWorkersCount = count } } func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() s.cfg.info.Mode = s.GetMode() if s.cfg.useWriteCache { s.cfg.info.WriteCacheInfo = s.writeCache.DumpInfo() } if s.pilorama != nil { s.cfg.info.PiloramaInfo = s.pilorama.DumpInfo() } } const ( // physical is a physically stored object // counter type. physical = "phy" // logical is a logically stored object // counter type (excludes objects that are // stored but unavailable). logical = "logic" ) func (s *Shard) updateMetrics(ctx context.Context) { if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() { cc, err := s.metaBase.ObjectCounters() if err != nil { s.log.Warn(logs.ShardMetaObjectCounterRead, zap.Error(err), ) return } s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy()) s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic()) cnrList, err := s.metaBase.Containers(ctx) if err != nil { s.log.Warn(logs.ShardMetaCantReadContainerList, zap.Error(err)) return } var totalPayload uint64 for i := range cnrList { size, err := s.metaBase.ContainerSize(cnrList[i]) if err != nil { s.log.Warn(logs.ShardMetaCantReadContainerSize, zap.String("cid", cnrList[i].EncodeToString()), zap.Error(err)) continue } s.metricsWriter.AddToContainerSize(cnrList[i].EncodeToString(), int64(size)) totalPayload += size } s.metricsWriter.AddToPayloadSize(int64(totalPayload)) } } // incObjectCounter increment both physical and logical object // counters. func (s *Shard) incObjectCounter() { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.IncObjectCounter(physical) s.cfg.metricsWriter.IncObjectCounter(logical) } } func (s *Shard) decObjectCounterBy(typ string, v uint64) { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.AddToObjectCounter(typ, -int(v)) } } func (s *Shard) addToContainerSize(cnr string, size int64) { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.AddToContainerSize(cnr, size) } } func (s *Shard) addToPayloadSize(size int64) { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.AddToPayloadSize(size) } } func (s *Shard) IncErrorCounter() { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.IncErrorCounter() } } func (s *Shard) ClearErrorCounter() { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.ClearErrorCounter() } } func (s *Shard) DeleteShardMetrics() { if s.cfg.metricsWriter != nil { s.cfg.metricsWriter.DeleteShardMetrics() } }