[#270] Add IR epoch tick control call #282
20 changed files with 188 additions and 18 deletions
15
cmd/frostfs-cli/modules/control/ir.go
Normal file
15
cmd/frostfs-cli/modules/control/ir.go
Normal file
|
@ -0,0 +1,15 @@
|
|||
package control
|
||||
|
||||
import "github.com/spf13/cobra"
|
||||
|
||||
var irCmd = &cobra.Command{
|
||||
Use: "ir",
|
||||
Short: "Operations with inner ring nodes",
|
||||
Long: "Operations with inner ring nodes",
|
||||
}
|
||||
|
||||
func initControlIRCmd() {
|
||||
irCmd.AddCommand(tickEpochCmd)
|
||||
|
||||
initControlIRTickEpochCmd()
|
||||
}
|
43
cmd/frostfs-cli/modules/control/ir_tick_epoch.go
Normal file
43
cmd/frostfs-cli/modules/control/ir_tick_epoch.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
ircontrol "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||
ircontrolsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var tickEpochCmd = &cobra.Command{
|
||||
Use: "tick-epoch",
|
||||
Short: "Forces a new epoch",
|
||||
Long: "Forces a new epoch via a notary request. It should be executed on other IR nodes as well.",
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
Run: tickEpoch,
|
||||
}
|
||||
|
||||
func initControlIRTickEpochCmd() {
|
||||
initControlFlags(tickEpochCmd)
|
||||
}
|
||||
|
||||
func tickEpoch(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
c := getClient(cmd, pk)
|
||||
|
||||
req := new(ircontrol.TickEpochRequest)
|
||||
req.SetBody(new(ircontrol.TickEpochRequest_Body))
|
||||
|
||||
err := ircontrolsrv.SignMessage(pk, req)
|
||||
commonCmd.ExitOnErr(cmd, "could not sign request: %w", err)
|
||||
|
||||
var resp *ircontrol.TickEpochResponse
|
||||
err = c.ExecRaw(func(client *rawclient.Client) error {
|
||||
resp, err = ircontrol.TickEpoch(client, req)
|
||||
return err
|
||||
})
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||
|
||||
cmd.Println("Epoch tick requested")
|
||||
}
|
|
@ -33,6 +33,7 @@ func init() {
|
|||
dropObjectsCmd,
|
||||
shardsCmd,
|
||||
synchronizeTreeCmd,
|
||||
irCmd,
|
||||
)
|
||||
|
||||
initControlHealthCheckCmd()
|
||||
|
@ -40,4 +41,5 @@ func init() {
|
|||
initControlDropObjectsCmd()
|
||||
initControlShardsCmd()
|
||||
initControlSynchronizeTreeCmd()
|
||||
initControlIRCmd()
|
||||
}
|
||||
|
|
|
@ -459,7 +459,7 @@ func (s *Server) initGRPCServer(cfg *viper.Viper) error {
|
|||
p.SetPrivateKey(*s.key)
|
||||
p.SetHealthChecker(s)
|
||||
|
||||
controlSvc := controlsrv.New(p,
|
||||
controlSvc := controlsrv.New(p, s.netmapClient,
|
||||
controlsrv.WithAllowedKeys(authKeys),
|
||||
)
|
||||
|
||||
|
|
|
@ -79,7 +79,7 @@ func (np *Processor) processNewEpochTick() {
|
|||
nextEpoch := np.epochState.EpochCounter() + 1
|
||||
np.log.Debug(logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch))
|
||||
|
||||
err := np.netmapClient.NewEpoch(nextEpoch)
|
||||
err := np.netmapClient.NewEpoch(nextEpoch, false)
|
||||
if err != nil {
|
||||
np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -8,10 +8,14 @@ import (
|
|||
|
||||
// NewEpoch updates FrostFS epoch number through
|
||||
// Netmap contract call.
|
||||
func (c *Client) NewEpoch(epoch uint64) error {
|
||||
// If `force` is true, this call is normally initiated by a control
|
||||
fyrchik
commented
Why is it called Why is it called `force` then?
ale64bit
commented
what should it be called? what should it be called?
|
||||
// service command and uses a control notary transaction internally
|
||||
// to ensure all nodes produce the same transaction with high probability.
|
||||
func (c *Client) NewEpoch(epoch uint64, force bool) error {
|
||||
prm := client.InvokePrm{}
|
||||
prm.SetMethod(newEpochMethod)
|
||||
prm.SetArgs(epoch)
|
||||
prm.SetControlTX(force)
|
||||
|
||||
if err := c.client.Invoke(prm); err != nil {
|
||||
return fmt.Errorf("could not invoke method (%s): %w", newEpochMethod, err)
|
||||
|
|
|
@ -886,6 +886,16 @@ func CalculateNotaryDepositAmount(c *Client, gasMul, gasDiv int64) (fixedn.Fixed
|
|||
// CalculateNonceAndVUB calculates nonce and ValidUntilBlock values
|
||||
// based on transaction hash.
|
||||
func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint32, err error) {
|
||||
return c.calculateNonceAndVUB(hash, false)
|
||||
}
|
||||
|
||||
// CalculateNonceAndVUBControl calculates nonce and rounded ValidUntilBlock values
|
||||
// based on transaction hash for use in control transactions.
|
||||
func (c *Client) CalculateNonceAndVUBControl(hash util.Uint256) (nonce uint32, vub uint32, err error) {
|
||||
return c.calculateNonceAndVUB(hash, true)
|
||||
}
|
||||
|
||||
func (c *Client) calculateNonceAndVUB(hash util.Uint256, roundBlockHeight bool) (nonce uint32, vub uint32, err error) {
|
||||
c.switchLock.RLock()
|
||||
defer c.switchLock.RUnlock()
|
||||
|
||||
|
@ -904,6 +914,14 @@ func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint
|
|||
return 0, 0, fmt.Errorf("could not get transaction height: %w", err)
|
||||
}
|
||||
|
||||
// For control transactions, we round down the block height to control the
|
||||
// probability of all nodes producing the same transaction, since it depends
|
||||
// on this value.
|
||||
if roundBlockHeight {
|
||||
inc := c.rpcActor.GetVersion().Protocol.MaxValidUntilBlockIncrement
|
||||
height = height / inc * inc
|
||||
}
|
||||
|
||||
return nonce, height + c.notary.txValidTime, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -94,6 +94,12 @@ type InvokePrmOptional struct {
|
|||
// `validUntilBlock` values by all notification
|
||||
// receivers.
|
||||
hash *util.Uint256
|
||||
// controlTX controls whether the invoke method will use a rounded
|
||||
// block height value, which is useful for control transactions which
|
||||
// are required to be produced by all nodes with very high probability.
|
||||
// It's only used by notary transactions and it affects only the
|
||||
// computation of `validUntilBlock` values.
|
||||
controlTX bool
|
||||
}
|
||||
|
||||
// SetHash sets optional hash of the transaction.
|
||||
|
@ -104,6 +110,11 @@ func (i *InvokePrmOptional) SetHash(hash util.Uint256) {
|
|||
i.hash = &hash
|
||||
}
|
||||
|
||||
// SetControlTX sets whether a control transaction will be used.
|
||||
func (i *InvokePrmOptional) SetControlTX(b bool) {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
What about using What about using `calculateVUB` as a parameter instead of less descriptive `controlTX`?
ale64bit
commented
discussed offline. Renamed to discussed offline. Renamed to `roundBlockHeight` and added additional comment.
|
||||
i.controlTX = b
|
||||
}
|
||||
|
||||
// Invoke calls Invoke method of Client with static internal script hash and fee.
|
||||
// Supported args types are the same as in Client.
|
||||
//
|
||||
|
@ -126,7 +137,11 @@ func (s StaticClient) Invoke(prm InvokePrm) error {
|
|||
)
|
||||
|
||||
if prm.hash != nil {
|
||||
nonce, vub, err = s.client.CalculateNonceAndVUB(*prm.hash)
|
||||
if prm.controlTX {
|
||||
nonce, vub, err = s.client.CalculateNonceAndVUBControl(*prm.hash)
|
||||
} else {
|
||||
nonce, vub, err = s.client.CalculateNonceAndVUB(*prm.hash)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not calculate nonce and VUB for notary alphabet invoke: %w", err)
|
||||
}
|
||||
|
|
|
@ -14,18 +14,18 @@ func (w *requestWrapper) ToGRPCMessage() grpc.Message {
|
|||
return w.m
|
||||
}
|
||||
|
||||
type healthCheckResponseWrapper struct {
|
||||
m *HealthCheckResponse
|
||||
type responseWrapper[M grpc.Message] struct {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Unrelated to the commit Unrelated to the commit
ale64bit
commented
It's used by It's used by `sendUnary`.
|
||||
m M
|
||||
}
|
||||
|
||||
func (w *healthCheckResponseWrapper) ToGRPCMessage() grpc.Message {
|
||||
func (w *responseWrapper[M]) ToGRPCMessage() grpc.Message {
|
||||
return w.m
|
||||
}
|
||||
|
||||
func (w *healthCheckResponseWrapper) FromGRPCMessage(m grpc.Message) error {
|
||||
func (w *responseWrapper[M]) FromGRPCMessage(m grpc.Message) error {
|
||||
var ok bool
|
||||
|
||||
w.m, ok = m.(*HealthCheckResponse)
|
||||
w.m, ok = m.(M)
|
||||
if !ok {
|
||||
return message.NewUnexpectedMessageType(m, w.m)
|
||||
}
|
||||
|
|
|
@ -3,12 +3,14 @@ package control
|
|||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/grpc"
|
||||
)
|
||||
|
||||
const serviceName = "ircontrol.ControlService"
|
||||
|
||||
const (
|
||||
rpcHealthCheck = "HealthCheck"
|
||||
rpcTickEpoch = "TickEpoch"
|
||||
)
|
||||
|
||||
// HealthCheck executes ControlService.HealthCheck RPC.
|
||||
|
@ -17,15 +19,29 @@ func HealthCheck(
|
|||
req *HealthCheckRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*HealthCheckResponse, error) {
|
||||
wResp := &healthCheckResponseWrapper{
|
||||
m: new(HealthCheckResponse),
|
||||
return sendUnary[HealthCheckRequest, HealthCheckResponse](cli, rpcHealthCheck, req, opts...)
|
||||
}
|
||||
|
||||
// TickEpoch executes ControlService.TickEpoch RPC.
|
||||
func TickEpoch(
|
||||
cli *client.Client,
|
||||
req *TickEpochRequest,
|
||||
opts ...client.CallOption,
|
||||
) (*TickEpochResponse, error) {
|
||||
return sendUnary[TickEpochRequest, TickEpochResponse](cli, rpcTickEpoch, req, opts...)
|
||||
}
|
||||
|
||||
func sendUnary[I, O grpc.Message](cli *client.Client, rpcName string, req *I, opts ...client.CallOption) (*O, error) {
|
||||
var resp O
|
||||
wResp := &responseWrapper[*O]{
|
||||
m: &resp,
|
||||
}
|
||||
|
||||
wReq := &requestWrapper{
|
||||
m: req,
|
||||
}
|
||||
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcHealthCheck), wReq, wResp, opts...)
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcName), wReq, wResp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package control
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
@ -12,12 +13,10 @@ import (
|
|||
//
|
||||
// If request is not signed with a key from white list, permission error returns.
|
||||
func (s *Server) HealthCheck(_ context.Context, req *control.HealthCheckRequest) (*control.HealthCheckResponse, error) {
|
||||
// verify request
|
||||
if err := s.isValidRequest(req); err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
// create and fill response
|
||||
resp := new(control.HealthCheckResponse)
|
||||
|
||||
body := new(control.HealthCheckResponse_Body)
|
||||
|
@ -25,7 +24,33 @@ func (s *Server) HealthCheck(_ context.Context, req *control.HealthCheckRequest)
|
|||
|
||||
body.SetHealthStatus(s.prm.healthChecker.HealthStatus())
|
||||
|
||||
// sign the response
|
||||
if err := SignMessage(&s.prm.key.PrivateKey, resp); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// TickEpoch forces a new epoch.
|
||||
//
|
||||
// If request is not signed with a key from white list, permission error returns.
|
||||
func (s *Server) TickEpoch(_ context.Context, req *control.TickEpochRequest) (*control.TickEpochResponse, error) {
|
||||
if err := s.isValidRequest(req); err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
We have this such comments in code, but IMO they are not needed, especially when the function is called We have this such comments in code, but IMO they are not needed, especially when the function is called `isValidRequest`
ale64bit
commented
done done
|
||||
resp := new(control.TickEpochResponse)
|
||||
resp.SetBody(new(control.TickEpochResponse_Body))
|
||||
|
||||
epoch, err := s.netmapClient.Epoch()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting current epoch: %w", err)
|
||||
}
|
||||
|
||||
if err := s.netmapClient.NewEpoch(epoch+1, true); err != nil {
|
||||
return nil, fmt.Errorf("forcing new epoch: %w", err)
|
||||
}
|
||||
|
||||
if err := SignMessage(&s.prm.key.PrivateKey, resp); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
|
|
@ -2,6 +2,8 @@ package control
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
)
|
||||
|
||||
// Server is an entity that serves
|
||||
|
@ -10,7 +12,8 @@ import (
|
|||
// To gain access to the service, any request must be
|
||||
// signed with a key from the white list.
|
||||
type Server struct {
|
||||
prm Prm
|
||||
prm Prm
|
||||
netmapClient *netmap.Client
|
||||
|
||||
allowedKeys [][]byte
|
||||
}
|
||||
|
@ -29,7 +32,7 @@ func panicOnPrmValue(n string, v any) {
|
|||
// Forms white list from all keys specified via
|
||||
// WithAllowedKeys option and a public key of
|
||||
// the parameterized private key.
|
||||
func New(prm Prm, opts ...Option) *Server {
|
||||
func New(prm Prm, netmapClient *netmap.Client, opts ...Option) *Server {
|
||||
// verify required parameters
|
||||
switch {
|
||||
case prm.healthChecker == nil:
|
||||
|
@ -44,7 +47,8 @@ func New(prm Prm, opts ...Option) *Server {
|
|||
}
|
||||
|
||||
return &Server{
|
||||
prm: prm,
|
||||
prm: prm,
|
||||
netmapClient: netmapClient,
|
||||
|
||||
allowedKeys: append(o.allowedKeys, prm.key.PublicKey().Bytes()),
|
||||
}
|
||||
|
|
|
@ -20,3 +20,15 @@ func (x *HealthCheckResponse) SetBody(v *HealthCheckResponse_Body) {
|
|||
x.Body = v
|
||||
}
|
||||
}
|
||||
|
||||
func (x *TickEpochRequest) SetBody(v *TickEpochRequest_Body) {
|
||||
if x != nil {
|
||||
x.Body = v
|
||||
}
|
||||
}
|
||||
|
||||
func (x *TickEpochResponse) SetBody(v *TickEpochResponse_Body) {
|
||||
if x != nil {
|
||||
x.Body = v
|
||||
}
|
||||
}
|
||||
|
|
BIN
pkg/services/control/ir/service.pb.go
generated
BIN
pkg/services/control/ir/service.pb.go
generated
Binary file not shown.
|
@ -10,6 +10,8 @@ option go_package = "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/ir/
|
|||
service ControlService {
|
||||
// Performs health check of the IR node.
|
||||
rpc HealthCheck (HealthCheckRequest) returns (HealthCheckResponse);
|
||||
// Forces a new epoch to be signaled by the IR node with high probability.
|
||||
rpc TickEpoch (TickEpochRequest) returns (TickEpochResponse);
|
||||
}
|
||||
|
||||
// Health check request.
|
||||
|
@ -41,3 +43,17 @@ message HealthCheckResponse {
|
|||
// Body signature.
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
message TickEpochRequest {
|
||||
message Body{}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
message TickEpochResponse {
|
||||
message Body{}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
|
BIN
pkg/services/control/ir/service_frostfs.pb.go
generated
BIN
pkg/services/control/ir/service_frostfs.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/ir/service_grpc.pb.go
generated
BIN
pkg/services/control/ir/service_grpc.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/service.pb.go
generated
BIN
pkg/services/control/service.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/service_grpc.pb.go
generated
BIN
pkg/services/control/service_grpc.pb.go
generated
Binary file not shown.
BIN
pkg/services/tree/service_grpc.pb.go
generated
BIN
pkg/services/tree/service_grpc.pb.go
generated
Binary file not shown.
Loading…
Reference in a new issue
:)
Can we add the probability value? (It is a joke)
Well :) it's not a constant value anyway. But >95%, so likely enough.
This command sends a notary request, so it always succeeds.
Can we mention this and that the command must be executed on other IR nodes?
done