diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index d273ea7e..7a725f08 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -40,6 +40,7 @@ import ( tsourse "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/tombstone/source" trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" + "github.com/nspcc-dev/neofs-node/pkg/services/tree" "github.com/nspcc-dev/neofs-node/pkg/services/util/response" "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" @@ -111,6 +112,8 @@ type cfg struct { cfgControlService cfgControlService + treeService *tree.Service + healthStatus *atomic.Int32 closers []func() diff --git a/cmd/neofs-node/control.go b/cmd/neofs-node/control.go index 5e6c7620..439050a0 100644 --- a/cmd/neofs-node/control.go +++ b/cmd/neofs-node/control.go @@ -43,6 +43,7 @@ func initControlService(c *cfg) { return err }), controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage), + controlSvc.WithTreeService(c.treeService), ) lis, err := net.Listen("tcp", endpoint) diff --git a/cmd/neofs-node/tree.go b/cmd/neofs-node/tree.go index 95987402..4be5ac09 100644 --- a/cmd/neofs-node/tree.go +++ b/cmd/neofs-node/tree.go @@ -7,7 +7,7 @@ import ( ) func initTreeService(c *cfg) { - treeSvc := tree.New( + c.treeService = tree.New( tree.WithContainerSource(c.cfgObject.cnrSource), tree.WithNetmapSource(c.netMapSource), tree.WithPrivateKey(&c.key.PrivateKey), @@ -15,12 +15,12 @@ func initTreeService(c *cfg) { tree.WithStorage(c.cfgObject.cfgLocalStorage.localStorage)) for _, srv := range c.cfgGRPC.servers { - tree.RegisterTreeServiceServer(srv, treeSvc) + tree.RegisterTreeServiceServer(srv, c.treeService) } c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { - treeSvc.Start(ctx) + c.treeService.Start(ctx) })) - c.onShutdown(treeSvc.Shutdown) + c.onShutdown(c.treeService.Shutdown) } diff --git a/pkg/services/control/convert.go b/pkg/services/control/convert.go index b8a8a616..a4d0bced 100644 --- a/pkg/services/control/convert.go +++ b/pkg/services/control/convert.go @@ -166,3 +166,21 @@ func (w *restoreShardResponseWrapper) FromGRPCMessage(m grpc.Message) error { w.RestoreShardResponse = r return nil } + +type synchronizeTreeResponseWrapper struct { + *SynchronizeTreeResponse +} + +func (w *synchronizeTreeResponseWrapper) ToGRPCMessage() grpc.Message { + return w.SynchronizeTreeResponse +} + +func (w *synchronizeTreeResponseWrapper) FromGRPCMessage(m grpc.Message) error { + r, ok := m.(*SynchronizeTreeResponse) + if !ok { + return message.NewUnexpectedMessageType(m, (*SynchronizeTreeResponse)(nil)) + } + + w.SynchronizeTreeResponse = r + return nil +} diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index 4c23f503..13751e69 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -16,6 +16,7 @@ const ( rpcSetShardMode = "SetShardMode" rpcDumpShard = "DumpShard" rpcRestoreShard = "RestoreShard" + rpcSynchronizeTree = "SynchronizeTree" ) // HealthCheck executes ControlService.HealthCheck RPC. @@ -172,3 +173,16 @@ func RestoreShard(cli *client.Client, req *RestoreShardRequest, opts ...client.C return wResp.RestoreShardResponse, nil } + +// SynchronizeTree executes ControlService.SynchronizeTree RPC. +func SynchronizeTree(cli *client.Client, req *SynchronizeTreeRequest, opts ...client.CallOption) (*SynchronizeTreeResponse, error) { + wResp := &synchronizeTreeResponseWrapper{new(SynchronizeTreeResponse)} + wReq := &requestWrapper{m: req} + + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcSynchronizeTree), wReq, wResp, opts...) + if err != nil { + return nil, err + } + + return wResp.SynchronizeTreeResponse, nil +} diff --git a/pkg/services/control/server/server.go b/pkg/services/control/server/server.go index 562e5dc2..f1b0e573 100644 --- a/pkg/services/control/server/server.go +++ b/pkg/services/control/server/server.go @@ -3,9 +3,8 @@ package control import ( "crypto/ecdsa" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/services/control" ) @@ -52,6 +51,8 @@ type cfg struct { delObjHandler DeletedObjectHandler + treeService TreeService + s *engine.StorageEngine } @@ -125,3 +126,10 @@ func WithLocalStorage(engine *engine.StorageEngine) Option { c.s = engine } } + +// WithTreeService returns an option to set tree service. +func WithTreeService(s TreeService) Option { + return func(c *cfg) { + c.treeService = s + } +} diff --git a/pkg/services/control/server/syncronize_tree.go b/pkg/services/control/server/syncronize_tree.go new file mode 100644 index 00000000..b4e91071 --- /dev/null +++ b/pkg/services/control/server/syncronize_tree.go @@ -0,0 +1,48 @@ +package control + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/services/control" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// TreeService represents a tree service instance. +type TreeService interface { + Synchronize(ctx context.Context, cnr cid.ID, treeID string) error +} + +func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTreeRequest) (*control.SynchronizeTreeResponse, error) { + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + if s.treeService == nil { + return nil, status.Error(codes.Internal, "tree service is disabled") + } + + b := req.GetBody() + + var cnr cid.ID + if err := cnr.Decode(b.GetContainerId()); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + err = s.treeService.Synchronize(ctx, cnr, b.GetTreeId()) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + resp := new(control.SynchronizeTreeResponse) + resp.SetBody(new(control.SynchronizeTreeResponse_Body)) + + err = SignMessage(s.key, resp) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return resp, nil +} diff --git a/pkg/services/control/service.go b/pkg/services/control/service.go index f120b388..2cf9ee77 100644 --- a/pkg/services/control/service.go +++ b/pkg/services/control/service.go @@ -200,3 +200,17 @@ func (x *RestoreShardResponse) SetBody(v *RestoreShardResponse_Body) { x.Body = v } } + +// SetBody sets list shards request body. +func (x *SynchronizeTreeRequest) SetBody(v *SynchronizeTreeRequest_Body) { + if x != nil { + x.Body = v + } +} + +// SetBody sets list shards response body. +func (x *SynchronizeTreeResponse) SetBody(v *SynchronizeTreeResponse_Body) { + if x != nil { + x.Body = v + } +} diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index 435cd25f..4ad323c0 100644 Binary files a/pkg/services/control/service.pb.go and b/pkg/services/control/service.pb.go differ diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 3446e262..5f90399d 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -31,6 +31,9 @@ service ControlService { // Restore objects from dump. rpc RestoreShard (RestoreShardRequest) returns (RestoreShardResponse); + + // Synchronizes all log operations for the specified tree. + rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse); } // Health check request. @@ -279,3 +282,33 @@ message RestoreShardResponse { // Body signature. Signature signature = 2; } + +// SynchronizeTree request. +message SynchronizeTreeRequest { + // Request body structure. + message Body { + bytes container_id = 1; + string tree_id = 2; + // Starting height for the synchronization. Can be omitted. + uint64 height = 3; + } + + // Body of restore shard request message. + Body body = 1; + + // Body signature. + Signature signature = 2; +} + +// SynchronizeTree response. +message SynchronizeTreeResponse { + // Response body structure. + message Body { + } + + // Body of restore shard response message. + Body body = 1; + + // Body signature. + Signature signature = 2; +} diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go index 5c088311..bee17aee 100644 Binary files a/pkg/services/control/service_grpc.pb.go and b/pkg/services/control/service_grpc.pb.go differ diff --git a/pkg/services/control/service_neofs.pb.go b/pkg/services/control/service_neofs.pb.go index 3b4616e2..98987a86 100644 Binary files a/pkg/services/control/service_neofs.pb.go and b/pkg/services/control/service_neofs.pb.go differ diff --git a/pkg/services/control/service_test.go b/pkg/services/control/service_test.go index 8a316754..8dbfffa4 100644 --- a/pkg/services/control/service_test.go +++ b/pkg/services/control/service_test.go @@ -161,3 +161,21 @@ func equalSetShardModeRequestBodies(b1, b2 *control.SetShardModeRequest_Body) bo return true } + +func TestSynchronizeTreeRequest_Body_StableMarshal(t *testing.T) { + testStableMarshal(t, + &control.SynchronizeTreeRequest_Body{ + ContainerId: []byte{1, 2, 3, 4, 5, 6, 7}, + TreeId: "someID", + Height: 42, + }, + new(control.SynchronizeTreeRequest_Body), + func(m1, m2 protoMessage) bool { + b1 := m1.(*control.SynchronizeTreeRequest_Body) + b2 := m2.(*control.SynchronizeTreeRequest_Body) + return bytes.Equal(b1.GetContainerId(), b2.GetContainerId()) && + b1.GetTreeId() == b2.GetTreeId() && + b1.GetHeight() == b2.GetHeight() + }, + ) +}