WIP: Morph: Add unit tests #2

Closed
dstepanov-yadro wants to merge 233 commits from TrueCloudLab/frostfs-node:master into object-3608-morph-unit-tests
7 changed files with 244 additions and 26 deletions
Showing only changes of commit 3220c4df9f - Show all commits

View file

@ -28,6 +28,7 @@ type MetricRegister interface {
AddToPayloadCounter(shardID string, size int64) AddToPayloadCounter(shardID string, size int64)
WriteCache() metrics.WriteCacheMetrics WriteCache() metrics.WriteCacheMetrics
GC() metrics.GCMetrics
} }
func elapsed(addFunc func(d time.Duration)) func() { func elapsed(addFunc func(d time.Duration)) func() {
@ -37,3 +38,24 @@ func elapsed(addFunc func(d time.Duration)) func() {
addFunc(time.Since(t)) addFunc(time.Since(t))
} }
} }
type gcMetrics struct {
storage metrics.GCMetrics
shardID string
}
func (m *gcMetrics) AddRunDuration(d time.Duration, success bool) {
m.storage.AddRunDuration(m.shardID, d, success)
}
func (m *gcMetrics) AddDeletedCount(deleted, failed uint64) {
m.storage.AddDeletedCount(m.shardID, deleted, failed)
}
func (m *gcMetrics) AddExpiredObjectCollectionDuration(d time.Duration, success bool, objectType string) {
m.storage.AddExpiredObjectCollectionDuration(m.shardID, d, success, objectType)
}
func (m *gcMetrics) AddInhumedObjectCount(count uint64, objectType string) {
m.storage.AddInhumedObjectCount(m.shardID, count, objectType)
}

View file

@ -98,13 +98,20 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
id: id.String(), id: id.String(),
mw: e.metrics, mw: e.metrics,
}, },
)) ),
opts = append(opts, shard.WithExtraWriteCacheOptions(writecache.WithMetrics( shard.WithExtraWriteCacheOptions(writecache.WithMetrics(
&writeCacheMetrics{ &writeCacheMetrics{
shardID: id.String(), shardID: id.String(),
metrics: e.metrics.WriteCache(), metrics: e.metrics.WriteCache(),
}),
),
shard.WithGCMetrics(
&gcMetrics{
storage: e.metrics.GC(),
shardID: id.String(),
}, },
))) ),
)
} }
e.mtx.RUnlock() e.mtx.RUnlock()

View file

