WIP: Simplify write-cache #314
14 changed files with 58 additions and 83 deletions
|
@ -91,7 +91,7 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
||||||
return res.Object, nil
|
return res.Object, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
wc := func(c writecache.Cache) (*objectSDK.Object, error) {
|
wc := func(c *writecache.Cache) (*objectSDK.Object, error) {
|
||||||
return c.Get(ctx, prm.addr)
|
return c.Get(ctx, prm.addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,7 +109,7 @@ func (s *Shard) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
|
||||||
var emptyStorageID = make([]byte, 0)
|
var emptyStorageID = make([]byte, 0)
|
||||||
|
|
||||||
// fetchObjectData looks through writeCache and blobStor to find object.
|
// fetchObjectData looks through writeCache and blobStor to find object.
|
||||||
func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
|
func (s *Shard) fetchObjectData(ctx context.Context, addr oid.Address, skipMeta bool, cb storFetcher, wc func(w *writecache.Cache) (*objectSDK.Object, error)) (*objectSDK.Object, bool, error) {
|
||||||
var (
|
var (
|
||||||
mErr error
|
mErr error
|
||||||
mRes meta.ExistsRes
|
mRes meta.ExistsRes
|
||||||
|
|
|
@ -104,7 +104,7 @@ func (s *Shard) GetRange(ctx context.Context, prm RngPrm) (RngRes, error) {
|
||||||
return obj, nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
wc := func(c writecache.Cache) (*object.Object, error) {
|
wc := func(c *writecache.Cache) (*object.Object, error) {
|
||||||
res, err := c.Get(ctx, prm.addr)
|
res, err := c.Get(ctx, prm.addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -23,7 +23,7 @@ type Shard struct {
|
||||||
|
|
||||||
gc *gc
|
gc *gc
|
||||||
|
|
||||||
writeCache writecache.Cache
|
writeCache *writecache.Cache
|
||||||
|
|
||||||
blobStor *blobstor.BlobStor
|
blobStor *blobstor.BlobStor
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
// Delete removes object from write-cache.
|
// Delete removes object from write-cache.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
||||||
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
func (c *Cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("address", addr.EncodeToString()),
|
attribute.String("address", addr.EncodeToString()),
|
||||||
|
|
|
@ -34,7 +34,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||||
func (c *cache) runFlushLoop() {
|
func (c *Cache) runFlushLoop() {
|
||||||
for i := 0; i < c.workersCount; i++ {
|
for i := 0; i < c.workersCount; i++ {
|
||||||
c.wg.Add(1)
|
c.wg.Add(1)
|
||||||
go c.flushWorker(i)
|
go c.flushWorker(i)
|
||||||
|
@ -65,7 +65,7 @@ func (c *cache) runFlushLoop() {
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) flushDB() {
|
func (c *Cache) flushDB() {
|
||||||
var lastKey []byte
|
var lastKey []byte
|
||||||
var m []objectInfo
|
var m []objectInfo
|
||||||
for {
|
for {
|
||||||
|
@ -148,7 +148,7 @@ func (c *cache) flushDB() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) flushBigObjects(ctx context.Context) {
|
func (c *Cache) flushBigObjects(ctx context.Context) {
|
||||||
tick := time.NewTicker(defaultFlushInterval * 10)
|
tick := time.NewTicker(defaultFlushInterval * 10)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -171,7 +171,7 @@ func (c *cache) flushBigObjects(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) reportFlushError(msg string, addr string, err error) {
|
func (c *Cache) reportFlushError(msg string, addr string, err error) {
|
||||||
if c.reportError != nil {
|
if c.reportError != nil {
|
||||||
c.reportError(msg, err)
|
c.reportError(msg, err)
|
||||||
} else {
|
} else {
|
||||||
|
@ -181,7 +181,7 @@ func (c *cache) reportFlushError(msg string, addr string, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
func (c *Cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
var prm common.IteratePrm
|
var prm common.IteratePrm
|
||||||
prm.IgnoreErrors = ignoreErrors
|
prm.IgnoreErrors = ignoreErrors
|
||||||
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
prm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
||||||
|
@ -229,7 +229,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// flushWorker writes objects to the main storage.
|
// flushWorker writes objects to the main storage.
|
||||||
func (c *cache) flushWorker(_ int) {
|
func (c *Cache) flushWorker(_ int) {
|
||||||
defer c.wg.Done()
|
defer c.wg.Done()
|
||||||
|
|
||||||
var obj *object.Object
|
var obj *object.Object
|
||||||
|
@ -249,7 +249,7 @@ func (c *cache) flushWorker(_ int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// flushObject is used to write object directly to the main storage.
|
// flushObject is used to write object directly to the main storage.
|
||||||
func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
|
func (c *Cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
|
||||||
addr := objectCore.AddressOf(obj)
|
addr := objectCore.AddressOf(obj)
|
||||||
|
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
|
@ -281,7 +281,7 @@ func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte
|
||||||
// Flush flushes all objects from the write-cache to the main storage.
|
// Flush flushes all objects from the write-cache to the main storage.
|
||||||
// Write-cache must be in readonly mode to ensure correctness of an operation and
|
// Write-cache must be in readonly mode to ensure correctness of an operation and
|
||||||
// to prevent interference with background flush workers.
|
// to prevent interference with background flush workers.
|
||||||
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
|
func (c *Cache) Flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.Bool("ignore_errors", ignoreErrors),
|
attribute.Bool("ignore_errors", ignoreErrors),
|
||||||
|
@ -294,7 +294,7 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
return c.flush(ctx, ignoreErrors)
|
return c.flush(ctx, ignoreErrors)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
func (c *Cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
if err := c.flushFSTree(ctx, ignoreErrors); err != nil {
|
if err := c.flushFSTree(ctx, ignoreErrors); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ func TestFlush(t *testing.T) {
|
||||||
smallSize = 256
|
smallSize = 256
|
||||||
)
|
)
|
||||||
|
|
||||||
newCache := func(t *testing.T, opts ...Option) (Cache, *blobstor.BlobStor, *meta.DB) {
|
newCache := func(t *testing.T, opts ...Option) (*Cache, *blobstor.BlobStor, *meta.DB) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
mb := meta.New(
|
mb := meta.New(
|
||||||
meta.WithPath(filepath.Join(dir, "meta")),
|
meta.WithPath(filepath.Join(dir, "meta")),
|
||||||
|
@ -76,7 +76,7 @@ func TestFlush(t *testing.T) {
|
||||||
return wc, bs, mb
|
return wc, bs, mb
|
||||||
}
|
}
|
||||||
|
|
||||||
putObjects := func(t *testing.T, c Cache) []objectPair {
|
putObjects := func(t *testing.T, c *Cache) []objectPair {
|
||||||
objects := make([]objectPair, objCount)
|
objects := make([]objectPair, objCount)
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
|
objects[i] = putObject(t, c, 1+(i%2)*smallSize)
|
||||||
|
@ -109,8 +109,8 @@ func TestFlush(t *testing.T) {
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||||
|
|
||||||
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
|
wc.flushed.Add(objects[0].addr.EncodeToString(), true)
|
||||||
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
|
wc.flushed.Add(objects[1].addr.EncodeToString(), false)
|
||||||
|
|
||||||
require.NoError(t, wc.Flush(context.Background(), false))
|
require.NoError(t, wc.Flush(context.Background(), false))
|
||||||
|
|
||||||
|
@ -139,8 +139,8 @@ func TestFlush(t *testing.T) {
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||||
|
|
||||||
wc.(*cache).flushed.Add(objects[0].addr.EncodeToString(), true)
|
wc.flushed.Add(objects[0].addr.EncodeToString(), true)
|
||||||
wc.(*cache).flushed.Add(objects[1].addr.EncodeToString(), false)
|
wc.flushed.Add(objects[1].addr.EncodeToString(), false)
|
||||||
|
|
||||||
require.NoError(t, wc.SetMode(mode.Degraded))
|
require.NoError(t, wc.SetMode(mode.Degraded))
|
||||||
|
|
||||||
|
@ -158,13 +158,13 @@ func TestFlush(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ignore errors", func(t *testing.T) {
|
t.Run("ignore errors", func(t *testing.T) {
|
||||||
testIgnoreErrors := func(t *testing.T, f func(*cache)) {
|
testIgnoreErrors := func(t *testing.T, f func(*Cache)) {
|
||||||
var errCount atomic.Uint32
|
var errCount atomic.Uint32
|
||||||
wc, bs, mb := newCache(t, WithReportErrorFunc(func(message string, err error) {
|
wc, bs, mb := newCache(t, WithReportErrorFunc(func(message string, err error) {
|
||||||
errCount.Inc()
|
errCount.Inc()
|
||||||
}))
|
}))
|
||||||
objects := putObjects(t, wc)
|
objects := putObjects(t, wc)
|
||||||
f(wc.(*cache))
|
f(wc)
|
||||||
|
|
||||||
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
|
@ -178,7 +178,7 @@ func TestFlush(t *testing.T) {
|
||||||
check(t, mb, bs, objects)
|
check(t, mb, bs, objects)
|
||||||
}
|
}
|
||||||
t.Run("db, invalid address", func(t *testing.T) {
|
t.Run("db, invalid address", func(t *testing.T) {
|
||||||
testIgnoreErrors(t, func(c *cache) {
|
testIgnoreErrors(t, func(c *Cache) {
|
||||||
_, data := newObject(t, 1)
|
_, data := newObject(t, 1)
|
||||||
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
|
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
|
@ -187,7 +187,7 @@ func TestFlush(t *testing.T) {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
t.Run("db, invalid object", func(t *testing.T) {
|
t.Run("db, invalid object", func(t *testing.T) {
|
||||||
testIgnoreErrors(t, func(c *cache) {
|
testIgnoreErrors(t, func(c *Cache) {
|
||||||
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
|
require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3})
|
return b.Put([]byte(oidtest.Address().EncodeToString()), []byte{1, 2, 3})
|
||||||
|
@ -195,7 +195,7 @@ func TestFlush(t *testing.T) {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
t.Run("fs, read error", func(t *testing.T) {
|
t.Run("fs, read error", func(t *testing.T) {
|
||||||
testIgnoreErrors(t, func(c *cache) {
|
testIgnoreErrors(t, func(c *Cache) {
|
||||||
obj, data := newObject(t, 1)
|
obj, data := newObject(t, 1)
|
||||||
|
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
|
@ -214,7 +214,7 @@ func TestFlush(t *testing.T) {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
t.Run("fs, invalid object", func(t *testing.T) {
|
t.Run("fs, invalid object", func(t *testing.T) {
|
||||||
testIgnoreErrors(t, func(c *cache) {
|
testIgnoreErrors(t, func(c *Cache) {
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
prm.Address = oidtest.Address()
|
prm.Address = oidtest.Address()
|
||||||
prm.RawData = []byte{1, 2, 3}
|
prm.RawData = []byte{1, 2, 3}
|
||||||
|
@ -286,7 +286,7 @@ func TestFlush(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func putObject(t *testing.T, c Cache, size int) objectPair {
|
func putObject(t *testing.T, c *Cache, size int) objectPair {
|
||||||
obj, data := newObject(t, size)
|
obj, data := newObject(t, size)
|
||||||
|
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
|
@ -319,12 +319,11 @@ func newObject(t *testing.T, size int) (*object.Object, []byte) {
|
||||||
return obj, data
|
return obj, data
|
||||||
}
|
}
|
||||||
|
|
||||||
func initWC(t *testing.T, wc Cache) {
|
func initWC(t *testing.T, wc *Cache) {
|
||||||
require.NoError(t, wc.Init())
|
require.NoError(t, wc.Init())
|
||||||
|
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
rawWc := wc.(*cache)
|
return wc.initialized.Load()
|
||||||
return rawWc.initialized.Load()
|
|
||||||
}, 100*time.Second, 1*time.Millisecond)
|
}, 100*time.Second, 1*time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@ import (
|
||||||
// Get returns object from write-cache.
|
// Get returns object from write-cache.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||||
func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
func (c *Cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
saddr := addr.EncodeToString()
|
saddr := addr.EncodeToString()
|
||||||
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Get",
|
||||||
|
@ -46,7 +46,7 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
||||||
// Head returns object header from write-cache.
|
// Head returns object header from write-cache.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||||
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
func (c *Cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("address", addr.EncodeToString()),
|
attribute.String("address", addr.EncodeToString()),
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *cache) initFlushMarks(ctx context.Context) {
|
func (c *Cache) initFlushMarks(ctx context.Context) {
|
||||||
var localWG sync.WaitGroup
|
var localWG sync.WaitGroup
|
||||||
|
|
||||||
localWG.Add(1)
|
localWG.Add(1)
|
||||||
|
@ -54,7 +54,7 @@ func (c *cache) initFlushMarks(ctx context.Context) {
|
||||||
|
|
||||||
var errStopIter = errors.New("stop iteration")
|
var errStopIter = errors.New("stop iteration")
|
||||||
|
|
||||||
func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) {
|
func (c *Cache) fsTreeFlushMarkUpdate(ctx context.Context) {
|
||||||
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree)
|
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInFSTree)
|
||||||
|
|
||||||
var prm common.IteratePrm
|
var prm common.IteratePrm
|
||||||
|
@ -95,7 +95,7 @@ func (c *cache) fsTreeFlushMarkUpdate(ctx context.Context) {
|
||||||
c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks)
|
c.log.Info(logs.WritecacheFinishedUpdatingFSTreeFlushMarks)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) dbFlushMarkUpdate(ctx context.Context) {
|
func (c *Cache) dbFlushMarkUpdate(ctx context.Context) {
|
||||||
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase)
|
c.log.Info(logs.WritecacheFillingFlushMarksForObjectsInDatabase)
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
|
@ -173,7 +173,7 @@ func (c *cache) dbFlushMarkUpdate(ctx context.Context) {
|
||||||
// flushStatus returns info about the object state in the main storage.
|
// flushStatus returns info about the object state in the main storage.
|
||||||
// First return value is true iff object exists.
|
// First return value is true iff object exists.
|
||||||
// Second return value is true iff object can be safely removed.
|
// Second return value is true iff object can be safely removed.
|
||||||
func (c *cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
|
func (c *Cache) flushStatus(ctx context.Context, addr oid.Address) (bool, bool) {
|
||||||
var existsPrm meta.ExistsPrm
|
var existsPrm meta.ExistsPrm
|
||||||
existsPrm.SetAddress(addr)
|
existsPrm.SetAddress(addr)
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ func (p *IterationPrm) WithIgnoreErrors(ignore bool) {
|
||||||
// Iterate iterates over all objects present in write cache.
|
// Iterate iterates over all objects present in write cache.
|
||||||
// This is very difficult to do correctly unless write-cache is put in read-only mode.
|
// This is very difficult to do correctly unless write-cache is put in read-only mode.
|
||||||
// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results.
|
// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results.
|
||||||
func (c *cache) Iterate(prm IterationPrm) error {
|
func (c *Cache) Iterate(prm IterationPrm) error {
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if !c.readOnly() {
|
if !c.readOnly() {
|
||||||
|
|
|
@ -22,7 +22,7 @@ var ErrNotInitialized = logicerr.New("write-cache is not initialized yet")
|
||||||
// SetMode sets write-cache mode of operation.
|
// SetMode sets write-cache mode of operation.
|
||||||
// When shard is put in read-only mode all objects in memory are flushed to disk
|
// When shard is put in read-only mode all objects in memory are flushed to disk
|
||||||
// and all background jobs are suspended.
|
// and all background jobs are suspended.
|
||||||
func (c *cache) SetMode(m mode.Mode) error {
|
func (c *Cache) SetMode(m mode.Mode) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode",
|
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.SetMode",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("mode", m.String()),
|
attribute.String("mode", m.String()),
|
||||||
|
@ -33,7 +33,7 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||||
func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
|
func (c *Cache) setMode(ctx context.Context, m mode.Mode) error {
|
||||||
var err error
|
var err error
|
||||||
turnOffMeta := m.NoMetabase()
|
turnOffMeta := m.NoMetabase()
|
||||||
|
|
||||||
|
@ -89,6 +89,6 @@ func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
|
||||||
|
|
||||||
// readOnly returns true if current mode is read-only.
|
// readOnly returns true if current mode is read-only.
|
||||||
// `c.modeMtx` must be taken.
|
// `c.modeMtx` must be taken.
|
||||||
func (c *cache) readOnly() bool {
|
func (c *Cache) readOnly() bool {
|
||||||
return c.mode.ReadOnly()
|
return c.mode.ReadOnly()
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ var (
|
||||||
// Returns ErrNotInitialized if write-cache has not been initialized yet.
|
// Returns ErrNotInitialized if write-cache has not been initialized yet.
|
||||||
// Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
|
// Returns ErrOutOfSpace if saving an object leads to WC's size overflow.
|
||||||
// Returns ErrBigObject if an objects exceeds maximum object size.
|
// Returns ErrBigObject if an objects exceeds maximum object size.
|
||||||
func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
func (c *Cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Put",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Put",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("address", prm.Address.EncodeToString()),
|
attribute.String("address", prm.Address.EncodeToString()),
|
||||||
|
@ -60,7 +60,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
||||||
|
|
||||||
// putSmall persists small objects to the write-cache database and
|
// putSmall persists small objects to the write-cache database and
|
||||||
// pushes the to the flush workers queue.
|
// pushes the to the flush workers queue.
|
||||||
func (c *cache) putSmall(obj objectInfo) error {
|
func (c *Cache) putSmall(obj objectInfo) error {
|
||||||
cacheSize := c.estimateCacheSize()
|
cacheSize := c.estimateCacheSize()
|
||||||
if c.maxCacheSize < c.incSizeDB(cacheSize) {
|
if c.maxCacheSize < c.incSizeDB(cacheSize) {
|
||||||
return ErrOutOfSpace
|
return ErrOutOfSpace
|
||||||
|
@ -82,7 +82,7 @@ func (c *cache) putSmall(obj objectInfo) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// putBig writes object to FSTree and pushes it to the flush workers queue.
|
// putBig writes object to FSTree and pushes it to the flush workers queue.
|
||||||
func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
|
func (c *Cache) putBig(ctx context.Context, addr string, prm common.PutPrm) error {
|
||||||
cacheSz := c.estimateCacheSize()
|
cacheSz := c.estimateCacheSize()
|
||||||
if c.maxCacheSize < c.incSizeFS(cacheSz) {
|
if c.maxCacheSize < c.incSizeFS(cacheSz) {
|
||||||
return ErrOutOfSpace
|
return ErrOutOfSpace
|
||||||
|
|
|
@ -7,15 +7,15 @@ import (
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *cache) estimateCacheSize() uint64 {
|
func (c *Cache) estimateCacheSize() uint64 {
|
||||||
return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize
|
return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) incSizeDB(sz uint64) uint64 {
|
func (c *Cache) incSizeDB(sz uint64) uint64 {
|
||||||
return sz + c.smallObjectSize
|
return sz + c.smallObjectSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) incSizeFS(sz uint64) uint64 {
|
func (c *Cache) incSizeFS(sz uint64) uint64 {
|
||||||
return sz + c.maxObjectSize
|
return sz + c.maxObjectSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,7 +47,7 @@ func (x *counters) FS() uint64 {
|
||||||
return x.cFS.Load()
|
return x.cFS.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) initCounters() error {
|
func (c *Cache) initCounters() error {
|
||||||
var inDB uint64
|
var inDB uint64
|
||||||
err := c.db.View(func(tx *bbolt.Tx) error {
|
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
|
|
|
@ -39,7 +39,7 @@ type store struct {
|
||||||
|
|
||||||
const dbName = "small.bolt"
|
const dbName = "small.bolt"
|
||||||
|
|
||||||
func (c *cache) openStore(readOnly bool) error {
|
func (c *Cache) openStore(readOnly bool) error {
|
||||||
err := util.MkdirAllX(c.path, os.ModePerm)
|
err := util.MkdirAllX(c.path, os.ModePerm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -88,7 +88,7 @@ func (c *cache) openStore(readOnly bool) error {
|
||||||
// To minimize interference with the client operations, the actual removal
|
// To minimize interference with the client operations, the actual removal
|
||||||
// is done in batches.
|
// is done in batches.
|
||||||
// It is not thread-safe and is used only as an evict callback to LRU cache.
|
// It is not thread-safe and is used only as an evict callback to LRU cache.
|
||||||
func (c *cache) removeFlushed(key string, value bool) {
|
func (c *Cache) removeFlushed(key string, value bool) {
|
||||||
fromDatabase := value
|
fromDatabase := value
|
||||||
if fromDatabase {
|
if fromDatabase {
|
||||||
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
|
c.dbKeysToRemove = append(c.dbKeysToRemove, key)
|
||||||
|
@ -102,7 +102,7 @@ func (c *cache) removeFlushed(key string, value bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) deleteFromDB(keys []string) []string {
|
func (c *Cache) deleteFromDB(keys []string) []string {
|
||||||
if len(keys) == 0 {
|
if len(keys) == 0 {
|
||||||
return keys
|
return keys
|
||||||
}
|
}
|
||||||
|
@ -133,7 +133,7 @@ func (c *cache) deleteFromDB(keys []string) []string {
|
||||||
return keys[:len(keys)-errorIndex]
|
return keys[:len(keys)-errorIndex]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) deleteFromDisk(keys []string) []string {
|
func (c *Cache) deleteFromDisk(keys []string) []string {
|
||||||
if len(keys) == 0 {
|
if len(keys) == 0 {
|
||||||
return keys
|
return keys
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,12 +6,10 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -24,29 +22,7 @@ type Info struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache represents write-cache for objects.
|
// Cache represents write-cache for objects.
|
||||||
type Cache interface {
|
type Cache struct {
|
||||||
Get(ctx context.Context, address oid.Address) (*object.Object, error)
|
|
||||||
Head(context.Context, oid.Address) (*object.Object, error)
|
|
||||||
// Delete removes object referenced by the given oid.Address from the
|
|
||||||
// Cache. Returns any error encountered that prevented the object to be
|
|
||||||
// removed.
|
|
||||||
//
|
|
||||||
// Returns apistatus.ObjectNotFound if object is missing in the Cache.
|
|
||||||
// Returns ErrReadOnly if the Cache is currently in the read-only mode.
|
|
||||||
Delete(context.Context, oid.Address) error
|
|
||||||
Iterate(IterationPrm) error
|
|
||||||
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
|
||||||
SetMode(mode.Mode) error
|
|
||||||
SetLogger(*logger.Logger)
|
|
||||||
DumpInfo() Info
|
|
||||||
Flush(context.Context, bool) error
|
|
||||||
|
|
||||||
Init() error
|
|
||||||
Open(readOnly bool) error
|
|
||||||
Close() error
|
|
||||||
}
|
|
||||||
|
|
||||||
type cache struct {
|
|
||||||
options
|
options
|
||||||
|
|
||||||
// mtx protects statistics, counters and compressFlags.
|
// mtx protects statistics, counters and compressFlags.
|
||||||
|
@ -94,8 +70,8 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
// New creates new writecache instance.
|
// New creates new writecache instance.
|
||||||
func New(opts ...Option) Cache {
|
func New(opts ...Option) *Cache {
|
||||||
c := &cache{
|
c := &Cache{
|
||||||
flushCh: make(chan *object.Object),
|
flushCh: make(chan *object.Object),
|
||||||
mode: mode.ReadWrite,
|
mode: mode.ReadWrite,
|
||||||
stopInitCh: make(chan struct{}),
|
stopInitCh: make(chan struct{}),
|
||||||
|
@ -127,18 +103,18 @@ func New(opts ...Option) Cache {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
|
// SetLogger sets logger. It is used after the shard ID was generated to use it in logs.
|
||||||
func (c *cache) SetLogger(l *logger.Logger) {
|
func (c *Cache) SetLogger(l *logger.Logger) {
|
||||||
c.log = l
|
c.log = l
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) DumpInfo() Info {
|
func (c *Cache) DumpInfo() Info {
|
||||||
return Info{
|
return Info{
|
||||||
Path: c.path,
|
Path: c.path,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
// Open opens and initializes database. Reads object counters from the ObjectCounters instance.
|
||||||
func (c *cache) Open(readOnly bool) error {
|
func (c *Cache) Open(readOnly bool) error {
|
||||||
err := c.openStore(readOnly)
|
err := c.openStore(readOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -152,7 +128,7 @@ func (c *cache) Open(readOnly bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init runs necessary services.
|
// Init runs necessary services.
|
||||||
func (c *cache) Init() error {
|
func (c *Cache) Init() error {
|
||||||
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
|
ctx, span := tracing.StartSpanFromContext(context.TODO(), "writecache.Init")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -162,7 +138,7 @@ func (c *cache) Init() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
// Close closes db connection and stops services. Executes ObjectCounters.FlushAndClose op.
|
||||||
func (c *cache) Close() error {
|
func (c *Cache) Close() error {
|
||||||
// Finish all in-progress operations.
|
// Finish all in-progress operations.
|
||||||
if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil {
|
if err := c.setMode(context.TODO(), mode.ReadOnly); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in a new issue