forked from TrueCloudLab/frostfs-node
[#585] writecache: Fix DB counter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
58c8722c81
commit
2efe9cc1be
3 changed files with 31 additions and 5 deletions
|
@ -2,6 +2,7 @@ package writecachebbolt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -49,9 +50,12 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
|
|
||||||
if dataSize > 0 {
|
if dataSize > 0 {
|
||||||
storageType = writecache.StorageTypeDB
|
storageType = writecache.StorageTypeDB
|
||||||
|
var recordDeleted bool
|
||||||
err := c.db.Update(func(tx *bbolt.Tx) error {
|
err := c.db.Update(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
err := b.Delete([]byte(saddr))
|
key := []byte(saddr)
|
||||||
|
recordDeleted = b.Get(key) != nil
|
||||||
|
err := b.Delete(key)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -62,6 +66,10 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db DELETE"),
|
storagelog.OpField("db DELETE"),
|
||||||
)
|
)
|
||||||
|
if recordDeleted {
|
||||||
|
c.objCounters.cDB.Add(math.MaxUint64)
|
||||||
|
c.estimateCacheSize()
|
||||||
|
}
|
||||||
deleted = true
|
deleted = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -75,7 +83,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
storagelog.OpField("fstree DELETE"),
|
storagelog.OpField("fstree DELETE"),
|
||||||
)
|
)
|
||||||
deleted = true
|
deleted = true
|
||||||
|
c.estimateCacheSize()
|
||||||
}
|
}
|
||||||
|
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,9 +85,15 @@ func (c *cache) putSmall(obj objectInfo) error {
|
||||||
return ErrOutOfSpace
|
return ErrOutOfSpace
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var newRecord bool
|
||||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
return b.Put([]byte(obj.addr), obj.data)
|
key := []byte(obj.addr)
|
||||||
|
newRecord = b.Get(key) == nil
|
||||||
|
if newRecord {
|
||||||
|
return b.Put(key, obj.data)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(c.log,
|
||||||
|
@ -95,6 +101,10 @@ func (c *cache) putSmall(obj objectInfo) error {
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db PUT"),
|
storagelog.OpField("db PUT"),
|
||||||
)
|
)
|
||||||
|
if newRecord {
|
||||||
|
c.objCounters.cDB.Add(1)
|
||||||
|
c.estimateCacheSize()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -121,6 +131,7 @@ func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) erro
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("fstree PUT"),
|
storagelog.OpField("fstree PUT"),
|
||||||
)
|
)
|
||||||
|
c.estimateCacheSize()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package writecachebbolt
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -68,9 +69,12 @@ func (c *cache) openStore(readOnly bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) deleteFromDB(key string) {
|
func (c *cache) deleteFromDB(key string) {
|
||||||
|
var recordDeleted bool
|
||||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
return b.Delete([]byte(key))
|
key := []byte(key)
|
||||||
|
recordDeleted = !recordDeleted && b.Get(key) != nil
|
||||||
|
return b.Delete(key)
|
||||||
})
|
})
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -80,7 +84,10 @@ func (c *cache) deleteFromDB(key string) {
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db DELETE"),
|
storagelog.OpField("db DELETE"),
|
||||||
)
|
)
|
||||||
|
if recordDeleted {
|
||||||
|
c.objCounters.cDB.Add(math.MaxUint64)
|
||||||
c.estimateCacheSize()
|
c.estimateCacheSize()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue