Allow to seal writecache after flush #886

Merged
fyrchik merged 4 commits from dstepanov-yadro/frostfs-node:feat/flush_and_disable_writecache into master 2024-09-04 19:51:05 +00:00
20 changed files with 1641 additions and 603 deletions

View file

@ -8,17 +8,23 @@ import (
"github.com/spf13/cobra"
)
const sealFlag = "seal"
var flushCacheCmd = &cobra.Command{
Use: "flush-cache",
Short: "Flush objects from the write-cache to the main storage",
Long: "Flush objects from the write-cache to the main storage",
Run: flushCache,
Use: "flush-cache",
Short: "Flush objects from the write-cache to the main storage",
Long: "Flush objects from the write-cache to the main storage",
Run: flushCache,
Deprecated: "Flushing objects from writecache to the main storage is performed by writecache automatically. To flush and seal writecache use `frostfs-cli control shards writecache seal`.",
}
func flushCache(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.FlushCacheRequest{Body: new(control.FlushCacheRequest_Body)}
seal, _ := cmd.Flags().GetBool(sealFlag)
req := &control.FlushCacheRequest{Body: &control.FlushCacheRequest_Body{
Seal: seal,
}}
req.Body.Shard_ID = getShardIDList(cmd)
signRequest(cmd, pk, req)
@ -44,6 +50,7 @@ func initControlFlushCacheCmd() {
ff := flushCacheCmd.Flags()
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
ff.Bool(shardAllFlag, false, "Process all shards")
ff.Bool(sealFlag, false, "Writecache will be left in read-only mode after flush completed")
flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -17,6 +17,7 @@ func initControlShardsCmd() {
shardsCmd.AddCommand(evacuationShardCmd)
shardsCmd.AddCommand(flushCacheCmd)
shardsCmd.AddCommand(doctorCmd)
shardsCmd.AddCommand(writecacheShardCmd)
initControlShardsListCmd()
initControlSetShardModeCmd()
@ -24,4 +25,5 @@ func initControlShardsCmd() {
initControlEvacuationShardCmd()
initControlFlushCacheCmd()
initControlDoctorCmd()
initControlShardsWritecacheCmd()
}

View file

@ -0,0 +1,73 @@
package control
import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"github.com/mr-tron/base58"
"github.com/spf13/cobra"
)
var writecacheShardCmd = &cobra.Command{
Use: "writecache",
Short: "Operations with storage node's write-cache",
Long: "Operations with storage node's write-cache",
}
var sealWritecacheShardCmd = &cobra.Command{
Use: "seal",
Short: "Flush objects from write-cache and move write-cache to degraded read only mode.",
Long: "Flush all the objects from the write-cache to the main storage and move the write-cache to the degraded read only mode: write-cache will be empty and no objects will be put in it.",
Run: sealWritecache,
}
func sealWritecache(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
}}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.SealWriteCacheResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.SealWriteCache(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
var success, failed uint
for _, res := range resp.GetBody().GetResults() {
if res.GetSuccess() {
success++
cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID()))
} else {
failed++
cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError())
}
}
cmd.Printf("Total: %d success, %d failed\n", success, failed)
}
func initControlShardsWritecacheCmd() {
writecacheShardCmd.AddCommand(sealWritecacheShardCmd)
initControlFlags(sealWritecacheShardCmd)
ff := sealWritecacheShardCmd.Flags()
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
ff.Bool(shardAllFlag, false, "Process all shards")
ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
sealWritecacheShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -2,6 +2,7 @@ package engine
import (
"context"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -11,12 +12,14 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
// FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
type FlushWriteCachePrm struct {
shardID *shard.ID
ignoreErrors bool
seal bool
}
// SetShardID is an option to set shard ID.
@ -26,11 +29,16 @@ func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
p.shardID = id
}
// SetIgnoreErrors sets errors ignore flag..
// SetIgnoreErrors sets errors ignore flag.
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
}
// SetSeal sets seal flag.
func (p *FlushWriteCachePrm) SetSeal(v bool) {
p.seal = v
}
// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
type FlushWriteCacheRes struct{}
@ -38,8 +46,9 @@ type FlushWriteCacheRes struct{}
func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache",
trace.WithAttributes(
attribute.String("shard)id", p.shardID.String()),
attribute.String("shard_id", p.shardID.String()),
attribute.Bool("ignore_errors", p.ignoreErrors),
attribute.Bool("seal", p.seal),
))
defer span.End()
@ -53,10 +62,84 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
var prm shard.FlushWriteCachePrm
prm.SetIgnoreErrors(p.ignoreErrors)
prm.SetSeal(p.seal)
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
}
type SealWriteCachePrm struct {
ShardIDs []*shard.ID
IgnoreErrors bool
}
type ShardSealResult struct {
ShardID *shard.ID
Success bool
ErrorMsg string
}
type SealWriteCacheRes struct {
ShardResults []ShardSealResult
}
// SealWriteCache flushed all data to blobstore and moves write-cache to degraded read only mode.
func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePrm) (SealWriteCacheRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.SealWriteCache",
trace.WithAttributes(
attribute.Int("shard_id_count", len(prm.ShardIDs)),
attribute.Bool("ignore_errors", prm.IgnoreErrors),
))
defer span.End()
res := SealWriteCacheRes{
ShardResults: make([]ShardSealResult, 0, len(prm.ShardIDs)),
}
resGuard := &sync.Mutex{}
eg, egCtx := errgroup.WithContext(ctx)
for _, shardID := range prm.ShardIDs {
shardID := shardID
eg.Go(func() error {
e.mtx.RLock()
sh, ok := e.shards[shardID.String()]
e.mtx.RUnlock()
if !ok {
resGuard.Lock()
defer resGuard.Unlock()
res.ShardResults = append(res.ShardResults, ShardSealResult{
ShardID: shardID,
ErrorMsg: errShardNotFound.Error(),
})
return nil
}
err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors})
resGuard.Lock()
defer resGuard.Unlock()
if err != nil {
res.ShardResults = append(res.ShardResults, ShardSealResult{
ShardID: shardID,
ErrorMsg: err.Error(),
})
} else {
res.ShardResults = append(res.ShardResults, ShardSealResult{
ShardID: shardID,
Success: true,
})
}
return nil
})
}
if err := eg.Wait(); err != nil {
return SealWriteCacheRes{}, err
}
return res, nil
}
type writeCacheMetrics struct {
shardID string
metrics metrics.WriteCacheMetrics

View file

@ -12,6 +12,7 @@ import (
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
type FlushWriteCachePrm struct {
ignoreErrors bool
seal bool
}
// SetIgnoreErrors sets the flag to ignore read-errors during flush.
@ -19,6 +20,11 @@ func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
}
// SetSeal sets the flag to left writecache in read-only mode after flush.
func (p *FlushWriteCachePrm) SetSeal(v bool) {
p.seal = v
}
// errWriteCacheDisabled is returned when an operation on write-cache is performed,
// but write-cache is disabled.
var errWriteCacheDisabled = errors.New("write-cache is disabled")
@ -29,6 +35,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Bool("ignore_errors", p.ignoreErrors),
attribute.Bool("seal", p.seal),
))
defer span.End()
@ -47,5 +54,35 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
return ErrDegradedMode
}
return s.writeCache.Flush(ctx, p.ignoreErrors)
return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
}
type SealWriteCachePrm struct {
IgnoreErrors bool
}
// SealWriteCache flushes all data from the write-cache and moves it to degraded read only mode.
func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.SealWriteCache",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Bool("ignore_errors", p.IgnoreErrors),
))
defer span.End()
if !s.hasWriteCache() {
return errWriteCacheDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.writeCache.Seal(ctx, p.IgnoreErrors)
}

View file

@ -18,6 +18,7 @@ import (
// Delete removes object from write-cache.
//
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
// Returns ErrNotInitialized if write-cache has not been initialized yet.
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
trace.WithAttributes(
@ -32,7 +33,9 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
c.metrics.Delete(time.Since(startedAt), deleted, storageType)
}()
c.modeMtx.RLock()
if !c.modeMtx.TryRLock() {
return ErrNotInitialized
}
defer c.modeMtx.RUnlock()
if c.readOnly() {
return ErrReadOnly

View file

@ -12,6 +12,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -33,6 +34,8 @@ const (
defaultFlushInterval = time.Second
)
var errIterationCompleted = errors.New("iteration completed")
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
func (c *cache) runFlushLoop(ctx context.Context) {
if c.disableBackgroundFlush {
@ -135,13 +138,11 @@ func (c *cache) flushSmallObjects(ctx context.Context) {
}
}
c.modeMtx.RUnlock()
if count == 0 {
c.modeMtx.RUnlock()
break
}
c.modeMtx.RUnlock()
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
zap.Int("count", count),
zap.String("start", base58.Encode(lastKey)))
@ -229,7 +230,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) {
continue
}
c.deleteFromDB(objInfo.addr)
c.deleteFromDB(objInfo.addr, true)
}
}
@ -270,19 +271,29 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
}
// Flush flushes all objects from the write-cache to the main storage.
// Write-cache must be in readonly mode to ensure correctness of an operation and
// to prevent interference with background flush workers.
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Flush",
func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
trace.WithAttributes(
attribute.Bool("ignore_errors", ignoreErrors),
attribute.Bool("seal", seal),
))
defer span.End()
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
c.modeMtx.Lock() // exclusive lock to not to conflict with background flush
defer c.modeMtx.Unlock()
return c.flush(ctx, ignoreErrors)
if err := c.flush(ctx, ignoreErrors); err != nil {
return err
}
if seal {
m := c.mode | mode.ReadOnly
if err := c.setMode(ctx, m, ignoreErrors); err != nil {
return err
}
c.metrics.SetMode(m)
}
return nil
}
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
@ -290,13 +301,53 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
return err
}
return c.db.View(func(tx *bbolt.Tx) error {
var last string
for {
batch, err := c.readNextDBBatch(ignoreErrors, last)
if err != nil {
return err
}
if len(batch) == 0 {
break
}
for _, item := range batch {
var obj objectSDK.Object
if err := obj.Unmarshal(item.data); err != nil {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
if ignoreErrors {
continue
}
return err
}
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
return err
}
c.deleteFromDB(item.address, false)
}
last = batch[len(batch)-1].address
}
return nil
}
type batchItem struct {
data []byte
address string
}
func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
const batchSize = 100
var batch []batchItem
err := c.db.View(func(tx *bbolt.Tx) error {
var addr oid.Address
b := tx.Bucket(defaultBucket)
cs := b.Cursor()
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
sa := string(k)
if sa == last {
continue
}
if err := addr.DecodeString(sa); err != nil {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
if ignoreErrors {
@ -305,19 +356,15 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
return err
}
var obj objectSDK.Object
if err := obj.Unmarshal(data); err != nil {
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
if ignoreErrors {
continue
}
return err
}
if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
return err
batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
if len(batch) == batchSize {
return errIterationCompleted
}
}
return nil
})
if err == nil || errors.Is(err, errIterationCompleted) {
return batch, nil
}
return nil, err
}

View file

@ -147,7 +147,7 @@ func runFlushTest[Option any](
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.Flush(context.Background(), false))
require.NoError(t, wc.Flush(context.Background(), false, false))
check(t, mb, bs, objects)
})
@ -159,8 +159,6 @@ func runFlushTest[Option any](
// Blobstor is read-only, so we expect en error from `flush` here.
require.Error(t, wc.SetMode(mode.Degraded))
// First move to read-only mode to close background workers.
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.NoError(t, wc.SetMode(mode.Degraded))
@ -177,14 +175,13 @@ func runFlushTest[Option any](
objects := putObjects(t, wc)
f.InjectFn(t, wc)
require.NoError(t, wc.SetMode(mode.ReadOnly))
require.NoError(t, bs.SetMode(mode.ReadWrite))
require.NoError(t, mb.SetMode(mode.ReadWrite))
require.Equal(t, uint32(0), errCount.Load())
require.Error(t, wc.Flush(context.Background(), false))
require.Error(t, wc.Flush(context.Background(), false, false))
require.Greater(t, errCount.Load(), uint32(0))
require.NoError(t, wc.Flush(context.Background(), true))
require.NoError(t, wc.Flush(context.Background(), true, false))
check(t, mb, bs, objects)
})

View file

@ -25,7 +25,7 @@ func (c *cache) SetMode(m mode.Mode) error {
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
err := c.setMode(ctx, m)
err := c.setMode(ctx, m, true)
if err == nil {
c.metrics.SetMode(m)
}
@ -33,12 +33,12 @@ func (c *cache) SetMode(m mode.Mode) error {
}
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) error {
var err error
turnOffMeta := m.NoMetabase()
if turnOffMeta && !c.mode.NoMetabase() {
err = c.flush(ctx, true)
err = c.flush(ctx, ignoreErrors)
if err != nil {
return err
}

View file

@ -34,7 +34,9 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
c.metrics.Put(time.Since(startedAt), added, storageType)
}()
c.modeMtx.RLock()
if !c.modeMtx.TryRLock() {
return common.PutRes{}, ErrNotInitialized
}
defer c.modeMtx.RUnlock()
if c.readOnly() {
return common.PutRes{}, ErrReadOnly

View file

@ -0,0 +1,28 @@
package writecache
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
func (c *cache) Seal(ctx context.Context, ignoreErrors bool) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Seal",
trace.WithAttributes(
attribute.Bool("ignore_errors", ignoreErrors),
))
defer span.End()
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
// flush will be done by setMode
err := c.setMode(ctx, mode.DegradedReadOnly, ignoreErrors)
if err == nil {
c.metrics.SetMode(mode.DegradedReadOnly)
}
return err
}

