From 431e331373ad991ba307d43ca9ee7b0564cb5bea Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 9 Sep 2022 14:36:34 +0300 Subject: [PATCH] [#1658] shard: Update metric counters Use meta's operation results to change the metrics. Support typed object counters. Signed-off-by: Pavel Karpy --- .../shard/control_test.go | 12 ++ pkg/local_object_storage/shard/delete.go | 3 +- pkg/local_object_storage/shard/gc.go | 12 +- pkg/local_object_storage/shard/inhume.go | 2 + .../shard/metrics_test.go | 181 ++++++++++++++++++ pkg/local_object_storage/shard/shard.go | 46 +++-- 6 files changed, 238 insertions(+), 18 deletions(-) create mode 100644 pkg/local_object_storage/shard/metrics_test.go diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index 5df24c068..01aa3f67f 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -312,6 +312,12 @@ func TestRefillMetabase(t *testing.T) { checkTombMembers(true) checkLocked(t, cnrLocked, locked) + c, err := sh.metaBase.ObjectCounters() + require.NoError(t, err) + + phyBefore := c.Phy() + logicalBefore := c.Logic() + err = sh.Close() require.NoError(t, err) @@ -340,6 +346,12 @@ func TestRefillMetabase(t *testing.T) { err = sh.refillMetabase() require.NoError(t, err) + c, err = sh.metaBase.ObjectCounters() + require.NoError(t, err) + + require.Equal(t, phyBefore, c.Phy()) + require.Equal(t, logicalBefore, c.Logic()) + checkAllObjs(true) checkObj(object.AddressOf(tombObj), tombObj) checkTombMembers(true) diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index b1c3c81ac..7e1025753 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -69,7 +69,8 @@ func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) { return DeleteRes{}, err // stop on metabase error ? } - s.decObjectCounterBy(res.RemovedObjects()) + s.decObjectCounterBy(physical, res.RawObjectsRemoved()) + s.decObjectCounterBy(logical, res.AvailableObjectsRemoved()) for i := range prm.addr { // delete small object var delPrm common.DeletePrm diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 12653aea6..6765bfa05 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -241,7 +241,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { inhumePrm.SetGCMark() // inhume the collected objects - _, err = s.metaBase.Inhume(inhumePrm) + res, err := s.metaBase.Inhume(inhumePrm) if err != nil { s.log.Warn("could not inhume the objects", zap.String("error", err.Error()), @@ -249,6 +249,8 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { return } + + s.decObjectCounterBy(logical, res.AvailableInhumed()) } func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { @@ -354,7 +356,7 @@ func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) { pInhume.SetAddresses(tsAddrs...) // inhume tombstones - _, err := s.metaBase.Inhume(pInhume) + res, err := s.metaBase.Inhume(pInhume) if err != nil { s.log.Warn("could not mark tombstones as garbage", zap.String("error", err.Error()), @@ -363,6 +365,8 @@ func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) { return } + s.decObjectCounterBy(logical, res.AvailableInhumed()) + // drop just processed expired tombstones // from graveyard err = s.metaBase.DropGraves(tss) @@ -387,7 +391,7 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) { pInhume.SetAddresses(lockers...) pInhume.SetGCMark() - _, err = s.metaBase.Inhume(pInhume) + res, err := s.metaBase.Inhume(pInhume) if err != nil { s.log.Warn("failure to mark lockers as garbage", zap.String("error", err.Error()), @@ -395,6 +399,8 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) { return } + + s.decObjectCounterBy(logical, res.AvailableInhumed()) } // HandleDeletedLocks unlocks all objects which were locked by lockers. diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go index 4df938223..81ae4a7ae 100644 --- a/pkg/local_object_storage/shard/inhume.go +++ b/pkg/local_object_storage/shard/inhume.go @@ -101,6 +101,8 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) { return InhumeRes{}, fmt.Errorf("metabase inhume: %w", err) } + s.decObjectCounterBy(logical, res.AvailableInhumed()) + if deletedLockObjs := res.DeletedLockObjects(); len(deletedLockObjs) != 0 { s.deletedLockCallBack(context.Background(), deletedLockObjs) } diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go new file mode 100644 index 000000000..c6eec3456 --- /dev/null +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -0,0 +1,181 @@ +package shard_test + +import ( + "path/filepath" + "testing" + + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/stretchr/testify/require" +) + +type metricsStore struct { + s map[string]uint64 +} + +func (m metricsStore) SetObjectCounter(objectType string, v uint64) { + m.s[objectType] = v +} + +func (m metricsStore) AddToObjectCounter(objectType string, delta int) { + switch { + case delta > 0: + m.s[objectType] += uint64(delta) + case delta < 0: + uDelta := uint64(-delta) + + if m.s[objectType] >= uDelta { + m.s[objectType] -= uDelta + } else { + m.s[objectType] = 0 + } + case delta == 0: + return + } +} + +func (m metricsStore) IncObjectCounter(objectType string) { + m.s[objectType] += 1 +} + +func (m metricsStore) DecObjectCounter(objectType string) { + m.AddToObjectCounter(objectType, -1) +} + +const physical = "phy" +const logical = "logic" + +func TestCounters(t *testing.T) { + dir := t.TempDir() + sh, mm := shardWithMetrics(t, dir) + + const objNumber = 10 + oo := make([]*object.Object, objNumber) + for i := 0; i < objNumber; i++ { + oo[i] = generateObject(t) + } + + t.Run("defaults", func(t *testing.T) { + require.Zero(t, mm.s[physical]) + require.Zero(t, mm.s[logical]) + }) + + t.Run("put", func(t *testing.T) { + var prm shard.PutPrm + + for i := 0; i < objNumber; i++ { + prm.SetObject(oo[i]) + + _, err := sh.Put(prm) + require.NoError(t, err) + } + + require.Equal(t, uint64(objNumber), mm.s[physical]) + require.Equal(t, uint64(objNumber), mm.s[logical]) + }) + + t.Run("inhume_GC", func(t *testing.T) { + var prm shard.InhumePrm + inhumedNumber := objNumber / 4 + + for i := 0; i < inhumedNumber; i++ { + prm.MarkAsGarbage(objectcore.AddressOf(oo[i])) + + _, err := sh.Inhume(prm) + require.NoError(t, err) + } + + require.Equal(t, uint64(objNumber), mm.s[physical]) + require.Equal(t, uint64(objNumber-inhumedNumber), mm.s[logical]) + + oo = oo[inhumedNumber:] + }) + + t.Run("inhume_TS", func(t *testing.T) { + var prm shard.InhumePrm + ts := objectcore.AddressOf(generateObject(t)) + + phy := mm.s[physical] + logic := mm.s[logical] + + inhumedNumber := int(phy / 4) + prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...) + + _, err := sh.Inhume(prm) + require.NoError(t, err) + + require.Equal(t, phy, mm.s[physical]) + require.Equal(t, logic-uint64(inhumedNumber), mm.s[logical]) + + oo = oo[inhumedNumber:] + }) + + t.Run("Delete", func(t *testing.T) { + var prm shard.DeletePrm + + phy := mm.s[physical] + logic := mm.s[logical] + + deletedNumber := int(phy / 4) + prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...) + + _, err := sh.Delete(prm) + require.NoError(t, err) + + require.Equal(t, phy-uint64(deletedNumber), mm.s[physical]) + require.Equal(t, logic-uint64(deletedNumber), mm.s[logical]) + }) +} + +func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) { + blobOpts := []blobstor.Option{ + blobstor.WithStorages([]blobstor.SubStorage{ + { + Storage: fstree.New( + fstree.WithDirNameLen(2), + fstree.WithPath(filepath.Join(path, "blob")), + fstree.WithDepth(1)), + }, + }), + } + + mm := &metricsStore{ + s: map[string]uint64{ + "phy": 0, + "logic": 0, + }, + } + + sh := shard.New( + shard.WithBlobStorOptions(blobOpts...), + shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(path, "pilorama"))), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(path, "meta")), + meta.WithEpochState(epochState{})), + shard.WithMetricsWriter(mm), + ) + require.NoError(t, sh.Open()) + require.NoError(t, sh.Init()) + + t.Cleanup(func() { + sh.Close() + }) + + return sh, mm +} + +func addrFromObjs(oo []*object.Object) []oid.Address { + aa := make([]oid.Address, len(oo)) + + for i := 0; i < len(oo); i++ { + aa[i] = objectcore.AddressOf(oo[i]) + } + + return aa +} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 0785637cc..1c386a48c 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -47,15 +47,18 @@ type DeletedLockCallback func(context.Context, []oid.Address) // MetricsWriter is an interface that must store shard's metrics. type MetricsWriter interface { - // SetObjectCounter must set object counter. - SetObjectCounter(v uint64) - // AddToObjectCounter must update object counter. Negative - // parameter must decrease the counter. - AddToObjectCounter(delta int) - // IncObjectCounter must increment shard's object counter. - IncObjectCounter() - // DecObjectCounter must decrement shard's object counter. - DecObjectCounter() + // SetObjectCounter must set object counter taking into account object type. + SetObjectCounter(objectType string, v uint64) + // AddToObjectCounter must update object counter taking into account object + // type. + // Negative parameter must decrease the counter. + AddToObjectCounter(objectType string, delta int) + // IncObjectCounter must increment shard's object counter taking into account + // object type. + IncObjectCounter(objectType string) + // DecObjectCounter must decrement shard's object counter taking into account + // object type. + DecObjectCounter(objectType string) } type cfg struct { @@ -295,9 +298,20 @@ func (s *Shard) fillInfo() { } } +const ( + // physical is a physically stored object + // counter type + physical = "phy" + + // logical is a logically stored object + // counter type (excludes objects that are + // stored but unavailable) + logical = "logic" +) + func (s *Shard) updateObjectCounter() { if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() { - c, err := s.metaBase.ObjectCounter() + cc, err := s.metaBase.ObjectCounters() if err != nil { s.log.Warn("meta: object counter read", zap.Error(err), @@ -306,18 +320,22 @@ func (s *Shard) updateObjectCounter() { return } - s.cfg.metricsWriter.SetObjectCounter(c) + s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy()) + s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic()) } } +// incObjectCounter increment both physical and logical object +// counters. func (s *Shard) incObjectCounter() { if s.cfg.metricsWriter != nil { - s.cfg.metricsWriter.IncObjectCounter() + s.cfg.metricsWriter.IncObjectCounter(physical) + s.cfg.metricsWriter.IncObjectCounter(logical) } } -func (s *Shard) decObjectCounterBy(v uint64) { +func (s *Shard) decObjectCounterBy(typ string, v uint64) { if s.cfg.metricsWriter != nil { - s.cfg.metricsWriter.AddToObjectCounter(-int(v)) + s.cfg.metricsWriter.AddToObjectCounter(typ, -int(v)) } }