[#569] writecache: Allow to seal writecache after flush

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-12-27 08:20:15 +03:00
parent b118734909
commit 0cb0fc1735
12 changed files with 121 additions and 41 deletions

View file

@ -8,6 +8,8 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
const sealFlag = "seal"
var flushCacheCmd = &cobra.Command{ var flushCacheCmd = &cobra.Command{
Use: "flush-cache", Use: "flush-cache",
Short: "Flush objects from the write-cache to the main storage", Short: "Flush objects from the write-cache to the main storage",
@ -18,7 +20,10 @@ var flushCacheCmd = &cobra.Command{
func flushCache(cmd *cobra.Command, _ []string) { func flushCache(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd) pk := key.Get(cmd)
req := &control.FlushCacheRequest{Body: new(control.FlushCacheRequest_Body)} seal, _ := cmd.Flags().GetBool(sealFlag)
req := &control.FlushCacheRequest{Body: &control.FlushCacheRequest_Body{
Seal: seal,
}}
req.Body.Shard_ID = getShardIDList(cmd) req.Body.Shard_ID = getShardIDList(cmd)
signRequest(cmd, pk, req) signRequest(cmd, pk, req)
@ -44,6 +49,7 @@ func initControlFlushCacheCmd() {
ff := flushCacheCmd.Flags() ff := flushCacheCmd.Flags()
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
ff.Bool(shardAllFlag, false, "Process all shards") ff.Bool(shardAllFlag, false, "Process all shards")
ff.Bool(sealFlag, false, "Writecache will be left in read-only mode after flush completed")
flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
} }

View file

@ -17,6 +17,7 @@ import (
type FlushWriteCachePrm struct { type FlushWriteCachePrm struct {
shardID *shard.ID shardID *shard.ID
ignoreErrors bool ignoreErrors bool
seal bool
} }
// SetShardID is an option to set shard ID. // SetShardID is an option to set shard ID.
@ -26,11 +27,16 @@ func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
p.shardID = id p.shardID = id
} }
// SetIgnoreErrors sets errors ignore flag.. // SetIgnoreErrors sets errors ignore flag.
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) { func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore p.ignoreErrors = ignore
} }
// SetSeal sets seal flag.
func (p *FlushWriteCachePrm) SetSeal(v bool) {
p.seal = v
}
// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation. // FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
type FlushWriteCacheRes struct{} type FlushWriteCacheRes struct{}
@ -40,6 +46,7 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
trace.WithAttributes( trace.WithAttributes(
attribute.String("shard)id", p.shardID.String()), attribute.String("shard)id", p.shardID.String()),
attribute.Bool("ignore_errors", p.ignoreErrors), attribute.Bool("ignore_errors", p.ignoreErrors),
attribute.Bool("seal", p.seal),
)) ))
defer span.End() defer span.End()
@ -53,6 +60,7 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
var prm shard.FlushWriteCachePrm var prm shard.FlushWriteCachePrm
prm.SetIgnoreErrors(p.ignoreErrors) prm.SetIgnoreErrors(p.ignoreErrors)
prm.SetSeal(p.seal)
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm) return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
} }

View file

@ -12,6 +12,7 @@ import (
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation. // FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
type FlushWriteCachePrm struct { type FlushWriteCachePrm struct {
ignoreErrors bool ignoreErrors bool
seal bool
} }
// SetIgnoreErrors sets the flag to ignore read-errors during flush. // SetIgnoreErrors sets the flag to ignore read-errors during flush.
@ -19,6 +20,11 @@ func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore p.ignoreErrors = ignore
} }
// SetSeal sets the flag to left writecache in read-only mode after flush.
func (p *FlushWriteCachePrm) SetSeal(v bool) {
p.seal = v
}
// errWriteCacheDisabled is returned when an operation on write-cache is performed, // errWriteCacheDisabled is returned when an operation on write-cache is performed,
// but write-cache is disabled. // but write-cache is disabled.
var errWriteCacheDisabled = errors.New("write-cache is disabled") var errWriteCacheDisabled = errors.New("write-cache is disabled")
@ -29,6 +35,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
trace.WithAttributes( trace.WithAttributes(
attribute.String("shard_id", s.ID().String()), attribute.String("shard_id", s.ID().String()),
attribute.Bool("ignore_errors", p.ignoreErrors), attribute.Bool("ignore_errors", p.ignoreErrors),
attribute.Bool("seal", p.seal),
)) ))
defer span.End() defer span.End()
@ -47,5 +54,5 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
return ErrDegradedMode return ErrDegradedMode
} }
return s.writeCache.Flush(ctx, p.ignoreErrors) return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
} }