@ -21,7 +21,9 @@ type DeletePrm struct {
} }
// DeleteRes groups the resulting values of Delete operation. // DeleteRes groups the resulting values of Delete operation.
type DeleteRes struct{} type DeleteRes struct {
deleted uint64
}
// SetAddresses is a Delete option to set the addresses of the objects to delete. // SetAddresses is a Delete option to set the addresses of the objects to delete.
// //
@ -53,10 +55,11 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
return DeleteRes{}, ErrDegradedMode return DeleteRes{}, ErrDegradedMode
} }
result := DeleteRes{}
for _, addr := range prm.addr { for _, addr := range prm.addr {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return DeleteRes{}, ctx.Err() return result, ctx.Err()
default: default:
} }
@ -65,11 +68,12 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
s.deleteFromBlobstorSafe(ctx, addr) s.deleteFromBlobstorSafe(ctx, addr)
if err := s.deleteFromMetabase(ctx, addr); err != nil { if err := s.deleteFromMetabase(ctx, addr); err != nil {
return DeleteRes{}, err // stop on metabase error ? return result, err // stop on metabase error ?
} }
result.deleted++
} }
return DeleteRes{}, nil return result, nil
} }
func (s *Shard) deleteObjectFromWriteCacheSafe(ctx context.Context, addr oid.Address) { func (s *Shard) deleteObjectFromWriteCacheSafe(ctx context.Context, addr oid.Address) {

View file

@ -67,6 +67,32 @@ type eventHandlers struct {
handlers []eventHandler handlers []eventHandler
} }
type gcRunResult struct {
success bool
deleted uint64
failedToDelete uint64
}
const (
objectTypeLock = "lock"
objectTypeTombstone = "tombstone"
objectTypeRegular = "regular"
)
type GCMectrics interface {
AddRunDuration(d time.Duration, success bool)
AddDeletedCount(deleted, failed uint64)
AddExpiredObjectCollectionDuration(d time.Duration, success bool, objectType string)
AddInhumedObjectCount(count uint64, objectType string)
}
type noopGCMetrics struct{}
func (m *noopGCMetrics) AddRunDuration(time.Duration, bool) {}
func (m *noopGCMetrics) AddDeletedCount(uint64, uint64) {}
func (m *noopGCMetrics) AddExpiredObjectCollectionDuration(time.Duration, bool, string) {}
func (m *noopGCMetrics) AddInhumedObjectCount(uint64, string) {}
type gc struct { type gc struct {
*gcCfg *gcCfg
@ -76,7 +102,7 @@ type gc struct {
workerPool util.WorkerPool workerPool util.WorkerPool
remover func(context.Context) remover func(context.Context) gcRunResult
eventChan chan Event eventChan chan Event
mEventHandler map[eventType]*eventHandlers mEventHandler map[eventType]*eventHandlers
@ -91,6 +117,8 @@ type gcCfg struct {
expiredCollectorWorkersCount int expiredCollectorWorkersCount int
expiredCollectorBatchSize int expiredCollectorBatchSize int
metrics GCMectrics
} }
func defaultGCCfg() gcCfg { func defaultGCCfg() gcCfg {
@ -100,6 +128,7 @@ func defaultGCCfg() gcCfg {
workerPoolInit: func(int) util.WorkerPool { workerPoolInit: func(int) util.WorkerPool {
return nil return nil
}, },
metrics: &noopGCMetrics{},
} }
} }
@ -178,8 +207,13 @@ func (gc *gc) tickRemover(ctx context.Context) {
gc.log.Debug(logs.ShardGCIsStopped) gc.log.Debug(logs.ShardGCIsStopped)
return return
case <-timer.C: case <-timer.C:
gc.remover(ctx) startedAt := time.Now()
result := gc.remover(ctx)
timer.Reset(gc.removerInterval) timer.Reset(gc.removerInterval)
gc.metrics.AddRunDuration(time.Since(startedAt), result.success)
gc.metrics.AddDeletedCount(result.deleted, result.failedToDelete)
} }
} }
} }
@ -196,7 +230,7 @@ func (gc *gc) stop() {
// iterates over metabase and deletes objects // iterates over metabase and deletes objects
// with GC-marked graves. // with GC-marked graves.
// Does nothing if shard is in "read-only" mode. // Does nothing if shard is in "read-only" mode.
func (s *Shard) removeGarbage(pctx context.Context) { func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
ctx, cancel := context.WithCancel(pctx) ctx, cancel := context.WithCancel(pctx)
defer cancel() defer cancel()
@ -244,6 +278,7 @@ func (s *Shard) removeGarbage(pctx context.Context) {
return return
} else if len(buf) == 0 { } else if len(buf) == 0 {
result.success = true
return return
} }
@ -251,15 +286,21 @@ func (s *Shard) removeGarbage(pctx context.Context) {
deletePrm.SetAddresses(buf...) deletePrm.SetAddresses(buf...)
// delete accumulated objects // delete accumulated objects
_, err = s.delete(ctx, deletePrm) res, err := s.delete(ctx, deletePrm)
result.deleted = res.deleted
result.failedToDelete = uint64(len(buf)) - res.deleted
result.success = true
if err != nil { if err != nil {
s.log.Warn(logs.ShardCouldNotDeleteTheObjects, s.log.Warn(logs.ShardCouldNotDeleteTheObjects,
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
result.success = false
}
return return
} }
}
func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) { func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
workersCount = minExpiredWorkers workersCount = minExpiredWorkers
@ -276,6 +317,13 @@ func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) {
} }
func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
var err error
startedAt := time.Now()
defer func() {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular)
}()
s.log.Debug(logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) s.log.Debug(logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
defer s.log.Debug(logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) defer s.log.Debug(logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
@ -286,7 +334,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
errGroup.Go(func() error { errGroup.Go(func() error {
batch := make([]oid.Address, 0, batchSize) batch := make([]oid.Address, 0, batchSize)
err := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
if o.Type() != object.TypeTombstone && o.Type() != object.TypeLock { if o.Type() != object.TypeTombstone && o.Type() != object.TypeLock {
batch = append(batch, o.Address()) batch = append(batch, o.Address())
@ -300,8 +348,8 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
} }
} }
}) })
if err != nil { if expErr != nil {
return err return expErr
} }
if len(batch) > 0 { if len(batch) > 0 {
@ -315,7 +363,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
return nil return nil
}) })
if err := errGroup.Wait(); err != nil { if err = errGroup.Wait(); err != nil {
s.log.Warn(logs.ShardIteratorOverExpiredObjectsFailed, zap.String("error", err.Error())) s.log.Warn(logs.ShardIteratorOverExpiredObjectsFailed, zap.String("error", err.Error()))
} }
} }
@ -355,6 +403,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address)
return return
} }
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular)
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
i := 0 i := 0
@ -380,6 +429,13 @@ func (s *Shard) getExpiredWithLinked(source []oid.Address) ([]oid.Address, error
} }
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
var err error
startedAt := time.Now()
defer func() {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone)
}()
epoch := e.(newEpoch).epoch epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch)) log := s.log.With(zap.Uint64("epoch", epoch))
@ -413,7 +469,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
return return
} }
err := s.metaBase.IterateOverGraveyard(iterPrm) err = s.metaBase.IterateOverGraveyard(iterPrm)
if err != nil { if err != nil {
log.Error(logs.ShardIteratorOverGraveyardFailed, zap.Error(err)) log.Error(logs.ShardIteratorOverGraveyardFailed, zap.Error(err))
s.m.RUnlock() s.m.RUnlock()
@ -444,6 +500,13 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
} }
func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
var err error
startedAt := time.Now()
defer func() {
s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock)
}()
s.log.Debug(logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) s.log.Debug(logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
defer s.log.Debug(logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) defer s.log.Debug(logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
@ -455,7 +518,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
errGroup.Go(func() error { errGroup.Go(func() error {
batch := make([]oid.Address, 0, batchSize) batch := make([]oid.Address, 0, batchSize)
err := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
if o.Type() == object.TypeLock { if o.Type() == object.TypeLock {
batch = append(batch, o.Address()) batch = append(batch, o.Address())
@ -469,8 +532,8 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
} }
} }
}) })
if err != nil { if expErr != nil {
return err return expErr
} }
if len(batch) > 0 { if len(batch) > 0 {
@ -484,7 +547,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
return nil return nil
}) })
if err := errGroup.Wait(); err != nil { if err = errGroup.Wait(); err != nil {
s.log.Warn(logs.ShardIteratorOverExpiredLocksFailed, zap.String("error", err.Error())) s.log.Warn(logs.ShardIteratorOverExpiredLocksFailed, zap.String("error", err.Error()))
} }
} }
@ -553,6 +616,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
return return
} }
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone)
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
i := 0 i := 0
@ -598,6 +662,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []
return return
} }
s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock)
s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decObjectCounterBy(logical, res.AvailableInhumed())
i := 0 i := 0

