[#563] writecache: Fix metrics and bolt delete

Estimate cache size after delete objects to update metric.
Update counters on small object deletion.
Do not count bbolt DB file as FSTree object.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-08-07 13:31:56 +03:00
parent 1a0cb0f34a
commit 44e92f06e0
3 changed files with 12 additions and 20 deletions

View file

@ -246,7 +246,7 @@ func (c *cache) workerFlushSmall() {
continue continue
} }
c.deleteFromDB([]string{objectCore.AddressOf(obj).EncodeToString()}) c.deleteFromDB(objectCore.AddressOf(obj).EncodeToString())
} }
} }

View file

@ -68,6 +68,9 @@ func (c *cache) initCounters() error {
if err != nil { if err != nil {
return fmt.Errorf("could not read write-cache FS counter: %w", err) return fmt.Errorf("could not read write-cache FS counter: %w", err)
} }
if inFS > 0 {
inFS-- //small.bolt DB file
}
c.objCounters.cDB.Store(inDB) c.objCounters.cDB.Store(inDB)
c.objCounters.cFS.Store(inFS) c.objCounters.cFS.Store(inFS)

View file

@ -63,36 +63,24 @@ func (c *cache) openStore(readOnly bool) error {
return nil return nil
} }
func (c *cache) deleteFromDB(keys []string) []string { func (c *cache) deleteFromDB(key string) {
if len(keys) == 0 {
return keys
}
var errorIndex int
err := c.db.Batch(func(tx *bbolt.Tx) error { err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
for errorIndex = range keys { return b.Delete([]byte(key))
if err := b.Delete([]byte(keys[errorIndex])); err != nil {
return err
}
}
return nil
}) })
for i := 0; i < errorIndex; i++ {
if err == nil {
c.objCounters.DecDB() c.objCounters.DecDB()
c.metrics.Evict(writecache.StorageTypeDB) c.metrics.Evict(writecache.StorageTypeDB)
storagelog.Write(c.log, storagelog.Write(c.log,
storagelog.AddressField(keys[i]), storagelog.AddressField(key),
storagelog.StorageTypeField(wcStorageType), storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"), storagelog.OpField("db DELETE"),
) )
} c.estimateCacheSize()
if err != nil { } else {
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
} }
copy(keys, keys[errorIndex:])
return keys[:len(keys)-errorIndex]
} }
func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string { func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
@ -125,6 +113,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
) )
c.metrics.Evict(writecache.StorageTypeFSTree) c.metrics.Evict(writecache.StorageTypeFSTree)
c.objCounters.DecFS() c.objCounters.DecFS()
c.estimateCacheSize()
} }
} }