[#1333] services/control: Allow to synchronize local trees

Do not check that a node indeed belongs to the container, because the
synchronization will fail in this case anyway.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-05-16 19:31:50 +03:00 committed by fyrchik
parent 0dc1a4e336
commit bfdd68dcb3
13 changed files with 163 additions and 6 deletions

View file

@ -40,6 +40,7 @@ import (
tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source"
trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
"github.com/nspcc-dev/neofs-node/pkg/services/tree"
"github.com/nspcc-dev/neofs-node/pkg/services/util/response"
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
@ -111,6 +112,8 @@ type cfg struct {
cfgControlService cfgControlService
treeService *tree.Service
healthStatus *atomic.Int32
closers []func()

View file

@ -43,6 +43,7 @@ func initControlService(c *cfg) {
return err
}),
controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage),
controlSvc.WithTreeService(c.treeService),
)
lis, err := net.Listen("tcp", endpoint)

View file

@ -7,7 +7,7 @@ import (
)
func initTreeService(c *cfg) {
treeSvc := tree.New(
c.treeService = tree.New(
tree.WithContainerSource(c.cfgObject.cnrSource),
tree.WithNetmapSource(c.netMapSource),
tree.WithPrivateKey(&c.key.PrivateKey),
@ -15,12 +15,12 @@ func initTreeService(c *cfg) {
tree.WithStorage(c.cfgObject.cfgLocalStorage.localStorage))
for _, srv := range c.cfgGRPC.servers {
tree.RegisterTreeServiceServer(srv, treeSvc)
tree.RegisterTreeServiceServer(srv, c.treeService)
}
c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
treeSvc.Start(ctx)
c.treeService.Start(ctx)
}))
c.onShutdown(treeSvc.Shutdown)
c.onShutdown(c.treeService.Shutdown)
}

View file

@ -166,3 +166,21 @@ func (w *restoreShardResponseWrapper) FromGRPCMessage(m grpc.Message) error {
w.RestoreShardResponse = r
return nil
}
type synchronizeTreeResponseWrapper struct {
*SynchronizeTreeResponse
}
func (w *synchronizeTreeResponseWrapper) ToGRPCMessage() grpc.Message {
return w.SynchronizeTreeResponse
}
func (w *synchronizeTreeResponseWrapper) FromGRPCMessage(m grpc.Message) error {
r, ok := m.(*SynchronizeTreeResponse)
if !ok {
return message.NewUnexpectedMessageType(m, (*SynchronizeTreeResponse)(nil))
}
w.SynchronizeTreeResponse = r
return nil
}

View file

@ -16,6 +16,7 @@ const (
rpcSetShardMode = "SetShardMode"
rpcDumpShard = "DumpShard"
rpcRestoreShard = "RestoreShard"
rpcSynchronizeTree = "SynchronizeTree"
)
// HealthCheck executes ControlService.HealthCheck RPC.
@ -172,3 +173,16 @@ func RestoreShard(cli *client.Client, req *RestoreShardRequest, opts ...client.C
return wResp.RestoreShardResponse, nil
}
// SynchronizeTree executes ControlService.SynchronizeTree RPC.
func SynchronizeTree(cli *client.Client, req *SynchronizeTreeRequest, opts ...client.CallOption) (*SynchronizeTreeResponse, error) {
wResp := &synchronizeTreeResponseWrapper{new(SynchronizeTreeResponse)}
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcSynchronizeTree), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.SynchronizeTreeResponse, nil
}

View file

@ -3,9 +3,8 @@ package control
import (
"crypto/ecdsa"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
)
@ -52,6 +51,8 @@ type cfg struct {
delObjHandler DeletedObjectHandler
treeService TreeService
s *engine.StorageEngine
}
@ -125,3 +126,10 @@ func WithLocalStorage(engine *engine.StorageEngine) Option {
c.s = engine
}
}
// WithTreeService returns an option to set tree service.
func WithTreeService(s TreeService) Option {
return func(c *cfg) {
c.treeService = s
}
}

View file

@ -0,0 +1,48 @@
package control
import (
"context"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// TreeService represents a tree service instance.
type TreeService interface {
Synchronize(ctx context.Context, cnr cid.ID, treeID string) error
}
func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTreeRequest) (*control.SynchronizeTreeResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
if s.treeService == nil {
return nil, status.Error(codes.Internal, "tree service is disabled")
}
b := req.GetBody()
var cnr cid.ID
if err := cnr.Decode(b.GetContainerId()); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
err = s.treeService.Synchronize(ctx, cnr, b.GetTreeId())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resp := new(control.SynchronizeTreeResponse)
resp.SetBody(new(control.SynchronizeTreeResponse_Body))
err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}

View file

@ -200,3 +200,17 @@ func (x *RestoreShardResponse) SetBody(v *RestoreShardResponse_Body) {
x.Body = v
}
}
// SetBody sets list shards request body.
func (x *SynchronizeTreeRequest) SetBody(v *SynchronizeTreeRequest_Body) {
if x != nil {
x.Body = v
}
}
// SetBody sets list shards response body.
func (x *SynchronizeTreeResponse) SetBody(v *SynchronizeTreeResponse_Body) {
if x != nil {
x.Body = v
}
}

Binary file not shown.

View file

@ -31,6 +31,9 @@ service ControlService {
// Restore objects from dump.
rpc RestoreShard (RestoreShardRequest) returns (RestoreShardResponse);
// Synchronizes all log operations for the specified tree.
rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse);
}
// Health check request.
@ -279,3 +282,33 @@ message RestoreShardResponse {
// Body signature.
Signature signature = 2;
}
// SynchronizeTree request.
message SynchronizeTreeRequest {
// Request body structure.
message Body {
bytes container_id = 1;
string tree_id = 2;
// Starting height for the synchronization. Can be omitted.
uint64 height = 3;
}
// Body of restore shard request message.
Body body = 1;
// Body signature.
Signature signature = 2;
}
// SynchronizeTree response.
message SynchronizeTreeResponse {
// Response body structure.
message Body {
}
// Body of restore shard response message.
Body body = 1;
// Body signature.
Signature signature = 2;
}

Binary file not shown.

Binary file not shown.

View file

@ -161,3 +161,21 @@ func equalSetShardModeRequestBodies(b1, b2 *control.SetShardModeRequest_Body) bo
return true
}
func TestSynchronizeTreeRequest_Body_StableMarshal(t *testing.T) {
testStableMarshal(t,
&control.SynchronizeTreeRequest_Body{
ContainerId: []byte{1, 2, 3, 4, 5, 6, 7},
TreeId: "someID",
Height: 42,
},
new(control.SynchronizeTreeRequest_Body),
func(m1, m2 protoMessage) bool {
b1 := m1.(*control.SynchronizeTreeRequest_Body)
b2 := m2.(*control.SynchronizeTreeRequest_Body)
return bytes.Equal(b1.GetContainerId(), b2.GetContainerId()) &&
b1.GetTreeId() == b2.GetTreeId() &&
b1.GetHeight() == b2.GetHeight()
},
)
}