[#1658] shard: Update metric counters
Use meta's operation results to change the metrics. Support typed object counters. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
ad47e2a985
commit
431e331373
6 changed files with 238 additions and 18 deletions
|
@ -312,6 +312,12 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
checkTombMembers(true)
|
checkTombMembers(true)
|
||||||
checkLocked(t, cnrLocked, locked)
|
checkLocked(t, cnrLocked, locked)
|
||||||
|
|
||||||
|
c, err := sh.metaBase.ObjectCounters()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
phyBefore := c.Phy()
|
||||||
|
logicalBefore := c.Logic()
|
||||||
|
|
||||||
err = sh.Close()
|
err = sh.Close()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -340,6 +346,12 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
err = sh.refillMetabase()
|
err = sh.refillMetabase()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
c, err = sh.metaBase.ObjectCounters()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, phyBefore, c.Phy())
|
||||||
|
require.Equal(t, logicalBefore, c.Logic())
|
||||||
|
|
||||||
checkAllObjs(true)
|
checkAllObjs(true)
|
||||||
checkObj(object.AddressOf(tombObj), tombObj)
|
checkObj(object.AddressOf(tombObj), tombObj)
|
||||||
checkTombMembers(true)
|
checkTombMembers(true)
|
||||||
|
|
|
@ -69,7 +69,8 @@ func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||||
return DeleteRes{}, err // stop on metabase error ?
|
return DeleteRes{}, err // stop on metabase error ?
|
||||||
}
|
}
|
||||||
|
|
||||||
s.decObjectCounterBy(res.RemovedObjects())
|
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
|
||||||
|
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())
|
||||||
|
|
||||||
for i := range prm.addr { // delete small object
|
for i := range prm.addr { // delete small object
|
||||||
var delPrm common.DeletePrm
|
var delPrm common.DeletePrm
|
||||||
|
|
|
@ -241,7 +241,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
||||||
inhumePrm.SetGCMark()
|
inhumePrm.SetGCMark()
|
||||||
|
|
||||||
// inhume the collected objects
|
// inhume the collected objects
|
||||||
_, err = s.metaBase.Inhume(inhumePrm)
|
res, err := s.metaBase.Inhume(inhumePrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn("could not inhume the objects",
|
s.log.Warn("could not inhume the objects",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -249,6 +249,8 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) {
|
||||||
|
@ -354,7 +356,7 @@ func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) {
|
||||||
pInhume.SetAddresses(tsAddrs...)
|
pInhume.SetAddresses(tsAddrs...)
|
||||||
|
|
||||||
// inhume tombstones
|
// inhume tombstones
|
||||||
_, err := s.metaBase.Inhume(pInhume)
|
res, err := s.metaBase.Inhume(pInhume)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn("could not mark tombstones as garbage",
|
s.log.Warn("could not mark tombstones as garbage",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -363,6 +365,8 @@ func (s *Shard) HandleExpiredTombstones(tss []meta.TombstonedObject) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
|
||||||
// drop just processed expired tombstones
|
// drop just processed expired tombstones
|
||||||
// from graveyard
|
// from graveyard
|
||||||
err = s.metaBase.DropGraves(tss)
|
err = s.metaBase.DropGraves(tss)
|
||||||
|
@ -387,7 +391,7 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {
|
||||||
pInhume.SetAddresses(lockers...)
|
pInhume.SetAddresses(lockers...)
|
||||||
pInhume.SetGCMark()
|
pInhume.SetGCMark()
|
||||||
|
|
||||||
_, err = s.metaBase.Inhume(pInhume)
|
res, err := s.metaBase.Inhume(pInhume)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn("failure to mark lockers as garbage",
|
s.log.Warn("failure to mark lockers as garbage",
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
|
@ -395,6 +399,8 @@ func (s *Shard) HandleExpiredLocks(lockers []oid.Address) {
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleDeletedLocks unlocks all objects which were locked by lockers.
|
// HandleDeletedLocks unlocks all objects which were locked by lockers.
|
||||||
|
|
|
@ -101,6 +101,8 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
|
||||||
return InhumeRes{}, fmt.Errorf("metabase inhume: %w", err)
|
return InhumeRes{}, fmt.Errorf("metabase inhume: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.decObjectCounterBy(logical, res.AvailableInhumed())
|
||||||
|
|
||||||
if deletedLockObjs := res.DeletedLockObjects(); len(deletedLockObjs) != 0 {
|
if deletedLockObjs := res.DeletedLockObjects(); len(deletedLockObjs) != 0 {
|
||||||
s.deletedLockCallBack(context.Background(), deletedLockObjs)
|
s.deletedLockCallBack(context.Background(), deletedLockObjs)
|
||||||
}
|
}
|
||||||
|
|
181
pkg/local_object_storage/shard/metrics_test.go
Normal file
181
pkg/local_object_storage/shard/metrics_test.go
Normal file
|
@ -0,0 +1,181 @@
|
||||||
|
package shard_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||||
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type metricsStore struct {
|
||||||
|
s map[string]uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m metricsStore) SetObjectCounter(objectType string, v uint64) {
|
||||||
|
m.s[objectType] = v
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m metricsStore) AddToObjectCounter(objectType string, delta int) {
|
||||||
|
switch {
|
||||||
|
case delta > 0:
|
||||||
|
m.s[objectType] += uint64(delta)
|
||||||
|
case delta < 0:
|
||||||
|
uDelta := uint64(-delta)
|
||||||
|
|
||||||
|
if m.s[objectType] >= uDelta {
|
||||||
|
m.s[objectType] -= uDelta
|
||||||
|
} else {
|
||||||
|
m.s[objectType] = 0
|
||||||
|
}
|
||||||
|
case delta == 0:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m metricsStore) IncObjectCounter(objectType string) {
|
||||||
|
m.s[objectType] += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m metricsStore) DecObjectCounter(objectType string) {
|
||||||
|
m.AddToObjectCounter(objectType, -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
const physical = "phy"
|
||||||
|
const logical = "logic"
|
||||||
|
|
||||||
|
func TestCounters(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
sh, mm := shardWithMetrics(t, dir)
|
||||||
|
|
||||||
|
const objNumber = 10
|
||||||
|
oo := make([]*object.Object, objNumber)
|
||||||
|
for i := 0; i < objNumber; i++ {
|
||||||
|
oo[i] = generateObject(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("defaults", func(t *testing.T) {
|
||||||
|
require.Zero(t, mm.s[physical])
|
||||||
|
require.Zero(t, mm.s[logical])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("put", func(t *testing.T) {
|
||||||
|
var prm shard.PutPrm
|
||||||
|
|
||||||
|
for i := 0; i < objNumber; i++ {
|
||||||
|
prm.SetObject(oo[i])
|
||||||
|
|
||||||
|
_, err := sh.Put(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, uint64(objNumber), mm.s[physical])
|
||||||
|
require.Equal(t, uint64(objNumber), mm.s[logical])
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("inhume_GC", func(t *testing.T) {
|
||||||
|
var prm shard.InhumePrm
|
||||||
|
inhumedNumber := objNumber / 4
|
||||||
|
|
||||||
|
for i := 0; i < inhumedNumber; i++ {
|
||||||
|
prm.MarkAsGarbage(objectcore.AddressOf(oo[i]))
|
||||||
|
|
||||||
|
_, err := sh.Inhume(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, uint64(objNumber), mm.s[physical])
|
||||||
|
require.Equal(t, uint64(objNumber-inhumedNumber), mm.s[logical])
|
||||||
|
|
||||||
|
oo = oo[inhumedNumber:]
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("inhume_TS", func(t *testing.T) {
|
||||||
|
var prm shard.InhumePrm
|
||||||
|
ts := objectcore.AddressOf(generateObject(t))
|
||||||
|
|
||||||
|
phy := mm.s[physical]
|
||||||
|
logic := mm.s[logical]
|
||||||
|
|
||||||
|
inhumedNumber := int(phy / 4)
|
||||||
|
prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...)
|
||||||
|
|
||||||
|
_, err := sh.Inhume(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, phy, mm.s[physical])
|
||||||
|
require.Equal(t, logic-uint64(inhumedNumber), mm.s[logical])
|
||||||
|
|
||||||
|
oo = oo[inhumedNumber:]
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Delete", func(t *testing.T) {
|
||||||
|
var prm shard.DeletePrm
|
||||||
|
|
||||||
|
phy := mm.s[physical]
|
||||||
|
logic := mm.s[logical]
|
||||||
|
|
||||||
|
deletedNumber := int(phy / 4)
|
||||||
|
prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...)
|
||||||
|
|
||||||
|
_, err := sh.Delete(prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, phy-uint64(deletedNumber), mm.s[physical])
|
||||||
|
require.Equal(t, logic-uint64(deletedNumber), mm.s[logical])
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) {
|
||||||
|
blobOpts := []blobstor.Option{
|
||||||
|
blobstor.WithStorages([]blobstor.SubStorage{
|
||||||
|
{
|
||||||
|
Storage: fstree.New(
|
||||||
|
fstree.WithDirNameLen(2),
|
||||||
|
fstree.WithPath(filepath.Join(path, "blob")),
|
||||||
|
fstree.WithDepth(1)),
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
mm := &metricsStore{
|
||||||
|
s: map[string]uint64{
|
||||||
|
"phy": 0,
|
||||||
|
"logic": 0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
sh := shard.New(
|
||||||
|
shard.WithBlobStorOptions(blobOpts...),
|
||||||
|
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(path, "pilorama"))),
|
||||||
|
shard.WithMetaBaseOptions(
|
||||||
|
meta.WithPath(filepath.Join(path, "meta")),
|
||||||
|
meta.WithEpochState(epochState{})),
|
||||||
|
shard.WithMetricsWriter(mm),
|
||||||
|
)
|
||||||
|
require.NoError(t, sh.Open())
|
||||||
|
require.NoError(t, sh.Init())
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
sh.Close()
|
||||||
|
})
|
||||||
|
|
||||||
|
return sh, mm
|
||||||
|
}
|
||||||
|
|
||||||
|
func addrFromObjs(oo []*object.Object) []oid.Address {
|
||||||
|
aa := make([]oid.Address, len(oo))
|
||||||
|
|
||||||
|
for i := 0; i < len(oo); i++ {
|
||||||
|
aa[i] = objectcore.AddressOf(oo[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
return aa
|
||||||
|
}
|
|
@ -47,15 +47,18 @@ type DeletedLockCallback func(context.Context, []oid.Address)
|
||||||
|
|
||||||
// MetricsWriter is an interface that must store shard's metrics.
|
// MetricsWriter is an interface that must store shard's metrics.
|
||||||
type MetricsWriter interface {
|
type MetricsWriter interface {
|
||||||
// SetObjectCounter must set object counter.
|
// SetObjectCounter must set object counter taking into account object type.
|
||||||
SetObjectCounter(v uint64)
|
SetObjectCounter(objectType string, v uint64)
|
||||||
// AddToObjectCounter must update object counter. Negative
|
// AddToObjectCounter must update object counter taking into account object
|
||||||
// parameter must decrease the counter.
|
// type.
|
||||||
AddToObjectCounter(delta int)
|
// Negative parameter must decrease the counter.
|
||||||
// IncObjectCounter must increment shard's object counter.
|
AddToObjectCounter(objectType string, delta int)
|
||||||
IncObjectCounter()
|
// IncObjectCounter must increment shard's object counter taking into account
|
||||||
// DecObjectCounter must decrement shard's object counter.
|
// object type.
|
||||||
DecObjectCounter()
|
IncObjectCounter(objectType string)
|
||||||
|
// DecObjectCounter must decrement shard's object counter taking into account
|
||||||
|
// object type.
|
||||||
|
DecObjectCounter(objectType string)
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
|
@ -295,9 +298,20 @@ func (s *Shard) fillInfo() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// physical is a physically stored object
|
||||||
|
// counter type
|
||||||
|
physical = "phy"
|
||||||
|
|
||||||
|
// logical is a logically stored object
|
||||||
|
// counter type (excludes objects that are
|
||||||
|
// stored but unavailable)
|
||||||
|
logical = "logic"
|
||||||
|
)
|
||||||
|
|
||||||
func (s *Shard) updateObjectCounter() {
|
func (s *Shard) updateObjectCounter() {
|
||||||
if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() {
|
if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() {
|
||||||
c, err := s.metaBase.ObjectCounter()
|
cc, err := s.metaBase.ObjectCounters()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Warn("meta: object counter read",
|
s.log.Warn("meta: object counter read",
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
|
@ -306,18 +320,22 @@ func (s *Shard) updateObjectCounter() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s.cfg.metricsWriter.SetObjectCounter(c)
|
s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy())
|
||||||
|
s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// incObjectCounter increment both physical and logical object
|
||||||
|
// counters.
|
||||||
func (s *Shard) incObjectCounter() {
|
func (s *Shard) incObjectCounter() {
|
||||||
if s.cfg.metricsWriter != nil {
|
if s.cfg.metricsWriter != nil {
|
||||||
s.cfg.metricsWriter.IncObjectCounter()
|
s.cfg.metricsWriter.IncObjectCounter(physical)
|
||||||
|
s.cfg.metricsWriter.IncObjectCounter(logical)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) decObjectCounterBy(v uint64) {
|
func (s *Shard) decObjectCounterBy(typ string, v uint64) {
|
||||||
if s.cfg.metricsWriter != nil {
|
if s.cfg.metricsWriter != nil {
|
||||||
s.cfg.metricsWriter.AddToObjectCounter(-int(v))
|
s.cfg.metricsWriter.AddToObjectCounter(typ, -int(v))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue