[#270] Add IR epoch tick control call
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful

Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
Alejandro Lopez 2023-04-25 15:09:20 +03:00 committed by Evgenii Stratonikov
parent 58f1ba4b51
commit ff25521204
20 changed files with 188 additions and 18 deletions

View file

@ -0,0 +1,15 @@
package control
import "github.com/spf13/cobra"
var irCmd = &cobra.Command{
Use: "ir",
Short: "Operations with inner ring nodes",
Long: "Operations with inner ring nodes",
}
func initControlIRCmd() {
irCmd.AddCommand(tickEpochCmd)
initControlIRTickEpochCmd()
}

View file

@ -0,0 +1,43 @@
package control
import (
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
ircontrol "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
ircontrolsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server"
"github.com/spf13/cobra"
)
var tickEpochCmd = &cobra.Command{
Use: "tick-epoch",
Short: "Forces a new epoch",
Long: "Forces a new epoch via a notary request. It should be executed on other IR nodes as well.",
Run: tickEpoch,
}
func initControlIRTickEpochCmd() {
initControlFlags(tickEpochCmd)
}
func tickEpoch(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
c := getClient(cmd, pk)
req := new(ircontrol.TickEpochRequest)
req.SetBody(new(ircontrol.TickEpochRequest_Body))
err := ircontrolsrv.SignMessage(pk, req)
commonCmd.ExitOnErr(cmd, "could not sign request: %w", err)
var resp *ircontrol.TickEpochResponse
err = c.ExecRaw(func(client *rawclient.Client) error {
resp, err = ircontrol.TickEpoch(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
cmd.Println("Epoch tick requested")
}

View file

@ -33,6 +33,7 @@ func init() {
dropObjectsCmd, dropObjectsCmd,
shardsCmd, shardsCmd,
synchronizeTreeCmd, synchronizeTreeCmd,
irCmd,
) )
initControlHealthCheckCmd() initControlHealthCheckCmd()
@ -40,4 +41,5 @@ func init() {
initControlDropObjectsCmd() initControlDropObjectsCmd()
initControlShardsCmd() initControlShardsCmd()
initControlSynchronizeTreeCmd() initControlSynchronizeTreeCmd()
initControlIRCmd()
} }

View file

@ -459,7 +459,7 @@ func (s *Server) initGRPCServer(cfg *viper.Viper) error {
p.SetPrivateKey(*s.key) p.SetPrivateKey(*s.key)
p.SetHealthChecker(s) p.SetHealthChecker(s)
controlSvc := controlsrv.New(p, controlSvc := controlsrv.New(p, s.netmapClient,
controlsrv.WithAllowedKeys(authKeys), controlsrv.WithAllowedKeys(authKeys),
) )

View file

@ -79,7 +79,7 @@ func (np *Processor) processNewEpochTick() {
nextEpoch := np.epochState.EpochCounter() + 1 nextEpoch := np.epochState.EpochCounter() + 1
np.log.Debug(logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch)) np.log.Debug(logs.NetmapNextEpoch, zap.Uint64("value", nextEpoch))
err := np.netmapClient.NewEpoch(nextEpoch) err := np.netmapClient.NewEpoch(nextEpoch, false)
if err != nil { if err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err)) np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
} }

View file

@ -8,10 +8,14 @@ import (
// NewEpoch updates FrostFS epoch number through // NewEpoch updates FrostFS epoch number through
// Netmap contract call. // Netmap contract call.
func (c *Client) NewEpoch(epoch uint64) error { // If `force` is true, this call is normally initiated by a control
// service command and uses a control notary transaction internally
// to ensure all nodes produce the same transaction with high probability.
func (c *Client) NewEpoch(epoch uint64, force bool) error {
prm := client.InvokePrm{} prm := client.InvokePrm{}
prm.SetMethod(newEpochMethod) prm.SetMethod(newEpochMethod)
prm.SetArgs(epoch) prm.SetArgs(epoch)
prm.SetControlTX(force)
if err := c.client.Invoke(prm); err != nil { if err := c.client.Invoke(prm); err != nil {
return fmt.Errorf("could not invoke method (%s): %w", newEpochMethod, err) return fmt.Errorf("could not invoke method (%s): %w", newEpochMethod, err)

View file

@ -886,6 +886,16 @@ func CalculateNotaryDepositAmount(c *Client, gasMul, gasDiv int64) (fixedn.Fixed
// CalculateNonceAndVUB calculates nonce and ValidUntilBlock values // CalculateNonceAndVUB calculates nonce and ValidUntilBlock values
// based on transaction hash. // based on transaction hash.
func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint32, err error) { func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint32, err error) {
return c.calculateNonceAndVUB(hash, false)
}
// CalculateNonceAndVUBControl calculates nonce and rounded ValidUntilBlock values
// based on transaction hash for use in control transactions.
func (c *Client) CalculateNonceAndVUBControl(hash util.Uint256) (nonce uint32, vub uint32, err error) {
return c.calculateNonceAndVUB(hash, true)
}
func (c *Client) calculateNonceAndVUB(hash util.Uint256, roundBlockHeight bool) (nonce uint32, vub uint32, err error) {
c.switchLock.RLock() c.switchLock.RLock()
defer c.switchLock.RUnlock() defer c.switchLock.RUnlock()
@ -904,6 +914,14 @@ func (c *Client) CalculateNonceAndVUB(hash util.Uint256) (nonce uint32, vub uint
return 0, 0, fmt.Errorf("could not get transaction height: %w", err) return 0, 0, fmt.Errorf("could not get transaction height: %w", err)
} }
// For control transactions, we round down the block height to control the
// probability of all nodes producing the same transaction, since it depends
// on this value.
if roundBlockHeight {
inc := c.rpcActor.GetVersion().Protocol.MaxValidUntilBlockIncrement
height = height / inc * inc
}
return nonce, height + c.notary.txValidTime, nil return nonce, height + c.notary.txValidTime, nil
} }

View file

@ -94,6 +94,12 @@ type InvokePrmOptional struct {
// `validUntilBlock` values by all notification // `validUntilBlock` values by all notification
// receivers. // receivers.
hash *util.Uint256 hash *util.Uint256
// controlTX controls whether the invoke method will use a rounded
// block height value, which is useful for control transactions which
// are required to be produced by all nodes with very high probability.
// It's only used by notary transactions and it affects only the
// computation of `validUntilBlock` values.
controlTX bool
} }
// SetHash sets optional hash of the transaction. // SetHash sets optional hash of the transaction.
@ -104,6 +110,11 @@ func (i *InvokePrmOptional) SetHash(hash util.Uint256) {
i.hash = &hash i.hash = &hash
} }
// SetControlTX sets whether a control transaction will be used.
func (i *InvokePrmOptional) SetControlTX(b bool) {
i.controlTX = b
}
// Invoke calls Invoke method of Client with static internal script hash and fee. // Invoke calls Invoke method of Client with static internal script hash and fee.
// Supported args types are the same as in Client. // Supported args types are the same as in Client.
// //
@ -126,7 +137,11 @@ func (s StaticClient) Invoke(prm InvokePrm) error {
) )
if prm.hash != nil { if prm.hash != nil {
if prm.controlTX {
nonce, vub, err = s.client.CalculateNonceAndVUBControl(*prm.hash)
} else {
nonce, vub, err = s.client.CalculateNonceAndVUB(*prm.hash) nonce, vub, err = s.client.CalculateNonceAndVUB(*prm.hash)
}
if err != nil { if err != nil {
return fmt.Errorf("could not calculate nonce and VUB for notary alphabet invoke: %w", err) return fmt.Errorf("could not calculate nonce and VUB for notary alphabet invoke: %w", err)
} }

View file

@ -14,18 +14,18 @@ func (w *requestWrapper) ToGRPCMessage() grpc.Message {
return w.m return w.m
} }
type healthCheckResponseWrapper struct { type responseWrapper[M grpc.Message] struct {
m *HealthCheckResponse m M
} }
func (w *healthCheckResponseWrapper) ToGRPCMessage() grpc.Message { func (w *responseWrapper[M]) ToGRPCMessage() grpc.Message {
return w.m return w.m
} }
func (w *healthCheckResponseWrapper) FromGRPCMessage(m grpc.Message) error { func (w *responseWrapper[M]) FromGRPCMessage(m grpc.Message) error {
var ok bool var ok bool
w.m, ok = m.(*HealthCheckResponse) w.m, ok = m.(M)
if !ok { if !ok {
return message.NewUnexpectedMessageType(m, w.m) return message.NewUnexpectedMessageType(m, w.m)
} }

View file

@ -3,12 +3,14 @@ package control
import ( import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/common" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/common"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/grpc"
) )
const serviceName = "ircontrol.ControlService" const serviceName = "ircontrol.ControlService"
const ( const (
rpcHealthCheck = "HealthCheck" rpcHealthCheck = "HealthCheck"
rpcTickEpoch = "TickEpoch"
) )
// HealthCheck executes ControlService.HealthCheck RPC. // HealthCheck executes ControlService.HealthCheck RPC.
@ -17,15 +19,29 @@ func HealthCheck(
req *HealthCheckRequest, req *HealthCheckRequest,
opts ...client.CallOption, opts ...client.CallOption,
) (*HealthCheckResponse, error) { ) (*HealthCheckResponse, error) {
wResp := &healthCheckResponseWrapper{ return sendUnary[HealthCheckRequest, HealthCheckResponse](cli, rpcHealthCheck, req, opts...)
m: new(HealthCheckResponse), }
// TickEpoch executes ControlService.TickEpoch RPC.
func TickEpoch(
cli *client.Client,
req *TickEpochRequest,
opts ...client.CallOption,
) (*TickEpochResponse, error) {
return sendUnary[TickEpochRequest, TickEpochResponse](cli, rpcTickEpoch, req, opts...)
}
func sendUnary[I, O grpc.Message](cli *client.Client, rpcName string, req *I, opts ...client.CallOption) (*O, error) {
var resp O
wResp := &responseWrapper[*O]{
m: &resp,
} }
wReq := &requestWrapper{ wReq := &requestWrapper{
m: req, m: req,
} }
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcHealthCheck), wReq, wResp, opts...) err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcName), wReq, wResp, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -2,6 +2,7 @@ package control
import ( import (
"context" "context"
"fmt"
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir" control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -12,12 +13,10 @@ import (
// //
// If request is not signed with a key from white list, permission error returns. // If request is not signed with a key from white list, permission error returns.
func (s *Server) HealthCheck(_ context.Context, req *control.HealthCheckRequest) (*control.HealthCheckResponse, error) { func (s *Server) HealthCheck(_ context.Context, req *control.HealthCheckRequest) (*control.HealthCheckResponse, error) {
// verify request
if err := s.isValidRequest(req); err != nil { if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error()) return nil, status.Error(codes.PermissionDenied, err.Error())
} }
// create and fill response
resp := new(control.HealthCheckResponse) resp := new(control.HealthCheckResponse)
body := new(control.HealthCheckResponse_Body) body := new(control.HealthCheckResponse_Body)
@ -25,7 +24,33 @@ func (s *Server) HealthCheck(_ context.Context, req *control.HealthCheckRequest)
body.SetHealthStatus(s.prm.healthChecker.HealthStatus()) body.SetHealthStatus(s.prm.healthChecker.HealthStatus())
// sign the response if err := SignMessage(&s.prm.key.PrivateKey, resp); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}
// TickEpoch forces a new epoch.
//
// If request is not signed with a key from white list, permission error returns.
func (s *Server) TickEpoch(_ context.Context, req *control.TickEpochRequest) (*control.TickEpochResponse, error) {
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
resp := new(control.TickEpochResponse)
resp.SetBody(new(control.TickEpochResponse_Body))
epoch, err := s.netmapClient.Epoch()
if err != nil {
return nil, fmt.Errorf("getting current epoch: %w", err)
}
if err := s.netmapClient.NewEpoch(epoch+1, true); err != nil {
return nil, fmt.Errorf("forcing new epoch: %w", err)
}
if err := SignMessage(&s.prm.key.PrivateKey, resp); err != nil { if err := SignMessage(&s.prm.key.PrivateKey, resp); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }

View file

@ -2,6 +2,8 @@ package control
import ( import (
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
) )
// Server is an entity that serves // Server is an entity that serves
@ -11,6 +13,7 @@ import (
// signed with a key from the white list. // signed with a key from the white list.
type Server struct { type Server struct {
prm Prm prm Prm
netmapClient *netmap.Client
allowedKeys [][]byte allowedKeys [][]byte
} }
@ -29,7 +32,7 @@ func panicOnPrmValue(n string, v any) {
// Forms white list from all keys specified via // Forms white list from all keys specified via
// WithAllowedKeys option and a public key of // WithAllowedKeys option and a public key of
// the parameterized private key. // the parameterized private key.
func New(prm Prm, opts ...Option) *Server { func New(prm Prm, netmapClient *netmap.Client, opts ...Option) *Server {
// verify required parameters // verify required parameters
switch { switch {
case prm.healthChecker == nil: case prm.healthChecker == nil:
@ -45,6 +48,7 @@ func New(prm Prm, opts ...Option) *Server {
return &Server{ return &Server{
prm: prm, prm: prm,
netmapClient: netmapClient,
allowedKeys: append(o.allowedKeys, prm.key.PublicKey().Bytes()), allowedKeys: append(o.allowedKeys, prm.key.PublicKey().Bytes()),
} }

View file

@ -20,3 +20,15 @@ func (x *HealthCheckResponse) SetBody(v *HealthCheckResponse_Body) {
x.Body = v x.Body = v
} }
} }
func (x *TickEpochRequest) SetBody(v *TickEpochRequest_Body) {
if x != nil {
x.Body = v
}
}
func (x *TickEpochResponse) SetBody(v *TickEpochResponse_Body) {
if x != nil {
x.Body = v
}
}

Binary file not shown.

View file

@ -10,6 +10,8 @@ option go_package = "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/ir/
service ControlService { service ControlService {
// Performs health check of the IR node. // Performs health check of the IR node.
rpc HealthCheck (HealthCheckRequest) returns (HealthCheckResponse); rpc HealthCheck (HealthCheckRequest) returns (HealthCheckResponse);
// Forces a new epoch to be signaled by the IR node with high probability.
rpc TickEpoch (TickEpochRequest) returns (TickEpochResponse);
} }
// Health check request. // Health check request.
@ -41,3 +43,17 @@ message HealthCheckResponse {
// Body signature. // Body signature.
Signature signature = 2; Signature signature = 2;
} }
message TickEpochRequest {
message Body{}
Body body = 1;
Signature signature = 2;
}
message TickEpochResponse {
message Body{}
Body body = 1;
Signature signature = 2;
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.