Allow to seal writecache after flush #886
20 changed files with 1641 additions and 603 deletions
|
@ -8,17 +8,23 @@ import (
|
|||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
const sealFlag = "seal"
|
||||
|
||||
var flushCacheCmd = &cobra.Command{
|
||||
Use: "flush-cache",
|
||||
Short: "Flush objects from the write-cache to the main storage",
|
||||
Long: "Flush objects from the write-cache to the main storage",
|
||||
Run: flushCache,
|
||||
Use: "flush-cache",
|
||||
Short: "Flush objects from the write-cache to the main storage",
|
||||
Long: "Flush objects from the write-cache to the main storage",
|
||||
Run: flushCache,
|
||||
Deprecated: "Flushing objects from writecache to the main storage is performed by writecache automatically. To flush and seal writecache use `frostfs-cli control shards writecache seal`.",
|
||||
}
|
||||
|
||||
func flushCache(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
|
||||
req := &control.FlushCacheRequest{Body: new(control.FlushCacheRequest_Body)}
|
||||
seal, _ := cmd.Flags().GetBool(sealFlag)
|
||||
req := &control.FlushCacheRequest{Body: &control.FlushCacheRequest_Body{
|
||||
Seal: seal,
|
||||
}}
|
||||
req.Body.Shard_ID = getShardIDList(cmd)
|
||||
|
||||
signRequest(cmd, pk, req)
|
||||
|
@ -44,6 +50,7 @@ func initControlFlushCacheCmd() {
|
|||
ff := flushCacheCmd.Flags()
|
||||
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||
ff.Bool(shardAllFlag, false, "Process all shards")
|
||||
ff.Bool(sealFlag, false, "Writecache will be left in read-only mode after flush completed")
|
||||
|
||||
flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ func initControlShardsCmd() {
|
|||
shardsCmd.AddCommand(evacuationShardCmd)
|
||||
shardsCmd.AddCommand(flushCacheCmd)
|
||||
shardsCmd.AddCommand(doctorCmd)
|
||||
shardsCmd.AddCommand(writecacheShardCmd)
|
||||
|
||||
initControlShardsListCmd()
|
||||
initControlSetShardModeCmd()
|
||||
|
@ -24,4 +25,5 @@ func initControlShardsCmd() {
|
|||
initControlEvacuationShardCmd()
|
||||
initControlFlushCacheCmd()
|
||||
initControlDoctorCmd()
|
||||
initControlShardsWritecacheCmd()
|
||||
}
|
||||
|
|
73
cmd/frostfs-cli/modules/control/writecache.go
Normal file
73
cmd/frostfs-cli/modules/control/writecache.go
Normal file
|
@ -0,0 +1,73 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"github.com/mr-tron/base58"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var writecacheShardCmd = &cobra.Command{
|
||||
Use: "writecache",
|
||||
Short: "Operations with storage node's write-cache",
|
||||
Long: "Operations with storage node's write-cache",
|
||||
}
|
||||
|
||||
var sealWritecacheShardCmd = &cobra.Command{
|
||||
Use: "seal",
|
||||
Short: "Flush objects from write-cache and move write-cache to degraded read only mode.",
|
||||
Long: "Flush all the objects from the write-cache to the main storage and move the write-cache to the degraded read only mode: write-cache will be empty and no objects will be put in it.",
|
||||
Run: sealWritecache,
|
||||
}
|
||||
|
||||
func sealWritecache(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
|
||||
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
||||
|
||||
req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{
|
||||
Shard_ID: getShardIDList(cmd),
|
||||
IgnoreErrors: ignoreErrors,
|
||||
}}
|
||||
|
||||
signRequest(cmd, pk, req)
|
||||
|
||||
cli := getClient(cmd, pk)
|
||||
|
||||
var resp *control.SealWriteCacheResponse
|
||||
var err error
|
||||
err = cli.ExecRaw(func(client *client.Client) error {
|
||||
resp, err = control.SealWriteCache(client, req)
|
||||
return err
|
||||
})
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||
|
||||
var success, failed uint
|
||||
for _, res := range resp.GetBody().GetResults() {
|
||||
if res.GetSuccess() {
|
||||
success++
|
||||
cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID()))
|
||||
} else {
|
||||
failed++
|
||||
cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError())
|
||||
}
|
||||
}
|
||||
cmd.Printf("Total: %d success, %d failed\n", success, failed)
|
||||
}
|
||||
|
||||
func initControlShardsWritecacheCmd() {
|
||||
writecacheShardCmd.AddCommand(sealWritecacheShardCmd)
|
||||
|
||||
initControlFlags(sealWritecacheShardCmd)
|
||||
|
||||
ff := sealWritecacheShardCmd.Flags()
|
||||
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||
ff.Bool(shardAllFlag, false, "Process all shards")
|
||||
ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
|
||||
|
||||
sealWritecacheShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||
}
|
|
@ -2,6 +2,7 @@ package engine
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
|
@ -11,12 +12,14 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
|
||||
type FlushWriteCachePrm struct {
|
||||
shardID *shard.ID
|
||||
ignoreErrors bool
|
||||
seal bool
|
||||
}
|
||||
|
||||
// SetShardID is an option to set shard ID.
|
||||
|
@ -26,11 +29,16 @@ func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
|
|||
p.shardID = id
|
||||
}
|
||||
|
||||
// SetIgnoreErrors sets errors ignore flag..
|
||||
// SetIgnoreErrors sets errors ignore flag.
|
||||
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
|
||||
p.ignoreErrors = ignore
|
||||
}
|
||||
|
||||
// SetSeal sets seal flag.
|
||||
func (p *FlushWriteCachePrm) SetSeal(v bool) {
|
||||
p.seal = v
|
||||
}
|
||||
|
||||
// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
|
||||
type FlushWriteCacheRes struct{}
|
||||
|
||||
|
@ -38,8 +46,9 @@ type FlushWriteCacheRes struct{}
|
|||
func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard)id", p.shardID.String()),
|
||||
attribute.String("shard_id", p.shardID.String()),
|
||||
attribute.Bool("ignore_errors", p.ignoreErrors),
|
||||
attribute.Bool("seal", p.seal),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
|
@ -53,10 +62,84 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
|
|||
|
||||
var prm shard.FlushWriteCachePrm
|
||||
prm.SetIgnoreErrors(p.ignoreErrors)
|
||||
prm.SetSeal(p.seal)
|
||||
|
||||
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
|
||||
}
|
||||
|
||||
type SealWriteCachePrm struct {
|
||||
ShardIDs []*shard.ID
|
||||
IgnoreErrors bool
|
||||
}
|
||||
|
||||
type ShardSealResult struct {
|
||||
ShardID *shard.ID
|
||||
Success bool
|
||||
ErrorMsg string
|
||||
}
|
||||
|
||||
type SealWriteCacheRes struct {
|
||||
ShardResults []ShardSealResult
|
||||
}
|
||||
|
||||
// SealWriteCache flushed all data to blobstore and moves write-cache to degraded read only mode.
|
||||
func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePrm) (SealWriteCacheRes, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.SealWriteCache",
|
||||
trace.WithAttributes(
|
||||
attribute.Int("shard_id_count", len(prm.ShardIDs)),
|
||||
attribute.Bool("ignore_errors", prm.IgnoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
res := SealWriteCacheRes{
|
||||
ShardResults: make([]ShardSealResult, 0, len(prm.ShardIDs)),
|
||||
}
|
||||
resGuard := &sync.Mutex{}
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
for _, shardID := range prm.ShardIDs {
|
||||
shardID := shardID
|
||||
eg.Go(func() error {
|
||||
e.mtx.RLock()
|
||||
sh, ok := e.shards[shardID.String()]
|
||||
e.mtx.RUnlock()
|
||||
|
||||
if !ok {
|
||||
resGuard.Lock()
|
||||
defer resGuard.Unlock()
|
||||
res.ShardResults = append(res.ShardResults, ShardSealResult{
|
||||
ShardID: shardID,
|
||||
ErrorMsg: errShardNotFound.Error(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors})
|
||||
|
||||
resGuard.Lock()
|
||||
defer resGuard.Unlock()
|
||||
|
||||
if err != nil {
|
||||
res.ShardResults = append(res.ShardResults, ShardSealResult{
|
||||
ShardID: shardID,
|
||||
ErrorMsg: err.Error(),
|
||||
})
|
||||
} else {
|
||||
res.ShardResults = append(res.ShardResults, ShardSealResult{
|
||||
ShardID: shardID,
|
||||
Success: true,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
return SealWriteCacheRes{}, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
type writeCacheMetrics struct {
|
||||
shardID string
|
||||
metrics metrics.WriteCacheMetrics
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
|
||||
type FlushWriteCachePrm struct {
|
||||
ignoreErrors bool
|
||||
seal bool
|
||||
}
|
||||
|
||||
// SetIgnoreErrors sets the flag to ignore read-errors during flush.
|
||||
|
@ -19,6 +20,11 @@ func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
|
|||
p.ignoreErrors = ignore
|
||||
}
|
||||
|
||||
// SetSeal sets the flag to left writecache in read-only mode after flush.
|
||||
func (p *FlushWriteCachePrm) SetSeal(v bool) {
|
||||
p.seal = v
|
||||
}
|
||||
|
||||
// errWriteCacheDisabled is returned when an operation on write-cache is performed,
|
||||
// but write-cache is disabled.
|
||||
var errWriteCacheDisabled = errors.New("write-cache is disabled")
|
||||
|
@ -29,6 +35,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
|
|||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.Bool("ignore_errors", p.ignoreErrors),
|
||||
attribute.Bool("seal", p.seal),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
|
@ -47,5 +54,35 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
|
|||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return s.writeCache.Flush(ctx, p.ignoreErrors)
|
||||
return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
|
||||
}
|
||||
|
||||
type SealWriteCachePrm struct {
|
||||
IgnoreErrors bool
|
||||
}
|
||||
|
||||
// SealWriteCache flushes all data from the write-cache and moves it to degraded read only mode.
|
||||
func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.SealWriteCache",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.Bool("ignore_errors", p.IgnoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if !s.hasWriteCache() {
|
||||
return errWriteCacheDisabled
|
||||
}
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
if s.info.Mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return s.writeCache.Seal(ctx, p.IgnoreErrors)
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
// Delete removes object from write-cache.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
||||
// Returns ErrNotInitialized if write-cache has not been initialized yet.
|
||||
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
||||
trace.WithAttributes(
|
||||
|
@ -32,7 +33,9 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
|||
c.metrics.Delete(time.Since(startedAt), deleted, storageType)
|
||||
}()
|
||||
|
||||
c.modeMtx.RLock()
|
||||
if !c.modeMtx.TryRLock() {
|
||||
return ErrNotInitialized
|
||||
}
|
||||
defer c.modeMtx.RUnlock()
|
||||
if c.readOnly() {
|
||||
return ErrReadOnly
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -33,6 +34,8 @@ const (
|
|||
defaultFlushInterval = time.Second
|
||||
)
|
||||
|
||||
var errIterationCompleted = errors.New("iteration completed")
|
||||
|
||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||
func (c *cache) runFlushLoop(ctx context.Context) {
|
||||
if c.disableBackgroundFlush {
|
||||
|
@ -135,13 +138,11 @@ func (c *cache) flushSmallObjects(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
c.modeMtx.RUnlock()
|
||||
if count == 0 {
|
||||
c.modeMtx.RUnlock()
|
||||
break
|
||||
}
|
||||
|
||||
c.modeMtx.RUnlock()
|
||||
|
||||
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
|
||||
zap.Int("count", count),
|
||||
zap.String("start", base58.Encode(lastKey)))
|
||||
|
@ -229,7 +230,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) {
|
|||
continue
|
||||
}
|
||||
|
||||
c.deleteFromDB(objInfo.addr)
|
||||
c.deleteFromDB(objInfo.addr, true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -270,19 +271,29 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
|
|||
}
|
||||
|
||||
// Flush flushes all objects from the write-cache to the main storage.
|
||||
// Write-cache must be in readonly mode to ensure correctness of an operation and
|
||||
// to prevent interference with background flush workers.
|
||||
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Flush",
|
||||
func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("ignore_errors", ignoreErrors),
|
||||
attribute.Bool("seal", seal),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
c.modeMtx.RLock()
|
||||
defer c.modeMtx.RUnlock()
|
||||
c.modeMtx.Lock() // exclusive lock to not to conflict with background flush
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
return c.flush(ctx, ignoreErrors)
|
||||
if err := c.flush(ctx, ignoreErrors); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if seal {
|
||||
m := c.mode | mode.ReadOnly
|
||||
if err := c.setMode(ctx, m, ignoreErrors); err != nil {
|
||||
return err
|
||||
}
|
||||
c.metrics.SetMode(m)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||
|
@ -290,13 +301,53 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return c.db.View(func(tx *bbolt.Tx) error {
|
||||
var last string
|
||||
for {
|
||||
batch, err := c.readNextDBBatch(ignoreErrors, last)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
for _, item := range batch {
|
||||
var obj objectSDK.Object
|
||||
if err := obj.Unmarshal(item.data); err != nil {
|
||||
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
|
||||
if ignoreErrors {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
|
||||
return err
|
||||
}
|
||||
c.deleteFromDB(item.address, false)
|
||||
}
|
||||
last = batch[len(batch)-1].address
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type batchItem struct {
|
||||
data []byte
|
||||
address string
|
||||
}
|
||||
|
||||
func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
|
||||
const batchSize = 100
|
||||
var batch []batchItem
|
||||
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||
var addr oid.Address
|
||||
|
||||
b := tx.Bucket(defaultBucket)
|
||||
cs := b.Cursor()
|
||||
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
|
||||
for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
|
||||
sa := string(k)
|
||||
if sa == last {
|
||||
continue
|
||||
}
|
||||
if err := addr.DecodeString(sa); err != nil {
|
||||
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
||||
if ignoreErrors {
|
||||
|
@ -305,19 +356,15 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
|||
return err
|
||||
}
|
||||
|
||||
var obj objectSDK.Object
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
||||
if ignoreErrors {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
|
||||
return err
|
||||
batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
|
||||
if len(batch) == batchSize {
|
||||
return errIterationCompleted
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err == nil || errors.Is(err, errIterationCompleted) {
|
||||
return batch, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -147,7 +147,7 @@ func runFlushTest[Option any](
|
|||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
|
||||
require.NoError(t, wc.Flush(context.Background(), false))
|
||||
require.NoError(t, wc.Flush(context.Background(), false, false))
|
||||
|
||||
check(t, mb, bs, objects)
|
||||
})
|
||||
|
@ -159,8 +159,6 @@ func runFlushTest[Option any](
|
|||
// Blobstor is read-only, so we expect en error from `flush` here.
|
||||
require.Error(t, wc.SetMode(mode.Degraded))
|
||||
|
||||
// First move to read-only mode to close background workers.
|
||||
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, wc.SetMode(mode.Degraded))
|
||||
|
@ -177,14 +175,13 @@ func runFlushTest[Option any](
|
|||
objects := putObjects(t, wc)
|
||||
f.InjectFn(t, wc)
|
||||
|
||||
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||
|
||||
require.Equal(t, uint32(0), errCount.Load())
|
||||
require.Error(t, wc.Flush(context.Background(), false))
|
||||
require.Error(t, wc.Flush(context.Background(), false, false))
|
||||
require.Greater(t, errCount.Load(), uint32(0))
|
||||
require.NoError(t, wc.Flush(context.Background(), true))
|
||||
require.NoError(t, wc.Flush(context.Background(), true, false))
|
||||
|
||||
check(t, mb, bs, objects)
|
||||
})
|
||||
|
|
|
@ -25,7 +25,7 @@ func (c *cache) SetMode(m mode.Mode) error {
|
|||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
err := c.setMode(ctx, m)
|
||||
err := c.setMode(ctx, m, true)
|
||||
if err == nil {
|
||||
c.metrics.SetMode(m)
|
||||
}
|
||||
|
@ -33,12 +33,12 @@ func (c *cache) SetMode(m mode.Mode) error {
|
|||
}
|
||||
|
||||
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||
func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
|
||||
func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) error {
|
||||
var err error
|
||||
turnOffMeta := m.NoMetabase()
|
||||
|
||||
if turnOffMeta && !c.mode.NoMetabase() {
|
||||
err = c.flush(ctx, true)
|
||||
err = c.flush(ctx, ignoreErrors)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -34,7 +34,9 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
|||
c.metrics.Put(time.Since(startedAt), added, storageType)
|
||||
}()
|
||||
|
||||
c.modeMtx.RLock()
|
||||
if !c.modeMtx.TryRLock() {
|
||||
return common.PutRes{}, ErrNotInitialized
|
||||
}
|
||||
defer c.modeMtx.RUnlock()
|
||||
if c.readOnly() {
|
||||
return common.PutRes{}, ErrReadOnly
|
||||
|
|
28
pkg/local_object_storage/writecache/seal.go
Normal file
28
pkg/local_object_storage/writecache/seal.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (c *cache) Seal(ctx context.Context, ignoreErrors bool) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Seal",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("ignore_errors", ignoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
c.modeMtx.Lock()
|
||||
defer c.modeMtx.Unlock()
|
||||
|
||||
// flush will be done by setMode
|
||||
err := c.setMode(ctx, mode.DegradedReadOnly, ignoreErrors)
|
||||
if err == nil {
|
||||
c.metrics.SetMode(mode.DegradedReadOnly)
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -67,14 +67,24 @@ func (c *cache) openStore(readOnly bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *cache) deleteFromDB(key string) {
|
||||
func (c *cache) deleteFromDB(key string, batched bool) {
|
||||
var recordDeleted bool
|
||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
key := []byte(key)
|
||||
recordDeleted = b.Get(key) != nil
|
||||
return b.Delete(key)
|
||||
})
|
||||
var err error
|
||||
if batched {
|
||||
err = c.db.Batch(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
key := []byte(key)
|
||||
recordDeleted = b.Get(key) != nil
|
||||
return b.Delete(key)
|
||||
})
|
||||
} else {
|
||||
err = c.db.Update(func(tx *bbolt.Tx) error {
|
||||
b := tx.Bucket(defaultBucket)
|
||||
key := []byte(key)
|
||||
recordDeleted = b.Get(key) != nil
|
||||
return b.Delete(key)
|
||||
})
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
c.metrics.Evict(StorageTypeDB)
|
||||
|
|
|
@ -35,7 +35,8 @@ type Cache interface {
|
|||
SetMode(mode.Mode) error
|
||||
SetLogger(*logger.Logger)
|
||||
DumpInfo() Info
|
||||
Flush(context.Context, bool) error
|
||||
Flush(context.Context, bool, bool) error
|
||||
Seal(context.Context, bool) error
|
||||
|
||||
Init() error
|
||||
Open(ctx context.Context, readOnly bool) error
|
||||
|
|
|
@ -24,6 +24,7 @@ const (
|
|||
rpcGetChainLocalOverride = "GetChainLocalOverride"
|
||||
rpcListChainLocalOverrides = "ListChainLocalOverrides"
|
||||
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
|
||||
rpcSealWriteCache = "SealWriteCache"
|
||||
)
|
||||
|
||||
// HealthCheck executes ControlService.HealthCheck RPC.
|
||||
|
@ -264,3 +265,16 @@ func RemoveChainLocalOverride(cli *client.Client, req *RemoveChainLocalOverrideR
|
|||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
||||
// SealWriteCache executes ControlService.SealWriteCache RPC.
|
||||
func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...client.CallOption) (*SealWriteCacheResponse, error) {
|
||||
wResp := newResponseWrapper[SealWriteCacheResponse]()
|
||||
wReq := &requestWrapper{m: req}
|
||||
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcSealWriteCache), wReq, wResp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ func (s *Server) FlushCache(ctx context.Context, req *control.FlushCacheRequest)
|
|||
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
|
||||
var prm engine.FlushWriteCachePrm
|
||||
prm.SetShardID(shardID)
|
||||
prm.SetSeal(req.GetBody().GetSeal())
|
||||
|
||||
_, err = s.s.FlushWriteCache(ctx, prm)
|
||||
if err != nil {
|
||||
|
|
48
pkg/services/control/server/seal_writecache.go
Normal file
48
pkg/services/control/server/seal_writecache.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCacheRequest) (*control.SealWriteCacheResponse, error) {
|
||||
err := s.isValidRequest(req)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
prm := engine.SealWriteCachePrm{
|
||||
ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()),
|
||||
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
|
||||
}
|
||||
|
||||
res, err := s.s.SealWriteCache(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resp := &control.SealWriteCacheResponse{Body: &control.SealWriteCacheResponse_Body{}}
|
||||
for _, r := range res.ShardResults {
|
||||
if r.Success {
|
||||
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
|
||||
Shard_ID: *r.ShardID,
|
||||
Success: true,
|
||||
})
|
||||
} else {
|
||||
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
|
||||
Shard_ID: *r.ShardID,
|
||||
Error: r.ErrorMsg,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
err = SignMessage(s.key, resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
1501
pkg/services/control/service.pb.go
generated
1501
pkg/services/control/service.pb.go
generated
File diff suppressed because it is too large
Load diff
|
@ -56,6 +56,9 @@ service ControlService {
|
|||
|
||||
// Remove local access policy engine overrides stored in the node by chaind id.
|
||||
rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse);
|
||||
|
||||
// Flush objects from write-cache and move it to degraded read only mode.
|
||||
rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
|
||||
}
|
||||
|
||||
// Health check request.
|
||||
|
@ -280,6 +283,8 @@ message FlushCacheRequest {
|
|||
message Body {
|
||||
// ID of the shard.
|
||||
repeated bytes shard_ID = 1;
|
||||
// If true, then writecache will be left in read-only mode after flush completed.
|
||||
bool seal = 2;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
|
@ -523,3 +528,32 @@ message RemoveChainLocalOverrideResponse {
|
|||
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
message SealWriteCacheRequest {
|
||||
// Request body structure.
|
||||
message Body {
|
||||
// ID of the shard.
|
||||
repeated bytes shard_ID = 1;
|
||||
|
||||
// Flag indicating whether object read errors should be ignored.
|
||||
bool ignore_errors = 2;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
message SealWriteCacheResponse {
|
||||
message Body {
|
||||
message Status {
|
||||
bytes shard_ID = 1;
|
||||
bool success = 2;
|
||||
string error = 3;
|
||||
}
|
||||
repeated Status results = 1;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
|
||||
Signature signature = 2;
|
||||
}
|
215
pkg/services/control/service_frostfs.pb.go
generated
215
pkg/services/control/service_frostfs.pb.go
generated
|
@ -1180,6 +1180,7 @@ func (x *FlushCacheRequest_Body) StableSize() (size int) {
|
|||
return 0
|
||||
}
|
||||
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
||||
size += proto.BoolSize(2, x.Seal)
|
||||
return size
|
||||
}
|
||||
|
||||
|
@ -1200,6 +1201,7 @@ func (x *FlushCacheRequest_Body) StableMarshal(buf []byte) []byte {
|
|||
}
|
||||
var offset int
|
||||
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||
offset += proto.BoolMarshal(2, buf[offset:], x.Seal)
|
||||
return buf
|
||||
}
|
||||
|
||||
|
@ -2753,3 +2755,216 @@ func (x *RemoveChainLocalOverrideResponse) ReadSignedData(buf []byte) ([]byte, e
|
|||
func (x *RemoveChainLocalOverrideResponse) SetSignature(sig *Signature) {
|
||||
x.Signature = sig
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *SealWriteCacheRequest_Body) StableSize() (size int) {
|
||||
if x == nil {
|
||||
return 0
|
||||
}
|
||||
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
||||
size += proto.BoolSize(2, x.IgnoreErrors)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *SealWriteCacheRequest_Body) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||
offset += proto.BoolMarshal(2, buf[offset:], x.IgnoreErrors)
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *SealWriteCacheRequest) StableSize() (size int) {
|
||||
if x == nil {
|
||||
return 0
|
||||
}
|
||||
size += proto.NestedStructureSize(1, x.Body)
|
||||
size += proto.NestedStructureSize(2, x.Signature)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *SealWriteCacheRequest) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||
return buf
|
||||
}
|
||||
|
||||
// ReadSignedData fills buf with signed data of x.
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *SealWriteCacheRequest) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SignedDataSize returns size of the request signed data in bytes.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *SealWriteCacheRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
func (x *SealWriteCacheRequest) SetSignature(sig *Signature) {
|
||||
x.Signature = sig
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *SealWriteCacheResponse_Body_Status) StableSize() (size int) {
|
||||
if x == nil {
|
||||
return 0
|
||||
}
|
||||
size += proto.BytesSize(1, x.Shard_ID)
|
||||
size += proto.BoolSize(2, x.Success)
|
||||
size += proto.StringSize(3, x.Error)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *SealWriteCacheResponse_Body_Status) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.BytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||
offset += proto.BoolMarshal(2, buf[offset:], x.Success)
|
||||
offset += proto.StringMarshal(3, buf[offset:], x.Error)
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *SealWriteCacheResponse_Body) StableSize() (size int) {
|
||||
if x == nil {
|
||||
return 0
|
||||
}
|
||||
for i := range x.Results {
|
||||
size += proto.NestedStructureSize(1, x.Results[i])
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *SealWriteCacheResponse_Body) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
for i := range x.Results {
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Results[i])
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *SealWriteCacheResponse) StableSize() (size int) {
|
||||
if x == nil {
|
||||
return 0
|
||||
}
|
||||
size += proto.NestedStructureSize(1, x.Body)
|
||||
size += proto.NestedStructureSize(2, x.Signature)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *SealWriteCacheResponse) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||
return buf
|
||||
}
|
||||
|
||||
// ReadSignedData fills buf with signed data of x.
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *SealWriteCacheResponse) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SignedDataSize returns size of the request signed data in bytes.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *SealWriteCacheResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
func (x *SealWriteCacheResponse) SetSignature(sig *Signature) {
|
||||
x.Signature = sig
|
||||
}
|
||||
|
|
39
pkg/services/control/service_grpc.pb.go
generated
39
pkg/services/control/service_grpc.pb.go
generated
|
@ -35,6 +35,7 @@ const (
|
|||
ControlService_GetChainLocalOverride_FullMethodName = "/control.ControlService/GetChainLocalOverride"
|
||||
ControlService_ListChainLocalOverrides_FullMethodName = "/control.ControlService/ListChainLocalOverrides"
|
||||
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
|
||||
ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache"
|
||||
)
|
||||
|
||||
// ControlServiceClient is the client API for ControlService service.
|
||||
|
@ -74,6 +75,8 @@ type ControlServiceClient interface {
|
|||
ListChainLocalOverrides(ctx context.Context, in *ListChainLocalOverridesRequest, opts ...grpc.CallOption) (*ListChainLocalOverridesResponse, error)
|
||||
// Remove local access policy engine overrides stored in the node by chaind id.
|
||||
RemoveChainLocalOverride(ctx context.Context, in *RemoveChainLocalOverrideRequest, opts ...grpc.CallOption) (*RemoveChainLocalOverrideResponse, error)
|
||||
// Flush objects from write-cache and move it to degraded read only mode.
|
||||
SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error)
|
||||
}
|
||||
|
||||
type controlServiceClient struct {
|
||||
|
@ -228,6 +231,15 @@ func (c *controlServiceClient) RemoveChainLocalOverride(ctx context.Context, in
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (c *controlServiceClient) SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error) {
|
||||
out := new(SealWriteCacheResponse)
|
||||
err := c.cc.Invoke(ctx, ControlService_SealWriteCache_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ControlServiceServer is the server API for ControlService service.
|
||||
// All implementations should embed UnimplementedControlServiceServer
|
||||
// for forward compatibility
|
||||
|
@ -265,6 +277,8 @@ type ControlServiceServer interface {
|
|||
ListChainLocalOverrides(context.Context, *ListChainLocalOverridesRequest) (*ListChainLocalOverridesResponse, error)
|
||||
// Remove local access policy engine overrides stored in the node by chaind id.
|
||||
RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error)
|
||||
// Flush objects from write-cache and move it to degraded read only mode.
|
||||
SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error)
|
||||
}
|
||||
|
||||
// UnimplementedControlServiceServer should be embedded to have forward compatible implementations.
|
||||
|
@ -319,6 +333,9 @@ func (UnimplementedControlServiceServer) ListChainLocalOverrides(context.Context
|
|||
func (UnimplementedControlServiceServer) RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method RemoveChainLocalOverride not implemented")
|
||||
}
|
||||
func (UnimplementedControlServiceServer) SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method SealWriteCache not implemented")
|
||||
}
|
||||
|
||||
// UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to ControlServiceServer will
|
||||
|
@ -619,6 +636,24 @@ func _ControlService_RemoveChainLocalOverride_Handler(srv interface{}, ctx conte
|
|||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ControlService_SealWriteCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(SealWriteCacheRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ControlServiceServer).SealWriteCache(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ControlService_SealWriteCache_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).SealWriteCache(ctx, req.(*SealWriteCacheRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
// ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
|
@ -690,6 +725,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
|
|||
MethodName: "RemoveChainLocalOverride",
|
||||
Handler: _ControlService_RemoveChainLocalOverride_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "SealWriteCache",
|
||||
Handler: _ControlService_SealWriteCache_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{},
|
||||
Metadata: "pkg/services/control/service.proto",
|
||||
|
|
Loading…
Add table
Reference in a new issue