From 8d18fa159e085dde62336ec80f76fdfee358c412 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 17 Nov 2023 17:41:13 +0300 Subject: [PATCH] [#667] writecache: Fix flush test Allow to disable background flusher for testing purposes. Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 1 - .../writecache/writecachebadger/flush.go | 3 ++ .../writecache/writecachebadger/flush_test.go | 1 + .../writecache/writecachebadger/options.go | 9 ++++ .../writecache/writecachebbolt/flush.go | 5 +- .../writecache/writecachebbolt/flush_test.go | 1 + .../writecache/writecachebbolt/options.go | 9 ++++ .../writecache/writecachebbolt/storage.go | 47 +++++-------------- 8 files changed, 40 insertions(+), 36 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 5f361cefb..e8472357c 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -296,7 +296,6 @@ const ( WritecacheFillingFlushMarksForObjectsInDatabase = "filling flush marks for objects in database" WritecacheFinishedUpdatingFlushMarks = "finished updating flush marks" WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database" - WritecacheCantParseAddress = "can't parse address" WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache" WritecacheDBValueLogGCRunCompleted = "value log GC run completed" WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush" diff --git a/pkg/local_object_storage/writecache/writecachebadger/flush.go b/pkg/local_object_storage/writecache/writecachebadger/flush.go index 1ee6bbb66..48e31dee6 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/flush.go +++ b/pkg/local_object_storage/writecache/writecachebadger/flush.go @@ -97,6 +97,9 @@ func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) erro // runFlushLoop starts background workers which periodically flush objects to the blobstor. func (c *cache) runFlushLoop(ctx context.Context) { + if c.disableBackgroundFlush { + return + } for i := 0; i < c.workersCount; i++ { c.wg.Add(1) go c.workerFlushSmall(ctx) diff --git a/pkg/local_object_storage/writecache/writecachebadger/flush_test.go b/pkg/local_object_storage/writecache/writecachebadger/flush_test.go index 392654e48..5e3f60eab 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/flush_test.go +++ b/pkg/local_object_storage/writecache/writecachebadger/flush_test.go @@ -28,6 +28,7 @@ func TestFlush(t *testing.T) { WithMetabase(mb), WithBlobstor(bs), WithGCInterval(1 * time.Second), + WithDisableBackgroundFlush(), }, opts...)...) } diff --git a/pkg/local_object_storage/writecache/writecachebadger/options.go b/pkg/local_object_storage/writecache/writecachebadger/options.go index 63bfb196c..d041a9b88 100644 --- a/pkg/local_object_storage/writecache/writecachebadger/options.go +++ b/pkg/local_object_storage/writecache/writecachebadger/options.go @@ -34,6 +34,8 @@ type options struct { metrics writecache.Metrics // gcInterval is the interval duration to run the GC cycle. gcInterval time.Duration + // disableBackgroundFlush is for testing purposes only. + disableBackgroundFlush bool } // WithLogger sets logger. @@ -108,3 +110,10 @@ func WithGCInterval(d time.Duration) Option { o.gcInterval = d } } + +// WithDisableBackgroundFlush disables background flush, for testing purposes only. +func WithDisableBackgroundFlush() Option { + return func(o *options) { + o.disableBackgroundFlush = true + } +} diff --git a/pkg/local_object_storage/writecache/writecachebbolt/flush.go b/pkg/local_object_storage/writecache/writecachebbolt/flush.go index d0e89a385..d73e374f5 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/flush.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/flush.go @@ -36,6 +36,9 @@ const ( // runFlushLoop starts background workers which periodically flush objects to the blobstor. func (c *cache) runFlushLoop(ctx context.Context) { + if c.disableBackgroundFlush { + return + } for i := 0; i < c.workersCount; i++ { c.wg.Add(1) go c.workerFlushSmall(ctx) @@ -200,7 +203,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - c.deleteFromDisk(ctx, []string{sAddr}) + c.deleteFromDisk(ctx, e.Address) return nil } diff --git a/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go b/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go index 533cec1d2..89add8115 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/flush_test.go @@ -31,6 +31,7 @@ func TestFlush(t *testing.T) { WithSmallObjectSize(smallSize), WithMetabase(mb), WithBlobstor(bs), + WithDisableBackgroundFlush(), }, opts...)...) } diff --git a/pkg/local_object_storage/writecache/writecachebbolt/options.go b/pkg/local_object_storage/writecache/writecachebbolt/options.go index d8eedfc79..3ea329192 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/options.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/options.go @@ -44,6 +44,8 @@ type options struct { openFile func(string, int, fs.FileMode) (*os.File, error) // metrics is metrics implementation metrics writecache.Metrics + // disableBackgroundFlush is for testing purposes only. + disableBackgroundFlush bool } // WithLogger sets logger. @@ -155,3 +157,10 @@ func WithMetrics(metrics writecache.Metrics) Option { o.metrics = metrics } } + +// WithDisableBackgroundFlush disables background flush, for testing purposes only. +func WithDisableBackgroundFlush() Option { + return func(o *options) { + o.disableBackgroundFlush = true + } +} diff --git a/pkg/local_object_storage/writecache/writecachebbolt/storage.go b/pkg/local_object_storage/writecache/writecachebbolt/storage.go index bff9385dc..7a5030625 100644 --- a/pkg/local_object_storage/writecache/writecachebbolt/storage.go +++ b/pkg/local_object_storage/writecache/writecachebbolt/storage.go @@ -93,39 +93,18 @@ func (c *cache) deleteFromDB(key string) { } } -func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string { - if len(keys) == 0 { - return keys +func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) { + _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) + if err != nil && !client.IsErrObjectNotFound(err) { + c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err)) + } else if err == nil { + storagelog.Write(c.log, + storagelog.AddressField(addr.EncodeToString()), + storagelog.StorageTypeField(wcStorageType), + storagelog.OpField("fstree DELETE"), + ) + c.metrics.Evict(writecache.StorageTypeFSTree) + // counter changed by fstree + c.estimateCacheSize() } - - var copyIndex int - var addr oid.Address - - for i := range keys { - if err := addr.DecodeString(keys[i]); err != nil { - c.log.Error(logs.WritecacheCantParseAddress, zap.String("address", keys[i])) - continue - } - - _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) - if err != nil && !client.IsErrObjectNotFound(err) { - c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err)) - - // Save the key for the next iteration. - keys[copyIndex] = keys[i] - copyIndex++ - continue - } else if err == nil { - storagelog.Write(c.log, - storagelog.AddressField(keys[i]), - storagelog.StorageTypeField(wcStorageType), - storagelog.OpField("fstree DELETE"), - ) - c.metrics.Evict(writecache.StorageTypeFSTree) - // counter changed by fstree - c.estimateCacheSize() - } - } - - return keys[:copyIndex] }