View file

@ -67,14 +67,24 @@ func (c *cache) openStore(readOnly bool) error {
return nil
}
func (c *cache) deleteFromDB(key string) {
func (c *cache) deleteFromDB(key string, batched bool) {
var recordDeleted bool
err := c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
var err error
if batched {
err = c.db.Batch(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
} else {
err = c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
key := []byte(key)
recordDeleted = b.Get(key) != nil
return b.Delete(key)
})
}
if err == nil {
c.metrics.Evict(StorageTypeDB)

View file

@ -35,7 +35,8 @@ type Cache interface {
SetMode(mode.Mode) error
SetLogger(*logger.Logger)
DumpInfo() Info
Flush(context.Context, bool) error
Flush(context.Context, bool, bool) error
Seal(context.Context, bool) error
Init() error
Open(ctx context.Context, readOnly bool) error

View file

@ -24,6 +24,7 @@ const (
rpcGetChainLocalOverride = "GetChainLocalOverride"
rpcListChainLocalOverrides = "ListChainLocalOverrides"
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
rpcSealWriteCache = "SealWriteCache"
)
// HealthCheck executes ControlService.HealthCheck RPC.
@ -264,3 +265,16 @@ func RemoveChainLocalOverride(cli *client.Client, req *RemoveChainLocalOverrideR
return wResp.message, nil
}
// SealWriteCache executes ControlService.SealWriteCache RPC.
func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...client.CallOption) (*SealWriteCacheResponse, error) {
wResp := newResponseWrapper[SealWriteCacheResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcSealWriteCache), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}

View file

@ -18,6 +18,7 @@ func (s *Server) FlushCache(ctx context.Context, req *control.FlushCacheRequest)
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
var prm engine.FlushWriteCachePrm
prm.SetShardID(shardID)
prm.SetSeal(req.GetBody().GetSeal())
_, err = s.s.FlushWriteCache(ctx, prm)
if err != nil {

View file

@ -0,0 +1,48 @@
package control
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCacheRequest) (*control.SealWriteCacheResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
prm := engine.SealWriteCachePrm{
ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
}
res, err := s.s.SealWriteCache(ctx, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.SealWriteCacheResponse{Body: &control.SealWriteCacheResponse_Body{}}
for _, r := range res.ShardResults {
if r.Success {
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
Shard_ID: *r.ShardID,
Success: true,
})
} else {
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
Shard_ID: *r.ShardID,
Error: r.ErrorMsg,
})
}
}
err = SignMessage(s.key, resp)
if err != nil {
return nil, err
}
return resp, nil
}

File diff suppressed because it is too large Load diff

View file

