diff --git a/cmd/frostfs-cli/modules/control/flush_cache.go b/cmd/frostfs-cli/modules/control/flush_cache.go index 48be393dc..7f632e9fc 100644 --- a/cmd/frostfs-cli/modules/control/flush_cache.go +++ b/cmd/frostfs-cli/modules/control/flush_cache.go @@ -8,6 +8,8 @@ import ( "github.com/spf13/cobra" ) +const sealFlag = "seal" + var flushCacheCmd = &cobra.Command{ Use: "flush-cache", Short: "Flush objects from the write-cache to the main storage", @@ -18,7 +20,10 @@ var flushCacheCmd = &cobra.Command{ func flushCache(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) - req := &control.FlushCacheRequest{Body: new(control.FlushCacheRequest_Body)} + seal, _ := cmd.Flags().GetBool(sealFlag) + req := &control.FlushCacheRequest{Body: &control.FlushCacheRequest_Body{ + Seal: seal, + }} req.Body.Shard_ID = getShardIDList(cmd) signRequest(cmd, pk, req) @@ -44,6 +49,7 @@ func initControlFlushCacheCmd() { ff := flushCacheCmd.Flags() ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") ff.Bool(shardAllFlag, false, "Process all shards") + ff.Bool(sealFlag, false, "Writecache will be left in read-only mode after flush completed") flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) } diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go index 00a40105e..d92a86f5d 100644 --- a/pkg/local_object_storage/engine/writecache.go +++ b/pkg/local_object_storage/engine/writecache.go @@ -17,6 +17,7 @@ import ( type FlushWriteCachePrm struct { shardID *shard.ID ignoreErrors bool + seal bool } // SetShardID is an option to set shard ID. @@ -26,11 +27,16 @@ func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) { p.shardID = id } -// SetIgnoreErrors sets errors ignore flag.. +// SetIgnoreErrors sets errors ignore flag. func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) { p.ignoreErrors = ignore } +// SetSeal sets seal flag. 
+func (p *FlushWriteCachePrm) SetSeal(v bool) { + p.seal = v +} + // FlushWriteCacheRes groups the resulting values of FlushWriteCache operation. type FlushWriteCacheRes struct{} @@ -40,6 +46,7 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr trace.WithAttributes( attribute.String("shard)id", p.shardID.String()), attribute.Bool("ignore_errors", p.ignoreErrors), + attribute.Bool("seal", p.seal), )) defer span.End() @@ -53,6 +60,7 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr var prm shard.FlushWriteCachePrm prm.SetIgnoreErrors(p.ignoreErrors) + prm.SetSeal(p.seal) return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm) } diff --git a/pkg/local_object_storage/shard/writecache.go b/pkg/local_object_storage/shard/writecache.go index 7ce279c54..4e57a0497 100644 --- a/pkg/local_object_storage/shard/writecache.go +++ b/pkg/local_object_storage/shard/writecache.go @@ -12,6 +12,7 @@ import ( // FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation. type FlushWriteCachePrm struct { ignoreErrors bool + seal bool } // SetIgnoreErrors sets the flag to ignore read-errors during flush. @@ -19,6 +20,11 @@ func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) { p.ignoreErrors = ignore } +// SetSeal sets the flag to leave the writecache in read-only mode after flush. +func (p *FlushWriteCachePrm) SetSeal(v bool) { + p.seal = v +} + // errWriteCacheDisabled is returned when an operation on write-cache is performed, // but write-cache is disabled. 
var errWriteCacheDisabled = errors.New("write-cache is disabled") @@ -29,6 +35,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error trace.WithAttributes( attribute.String("shard_id", s.ID().String()), attribute.Bool("ignore_errors", p.ignoreErrors), + attribute.Bool("seal", p.seal), )) defer span.End() @@ -47,5 +54,5 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error return ErrDegradedMode } - return s.writeCache.Flush(ctx, p.ignoreErrors) + return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal) } diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index f4ceec8c8..17dcc1107 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -12,6 +12,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -33,6 +34,8 @@ const ( defaultFlushInterval = time.Second ) +var errIterationCompleted = errors.New("iteration completed") + // runFlushLoop starts background workers which periodically flush objects to the blobstor. 
func (c *cache) runFlushLoop(ctx context.Context) { if c.disableBackgroundFlush { @@ -229,7 +232,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) { continue } - c.deleteFromDB(objInfo.addr) + c.deleteFromDB(objInfo.addr, true) } } @@ -270,19 +273,29 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b } // Flush flushes all objects from the write-cache to the main storage. -// Write-cache must be in readonly mode to ensure correctness of an operation and -// to prevent interference with background flush workers. -func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error { - ctx, span := tracing.StartSpanFromContext(ctx, "Flush", +func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { + ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush", trace.WithAttributes( attribute.Bool("ignore_errors", ignoreErrors), + attribute.Bool("seal", seal), )) defer span.End() - c.modeMtx.RLock() - defer c.modeMtx.RUnlock() + c.modeMtx.Lock() // exclusive lock to avoid conflicts with background flush + defer c.modeMtx.Unlock() - return c.flush(ctx, ignoreErrors) + if err := c.flush(ctx, ignoreErrors); err != nil { + return err + } + + if seal { + m := c.mode | mode.ReadOnly + if err := c.setMode(ctx, m, ignoreErrors); err != nil { + return err + } + c.metrics.SetMode(m) + } + return nil } func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { @@ -290,13 +303,53 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return err } - return c.db.View(func(tx *bbolt.Tx) error { + var last string + for { + batch, err := c.readNextDBBatch(ignoreErrors, last) + if err != nil { + return err + } + if len(batch) == 0 { + break + } + for _, item := range batch { + var obj objectSDK.Object + if err := obj.Unmarshal(item.data); err != nil { + c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err)) + if ignoreErrors { + continue + } + return err + } 
+ + if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { + return err + } + c.deleteFromDB(item.address, false) + } + last = batch[len(batch)-1].address + } + return nil +} + +type batchItem struct { + data []byte + address string +} + +func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) { + const batchSize = 100 + var batch []batchItem + err := c.db.View(func(tx *bbolt.Tx) error { var addr oid.Address b := tx.Bucket(defaultBucket) cs := b.Cursor() - for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() { + for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { sa := string(k) + if sa == last { + continue + } if err := addr.DecodeString(sa); err != nil { c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) if ignoreErrors { @@ -305,19 +358,15 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return err } - var obj objectSDK.Object - if err := obj.Unmarshal(data); err != nil { - c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) - if ignoreErrors { - continue - } - return err - } - - if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil { - return err + batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) + if len(batch) == batchSize { + return errIterationCompleted } } return nil }) + if err == nil || errors.Is(err, errIterationCompleted) { + return batch, nil + } + return nil, err } diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index 20db1de95..4a243c5ec 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -147,7 +147,7 @@ func runFlushTest[Option any]( require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) - require.NoError(t, wc.Flush(context.Background(), false)) + require.NoError(t, 
wc.Flush(context.Background(), false, false)) check(t, mb, bs, objects) }) @@ -159,8 +159,6 @@ func runFlushTest[Option any]( // Blobstor is read-only, so we expect en error from `flush` here. require.Error(t, wc.SetMode(mode.Degraded)) - // First move to read-only mode to close background workers. - require.NoError(t, wc.SetMode(mode.ReadOnly)) require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, wc.SetMode(mode.Degraded)) @@ -177,14 +175,13 @@ func runFlushTest[Option any]( objects := putObjects(t, wc) f.InjectFn(t, wc) - require.NoError(t, wc.SetMode(mode.ReadOnly)) require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite)) require.Equal(t, uint32(0), errCount.Load()) - require.Error(t, wc.Flush(context.Background(), false)) + require.Error(t, wc.Flush(context.Background(), false, false)) require.Greater(t, errCount.Load(), uint32(0)) - require.NoError(t, wc.Flush(context.Background(), true)) + require.NoError(t, wc.Flush(context.Background(), true, false)) check(t, mb, bs, objects) }) diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index e3ff2286b..7c6439fe9 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ b/pkg/local_object_storage/writecache/mode.go @@ -25,7 +25,7 @@ func (c *cache) SetMode(m mode.Mode) error { c.modeMtx.Lock() defer c.modeMtx.Unlock() - err := c.setMode(ctx, m) + err := c.setMode(ctx, m, true) if err == nil { c.metrics.SetMode(m) } @@ -33,12 +33,12 @@ func (c *cache) SetMode(m mode.Mode) error { } // setMode applies new mode. Must be called with cache.modeMtx lock taken. 
-func (c *cache) setMode(ctx context.Context, m mode.Mode) error { +func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) error { var err error turnOffMeta := m.NoMetabase() if turnOffMeta && !c.mode.NoMetabase() { - err = c.flush(ctx, true) + err = c.flush(ctx, ignoreErrors) if err != nil { return err } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 5c25a3b33..6cc3b06d0 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -67,14 +67,24 @@ func (c *cache) openStore(readOnly bool) error { return nil } -func (c *cache) deleteFromDB(key string) { +func (c *cache) deleteFromDB(key string, batched bool) { var recordDeleted bool - err := c.db.Batch(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - key := []byte(key) - recordDeleted = b.Get(key) != nil - return b.Delete(key) - }) + var err error + if batched { + err = c.db.Batch(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(key) + recordDeleted = b.Get(key) != nil + return b.Delete(key) + }) + } else { + err = c.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(defaultBucket) + key := []byte(key) + recordDeleted = b.Get(key) != nil + return b.Delete(key) + }) + } if err == nil { c.metrics.Evict(StorageTypeDB) diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 3d3501969..0549c27f7 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -35,7 +35,7 @@ type Cache interface { SetMode(mode.Mode) error SetLogger(*logger.Logger) DumpInfo() Info - Flush(context.Context, bool) error + Flush(context.Context, bool, bool) error Init() error Open(ctx context.Context, readOnly bool) error diff --git a/pkg/services/control/server/flush_cache.go b/pkg/services/control/server/flush_cache.go index 
9ead530db..67ffa1f2c 100644 --- a/pkg/services/control/server/flush_cache.go +++ b/pkg/services/control/server/flush_cache.go @@ -18,6 +18,7 @@ func (s *Server) FlushCache(ctx context.Context, req *control.FlushCacheRequest) for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) { var prm engine.FlushWriteCachePrm prm.SetShardID(shardID) + prm.SetSeal(req.GetBody().GetSeal()) _, err = s.s.FlushWriteCache(ctx, prm) if err != nil { diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index 8b8739aba..90e643e29 100644 Binary files a/pkg/services/control/service.pb.go and b/pkg/services/control/service.pb.go differ diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index cbc3aaf54..e3b507387 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -280,6 +280,8 @@ message FlushCacheRequest { message Body { // ID of the shard. repeated bytes shard_ID = 1; + // If true, then writecache will be left in read-only mode after flush completed. + bool seal = 2; } Body body = 1; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index e163e54b7..3ace081e4 100644 Binary files a/pkg/services/control/service_frostfs.pb.go and b/pkg/services/control/service_frostfs.pb.go differ