node/control: Allow to await control.NetmapSetStatus

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2024-11-14 10:12:15 +03:00
parent 9a4bb0a23b
commit a3eb359621
Signed by: fyrchik
SSH key fingerprint: SHA256:m/TTwCzjnRkXgnzEx9X92ccxy1CcVeinOgDb3NPWWmg
7 changed files with 69 additions and 25 deletions

View file

@ -353,12 +353,12 @@ func addNewEpochAsyncNotificationHandler(c *cfg, h event.Handler) {
var errRelayBootstrap = errors.New("setting netmap status is forbidden in relay mode") var errRelayBootstrap = errors.New("setting netmap status is forbidden in relay mode")
func (c *cfg) SetNetmapStatus(ctx context.Context, st control.NetmapStatus) error { func (c *cfg) SetNetmapStatus(ctx context.Context, st control.NetmapStatus, await bool) error {
switch st { switch st {
default: default:
return fmt.Errorf("unsupported status %v", st) return fmt.Errorf("unsupported status %v", st)
case control.NetmapStatus_MAINTENANCE: case control.NetmapStatus_MAINTENANCE:
return c.setMaintenanceStatus(ctx, false) return c.setMaintenanceStatus(ctx, false, await)
case control.NetmapStatus_ONLINE, control.NetmapStatus_OFFLINE: case control.NetmapStatus_ONLINE, control.NetmapStatus_OFFLINE:
} }
@ -375,7 +375,7 @@ func (c *cfg) SetNetmapStatus(ctx context.Context, st control.NetmapStatus) erro
c.cfgNetmap.reBoostrapTurnedOff.Store(true) c.cfgNetmap.reBoostrapTurnedOff.Store(true)
return c.updateNetMapState(ctx, func(*nmClient.UpdatePeerPrm) {}) return c.updateNetMapState(ctx, func(*nmClient.UpdatePeerPrm) {}, await)
} }
func (c *cfg) GetNetmapStatus() (control.NetmapStatus, uint64, error) { func (c *cfg) GetNetmapStatus() (control.NetmapStatus, uint64, error) {
@ -387,11 +387,11 @@ func (c *cfg) GetNetmapStatus() (control.NetmapStatus, uint64, error) {
return st, epoch, nil return st, epoch, nil
} }
func (c *cfg) ForceMaintenance(ctx context.Context) error { func (c *cfg) ForceMaintenance(ctx context.Context, await bool) error {
return c.setMaintenanceStatus(ctx, true) return c.setMaintenanceStatus(ctx, true, await)
} }
func (c *cfg) setMaintenanceStatus(ctx context.Context, force bool) error { func (c *cfg) setMaintenanceStatus(ctx context.Context, force bool, await bool) error {
netSettings, err := c.cfgNetmap.wrapper.ReadNetworkConfiguration() netSettings, err := c.cfgNetmap.wrapper.ReadNetworkConfiguration()
if err != nil { if err != nil {
err = fmt.Errorf("read network settings to check maintenance allowance: %w", err) err = fmt.Errorf("read network settings to check maintenance allowance: %w", err)
@ -403,7 +403,7 @@ func (c *cfg) setMaintenanceStatus(ctx context.Context, force bool) error {
c.startMaintenance(ctx) c.startMaintenance(ctx)
if err == nil { if err == nil {
err = c.updateNetMapState(ctx, (*nmClient.UpdatePeerPrm).SetMaintenance) err = c.updateNetMapState(ctx, (*nmClient.UpdatePeerPrm).SetMaintenance, await)
} }
if err != nil { if err != nil {
@ -416,13 +416,19 @@ func (c *cfg) setMaintenanceStatus(ctx context.Context, force bool) error {
// calls UpdatePeerState operation of Netmap contract's client for the local node. // calls UpdatePeerState operation of Netmap contract's client for the local node.
// State setter is used to specify node state to switch to. // State setter is used to specify node state to switch to.
func (c *cfg) updateNetMapState(ctx context.Context, stateSetter func(*nmClient.UpdatePeerPrm)) error { func (c *cfg) updateNetMapState(ctx context.Context, stateSetter func(*nmClient.UpdatePeerPrm), await bool) error {
var prm nmClient.UpdatePeerPrm var prm nmClient.UpdatePeerPrm
prm.SetKey(c.key.PublicKey().Bytes()) prm.SetKey(c.key.PublicKey().Bytes())
stateSetter(&prm) stateSetter(&prm)
_, err := c.cfgNetmap.wrapper.UpdatePeerState(ctx, prm) res, err := c.cfgNetmap.wrapper.UpdatePeerState(ctx, prm)
if err != nil {
return err return err
}
if await {
return c.cfgNetmap.wrapper.Morph().WaitTxHalt(ctx, res)
}
return nil
} }
type netInfo struct { type netInfo struct {

View file

@ -58,9 +58,9 @@ func (c *Client) ForceRemovePeer(ctx context.Context, nodeInfo netmap.NodeInfo,
prm.SetControlTX(true) prm.SetControlTX(true)
prm.SetVUB(vub) prm.SetVUB(vub)
vub, err := c.UpdatePeerState(ctx, prm) res, err := c.UpdatePeerState(ctx, prm)
if err != nil { if err != nil {
return 0, fmt.Errorf("updating peer state: %v", err) return 0, fmt.Errorf("updating peer state: %v", err)
} }
return vub, nil return res.VUB, nil
} }

View file

@ -2,7 +2,6 @@ package netmap
import ( import (
"context" "context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-contract/netmap" "git.frostfs.info/TrueCloudLab/frostfs-contract/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
@ -37,7 +36,7 @@ func (u *UpdatePeerPrm) SetMaintenance() {
} }
// UpdatePeerState changes peer status through Netmap contract call. // UpdatePeerState changes peer status through Netmap contract call.
func (c *Client) UpdatePeerState(ctx context.Context, p UpdatePeerPrm) (uint32, error) { func (c *Client) UpdatePeerState(ctx context.Context, p UpdatePeerPrm) (client.InvokeRes, error) {
method := updateStateMethod method := updateStateMethod
if c.client.WithNotary() && c.client.IsAlpha() { if c.client.WithNotary() && c.client.IsAlpha() {
@ -56,9 +55,5 @@ func (c *Client) UpdatePeerState(ctx context.Context, p UpdatePeerPrm) (uint32,
prm.SetArgs(int64(p.state), p.key) prm.SetArgs(int64(p.state), p.key)
prm.InvokePrmOptional = p.InvokePrmOptional prm.InvokePrmOptional = p.InvokePrmOptional
res, err := c.client.Invoke(ctx, prm) return c.client.Invoke(ctx, prm)
if err != nil {
return 0, fmt.Errorf("could not invoke smart contract: %w", err)
}
return res.VUB, nil
} }

View file

@ -46,11 +46,11 @@ type NodeState interface {
// //
// If status is control.NetmapStatus_MAINTENANCE and maintenance is allowed // If status is control.NetmapStatus_MAINTENANCE and maintenance is allowed
// in the network settings, the node additionally starts local maintenance. // in the network settings, the node additionally starts local maintenance.
SetNetmapStatus(ctx context.Context, st control.NetmapStatus) error SetNetmapStatus(ctx context.Context, st control.NetmapStatus, await bool) error
// ForceMaintenance works like SetNetmapStatus(control.NetmapStatus_MAINTENANCE) // ForceMaintenance works like SetNetmapStatus(control.NetmapStatus_MAINTENANCE)
// but starts local maintenance regardless of the network settings. // but starts local maintenance regardless of the network settings.
ForceMaintenance(ctx context.Context) error ForceMaintenance(ctx context.Context, await bool) error
GetNetmapStatus() (control.NetmapStatus, uint64, error) GetNetmapStatus() (control.NetmapStatus, uint64, error)
} }

View file

@ -22,6 +22,7 @@ func (s *Server) SetNetmapStatus(ctx context.Context, req *control.SetNetmapStat
bodyReq := req.GetBody() bodyReq := req.GetBody()
st := bodyReq.GetStatus() st := bodyReq.GetStatus()
force := bodyReq.GetForceMaintenance() force := bodyReq.GetForceMaintenance()
await := bodyReq.GetAwait()
if force { if force {
if st != control.NetmapStatus_MAINTENANCE { if st != control.NetmapStatus_MAINTENANCE {
@ -29,9 +30,9 @@ func (s *Server) SetNetmapStatus(ctx context.Context, req *control.SetNetmapStat
"force_maintenance MUST be set for %s status only", control.NetmapStatus_MAINTENANCE) "force_maintenance MUST be set for %s status only", control.NetmapStatus_MAINTENANCE)
} }
err = s.nodeState.ForceMaintenance(ctx) err = s.nodeState.ForceMaintenance(ctx, await)
} else { } else {
err = s.nodeState.SetNetmapStatus(ctx, st) err = s.nodeState.SetNetmapStatus(ctx, st, await)
} }
if err != nil { if err != nil {

View file

@ -93,7 +93,8 @@ service ControlService {
rpc DetachShards(DetachShardsRequest) returns (DetachShardsResponse); rpc DetachShards(DetachShardsRequest) returns (DetachShardsResponse);
// StartShardRebuild starts shard rebuild process. // StartShardRebuild starts shard rebuild process.
rpc StartShardRebuild(StartShardRebuildRequest) returns (StartShardRebuildResponse); rpc StartShardRebuild(StartShardRebuildRequest)
returns (StartShardRebuildResponse);
} }
// Health check request. // Health check request.
@ -141,6 +142,10 @@ message SetNetmapStatusRequest {
// maintenance regardless of network settings. The flag MUST NOT be // maintenance regardless of network settings. The flag MUST NOT be
// set for any other status. // set for any other status.
bool force_maintenance = 2; bool force_maintenance = 2;
// Boolean flag indicating whether RPC should wait until status change
// request is persisted.
bool await = 3;
} }
// Body of set netmap status request message. // Body of set netmap status request message.
@ -668,7 +673,8 @@ message SealWriteCacheRequest {
// Flag indicating whether writecache will be sealed async. // Flag indicating whether writecache will be sealed async.
bool async = 3; bool async = 3;
// If true, then writecache will be sealed, but mode will be restored to the current one. // If true, then writecache will be sealed, but mode will be restored to the
// current one.
bool restore_mode = 4; bool restore_mode = 4;
// If true, then writecache will shrink internal storage. // If true, then writecache will shrink internal storage.

View file

@ -719,6 +719,7 @@ func (x *HealthCheckResponse) UnmarshalEasyJSON(in *jlexer.Lexer) {
type SetNetmapStatusRequest_Body struct { type SetNetmapStatusRequest_Body struct {
Status NetmapStatus `json:"status"` Status NetmapStatus `json:"status"`
ForceMaintenance bool `json:"forceMaintenance"` ForceMaintenance bool `json:"forceMaintenance"`
Await bool `json:"await"`
} }
var ( var (
@ -737,6 +738,7 @@ func (x *SetNetmapStatusRequest_Body) StableSize() (size int) {
} }
size += proto.EnumSize(1, int32(x.Status)) size += proto.EnumSize(1, int32(x.Status))
size += proto.BoolSize(2, x.ForceMaintenance) size += proto.BoolSize(2, x.ForceMaintenance)
size += proto.BoolSize(3, x.Await)
return size return size
} }
@ -759,6 +761,9 @@ func (x *SetNetmapStatusRequest_Body) EmitProtobuf(mm *easyproto.MessageMarshale
if x.ForceMaintenance { if x.ForceMaintenance {
mm.AppendBool(2, x.ForceMaintenance) mm.AppendBool(2, x.ForceMaintenance)
} }
if x.Await {
mm.AppendBool(3, x.Await)
}
} }
// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface. // UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface.
@ -782,6 +787,12 @@ func (x *SetNetmapStatusRequest_Body) UnmarshalProtobuf(src []byte) (err error)
return fmt.Errorf("cannot unmarshal field %s", "ForceMaintenance") return fmt.Errorf("cannot unmarshal field %s", "ForceMaintenance")
} }
x.ForceMaintenance = data x.ForceMaintenance = data
case 3: // Await
data, ok := fc.Bool()
if !ok {
return fmt.Errorf("cannot unmarshal field %s", "Await")
}
x.Await = data
} }
} }
return nil return nil
@ -804,6 +815,15 @@ func (x *SetNetmapStatusRequest_Body) GetForceMaintenance() bool {
func (x *SetNetmapStatusRequest_Body) SetForceMaintenance(v bool) { func (x *SetNetmapStatusRequest_Body) SetForceMaintenance(v bool) {
x.ForceMaintenance = v x.ForceMaintenance = v
} }
func (x *SetNetmapStatusRequest_Body) GetAwait() bool {
if x != nil {
return x.Await
}
return false
}
func (x *SetNetmapStatusRequest_Body) SetAwait(v bool) {
x.Await = v
}
// MarshalJSON implements the json.Marshaler interface. // MarshalJSON implements the json.Marshaler interface.
func (x *SetNetmapStatusRequest_Body) MarshalJSON() ([]byte, error) { func (x *SetNetmapStatusRequest_Body) MarshalJSON() ([]byte, error) {
@ -843,6 +863,16 @@ func (x *SetNetmapStatusRequest_Body) MarshalEasyJSON(out *jwriter.Writer) {
out.RawString(prefix) out.RawString(prefix)
out.Bool(x.ForceMaintenance) out.Bool(x.ForceMaintenance)
} }
{
if !first {
out.RawByte(',')
} else {
first = false
}
const prefix string = "\"await\":"
out.RawString(prefix)
out.Bool(x.Await)
}
out.RawByte('}') out.RawByte('}')
} }
@ -899,6 +929,12 @@ func (x *SetNetmapStatusRequest_Body) UnmarshalEasyJSON(in *jlexer.Lexer) {
f = in.Bool() f = in.Bool()
x.ForceMaintenance = f x.ForceMaintenance = f
} }
case "await":
{
var f bool
f = in.Bool()
x.Await = f
}
} }
in.WantComma() in.WantComma()
} }