View file

@ -12,6 +12,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -33,6 +34,8 @@ const (
defaultFlushInterval = time.Second defaultFlushInterval = time.Second
) )
var errIterationCompleted = errors.New("iteration completed")
// runFlushLoop starts background workers which periodically flush objects to the blobstor. // runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop(ctx context.Context) { func (c *cache) runFlushLoop(ctx context.Context) {
if c.disableBackgroundFlush { if c.disableBackgroundFlush {
@ -229,7 +232,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) {
continue continue
} }
c.deleteFromDB(objInfo.addr) c.deleteFromDB(objInfo.addr, true)
} }
} }
@ -270,19 +273,29 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
} }
// Flush flushes all objects from the write-cache to the main storage. // Flush flushes all objects from the write-cache to the main storage.
// Write-cache must be in readonly mode to ensure correctness of an operation and func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
// to prevent interference with background flush workers. ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Flush",
trace.WithAttributes( trace.WithAttributes(
attribute.Bool("ignore_errors", ignoreErrors), attribute.Bool("ignore_errors", ignoreErrors),
attribute.Bool("seal", seal),
)) ))
defer span.End() defer span.End()
c.modeMtx.RLock() c.modeMtx.Lock() // exclusive lock to not to conflict with background flush
defer c.modeMtx.RUnlock() defer c.modeMtx.Unlock()
return c.flush(ctx, ignoreErrors) if err := c.flush(ctx, ignoreErrors); err != nil {
return err
}
if seal {
m := c.mode | mode.ReadOnly
if err := c.setMode(ctx, m, ignoreErrors); err != nil {
return err
}
c.metrics.SetMode(m)
}
return nil
} }
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
@ -290,13 +303,53 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
return err return err
} }
return c.db.View(func(tx *bbolt.Tx) error { var last string
for {
batch, err := c.readNextDBBatch(ignoreErrors, last)
if err != nil {
return err
}
if len(batch) == 0 {
break
}
for _, item := range batch {
var obj objectSDK.Object
if err := obj.Unmarshal(item.data); err != nil {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
if ignoreErrors {
continue
}
return err
}
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
return err
}
c.deleteFromDB(item.address, false)
}
last = batch[len(batch)-1].address
}
return nil
}
type batchItem struct {
data []byte
address string
}
func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
const batchSize = 100
var batch []batchItem
err := c.db.View(func(tx *bbolt.Tx) error {
var addr oid.Address var addr oid.Address
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
cs := b.Cursor() cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() { for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
sa := string(k) sa := string(k)
if sa == last {
continue
}
if err := addr.DecodeString(sa); err != nil { if err := addr.DecodeString(sa); err != nil {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
if ignoreErrors { if ignoreErrors {
@ -305,19 +358,15 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
return err return err
} }
var obj objectSDK.Object batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
if err := obj.Unmarshal(data); err != nil { if len(batch) == batchSize {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) return errIterationCompleted
if ignoreErrors {
continue
}
return err
}
if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
return err
} }
} }
return nil return nil
}) })
if err == nil || errors.Is(err, errIterationCompleted) {
return batch, nil
}
return nil, err
} }

View file

@ -147,7 +147,7 @@ func runFlushTest[Option any](
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.Flush(context.Background(), false)) require.NoError(t, wc.Flush(context.Background(), false, false))
check(t, mb, bs, objects) check(t, mb, bs, objects)
}) })
@ -159,8 +159,6 @@ func runFlushTest[Option any](
// Blobstor is read-only, so we expect en error from `flush` here. // Blobstor is read-only, so we expect en error from `flush` here.
require.Error(t, wc.SetMode(mode.Degraded)) require.Error(t, wc.SetMode(mode.Degraded))
// First move to read-only mode to close background workers.
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.SetMode(mode.Degraded)) require.NoError(t, wc.SetMode(mode.Degraded))
@ -177,14 +175,13 @@ func runFlushTest[Option any](
objects := putObjects(t, wc) objects := putObjects(t, wc)
f.InjectFn(t, wc) f.InjectFn(t, wc)
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite))
require.Equal(t, uint32(0), errCount.Load()) require.Equal(t, uint32(0), errCount.Load())
require.Error(t, wc.Flush(context.Background(), false)) require.Error(t, wc.Flush(context.Background(), false, false))
require.Greater(t, errCount.Load(), uint32(0)) require.Greater(t, errCount.Load(), uint32(0))
require.NoError(t, wc.Flush(context.Background(), true)) require.NoError(t, wc.Flush(context.Background(), true, false))
check(t, mb, bs, objects) check(t, mb, bs, objects)
}) })

View file

@ -25,7 +25,7 @@ func (c *cache) SetMode(m mode.Mode) error {
c.modeMtx.Lock() c.modeMtx.Lock()
defer c.modeMtx.Unlock() defer c.modeMtx.Unlock()
err := c.setMode(ctx, m) err := c.setMode(ctx, m, true)
if err == nil { if err == nil {
c.metrics.SetMode(m) c.metrics.SetMode(m)
} }
@ -33,12 +33,12 @@ func (c *cache) SetMode(m mode.Mode) error {
} }
// setMode applies new mode. Must be called with cache.modeMtx lock taken. // setMode applies new mode. Must be called with cache.modeMtx lock taken.
func (c *cache) setMode(ctx context.Context, m mode.Mode) error { func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) error {
var err error var err error
turnOffMeta := m.NoMetabase() turnOffMeta := m.NoMetabase()
if turnOffMeta && !c.mode.NoMetabase() { if turnOffMeta && !c.mode.NoMetabase() {
err = c.flush(ctx, true) err = c.flush(ctx, ignoreErrors)
if err != nil { if err != nil {
return err return err
} }

View file

@ -67,14 +67,24 @@ func (c *cache) openStore(readOnly bool) error {
return nil return nil
} }
func (c *cache) deleteFromDB(key string) { func (c *cache) deleteFromDB(key string, batched bool) {
var recordDeleted bool var recordDeleted bool
err := c.db.Batch(func(tx *bbolt.Tx) error { var err error
b := tx.Bucket(defaultBucket) if batched {
key := []byte(key) err = c.db.Batch(func(tx *bbolt.Tx) error {
recordDeleted = b.Get(key) != nil b := tx.Bucket(defaultBucket)
return b.Delete(key) key := []byte(key)
}) recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
} else {
err = c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
}
if err == nil { if err == nil {
c.metrics.Evict(StorageTypeDB) c.metrics.Evict(StorageTypeDB)

View file

@ -35,7 +35,7 @@ type Cache interface {
SetMode(mode.Mode) error SetMode(mode.Mode) error
SetLogger(*logger.Logger) SetLogger(*logger.Logger)
DumpInfo() Info DumpInfo() Info
Flush(context.Context, bool) error Flush(context.Context, bool, bool) error
Init() error Init() error
Open(ctx context.Context, readOnly bool) error Open(ctx context.Context, readOnly bool) error

View file

@ -18,6 +18,7 @@ func (s *Server) FlushCache(ctx context.Context, req *control.FlushCacheRequest)
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) { for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
var prm engine.FlushWriteCachePrm var prm engine.FlushWriteCachePrm
prm.SetShardID(shardID) prm.SetShardID(shardID)
prm.SetSeal(req.GetBody().GetSeal())
_, err = s.s.FlushWriteCache(ctx, prm) _, err = s.s.FlushWriteCache(ctx, prm)
if err != nil { if err != nil {

Binary file not shown.

View file

@ -280,6 +280,8 @@ message FlushCacheRequest {
message Body { message Body {
// ID of the shard. // ID of the shard.
repeated bytes shard_ID = 1; repeated bytes shard_ID = 1;
// If true, then writecache will be left in read-only mode after flush completed.
bool seal = 2;
} }
Body body = 1; Body body = 1;

Binary file not shown.