View file

@ -309,6 +309,13 @@ func WithMetricsWriter(v MetricsWriter) Option {
} }
} }
// WithGCMetrics returns option to specify storage of the GC metrics.
func WithGCMetrics(v GCMectrics) Option {
return func(c *cfg) {
c.gcCfg.metrics = v
}
}
// WithReportErrorFunc returns option to specify callback for handling storage-related errors // WithReportErrorFunc returns option to specify callback for handling storage-related errors
// in the background workers. // in the background workers.
func WithReportErrorFunc(f func(selfID string, message string, err error)) Option { func WithReportErrorFunc(f func(selfID string, message string, err error)) Option {

104
pkg/metrics/gc.go Normal file
View file

@ -0,0 +1,104 @@
package metrics
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
)
const (
gcSubsystem = "garbage_collector"
gcShardID = "shard_id"
gcSuccess = "success"
gcStatus = "status"
gcDeleted = "deleted"
gcFailed = "failed_to_delete"
gcObjectType = "object_type"
)
type GCMetrics interface {
AddRunDuration(shardID string, d time.Duration, success bool)
AddDeletedCount(shardID string, deleted, failed uint64)
AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool, objectType string)
AddInhumedObjectCount(shardID string, count uint64, objectType string)
}
type gcMetrics struct {
runDuration metric[*prometheus.CounterVec]
deletedCounter metric[*prometheus.CounterVec]
expCollectDuration metric[*prometheus.CounterVec]
inhumedCounter metric[*prometheus.CounterVec]
}
func (m *gcMetrics) register() {
mustRegister(m.runDuration)
mustRegister(m.deletedCounter)
mustRegister(m.expCollectDuration)
mustRegister(m.inhumedCounter)
}
func newGCMetrics() *gcMetrics {
return &gcMetrics{
runDuration: newCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: gcSubsystem,
Name: "delete_duration_seconds",
Help: "The total time of GC runs to delete objects from disk",
}, []string{gcShardID, gcSuccess}),
deletedCounter: newCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: gcSubsystem,
Name: "deleted_objects_count",
Help: "Total count of objects GC deleted or failed to delete from disk",
}, []string{gcShardID, gcStatus}),
expCollectDuration: newCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: gcSubsystem,
Name: "marking_duration_seconds",
Help: "The total time of GC runs to mark expired objects as removed",
}, []string{gcShardID, gcSuccess, gcObjectType}),
inhumedCounter: newCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: gcSubsystem,
Name: "marked_for_removal_objects_count",
Help: "Total count of expired objects GC marked to remove",
}, []string{gcShardID, gcObjectType}),
}
}
func (m *gcMetrics) AddRunDuration(shardID string, d time.Duration, success bool) {
m.runDuration.value.With(prometheus.Labels{
gcShardID: shardID,
gcSuccess: fmt.Sprintf("%v", success),
}).Add(d.Seconds())
}
func (m *gcMetrics) AddDeletedCount(shardID string, deleted, failed uint64) {
m.deletedCounter.value.With(
prometheus.Labels{
gcShardID: shardID,
gcStatus: gcDeleted,
}).Add(float64(deleted))
m.deletedCounter.value.With(
prometheus.Labels{
gcShardID: shardID,
gcStatus: gcFailed,
}).Add(float64(failed))
}
func (m *gcMetrics) AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool, objectType string) {
m.expCollectDuration.value.With(prometheus.Labels{
gcShardID: shardID,
gcSuccess: fmt.Sprintf("%v", success),
gcObjectType: objectType,
}).Add(d.Seconds())
}
func (m *gcMetrics) AddInhumedObjectCount(shardID string, count uint64, objectType string) {
m.inhumedCounter.value.With(
prometheus.Labels{
gcShardID: shardID,
gcObjectType: objectType,
}).Add(float64(count))
}

