writecache: Fix flush test #817
8 changed files with 40 additions and 36 deletions
|
@ -296,7 +296,6 @@ const (
|
||||||
WritecacheFillingFlushMarksForObjectsInDatabase = "filling flush marks for objects in database"
|
WritecacheFillingFlushMarksForObjectsInDatabase = "filling flush marks for objects in database"
|
||||||
WritecacheFinishedUpdatingFlushMarks = "finished updating flush marks"
|
WritecacheFinishedUpdatingFlushMarks = "finished updating flush marks"
|
||||||
WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database"
|
WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database"
|
||||||
WritecacheCantParseAddress = "can't parse address"
|
|
||||||
WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
|
WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
|
||||||
WritecacheDBValueLogGCRunCompleted = "value log GC run completed"
|
WritecacheDBValueLogGCRunCompleted = "value log GC run completed"
|
||||||
WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush"
|
WritecacheBadgerObjAlreadyScheduled = "object already scheduled for flush"
|
||||||
|
|
|
@ -97,6 +97,9 @@ func (c *collector) send(ctx context.Context, cancel func(), buf *z.Buffer) erro
|
||||||
|
|
||||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||||
func (c *cache) runFlushLoop(ctx context.Context) {
|
func (c *cache) runFlushLoop(ctx context.Context) {
|
||||||
|
if c.disableBackgroundFlush {
|
||||||
|
return
|
||||||
|
}
|
||||||
for i := 0; i < c.workersCount; i++ {
|
for i := 0; i < c.workersCount; i++ {
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
go c.workerFlushSmall(ctx)
|
go c.workerFlushSmall(ctx)
|
||||||
|
|
|
@ -28,6 +28,7 @@ func TestFlush(t *testing.T) {
|
||||||
WithMetabase(mb),
|
WithMetabase(mb),
|
||||||
WithBlobstor(bs),
|
WithBlobstor(bs),
|
||||||
WithGCInterval(1 * time.Second),
|
WithGCInterval(1 * time.Second),
|
||||||
|
WithDisableBackgroundFlush(),
|
||||||
}, opts...)...)
|
}, opts...)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,8 @@ type options struct {
|
||||||
metrics writecache.Metrics
|
metrics writecache.Metrics
|
||||||
// gcInterval is the interval duration to run the GC cycle.
|
// gcInterval is the interval duration to run the GC cycle.
|
||||||
gcInterval time.Duration
|
gcInterval time.Duration
|
||||||
|
// disableBackgroundFlush is for testing purposes only.
|
||||||
|
disableBackgroundFlush bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger sets logger.
|
// WithLogger sets logger.
|
||||||
|
@ -108,3 +110,10 @@ func WithGCInterval(d time.Duration) Option {
|
||||||
o.gcInterval = d
|
o.gcInterval = d
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithDisableBackgroundFlush disables background flush, for testing purposes only.
|
||||||
|
func WithDisableBackgroundFlush() Option {
|
||||||
|
return func(o *options) {
|
||||||
|
o.disableBackgroundFlush = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -36,6 +36,9 @@ const (
|
||||||
|
|
||||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||||
func (c *cache) runFlushLoop(ctx context.Context) {
|
func (c *cache) runFlushLoop(ctx context.Context) {
|
||||||
|
if c.disableBackgroundFlush {
|
||||||
|
return
|
||||||
|
}
|
||||||
for i := 0; i < c.workersCount; i++ {
|
for i := 0; i < c.workersCount; i++ {
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
go c.workerFlushSmall(ctx)
|
go c.workerFlushSmall(ctx)
|
||||||
|
@ -200,7 +203,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.deleteFromDisk(ctx, []string{sAddr})
|
c.deleteFromDisk(ctx, e.Address)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,7 @@ func TestFlush(t *testing.T) {
|
||||||
WithSmallObjectSize(smallSize),
|
WithSmallObjectSize(smallSize),
|
||||||
WithMetabase(mb),
|
WithMetabase(mb),
|
||||||
WithBlobstor(bs),
|
WithBlobstor(bs),
|
||||||
|
WithDisableBackgroundFlush(),
|
||||||
}, opts...)...)
|
}, opts...)...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,8 @@ type options struct {
|
||||||
openFile func(string, int, fs.FileMode) (*os.File, error)
|
openFile func(string, int, fs.FileMode) (*os.File, error)
|
||||||
// metrics is metrics implementation
|
// metrics is metrics implementation
|
||||||
metrics writecache.Metrics
|
metrics writecache.Metrics
|
||||||
|
// disableBackgroundFlush is for testing purposes only.
|
||||||
|
disableBackgroundFlush bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger sets logger.
|
// WithLogger sets logger.
|
||||||
|
@ -155,3 +157,10 @@ func WithMetrics(metrics writecache.Metrics) Option {
|
||||||
o.metrics = metrics
|
o.metrics = metrics
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithDisableBackgroundFlush disables background flush, for testing purposes only.
|
||||||
|
func WithDisableBackgroundFlush() Option {
|
||||||
|
|||||||
|
return func(o *options) {
|
||||||
|
o.disableBackgroundFlush = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -93,39 +93,18 @@ func (c *cache) deleteFromDB(key string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
|
func (c *cache) deleteFromDisk(ctx context.Context, addr oid.Address) {
|
||||||
dstepanov-yadro
commented
This change doesn't relate to fix, but small refactoring. This change doesn't relate to fix, but small refactoring.
|
|||||||
if len(keys) == 0 {
|
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||||
return keys
|
if err != nil && !client.IsErrObjectNotFound(err) {
|
||||||
|
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
|
||||||
|
} else if err == nil {
|
||||||
|
storagelog.Write(c.log,
|
||||||
|
storagelog.AddressField(addr.EncodeToString()),
|
||||||
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
|
storagelog.OpField("fstree DELETE"),
|
||||||
|
)
|
||||||
|
c.metrics.Evict(writecache.StorageTypeFSTree)
|
||||||
|
// counter changed by fstree
|
||||||
|
c.estimateCacheSize()
|
||||||
}
|
}
|
||||||
|
|
||||||
var copyIndex int
|
|
||||||
var addr oid.Address
|
|
||||||
|
|
||||||
for i := range keys {
|
|
||||||
if err := addr.DecodeString(keys[i]); err != nil {
|
|
||||||
c.log.Error(logs.WritecacheCantParseAddress, zap.String("address", keys[i]))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
|
||||||
if err != nil && !client.IsErrObjectNotFound(err) {
|
|
||||||
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
|
|
||||||
|
|
||||||
// Save the key for the next iteration.
|
|
||||||
keys[copyIndex] = keys[i]
|
|
||||||
copyIndex++
|
|
||||||
continue
|
|
||||||
} else if err == nil {
|
|
||||||
storagelog.Write(c.log,
|
|
||||||
storagelog.AddressField(keys[i]),
|
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
|
||||||
storagelog.OpField("fstree DELETE"),
|
|
||||||
)
|
|
||||||
c.metrics.Evict(writecache.StorageTypeFSTree)
|
|
||||||
// counter changed by fstree
|
|
||||||
c.estimateCacheSize()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return keys[:copyIndex]
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue
If it is for tests only, can we define it in
_test.go
file?