engine: Allow to remove redundant object copies #191

Merged
fyrchik merged 3 commits from fyrchik/frostfs-node:shard-reinsertion into master 2023-04-07 17:25:51 +00:00
7 changed files with 97 additions and 0 deletions
Showing only changes of commit 1856873049 - Show all commits

View file

@ -200,3 +200,21 @@ func (w *flushCacheResponseWrapper) FromGRPCMessage(m grpc.Message) error {
w.FlushCacheResponse = r
return nil
}
type doctorResponseWrapper struct {
*DoctorResponse
}
func (w *doctorResponseWrapper) ToGRPCMessage() grpc.Message {
return w.DoctorResponse
}
func (w *doctorResponseWrapper) FromGRPCMessage(m grpc.Message) error {
r, ok := m.(*DoctorResponse)
if !ok {
return message.NewUnexpectedMessageType(m, (*DoctorResponse)(nil))
}
w.DoctorResponse = r
return nil
}

View file

@ -18,6 +18,7 @@ const (
rpcSynchronizeTree = "SynchronizeTree"
rpcEvacuateShard = "EvacuateShard"
rpcFlushCache = "FlushCache"
rpcDoctor = "Doctor"
)
// HealthCheck executes ControlService.HealthCheck RPC.
@ -191,3 +192,16 @@ func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallO
return wResp.FlushCacheResponse, nil
}
// Doctor executes ControlService.Doctor RPC.
func Doctor(cli *client.Client, req *DoctorRequest, opts ...client.CallOption) (*DoctorResponse, error) {
wResp := &doctorResponseWrapper{new(DoctorResponse)}
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcDoctor), wReq, wResp, opts...)
dstepanov-yadro marked this conversation as resolved Outdated

There is no timeout for RPC call?

There is no timeout for RPC call?
It is hidden inside the client. https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/cmd/frostfs-cli/internal/client/sdk.go#L53 (yes, we could improve this)
if err != nil {
return nil, err
}
return wResp.DoctorResponse, nil
}

View file

@ -0,0 +1,37 @@
package control
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *Server) Doctor(ctx context.Context, req *control.DoctorRequest) (*control.DoctorResponse, error) {
dstepanov-yadro marked this conversation as resolved Outdated

Will context be canceled if the command execution is interrupted?

Will ```context``` be canceled if the command execution is interrupted?

Haven't tested this, but I would expect this from the gRPC.

Haven't tested this, but I would expect this from the gRPC.
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
if !req.Body.RemoveDuplicates {
return nil, status.Error(codes.InvalidArgument, "operation not specified")
}
var prm engine.RemoveDuplicatesPrm
prm.Concurrency = int(req.Body.Concurrency)
err = s.s.RemoveDuplicates(ctx, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.DoctorResponse{Body: &control.DoctorResponse_Body{}}
err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}

Binary file not shown.

View file

@ -37,6 +37,9 @@ service ControlService {
// FlushCache moves all data from one shard to the others.
rpc FlushCache (FlushCacheRequest) returns (FlushCacheResponse);
// Doctor performs storage restructuring operations on engine.
rpc Doctor (DoctorRequest) returns (DoctorResponse);
}
// Health check request.
@ -345,3 +348,28 @@ message FlushCacheResponse {
Body body = 1;
Signature signature = 2;
}
// Doctor request.
message DoctorRequest {
// Request body structure.
message Body {
// Number of threads to use for the operation.
uint32 concurrency = 1;
// Flag to search engine for duplicate objects and leave only one copy.
bool remove_duplicates = 2;
}
Body body = 1;
Signature signature = 2;
}
// Doctor response.
message DoctorResponse {
// Response body structure.
message Body {
}
Body body = 1;
Signature signature = 2;
}

Binary file not shown.

Binary file not shown.