writecache: Fix flush test #817

Merged
dstepanov-yadro merged 1 commit from dstepanov-yadro/frostfs-node:fix/test_flush_flaky into master 2023-11-20 15:09:37 +00:00
8 changed files with 40 additions and 36 deletions

View file

@ -296,7 +296,6 @@ const (
WritecacheFillingFlushMarksForObjectsInDatabase = "filling flush marks for objects in database" WritecacheFillingFlushMarksForObjectsInDatabase = "filling flush marks for objects in database"
WritecacheFinishedUpdatingFlushMarks = "finished updating flush marks" WritecacheFinishedUpdatingFlushMarks = "finished updating flush marks"
WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database" WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database"
WritecacheCantParseAddress = "can't parse address"
WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache" WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
WritecacheDBValueLogGCRunCompleted = "value log GC run completed" WritecacheDBValueLogGCRunCompleted = "value log GC run completed"
WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush" WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush"

View file

@ -97,6 +97,9 @@ func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) erro
// runFlushLoop starts background workers which periodically flush objects to the blobstor. // runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop(ctx context.Context) { func (c *cache) runFlushLoop(ctx context.Context) {
if c.disableBackgroundFlush {
return
}
for i := 0; i < c.workersCount; i++ { for i := 0; i < c.workersCount; i++ {
c.wg.Add(1) c.wg.Add(1)
go c.workerFlushSmall(ctx) go c.workerFlushSmall(ctx)

View file

@ -28,6 +28,7 @@ func TestFlush(t *testing.T) {
WithMetabase(mb), WithMetabase(mb),
WithBlobstor(bs), WithBlobstor(bs),
WithGCInterval(1 * time.Second), WithGCInterval(1 * time.Second),
WithDisableBackgroundFlush(),
}, opts...)...) }, opts...)...)
} }

View file

@ -34,6 +34,8 @@ type options struct {
metrics writecache.Metrics metrics writecache.Metrics
// gcInterval is the interval duration to run the GC cycle. // gcInterval is the interval duration to run the GC cycle.
gcInterval time.Duration gcInterval time.Duration
// disableBackgroundFlush is for testing purposes only.
disableBackgroundFlush bool
} }
// WithLogger sets logger. // WithLogger sets logger.
@ -108,3 +110,10 @@ func WithGCInterval(d time.Duration) Option {
o.gcInterval = d o.gcInterval = d
} }
} }
// WithDisableBackgroundFlush disables background flush, for testing purposes only.
func WithDisableBackgroundFlush() Option {
return func(o *options) {
o.disableBackgroundFlush = true
}
}

View file

@ -36,6 +36,9 @@ const (
// runFlushLoop starts background workers which periodically flush objects to the blobstor. // runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop(ctx context.Context) { func (c *cache) runFlushLoop(ctx context.Context) {
if c.disableBackgroundFlush {
return
}
for i := 0; i < c.workersCount; i++ { for i := 0; i < c.workersCount; i++ {
c.wg.Add(1) c.wg.Add(1)
go c.workerFlushSmall(ctx) go c.workerFlushSmall(ctx)
@ -200,7 +203,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err return err
} }
c.deleteFromDisk(ctx, []string{sAddr}) c.deleteFromDisk(ctx, e.Address)
return nil return nil
} }

View file

@ -31,6 +31,7 @@ func TestFlush(t *testing.T) {
WithSmallObjectSize(smallSize), WithSmallObjectSize(smallSize),
WithMetabase(mb), WithMetabase(mb),
WithBlobstor(bs), WithBlobstor(bs),
WithDisableBackgroundFlush(),
}, opts...)...) }, opts...)...)
} }

View file

@ -44,6 +44,8 @@ type options struct {
openFile func(string, int, fs.FileMode) (*os.File, error) openFile func(string, int, fs.FileMode) (*os.File, error)
// metrics is metrics implementation // metrics is metrics implementation
metrics writecache.Metrics metrics writecache.Metrics
// disableBackgroundFlush is for testing purposes only.
disableBackgroundFlush bool
} }
// WithLogger sets logger. // WithLogger sets logger.
@ -155,3 +157,10 @@ func WithMetrics(metrics writecache.Metrics) Option {
o.metrics = metrics o.metrics = metrics
} }
} }
// WithDisableBackgroundFlush disables background flush, for testing purposes only.
func WithDisableBackgroundFlush() Option {
Review

If it is for tests only, can we define it in _test.go file?

If it is for tests only, can we define it in `_test.go` file?
return func(o *options) {
o.disableBackgroundFlush = true
}
}

View file

@ -93,31 +93,13 @@ func (c *cache) deleteFromDB(key string) {
} }
} }
func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string { func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) {
Review

This change doesn't relate to fix, but small refactoring.

This change doesn't relate to fix, but small refactoring.
if len(keys) == 0 {
return keys
}
var copyIndex int
var addr oid.Address
for i := range keys {
if err := addr.DecodeString(keys[i]); err != nil {
c.log.Error(logs.WritecacheCantParseAddress, zap.String("address", keys[i]))
continue
}
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err != nil && !client.IsErrObjectNotFound(err) { if err != nil && !client.IsErrObjectNotFound(err) {
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err)) c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
// Save the key for the next iteration.
keys[copyIndex] = keys[i]
copyIndex++
continue
} else if err == nil { } else if err == nil {
storagelog.Write(c.log, storagelog.Write(c.log,
storagelog.AddressField(keys[i]), storagelog.AddressField(addr.EncodeToString()),
storagelog.StorageTypeField(wcStorageType), storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"), storagelog.OpField("fstree DELETE"),
) )
@ -125,7 +107,4 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
// counter changed by fstree // counter changed by fstree
c.estimateCacheSize() c.estimateCacheSize()
} }
}
return keys[:copyIndex]
} }