WIP: Simplify write-cache #314
16 changed files with 470 additions and 580 deletions
@@ -91,7 +91,7 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
 return res.Object, nil
 }
 
-wc := func(c writecache.Cache) (*objectSDK.Object, error) {
+wc := func(c *writecache.Cache) (*objectSDK.Object, error) {
 return c.Get(ctx, prm.addr)
 }
 
@@ -109,7 +109,7 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
 var emptyStorageID = make([]byte, 0)
 
 // fetchObjectData looks through writeCache and blobStor to find object.
-func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
+func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w *writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
 var (
 mErr error
 mRes meta.ExistsRes
@@ -57,3 +57,7 @@ func (m Mode) NoMetabase() bool {
 func (m Mode) ReadOnly() bool {
 return m&ReadOnly != 0
 }
+
+func (m Mode) ReadWrite() bool {
+return m == 0
+}
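Note: the new `ReadWrite` predicate relies on the convention that the unrestricted mode is the zero value of the bitmask, so `m == 0` means no restriction bits are set. A minimal self-contained sketch of that convention (the constant set below is illustrative, not the shard package's actual one):

```go
package main

import "fmt"

// Mode is a bit set of storage restrictions; the zero value means read-write.
type Mode uint32

const (
	ReadOnly Mode = 1 << iota
	DegradedDefined
)

// ReadOnly reports whether the read-only bit is set.
func (m Mode) ReadOnly() bool { return m&ReadOnly != 0 }

// ReadWrite reports whether no restriction bits are set at all.
func (m Mode) ReadWrite() bool { return m == 0 }

func main() {
	var m Mode // zero value
	fmt.Println(m.ReadWrite(), m.ReadOnly()) // true false

	m |= ReadOnly
	fmt.Println(m.ReadWrite(), m.ReadOnly()) // false true
}
```

Any future mode bit keeps the check valid, since setting it makes `m` non-zero.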
@@ -104,7 +104,7 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
 return obj, nil
 }
 
-wc := func(c writecache.Cache) (*object.Object, error) {
+wc := func(c *writecache.Cache) (*object.Object, error) {
 res, err := c.Get(ctx, prm.addr)
 if err != nil {
 return nil, err
@@ -23,7 +23,7 @@ type Shard struct {
 
 gc *gc
 
-writeCache writecache.Cache
+writeCache *writecache.Cache
 
 blobStor *blobstor.BlobStor
 
@@ -2,10 +2,12 @@ package writecache
 
 import (
 "context"
+"errors"
 
 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
+apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 "go.etcd.io/bbolt"
 "go.opentelemetry.io/otel/attribute"
@@ -15,7 +17,7 @@ import (
 // Delete removes object from write-cache.
 //
 // Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
-func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
+func (c *Cache) Delete(ctx context.Context, addr oid.Address) error {
 ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
 trace.WithAttributes(
 attribute.String("address", addr.EncodeToString()),
@@ -31,14 +33,14 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
 saddr := addr.EncodeToString()
 
 // Check disk cache.
-var has int
+var valLen int
 _ = c.db.View(func(tx *bbolt.Tx) error {
 b := tx.Bucket(defaultBucket)
-has = len(b.Get([]byte(saddr)))
+valLen = len(b.Get([]byte(saddr)))
 return nil
 })
 
-if 0 < has {
+if valLen > 0 {
 err := c.db.Update(func(tx *bbolt.Tx) error {
 b := tx.Bucket(defaultBucket)
 err := b.Delete([]byte(saddr))
@@ -52,18 +54,32 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
 storagelog.StorageTypeField(wcStorageType),
 storagelog.OpField("db DELETE"),
 )
-c.objCounters.DecDB()
+c.objCounters.decDB(valLen)
 return nil
 }
 
-_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
+// While getting an object looks overheadly, it allows to
+// get its size correctly without any additional memory/disk/CPU
+// usage on the WC's side _for every object_. `Delete` is not
+// expected to be called right after an object is put to the
+// Write-cache often, and for non-existing objects (persisted
+// to the main storage and dropped from the WC's storage) it
+// is `os.Stat` vs `os.Remove` calls after all.
+res, err := c.fsTree.Get(ctx, common.GetPrm{Address: addr})
+if errors.As(err, new(apistatus.ObjectNotFound)) {
+return nil
+} else if err != nil {
+return err
+}
+
+_, err = c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
 if err == nil {
 storagelog.Write(c.log,
 storagelog.AddressField(saddr),
 storagelog.StorageTypeField(wcStorageType),
 storagelog.OpField("fstree DELETE"),
 )
-c.objCounters.DecFS()
+c.objCounters.decFS(len(res.RawData))
 }
 
 return err
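Note: with the counters now tracking bytes rather than object counts, `Delete` has to learn the stored value's size before removing it, hence the `len(b.Get(...))` read in the DB branch and the `fsTree.Get` before `fsTree.Delete`. A minimal sketch of the same read-length-then-delete accounting against a plain bbolt bucket (bucket and key names are illustrative, not the write-cache's own):

```go
package main

import (
	"fmt"
	"log"

	"go.etcd.io/bbolt"
)

// deleteAccounted removes key from the bucket and reports how many value
// bytes were freed, which the caller can subtract from a size counter.
func deleteAccounted(db *bbolt.DB, bucket, key []byte) (int, error) {
	var freed int
	err := db.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket(bucket)
		if b == nil {
			return nil // nothing stored yet
		}
		freed = len(b.Get(key)) // 0 if the key is absent
		if freed == 0 {
			return nil
		}
		return b.Delete(key)
	})
	return freed, err
}

func main() {
	db, err := bbolt.Open("example.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	_ = db.Update(func(tx *bbolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("objects"))
		if err != nil {
			return err
		}
		return b.Put([]byte("addr"), []byte("payload"))
	})

	freed, _ := deleteAccounted(db, []byte("objects"), []byte("addr"))
	fmt.Println("freed bytes:", freed) // 7
}
```

The returned byte count is what the caller subtracts from its size counter, mirroring `decDB(valLen)` above.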
@@ -11,7 +11,9 @@ import (
 objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
 meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 "github.com/mr-tron/base58"
@@ -33,11 +35,16 @@ const (
 defaultFlushInterval = time.Second
 )
 
+type objWithData struct {
+obj *object.Object
+data []byte
+}
+
 // runFlushLoop starts background workers which periodically flush objects to the blobstor.
-func (c *cache) runFlushLoop() {
+func (c *Cache) runFlushLoop() {
 for i := 0; i < c.workersCount; i++ {
 c.wg.Add(1)
-go c.flushWorker(i)
+go c.smallObjectsFlusher()
 }
 
 c.wg.Add(1)
@@ -56,34 +63,27 @@ func (c *cache) runFlushLoop() {
 for {
 select {
 case <-tt.C:
-c.flushDB()
+c.flushSmallObjects()
 tt.Reset(defaultFlushInterval)
-case <-c.closeCh:
+case <-c.workersChan:
 return
 }
 }
 }()
 }
 
-func (c *cache) flushDB() {
+func (c *Cache) flushSmallObjects() {
 var lastKey []byte
 var m []objectInfo
 for {
 select {
-case <-c.closeCh:
+case <-c.workersChan:
 return
 default:
 }
 
 m = m[:0]
 
-c.modeMtx.RLock()
-if c.readOnly() || !c.initialized.Load() {
-c.modeMtx.RUnlock()
-time.Sleep(time.Second)
-continue
-}
-
 // We put objects in batches of fixed size to not interfere with main put cycle a lot.
 _ = c.db.View(func(tx *bbolt.Tx) error {
 b := tx.Bucket(defaultBucket)
@@ -117,38 +117,32 @@ func (c *cache) flushDB() {
 
 var count int
 for i := range m {
-if c.flushed.Contains(m[i].addr) {
-continue
-}
-
 obj := object.New()
-if err := obj.Unmarshal(m[i].data); err != nil {
+data := m[i].data
+
+if err := obj.Unmarshal(data); err != nil {
 continue
 }
 
 count++
 select {
-case c.flushCh <- obj:
-case <-c.closeCh:
-c.modeMtx.RUnlock()
+case c.smallFlushCh <- objWithData{obj: obj, data: data}:
+case <-c.workersChan:
 return
 }
 }
 
 if count == 0 {
-c.modeMtx.RUnlock()
 break
 }
 
-c.modeMtx.RUnlock()
-
 c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
 zap.Int("count", count),
 zap.String("start", base58.Encode(lastKey)))
 }
 }
 
-func (c *cache) flushBigObjects(ctx context.Context) {
+func (c *Cache) flushBigObjects(ctx context.Context) {
 tick := time.NewTicker(defaultFlushInterval * 10)
 for {
 select {
@@ -157,21 +151,18 @@ func (c *cache) flushBigObjects(ctx context.Context) {
 if c.readOnly() {
 c.modeMtx.RUnlock()
 break
-} else if !c.initialized.Load() {
-c.modeMtx.RUnlock()
-continue
 }
 
 _ = c.flushFSTree(ctx, true)
 
 c.modeMtx.RUnlock()
-case <-c.closeCh:
+case <-c.workersChan:
 return
 }
 }
 }
 
-func (c *cache) reportFlushError(msg string, addr string, err error) {
+func (c *Cache) reportFlushError(msg string, addr string, err error) {
 if c.reportError != nil {
 c.reportError(msg, err)
 } else {
@@ -181,14 +172,18 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
 }
 }
 
-func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
+func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
 var prm common.IteratePrm
 prm.IgnoreErrors = ignoreErrors
 prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
 sAddr := addr.EncodeToString()
 
-if _, ok := c.store.flushed.Peek(sAddr); ok {
-return nil
+select {
+case <-c.workersChan:
+return stopIter
+case <-ctx.Done():
+return ctx.Err()
+default:
 }
 
 data, err := f()
@@ -210,7 +205,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
 return err
 }
 
-err = c.flushObject(ctx, &obj, data)
+err = c.flushObject(ctx, objWithData{obj: &obj, data: data})
 if err != nil {
 if ignoreErrors {
 return nil
@@ -218,38 +213,56 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
 return err
 }
 
-// mark object as flushed
-c.flushed.Add(sAddr, false)
+err = c.dropBigObject(ctx, addr, len(data))
+if err != nil {
+c.reportFlushError("can't drop an object from FSTree", sAddr, err)
+if ignoreErrors {
+return nil
+}
+return err
+}
 
 return nil
 }
 
 _, err := c.fsTree.Iterate(prm)
+if errors.Is(err, stopIter) {
+return nil
+}
+
 return err
 }
 
-// flushWorker writes objects to the main storage.
-func (c *cache) flushWorker(_ int) {
+// smallObjectsFlusher writes small objects to the main storage.
+func (c *Cache) smallObjectsFlusher() {
 defer c.wg.Done()
 
-var obj *object.Object
+var objAndData objWithData
 for {
 // Give priority to direct put.
 select {
-case obj = <-c.flushCh:
-case <-c.closeCh:
+case objAndData = <-c.smallFlushCh:
+case <-c.workersChan:
 return
 }
 
-err := c.flushObject(context.TODO(), obj, nil)
+err := c.flushObject(context.TODO(), objAndData)
 if err == nil {
-c.flushed.Add(objectCore.AddressOf(obj).EncodeToString(), true)
+addr := objectCore.AddressOf(objAndData.obj)
+
+err = c.dropSmallObject(context.TODO(), addr)
+if err != nil {
+c.reportFlushError("can't drop object from write-cache",
+addr.EncodeToString(), err)
+}
 }
 }
 }
 
 // flushObject is used to write object directly to the main storage.
-func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
+func (c *Cache) flushObject(ctx context.Context, objAndData objWithData) error {
+obj := objAndData.obj
+data := objAndData.data
 addr := objectCore.AddressOf(obj)
 
 var prm common.PutPrm
@@ -272,6 +285,11 @@ func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
 
 _, err = c.metabase.UpdateStorageID(updPrm)
 if err != nil {
+if errors.As(err, new(apistatus.ObjectNotFound)) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) {
+// object info is outdated in the WC
+return nil
+}
+
 c.reportFlushError("can't update object storage ID",
 addr.EncodeToString(), err)
 }
@@ -281,7 +299,7 @@ func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
 // Flush flushes all objects from the write-cache to the main storage.
 // Write-cache must be in readonly mode to ensure correctness of an operation and
 // to prevent interference with background flush workers.
-func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
+func (c *Cache) Flush(ctx context.Context, ignoreErrors bool) error {
 ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
 trace.WithAttributes(
 attribute.Bool("ignore_errors", ignoreErrors),
@@ -294,21 +312,25 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
 return c.flush(ctx, ignoreErrors)
 }
 
-func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
+func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error {
 if err := c.flushFSTree(ctx, ignoreErrors); err != nil {
 return err
 }
 
-return c.db.View(func(tx *bbolt.Tx) error {
+var dbFunc func(func(*bbolt.Tx) error) error
+if c.readOnly() {
+dbFunc = c.db.View
+} else {
+dbFunc = c.db.Update
+}
+
+return dbFunc(func(tx *bbolt.Tx) error {
 var addr oid.Address
 
 b := tx.Bucket(defaultBucket)
 cs := b.Cursor()
 for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
 sa := string(k)
-if _, ok := c.flushed.Peek(sa); ok {
-continue
-}
 
 if err := addr.DecodeString(sa); err != nil {
 c.reportFlushError("can't decode object address from the DB", sa, err)
@@ -327,10 +349,101 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
 return err
 }
 
-if err := c.flushObject(ctx, &obj, data); err != nil {
+err := c.flushObject(ctx, objWithData{obj: &obj, data: data})
+if err != nil {
+if ignoreErrors {
+continue
+}
+
 return err
 }
+
+if c.readOnly() {
+continue
+}
+
+removed, err := dropObject(tx, k)
+if err != nil {
+c.reportFlushError("can't drop an object from the DB", sa, err)
+if ignoreErrors {
+continue
+}
+}
+
+storagelog.Write(c.log,
+storagelog.AddressField(addr),
+storagelog.StorageTypeField(wcStorageType),
+storagelog.OpField("db DELETE"),
+)
+c.objCounters.decDB(removed)
 }
 return nil
 })
 }
+
+func (c *Cache) dropSmallObject(ctx context.Context, addr oid.Address) error {
+var removedBytes int
+key := []byte(addr.EncodeToString())
+var err error
+
+err = c.db.Batch(func(tx *bbolt.Tx) error {
+select {
+case <-c.workersChan:
+return nil
+case <-ctx.Done():
+return ctx.Err()
+default:
+}
+
+removedBytes, err = dropObject(tx, key)
+
+return err
+})
+if err != nil {
+return err
+}
+
+storagelog.Write(c.log,
+storagelog.AddressField(addr),
+storagelog.StorageTypeField(wcStorageType),
+storagelog.OpField("db DELETE"),
+)
+
+if removedBytes > 0 {
+c.objCounters.decDB(removedBytes)
+}
+
+return nil
+}
+
+func dropObject(tx *bbolt.Tx, key []byte) (int, error) {
+b := tx.Bucket(defaultBucket)
+
+removedBytes := len(b.Get(key))
+if removedBytes > 0 {
+return removedBytes, b.Delete(key)
+}
+
+return 0, nil
+}
+
+func (c *Cache) dropBigObject(ctx context.Context, addr oid.Address, size int) error {
+_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
+if err != nil {
+if errors.As(err, new(apistatus.ObjectNotFound)) {
+return nil
+}
+
+return err
+}
+
+storagelog.Write(c.log,
+storagelog.AddressField(addr),
+storagelog.StorageTypeField(wcStorageType),
+storagelog.OpField("fstree DELETE"),
+)
+c.objCounters.decFS(size)
+
+return nil
+}
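Note: `flushFSTree` now aborts iteration by returning the `stopIter` sentinel from the lazy handler and filtering it out after `Iterate` returns, so a shutdown request is not reported as a failure. A standalone sketch of that sentinel-error pattern (the iterator and all names here are illustrative):

```go
package main

import (
	"errors"
	"fmt"
)

// errStopIter is returned from the callback to end iteration early; the
// caller filters it out so an early stop is not treated as an error.
var errStopIter = errors.New("stop iteration")

func iterate(items []string, handler func(string) error) error {
	for _, it := range items {
		if err := handler(it); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	stop := make(chan struct{})
	close(stop) // pretend shutdown was requested

	err := iterate([]string{"a", "b", "c"}, func(s string) error {
		select {
		case <-stop:
			return errStopIter // abort the walk, not an error condition
		default:
		}
		fmt.Println(s)
		return nil
	})
	if errors.Is(err, errStopIter) {
		err = nil
	}
	fmt.Println("iterate error:", err) // <nil>
}
```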
@@ -5,7 +5,6 @@ import (
 "os"
 "path/filepath"
 "testing"
-"time"
 
 objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@@ -39,7 +38,7 @@ func TestFlush(t *testing.T) {
 smallSize = 256
 )
 
-newCache := func(t *testing.T, opts ...Option) (Cache, *blobstor.BlobStor, *meta.DB) {
+newCache := func(t *testing.T, opts ...Option) (*Cache, *blobstor.BlobStor, *meta.DB) {
 dir := t.TempDir()
 mb := meta.New(
 meta.WithPath(filepath.Join(dir, "meta")),
@@ -76,7 +75,7 @@ func TestFlush(t *testing.T) {
 return wc, bs, mb
 }
 
-putObjects := func(t *testing.T, c Cache) []objectPair {
+putObjects := func(t *testing.T, c *Cache) []objectPair {
 objects := make([]objectPair, objCount)
 for i := range objects {
 objects[i] = putObject(t, c, 1+(i%2)*smallSize)
@@ -106,12 +105,19 @@ func TestFlush(t *testing.T) {
 wc, bs, mb := newCache(t)
 objects := putObjects(t, wc)
 
+for i := 0; i < 2; i++ {
+var mPrm meta.GetPrm
+mPrm.SetAddress(objects[i].addr)
+_, err := mb.Get(context.Background(), mPrm)
+require.Error(t, err)
+
+_, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
+require.Error(t, err)
+}
+
 require.NoError(t, bs.SetMode(mode.ReadWrite))
 require.NoError(t, mb.SetMode(mode.ReadWrite))
 
-wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
-wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
-
 require.NoError(t, wc.Flush(context.Background(), false))
 
 for i := 0; i < 2; i++ {
@@ -121,7 +127,7 @@ func TestFlush(t *testing.T) {
 require.Error(t, err)
 
 _, err = bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
-require.Error(t, err)
+require.NoError(t, err)
 }
 
 check(t, mb, bs, objects[2:])
@@ -131,19 +137,11 @@ func TestFlush(t *testing.T) {
 wc, bs, mb := newCache(t)
 objects := putObjects(t, wc)
 
-// Blobstor is read-only, so we expect en error from `flush` here.
-require.Error(t, wc.SetMode(mode.Degraded))
-
-// First move to read-only mode to close background workers.
-require.NoError(t, wc.SetMode(mode.ReadOnly))
-require.NoError(t, bs.SetMode(mode.ReadWrite))
-require.NoError(t, mb.SetMode(mode.ReadWrite))
-
-wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
-wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
-
+// Moving to the degraded mode is called with `ignoreErrors` so
+// we do not expect an error from `flush` here.
 require.NoError(t, wc.SetMode(mode.Degraded))
 
+// bs is read-only; so is can't get the objects
 for i := 0; i < 2; i++ {
 var mPrm meta.GetPrm
 mPrm.SetAddress(objects[i].addr)
@@ -154,19 +152,29 @@ func TestFlush(t *testing.T) {
 require.Error(t, err)
 }
 
+require.NoError(t, wc.SetMode(mode.ReadWrite))
+require.NoError(t, bs.SetMode(mode.ReadWrite))
+require.NoError(t, mb.SetMode(mode.ReadWrite))
+
+require.NoError(t, wc.SetMode(mode.Degraded))
+
+for i := 0; i < 2; i++ {
+_, err := bs.Get(context.Background(), common.GetPrm{Address: objects[i].addr})
+require.NoError(t, err)
+}
+
 check(t, mb, bs, objects[2:])
 })
 
 t.Run("ignore errors", func(t *testing.T) {
-testIgnoreErrors := func(t *testing.T, f func(*cache)) {
+testIgnoreErrors := func(t *testing.T, f func(*Cache)) {
 var errCount atomic.Uint32
 wc, bs, mb := newCache(t, WithReportErrorFunc(func(message string, err error) {
 errCount.Inc()
 }))
 objects := putObjects(t, wc)
-f(wc.(*cache))
+f(wc)
 
-require.NoError(t, wc.SetMode(mode.ReadOnly))
 require.NoError(t, bs.SetMode(mode.ReadWrite))
 require.NoError(t, mb.SetMode(mode.ReadWrite))
 
@@ -178,7 +186,7 @@ func TestFlush(t *testing.T) {
 check(t, mb, bs, objects)
 }
 t.Run("db, invalid address", func(t *testing.T) {
-testIgnoreErrors(t, func(c *cache) {
+testIgnoreErrors(t, func(c *Cache) {
 _, data := newObject(t, 1)
 require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
 b := tx.Bucket(defaultBucket)
@@ -187,7 +195,7 @@ func TestFlush(t *testing.T) {
 })
 })
 t.Run("db, invalid object", func(t *testing.T) {
-testIgnoreErrors(t, func(c *cache) {
+testIgnoreErrors(t, func(c *Cache) {
 require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
 b := tx.Bucket(defaultBucket)
 return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3})
@@ -195,7 +203,7 @@ func TestFlush(t *testing.T) {
 })
 })
 t.Run("fs, read error", func(t *testing.T) {
-testIgnoreErrors(t, func(c *cache) {
+testIgnoreErrors(t, func(c *Cache) {
 obj, data := newObject(t, 1)
 
 var prm common.PutPrm
@@ -214,7 +222,7 @@ func TestFlush(t *testing.T) {
 })
 })
 t.Run("fs, invalid object", func(t *testing.T) {
-testIgnoreErrors(t, func(c *cache) {
+testIgnoreErrors(t, func(c *Cache) {
 var prm common.PutPrm
 prm.Address = oidtest.Address()
 prm.RawData = []byte{1, 2, 3}
@@ -224,7 +232,7 @@ func TestFlush(t *testing.T) {
 })
 })
 
-t.Run("on init", func(t *testing.T) {
+t.Run("flush", func(t *testing.T) {
 wc, bs, mb := newCache(t)
 objects := []objectPair{
 // removed
@@ -260,9 +268,6 @@ func TestFlush(t *testing.T) {
 _, err = mb.Delete(context.Background(), deletePrm)
 require.NoError(t, err)
 
-require.NoError(t, bs.SetMode(mode.ReadOnly))
-require.NoError(t, mb.SetMode(mode.ReadOnly))
-
 // Open in read-only: no error, nothing is removed.
 require.NoError(t, wc.Open(true))
 initWC(t, wc)
@@ -275,18 +280,22 @@ func TestFlush(t *testing.T) {
 // Open in read-write: no error, something is removed.
 require.NoError(t, wc.Open(false))
 initWC(t, wc)
 
 for i := range objects {
 _, err := wc.Get(context.Background(), objects[i].addr)
-if i < 2 {
-require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
-} else {
-require.NoError(t, err, i)
-}
+require.NoError(t, err, i)
+}
+
+require.NoError(t, wc.Flush(context.Background(), true))
+
+for i := range objects {
+_, err := wc.Get(context.Background(), objects[i].addr)
+require.ErrorAs(t, err, new(apistatus.ObjectNotFound), i)
 }
 })
 }
 
-func putObject(t *testing.T, c Cache, size int) objectPair {
+func putObject(t *testing.T, c *Cache, size int) objectPair {
 obj, data := newObject(t, size)
 
 var prm common.PutPrm
@@ -319,13 +328,8 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
 return obj, data
 }
 
-func initWC(t *testing.T, wc Cache) {
+func initWC(t *testing.T, wc *Cache) {
 require.NoError(t, wc.Init())
-
-require.Eventually(t, func() bool {
-rawWc := wc.(*cache)
-return rawWc.initialized.Load()
-}, 100*time.Second, 1*time.Millisecond)
 }
 
 type dummyEpoch struct{}
@@ -18,7 +18,7 @@ import (
 // Get returns object from write-cache.
 //
 // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
-func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
+func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
 saddr := addr.EncodeToString()
 
 ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
@@ -30,7 +30,6 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
 value, err := Get(c.db, []byte(saddr))
 if err == nil {
 obj := objectSDK.New()
-c.flushed.Get(saddr)
 return obj, obj.Unmarshal(value)
 }
 
@@ -39,14 +38,13 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
 return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
 }
 
-c.flushed.Get(saddr)
 return res.Object, nil
 }
 
 // Head returns object header from write-cache.
 //
 // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
-func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
+func (c *Cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
 ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
 trace.WithAttributes(
 attribute.String("address", addr.EncodeToString()),
@@ -2,191 +2,33 @@ package writecache
 
 import (
 "context"
-"errors"
-"sync"
+"fmt"
 
-"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
-"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
-storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
-meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
-apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
-oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-"go.etcd.io/bbolt"
-"go.uber.org/zap"
+"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
+"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 )
 
-func (c *cache) initFlushMarks(ctx context.Context) {
-var localWG sync.WaitGroup
-
-localWG.Add(1)
-go func() {
-defer localWG.Done()
-
-c.fsTreeFlushMarkUpdate(ctx)
-}()
-
-localWG.Add(1)
-go func() {
-defer localWG.Done()
-
-c.dbFlushMarkUpdate(ctx)
-}()
-
-c.initWG.Add(1)
-c.wg.Add(1)
-go func() {
-defer c.wg.Done()
-defer c.initWG.Done()
-
-localWG.Wait()
-
-select {
-case <-c.stopInitCh:
-return
-case <-c.closeCh:
-return
-default:
-}
-
-c.initialized.Store(true)
-}()
-}
-
-var errStopIter = errors.New("stop iteration")
-
-func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) {
-c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree)
-
-var prm common.IteratePrm
-prm.LazyHandler = func(addr oid.Address, _ func() ([]byte, error)) error {
-select {
-case <-c.closeCh:
-return errStopIter
-case <-c.stopInitCh:
-return errStopIter
-default:
-}
-
-flushed, needRemove := c.flushStatus(ctx, addr)
-if flushed {
-c.store.flushed.Add(addr.EncodeToString(), true)
-if needRemove {
-var prm common.DeletePrm
-prm.Address = addr
-
-_, err := c.fsTree.Delete(ctx, prm)
-if err == nil {
-storagelog.Write(c.log,
-storagelog.AddressField(addr),
-storagelog.StorageTypeField(wcStorageType),
-storagelog.OpField("fstree DELETE"),
-)
-}
-}
-}
-return nil
-}
-
-c.modeMtx.RLock()
-defer c.modeMtx.RUnlock()
-
-_, _ = c.fsTree.Iterate(prm)
-
-c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks)
-}
-
-func (c *cache) dbFlushMarkUpdate(ctx context.Context) {
-c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase)
-
-c.modeMtx.RLock()
-defer c.modeMtx.RUnlock()
-
-var m []string
-var indices []int
-var lastKey []byte
-var batchSize = flushBatchSize
-for {
-select {
-case <-c.closeCh:
-return
-case <-c.stopInitCh:
-return
-default:
-}
-
-m = m[:0]
-indices = indices[:0]
-
-// We put objects in batches of fixed size to not interfere with main put cycle a lot.
-_ = c.db.View(func(tx *bbolt.Tx) error {
-b := tx.Bucket(defaultBucket)
-cs := b.Cursor()
-for k, _ := cs.Seek(lastKey); k != nil && len(m) < batchSize; k, _ = cs.Next() {
-m = append(m, string(k))
-}
-return nil
-})
-
-var addr oid.Address
-for i := range m {
-if err := addr.DecodeString(m[i]); err != nil {
-continue
-}
-
-flushed, needRemove := c.flushStatus(ctx, addr)
-if flushed {
-c.store.flushed.Add(addr.EncodeToString(), true)
-if needRemove {
-indices = append(indices, i)
-}
-}
-}
-
-if len(m) == 0 {
-break
-}
-
-err := c.db.Batch(func(tx *bbolt.Tx) error {
-b := tx.Bucket(defaultBucket)
-for _, j := range indices {
-if err := b.Delete([]byte(m[j])); err != nil {
-return err
-}
-}
-return nil
-})
-if err == nil {
-for _, j := range indices {
-storagelog.Write(c.log,
-zap.String("address", m[j]),
-storagelog.StorageTypeField(wcStorageType),
-storagelog.OpField("db DELETE"),
-)
-}
-}
-lastKey = append([]byte(m[len(m)-1]), 0)
-}
-
-c.log.Info(logs.WritecacheFinishedUpdatingFlushMarks)
-}
-
-// flushStatus returns info about the object state in the main storage.
-// First return value is true iff object exists.
-// Second return value is true iff object can be safely removed.
-func (c *cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
-var existsPrm meta.ExistsPrm
-existsPrm.SetAddress(addr)
-
-_, err := c.metabase.Exists(ctx, existsPrm)
-if err != nil {
-needRemove := errors.Is(err, meta.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
-return needRemove, needRemove
-}
-
-var prm meta.StorageIDPrm
-prm.SetAddress(addr)
-
-mRes, _ := c.metabase.StorageID(ctx, prm)
-res, err := c.blobstor.Exists(ctx, common.ExistsPrm{Address: addr, StorageID: mRes.StorageID()})
-return err == nil && res.Exists, false
-}
+// Init runs necessary services.
+func (c *Cache) Init() error {
+ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
+defer span.End()
+
+c.modeMtx.Lock()
+defer c.modeMtx.Unlock()
+
+if c.mode.NoMetabase() {
+return nil
+}
+
+err := c.initCounters(ctx)
+if err != nil {
+return fmt.Errorf("initializing write-cache size: %w", err)
+}
+
+if c.mode == mode.ReadWrite {
+c.workersChan = make(chan struct{})
+c.runFlushLoop()
+}
+
+return nil
+}
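Note: the new `Init` delegates to `initCounters`, which runs the DB and FSTree size scans concurrently and keeps the first error of each. A hedged alternative sketch of the same shape using `errgroup` instead of a `WaitGroup` plus two error variables (the scan functions are placeholders, not the write-cache's real ones):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// initSizes runs two independent size scans concurrently and returns the
// first error that occurred.
func initSizes(ctx context.Context, scanDB, scanFS func(context.Context) error) error {
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error { return scanDB(ctx) })
	g.Go(func() error { return scanFS(ctx) })
	return g.Wait()
}

func main() {
	err := initSizes(context.Background(),
		func(context.Context) error { return nil },
		func(context.Context) error { return fmt.Errorf("fstree scan failed") },
	)
	fmt.Println(err) // fstree scan failed
}
```

errgroup also cancels the shared context as soon as one scan fails, which the WaitGroup version does not do.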
@@ -31,7 +31,7 @@ func (p *IterationPrm) WithIgnoreErrors(ignore bool) {
 // Iterate iterates over all objects present in write cache.
 // This is very difficult to do correctly unless write-cache is put in read-only mode.
 // Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results.
-func (c *cache) Iterate(prm IterationPrm) error {
+func (c *Cache) Iterate(prm IterationPrm) error {
 c.modeMtx.RLock()
 defer c.modeMtx.RUnlock()
 if !c.readOnly() {
@@ -41,9 +41,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
 err := c.db.View(func(tx *bbolt.Tx) error {
 b := tx.Bucket(defaultBucket)
 return b.ForEach(func(k, data []byte) error {
-if _, ok := c.flushed.Peek(string(k)); ok {
-return nil
-}
 return prm.handler(data)
 })
 })
@@ -54,9 +51,6 @@ func (c *cache) Iterate(prm IterationPrm) error {
 var fsPrm common.IteratePrm
 fsPrm.IgnoreErrors = prm.ignoreErrors
 fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
-if _, ok := c.flushed.Peek(addr.EncodeToString()); ok {
-return nil
-}
 data, err := f()
 if err != nil {
 if prm.ignoreErrors {
@@ -16,13 +16,10 @@ import (
 // ErrReadOnly is returned when Put/Write is performed in a read-only mode.
 var ErrReadOnly = logicerr.New("write-cache is in read-only mode")
 
-// ErrNotInitialized is returned when write-cache is initializing.
-var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
-
 // SetMode sets write-cache mode of operation.
 // When shard is put in read-only mode all objects in memory are flushed to disk
 // and all background jobs are suspended.
-func (c *cache) SetMode(m mode.Mode) error {
+func (c *Cache) SetMode(m mode.Mode) error {
 ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode",
 trace.WithAttributes(
 attribute.String("mode", m.String()),
@@ -33,31 +30,30 @@ func (c *cache) SetMode(m mode.Mode) error {
 }
 
 // setMode applies new mode. Must be called with cache.modeMtx lock taken.
-func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
+func (c *Cache) setMode(ctx context.Context, m mode.Mode) error {
 var err error
-turnOffMeta := m.NoMetabase()
-
-if !c.initialized.Load() {
-close(c.stopInitCh)
-
-c.initWG.Wait()
-c.stopInitCh = make(chan struct{})
-
-defer func() {
-if err == nil && !turnOffMeta {
-c.initFlushMarks(ctx)
-}
-}()
-}
-
 c.modeMtx.Lock()
 defer c.modeMtx.Unlock()
 
-if turnOffMeta && !c.mode.NoMetabase() {
+var workersActive bool
+select {
+case <-c.workersChan:
+default:
+workersActive = true
+}
+
+stopWorkers := m.NoMetabase() && !c.mode.NoMetabase() || c.mode.ReadWrite() && !m.ReadWrite()
+
+if stopWorkers {
 err = c.flush(ctx, true)
 if err != nil {
 return err
 }
+
+if workersActive {
+close(c.workersChan)
+c.wg.Wait()
+}
 }
 
 if c.db != nil {
@@ -67,14 +63,14 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
 }
 
 // Suspend producers to ensure there are channel send operations in fly.
-// flushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
+// smallFlushCh is populated by `flush` with `modeMtx` taken, thus waiting until it is empty
 // guarantees that there are no in-fly operations.
-for len(c.flushCh) != 0 {
+for len(c.smallFlushCh) != 0 {
 c.log.Info(logs.WritecacheWaitingForChannelsToFlush)
 time.Sleep(time.Second)
 }
 
-if turnOffMeta {
+if m.NoMetabase() {
 c.mode = m
 return nil
 }
@@ -84,11 +80,21 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
 }
 
 c.mode = m
 
+if m == mode.ReadWrite {
+select {
+case <-c.workersChan:
+c.workersChan = make(chan struct{})
+c.runFlushLoop()
+default:
+}
+}
+
 return nil
 }
 
 // readOnly returns true if current mode is read-only.
 // `c.modeMtx` must be taken.
-func (c *cache) readOnly() bool {
+func (c *Cache) readOnly() bool {
 return c.mode.ReadOnly()
 }
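Note: `setMode` now stops the background flushers by closing `workersChan` and waiting on the WaitGroup, and later restarts them by allocating a fresh channel before `runFlushLoop`. A minimal sketch of that close-and-recreate lifecycle outside the write-cache (all names here are illustrative):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pool illustrates the stop-channel lifecycle: workers exit when the channel
// is closed, and a fresh channel is created before restarting them.
type pool struct {
	wg   sync.WaitGroup
	stop chan struct{}
}

func (p *pool) start(n int) {
	p.stop = make(chan struct{})
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-p.stop:
					return
				case <-time.After(10 * time.Millisecond):
					// periodic work would go here
				}
			}
		}()
	}
}

func (p *pool) shutdown() {
	close(p.stop)
	p.wg.Wait()
}

func main() {
	var p pool
	p.start(2)
	p.shutdown() // stop workers
	p.start(2)   // restart with a new channel
	p.shutdown()
	fmt.Println("workers stopped twice")
}
```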
@@ -48,8 +48,6 @@ type options struct {
 // maxCacheSize is the maximum total size of all objects saved in cache (DB + FS).
 // 1 GiB by default.
 maxCacheSize uint64
-// objCounters contains atomic counters for the number of objects stored in cache.
-objCounters counters
 // maxBatchSize is the maximum batch size for the small object database.
 maxBatchSize int
 // maxBatchDelay is the maximum batch wait time for the small object database.
@@ -3,6 +3,7 @@ package writecache
 import (
 "context"
 "errors"
+"fmt"
 
 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -25,7 +26,7 @@ var (
 // Returns ErrNotInitialized if write-cache has not been initialized yet.
 // Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
 // Returns ErrBigObject if an objects exceeds maximum object size.
-func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
+func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
 ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Put",
 trace.WithAttributes(
 attribute.String("address", prm.Address.EncodeToString()),
@@ -37,8 +38,6 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
 defer c.modeMtx.RUnlock()
 if c.readOnly() {
 return common.PutRes{}, ErrReadOnly
-} else if !c.initialized.Load() {
-return common.PutRes{}, ErrNotInitialized
 }
 
 sz := uint64(len(prm.RawData))
@@ -52,23 +51,42 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
 data: prm.RawData,
 }
 
-if sz <= c.smallObjectSize {
-return common.PutRes{}, c.putSmall(oi)
+if c.maxCacheSize < c.sizeIfAdd(sz) {
+return common.PutRes{}, ErrOutOfSpace
 }
-return common.PutRes{}, c.putBig(ctx, oi.addr, prm)
+
+if sz <= c.smallObjectSize {
+err := c.putSmall(oi)
+if err != nil {
+err = fmt.Errorf("could not put small object to DB: %w", err)
+}
+
+return common.PutRes{}, err
+}
+
+err := c.putBig(ctx, oi.addr, prm)
+if err != nil {
+err = fmt.Errorf("could not put big object to FSTree: %w", err)
+}
+
+return common.PutRes{}, err
 }
 
 // putSmall persists small objects to the write-cache database and
 // pushes the to the flush workers queue.
-func (c *cache) putSmall(obj objectInfo) error {
-cacheSize := c.estimateCacheSize()
-if c.maxCacheSize < c.incSizeDB(cacheSize) {
-return ErrOutOfSpace
-}
-
+func (c *Cache) putSmall(obj objectInfo) error {
+var alreadyExists bool
 err := c.db.Batch(func(tx *bbolt.Tx) error {
 b := tx.Bucket(defaultBucket)
-return b.Put([]byte(obj.addr), obj.data)
+addr := []byte(obj.addr)
+
+alreadyExists = len(b.Get(addr)) != 0
+if alreadyExists {
+return nil
+}
+
+return b.Put(addr, obj.data)
 })
 if err == nil {
 storagelog.Write(c.log,
@@ -76,29 +94,22 @@ func (c *cache) putSmall(obj objectInfo) error {
 storagelog.StorageTypeField(wcStorageType),
 storagelog.OpField("db PUT"),
 )
-c.objCounters.IncDB()
+if !alreadyExists {
+c.objCounters.incDB(len(obj.data))
+}
 }
-return nil
+return err
 }
 
 // putBig writes object to FSTree and pushes it to the flush workers queue.
-func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
-cacheSz := c.estimateCacheSize()
-if c.maxCacheSize < c.incSizeFS(cacheSz) {
-return ErrOutOfSpace
-}
-
+func (c *Cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
 _, err := c.fsTree.Put(ctx, prm)
 if err != nil {
 return err
 }
 
-if c.blobstor.NeedsCompression(prm.Object) {
-c.mtx.Lock()
-c.compressFlags[addr] = struct{}{}
-c.mtx.Unlock()
-}
-c.objCounters.IncFS()
+c.objCounters.incFS(len(prm.RawData))
 storagelog.Write(c.log,
 storagelog.AddressField(addr),
 storagelog.StorageTypeField(wcStorageType),
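Note: `Put` now performs a single size admission check up front: the incoming object size is added to the current DB and FSTree byte totals and compared against `maxCacheSize`, replacing the old per-backend `incSizeDB`/`incSizeFS` estimates. A small sketch of that accounting with standard-library atomics (field and method names are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errOutOfSpace = errors.New("write-cache is full")

// sizeCounters tracks the bytes stored in the two backends; the admission
// check adds the incoming object size to the current total before accepting.
type sizeCounters struct {
	db, fstree atomic.Uint64
	max        uint64
}

func (c *sizeCounters) sizeIfAdd(delta uint64) uint64 {
	return delta + c.db.Load() + c.fstree.Load()
}

func (c *sizeCounters) putSmall(size uint64) error {
	if c.sizeIfAdd(size) > c.max {
		return errOutOfSpace
	}
	c.db.Add(size)
	return nil
}

func main() {
	c := &sizeCounters{max: 100}
	fmt.Println(c.putSmall(60)) // <nil>
	fmt.Println(c.putSmall(60)) // write-cache is full
}
```

Counting actual bytes instead of objects multiplied by `smallObjectSize`/`maxObjectSize` makes the limit track real usage, at the cost of initializing the totals by scanning both backends on start.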
@@ -1,72 +1,134 @@
 package writecache
 
 import (
+"context"
+"errors"
 "fmt"
+"sync"
+
+"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 "go.etcd.io/bbolt"
 "go.uber.org/atomic"
 )
 
-func (c *cache) estimateCacheSize() uint64 {
-return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize
-}
-
-func (c *cache) incSizeDB(sz uint64) uint64 {
-return sz + c.smallObjectSize
-}
-
-func (c *cache) incSizeFS(sz uint64) uint64 {
-return sz + c.maxObjectSize
+func (c *Cache) sizeIfAdd(delta uint64) uint64 {
+return delta + c.objCounters.fstreeSize.Load() + c.objCounters.dbSize.Load()
 }
 
 type counters struct {
-cDB, cFS atomic.Uint64
+dbSize, fstreeSize atomic.Uint64
 }
 
-func (x *counters) IncDB() {
-x.cDB.Inc()
+func (x *counters) incDB(delta int) {
+x.dbSize.Add(uint64(delta))
 }
 
-func (x *counters) DecDB() {
-x.cDB.Dec()
+func (x *counters) decDB(delta int) {
+x.dbSize.Sub(uint64(delta))
 }
 
-func (x *counters) DB() uint64 {
-return x.cDB.Load()
+func (x *counters) incFS(delta int) {
+x.fstreeSize.Add(uint64(delta))
 }
 
-func (x *counters) IncFS() {
-x.cFS.Inc()
+func (x *counters) decFS(delta int) {
+x.fstreeSize.Sub(uint64(delta))
 }
 
-func (x *counters) DecFS() {
-x.cFS.Dec()
-}
-
-func (x *counters) FS() uint64 {
-return x.cFS.Load()
-}
-
-func (c *cache) initCounters() error {
-var inDB uint64
+func (c *Cache) initCounters(ctx context.Context) error {
+var wg sync.WaitGroup
+var dbErr error
+var fsErr error
+
+wg.Add(1)
+go func() {
+dbErr = c.initDBSizeCounter(ctx)
+wg.Done()
+}()
+
+wg.Add(1)
+go func() {
+fsErr = c.initFSSizeCounter(ctx)
+wg.Done()
+}()
+
+wg.Wait()
+
+switch {
+case dbErr != nil:
+return fmt.Errorf("database counter initialization: %w", dbErr)
+case fsErr != nil:
+return fmt.Errorf("FSTree counter initialization: %w", fsErr)
+default:
+return nil
+}
+}
+
+var stopIter = errors.New("stop")
+
+func (c *Cache) initDBSizeCounter(ctx context.Context) error {
+var inDB int
 err := c.db.View(func(tx *bbolt.Tx) error {
 b := tx.Bucket(defaultBucket)
-if b != nil {
-inDB = uint64(b.Stats().KeyN)
+if b == nil {
+return nil
 }
-return nil
+
+return b.ForEach(func(_, v []byte) error {
+select {
+case <-ctx.Done():
+return ctx.Err()
+case <-c.workersChan:
+return stopIter
+default:
+}
+
+inDB += len(v)
+return nil
+})
 })
-if err != nil {
+if err != nil && !errors.Is(err, stopIter) {
 return fmt.Errorf("could not read write-cache DB counter: %w", err)
 }
 
-inFS, err := c.fsTree.NumberOfObjects()
-if err != nil {
-return fmt.Errorf("could not read write-cache FS counter: %w", err)
-}
+c.objCounters.dbSize.Store(uint64(inDB))
 
-c.objCounters.cDB.Store(inDB)
-c.objCounters.cFS.Store(inFS)
+return nil
+}
+
+func (c *Cache) initFSSizeCounter(ctx context.Context) error {
+var inFSTree int
+
+var prm common.IteratePrm
+prm.LazyHandler = func(address oid.Address, f func() ([]byte, error)) error {
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-c.workersChan:
|
||||||
|
return stopIter
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := f()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// write-cache is a temporary storage on a fast disk,
|
||||||
|
// so it is not expected to be configured with any
|
||||||
|
// compressor ever
|
||||||
|
inFSTree += len(data)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := c.fsTree.Iterate(prm)
|
||||||
|
if err != nil && !errors.Is(err, stopIter) {
|
||||||
|
return fmt.Errorf("could not read write-cache FSTree counter: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.objCounters.fstreeSize.Store(uint64(inFSTree))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
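The counters now track bytes rather than object counts, and `sizeIfAdd` reports what the cache size would become after admitting `delta` more bytes. Its caller is not part of this hunk, so the admission check below is only a sketch of how it could be used, modelled on the old `estimateCacheSize`/`ErrOutOfSpace` logic; the `admit` helper and its comparison against `maxCacheSize` are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errOutOfSpace = errors.New("no space left in the write cache")

// counters mirror the PR's byte-based accounting: one size per backing store.
type counters struct {
	dbSize, fstreeSize atomic.Uint64
}

type cache struct {
	maxCacheSize uint64
	objCounters  counters
}

// sizeIfAdd reports the total cached size if delta more bytes were admitted.
func (c *cache) sizeIfAdd(delta uint64) uint64 {
	return delta + c.objCounters.fstreeSize.Load() + c.objCounters.dbSize.Load()
}

// admit is a hypothetical caller: reject the object when it would not fit,
// otherwise account for its exact size in the appropriate counter.
func (c *cache) admit(toDB bool, data []byte) error {
	if c.sizeIfAdd(uint64(len(data))) > c.maxCacheSize {
		return errOutOfSpace
	}
	if toDB {
		c.objCounters.dbSize.Add(uint64(len(data)))
	} else {
		c.objCounters.fstreeSize.Add(uint64(len(data)))
	}
	return nil
}

func main() {
	c := &cache{maxCacheSize: 16}
	fmt.Println(c.admit(true, []byte("0123456789")))  // fits: <nil>
	fmt.Println(c.admit(false, []byte("0123456789"))) // 10+10 > 16: out of space
}
```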
@@ -1,45 +1,23 @@
|
||||||
package writecache
|
package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
lru "github.com/hashicorp/golang-lru/v2"
|
|
||||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// store represents persistent storage with in-memory LRU cache
|
// smallStore represents persistent storage for small objects
|
||||||
// for flushed items on top of it.
|
// backed by a bbolt database.
|
||||||
type store struct {
|
type smallStore struct {
|
||||||
maxFlushedMarksCount int
|
db *bbolt.DB
|
||||||
maxRemoveBatchSize int
|
|
||||||
|
|
||||||
// flushed contains addresses of objects that were already flushed to the main storage.
|
|
||||||
// We use LRU cache instead of map here to facilitate removing of unused object in favour of
|
|
||||||
// frequently read ones.
|
|
||||||
// MUST NOT be used inside bolt db transaction because it's eviction handler
|
|
||||||
// removes untracked items from the database.
|
|
||||||
flushed simplelru.LRUCache[string, bool]
|
|
||||||
db *bbolt.DB
|
|
||||||
|
|
||||||
dbKeysToRemove []string
|
|
||||||
fsKeysToRemove []string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const dbName = "small.bolt"
|
const dbName = "small.bolt"
|
||||||
|
|
||||||
func (c *cache) openStore(readOnly bool) error {
|
func (c *Cache) openStore(readOnly bool) error {
|
||||||
err := util.MkdirAllX(c.path, os.ModePerm)
|
err := util.MkdirAllX(c.path, os.ModePerm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@@ -69,101 +47,9 @@ func (c *cache) openStore(readOnly bool) error {
|
||||||
fstree.WithDepth(1),
|
fstree.WithDepth(1),
|
||||||
fstree.WithDirNameLen(1),
|
fstree.WithDirNameLen(1),
|
||||||
fstree.WithNoSync(c.noSync))
|
fstree.WithNoSync(c.noSync))
|
||||||
if err := c.fsTree.Open(readOnly); err != nil {
|
if err = c.fsTree.Open(readOnly); err != nil {
|
||||||
return fmt.Errorf("could not open FSTree: %w", err)
|
return fmt.Errorf("could not open FSTree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write-cache can be opened multiple times during `SetMode`.
|
|
||||||
// flushed map must not be re-created in this case.
|
|
||||||
if c.flushed == nil {
|
|
||||||
c.flushed, _ = lru.NewWithEvict[string, bool](c.maxFlushedMarksCount, c.removeFlushed)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.initialized.Store(false)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// removeFlushed removes an object from the writecache.
|
|
||||||
// To minimize interference with the client operations, the actual removal
|
|
||||||
// is done in batches.
|
|
||||||
// It is not thread-safe and is used only as an evict callback to LRU cache.
|
|
||||||
func (c *cache) removeFlushed(key string, value bool) {
|
|
||||||
fromDatabase := value
|
|
||||||
if fromDatabase {
|
|
||||||
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
|
|
||||||
} else {
|
|
||||||
c.fsKeysToRemove = append(c.fsKeysToRemove, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(c.dbKeysToRemove)+len(c.fsKeysToRemove) >= c.maxRemoveBatchSize {
|
|
||||||
c.dbKeysToRemove = c.deleteFromDB(c.dbKeysToRemove)
|
|
||||||
c.fsKeysToRemove = c.deleteFromDisk(c.fsKeysToRemove)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) deleteFromDB(keys []string) []string {
|
|
||||||
if len(keys) == 0 {
|
|
||||||
return keys
|
|
||||||
}
|
|
||||||
|
|
||||||
var errorIndex int
|
|
||||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
|
||||||
b := tx.Bucket(defaultBucket)
|
|
||||||
for errorIndex = range keys {
|
|
||||||
if err := b.Delete([]byte(keys[errorIndex])); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
for i := 0; i < errorIndex; i++ {
|
|
||||||
c.objCounters.DecDB()
|
|
||||||
storagelog.Write(c.log,
|
|
||||||
storagelog.AddressField(keys[i]),
|
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
|
||||||
storagelog.OpField("db DELETE"),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
copy(keys, keys[errorIndex:])
|
|
||||||
return keys[:len(keys)-errorIndex]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) deleteFromDisk(keys []string) []string {
|
|
||||||
if len(keys) == 0 {
|
|
||||||
return keys
|
|
||||||
}
|
|
||||||
|
|
||||||
var copyIndex int
|
|
||||||
var addr oid.Address
|
|
||||||
|
|
||||||
for i := range keys {
|
|
||||||
if err := addr.DecodeString(keys[i]); err != nil {
|
|
||||||
c.log.Error(logs.WritecacheCantParseAddress, zap.String("address", keys[i]))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := c.fsTree.Delete(context.TODO(), common.DeletePrm{Address: addr})
|
|
||||||
if err != nil && !errors.As(err, new(apistatus.ObjectNotFound)) {
|
|
||||||
c.log.Error(logs.WritecacheCantRemoveObjectFromWritecache, zap.Error(err))
|
|
||||||
|
|
||||||
// Save the key for the next iteration.
|
|
||||||
keys[copyIndex] = keys[i]
|
|
||||||
copyIndex++
|
|
||||||
continue
|
|
||||||
} else if err == nil {
|
|
||||||
storagelog.Write(c.log,
|
|
||||||
storagelog.AddressField(keys[i]),
|
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
|
||||||
storagelog.OpField("fstree DELETE"),
|
|
||||||
)
|
|
||||||
c.objCounters.DecFS()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return keys[:copyIndex]
|
|
||||||
}
|
|
||||||
|
|
|
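The bbolt half of `openStore` is collapsed in this hunk. Purely as an illustration, and assuming the small-object database is opened with bbolt's `ReadOnly` option to mirror `fsTree.Open(readOnly)`, it could look roughly like the sketch below; the path, file mode and bucket setup are assumptions, not the PR's values.

```go
package main

import (
	"fmt"
	"path/filepath"

	"go.etcd.io/bbolt"
)

const dbName = "small.bolt"

// openSmallDB opens the write-cache database, honouring the read-only flag
// the same way fsTree.Open(readOnly) does in openStore above.
func openSmallDB(dir string, readOnly bool) (*bbolt.DB, error) {
	db, err := bbolt.Open(filepath.Join(dir, dbName), 0o600, &bbolt.Options{
		ReadOnly: readOnly,
	})
	if err != nil {
		return nil, fmt.Errorf("could not open database: %w", err)
	}
	if readOnly {
		return db, nil
	}
	// The default bucket has to exist before Put/Get can use it.
	err = db.Update(func(tx *bbolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte("default"))
		return err
	})
	return db, err
}

func main() {
	db, err := openSmallDB(".", false)
	if err != nil {
		panic(err)
	}
	fmt.Println("opened", db.Path())
	_ = db.Close()
}
```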
@@ -5,15 +5,11 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
"go.uber.org/atomic"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@@ -24,52 +20,24 @@ type Info struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache represents write-cache for objects.
|
// Cache represents write-cache for objects.
|
||||||
type Cache interface {
|
type Cache struct {
|
||||||
Get(ctx context.Context, address oid.Address) (*object.Object, error)
|
|
||||||
Head(context.Context, oid.Address) (*object.Object, error)
|
|
||||||
// Delete removes object referenced by the given oid.Address from the
|
|
||||||
// Cache. Returns any error encountered that prevented the object to be
|
|
||||||
// removed.
|
|
||||||
//
|
|
||||||
// Returns apistatus.ObjectNotFound if object is missing in the Cache.
|
|
||||||
// Returns ErrReadOnly if the Cache is currently in the read-only mode.
|
|
||||||
Delete(context.Context, oid.Address) error
|
|
||||||
Iterate(IterationPrm) error
|
|
||||||
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
|
||||||
SetMode(mode.Mode) error
|
|
||||||
SetLogger(*logger.Logger)
|
|
||||||
DumpInfo() Info
|
|
||||||
Flush(context.Context, bool) error
|
|
||||||
|
|
||||||
Init() error
|
|
||||||
Open(readOnly bool) error
|
|
||||||
Close() error
|
|
||||||
}
|
|
||||||
|
|
||||||
type cache struct {
|
|
||||||
options
|
options
|
||||||
|
|
||||||
// mtx protects statistics, counters and compressFlags.
|
// objCounters contains atomic counters for the size of data stored in cache.
|
||||||
mtx sync.RWMutex
|
objCounters counters
|
||||||
|
|
||||||
mode mode.Mode
|
modeMtx sync.RWMutex
|
||||||
initialized atomic.Bool
|
mode mode.Mode
|
||||||
stopInitCh chan struct{} // used to sync initWG initialisation routines and _only_ them
|
|
||||||
initWG sync.WaitGroup // for initialisation routines only
|
|
||||||
modeMtx sync.RWMutex
|
|
||||||
|
|
||||||
// compressFlags maps address of a big object to boolean value indicating
|
|
||||||
// whether object should be compressed.
|
|
||||||
compressFlags map[string]struct{}
|
|
||||||
|
|
||||||
// flushCh is a channel with objects to flush.
|
// flushCh is a channel with objects to flush.
|
||||||
flushCh chan *object.Object
|
smallFlushCh chan objWithData
|
||||||
// closeCh is close channel, protected by modeMtx.
|
// workersChan is a close channel, protected by modeMtx.
|
||||||
closeCh chan struct{}
|
// It indicates the status of the background workers.
|
||||||
|
workersChan chan struct{}
|
||||||
// wg is a wait group for flush workers.
|
// wg is a wait group for flush workers.
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
// store contains underlying database.
|
// store contains underlying database.
|
||||||
store
|
smallStore
|
||||||
// fsTree contains big files stored directly on file-system.
|
// fsTree contains big files stored directly on file-system.
|
||||||
fsTree *fstree.FSTree
|
fsTree *fstree.FSTree
|
||||||
}
|
}
|
||||||
|
@@ -94,13 +62,16 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
// New creates a new writecache instance.
|
// New creates a new writecache instance.
|
||||||
func New(opts ...Option) Cache {
|
// The value must not be copied after creation.
|
||||||
c := &cache{
|
func New(opts ...Option) *Cache {
|
||||||
flushCh: make(chan *object.Object),
|
closeCh := make(chan struct{})
|
||||||
mode: mode.ReadWrite,
|
close(closeCh)
|
||||||
stopInitCh: make(chan struct{}),
|
|
||||||
|
c := &Cache{
|
||||||
|
smallFlushCh: make(chan objWithData),
|
||||||
|
mode: mode.ReadWrite,
|
||||||
|
workersChan: closeCh,
|
||||||
|
|
||||||
compressFlags: make(map[string]struct{}),
|
|
||||||
options: options{
|
options: options{
|
||||||
log: &logger.Logger{Logger: zap.NewNop()},
|
log: &logger.Logger{Logger: zap.NewNop()},
|
||||||
maxObjectSize: defaultMaxObjectSize,
|
maxObjectSize: defaultMaxObjectSize,
|
||||||
|
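`New` builds the cache with an already-closed `workersChan`, so a non-blocking receive reports "workers stopped" until the flush machinery actually starts. Below is a standalone sketch of that signalling pattern; the worker body and the point where the channel is swapped for an open one are assumptions, since the PR only shows the constructor and the checks in `Close` and the counter-init routines.

```go
package main

import (
	"fmt"
	"sync"
)

type cache struct {
	workersChan chan struct{} // closed => background workers are not running
	wg          sync.WaitGroup
}

func newCache() *cache {
	closed := make(chan struct{})
	close(closed) // start in the "stopped" state, as New does in the PR
	return &cache{workersChan: closed}
}

// workersRunning probes the state without blocking: receiving from a
// closed channel succeeds immediately, so the select falls into that case.
func (c *cache) workersRunning() bool {
	select {
	case <-c.workersChan:
		return false
	default:
		return true
	}
}

// start swaps in an open channel and launches a worker that exits once the
// channel is closed again (hypothetical; the PR's flush loop is not shown here).
func (c *cache) start() {
	c.workersChan = make(chan struct{})
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		<-c.workersChan // block until stop is requested
	}()
}

func (c *cache) stop() {
	close(c.workersChan)
	c.wg.Wait()
}

func main() {
	c := newCache()
	fmt.Println("running after New:", c.workersRunning()) // false
	c.start()
	fmt.Println("running after start:", c.workersRunning()) // true
	c.stop()
	fmt.Println("running after stop:", c.workersRunning()) // false
}
```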
@@ -117,67 +88,52 @@ func New(opts ...Option) Cache {
|
||||||
opts[i](&c.options)
|
opts[i](&c.options)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make the LRU cache contain which take approximately 3/4 of the maximum space.
|
|
||||||
// Assume small and big objects are stored in 50-50 proportion.
|
|
||||||
c.maxFlushedMarksCount = int(c.maxCacheSize/c.maxObjectSize+c.maxCacheSize/c.smallObjectSize) / 2 * 3 / 4
|
|
||||||
// Trigger the removal when the cache is 7/8 full, so that new items can still arrive.
|
|
||||||
c.maxRemoveBatchSize = c.maxFlushedMarksCount / 8
|
|
||||||
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
|
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
|
||||||
func (c *cache) SetLogger(l *logger.Logger) {
|
func (c *Cache) SetLogger(l *logger.Logger) {
|
||||||
c.log = l
|
c.log = l
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) DumpInfo() Info {
|
func (c *Cache) DumpInfo() Info {
|
||||||
return Info{
|
return Info{
|
||||||
Path: c.path,
|
Path: c.path,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
// Open opens and initializes the database and sets the write-cache mode.
|
||||||
func (c *cache) Open(readOnly bool) error {
|
func (c *Cache) Open(readOnly bool) error {
|
||||||
err := c.openStore(readOnly)
|
err := c.openStore(readOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Opening after Close is done during maintenance mode,
|
c.modeMtx.Lock()
|
||||||
// thus we need to create a channel here.
|
defer c.modeMtx.Unlock()
|
||||||
c.closeCh = make(chan struct{})
|
|
||||||
|
|
||||||
return c.initCounters()
|
if readOnly {
|
||||||
}
|
c.mode = mode.ReadOnly
|
||||||
|
} else {
|
||||||
|
c.mode = mode.ReadWrite
|
||||||
|
}
|
||||||
|
|
||||||
// Init runs necessary services.
|
|
||||||
func (c *cache) Init() error {
|
|
||||||
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
|
|
||||||
defer span.End()
|
|
||||||
|
|
||||||
c.initFlushMarks(ctx)
|
|
||||||
c.runFlushLoop()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||||
func (c *cache) Close() error {
|
func (c *Cache) Close() error {
|
||||||
// Finish all in-progress operations.
|
// Finish all in-progress operations if the background
|
||||||
if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil {
|
// workers are still running.
|
||||||
return err
|
select {
|
||||||
|
case <-c.workersChan:
|
||||||
|
default:
|
||||||
|
err := c.setMode(context.TODO(), mode.ReadOnly)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.closeCh != nil {
|
|
||||||
close(c.closeCh)
|
|
||||||
}
|
|
||||||
c.wg.Wait()
|
|
||||||
if c.closeCh != nil {
|
|
||||||
c.closeCh = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
c.initialized.Store(false)
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
if c.db != nil {
|
if c.db != nil {
|
||||||
err = c.db.Close()
|
err = c.db.Close()
|
||||||
|
|
wow. looks too complex.