diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index 69fecc27..254a7d6d 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -16,6 +16,8 @@ type MetricRegister interface { AddRangeDuration(d time.Duration) AddSearchDuration(d time.Duration) AddListObjectsDuration(d time.Duration) + + AddToObjectCounter(shardID string, delta int) } func elapsed(addFunc func(d time.Duration)) func() { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index bfcf778f..db72fd8b 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -17,6 +17,23 @@ var errShardNotFound = errors.New("shard not found") type hashedShard shardWrapper +type metricsWithID struct { + id string + mw MetricRegister +} + +func (m metricsWithID) AddToObjectCounter(delta int) { + m.mw.AddToObjectCounter(m.id, delta) +} + +func (m metricsWithID) IncObjectCounter() { + m.mw.AddToObjectCounter(m.id, +1) +} + +func (m metricsWithID) DecObjectCounter() { + m.mw.AddToObjectCounter(m.id, -1) +} + // AddShard adds a new shard to the storage engine. // // Returns any error encountered that did not allow adding a shard. @@ -35,6 +52,15 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { return nil, fmt.Errorf("could not generate shard ID: %w", err) } + if e.metrics != nil { + opts = append(opts, shard.WithMetricsWriter( + metricsWithID{ + id: id.String(), + mw: e.metrics, + }, + )) + } + sh := shard.New(append(opts, shard.WithID(id), shard.WithExpiredTombstonesCallback(e.processExpiredTombstones), diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index f0d5e9d1..f6c4d125 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -11,7 +11,6 @@ import ( objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.etcd.io/bbolt" - "go.uber.org/zap" ) // DeletePrm groups the parameters of Delete operation. @@ -20,7 +19,14 @@ type DeletePrm struct { } // DeleteRes groups the resulting values of Delete operation. -type DeleteRes struct{} +type DeleteRes struct { + removed uint64 +} + +// RemovedObjects returns number of removed raw objects. +func (d DeleteRes) RemovedObjects() uint64 { + return d.removed +} // SetAddresses is a Delete option to set the addresses of the objects to delete. // @@ -44,8 +50,12 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() - err := db.boltDB.Update(func(tx *bbolt.Tx) error { - return db.deleteGroup(tx, prm.addrs) + var rawRemoved uint64 + var err error + + err = db.boltDB.Update(func(tx *bbolt.Tx) error { + rawRemoved, err = db.deleteGroup(tx, prm.addrs) + return err }) if err == nil { for i := range prm.addrs { @@ -54,10 +64,10 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { storagelog.OpField("metabase DELETE")) } } - return DeleteRes{}, err + return DeleteRes{removed: rawRemoved}, err } -func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) error { +func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, error) { refCounter := make(referenceCounter, len(addrs)) currEpoch := db.epochState.CurrentEpoch() @@ -65,7 +75,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) error { for i := range addrs { removed, err := db.delete(tx, addrs[i], refCounter, currEpoch) if err != nil { - return err // maybe log and continue? + return 0, err // maybe log and continue? } if removed { @@ -75,20 +85,19 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) error { err := db.updateCounter(tx, rawDeleted, false) if err != nil { - db.log.Error("could not decrease object counter", - zap.Error(err)) + return 0, fmt.Errorf("could not decrease object counter: %w", err) } for _, refNum := range refCounter { if refNum.cur == refNum.all { err := db.deleteObject(tx, refNum.obj, true) if err != nil { - return err // maybe log and continue? + return rawDeleted, err // maybe log and continue? } } } - return nil + return rawDeleted, nil } func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, error) { diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 2c129ce0..8abe527f 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -14,7 +14,6 @@ import ( objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.etcd.io/bbolt" - "go.uber.org/zap" ) type ( @@ -146,8 +145,7 @@ func (db *DB) put( if !isParent { err = db.updateCounter(tx, 1, true) if err != nil { - db.log.Error("could not increase object counter: %w", - zap.Error(err)) + return fmt.Errorf("could not increase object counter: %w", err) } } diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index e5e9ded7..3115dada 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -65,11 +65,13 @@ func (s *Shard) Delete(prm DeletePrm) (DeleteRes, error) { var delPrm meta.DeletePrm delPrm.SetAddresses(prm.addr...) - _, err := s.metaBase.Delete(delPrm) + res, err := s.metaBase.Delete(delPrm) if err != nil { return DeleteRes{}, err // stop on metabase error ? } + s.decObjectCounterBy(res.RemovedObjects()) + for i := range prm.addr { // delete small object var delPrm common.DeletePrm delPrm.Address = prm.addr[i] diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 839d735b..585972e7 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -73,6 +73,8 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) { // since the object has been successfully written to BlobStor return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err) } + + s.incObjectCounter() } return PutRes{}, nil diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 9bf0cefa..bd54fc94 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -45,6 +45,17 @@ type ExpiredObjectsCallback func(context.Context, []oid.Address) // DeletedLockCallback is a callback handling list of deleted LOCK objects. type DeletedLockCallback func(context.Context, []oid.Address) +// MetricsWriter is an interface that must store shard's metrics. +type MetricsWriter interface { + // AddToObjectCounter must update object counter. Negative + // parameter must decrease the counter. + AddToObjectCounter(delta int) + // IncObjectCounter must increment shard's object counter. + IncObjectCounter() + // DecObjectCounter must decrement shard's object counter. + DecObjectCounter() +} + type cfg struct { m sync.RWMutex @@ -75,6 +86,8 @@ type cfg struct { deletedLockCallBack DeletedLockCallback tsSource TombstoneSource + + metricsWriter MetricsWriter } func defaultCfg() *cfg { @@ -259,6 +272,14 @@ func WithDeletedLockCallback(v DeletedLockCallback) Option { } } +// WithMetricsWriter returns option to specify storage of the +// shard's metrics. +func WithMetricsWriter(v MetricsWriter) Option { + return func(c *cfg) { + c.metricsWriter = v + } +} + func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() @@ -271,3 +292,15 @@ func (s *Shard) fillInfo() { s.cfg.info.PiloramaInfo = s.pilorama.DumpInfo() } } + +func (s *Shard) incObjectCounter() { + if s.cfg.metricsWriter != nil { + s.cfg.metricsWriter.IncObjectCounter() + } +} + +func (s *Shard) decObjectCounterBy(v uint64) { + if s.cfg.metricsWriter != nil { + s.cfg.metricsWriter.AddToObjectCounter(-int(v)) + } +}