[#9999] writecache: Fix after revert
Some checks failed
Tests and linters / Run gofumpt (pull_request) Successful in 30s
DCO action / DCO (pull_request) Failing after 40s
Vulncheck / Vulncheck (pull_request) Successful in 54s
Build / Build Components (pull_request) Successful in 1m25s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m28s
Tests and linters / Staticcheck (pull_request) Successful in 2m1s
Tests and linters / Tests (pull_request) Successful in 2m9s
Tests and linters / gopls check (pull_request) Successful in 2m24s
Tests and linters / Lint (pull_request) Successful in 2m42s
Tests and linters / Tests with -race (pull_request) Successful in 2m56s
Some checks failed
Tests and linters / Run gofumpt (pull_request) Successful in 30s
DCO action / DCO (pull_request) Failing after 40s
Vulncheck / Vulncheck (pull_request) Successful in 54s
Build / Build Components (pull_request) Successful in 1m25s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m28s
Tests and linters / Staticcheck (pull_request) Successful in 2m1s
Tests and linters / Tests (pull_request) Successful in 2m9s
Tests and linters / gopls check (pull_request) Successful in 2m24s
Tests and linters / Lint (pull_request) Successful in 2m42s
Tests and linters / Tests with -race (pull_request) Successful in 2m56s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
6e4b75b5f9
commit
d0592df3c1
7 changed files with 24 additions and 21 deletions
|
@ -519,4 +519,7 @@ const (
|
||||||
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty"
|
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty"
|
||||||
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
||||||
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
FailedToUpdateMultinetConfiguration = "failed to update multinet configuration"
|
||||||
|
WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database"
|
||||||
|
WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache"
|
||||||
|
FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB"
|
||||||
)
|
)
|
||||||
|
|
|
@ -109,14 +109,14 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error {
|
||||||
// Init runs necessary services.
|
// Init runs necessary services.
|
||||||
func (c *cache) Init(ctx context.Context) error {
|
func (c *cache) Init(ctx context.Context) error {
|
||||||
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
|
c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode))
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
c.cancel.Store(cancel)
|
c.cancel.Store(cancel)
|
||||||
c.runFlushLoop(ctx)
|
c.runFlushLoop(ctx)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||||
func (c *cache) Close(ctx context.Context) error {
|
func (c *cache) Close(_ context.Context) error {
|
||||||
if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil {
|
if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil {
|
||||||
cancelValue.(context.CancelFunc)()
|
cancelValue.(context.CancelFunc)()
|
||||||
}
|
}
|
||||||
|
@ -133,7 +133,7 @@ func (c *cache) Close(ctx context.Context) error {
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
if c.db != nil {
|
if c.db != nil {
|
||||||
err = c.db.Close(ctx)
|
err = c.db.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.db = nil
|
c.db = nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(ctx, c.log,
|
||||||
storagelog.AddressField(saddr),
|
storagelog.AddressField(saddr),
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db DELETE"),
|
storagelog.OpField("db DELETE"),
|
||||||
|
|
|
@ -143,7 +143,7 @@ func (c *cache) flushSmallObjects(ctx context.Context) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
|
c.log.Debug(ctx, logs.WritecacheTriedToFlushItemsFromWritecache,
|
||||||
zap.Int("count", count),
|
zap.Int("count", count),
|
||||||
zap.String("start", base58.Encode(lastKey)))
|
zap.String("start", base58.Encode(lastKey)))
|
||||||
}
|
}
|
||||||
|
@ -230,7 +230,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
c.deleteFromDB(objInfo.addr, true)
|
c.deleteFromDB(ctx, objInfo.addr, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -306,7 +306,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
|
|
||||||
var last string
|
var last string
|
||||||
for {
|
for {
|
||||||
batch, err := c.readNextDBBatch(ignoreErrors, last)
|
batch, err := c.readNextDBBatch(ctx, ignoreErrors, last)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -316,7 +316,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
for _, item := range batch {
|
for _, item := range batch {
|
||||||
var obj objectSDK.Object
|
var obj objectSDK.Object
|
||||||
if err := obj.Unmarshal(item.data); err != nil {
|
if err := obj.Unmarshal(item.data); err != nil {
|
||||||
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
|
c.reportFlushError(ctx, logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -326,7 +326,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
|
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.deleteFromDB(item.address, false)
|
c.deleteFromDB(ctx, item.address, false)
|
||||||
}
|
}
|
||||||
last = batch[len(batch)-1].address
|
last = batch[len(batch)-1].address
|
||||||
}
|
}
|
||||||
|
@ -338,7 +338,7 @@ type batchItem struct {
|
||||||
address string
|
address string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
|
func (c *cache) readNextDBBatch(ctx context.Context, ignoreErrors bool, last string) ([]batchItem, error) {
|
||||||
const batchSize = 100
|
const batchSize = 100
|
||||||
var batch []batchItem
|
var batch []batchItem
|
||||||
err := c.db.View(func(tx *bbolt.Tx) error {
|
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
@ -352,7 +352,7 @@ func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, er
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := addr.DecodeString(sa); err != nil {
|
if err := addr.DecodeString(sa); err != nil {
|
||||||
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
c.reportFlushError(ctx, logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.closeDB(prm.shrink); err != nil {
|
if err := c.closeDB(ctx, prm.shrink); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,12 +78,12 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) closeDB(shrink bool) error {
|
func (c *cache) closeDB(ctx context.Context, shrink bool) error {
|
||||||
if c.db == nil {
|
if c.db == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if !shrink {
|
if !shrink {
|
||||||
if err := c.db.Close(ctx); err != nil {
|
if err := c.db.Close(); err != nil {
|
||||||
return fmt.Errorf("can't close write-cache database: %w", err)
|
return fmt.Errorf("can't close write-cache database: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -98,7 +98,7 @@ func (c *cache) closeDB(shrink bool) error {
|
||||||
if err != nil && !errors.Is(err, bbolt.ErrDatabaseNotOpen) {
|
if err != nil && !errors.Is(err, bbolt.ErrDatabaseNotOpen) {
|
||||||
return fmt.Errorf("failed to check DB items: %w", err)
|
return fmt.Errorf("failed to check DB items: %w", err)
|
||||||
}
|
}
|
||||||
if err := c.db.Close(ctx); err != nil {
|
if err := c.db.Close(); err != nil {
|
||||||
return fmt.Errorf("can't close write-cache database: %w", err)
|
return fmt.Errorf("can't close write-cache database: %w", err)
|
||||||
}
|
}
|
||||||
if empty {
|
if empty {
|
||||||
|
|
|
@ -58,7 +58,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
||||||
|
|
||||||
if sz <= c.smallObjectSize {
|
if sz <= c.smallObjectSize {
|
||||||
storageType = StorageTypeDB
|
storageType = StorageTypeDB
|
||||||
err := c.putSmall(oi)
|
err := c.putSmall(ctx, oi)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
added = true
|
added = true
|
||||||
}
|
}
|
||||||
|
@ -75,7 +75,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
||||||
|
|
||||||
// putSmall persists small objects to the write-cache database and
|
// putSmall persists small objects to the write-cache database and
|
||||||
// pushes the to the flush workers queue.
|
// pushes the to the flush workers queue.
|
||||||
func (c *cache) putSmall(obj objectInfo) error {
|
func (c *cache) putSmall(ctx context.Context, obj objectInfo) error {
|
||||||
if !c.hasEnoughSpaceDB() {
|
if !c.hasEnoughSpaceDB() {
|
||||||
return ErrOutOfSpace
|
return ErrOutOfSpace
|
||||||
}
|
}
|
||||||
|
@ -91,7 +91,7 @@ func (c *cache) putSmall(obj objectInfo) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(ctx, c.log,
|
||||||
storagelog.AddressField(obj.addr),
|
storagelog.AddressField(obj.addr),
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db PUT"),
|
storagelog.OpField("db PUT"),
|
||||||
|
|
|
@ -68,7 +68,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) deleteFromDB(key string, batched bool) {
|
func (c *cache) deleteFromDB(ctx context.Context, key string, batched bool) {
|
||||||
var recordDeleted bool
|
var recordDeleted bool
|
||||||
var err error
|
var err error
|
||||||
if batched {
|
if batched {
|
||||||
|
@ -89,7 +89,7 @@ func (c *cache) deleteFromDB(key string, batched bool) {
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.metrics.Evict(StorageTypeDB)
|
c.metrics.Evict(StorageTypeDB)
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(ctx, c.log,
|
||||||
storagelog.AddressField(key),
|
storagelog.AddressField(key),
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db DELETE"),
|
storagelog.OpField("db DELETE"),
|
||||||
|
@ -99,7 +99,7 @@ func (c *cache) deleteFromDB(key string, batched bool) {
|
||||||
c.estimateCacheSize()
|
c.estimateCacheSize()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
c.log.Error(ctx, logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue