forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
10 changed files with 130 additions and 14 deletions
|
@ -2,6 +2,7 @@ package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -22,6 +23,13 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
deleted := false
|
||||||
|
storageType := storageTypeUndefined
|
||||||
|
startedAt := time.Now()
|
||||||
|
defer func() {
|
||||||
|
c.metrics.Delete(time.Since(startedAt), deleted, storageType)
|
||||||
|
}()
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
|
@ -30,15 +38,15 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
|
|
||||||
saddr := addr.EncodeToString()
|
saddr := addr.EncodeToString()
|
||||||
|
|
||||||
// Check disk cache.
|
var dataSize int
|
||||||
var has int
|
|
||||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
has = len(b.Get([]byte(saddr)))
|
dataSize = len(b.Get([]byte(saddr)))
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
if 0 < has {
|
if dataSize > 0 {
|
||||||
|
storageType = storageTypeDB
|
||||||
err := c.db.Update(func(tx *bbolt.Tx) error {
|
err := c.db.Update(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
err := b.Delete([]byte(saddr))
|
err := b.Delete([]byte(saddr))
|
||||||
|
@ -52,10 +60,12 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db DELETE"),
|
storagelog.OpField("db DELETE"),
|
||||||
)
|
)
|
||||||
|
deleted = true
|
||||||
c.objCounters.DecDB()
|
c.objCounters.DecDB()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
storageType = storageTypeFSTree
|
||||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(c.log,
|
||||||
|
@ -64,6 +74,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
storagelog.OpField("fstree DELETE"),
|
storagelog.OpField("fstree DELETE"),
|
||||||
)
|
)
|
||||||
c.objCounters.DecFS()
|
c.objCounters.DecFS()
|
||||||
|
deleted = true
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -199,7 +199,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.flushObject(ctx, &obj, data)
|
err = c.flushObject(ctx, &obj, data, storageTypeFSTree)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
return nil
|
return nil
|
||||||
|
@ -228,7 +228,7 @@ func (c *cache) workerFlushSmall() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.flushObject(context.TODO(), obj, nil)
|
err := c.flushObject(context.TODO(), obj, nil, storageTypeDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Error is handled in flushObject.
|
// Error is handled in flushObject.
|
||||||
continue
|
continue
|
||||||
|
@ -239,7 +239,13 @@ func (c *cache) workerFlushSmall() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// flushObject is used to write object directly to the main storage.
|
// flushObject is used to write object directly to the main storage.
|
||||||
func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
|
func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte, st storageType) error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
c.metrics.Flush(err == nil, st)
|
||||||
|
}()
|
||||||
|
|
||||||
addr := objectCore.AddressOf(obj)
|
addr := objectCore.AddressOf(obj)
|
||||||
|
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
|
@ -313,7 +319,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.flushObject(ctx, &obj, data); err != nil {
|
if err := c.flushObject(ctx, &obj, data, storageTypeDB); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -27,9 +28,22 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
return c.getInternal(ctx, saddr, addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
|
found := false
|
||||||
|
storageType := storageTypeUndefined
|
||||||
|
startedAt := time.Now()
|
||||||
|
defer func() {
|
||||||
|
c.metrics.Get(time.Since(startedAt), found, storageType)
|
||||||
|
}()
|
||||||
|
|
||||||
value, err := Get(c.db, []byte(saddr))
|
value, err := Get(c.db, []byte(saddr))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
|
found = true
|
||||||
|
storageType = storageTypeDB
|
||||||
return obj, obj.Unmarshal(value)
|
return obj, obj.Unmarshal(value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,6 +52,8 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
||||||
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
|
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
found = true
|
||||||
|
storageType = storageTypeFSTree
|
||||||
return res.Object, nil
|
return res.Object, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,13 +61,15 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||||
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
|
saddr := addr.EncodeToString()
|
||||||
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("address", addr.EncodeToString()),
|
attribute.String("address", saddr),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
obj, err := c.Get(ctx, addr)
|
obj, err := c.getInternal(ctx, saddr, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
42
pkg/local_object_storage/writecache/metrics.go
Normal file
42
pkg/local_object_storage/writecache/metrics.go
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
package writecache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
)
|
||||||
|
|
||||||
|
type storageType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
storageTypeUndefined storageType = "null"
|
||||||
|
storageTypeDB storageType = "db"
|
||||||
|
storageTypeFSTree storageType = "fstree"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Metrics interface {
|
||||||
|
Get(d time.Duration, success bool, st storageType)
|
||||||
|
Delete(d time.Duration, success bool, st storageType)
|
||||||
|
Put(d time.Duration, success bool, st storageType)
|
||||||
|
Flush(success bool, st storageType)
|
||||||
|
Evict(st storageType)
|
||||||
|
|
||||||
|
Estimate(db, fstree uint64)
|
||||||
|
SetMode(m mode.Mode)
|
||||||
|
}
|
||||||
|
|
||||||
|
type metricsStub struct{}
|
||||||
|
|
||||||
|
func (s *metricsStub) Get(time.Duration, bool, storageType) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) Delete(time.Duration, bool, storageType) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) Put(time.Duration, bool, storageType) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) Estimate(uint64, uint64) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) SetMode(mode.Mode) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) Flush(bool, storageType) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) Evict(storageType) {}
|
|
@ -29,7 +29,11 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
return c.setMode(ctx, m)
|
err := c.setMode(ctx, m)
|
||||||
|
if err == nil {
|
||||||
|
c.metrics.SetMode(m)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||||
|
|
|
@ -60,6 +60,8 @@ type options struct {
|
||||||
reportError func(string, error)
|
reportError func(string, error)
|
||||||
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
|
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
|
||||||
openFile func(string, int, fs.FileMode) (*os.File, error)
|
openFile func(string, int, fs.FileMode) (*os.File, error)
|
||||||
|
// metrics is metrics implementation
|
||||||
|
metrics Metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger sets logger.
|
// WithLogger sets logger.
|
||||||
|
@ -164,3 +166,10 @@ func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option {
|
||||||
o.openFile = f
|
o.openFile = f
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithMetrics sets metrics implementation.
|
||||||
|
func WithMetrics(metrics Metrics) Option {
|
||||||
|
return func(o *options) {
|
||||||
|
o.metrics = metrics
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package writecache
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -33,6 +34,13 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
startedAt := time.Now()
|
||||||
|
added := false
|
||||||
|
storageType := storageTypeUndefined
|
||||||
|
defer func() {
|
||||||
|
c.metrics.Put(time.Since(startedAt), added, storageType)
|
||||||
|
}()
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
|
@ -51,9 +59,20 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
if sz <= c.smallObjectSize {
|
if sz <= c.smallObjectSize {
|
||||||
return common.PutRes{}, c.putSmall(oi)
|
storageType = storageTypeDB
|
||||||
|
err := c.putSmall(oi)
|
||||||
|
if err == nil {
|
||||||
|
added = true
|
||||||
}
|
}
|
||||||
return common.PutRes{}, c.putBig(ctx, oi.addr, prm)
|
return common.PutRes{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
storageType = storageTypeFSTree
|
||||||
|
err := c.putBig(ctx, oi.addr, prm)
|
||||||
|
if err == nil {
|
||||||
|
added = true
|
||||||
|
}
|
||||||
|
return common.PutRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// putSmall persists small objects to the write-cache database and
|
// putSmall persists small objects to the write-cache database and
|
||||||
|
|
|
@ -9,7 +9,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *cache) estimateCacheSize() uint64 {
|
func (c *cache) estimateCacheSize() uint64 {
|
||||||
return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize
|
db := c.objCounters.DB() * c.smallObjectSize
|
||||||
|
fstree := c.objCounters.FS() * c.maxObjectSize
|
||||||
|
c.metrics.Estimate(db, fstree)
|
||||||
|
return db + fstree
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) incSizeDB(sz uint64) uint64 {
|
func (c *cache) incSizeDB(sz uint64) uint64 {
|
||||||
|
|
|
@ -79,6 +79,7 @@ func (c *cache) deleteFromDB(keys []string) []string {
|
||||||
})
|
})
|
||||||
for i := 0; i < errorIndex; i++ {
|
for i := 0; i < errorIndex; i++ {
|
||||||
c.objCounters.DecDB()
|
c.objCounters.DecDB()
|
||||||
|
c.metrics.Evict(storageTypeDB)
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(c.log,
|
||||||
storagelog.AddressField(keys[i]),
|
storagelog.AddressField(keys[i]),
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
|
@ -121,6 +122,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("fstree DELETE"),
|
storagelog.OpField("fstree DELETE"),
|
||||||
)
|
)
|
||||||
|
c.metrics.Evict(storageTypeFSTree)
|
||||||
c.objCounters.DecFS()
|
c.objCounters.DecFS()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,6 +103,7 @@ func New(opts ...Option) Cache {
|
||||||
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
||||||
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||||
openFile: os.OpenFile,
|
openFile: os.OpenFile,
|
||||||
|
metrics: &metricsStub{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,6 +141,7 @@ func (c *cache) Open(readOnly bool) error {
|
||||||
|
|
||||||
// Init runs necessary services.
|
// Init runs necessary services.
|
||||||
func (c *cache) Init() error {
|
func (c *cache) Init() error {
|
||||||
|
c.metrics.SetMode(c.mode)
|
||||||
c.runFlushLoop()
|
c.runFlushLoop()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue