forked from TrueCloudLab/frostfs-node
[#864] engine: Drop container count metric if container removed
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
d75e7e9a21
commit
4b8b4da681
11 changed files with 310 additions and 38 deletions
|
@ -560,8 +560,11 @@ const (
|
|||
ShardGCCollectingExpiredMetricsStarted = "collecting expired metrics started"
|
||||
ShardGCCollectingExpiredMetricsCompleted = "collecting expired metrics completed"
|
||||
ShardGCFailedToCollectZeroSizeContainers = "failed to collect zero-size containers"
|
||||
ShardGCFailedToCollectZeroCountContainers = "failed to collect zero-count containers"
|
||||
EngineFailedToCheckContainerAvailability = "failed to check container availability"
|
||||
EngineFailedToGetContainerSize = "failed to get container size"
|
||||
EngineFailedToDeleteContainerSize = "failed to delete container size"
|
||||
EngineInterruptProcessingZeroSizeContainers = "interrupt processing zero-size containers"
|
||||
EngineInterruptProcessingZeroCountContainers = "interrupt processing zero-count containers"
|
||||
EngineFailedToGetContainerCounters = "failed to get container counters"
|
||||
)
|
||||
|
|
|
@ -266,7 +266,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid
|
|||
return
|
||||
}
|
||||
|
||||
idMap, err := e.selectNonExistedIDs(ctx, ids)
|
||||
idMap, err := e.selectNonExistentIDs(ctx, ids)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -339,7 +339,85 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid
|
|||
}
|
||||
}
|
||||
|
||||
func (e *StorageEngine) selectNonExistedIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) {
|
||||
func (e *StorageEngine) processZeroCountContainers(ctx context.Context, ids []cid.ID) {
|
||||
if len(ids) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
idMap, err := e.selectNonExistentIDs(ctx, ids)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if len(idMap) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var failed bool
|
||||
var prm shard.ContainerCountPrm
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err()))
|
||||
failed = true
|
||||
return true
|
||||
default:
|
||||
}
|
||||
|
||||
var drop []cid.ID
|
||||
for id := range idMap {
|
||||
prm.ContainerID = id
|
||||
s, err := sh.ContainerCount(ctx, prm)
|
||||
if err != nil {
|
||||
e.log.Warn(logs.EngineFailedToGetContainerCounters, zap.Stringer("container_id", id), zap.Error(err))
|
||||
failed = true
|
||||
return true
|
||||
}
|
||||
if s.User > 0 || s.Logic > 0 || s.Phy > 0 {
|
||||
drop = append(drop, id)
|
||||
}
|
||||
}
|
||||
for _, id := range drop {
|
||||
delete(idMap, id)
|
||||
}
|
||||
|
||||
return len(idMap) == 0
|
||||
})
|
||||
|
||||
if failed || len(idMap) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
e.iterateOverUnsortedShards(func(sh hashedShard) bool {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err()))
|
||||
failed = true
|
||||
return true
|
||||
default:
|
||||
}
|
||||
|
||||
for id := range idMap {
|
||||
if err := sh.DeleteContainerSize(ctx, id); err != nil {
|
||||
e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err))
|
||||
failed = true
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
})
|
||||
|
||||
if failed {
|
||||
return
|
||||
}
|
||||
|
||||
for id := range idMap {
|
||||
e.metrics.DeleteContainerCount(id.EncodeToString())
|
||||
}
|
||||
}
|
||||
|
||||
func (e *StorageEngine) selectNonExistentIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) {
|
||||
cs := e.containerSource.Load()
|
||||
|
||||
idMap := make(map[cid.ID]struct{})
|
||||
|
|
|
@ -17,6 +17,7 @@ type MetricRegister interface {
|
|||
|
||||
AddToContainerSize(cnrID string, size int64)
|
||||
DeleteContainerSize(cnrID string)
|
||||
DeleteContainerCount(cnrID string)
|
||||
AddToPayloadCounter(shardID string, size int64)
|
||||
IncErrorCounter(shardID string)
|
||||
ClearErrorCounter(shardID string)
|
||||
|
|
|
@ -120,6 +120,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
|
|||
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
||||
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
|
||||
shard.WithZeroSizeCallback(e.processZeroSizeContainers),
|
||||
shard.WithZeroCountCallback(e.processZeroCountContainers),
|
||||
)...)
|
||||
|
||||
if err := sh.UpdateID(ctx); err != nil {
|
||||
|
|
|
@ -120,7 +120,9 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
|
|||
default:
|
||||
}
|
||||
|
||||
completed, err := db.containerCountersNextBatch(lastKey, &cc)
|
||||
completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) {
|
||||
cc.Counts[id] = entity
|
||||
})
|
||||
if err != nil {
|
||||
return cc, err
|
||||
}
|
||||
|
@ -133,7 +135,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error)
|
|||
return cc, nil
|
||||
}
|
||||
|
||||
func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) {
|
||||
func (db *DB) containerCountersNextBatch(lastKey []byte, f func(id cid.ID, entity ObjectCounters)) (bool, error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
|
@ -165,7 +167,7 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cc.Counts[cnrID] = ent
|
||||
f(cnrID, ent)
|
||||
|
||||
counter++
|
||||
if counter == batchSize {
|
||||
|
@ -187,6 +189,43 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters)
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("ContainerCount", time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCount")
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ObjectCounters{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
var result ObjectCounters
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(containerCounterBucketName)
|
||||
key := make([]byte, cidSize)
|
||||
id.Encode(key)
|
||||
v := b.Get(key)
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
var err error
|
||||
result, err = parseContainerCounterValue(v)
|
||||
return err
|
||||
})
|
||||
|
||||
return result, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error {
|
||||
if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil {
|
||||
return fmt.Errorf("could not increase phy object counter: %w", err)
|
||||
|
@ -270,9 +309,6 @@ func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCo
|
|||
entity.Phy = nextValue(entity.Phy, delta.Phy, inc)
|
||||
entity.Logic = nextValue(entity.Logic, delta.Logic, inc)
|
||||
entity.User = nextValue(entity.User, delta.User, inc)
|
||||
if entity.IsZero() {
|
||||
return b.Delete(key)
|
||||
}
|
||||
value := containerCounterValue(entity)
|
||||
return b.Put(key, value)
|
||||
}
|
||||
|
@ -610,3 +646,86 @@ func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error {
|
|||
success = err == nil
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// ZeroCountContainers returns containers with objects count = 0 in metabase.
|
||||
func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("ZeroCountContainers", time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroCountContainers")
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return nil, ErrDegradedMode
|
||||
}
|
||||
|
||||
var result []cid.ID
|
||||
|
||||
lastKey := make([]byte, cidSize)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) {
|
||||
if entity.IsZero() {
|
||||
result = append(result, id)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, metaerr.Wrap(err)
|
||||
}
|
||||
if completed {
|
||||
break
|
||||
}
|
||||
}
|
||||
success = true
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("DeleteContainerCount", time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerCount",
|
||||
trace.WithAttributes(
|
||||
attribute.Stringer("container_id", id),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
if db.mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(containerCounterBucketName)
|
||||
|
||||
key := make([]byte, cidSize)
|
||||
id.Encode(key)
|
||||
return b.Delete(key)
|
||||
})
|
||||
success = err == nil
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
|
|
@ -105,11 +105,7 @@ func TestCounters(t *testing.T) {
|
|||
v.Phy--
|
||||
v.Logic--
|
||||
v.User--
|
||||
if v.IsZero() {
|
||||
delete(exp, cnrID)
|
||||
} else {
|
||||
exp[cnrID] = v
|
||||
}
|
||||
exp[cnrID] = v
|
||||
}
|
||||
|
||||
cc, err := db.ContainerCounters(context.Background())
|
||||
|
@ -428,11 +424,7 @@ func TestCounters_Expired(t *testing.T) {
|
|||
|
||||
if v, ok := exp[oo[0].Container()]; ok {
|
||||
v.Phy--
|
||||
if v.IsZero() {
|
||||
delete(exp, oo[0].Container())
|
||||
} else {
|
||||
exp[oo[0].Container()] = v
|
||||
}
|
||||
exp[oo[0].Container()] = v
|
||||
}
|
||||
|
||||
oo = oo[1:]
|
||||
|
@ -463,11 +455,7 @@ func TestCounters_Expired(t *testing.T) {
|
|||
v.Phy--
|
||||
v.Logic--
|
||||
v.User--
|
||||
if v.IsZero() {
|
||||
delete(exp, oo[0].Container())
|
||||
} else {
|
||||
exp[oo[0].Container()] = v
|
||||
}
|
||||
exp[oo[0].Container()] = v
|
||||
}
|
||||
|
||||
oo = oo[1:]
|
||||
|
|
|
@ -44,6 +44,43 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
type ContainerCountPrm struct {
|
||||
ContainerID cid.ID
|
||||
}
|
||||
|
||||
type ContainerCountRes struct {
|
||||
Phy uint64
|
||||
Logic uint64
|
||||
User uint64
|
||||
}
|
||||
|
||||
func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (ContainerCountRes, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ContainerCount",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.Stringer("container_id", prm.ContainerID),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode.NoMetabase() {
|
||||
return ContainerCountRes{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID)
|
||||
if err != nil {
|
||||
return ContainerCountRes{}, fmt.Errorf("could not get container counters: %w", err)
|
||||
}
|
||||
|
||||
return ContainerCountRes{
|
||||
Phy: counters.Phy,
|
||||
Logic: counters.Logic,
|
||||
User: counters.User,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerSize",
|
||||
trace.WithAttributes(
|
||||
|
@ -65,3 +102,25 @@ func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error {
|
|||
|
||||
return s.metaBase.DeleteContainerSize(ctx, id)
|
||||
}
|
||||
|
||||
func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerCount",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.Stringer("container_id", id),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
|
||||
if s.info.Mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return s.metaBase.DeleteContainerCount(ctx, id)
|
||||
}
|
||||
|
|
|
@ -738,6 +738,7 @@ func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) {
|
|||
defer s.log.Debug(logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch))
|
||||
|
||||
s.collectExpiredContainerSizeMetrics(ctx, epoch)
|
||||
s.collectExpiredContainerCountMetrics(ctx, epoch)
|
||||
}
|
||||
|
||||
func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) {
|
||||
|
@ -751,3 +752,15 @@ func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch ui
|
|||
}
|
||||
s.zeroSizeContainersCallback(ctx, ids)
|
||||
}
|
||||
|
||||
func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) {
|
||||
ids, err := s.metaBase.ZeroCountContainers(ctx)
|
||||
if err != nil {
|
||||
s.log.Warn(logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err))
|
||||
return
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
return
|
||||
}
|
||||
s.zeroCountContainersCallback(ctx, ids)
|
||||
}
|
||||
|
|
|
@ -336,11 +336,7 @@ func TestCounters(t *testing.T) {
|
|||
v.Logic--
|
||||
v.Phy--
|
||||
v.User--
|
||||
if v.IsZero() {
|
||||
delete(expected, cnr)
|
||||
} else {
|
||||
expected[cnr] = v
|
||||
}
|
||||
expected[cnr] = v
|
||||
}
|
||||
}
|
||||
require.Equal(t, expectedLogicalSizes, mm.containerSizes())
|
||||
|
|
|
@ -53,8 +53,8 @@ type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address)
|
|||
// DeletedLockCallback is a callback handling list of deleted LOCK objects.
|
||||
type DeletedLockCallback func(context.Context, []oid.Address)
|
||||
|
||||
// ZeroSizeContainersCallback is a callback hanfling list of zero-size containers.
|
||||
type ZeroSizeContainersCallback func(context.Context, []cid.ID)
|
||||
// EmptyContainersCallback is a callback hanfling list of zero-size and zero-count containers.
|
||||
type EmptyContainersCallback func(context.Context, []cid.ID)
|
||||
|
||||
// MetricsWriter is an interface that must store shard's metrics.
|
||||
type MetricsWriter interface {
|
||||
|
@ -121,7 +121,8 @@ type cfg struct {
|
|||
|
||||
deletedLockCallBack DeletedLockCallback
|
||||
|
||||
zeroSizeContainersCallback ZeroSizeContainersCallback
|
||||
zeroSizeContainersCallback EmptyContainersCallback
|
||||
zeroCountContainersCallback EmptyContainersCallback
|
||||
|
||||
tsSource TombstoneSource
|
||||
|
||||
|
@ -134,12 +135,13 @@ type cfg struct {
|
|||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
rmBatchSize: 100,
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
gcCfg: defaultGCCfg(),
|
||||
reportErrorFunc: func(string, string, error) {},
|
||||
rebuildLimiter: &noopRebuildLimiter{},
|
||||
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
|
||||
rmBatchSize: 100,
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
gcCfg: defaultGCCfg(),
|
||||
reportErrorFunc: func(string, string, error) {},
|
||||
rebuildLimiter: &noopRebuildLimiter{},
|
||||
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
|
||||
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -370,12 +372,19 @@ func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
|
|||
}
|
||||
|
||||
// WithZeroSizeCallback returns option to set zero-size containers callback.
|
||||
func WithZeroSizeCallback(cb ZeroSizeContainersCallback) Option {
|
||||
func WithZeroSizeCallback(cb EmptyContainersCallback) Option {
|
||||
return func(c *cfg) {
|
||||
c.zeroSizeContainersCallback = cb
|
||||
}
|
||||
}
|
||||
|
||||
// WithZeroCountCallback returns option to set zero-count containers callback.
|
||||
func WithZeroCountCallback(cb EmptyContainersCallback) Option {
|
||||
return func(c *cfg) {
|
||||
c.zeroCountContainersCallback = cb
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Shard) fillInfo() {
|
||||
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
||||
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
||||
|
|
|
@ -12,6 +12,7 @@ type EngineMetrics interface {
|
|||
AddMethodDuration(method string, d time.Duration)
|
||||
AddToContainerSize(cnrID string, size int64)
|
||||
DeleteContainerSize(cnrID string)
|
||||
DeleteContainerCount(cnrID string)
|
||||
IncErrorCounter(shardID string)
|
||||
ClearErrorCounter(shardID string)
|
||||
DeleteShardMetrics(shardID string)
|
||||
|
@ -84,6 +85,10 @@ func (m *engineMetrics) DeleteContainerSize(cnrID string) {
|
|||
m.containerSize.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID})
|
||||
}
|
||||
|
||||
func (m *engineMetrics) DeleteContainerCount(cnrID string) {
|
||||
m.contObjCounter.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID})
|
||||
}
|
||||
|
||||
func (m *engineMetrics) AddToPayloadCounter(shardID string, size int64) {
|
||||
m.payloadSize.With(prometheus.Labels{shardIDLabel: shardID}).Add(float64(size))
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue