Allow to seal writecache after flush #886

Merged
fyrchik merged 4 commits from dstepanov-yadro/frostfs-node:feat/flush_and_disable_writecache into master 2024-09-04 19:51:05 +00:00
20 changed files with 1641 additions and 603 deletions

View file

@ -8,17 +8,23 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
const sealFlag = "seal"
var flushCacheCmd = &cobra.Command{ var flushCacheCmd = &cobra.Command{
Use: "flush-cache", Use: "flush-cache",
Short: "Flush objects from the write-cache to the main storage", Short: "Flush objects from the write-cache to the main storage",
Long: "Flush objects from the write-cache to the main storage", Long: "Flush objects from the write-cache to the main storage",
Run: flushCache, Run: flushCache,
Deprecated: "Flushing objects from writecache to the main storage is performed by writecache automatically. To flush and seal writecache use `frostfs-cli control shards writecache seal`.",
} }
func flushCache(cmd *cobra.Command, _ []string) { func flushCache(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd) pk := key.Get(cmd)
req := &control.FlushCacheRequest{Body: new(control.FlushCacheRequest_Body)} seal, _ := cmd.Flags().GetBool(sealFlag)
req := &control.FlushCacheRequest{Body: &control.FlushCacheRequest_Body{
Seal: seal,
}}
req.Body.Shard_ID = getShardIDList(cmd) req.Body.Shard_ID = getShardIDList(cmd)
signRequest(cmd, pk, req) signRequest(cmd, pk, req)
@ -44,6 +50,7 @@ func initControlFlushCacheCmd() {
ff := flushCacheCmd.Flags() ff := flushCacheCmd.Flags()
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
ff.Bool(shardAllFlag, false, "Process all shards") ff.Bool(shardAllFlag, false, "Process all shards")
ff.Bool(sealFlag, false, "Writecache will be left in read-only mode after flush completed")
flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
} }

View file

@ -17,6 +17,7 @@ func initControlShardsCmd() {
shardsCmd.AddCommand(evacuationShardCmd) shardsCmd.AddCommand(evacuationShardCmd)
shardsCmd.AddCommand(flushCacheCmd) shardsCmd.AddCommand(flushCacheCmd)
shardsCmd.AddCommand(doctorCmd) shardsCmd.AddCommand(doctorCmd)
shardsCmd.AddCommand(writecacheShardCmd)
initControlShardsListCmd() initControlShardsListCmd()
initControlSetShardModeCmd() initControlSetShardModeCmd()
@ -24,4 +25,5 @@ func initControlShardsCmd() {
initControlEvacuationShardCmd() initControlEvacuationShardCmd()
initControlFlushCacheCmd() initControlFlushCacheCmd()
initControlDoctorCmd() initControlDoctorCmd()
initControlShardsWritecacheCmd()
} }

View file

@ -0,0 +1,73 @@
package control
import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"github.com/mr-tron/base58"
"github.com/spf13/cobra"
)
var writecacheShardCmd = &cobra.Command{
Use: "writecache",
Short: "Operations with storage node's write-cache",
Long: "Operations with storage node's write-cache",
}
var sealWritecacheShardCmd = &cobra.Command{
Use: "seal",
Short: "Flush objects from write-cache and move write-cache to degraded read only mode.",
Long: "Flush all the objects from the write-cache to the main storage and move the write-cache to the degraded read only mode: write-cache will be empty and no objects will be put in it.",
Run: sealWritecache,
}
func sealWritecache(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
}}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.SealWriteCacheResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.SealWriteCache(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
var success, failed uint
for _, res := range resp.GetBody().GetResults() {
if res.GetSuccess() {
success++
cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID()))
} else {
failed++
cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError())
}
}
cmd.Printf("Total: %d success, %d failed\n", success, failed)
}
func initControlShardsWritecacheCmd() {
writecacheShardCmd.AddCommand(sealWritecacheShardCmd)
initControlFlags(sealWritecacheShardCmd)
ff := sealWritecacheShardCmd.Flags()
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
ff.Bool(shardAllFlag, false, "Process all shards")
ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
sealWritecacheShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -2,6 +2,7 @@ package engine
import ( import (
"context" "context"
"sync"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -11,12 +12,14 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
) )
// FlushWriteCachePrm groups the parameters of FlushWriteCache operation. // FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
type FlushWriteCachePrm struct { type FlushWriteCachePrm struct {
shardID *shard.ID shardID *shard.ID
ignoreErrors bool ignoreErrors bool
seal bool
} }
// SetShardID is an option to set shard ID. // SetShardID is an option to set shard ID.
@ -26,11 +29,16 @@ func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
p.shardID = id p.shardID = id
} }
// SetIgnoreErrors sets errors ignore flag.. // SetIgnoreErrors sets errors ignore flag.
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) { func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore p.ignoreErrors = ignore
} }
// SetSeal sets seal flag.
func (p *FlushWriteCachePrm) SetSeal(v bool) {
p.seal = v
}
// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation. // FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
type FlushWriteCacheRes struct{} type FlushWriteCacheRes struct{}
@ -38,8 +46,9 @@ type FlushWriteCacheRes struct{}
func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) { func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache",
trace.WithAttributes( trace.WithAttributes(
attribute.String("shard)id", p.shardID.String()), attribute.String("shard_id", p.shardID.String()),
attribute.Bool("ignore_errors", p.ignoreErrors), attribute.Bool("ignore_errors", p.ignoreErrors),
attribute.Bool("seal", p.seal),
)) ))
defer span.End() defer span.End()
@ -53,10 +62,84 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
var prm shard.FlushWriteCachePrm var prm shard.FlushWriteCachePrm
prm.SetIgnoreErrors(p.ignoreErrors) prm.SetIgnoreErrors(p.ignoreErrors)
prm.SetSeal(p.seal)
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm) return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
} }
type SealWriteCachePrm struct {
ShardIDs []*shard.ID
IgnoreErrors bool
}
type ShardSealResult struct {
ShardID *shard.ID
Success bool
ErrorMsg string
}
type SealWriteCacheRes struct {
ShardResults []ShardSealResult
}
// SealWriteCache flushed all data to blobstore and moves write-cache to degraded read only mode.
func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePrm) (SealWriteCacheRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.SealWriteCache",
trace.WithAttributes(
attribute.Int("shard_id_count", len(prm.ShardIDs)),
attribute.Bool("ignore_errors", prm.IgnoreErrors),
))
defer span.End()
res := SealWriteCacheRes{
ShardResults: make([]ShardSealResult, 0, len(prm.ShardIDs)),
}
resGuard := &sync.Mutex{}
eg, egCtx := errgroup.WithContext(ctx)
for _, shardID := range prm.ShardIDs {
shardID := shardID
eg.Go(func() error {
e.mtx.RLock()
sh, ok := e.shards[shardID.String()]
e.mtx.RUnlock()
if !ok {
resGuard.Lock()
defer resGuard.Unlock()
res.ShardResults = append(res.ShardResults, ShardSealResult{
ShardID: shardID,
ErrorMsg: errShardNotFound.Error(),
})
return nil
}
err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors})
resGuard.Lock()
defer resGuard.Unlock()
if err != nil {
res.ShardResults = append(res.ShardResults, ShardSealResult{
ShardID: shardID,
ErrorMsg: err.Error(),
})
} else {
res.ShardResults = append(res.ShardResults, ShardSealResult{
ShardID: shardID,
Success: true,
})
}
return nil
})
}
if err := eg.Wait(); err != nil {
return SealWriteCacheRes{}, err
}
return res, nil
}
type writeCacheMetrics struct { type writeCacheMetrics struct {
shardID string shardID string
metrics metrics.WriteCacheMetrics metrics metrics.WriteCacheMetrics

View file

@ -12,6 +12,7 @@ import (
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation. // FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
type FlushWriteCachePrm struct { type FlushWriteCachePrm struct {
ignoreErrors bool ignoreErrors bool
seal bool
} }
// SetIgnoreErrors sets the flag to ignore read-errors during flush. // SetIgnoreErrors sets the flag to ignore read-errors during flush.
@ -19,6 +20,11 @@ func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore p.ignoreErrors = ignore
} }
// SetSeal sets the flag to left writecache in read-only mode after flush.
func (p *FlushWriteCachePrm) SetSeal(v bool) {
p.seal = v
}
// errWriteCacheDisabled is returned when an operation on write-cache is performed, // errWriteCacheDisabled is returned when an operation on write-cache is performed,
// but write-cache is disabled. // but write-cache is disabled.
var errWriteCacheDisabled = errors.New("write-cache is disabled") var errWriteCacheDisabled = errors.New("write-cache is disabled")
@ -29,6 +35,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
trace.WithAttributes( trace.WithAttributes(
attribute.String("shard_id", s.ID().String()), attribute.String("shard_id", s.ID().String()),
attribute.Bool("ignore_errors", p.ignoreErrors), attribute.Bool("ignore_errors", p.ignoreErrors),
attribute.Bool("seal", p.seal),
)) ))
defer span.End() defer span.End()
@ -47,5 +54,35 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
return ErrDegradedMode return ErrDegradedMode
} }
return s.writeCache.Flush(ctx, p.ignoreErrors) return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
}
type SealWriteCachePrm struct {
IgnoreErrors bool
}
// SealWriteCache flushes all data from the write-cache and moves it to degraded read only mode.
func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.SealWriteCache",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Bool("ignore_errors", p.IgnoreErrors),
))
defer span.End()
if !s.hasWriteCache() {
return errWriteCacheDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.writeCache.Seal(ctx, p.IgnoreErrors)
} }

View file

@ -18,6 +18,7 @@ import (
// Delete removes object from write-cache. // Delete removes object from write-cache.
// //
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache. // Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
// Returns ErrNotInitialized if write-cache has not been initialized yet.
func (c *cache) Delete(ctx context.Context, addr oid.Address) error { func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete", ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
trace.WithAttributes( trace.WithAttributes(
@ -32,7 +33,9 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
c.metrics.Delete(time.Since(startedAt), deleted, storageType) c.metrics.Delete(time.Since(startedAt), deleted, storageType)
}() }()
c.modeMtx.RLock() if !c.modeMtx.TryRLock() {
return ErrNotInitialized
}
defer c.modeMtx.RUnlock() defer c.modeMtx.RUnlock()
if c.readOnly() { if c.readOnly() {
return ErrReadOnly return ErrReadOnly

View file

@ -12,6 +12,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -33,6 +34,8 @@ const (
defaultFlushInterval = time.Second defaultFlushInterval = time.Second
) )
var errIterationCompleted = errors.New("iteration completed")
// runFlushLoop starts background workers which periodically flush objects to the blobstor. // runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop(ctx context.Context) { func (c *cache) runFlushLoop(ctx context.Context) {
if c.disableBackgroundFlush { if c.disableBackgroundFlush {
@ -135,13 +138,11 @@ func (c *cache) flushSmallObjects(ctx context.Context) {
} }
} }
c.modeMtx.RUnlock()
if count == 0 { if count == 0 {
c.modeMtx.RUnlock()
break break
} }
c.modeMtx.RUnlock()
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache, c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
zap.Int("count", count), zap.Int("count", count),
zap.String("start", base58.Encode(lastKey))) zap.String("start", base58.Encode(lastKey)))
@ -229,7 +230,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) {
continue continue
} }
c.deleteFromDB(objInfo.addr) c.deleteFromDB(objInfo.addr, true)
} }
} }
@ -270,19 +271,29 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
} }
// Flush flushes all objects from the write-cache to the main storage. // Flush flushes all objects from the write-cache to the main storage.
// Write-cache must be in readonly mode to ensure correctness of an operation and func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
// to prevent interference with background flush workers. ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Flush",
trace.WithAttributes( trace.WithAttributes(
attribute.Bool("ignore_errors", ignoreErrors), attribute.Bool("ignore_errors", ignoreErrors),
attribute.Bool("seal", seal),
)) ))
defer span.End() defer span.End()
c.modeMtx.RLock() c.modeMtx.Lock() // exclusive lock to not to conflict with background flush
defer c.modeMtx.RUnlock() defer c.modeMtx.Unlock()
return c.flush(ctx, ignoreErrors) if err := c.flush(ctx, ignoreErrors); err != nil {
return err
}
if seal {
m := c.mode | mode.ReadOnly
if err := c.setMode(ctx, m, ignoreErrors); err != nil {
return err
}
c.metrics.SetMode(m)
}
return nil
} }
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
@ -290,13 +301,53 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
return err return err
} }
return c.db.View(func(tx *bbolt.Tx) error { var last string
for {
batch, err := c.readNextDBBatch(ignoreErrors, last)
if err != nil {
return err
}
if len(batch) == 0 {
break
}
for _, item := range batch {
var obj objectSDK.Object
if err := obj.Unmarshal(item.data); err != nil {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
if ignoreErrors {
continue
}
return err
}
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
return err
}
c.deleteFromDB(item.address, false)
}
last = batch[len(batch)-1].address
}
return nil
}
type batchItem struct {
data []byte
address string
}
func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
const batchSize = 100
var batch []batchItem
err := c.db.View(func(tx *bbolt.Tx) error {
var addr oid.Address var addr oid.Address
b := tx.Bucket(defaultBucket) b := tx.Bucket(defaultBucket)
cs := b.Cursor() cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() { for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
sa := string(k) sa := string(k)
if sa == last {
continue
}
if err := addr.DecodeString(sa); err != nil { if err := addr.DecodeString(sa); err != nil {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
if ignoreErrors { if ignoreErrors {
@ -305,19 +356,15 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
return err return err
} }
var obj objectSDK.Object batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
if err := obj.Unmarshal(data); err != nil { if len(batch) == batchSize {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err)) return errIterationCompleted
if ignoreErrors {
continue
}
return err
}
if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
return err
} }
} }
return nil return nil
}) })
if err == nil || errors.Is(err, errIterationCompleted) {
return batch, nil
}
return nil, err
} }

View file

@ -147,7 +147,7 @@ func runFlushTest[Option any](
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.Flush(context.Background(), false)) require.NoError(t, wc.Flush(context.Background(), false, false))
check(t, mb, bs, objects) check(t, mb, bs, objects)
}) })
@ -159,8 +159,6 @@ func runFlushTest[Option any](
// Blobstor is read-only, so we expect en error from `flush` here. // Blobstor is read-only, so we expect en error from `flush` here.
require.Error(t, wc.SetMode(mode.Degraded)) require.Error(t, wc.SetMode(mode.Degraded))
// First move to read-only mode to close background workers.
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.SetMode(mode.Degraded)) require.NoError(t, wc.SetMode(mode.Degraded))
@ -177,14 +175,13 @@ func runFlushTest[Option any](
objects := putObjects(t, wc) objects := putObjects(t, wc)
f.InjectFn(t, wc) f.InjectFn(t, wc)
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite)) require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite)) require.NoError(t, mb.SetMode(mode.ReadWrite))
require.Equal(t, uint32(0), errCount.Load()) require.Equal(t, uint32(0), errCount.Load())
require.Error(t, wc.Flush(context.Background(), false)) require.Error(t, wc.Flush(context.Background(), false, false))
require.Greater(t, errCount.Load(), uint32(0)) require.Greater(t, errCount.Load(), uint32(0))
require.NoError(t, wc.Flush(context.Background(), true)) require.NoError(t, wc.Flush(context.Background(), true, false))
check(t, mb, bs, objects) check(t, mb, bs, objects)
}) })

View file

@ -25,7 +25,7 @@ func (c *cache) SetMode(m mode.Mode) error {
c.modeMtx.Lock() c.modeMtx.Lock()
defer c.modeMtx.Unlock() defer c.modeMtx.Unlock()
err := c.setMode(ctx, m) err := c.setMode(ctx, m, true)
if err == nil { if err == nil {
c.metrics.SetMode(m) c.metrics.SetMode(m)
} }
@ -33,12 +33,12 @@ func (c *cache) SetMode(m mode.Mode) error {
} }
// setMode applies new mode. Must be called with cache.modeMtx lock taken. // setMode applies new mode. Must be called with cache.modeMtx lock taken.
func (c *cache) setMode(ctx context.Context, m mode.Mode) error { func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) error {
var err error var err error
turnOffMeta := m.NoMetabase() turnOffMeta := m.NoMetabase()
if turnOffMeta && !c.mode.NoMetabase() { if turnOffMeta && !c.mode.NoMetabase() {
err = c.flush(ctx, true) err = c.flush(ctx, ignoreErrors)
if err != nil { if err != nil {
return err return err
} }

View file

@ -34,7 +34,9 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
c.metrics.Put(time.Since(startedAt), added, storageType) c.metrics.Put(time.Since(startedAt), added, storageType)
}() }()
c.modeMtx.RLock() if !c.modeMtx.TryRLock() {
return common.PutRes{}, ErrNotInitialized
}
defer c.modeMtx.RUnlock() defer c.modeMtx.RUnlock()
if c.readOnly() { if c.readOnly() {
return common.PutRes{}, ErrReadOnly return common.PutRes{}, ErrReadOnly

View file

@ -0,0 +1,28 @@
package writecache
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
func (c *cache) Seal(ctx context.Context, ignoreErrors bool) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Seal",
trace.WithAttributes(
attribute.Bool("ignore_errors", ignoreErrors),
))
defer span.End()
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
// flush will be done by setMode
err := c.setMode(ctx, mode.DegradedReadOnly, ignoreErrors)
if err == nil {
c.metrics.SetMode(mode.DegradedReadOnly)
}
return err
}

View file

@ -67,14 +67,24 @@ func (c *cache) openStore(readOnly bool) error {
return nil return nil
} }
func (c *cache) deleteFromDB(key string) { func (c *cache) deleteFromDB(key string, batched bool) {
var recordDeleted bool var recordDeleted bool
err := c.db.Batch(func(tx *bbolt.Tx) error { var err error
b := tx.Bucket(defaultBucket) if batched {
key := []byte(key) err = c.db.Batch(func(tx *bbolt.Tx) error {
recordDeleted = b.Get(key) != nil b := tx.Bucket(defaultBucket)
return b.Delete(key) key := []byte(key)
}) recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
} else {
err = c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
}
if err == nil { if err == nil {
c.metrics.Evict(StorageTypeDB) c.metrics.Evict(StorageTypeDB)

View file

@ -35,7 +35,8 @@ type Cache interface {
SetMode(mode.Mode) error SetMode(mode.Mode) error
SetLogger(*logger.Logger) SetLogger(*logger.Logger)
DumpInfo() Info DumpInfo() Info
Flush(context.Context, bool) error Flush(context.Context, bool, bool) error
Seal(context.Context, bool) error
Init() error Init() error
Open(ctx context.Context, readOnly bool) error Open(ctx context.Context, readOnly bool) error

View file

@ -24,6 +24,7 @@ const (
rpcGetChainLocalOverride = "GetChainLocalOverride" rpcGetChainLocalOverride = "GetChainLocalOverride"
rpcListChainLocalOverrides = "ListChainLocalOverrides" rpcListChainLocalOverrides = "ListChainLocalOverrides"
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride" rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
rpcSealWriteCache = "SealWriteCache"
) )
// HealthCheck executes ControlService.HealthCheck RPC. // HealthCheck executes ControlService.HealthCheck RPC.
@ -264,3 +265,16 @@ func RemoveChainLocalOverride(cli *client.Client, req *RemoveChainLocalOverrideR
return wResp.message, nil return wResp.message, nil
} }
// SealWriteCache executes ControlService.SealWriteCache RPC.
func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...client.CallOption) (*SealWriteCacheResponse, error) {
wResp := newResponseWrapper[SealWriteCacheResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcSealWriteCache), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}

View file

@ -18,6 +18,7 @@ func (s *Server) FlushCache(ctx context.Context, req *control.FlushCacheRequest)
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) { for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
var prm engine.FlushWriteCachePrm var prm engine.FlushWriteCachePrm
prm.SetShardID(shardID) prm.SetShardID(shardID)
prm.SetSeal(req.GetBody().GetSeal())
_, err = s.s.FlushWriteCache(ctx, prm) _, err = s.s.FlushWriteCache(ctx, prm)
if err != nil { if err != nil {

View file

@ -0,0 +1,48 @@
package control
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCacheRequest) (*control.SealWriteCacheResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
prm := engine.SealWriteCachePrm{
ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
}
res, err := s.s.SealWriteCache(ctx, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.SealWriteCacheResponse{Body: &control.SealWriteCacheResponse_Body{}}
for _, r := range res.ShardResults {
if r.Success {
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
Shard_ID: *r.ShardID,
Success: true,
})
} else {
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
Shard_ID: *r.ShardID,
Error: r.ErrorMsg,
})
}
}
err = SignMessage(s.key, resp)
if err != nil {
return nil, err
}
return resp, nil
}

File diff suppressed because it is too large Load diff

View file

@ -56,6 +56,9 @@ service ControlService {
// Remove local access policy engine overrides stored in the node by chaind id. // Remove local access policy engine overrides stored in the node by chaind id.
rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse); rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse);
// Flush objects from write-cache and move it to degraded read only mode.
rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
} }
// Health check request. // Health check request.
@ -280,6 +283,8 @@ message FlushCacheRequest {
message Body { message Body {
// ID of the shard. // ID of the shard.
repeated bytes shard_ID = 1; repeated bytes shard_ID = 1;
// If true, then writecache will be left in read-only mode after flush completed.
bool seal = 2;
} }
Body body = 1; Body body = 1;
@ -523,3 +528,32 @@ message RemoveChainLocalOverrideResponse {
Signature signature = 2; Signature signature = 2;
} }
message SealWriteCacheRequest {
// Request body structure.
message Body {
// ID of the shard.
repeated bytes shard_ID = 1;
// Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2;
}
Body body = 1;
Signature signature = 2;
}
message SealWriteCacheResponse {
message Body {
message Status {
bytes shard_ID = 1;
bool success = 2;
string error = 3;
}
repeated Status results = 1;
}
Body body = 1;
Signature signature = 2;
}

View file

@ -1180,6 +1180,7 @@ func (x *FlushCacheRequest_Body) StableSize() (size int) {
return 0 return 0
} }
size += proto.RepeatedBytesSize(1, x.Shard_ID) size += proto.RepeatedBytesSize(1, x.Shard_ID)
size += proto.BoolSize(2, x.Seal)
return size return size
} }
@ -1200,6 +1201,7 @@ func (x *FlushCacheRequest_Body) StableMarshal(buf []byte) []byte {
} }
var offset int var offset int
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID) offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
offset += proto.BoolMarshal(2, buf[offset:], x.Seal)
return buf return buf
} }
@ -2753,3 +2755,216 @@ func (x *RemoveChainLocalOverrideResponse) ReadSignedData(buf []byte) ([]byte, e
func (x *RemoveChainLocalOverrideResponse) SetSignature(sig *Signature) { func (x *RemoveChainLocalOverrideResponse) SetSignature(sig *Signature) {
x.Signature = sig x.Signature = sig
} }
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *SealWriteCacheRequest_Body) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.RepeatedBytesSize(1, x.Shard_ID)
size += proto.BoolSize(2, x.IgnoreErrors)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *SealWriteCacheRequest_Body) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
offset += proto.BoolMarshal(2, buf[offset:], x.IgnoreErrors)
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *SealWriteCacheRequest) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *SealWriteCacheRequest) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *SealWriteCacheRequest) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *SealWriteCacheRequest) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *SealWriteCacheRequest) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *SealWriteCacheResponse_Body_Status) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.BytesSize(1, x.Shard_ID)
size += proto.BoolSize(2, x.Success)
size += proto.StringSize(3, x.Error)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *SealWriteCacheResponse_Body_Status) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.BytesMarshal(1, buf[offset:], x.Shard_ID)
offset += proto.BoolMarshal(2, buf[offset:], x.Success)
offset += proto.StringMarshal(3, buf[offset:], x.Error)
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *SealWriteCacheResponse_Body) StableSize() (size int) {
if x == nil {
return 0
}
for i := range x.Results {
size += proto.NestedStructureSize(1, x.Results[i])
}
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *SealWriteCacheResponse_Body) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
for i := range x.Results {
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Results[i])
}
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *SealWriteCacheResponse) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *SealWriteCacheResponse) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *SealWriteCacheResponse) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *SealWriteCacheResponse) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *SealWriteCacheResponse) SetSignature(sig *Signature) {
x.Signature = sig
}

View file

