Allow to seal writecache after flush #886
20 changed files with 1641 additions and 603 deletions
|
@ -8,17 +8,23 @@ import (
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const sealFlag = "seal"
|
||||||
|
|
||||||
var flushCacheCmd = &cobra.Command{
|
var flushCacheCmd = &cobra.Command{
|
||||||
Use: "flush-cache",
|
Use: "flush-cache",
|
||||||
Short: "Flush objects from the write-cache to the main storage",
|
Short: "Flush objects from the write-cache to the main storage",
|
||||||
Long: "Flush objects from the write-cache to the main storage",
|
Long: "Flush objects from the write-cache to the main storage",
|
||||||
Run: flushCache,
|
Run: flushCache,
|
||||||
|
Deprecated: "Flushing objects from writecache to the main storage is performed by writecache automatically. To flush and seal writecache use `frostfs-cli control shards writecache seal`.",
|
||||||
}
|
}
|
||||||
|
|
||||||
func flushCache(cmd *cobra.Command, _ []string) {
|
func flushCache(cmd *cobra.Command, _ []string) {
|
||||||
pk := key.Get(cmd)
|
pk := key.Get(cmd)
|
||||||
|
|
||||||
req := &control.FlushCacheRequest{Body: new(control.FlushCacheRequest_Body)}
|
seal, _ := cmd.Flags().GetBool(sealFlag)
|
||||||
|
req := &control.FlushCacheRequest{Body: &control.FlushCacheRequest_Body{
|
||||||
|
Seal: seal,
|
||||||
|
}}
|
||||||
req.Body.Shard_ID = getShardIDList(cmd)
|
req.Body.Shard_ID = getShardIDList(cmd)
|
||||||
|
|
||||||
signRequest(cmd, pk, req)
|
signRequest(cmd, pk, req)
|
||||||
|
@ -44,6 +50,7 @@ func initControlFlushCacheCmd() {
|
||||||
ff := flushCacheCmd.Flags()
|
ff := flushCacheCmd.Flags()
|
||||||
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||||
ff.Bool(shardAllFlag, false, "Process all shards")
|
ff.Bool(shardAllFlag, false, "Process all shards")
|
||||||
|
ff.Bool(sealFlag, false, "Writecache will be left in read-only mode after flush completed")
|
||||||
|
|
||||||
flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ func initControlShardsCmd() {
|
||||||
shardsCmd.AddCommand(evacuationShardCmd)
|
shardsCmd.AddCommand(evacuationShardCmd)
|
||||||
shardsCmd.AddCommand(flushCacheCmd)
|
shardsCmd.AddCommand(flushCacheCmd)
|
||||||
shardsCmd.AddCommand(doctorCmd)
|
shardsCmd.AddCommand(doctorCmd)
|
||||||
|
shardsCmd.AddCommand(writecacheShardCmd)
|
||||||
|
|
||||||
initControlShardsListCmd()
|
initControlShardsListCmd()
|
||||||
initControlSetShardModeCmd()
|
initControlSetShardModeCmd()
|
||||||
|
@ -24,4 +25,5 @@ func initControlShardsCmd() {
|
||||||
initControlEvacuationShardCmd()
|
initControlEvacuationShardCmd()
|
||||||
initControlFlushCacheCmd()
|
initControlFlushCacheCmd()
|
||||||
initControlDoctorCmd()
|
initControlDoctorCmd()
|
||||||
|
initControlShardsWritecacheCmd()
|
||||||
}
|
}
|
||||||
|
|
73
cmd/frostfs-cli/modules/control/writecache.go
Normal file
73
cmd/frostfs-cli/modules/control/writecache.go
Normal file
|
@ -0,0 +1,73 @@
|
||||||
|
package control
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
|
"github.com/mr-tron/base58"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
var writecacheShardCmd = &cobra.Command{
|
||||||
|
Use: "writecache",
|
||||||
|
Short: "Operations with storage node's write-cache",
|
||||||
|
Long: "Operations with storage node's write-cache",
|
||||||
|
}
|
||||||
|
|
||||||
|
var sealWritecacheShardCmd = &cobra.Command{
|
||||||
|
Use: "seal",
|
||||||
|
Short: "Flush objects from write-cache and move write-cache to degraded read only mode.",
|
||||||
|
Long: "Flush all the objects from the write-cache to the main storage and move the write-cache to the degraded read only mode: write-cache will be empty and no objects will be put in it.",
|
||||||
|
Run: sealWritecache,
|
||||||
|
}
|
||||||
|
|
||||||
|
func sealWritecache(cmd *cobra.Command, _ []string) {
|
||||||
|
pk := key.Get(cmd)
|
||||||
|
|
||||||
|
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
||||||
|
|
||||||
|
req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{
|
||||||
|
Shard_ID: getShardIDList(cmd),
|
||||||
|
IgnoreErrors: ignoreErrors,
|
||||||
|
}}
|
||||||
|
|
||||||
|
signRequest(cmd, pk, req)
|
||||||
|
|
||||||
|
cli := getClient(cmd, pk)
|
||||||
|
|
||||||
|
var resp *control.SealWriteCacheResponse
|
||||||
|
var err error
|
||||||
|
err = cli.ExecRaw(func(client *client.Client) error {
|
||||||
|
resp, err = control.SealWriteCache(client, req)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
|
|
||||||
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||||
|
|
||||||
|
var success, failed uint
|
||||||
|
for _, res := range resp.GetBody().GetResults() {
|
||||||
|
if res.GetSuccess() {
|
||||||
|
success++
|
||||||
|
cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID()))
|
||||||
|
} else {
|
||||||
|
failed++
|
||||||
|
cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cmd.Printf("Total: %d success, %d failed\n", success, failed)
|
||||||
|
}
|
||||||
|
|
||||||
|
func initControlShardsWritecacheCmd() {
|
||||||
|
writecacheShardCmd.AddCommand(sealWritecacheShardCmd)
|
||||||
|
|
||||||
|
initControlFlags(sealWritecacheShardCmd)
|
||||||
|
|
||||||
|
ff := sealWritecacheShardCmd.Flags()
|
||||||
|
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||||
|
ff.Bool(shardAllFlag, false, "Process all shards")
|
||||||
|
ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
|
||||||
|
|
||||||
|
sealWritecacheShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||||
|
}
|
|
@ -2,6 +2,7 @@ package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
@ -11,12 +12,14 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
|
// FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
|
||||||
type FlushWriteCachePrm struct {
|
type FlushWriteCachePrm struct {
|
||||||
shardID *shard.ID
|
shardID *shard.ID
|
||||||
ignoreErrors bool
|
ignoreErrors bool
|
||||||
|
seal bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetShardID is an option to set shard ID.
|
// SetShardID is an option to set shard ID.
|
||||||
|
@ -26,11 +29,16 @@ func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
|
||||||
p.shardID = id
|
p.shardID = id
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetIgnoreErrors sets errors ignore flag..
|
// SetIgnoreErrors sets errors ignore flag.
|
||||||
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
|
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
|
||||||
p.ignoreErrors = ignore
|
p.ignoreErrors = ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetSeal sets seal flag.
|
||||||
|
func (p *FlushWriteCachePrm) SetSeal(v bool) {
|
||||||
|
p.seal = v
|
||||||
|
}
|
||||||
|
|
||||||
// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
|
// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
|
||||||
type FlushWriteCacheRes struct{}
|
type FlushWriteCacheRes struct{}
|
||||||
|
|
||||||
|
@ -38,8 +46,9 @@ type FlushWriteCacheRes struct{}
|
||||||
func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
|
func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("shard)id", p.shardID.String()),
|
attribute.String("shard_id", p.shardID.String()),
|
||||||
attribute.Bool("ignore_errors", p.ignoreErrors),
|
attribute.Bool("ignore_errors", p.ignoreErrors),
|
||||||
|
attribute.Bool("seal", p.seal),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -53,10 +62,84 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
|
||||||
|
|
||||||
var prm shard.FlushWriteCachePrm
|
var prm shard.FlushWriteCachePrm
|
||||||
prm.SetIgnoreErrors(p.ignoreErrors)
|
prm.SetIgnoreErrors(p.ignoreErrors)
|
||||||
|
prm.SetSeal(p.seal)
|
||||||
|
|
||||||
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
|
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SealWriteCachePrm struct {
|
||||||
|
ShardIDs []*shard.ID
|
||||||
|
IgnoreErrors bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type ShardSealResult struct {
|
||||||
|
ShardID *shard.ID
|
||||||
|
Success bool
|
||||||
|
ErrorMsg string
|
||||||
|
}
|
||||||
|
|
||||||
|
type SealWriteCacheRes struct {
|
||||||
|
ShardResults []ShardSealResult
|
||||||
|
}
|
||||||
|
|
||||||
|
// SealWriteCache flushed all data to blobstore and moves write-cache to degraded read only mode.
|
||||||
|
func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePrm) (SealWriteCacheRes, error) {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.SealWriteCache",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Int("shard_id_count", len(prm.ShardIDs)),
|
||||||
|
attribute.Bool("ignore_errors", prm.IgnoreErrors),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
res := SealWriteCacheRes{
|
||||||
|
ShardResults: make([]ShardSealResult, 0, len(prm.ShardIDs)),
|
||||||
|
}
|
||||||
|
resGuard := &sync.Mutex{}
|
||||||
|
|
||||||
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
for _, shardID := range prm.ShardIDs {
|
||||||
|
shardID := shardID
|
||||||
|
eg.Go(func() error {
|
||||||
|
e.mtx.RLock()
|
||||||
|
sh, ok := e.shards[shardID.String()]
|
||||||
|
e.mtx.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
resGuard.Lock()
|
||||||
|
defer resGuard.Unlock()
|
||||||
|
res.ShardResults = append(res.ShardResults, ShardSealResult{
|
||||||
|
ShardID: shardID,
|
||||||
|
ErrorMsg: errShardNotFound.Error(),
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors})
|
||||||
|
|
||||||
|
resGuard.Lock()
|
||||||
|
defer resGuard.Unlock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
res.ShardResults = append(res.ShardResults, ShardSealResult{
|
||||||
|
ShardID: shardID,
|
||||||
|
ErrorMsg: err.Error(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
res.ShardResults = append(res.ShardResults, ShardSealResult{
|
||||||
|
ShardID: shardID,
|
||||||
|
Success: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return SealWriteCacheRes{}, err
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
type writeCacheMetrics struct {
|
type writeCacheMetrics struct {
|
||||||
shardID string
|
shardID string
|
||||||
metrics metrics.WriteCacheMetrics
|
metrics metrics.WriteCacheMetrics
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
|
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
|
||||||
type FlushWriteCachePrm struct {
|
type FlushWriteCachePrm struct {
|
||||||
ignoreErrors bool
|
ignoreErrors bool
|
||||||
|
seal bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetIgnoreErrors sets the flag to ignore read-errors during flush.
|
// SetIgnoreErrors sets the flag to ignore read-errors during flush.
|
||||||
|
@ -19,6 +20,11 @@ func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
|
||||||
p.ignoreErrors = ignore
|
p.ignoreErrors = ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetSeal sets the flag to left writecache in read-only mode after flush.
|
||||||
|
func (p *FlushWriteCachePrm) SetSeal(v bool) {
|
||||||
|
p.seal = v
|
||||||
|
}
|
||||||
|
|
||||||
// errWriteCacheDisabled is returned when an operation on write-cache is performed,
|
// errWriteCacheDisabled is returned when an operation on write-cache is performed,
|
||||||
// but write-cache is disabled.
|
// but write-cache is disabled.
|
||||||
var errWriteCacheDisabled = errors.New("write-cache is disabled")
|
var errWriteCacheDisabled = errors.New("write-cache is disabled")
|
||||||
|
@ -29,6 +35,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("shard_id", s.ID().String()),
|
attribute.String("shard_id", s.ID().String()),
|
||||||
attribute.Bool("ignore_errors", p.ignoreErrors),
|
attribute.Bool("ignore_errors", p.ignoreErrors),
|
||||||
|
attribute.Bool("seal", p.seal),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -47,5 +54,35 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
|
||||||
return ErrDegradedMode
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.writeCache.Flush(ctx, p.ignoreErrors)
|
return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
|
||||||
|
}
|
||||||
|
|
||||||
|
type SealWriteCachePrm struct {
|
||||||
|
IgnoreErrors bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// SealWriteCache flushes all data from the write-cache and moves it to degraded read only mode.
|
||||||
|
func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.SealWriteCache",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
attribute.Bool("ignore_errors", p.IgnoreErrors),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
if !s.hasWriteCache() {
|
||||||
|
return errWriteCacheDisabled
|
||||||
|
}
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
|
return ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.writeCache.Seal(ctx, p.IgnoreErrors)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
// Delete removes object from write-cache.
|
// Delete removes object from write-cache.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
||||||
|
// Returns ErrNotInitialized if write-cache has not been initialized yet.
|
||||||
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -32,7 +33,9 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
c.metrics.Delete(time.Since(startedAt), deleted, storageType)
|
c.metrics.Delete(time.Since(startedAt), deleted, storageType)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
if !c.modeMtx.TryRLock() {
|
||||||
|
return ErrNotInitialized
|
||||||
|
}
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
return ErrReadOnly
|
return ErrReadOnly
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -33,6 +34,8 @@ const (
|
||||||
defaultFlushInterval = time.Second
|
defaultFlushInterval = time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errIterationCompleted = errors.New("iteration completed")
|
||||||
|
|
||||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||||
func (c *cache) runFlushLoop(ctx context.Context) {
|
func (c *cache) runFlushLoop(ctx context.Context) {
|
||||||
if c.disableBackgroundFlush {
|
if c.disableBackgroundFlush {
|
||||||
|
@ -135,13 +138,11 @@ func (c *cache) flushSmallObjects(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.modeMtx.RUnlock()
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
c.modeMtx.RUnlock()
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
c.modeMtx.RUnlock()
|
|
||||||
|
|
||||||
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
|
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
|
||||||
zap.Int("count", count),
|
zap.Int("count", count),
|
||||||
zap.String("start", base58.Encode(lastKey)))
|
zap.String("start", base58.Encode(lastKey)))
|
||||||
|
@ -229,7 +230,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
c.deleteFromDB(objInfo.addr)
|
c.deleteFromDB(objInfo.addr, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,19 +271,29 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush flushes all objects from the write-cache to the main storage.
|
// Flush flushes all objects from the write-cache to the main storage.
|
||||||
// Write-cache must be in readonly mode to ensure correctness of an operation and
|
func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
|
||||||
// to prevent interference with background flush workers.
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
|
||||||
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Flush",
|
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.Bool("ignore_errors", ignoreErrors),
|
attribute.Bool("ignore_errors", ignoreErrors),
|
||||||
|
attribute.Bool("seal", seal),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.Lock() // exclusive lock to not to conflict with background flush
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.Unlock()
|
||||||
|
|
||||||
return c.flush(ctx, ignoreErrors)
|
if err := c.flush(ctx, ignoreErrors); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if seal {
|
||||||
|
m := c.mode | mode.ReadOnly
|
||||||
|
if err := c.setMode(ctx, m, ignoreErrors); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.metrics.SetMode(m)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
|
@ -290,13 +301,53 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.db.View(func(tx *bbolt.Tx) error {
|
var last string
|
||||||
|
for {
|
||||||
|
batch, err := c.readNextDBBatch(ignoreErrors, last)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(batch) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for _, item := range batch {
|
||||||
|
var obj objectSDK.Object
|
||||||
|
if err := obj.Unmarshal(item.data); err != nil {
|
||||||
|
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
|
||||||
|
if ignoreErrors {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.deleteFromDB(item.address, false)
|
||||||
|
}
|
||||||
|
last = batch[len(batch)-1].address
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type batchItem struct {
|
||||||
|
data []byte
|
||||||
|
address string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
|
||||||
|
const batchSize = 100
|
||||||
|
var batch []batchItem
|
||||||
|
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
|
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
cs := b.Cursor()
|
cs := b.Cursor()
|
||||||
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
|
for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
|
||||||
sa := string(k)
|
sa := string(k)
|
||||||
|
if sa == last {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if err := addr.DecodeString(sa); err != nil {
|
if err := addr.DecodeString(sa); err != nil {
|
||||||
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
|
@ -305,19 +356,15 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var obj objectSDK.Object
|
batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
|
||||||
if err := obj.Unmarshal(data); err != nil {
|
if len(batch) == batchSize {
|
||||||
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
return errIterationCompleted
|
||||||
if ignoreErrors {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
if err == nil || errors.Is(err, errIterationCompleted) {
|
||||||
|
return batch, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -147,7 +147,7 @@ func runFlushTest[Option any](
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||||
|
|
||||||
require.NoError(t, wc.Flush(context.Background(), false))
|
require.NoError(t, wc.Flush(context.Background(), false, false))
|
||||||
|
|
||||||
check(t, mb, bs, objects)
|
check(t, mb, bs, objects)
|
||||||
})
|
})
|
||||||
|
@ -159,8 +159,6 @@ func runFlushTest[Option any](
|
||||||
// Blobstor is read-only, so we expect en error from `flush` here.
|
// Blobstor is read-only, so we expect en error from `flush` here.
|
||||||
require.Error(t, wc.SetMode(mode.Degraded))
|
require.Error(t, wc.SetMode(mode.Degraded))
|
||||||
|
|
||||||
// First move to read-only mode to close background workers.
|
|
||||||
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||||
require.NoError(t, wc.SetMode(mode.Degraded))
|
require.NoError(t, wc.SetMode(mode.Degraded))
|
||||||
|
@ -177,14 +175,13 @@ func runFlushTest[Option any](
|
||||||
objects := putObjects(t, wc)
|
objects := putObjects(t, wc)
|
||||||
f.InjectFn(t, wc)
|
f.InjectFn(t, wc)
|
||||||
|
|
||||||
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||||
|
|
||||||
require.Equal(t, uint32(0), errCount.Load())
|
require.Equal(t, uint32(0), errCount.Load())
|
||||||
require.Error(t, wc.Flush(context.Background(), false))
|
require.Error(t, wc.Flush(context.Background(), false, false))
|
||||||
require.Greater(t, errCount.Load(), uint32(0))
|
require.Greater(t, errCount.Load(), uint32(0))
|
||||||
require.NoError(t, wc.Flush(context.Background(), true))
|
require.NoError(t, wc.Flush(context.Background(), true, false))
|
||||||
|
|
||||||
check(t, mb, bs, objects)
|
check(t, mb, bs, objects)
|
||||||
})
|
})
|
||||||
|
|
|
@ -25,7 +25,7 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
c.modeMtx.Lock()
|
c.modeMtx.Lock()
|
||||||
defer c.modeMtx.Unlock()
|
defer c.modeMtx.Unlock()
|
||||||
|
|
||||||
err := c.setMode(ctx, m)
|
err := c.setMode(ctx, m, true)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.metrics.SetMode(m)
|
c.metrics.SetMode(m)
|
||||||
}
|
}
|
||||||
|
@ -33,12 +33,12 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||||
func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
|
func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) error {
|
||||||
var err error
|
var err error
|
||||||
turnOffMeta := m.NoMetabase()
|
turnOffMeta := m.NoMetabase()
|
||||||
|
|
||||||
if turnOffMeta && !c.mode.NoMetabase() {
|
if turnOffMeta && !c.mode.NoMetabase() {
|
||||||
err = c.flush(ctx, true)
|
err = c.flush(ctx, ignoreErrors)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,9 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
||||||
c.metrics.Put(time.Since(startedAt), added, storageType)
|
c.metrics.Put(time.Since(startedAt), added, storageType)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
if !c.modeMtx.TryRLock() {
|
||||||
|
return common.PutRes{}, ErrNotInitialized
|
||||||
|
}
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
return common.PutRes{}, ErrReadOnly
|
return common.PutRes{}, ErrReadOnly
|
||||||
|
|
28
pkg/local_object_storage/writecache/seal.go
Normal file
28
pkg/local_object_storage/writecache/seal.go
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
package writecache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *cache) Seal(ctx context.Context, ignoreErrors bool) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Seal",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Bool("ignore_errors", ignoreErrors),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
c.modeMtx.Lock()
|
||||||
|
defer c.modeMtx.Unlock()
|
||||||
|
|
||||||
|
// flush will be done by setMode
|
||||||
|
err := c.setMode(ctx, mode.DegradedReadOnly, ignoreErrors)
|
||||||
|
if err == nil {
|
||||||
|
c.metrics.SetMode(mode.DegradedReadOnly)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
|
@ -67,14 +67,24 @@ func (c *cache) openStore(readOnly bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) deleteFromDB(key string) {
|
func (c *cache) deleteFromDB(key string, batched bool) {
|
||||||
var recordDeleted bool
|
var recordDeleted bool
|
||||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
var err error
|
||||||
b := tx.Bucket(defaultBucket)
|
if batched {
|
||||||
key := []byte(key)
|
err = c.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
recordDeleted = b.Get(key) != nil
|
b := tx.Bucket(defaultBucket)
|
||||||
return b.Delete(key)
|
key := []byte(key)
|
||||||
})
|
recordDeleted = b.Get(key) != nil
|
||||||
|
return b.Delete(key)
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
err = c.db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(defaultBucket)
|
||||||
|
key := []byte(key)
|
||||||
|
recordDeleted = b.Get(key) != nil
|
||||||
|
return b.Delete(key)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.metrics.Evict(StorageTypeDB)
|
c.metrics.Evict(StorageTypeDB)
|
||||||
|
|
|
@ -35,7 +35,8 @@ type Cache interface {
|
||||||
SetMode(mode.Mode) error
|
SetMode(mode.Mode) error
|
||||||
SetLogger(*logger.Logger)
|
SetLogger(*logger.Logger)
|
||||||
DumpInfo() Info
|
DumpInfo() Info
|
||||||
Flush(context.Context, bool) error
|
Flush(context.Context, bool, bool) error
|
||||||
|
Seal(context.Context, bool) error
|
||||||
|
|
||||||
Init() error
|
Init() error
|
||||||
Open(ctx context.Context, readOnly bool) error
|
Open(ctx context.Context, readOnly bool) error
|
||||||
|
|
|
@ -24,6 +24,7 @@ const (
|
||||||
rpcGetChainLocalOverride = "GetChainLocalOverride"
|
rpcGetChainLocalOverride = "GetChainLocalOverride"
|
||||||
rpcListChainLocalOverrides = "ListChainLocalOverrides"
|
rpcListChainLocalOverrides = "ListChainLocalOverrides"
|
||||||
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
|
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
|
||||||
|
rpcSealWriteCache = "SealWriteCache"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HealthCheck executes ControlService.HealthCheck RPC.
|
// HealthCheck executes ControlService.HealthCheck RPC.
|
||||||
|
@ -264,3 +265,16 @@ func RemoveChainLocalOverride(cli *client.Client, req *RemoveChainLocalOverrideR
|
||||||
|
|
||||||
return wResp.message, nil
|
return wResp.message, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SealWriteCache executes ControlService.SealWriteCache RPC.
|
||||||
|
func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...client.CallOption) (*SealWriteCacheResponse, error) {
|
||||||
|
wResp := newResponseWrapper[SealWriteCacheResponse]()
|
||||||
|
wReq := &requestWrapper{m: req}
|
||||||
|
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcSealWriteCache), wReq, wResp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return wResp.message, nil
|
||||||
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ func (s *Server) FlushCache(ctx context.Context, req *control.FlushCacheRequest)
|
||||||
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
|
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
|
||||||
var prm engine.FlushWriteCachePrm
|
var prm engine.FlushWriteCachePrm
|
||||||
prm.SetShardID(shardID)
|
prm.SetShardID(shardID)
|
||||||
|
prm.SetSeal(req.GetBody().GetSeal())
|
||||||
|
|
||||||
_, err = s.s.FlushWriteCache(ctx, prm)
|
_, err = s.s.FlushWriteCache(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
48
pkg/services/control/server/seal_writecache.go
Normal file
48
pkg/services/control/server/seal_writecache.go
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
package control
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCacheRequest) (*control.SealWriteCacheResponse, error) {
|
||||||
|
err := s.isValidRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := engine.SealWriteCachePrm{
|
||||||
|
ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()),
|
||||||
|
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := s.s.SealWriteCache(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &control.SealWriteCacheResponse{Body: &control.SealWriteCacheResponse_Body{}}
|
||||||
|
for _, r := range res.ShardResults {
|
||||||
|
if r.Success {
|
||||||
|
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
|
||||||
|
Shard_ID: *r.ShardID,
|
||||||
|
Success: true,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
|
||||||
|
Shard_ID: *r.ShardID,
|
||||||
|
Error: r.ErrorMsg,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = SignMessage(s.key, resp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
|
}
|
1501
pkg/services/control/service.pb.go
generated
1501
pkg/services/control/service.pb.go
generated
File diff suppressed because it is too large
Load diff
|
@ -56,6 +56,9 @@ service ControlService {
|
||||||
|
|
||||||
// Remove local access policy engine overrides stored in the node by chaind id.
|
// Remove local access policy engine overrides stored in the node by chaind id.
|
||||||
rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse);
|
rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse);
|
||||||
|
|
||||||
|
// Flush objects from write-cache and move it to degraded read only mode.
|
||||||
|
rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Health check request.
|
// Health check request.
|
||||||
|
@ -280,6 +283,8 @@ message FlushCacheRequest {
|
||||||
message Body {
|
message Body {
|
||||||
// ID of the shard.
|
// ID of the shard.
|
||||||
repeated bytes shard_ID = 1;
|
repeated bytes shard_ID = 1;
|
||||||
|
// If true, then writecache will be left in read-only mode after flush completed.
|
||||||
|
bool seal = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
Body body = 1;
|
Body body = 1;
|
||||||
|
@ -523,3 +528,32 @@ message RemoveChainLocalOverrideResponse {
|
||||||
|
|
||||||
Signature signature = 2;
|
Signature signature = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message SealWriteCacheRequest {
|
||||||
|
// Request body structure.
|
||||||
|
message Body {
|
||||||
|
// ID of the shard.
|
||||||
|
repeated bytes shard_ID = 1;
|
||||||
|
|
||||||
|
// Flag indicating whether object read errors should be ignored.
|
||||||
|
bool ignore_errors = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
Body body = 1;
|
||||||
|
Signature signature = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SealWriteCacheResponse {
|
||||||
|
message Body {
|
||||||
|
message Status {
|
||||||
|
bytes shard_ID = 1;
|
||||||
|
bool success = 2;
|
||||||
|
string error = 3;
|
||||||
|
}
|
||||||
|
repeated Status results = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
Body body = 1;
|
||||||
|
|
||||||
|
Signature signature = 2;
|
||||||
|
}
|
215
pkg/services/control/service_frostfs.pb.go
generated
215
pkg/services/control/service_frostfs.pb.go
generated
|
@ -1180,6 +1180,7 @@ func (x *FlushCacheRequest_Body) StableSize() (size int) {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
||||||
|
size += proto.BoolSize(2, x.Seal)
|
||||||
return size
|
return size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1200,6 +1201,7 @@ func (x *FlushCacheRequest_Body) StableMarshal(buf []byte) []byte {
|
||||||
}
|
}
|
||||||
var offset int
|
var offset int
|
||||||
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
|
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||||
|
offset += proto.BoolMarshal(2, buf[offset:], x.Seal)
|
||||||
return buf
|
return buf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2753,3 +2755,216 @@ func (x *RemoveChainLocalOverrideResponse) ReadSignedData(buf []byte) ([]byte, e
|
||||||
func (x *RemoveChainLocalOverrideResponse) SetSignature(sig *Signature) {
|
func (x *RemoveChainLocalOverrideResponse) SetSignature(sig *Signature) {
|
||||||
x.Signature = sig
|
x.Signature = sig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *SealWriteCacheRequest_Body) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
||||||
|
size += proto.BoolSize(2, x.IgnoreErrors)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *SealWriteCacheRequest_Body) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||||
|
offset += proto.BoolMarshal(2, buf[offset:], x.IgnoreErrors)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *SealWriteCacheRequest) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.NestedStructureSize(1, x.Body)
|
||||||
|
size += proto.NestedStructureSize(2, x.Signature)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *SealWriteCacheRequest) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||||
|
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadSignedData fills buf with signed data of x.
|
||||||
|
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data.
|
||||||
|
func (x *SealWriteCacheRequest) SignedDataSize() int {
|
||||||
|
return x.GetBody().StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SignedDataSize returns size of the request signed data in bytes.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data size.
|
||||||
|
func (x *SealWriteCacheRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return x.GetBody().StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *SealWriteCacheRequest) SetSignature(sig *Signature) {
|
||||||
|
x.Signature = sig
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *SealWriteCacheResponse_Body_Status) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.BytesSize(1, x.Shard_ID)
|
||||||
|
size += proto.BoolSize(2, x.Success)
|
||||||
|
size += proto.StringSize(3, x.Error)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *SealWriteCacheResponse_Body_Status) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.BytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||||
|
offset += proto.BoolMarshal(2, buf[offset:], x.Success)
|
||||||
|
offset += proto.StringMarshal(3, buf[offset:], x.Error)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *SealWriteCacheResponse_Body) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
for i := range x.Results {
|
||||||
|
size += proto.NestedStructureSize(1, x.Results[i])
|
||||||
|
}
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *SealWriteCacheResponse_Body) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
for i := range x.Results {
|
||||||
|
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Results[i])
|
||||||
|
}
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *SealWriteCacheResponse) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.NestedStructureSize(1, x.Body)
|
||||||
|
size += proto.NestedStructureSize(2, x.Signature)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *SealWriteCacheResponse) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||||
|
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadSignedData fills buf with signed data of x.
|
||||||
|
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data.
|
||||||
|
func (x *SealWriteCacheResponse) SignedDataSize() int {
|
||||||
|
return x.GetBody().StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SignedDataSize returns size of the request signed data in bytes.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data size.
|
||||||
|
func (x *SealWriteCacheResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return x.GetBody().StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *SealWriteCacheResponse) SetSignature(sig *Signature) {
|
||||||
|
x.Signature = sig
|
||||||
|
}
|
||||||
|
|
39
pkg/services/control/service_grpc.pb.go
generated
39
pkg/services/control/service_grpc.pb.go
generated
|
@ -35,6 +35,7 @@ const (
|
||||||
ControlService_GetChainLocalOverride_FullMethodName = "/control.ControlService/GetChainLocalOverride"
|
ControlService_GetChainLocalOverride_FullMethodName = "/control.ControlService/GetChainLocalOverride"
|
||||||
ControlService_ListChainLocalOverrides_FullMethodName = "/control.ControlService/ListChainLocalOverrides"
|
ControlService_ListChainLocalOverrides_FullMethodName = "/control.ControlService/ListChainLocalOverrides"
|
||||||
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
|
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
|
||||||
|
ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ControlServiceClient is the client API for ControlService service.
|
// ControlServiceClient is the client API for ControlService service.
|
||||||
|
@ -74,6 +75,8 @@ type ControlServiceClient interface {
|
||||||
ListChainLocalOverrides(ctx context.Context, in *ListChainLocalOverridesRequest, opts ...grpc.CallOption) (*ListChainLocalOverridesResponse, error)
|
ListChainLocalOverrides(ctx context.Context, in *ListChainLocalOverridesRequest, opts ...grpc.CallOption) (*ListChainLocalOverridesResponse, error)
|
||||||
// Remove local access policy engine overrides stored in the node by chaind id.
|
// Remove local access policy engine overrides stored in the node by chaind id.
|
||||||
RemoveChainLocalOverride(ctx context.Context, in *RemoveChainLocalOverrideRequest, opts ...grpc.CallOption) (*RemoveChainLocalOverrideResponse, error)
|
RemoveChainLocalOverride(ctx context.Context, in *RemoveChainLocalOverrideRequest, opts ...grpc.CallOption) (*RemoveChainLocalOverrideResponse, error)
|
||||||
|
// Flush objects from write-cache and move it to degraded read only mode.
|
||||||
|
SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type controlServiceClient struct {
|
type controlServiceClient struct {
|
||||||
|
@ -228,6 +231,15 @@ func (c *controlServiceClient) RemoveChainLocalOverride(ctx context.Context, in
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *controlServiceClient) SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error) {
|
||||||
|
out := new(SealWriteCacheResponse)
|
||||||
|
err := c.cc.Invoke(ctx, ControlService_SealWriteCache_FullMethodName, in, out, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ControlServiceServer is the server API for ControlService service.
|
// ControlServiceServer is the server API for ControlService service.
|
||||||
// All implementations should embed UnimplementedControlServiceServer
|
// All implementations should embed UnimplementedControlServiceServer
|
||||||
// for forward compatibility
|
// for forward compatibility
|
||||||
|
@ -265,6 +277,8 @@ type ControlServiceServer interface {
|
||||||
ListChainLocalOverrides(context.Context, *ListChainLocalOverridesRequest) (*ListChainLocalOverridesResponse, error)
|
ListChainLocalOverrides(context.Context, *ListChainLocalOverridesRequest) (*ListChainLocalOverridesResponse, error)
|
||||||
// Remove local access policy engine overrides stored in the node by chaind id.
|
// Remove local access policy engine overrides stored in the node by chaind id.
|
||||||
RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error)
|
RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error)
|
||||||
|
// Flush objects from write-cache and move it to degraded read only mode.
|
||||||
|
SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnimplementedControlServiceServer should be embedded to have forward compatible implementations.
|
// UnimplementedControlServiceServer should be embedded to have forward compatible implementations.
|
||||||
|
@ -319,6 +333,9 @@ func (UnimplementedControlServiceServer) ListChainLocalOverrides(context.Context
|
||||||
func (UnimplementedControlServiceServer) RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error) {
|
func (UnimplementedControlServiceServer) RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error) {
|
||||||
return nil, status.Errorf(codes.Unimplemented, "method RemoveChainLocalOverride not implemented")
|
return nil, status.Errorf(codes.Unimplemented, "method RemoveChainLocalOverride not implemented")
|
||||||
}
|
}
|
||||||
|
func (UnimplementedControlServiceServer) SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error) {
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method SealWriteCache not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
// UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service.
|
// UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||||
// Use of this interface is not recommended, as added methods to ControlServiceServer will
|
// Use of this interface is not recommended, as added methods to ControlServiceServer will
|
||||||
|
@ -619,6 +636,24 @@ func _ControlService_RemoveChainLocalOverride_Handler(srv interface{}, ctx conte
|
||||||
return interceptor(ctx, in, info, handler)
|
return interceptor(ctx, in, info, handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func _ControlService_SealWriteCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(SealWriteCacheRequest)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(ControlServiceServer).SealWriteCache(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: ControlService_SealWriteCache_FullMethodName,
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(ControlServiceServer).SealWriteCache(ctx, req.(*SealWriteCacheRequest))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
// ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service.
|
// ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service.
|
||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
|
@ -690,6 +725,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
|
||||||
MethodName: "RemoveChainLocalOverride",
|
MethodName: "RemoveChainLocalOverride",
|
||||||
Handler: _ControlService_RemoveChainLocalOverride_Handler,
|
Handler: _ControlService_RemoveChainLocalOverride_Handler,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
MethodName: "SealWriteCache",
|
||||||
|
Handler: _ControlService_SealWriteCache_Handler,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Streams: []grpc.StreamDesc{},
|
Streams: []grpc.StreamDesc{},
|
||||||
Metadata: "pkg/services/control/service.proto",
|
Metadata: "pkg/services/control/service.proto",
|
||||||
|
|
Loading…
Add table
Reference in a new issue