diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 5e1c4b0de..ae77a5ab2 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -519,4 +519,7 @@ const ( WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty" BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file" FailedToUpdateMultinetConfiguration = "failed to update multinet configuration" + WritecacheCantRemoveObjectsFromTheDatabase = "can't remove objects from the database" + WritecacheTriedToFlushItemsFromWritecache = "tried to flush items from write-cache" + FSTreeCantDecodeDBObjectAddress = "can't decode object address from the DB" ) diff --git a/pkg/local_object_storage/writecache/cachebbolt.go b/pkg/local_object_storage/writecache/cachebbolt.go index 113ce54cd..9bcc89f6a 100644 --- a/pkg/local_object_storage/writecache/cachebbolt.go +++ b/pkg/local_object_storage/writecache/cachebbolt.go @@ -109,14 +109,14 @@ func (c *cache) Open(_ context.Context, mod mode.Mode) error { // Init runs necessary services. func (c *cache) Init(ctx context.Context) error { c.metrics.SetMode(mode.ConvertToComponentModeDegraded(c.mode)) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) c.cancel.Store(cancel) c.runFlushLoop(ctx) return nil } // Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op. -func (c *cache) Close(ctx context.Context) error { +func (c *cache) Close(_ context.Context) error { if cancelValue := c.cancel.Swap(dummyCanceler); cancelValue != nil { cancelValue.(context.CancelFunc)() } @@ -133,7 +133,7 @@ func (c *cache) Close(ctx context.Context) error { var err error if c.db != nil { - err = c.db.Close(ctx) + err = c.db.Close() if err != nil { c.db = nil } diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index ca3ab686c..a17c1c0b6 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -67,7 +67,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { if err != nil { return err } - storagelog.Write(c.log, + storagelog.Write(ctx, c.log, storagelog.AddressField(saddr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("db DELETE"), diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index acfb61653..a869eb028 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -143,7 +143,7 @@ func (c *cache) flushSmallObjects(ctx context.Context) { break } - c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, + c.log.Debug(ctx, logs.WritecacheTriedToFlushItemsFromWritecache, zap.Int("count", count), zap.String("start", base58.Encode(lastKey))) } @@ -230,7 +230,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) { continue } - c.deleteFromDB(objInfo.addr, true) + c.deleteFromDB(ctx, objInfo.addr, true) } } @@ -306,7 +306,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { var last string for { - batch, err := c.readNextDBBatch(ignoreErrors, last) + batch, err := c.readNextDBBatch(ctx, ignoreErrors, last) if err != nil { return err } @@ -316,7 +316,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { for _, item := range batch { var obj objectSDK.Object if err := obj.Unmarshal(item.data); err != nil { - c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err)) + c.reportFlushError(ctx, logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err)) if ignoreErrors { continue } @@ -326,7 +326,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { return err } - c.deleteFromDB(item.address, false) + c.deleteFromDB(ctx, item.address, false) } last = batch[len(batch)-1].address } @@ -338,7 +338,7 @@ type batchItem struct { address string } -func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) { +func (c *cache) readNextDBBatch(ctx context.Context, ignoreErrors bool, last string) ([]batchItem, error) { const batchSize = 100 var batch []batchItem err := c.db.View(func(tx *bbolt.Tx) error { @@ -352,7 +352,7 @@ func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, er continue } if err := addr.DecodeString(sa); err != nil { - c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) + c.reportFlushError(ctx, logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) if ignoreErrors { continue } diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index 9a0c09f41..603da7e78 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -53,7 +53,7 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error } } - if err := c.closeDB(prm.shrink); err != nil { + if err := c.closeDB(ctx, prm.shrink); err != nil { return err } @@ -78,12 +78,12 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode, prm setModePrm) error return nil } -func (c *cache) closeDB(shrink bool) error { +func (c *cache) closeDB(ctx context.Context, shrink bool) error { if c.db == nil { return nil } if !shrink { - if err := c.db.Close(ctx); err != nil { + if err := c.db.Close(); err != nil { return fmt.Errorf("can't close write-cache database: %w", err) } return nil @@ -98,7 +98,7 @@ func (c *cache) closeDB(shrink bool) error { if err != nil && !errors.Is(err, bbolt.ErrDatabaseNotOpen) { return fmt.Errorf("failed to check DB items: %w", err) } - if err := c.db.Close(ctx); err != nil { + if err := c.db.Close(); err != nil { return fmt.Errorf("can't close write-cache database: %w", err) } if empty { diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index c423c2163..b9515a401 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -58,7 +58,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro if sz <= c.smallObjectSize { storageType = StorageTypeDB - err := c.putSmall(oi) + err := c.putSmall(ctx, oi) if err == nil { added = true } @@ -75,7 +75,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro // putSmall persists small objects to the write-cache database and // pushes the to the flush workers queue. -func (c *cache) putSmall(obj objectInfo) error { +func (c *cache) putSmall(ctx context.Context, obj objectInfo) error { if !c.hasEnoughSpaceDB() { return ErrOutOfSpace } @@ -91,7 +91,7 @@ func (c *cache) putSmall(obj objectInfo) error { return nil }) if err == nil { - storagelog.Write(c.log, + storagelog.Write(ctx, c.log, storagelog.AddressField(obj.addr), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("db PUT"), diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 411ec34b9..feb44bd1d 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -68,7 +68,7 @@ func (c *cache) openStore(mod mode.ComponentMode) error { return nil } -func (c *cache) deleteFromDB(key string, batched bool) { +func (c *cache) deleteFromDB(ctx context.Context, key string, batched bool) { var recordDeleted bool var err error if batched { @@ -89,7 +89,7 @@ func (c *cache) deleteFromDB(key string, batched bool) { if err == nil { c.metrics.Evict(StorageTypeDB) - storagelog.Write(c.log, + storagelog.Write(ctx, c.log, storagelog.AddressField(key), storagelog.StorageTypeField(wcStorageType), storagelog.OpField("db DELETE"), @@ -99,7 +99,7 @@ func (c *cache) deleteFromDB(key string, batched bool) { c.estimateCacheSize() } } else { - c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) + c.log.Error(ctx, logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err)) } }