@ -56,6 +56,9 @@ service ControlService {
// Remove local access policy engine overrides stored in the node by chaind id.
rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse);
// Flush objects from write-cache and move it to degraded read only mode.
rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
}
// Health check request.
@ -280,6 +283,8 @@ message FlushCacheRequest {
message Body {
// ID of the shard.
repeated bytes shard_ID = 1;
// If true, then writecache will be left in read-only mode after flush completed.
bool seal = 2;
}
Body body = 1;
@ -523,3 +528,32 @@ message RemoveChainLocalOverrideResponse {
Signature signature = 2;
}
message SealWriteCacheRequest {
// Request body structure.
message Body {
// ID of the shard.
repeated bytes shard_ID = 1;
// Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2;
}
Body body = 1;
Signature signature = 2;
}
message SealWriteCacheResponse {
message Body {
message Status {
bytes shard_ID = 1;
bool success = 2;
string error = 3;
}
repeated Status results = 1;
}
Body body = 1;
Signature signature = 2;
}

View file

@ -1180,6 +1180,7 @@ func (x *FlushCacheRequest_Body) StableSize() (size int) {
return 0
}
size += proto.RepeatedBytesSize(1, x.Shard_ID)
size += proto.BoolSize(2, x.Seal)
return size
}
@ -1200,6 +1201,7 @@ func (x *FlushCacheRequest_Body) StableMarshal(buf []byte) []byte {
}
var offset int
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
offset += proto.BoolMarshal(2, buf[offset:], x.Seal)
return buf
}
@ -2753,3 +2755,216 @@ func (x *RemoveChainLocalOverrideResponse) ReadSignedData(buf []byte) ([]byte, e
func (x *RemoveChainLocalOverrideResponse) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *SealWriteCacheRequest_Body) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.RepeatedBytesSize(1, x.Shard_ID)
size += proto.BoolSize(2, x.IgnoreErrors)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *SealWriteCacheRequest_Body) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
offset += proto.BoolMarshal(2, buf[offset:], x.IgnoreErrors)
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *SealWriteCacheRequest) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *SealWriteCacheRequest) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *SealWriteCacheRequest) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *SealWriteCacheRequest) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *SealWriteCacheRequest) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *SealWriteCacheResponse_Body_Status) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.BytesSize(1, x.Shard_ID)
size += proto.BoolSize(2, x.Success)
size += proto.StringSize(3, x.Error)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *SealWriteCacheResponse_Body_Status) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.BytesMarshal(1, buf[offset:], x.Shard_ID)
offset += proto.BoolMarshal(2, buf[offset:], x.Success)
offset += proto.StringMarshal(3, buf[offset:], x.Error)
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *SealWriteCacheResponse_Body) StableSize() (size int) {
if x == nil {
return 0
}
for i := range x.Results {
size += proto.NestedStructureSize(1, x.Results[i])
}
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *SealWriteCacheResponse_Body) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
for i := range x.Results {
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Results[i])
}
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *SealWriteCacheResponse) StableSize() (size int) {
if x == nil {
return 0
}
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *SealWriteCacheResponse) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *SealWriteCacheResponse) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *SealWriteCacheResponse) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *SealWriteCacheResponse) SetSignature(sig *Signature) {
x.Signature = sig
}

View file

@ -35,6 +35,7 @@ const (
ControlService_GetChainLocalOverride_FullMethodName = "/control.ControlService/GetChainLocalOverride"
ControlService_ListChainLocalOverrides_FullMethodName = "/control.ControlService/ListChainLocalOverrides"
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache"
)
// ControlServiceClient is the client API for ControlService service.
@ -74,6 +75,8 @@ type ControlServiceClient interface {
ListChainLocalOverrides(ctx context.Context, in *ListChainLocalOverridesRequest, opts ...grpc.CallOption) (*ListChainLocalOverridesResponse, error)
// Remove local access policy engine overrides stored in the node by chaind id.
RemoveChainLocalOverride(ctx context.Context, in *RemoveChainLocalOverrideRequest, opts ...grpc.CallOption) (*RemoveChainLocalOverrideResponse, error)
// Flush objects from write-cache and move it to degraded read only mode.
SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error)
}
type controlServiceClient struct {
@ -228,6 +231,15 @@ func (c *controlServiceClient) RemoveChainLocalOverride(ctx context.Context, in
return out, nil
}
func (c *controlServiceClient) SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error) {
out := new(SealWriteCacheResponse)
err := c.cc.Invoke(ctx, ControlService_SealWriteCache_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ControlServiceServer is the server API for ControlService service.
// All implementations should embed UnimplementedControlServiceServer
// for forward compatibility
@ -265,6 +277,8 @@ type ControlServiceServer interface {
ListChainLocalOverrides(context.Context, *ListChainLocalOverridesRequest) (*ListChainLocalOverridesResponse, error)
// Remove local access policy engine overrides stored in the node by chaind id.
RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error)
// Flush objects from write-cache and move it to degraded read only mode.
SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error)
}
// UnimplementedControlServiceServer should be embedded to have forward compatible implementations.
@ -319,6 +333,9 @@ func (UnimplementedControlServiceServer) ListChainLocalOverrides(context.Context
func (UnimplementedControlServiceServer) RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RemoveChainLocalOverride not implemented")
}
func (UnimplementedControlServiceServer) SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method SealWriteCache not implemented")
}
// UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ControlServiceServer will
@ -619,6 +636,24 @@ func _ControlService_RemoveChainLocalOverride_Handler(srv interface{}, ctx conte
return interceptor(ctx, in, info, handler)
}
func _ControlService_SealWriteCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SealWriteCacheRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlServiceServer).SealWriteCache(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlService_SealWriteCache_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).SealWriteCache(ctx, req.(*SealWriteCacheRequest))
}
return interceptor(ctx, in, info, handler)
}
// ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -690,6 +725,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
MethodName: "RemoveChainLocalOverride",
Handler: _ControlService_RemoveChainLocalOverride_Handler,
},
{
MethodName: "SealWriteCache",
Handler: _ControlService_SealWriteCache_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pkg/services/control/service.proto",