View file

@ -16,6 +16,7 @@ type NodeMetrics struct {
writeCacheMetrics *writeCacheMetrics writeCacheMetrics *writeCacheMetrics
treeService *treeServiceMetrics treeService *treeServiceMetrics
epoch metric[prometheus.Gauge] epoch metric[prometheus.Gauge]
gc *gcMetrics
} }
func NewNodeMetrics() *NodeMetrics { func NewNodeMetrics() *NodeMetrics {
@ -45,6 +46,9 @@ func NewNodeMetrics() *NodeMetrics {
writeCacheMetrics := newWriteCacheMetrics() writeCacheMetrics := newWriteCacheMetrics()
writeCacheMetrics.register() writeCacheMetrics.register()
gc := newGCMetrics()
gc.register()
return &NodeMetrics{ return &NodeMetrics{
objectServiceMetrics: objectService, objectServiceMetrics: objectService,
engineMetrics: engine, engineMetrics: engine,
@ -53,6 +57,7 @@ func NewNodeMetrics() *NodeMetrics {
treeService: treeService, treeService: treeService,
epoch: epoch, epoch: epoch,
writeCacheMetrics: writeCacheMetrics, writeCacheMetrics: writeCacheMetrics,
gc: gc,
} }
} }
@ -72,3 +77,7 @@ func (m *NodeMetrics) WriteCache() WriteCacheMetrics {
func (m *NodeMetrics) TreeService() tree.MetricsRegister { func (m *NodeMetrics) TreeService() tree.MetricsRegister {
return m.treeService return m.treeService
} }
func (m *NodeMetrics) GC() GCMetrics {
return m.gc
}