Allow to seal writecache after flush #886
|
@ -8,17 +8,23 @@ import (
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const sealFlag = "seal"
|
||||||
|
|
||||||
var flushCacheCmd = &cobra.Command{
|
var flushCacheCmd = &cobra.Command{
|
||||||
Use: "flush-cache",
|
Use: "flush-cache",
|
||||||
Short: "Flush objects from the write-cache to the main storage",
|
Short: "Flush objects from the write-cache to the main storage",
|
||||||
Long: "Flush objects from the write-cache to the main storage",
|
Long: "Flush objects from the write-cache to the main storage",
|
||||||
Run: flushCache,
|
Run: flushCache,
|
||||||
|
Deprecated: "Flushing objects from writecache to the main storage is performed by writecache automatically. To flush and seal writecache use `frostfs-cli control shards writecache seal`.",
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
}
|
}
|
||||||
|
|
||||||
func flushCache(cmd *cobra.Command, _ []string) {
|
func flushCache(cmd *cobra.Command, _ []string) {
|
||||||
pk := key.Get(cmd)
|
pk := key.Get(cmd)
|
||||||
|
|
||||||
req := &control.FlushCacheRequest{Body: new(control.FlushCacheRequest_Body)}
|
seal, _ := cmd.Flags().GetBool(sealFlag)
|
||||||
|
req := &control.FlushCacheRequest{Body: &control.FlushCacheRequest_Body{
|
||||||
|
Seal: seal,
|
||||||
|
}}
|
||||||
req.Body.Shard_ID = getShardIDList(cmd)
|
req.Body.Shard_ID = getShardIDList(cmd)
|
||||||
|
|
||||||
signRequest(cmd, pk, req)
|
signRequest(cmd, pk, req)
|
||||||
|
@ -44,6 +50,7 @@ func initControlFlushCacheCmd() {
|
||||||
ff := flushCacheCmd.Flags()
|
ff := flushCacheCmd.Flags()
|
||||||
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||||
ff.Bool(shardAllFlag, false, "Process all shards")
|
ff.Bool(shardAllFlag, false, "Process all shards")
|
||||||
|
ff.Bool(sealFlag, false, "Writecache will be left in read-only mode after flush completed")
|
||||||
|
|
||||||
flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
flushCacheCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ func initControlShardsCmd() {
|
||||||
shardsCmd.AddCommand(evacuationShardCmd)
|
shardsCmd.AddCommand(evacuationShardCmd)
|
||||||
shardsCmd.AddCommand(flushCacheCmd)
|
shardsCmd.AddCommand(flushCacheCmd)
|
||||||
shardsCmd.AddCommand(doctorCmd)
|
shardsCmd.AddCommand(doctorCmd)
|
||||||
|
shardsCmd.AddCommand(writecacheShardCmd)
|
||||||
|
|
||||||
initControlShardsListCmd()
|
initControlShardsListCmd()
|
||||||
initControlSetShardModeCmd()
|
initControlSetShardModeCmd()
|
||||||
|
@ -24,4 +25,5 @@ func initControlShardsCmd() {
|
||||||
initControlEvacuationShardCmd()
|
initControlEvacuationShardCmd()
|
||||||
initControlFlushCacheCmd()
|
initControlFlushCacheCmd()
|
||||||
initControlDoctorCmd()
|
initControlDoctorCmd()
|
||||||
|
initControlShardsWritecacheCmd()
|
||||||
}
|
}
|
||||||
|
|
73
cmd/frostfs-cli/modules/control/writecache.go
Normal file
|
@ -0,0 +1,73 @@
|
||||||
|
package control
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
|
"github.com/mr-tron/base58"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
var writecacheShardCmd = &cobra.Command{
|
||||||
|
Use: "writecache",
|
||||||
|
Short: "Operations with storage node's write-cache",
|
||||||
|
Long: "Operations with storage node's write-cache",
|
||||||
|
}
|
||||||
|
|
||||||
|
var sealWritecacheShardCmd = &cobra.Command{
|
||||||
|
Use: "seal",
|
||||||
|
Short: "Flush objects from write-cache and move write-cache to degraded read only mode.",
|
||||||
|
Long: "Flush all the objects from the write-cache to the main storage and move the write-cache to the degraded read only mode: write-cache will be empty and no objects will be put in it.",
|
||||||
|
Run: sealWritecache,
|
||||||
|
}
|
||||||
|
|
||||||
|
func sealWritecache(cmd *cobra.Command, _ []string) {
|
||||||
|
pk := key.Get(cmd)
|
||||||
|
|
||||||
|
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
||||||
|
|
||||||
|
req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{
|
||||||
|
Shard_ID: getShardIDList(cmd),
|
||||||
|
IgnoreErrors: ignoreErrors,
|
||||||
|
}}
|
||||||
|
|
||||||
|
signRequest(cmd, pk, req)
|
||||||
|
|
||||||
|
cli := getClient(cmd, pk)
|
||||||
|
|
||||||
|
var resp *control.SealWriteCacheResponse
|
||||||
|
var err error
|
||||||
|
err = cli.ExecRaw(func(client *client.Client) error {
|
||||||
|
resp, err = control.SealWriteCache(client, req)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
|
|
||||||
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||||
|
|
||||||
|
var success, failed uint
|
||||||
|
for _, res := range resp.GetBody().GetResults() {
|
||||||
|
if res.GetSuccess() {
|
||||||
|
success++
|
||||||
|
cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID()))
|
||||||
|
} else {
|
||||||
|
failed++
|
||||||
|
cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError())
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`\"%s\"` -> `%q`?
dstepanov-yadro
commented
fixed fixed
|
|||||||
|
}
|
||||||
|
}
|
||||||
|
cmd.Printf("Total: %d success, %d failed\n", success, failed)
|
||||||
|
}
|
||||||
|
|
||||||
|
func initControlShardsWritecacheCmd() {
|
||||||
|
writecacheShardCmd.AddCommand(sealWritecacheShardCmd)
|
||||||
|
|
||||||
|
initControlFlags(sealWritecacheShardCmd)
|
||||||
|
|
||||||
|
ff := sealWritecacheShardCmd.Flags()
|
||||||
|
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||||
|
ff.Bool(shardAllFlag, false, "Process all shards")
|
||||||
|
ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
|
||||||
|
|
||||||
|
sealWritecacheShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||||
|
}
|
|
@ -2,6 +2,7 @@ package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
@ -11,12 +12,14 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
|
// FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
|
||||||
type FlushWriteCachePrm struct {
|
type FlushWriteCachePrm struct {
|
||||||
shardID *shard.ID
|
shardID *shard.ID
|
||||||
ignoreErrors bool
|
ignoreErrors bool
|
||||||
|
seal bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetShardID is an option to set shard ID.
|
// SetShardID is an option to set shard ID.
|
||||||
|
@ -26,11 +29,16 @@ func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
|
||||||
p.shardID = id
|
p.shardID = id
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetIgnoreErrors sets errors ignore flag..
|
// SetIgnoreErrors sets errors ignore flag.
|
||||||
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
|
func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
|
||||||
p.ignoreErrors = ignore
|
p.ignoreErrors = ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetSeal sets seal flag.
|
||||||
|
func (p *FlushWriteCachePrm) SetSeal(v bool) {
|
||||||
|
p.seal = v
|
||||||
|
}
|
||||||
|
|
||||||
// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
|
// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
|
||||||
type FlushWriteCacheRes struct{}
|
type FlushWriteCacheRes struct{}
|
||||||
|
|
||||||
|
@ -38,8 +46,9 @@ type FlushWriteCacheRes struct{}
|
||||||
func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
|
func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("shard)id", p.shardID.String()),
|
attribute.String("shard_id", p.shardID.String()),
|
||||||
attribute.Bool("ignore_errors", p.ignoreErrors),
|
attribute.Bool("ignore_errors", p.ignoreErrors),
|
||||||
|
attribute.Bool("seal", p.seal),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -53,10 +62,84 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
|
||||||
|
|
||||||
var prm shard.FlushWriteCachePrm
|
var prm shard.FlushWriteCachePrm
|
||||||
prm.SetIgnoreErrors(p.ignoreErrors)
|
prm.SetIgnoreErrors(p.ignoreErrors)
|
||||||
|
prm.SetSeal(p.seal)
|
||||||
|
|
||||||
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
|
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SealWriteCachePrm struct {
|
||||||
|
ShardIDs []*shard.ID
|
||||||
|
IgnoreErrors bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type ShardSealResult struct {
|
||||||
|
ShardID *shard.ID
|
||||||
|
Success bool
|
||||||
|
ErrorMsg string
|
||||||
|
}
|
||||||
|
|
||||||
|
type SealWriteCacheRes struct {
|
||||||
|
ShardResults []ShardSealResult
|
||||||
|
}
|
||||||
|
|
||||||
|
// SealWriteCache flushed all data to blobstore and moves write-cache to degraded read only mode.
|
||||||
|
func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePrm) (SealWriteCacheRes, error) {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.SealWriteCache",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Int("shard_id_count", len(prm.ShardIDs)),
|
||||||
|
attribute.Bool("ignore_errors", prm.IgnoreErrors),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
res := SealWriteCacheRes{
|
||||||
|
ShardResults: make([]ShardSealResult, 0, len(prm.ShardIDs)),
|
||||||
|
}
|
||||||
|
resGuard := &sync.Mutex{}
|
||||||
|
|
||||||
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
for _, shardID := range prm.ShardIDs {
|
||||||
|
shardID := shardID
|
||||||
|
eg.Go(func() error {
|
||||||
|
e.mtx.RLock()
|
||||||
|
sh, ok := e.shards[shardID.String()]
|
||||||
|
e.mtx.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
resGuard.Lock()
|
||||||
|
defer resGuard.Unlock()
|
||||||
|
res.ShardResults = append(res.ShardResults, ShardSealResult{
|
||||||
|
ShardID: shardID,
|
||||||
|
ErrorMsg: errShardNotFound.Error(),
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors})
|
||||||
|
|
||||||
|
resGuard.Lock()
|
||||||
|
defer resGuard.Unlock()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
res.ShardResults = append(res.ShardResults, ShardSealResult{
|
||||||
|
ShardID: shardID,
|
||||||
|
ErrorMsg: err.Error(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
res.ShardResults = append(res.ShardResults, ShardSealResult{
|
||||||
|
ShardID: shardID,
|
||||||
|
Success: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return SealWriteCacheRes{}, err
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
type writeCacheMetrics struct {
|
type writeCacheMetrics struct {
|
||||||
shardID string
|
shardID string
|
||||||
metrics metrics.WriteCacheMetrics
|
metrics metrics.WriteCacheMetrics
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
|
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
|
||||||
type FlushWriteCachePrm struct {
|
type FlushWriteCachePrm struct {
|
||||||
ignoreErrors bool
|
ignoreErrors bool
|
||||||
|
seal bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetIgnoreErrors sets the flag to ignore read-errors during flush.
|
// SetIgnoreErrors sets the flag to ignore read-errors during flush.
|
||||||
|
@ -19,6 +20,11 @@ func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
|
||||||
p.ignoreErrors = ignore
|
p.ignoreErrors = ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetSeal sets the flag to left writecache in read-only mode after flush.
|
||||||
|
func (p *FlushWriteCachePrm) SetSeal(v bool) {
|
||||||
|
p.seal = v
|
||||||
|
}
|
||||||
|
|
||||||
// errWriteCacheDisabled is returned when an operation on write-cache is performed,
|
// errWriteCacheDisabled is returned when an operation on write-cache is performed,
|
||||||
// but write-cache is disabled.
|
// but write-cache is disabled.
|
||||||
var errWriteCacheDisabled = errors.New("write-cache is disabled")
|
var errWriteCacheDisabled = errors.New("write-cache is disabled")
|
||||||
|
@ -29,6 +35,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("shard_id", s.ID().String()),
|
attribute.String("shard_id", s.ID().String()),
|
||||||
attribute.Bool("ignore_errors", p.ignoreErrors),
|
attribute.Bool("ignore_errors", p.ignoreErrors),
|
||||||
|
attribute.Bool("seal", p.seal),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -47,5 +54,35 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
|
||||||
return ErrDegradedMode
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.writeCache.Flush(ctx, p.ignoreErrors)
|
return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
|
||||||
|
}
|
||||||
|
|
||||||
|
type SealWriteCachePrm struct {
|
||||||
|
IgnoreErrors bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// SealWriteCache flushes all data from the write-cache and moves it to degraded read only mode.
|
||||||
|
func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.SealWriteCache",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("shard_id", s.ID().String()),
|
||||||
|
attribute.Bool("ignore_errors", p.IgnoreErrors),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
if !s.hasWriteCache() {
|
||||||
|
return errWriteCacheDisabled
|
||||||
|
}
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.ReadOnly() {
|
||||||
|
return ErrReadOnlyMode
|
||||||
|
}
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.writeCache.Seal(ctx, p.IgnoreErrors)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
// Delete removes object from write-cache.
|
// Delete removes object from write-cache.
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
// Returns an error of type apistatus.ObjectNotFound if object is missing in write-cache.
|
||||||
|
// Returns ErrNotInitialized if write-cache has not been initialized yet.
|
||||||
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Delete",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -32,7 +33,9 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
c.metrics.Delete(time.Since(startedAt), deleted, storageType)
|
c.metrics.Delete(time.Since(startedAt), deleted, storageType)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
if !c.modeMtx.TryRLock() {
|
||||||
fyrchik
commented
If writecache is doing something, why don't we just hang here? Anyway, if it is not related to sealing, how about doing it in a separate commit or before your changes -- it deserves a separate description. If writecache is doing something, why don't we just hang here? Anyway, if it is not related to sealing, how about doing it in a separate commit or before your changes -- it deserves a separate description.
dstepanov-yadro
commented
Writecache is not mandatory, but it can take a lot of time to flush for example. Also if modeMtx is locked, then highly likely writecache will be unavailable for write. So put or delete can be processed with blobstor directly. Writecache is not mandatory, but it can take a lot of time to flush for example. Also if modeMtx is locked, then highly likely writecache will be unavailable for write. So put or delete can be processed with blobstor directly.
|
|||||||
|
return ErrNotInitialized
|
||||||
|
}
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
return ErrReadOnly
|
return ErrReadOnly
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||||
|
@ -33,6 +34,8 @@ const (
|
||||||||
defaultFlushInterval = time.Second
|
defaultFlushInterval = time.Second
|
||||||||
)
|
)
|
||||||||
|
|
||||||||
|
var errIterationCompleted = errors.New("iteration completed")
|
||||||||
|
|
||||||||
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
// runFlushLoop starts background workers which periodically flush objects to the blobstor.
|
||||||||
func (c *cache) runFlushLoop(ctx context.Context) {
|
func (c *cache) runFlushLoop(ctx context.Context) {
|
||||||||
if c.disableBackgroundFlush {
|
if c.disableBackgroundFlush {
|
||||||||
|
@ -135,13 +138,11 @@ func (c *cache) flushSmallObjects(ctx context.Context) {
|
||||||||
}
|
}
|
||||||||
}
|
}
|
||||||||
|
|
||||||||
if count == 0 {
|
|
||||||||
c.modeMtx.RUnlock()
|
c.modeMtx.RUnlock()
|
||||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Pure refactoring, please, make it a separate commit. Pure refactoring, please, make it a separate commit.
dstepanov-yadro
commented
Done. Done.
|
|||||||||
|
if count == 0 {
|
||||||||
break
|
break
|
||||||||
}
|
}
|
||||||||
|
|
||||||||
c.modeMtx.RUnlock()
|
|
||||||||
|
|
||||||||
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
|
c.log.Debug(logs.WritecacheTriedToFlushItemsFromWritecache,
|
||||||||
zap.Int("count", count),
|
zap.Int("count", count),
|
||||||||
zap.String("start", base58.Encode(lastKey)))
|
zap.String("start", base58.Encode(lastKey)))
|
||||||||
|
@ -229,7 +230,7 @@ func (c *cache) workerFlushSmall(ctx context.Context) {
|
||||||||
continue
|
continue
|
||||||||
}
|
}
|
||||||||
|
|
||||||||
c.deleteFromDB(objInfo.addr)
|
c.deleteFromDB(objInfo.addr, true)
|
||||||||
}
|
}
|
||||||||
}
|
}
|
||||||||
|
|
||||||||
|
@ -270,19 +271,29 @@ func (c *cache) flushObject(ctx context.Context, obj *objectSDK.Object, data []b
|
||||||||
}
|
}
|
||||||||
|
|
||||||||
// Flush flushes all objects from the write-cache to the main storage.
|
// Flush flushes all objects from the write-cache to the main storage.
|
||||||||
// Write-cache must be in readonly mode to ensure correctness of an operation and
|
func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error {
|
||||||||
// to prevent interference with background flush workers.
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Flush",
|
||||||||
func (c *cache) Flush(ctx context.Context, ignoreErrors bool) error {
|
|
||||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Flush",
|
|
||||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||||
attribute.Bool("ignore_errors", ignoreErrors),
|
attribute.Bool("ignore_errors", ignoreErrors),
|
||||||||
|
attribute.Bool("seal", seal),
|
||||||||
))
|
))
|
||||||||
defer span.End()
|
defer span.End()
|
||||||||
|
|
||||||||
c.modeMtx.RLock()
|
c.modeMtx.Lock() // exclusive lock to not to conflict with background flush
|
||||||||
fyrchik
commented
What do you mean by "conflict with background flush"? What do you mean by "conflict with background flush"?
Both flushes do the same, here we just need to return _after_ everything is flushed.
dstepanov-yadro
commented
Background flush acquires RLock, so only one flush will be in process: background or manual Background flush acquires RLock, so only one flush will be in process: background or manual
|
|||||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.Unlock()
|
||||||||
|
|
||||||||
return c.flush(ctx, ignoreErrors)
|
if err := c.flush(ctx, ignoreErrors); err != nil {
|
||||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
There are 2 cases to consider:
What is the behaviour in these situations? It looks like we should fail in both:
There are 2 cases to consider:
1. DEGRADED shard mode.
2. READONLY _shard_ mode.
What is the behaviour in these situations? It looks like we should fail in both:
1. In DEGRADED writecache is not opened and SSD can be missing.
2. In READONLY blobstor is in READONLY too (likely for a reason, like dead HDD), so flushing won't be done.
dstepanov-yadro
commented
And we will fail:
And we will fail: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/8180a0664f05b5adedf399cbd8ad90f0f37115aa/pkg/local_object_storage/shard/writecache.go#L27
|
|||||||||
|
return err
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
if seal {
|
||||||||
|
m := c.mode | mode.ReadOnly
|
||||||||
|
if err := c.setMode(ctx, m, ignoreErrors); err != nil {
|
||||||||
|
return err
|
||||||||
|
}
|
||||||||
|
c.metrics.SetMode(m)
|
||||||||
|
}
|
||||||||
|
return nil
|
||||||||
}
|
}
|
||||||||
|
|
||||||||
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||||
|
@ -290,13 +301,53 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||||
return err
|
return err
|
||||||||
}
|
}
|
||||||||
|
|
||||||||
return c.db.View(func(tx *bbolt.Tx) error {
|
var last string
|
||||||||
|
for {
|
||||||||
|
batch, err := c.readNextDBBatch(ignoreErrors, last)
|
||||||||
|
if err != nil {
|
||||||||
|
return err
|
||||||||
|
}
|
||||||||
|
if len(batch) == 0 {
|
||||||||
|
break
|
||||||||
|
}
|
||||||||
|
for _, item := range batch {
|
||||||||
|
var obj objectSDK.Object
|
||||||||
|
if err := obj.Unmarshal(item.data); err != nil {
|
||||||||
|
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, item.address, metaerr.Wrap(err))
|
||||||||
|
if ignoreErrors {
|
||||||||
|
continue
|
||||||||
|
}
|
||||||||
|
return err
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil {
|
||||||||
|
return err
|
||||||||
|
}
|
||||||||
|
c.deleteFromDB(item.address, false)
|
||||||||
|
}
|
||||||||
|
last = batch[len(batch)-1].address
|
||||||||
|
}
|
||||||||
|
return nil
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
type batchItem struct {
|
||||||||
|
data []byte
|
||||||||
|
address string
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
func (c *cache) readNextDBBatch(ignoreErrors bool, last string) ([]batchItem, error) {
|
||||||||
|
const batchSize = 100
|
||||||||
|
var batch []batchItem
|
||||||||
|
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||||||
var addr oid.Address
|
var addr oid.Address
|
||||||||
|
|
||||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||||
cs := b.Cursor()
|
cs := b.Cursor()
|
||||||||
for k, data := cs.Seek(nil); k != nil; k, data = cs.Next() {
|
for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() {
|
||||||||
sa := string(k)
|
sa := string(k)
|
||||||||
|
if sa == last {
|
||||||||
|
continue
|
||||||||
|
}
|
||||||||
if err := addr.DecodeString(sa); err != nil {
|
if err := addr.DecodeString(sa); err != nil {
|
||||||||
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
||||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||||
|
@ -305,19 +356,15 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||||
return err
|
return err
|
||||||||
}
|
}
|
||||||||
|
|
||||||||
var obj objectSDK.Object
|
batch = append(batch, batchItem{data: bytes.Clone(data), address: sa})
|
||||||||
if err := obj.Unmarshal(data); err != nil {
|
if len(batch) == batchSize {
|
||||||||
c.reportFlushError(logs.FSTreeCantDecodeDBObjectAddress, sa, metaerr.Wrap(err))
|
return errIterationCompleted
|
||||||||
if ignoreErrors {
|
|
||||||||
continue
|
|
||||||||
}
|
|
||||||||
return err
|
|
||||||||
}
|
|
||||||||
|
|
||||||||
if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
|
|
||||||||
return err
|
|
||||||||
}
|
}
|
||||||||
}
|
}
|
||||||||
return nil
|
return nil
|
||||||||
})
|
})
|
||||||||
|
if err == nil || errors.Is(err, errIterationCompleted) {
|
||||||||
|
return batch, nil
|
||||||||
|
}
|
||||||||
|
return nil, err
|
||||||||
}
|
}
|
||||||||
|
|
|
@ -147,7 +147,7 @@ func runFlushTest[Option any](
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||||
|
|
||||||
require.NoError(t, wc.Flush(context.Background(), false))
|
require.NoError(t, wc.Flush(context.Background(), false, false))
|
||||||
|
|
||||||
check(t, mb, bs, objects)
|
check(t, mb, bs, objects)
|
||||||
})
|
})
|
||||||
|
@ -159,8 +159,6 @@ func runFlushTest[Option any](
|
||||||
// Blobstor is read-only, so we expect en error from `flush` here.
|
// Blobstor is read-only, so we expect en error from `flush` here.
|
||||||
require.Error(t, wc.SetMode(mode.Degraded))
|
require.Error(t, wc.SetMode(mode.Degraded))
|
||||||
|
|
||||||
// First move to read-only mode to close background workers.
|
|
||||||
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||||
require.NoError(t, wc.SetMode(mode.Degraded))
|
require.NoError(t, wc.SetMode(mode.Degraded))
|
||||||
|
@ -177,14 +175,13 @@ func runFlushTest[Option any](
|
||||||
objects := putObjects(t, wc)
|
objects := putObjects(t, wc)
|
||||||
f.InjectFn(t, wc)
|
f.InjectFn(t, wc)
|
||||||
|
|
||||||
require.NoError(t, wc.SetMode(mode.ReadOnly))
|
|
||||||
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
require.NoError(t, bs.SetMode(mode.ReadWrite))
|
||||||
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
require.NoError(t, mb.SetMode(mode.ReadWrite))
|
||||||
|
|
||||||
require.Equal(t, uint32(0), errCount.Load())
|
require.Equal(t, uint32(0), errCount.Load())
|
||||||
require.Error(t, wc.Flush(context.Background(), false))
|
require.Error(t, wc.Flush(context.Background(), false, false))
|
||||||
require.Greater(t, errCount.Load(), uint32(0))
|
require.Greater(t, errCount.Load(), uint32(0))
|
||||||
require.NoError(t, wc.Flush(context.Background(), true))
|
require.NoError(t, wc.Flush(context.Background(), true, false))
|
||||||
|
|
||||||
check(t, mb, bs, objects)
|
check(t, mb, bs, objects)
|
||||||
})
|
})
|
||||||
|
|
|
@ -25,7 +25,7 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
c.modeMtx.Lock()
|
c.modeMtx.Lock()
|
||||||
defer c.modeMtx.Unlock()
|
defer c.modeMtx.Unlock()
|
||||||
|
|
||||||
err := c.setMode(ctx, m)
|
err := c.setMode(ctx, m, true)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.metrics.SetMode(m)
|
c.metrics.SetMode(m)
|
||||||
}
|
}
|
||||||
|
@ -33,12 +33,12 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||||
func (c *cache) setMode(ctx context.Context, m mode.Mode) error {
|
func (c *cache) setMode(ctx context.Context, m mode.Mode, ignoreErrors bool) error {
|
||||||
var err error
|
var err error
|
||||||
turnOffMeta := m.NoMetabase()
|
turnOffMeta := m.NoMetabase()
|
||||||
|
|
||||||
if turnOffMeta && !c.mode.NoMetabase() {
|
if turnOffMeta && !c.mode.NoMetabase() {
|
||||||
err = c.flush(ctx, true)
|
err = c.flush(ctx, ignoreErrors)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,9 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
||||||
c.metrics.Put(time.Since(startedAt), added, storageType)
|
c.metrics.Put(time.Since(startedAt), added, storageType)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
if !c.modeMtx.TryRLock() {
|
||||||
|
return common.PutRes{}, ErrNotInitialized
|
||||||
|
}
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
return common.PutRes{}, ErrReadOnly
|
return common.PutRes{}, ErrReadOnly
|
||||||
|
|
28
pkg/local_object_storage/writecache/seal.go
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
package writecache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *cache) Seal(ctx context.Context, ignoreErrors bool) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Seal",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Bool("ignore_errors", ignoreErrors),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
c.modeMtx.Lock()
|
||||||
|
defer c.modeMtx.Unlock()
|
||||||
|
|
||||||
|
// flush will be done by setMode
|
||||||
|
err := c.setMode(ctx, mode.DegradedReadOnly, ignoreErrors)
|
||||||
|
if err == nil {
|
||||||
|
c.metrics.SetMode(mode.DegradedReadOnly)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
|
@ -67,14 +67,24 @@ func (c *cache) openStore(readOnly bool) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) deleteFromDB(key string) {
|
func (c *cache) deleteFromDB(key string, batched bool) {
|
||||||
var recordDeleted bool
|
var recordDeleted bool
|
||||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
var err error
|
||||||
|
if batched {
|
||||||
|
err = c.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Again, an optimization, please, separate commit. Again, an optimization, please, separate commit.
dstepanov-yadro
commented
No, it isn't an optimization: for manual flush we don't need batch. No, it isn't an optimization: for manual flush we don't need batch.
fyrchik
commented
Why is manual flush single-threaded then? Why is manual flush single-threaded then?
dstepanov-yadro
commented
To reduce complexity. No need to do manual flush as fast as possible. To reduce complexity. No need to do manual flush as fast as possible.
fyrchik
commented
On the contrary 8 hours vs 1 hour can make a difference -- this is all done by a human. On the contrary 8 hours vs 1 hour can make a difference -- this is all done by a human.
dstepanov-yadro
commented
Looks like that these values taken not from real life: flush works pretty fast now. Looks like that these values taken not from real life: flush works pretty fast now.
|
|||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
key := []byte(key)
|
key := []byte(key)
|
||||||
recordDeleted = b.Get(key) != nil
|
recordDeleted = b.Get(key) != nil
|
||||||
return b.Delete(key)
|
return b.Delete(key)
|
||||||
})
|
})
|
||||||
|
} else {
|
||||||
|
err = c.db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
b := tx.Bucket(defaultBucket)
|
||||||
|
key := []byte(key)
|
||||||
|
recordDeleted = b.Get(key) != nil
|
||||||
|
return b.Delete(key)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.metrics.Evict(StorageTypeDB)
|
c.metrics.Evict(StorageTypeDB)
|
||||||
|
|
|
@ -35,7 +35,8 @@ type Cache interface {
|
||||||
SetMode(mode.Mode) error
|
SetMode(mode.Mode) error
|
||||||
SetLogger(*logger.Logger)
|
SetLogger(*logger.Logger)
|
||||||
DumpInfo() Info
|
DumpInfo() Info
|
||||||
Flush(context.Context, bool) error
|
Flush(context.Context, bool, bool) error
|
||||||
|
Seal(context.Context, bool) error
|
||||||
|
|
||||||
Init() error
|
Init() error
|
||||||
Open(ctx context.Context, readOnly bool) error
|
Open(ctx context.Context, readOnly bool) error
|
||||||
|
|
|
@ -24,6 +24,7 @@ const (
|
||||||
rpcGetChainLocalOverride = "GetChainLocalOverride"
|
rpcGetChainLocalOverride = "GetChainLocalOverride"
|
||||||
rpcListChainLocalOverrides = "ListChainLocalOverrides"
|
rpcListChainLocalOverrides = "ListChainLocalOverrides"
|
||||||
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
|
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
|
||||||
|
rpcSealWriteCache = "SealWriteCache"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HealthCheck executes ControlService.HealthCheck RPC.
|
// HealthCheck executes ControlService.HealthCheck RPC.
|
||||||
|
@ -264,3 +265,16 @@ func RemoveChainLocalOverride(cli *client.Client, req *RemoveChainLocalOverrideR
|
||||||
|
|
||||||
return wResp.message, nil
|
return wResp.message, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SealWriteCache executes ControlService.SealWriteCache RPC.
|
||||||
|
func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...client.CallOption) (*SealWriteCacheResponse, error) {
|
||||||
|
wResp := newResponseWrapper[SealWriteCacheResponse]()
|
||||||
|
wReq := &requestWrapper{m: req}
|
||||||
|
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcSealWriteCache), wReq, wResp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return wResp.message, nil
|
||||||
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ func (s *Server) FlushCache(ctx context.Context, req *control.FlushCacheRequest)
|
||||||
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
|
for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) {
|
||||||
var prm engine.FlushWriteCachePrm
|
var prm engine.FlushWriteCachePrm
|
||||||
prm.SetShardID(shardID)
|
prm.SetShardID(shardID)
|
||||||
|
prm.SetSeal(req.GetBody().GetSeal())
|
||||||
|
|
||||||
_, err = s.s.FlushWriteCache(ctx, prm)
|
_, err = s.s.FlushWriteCache(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
48
pkg/services/control/server/seal_writecache.go
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
package control
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCacheRequest) (*control.SealWriteCacheResponse, error) {
|
||||||
|
err := s.isValidRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := engine.SealWriteCachePrm{
|
||||||
|
ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()),
|
||||||
|
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := s.s.SealWriteCache(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &control.SealWriteCacheResponse{Body: &control.SealWriteCacheResponse_Body{}}
|
||||||
|
for _, r := range res.ShardResults {
|
||||||
|
if r.Success {
|
||||||
|
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
|
||||||
|
Shard_ID: *r.ShardID,
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Doesn't Doesn't `*r.ShardID` work? We already have similar code in other parts.
dstepanov-yadro
commented
fixed fixed
|
|||||||
|
Success: true,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
|
||||||
|
Shard_ID: *r.ShardID,
|
||||||
|
Error: r.ErrorMsg,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = SignMessage(s.key, resp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
|
}
|
1501
pkg/services/control/service.pb.go
generated
|
@ -56,6 +56,9 @@ service ControlService {
|
||||||
|
|
||||||
// Remove local access policy engine overrides stored in the node by chaind id.
|
// Remove local access policy engine overrides stored in the node by chaind id.
|
||||||
rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse);
|
rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse);
|
||||||
|
|
||||||
|
// Flush objects from write-cache and move it to degraded read only mode.
|
||||||
|
rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Health check request.
|
// Health check request.
|
||||||
|
@ -280,6 +283,8 @@ message FlushCacheRequest {
|
||||||
message Body {
|
message Body {
|
||||||
// ID of the shard.
|
// ID of the shard.
|
||||||
repeated bytes shard_ID = 1;
|
repeated bytes shard_ID = 1;
|
||||||
|
// If true, then writecache will be left in read-only mode after flush completed.
|
||||||
|
bool seal = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
Body body = 1;
|
Body body = 1;
|
||||||
|
@ -523,3 +528,32 @@ message RemoveChainLocalOverrideResponse {
|
||||||
|
|
||||||
Signature signature = 2;
|
Signature signature = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message SealWriteCacheRequest {
|
||||||
|
// Request body structure.
|
||||||
|
message Body {
|
||||||
|
// ID of the shard.
|
||||||
|
repeated bytes shard_ID = 1;
|
||||||
|
|
||||||
|
// Flag indicating whether object read errors should be ignored.
|
||||||
|
bool ignore_errors = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
Body body = 1;
|
||||||
|
Signature signature = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SealWriteCacheResponse {
|
||||||
|
message Body {
|
||||||
|
message Status {
|
||||||
|
bytes shard_ID = 1;
|
||||||
|
bool success = 2;
|
||||||
|
string error = 3;
|
||||||
|
}
|
||||||
|
repeated Status results = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
Body body = 1;
|
||||||
|
|
||||||
|
Signature signature = 2;
|
||||||
|
}
|
215
pkg/services/control/service_frostfs.pb.go
generated
|
@ -1180,6 +1180,7 @@ func (x *FlushCacheRequest_Body) StableSize() (size int) {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
||||||
|
size += proto.BoolSize(2, x.Seal)
|
||||||
return size
|
return size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1200,6 +1201,7 @@ func (x *FlushCacheRequest_Body) StableMarshal(buf []byte) []byte {
|
||||||
}
|
}
|
||||||
var offset int
|
var offset int
|
||||||
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
|
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||||
|
offset += proto.BoolMarshal(2, buf[offset:], x.Seal)
|
||||||
return buf
|
return buf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2753,3 +2755,216 @@ func (x *RemoveChainLocalOverrideResponse) ReadSignedData(buf []byte) ([]byte, e
|
||||||
func (x *RemoveChainLocalOverrideResponse) SetSignature(sig *Signature) {
|
func (x *RemoveChainLocalOverrideResponse) SetSignature(sig *Signature) {
|
||||||
x.Signature = sig
|
x.Signature = sig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *SealWriteCacheRequest_Body) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
||||||
|
size += proto.BoolSize(2, x.IgnoreErrors)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *SealWriteCacheRequest_Body) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||||
|
offset += proto.BoolMarshal(2, buf[offset:], x.IgnoreErrors)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *SealWriteCacheRequest) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.NestedStructureSize(1, x.Body)
|
||||||
|
size += proto.NestedStructureSize(2, x.Signature)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *SealWriteCacheRequest) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||||
|
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadSignedData fills buf with signed data of x.
|
||||||
|
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data.
|
||||||
|
func (x *SealWriteCacheRequest) SignedDataSize() int {
|
||||||
|
return x.GetBody().StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SignedDataSize returns size of the request signed data in bytes.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data size.
|
||||||
|
func (x *SealWriteCacheRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return x.GetBody().StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *SealWriteCacheRequest) SetSignature(sig *Signature) {
|
||||||
|
x.Signature = sig
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *SealWriteCacheResponse_Body_Status) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.BytesSize(1, x.Shard_ID)
|
||||||
|
size += proto.BoolSize(2, x.Success)
|
||||||
|
size += proto.StringSize(3, x.Error)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *SealWriteCacheResponse_Body_Status) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.BytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||||
|
offset += proto.BoolMarshal(2, buf[offset:], x.Success)
|
||||||
|
offset += proto.StringMarshal(3, buf[offset:], x.Error)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *SealWriteCacheResponse_Body) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
for i := range x.Results {
|
||||||
|
size += proto.NestedStructureSize(1, x.Results[i])
|
||||||
|
}
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *SealWriteCacheResponse_Body) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
for i := range x.Results {
|
||||||
|
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Results[i])
|
||||||
|
}
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *SealWriteCacheResponse) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.NestedStructureSize(1, x.Body)
|
||||||
|
size += proto.NestedStructureSize(2, x.Signature)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *SealWriteCacheResponse) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||||
|
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadSignedData fills buf with signed data of x.
|
||||||
|
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data.
|
||||||
|
func (x *SealWriteCacheResponse) SignedDataSize() int {
|
||||||
|
return x.GetBody().StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SignedDataSize returns size of the request signed data in bytes.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data size.
|
||||||
|
func (x *SealWriteCacheResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return x.GetBody().StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *SealWriteCacheResponse) SetSignature(sig *Signature) {
|
||||||
|
x.Signature = sig
|
||||||
|
}
|
||||||
|
|
39
pkg/services/control/service_grpc.pb.go
generated
|
@ -35,6 +35,7 @@ const (
|
||||||
ControlService_GetChainLocalOverride_FullMethodName = "/control.ControlService/GetChainLocalOverride"
|
ControlService_GetChainLocalOverride_FullMethodName = "/control.ControlService/GetChainLocalOverride"
|
||||||
ControlService_ListChainLocalOverrides_FullMethodName = "/control.ControlService/ListChainLocalOverrides"
|
ControlService_ListChainLocalOverrides_FullMethodName = "/control.ControlService/ListChainLocalOverrides"
|
||||||
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
|
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
|
||||||
|
ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ControlServiceClient is the client API for ControlService service.
|
// ControlServiceClient is the client API for ControlService service.
|
||||||
|
@ -74,6 +75,8 @@ type ControlServiceClient interface {
|
||||||
ListChainLocalOverrides(ctx context.Context, in *ListChainLocalOverridesRequest, opts ...grpc.CallOption) (*ListChainLocalOverridesResponse, error)
|
ListChainLocalOverrides(ctx context.Context, in *ListChainLocalOverridesRequest, opts ...grpc.CallOption) (*ListChainLocalOverridesResponse, error)
|
||||||
// Remove local access policy engine overrides stored in the node by chaind id.
|
// Remove local access policy engine overrides stored in the node by chaind id.
|
||||||
RemoveChainLocalOverride(ctx context.Context, in *RemoveChainLocalOverrideRequest, opts ...grpc.CallOption) (*RemoveChainLocalOverrideResponse, error)
|
RemoveChainLocalOverride(ctx context.Context, in *RemoveChainLocalOverrideRequest, opts ...grpc.CallOption) (*RemoveChainLocalOverrideResponse, error)
|
||||||
|
// Flush objects from write-cache and move it to degraded read only mode.
|
||||||
|
SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type controlServiceClient struct {
|
type controlServiceClient struct {
|
||||||
|
@ -228,6 +231,15 @@ func (c *controlServiceClient) RemoveChainLocalOverride(ctx context.Context, in
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *controlServiceClient) SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error) {
|
||||||
|
out := new(SealWriteCacheResponse)
|
||||||
|
err := c.cc.Invoke(ctx, ControlService_SealWriteCache_FullMethodName, in, out, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ControlServiceServer is the server API for ControlService service.
|
// ControlServiceServer is the server API for ControlService service.
|
||||||
// All implementations should embed UnimplementedControlServiceServer
|
// All implementations should embed UnimplementedControlServiceServer
|
||||||
// for forward compatibility
|
// for forward compatibility
|
||||||
|
@ -265,6 +277,8 @@ type ControlServiceServer interface {
|
||||||
ListChainLocalOverrides(context.Context, *ListChainLocalOverridesRequest) (*ListChainLocalOverridesResponse, error)
|
ListChainLocalOverrides(context.Context, *ListChainLocalOverridesRequest) (*ListChainLocalOverridesResponse, error)
|
||||||
// Remove local access policy engine overrides stored in the node by chaind id.
|
// Remove local access policy engine overrides stored in the node by chaind id.
|
||||||
RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error)
|
RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error)
|
||||||
|
// Flush objects from write-cache and move it to degraded read only mode.
|
||||||
|
SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnimplementedControlServiceServer should be embedded to have forward compatible implementations.
|
// UnimplementedControlServiceServer should be embedded to have forward compatible implementations.
|
||||||
|
@ -319,6 +333,9 @@ func (UnimplementedControlServiceServer) ListChainLocalOverrides(context.Context
|
||||||
func (UnimplementedControlServiceServer) RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error) {
|
func (UnimplementedControlServiceServer) RemoveChainLocalOverride(context.Context, *RemoveChainLocalOverrideRequest) (*RemoveChainLocalOverrideResponse, error) {
|
||||||
return nil, status.Errorf(codes.Unimplemented, "method RemoveChainLocalOverride not implemented")
|
return nil, status.Errorf(codes.Unimplemented, "method RemoveChainLocalOverride not implemented")
|
||||||
}
|
}
|
||||||
|
func (UnimplementedControlServiceServer) SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error) {
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method SealWriteCache not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
// UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service.
|
// UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||||
// Use of this interface is not recommended, as added methods to ControlServiceServer will
|
// Use of this interface is not recommended, as added methods to ControlServiceServer will
|
||||||
|
@ -619,6 +636,24 @@ func _ControlService_RemoveChainLocalOverride_Handler(srv interface{}, ctx conte
|
||||||
return interceptor(ctx, in, info, handler)
|
return interceptor(ctx, in, info, handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func _ControlService_SealWriteCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(SealWriteCacheRequest)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(ControlServiceServer).SealWriteCache(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: ControlService_SealWriteCache_FullMethodName,
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(ControlServiceServer).SealWriteCache(ctx, req.(*SealWriteCacheRequest))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
// ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service.
|
// ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service.
|
||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
|
@ -690,6 +725,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
|
||||||
MethodName: "RemoveChainLocalOverride",
|
MethodName: "RemoveChainLocalOverride",
|
||||||
Handler: _ControlService_RemoveChainLocalOverride_Handler,
|
Handler: _ControlService_RemoveChainLocalOverride_Handler,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
MethodName: "SealWriteCache",
|
||||||
|
Handler: _ControlService_SealWriteCache_Handler,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Streams: []grpc.StreamDesc{},
|
Streams: []grpc.StreamDesc{},
|
||||||
Metadata: "pkg/services/control/service.proto",
|
Metadata: "pkg/services/control/service.proto",
|
||||||
|
|
s/performs/is performed by/
fixed