@ -35,6 +35,7 @@ const (
ControlService_GetChainLocalOverride_FullMethodName = "/control.ControlService/GetChainLocalOverride" ControlService_GetChainLocalOverride_FullMethodName = "/control.ControlService/GetChainLocalOverride"
ControlService_ListChainLocalOverrides_FullMethodName = "/control.ControlService/ListChainLocalOverrides" ControlService_ListChainLocalOverrides_FullMethodName = "/control.ControlService/ListChainLocalOverrides"
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride" ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache"
) )
// ControlServiceClient is the client API for ControlService service. // ControlServiceClient is the client API for ControlService service.
@ -74,6 +75,8 @@ type ControlServiceClient interface {
ListChainLocalOverrides(ctx context.Context, in *ListChainLocalOverridesRequest, opts ...grpc.CallOption) (*ListChainLocalOverridesResponse, error) ListChainLocalOverrides(ctx context.Context, in *ListChainLocalOverridesRequest, opts ...grpc.CallOption) (*ListChainLocalOverridesResponse, error)
// Remove local access policy engine overrides stored in the node by chaind id. // Remove local access policy engine overrides stored in the node by chaind id.
RemoveChainLocalOverride(ctx context.Context, in *RemoveChainLocalOverrideRequest, opts ...grpc.CallOption) (*RemoveChainLocalOverrideResponse, error) RemoveChainLocalOverride(ctx context.Context, in *RemoveChainLocalOverrideRequest, opts ...grpc.CallOption) (*RemoveChainLocalOverrideResponse, error)
// Flush objects from write-cache and move it to degraded read only mode.
SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error)
} }
type controlServiceClient struct { type controlServiceClient struct {
@ -228,6 +231,15 @@ func (c *controlServiceClient) RemoveChainLocalOverride(ctx context.Context, in
return out, nil return out, nil
} }
func (c *controlServiceClient) SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error) {
out := new(SealWriteCacheResponse)
err := c.cc.Invoke(ctx, ControlService_SealWriteCache_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ControlServiceServer is the server API for ControlService service. // ControlServiceServer is the server API for ControlService service.
// All implementations should embed UnimplementedControlServiceServer // All implementations should embed UnimplementedControlServiceServer
// for forward compatibility // for forward compatibility
@ -265,6 +277,8 @@ type ControlServiceServer interface {
ListChainLocalOverrides(context.Context, *ListChainLocalOverridesRequest) (*ListChainLocalOverridesResponse, error) ListChainLocalOverrides(context.Context, *ListChainLocalOverridesRequest) (*ListChainLocalOverridesResponse, error)
// Remove local access policy engine overrides stored in the node by chaind id. // Remove local access policy engine overrides stored in the node by chaind id.
RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error) RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error)
// Flush objects from write-cache and move it to degraded read only mode.
SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error)
} }
// UnimplementedControlServiceServer should be embedded to have forward compatible implementations. // UnimplementedControlServiceServer should be embedded to have forward compatible implementations.
@ -319,6 +333,9 @@ func (UnimplementedControlServiceServer) ListChainLocalOverrides(context.Context
func (UnimplementedControlServiceServer) RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error) { func (UnimplementedControlServiceServer) RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RemoveChainLocalOverride not implemented") return nil, status.Errorf(codes.Unimplemented, "method RemoveChainLocalOverride not implemented")
} }
func (UnimplementedControlServiceServer) SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SealWriteCache not implemented")
}
// UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service. // UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ControlServiceServer will // Use of this interface is not recommended, as added methods to ControlServiceServer will
@ -619,6 +636,24 @@ func _ControlService_RemoveChainLocalOverride_Handler(srv interface{}, ctx conte
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _ControlService_SealWriteCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SealWriteCacheRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlServiceServer).SealWriteCache(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlService_SealWriteCache_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).SealWriteCache(ctx, req.(*SealWriteCacheRequest))
}
return interceptor(ctx, in, info, handler)
}
// ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service. // ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@ -690,6 +725,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
MethodName: "RemoveChainLocalOverride", MethodName: "RemoveChainLocalOverride",
Handler: _ControlService_RemoveChainLocalOverride_Handler, Handler: _ControlService_RemoveChainLocalOverride_Handler,
}, },
{
MethodName: "SealWriteCache",
Handler: _ControlService_SealWriteCache_Handler,
},
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{},
Metadata: "pkg/services/control/service.proto", Metadata: "pkg/services/control/service.proto",