From 6e9ff698f61fe746a4c889d0a4a8c7e2f4b1cdbd Mon Sep 17 00:00:00 2001 From: Ekaterina Lebedeva Date: Tue, 4 Feb 2025 21:21:31 +0300 Subject: [PATCH] [#1635] control: Add method to search for shard by `oid` Added method `GetShardByObjectID` to ControlService and to StorageEngine. It returns information about shards storing object on the node. Signed-off-by: Ekaterina Lebedeva --- internal/logs/logs.go | 1 + pkg/local_object_storage/engine/shards.go | 21 + pkg/local_object_storage/shard/search.go | 36 + pkg/services/control/rpc.go | 20 + .../control/server/get_shard_by_oid.go | 96 +++ pkg/services/control/service.proto | 27 + pkg/services/control/service_frostfs.pb.go | 721 ++++++++++++++++++ pkg/services/control/service_grpc.pb.go | 39 + 8 files changed, 961 insertions(+) create mode 100644 pkg/local_object_storage/shard/search.go create mode 100644 pkg/services/control/server/get_shard_by_oid.go diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 0610dc175..1b3f4c45b 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -253,6 +253,7 @@ const ( ShardFailureToMarkLockersAsGarbage = "failure to mark lockers as garbage" ShardFailureToGetExpiredUnlockedObjects = "failure to get expired unlocked objects" ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase" + ShardCouldNotFindObject = "could not find object" WritecacheWaitingForChannelsToFlush = "waiting for channels to flush" WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache" BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level" diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 8e191f72c..080556f83 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -442,3 +442,24 @@ func (e *StorageEngine) deleteShards(ctx context.Context, ids []*shard.ID) ([]ha func (s hashedShard) Hash() uint64 { return s.hash } + +func (e *StorageEngine) GetShardByObjectID(ctx context.Context, obj oid.Address) ([]shard.Info, error) { + var err error + info := make([]shard.Info, 0, len(e.shards)) + prm := shard.ExistsPrm{ + Address: obj, + } + + e.iterateOverUnsortedShards(func(hs hashedShard) (stop bool) { + res, exErr := hs.Exists(ctx, prm) + if exErr != nil { + err = exErr + return true + } + if res.Exists() { + info = append(info, hs.DumpInfo()) + } + return false + }) + return info, err +} diff --git a/pkg/local_object_storage/shard/search.go b/pkg/local_object_storage/shard/search.go new file mode 100644 index 000000000..e5201d4cd --- /dev/null +++ b/pkg/local_object_storage/shard/search.go @@ -0,0 +1,36 @@ +package shard + +import ( + "context" + "errors" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +var ErrObjectFound = errors.New("found object") + +type SearchPrm struct { + ObjectID oid.ID + ContainerID cid.ID +} + +func GetObjectIteratePrm(prm SearchPrm) IterateOverObjectsInContainerPrm { + return IterateOverObjectsInContainerPrm{ + ObjectType: objectSDK.TypeRegular, + ContainerID: prm.ContainerID, + Handler: func(ctx context.Context, i *object.Info) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if i.Address.Object().Equals(prm.ObjectID) { + return ErrObjectFound + } + return nil + }, + } +} diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index bbf2cf0cc..cbb707d8e 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -32,6 +32,7 @@ const ( rpcListTargetsLocalOverrides = "ListTargetsLocalOverrides" rpcDetachShards = "DetachShards" rpcStartShardRebuild = "StartShardRebuild" + rpcGetShardByObjectID = "GetShardByObjectID" ) // HealthCheck executes ControlService.HealthCheck RPC. @@ -364,3 +365,22 @@ func StartShardRebuild(cli *client.Client, req *StartShardRebuildRequest, opts . return wResp.message, nil } + +// GetShardByObjectID executes ControlService.GetShardByObjectID RPC. +func GetShardByObjectID( + cli *client.Client, + req *GetShardByObjectIDRequest, + opts ...client.CallOption, +) (*GetShardByObjectIDResponse, error) { + wResp := newResponseWrapper[GetShardByObjectIDResponse]() + + wReq := &requestWrapper{ + m: req, + } + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcGetShardByObjectID), wReq, wResp, opts...) + if err != nil { + return nil, err + } + + return wResp.message, nil +} diff --git a/pkg/services/control/server/get_shard_by_oid.go b/pkg/services/control/server/get_shard_by_oid.go new file mode 100644 index 000000000..c01f9c800 --- /dev/null +++ b/pkg/services/control/server/get_shard_by_oid.go @@ -0,0 +1,96 @@ +package control + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Server) GetShardByObjectID(ctx context.Context, req *control.GetShardByObjectIDRequest) (*control.GetShardByObjectIDResponse, error) { + // verify request + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + var obj oid.ID + err = obj.DecodeString(req.GetBody().GetOid()) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + var cnr cid.ID + err = cnr.DecodeString(req.GetBody().GetCid()) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + // create and fill response + resp := new(control.GetShardByObjectIDResponse) + + body := new(control.GetShardByObjectIDResponse_Body) + resp.SetBody(body) + + // fill shard info + var objAddr oid.Address + objAddr.SetContainer(cnr) + objAddr.SetObject(obj) + info, err := s.s.GetShardByObjectID(ctx, objAddr) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + if len(info) == 0 { + return nil, status.Error(codes.Internal, logs.ShardCouldNotFindObject) + } + + body.SetShards(shardInfoToProto(info)) + + // Sign the response + if err := ctrlmessage.Sign(s.key, resp); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return resp, nil +} + +func shardInfoToProto(infos []shard.Info) []control.ShardInfo { + shardInfos := make([]control.ShardInfo, 0, len(infos)) + for _, info := range infos { + shardInfo := new(control.ShardInfo) + shardInfo.SetShard_ID(*info.ID) + shardInfo.SetMetabasePath(info.MetaBaseInfo.Path) + shardInfo.Blobstor = blobstorInfoToProto(info.BlobStorInfo) + shardInfo.SetWritecachePath(info.WriteCacheInfo.Path) + shardInfo.SetPiloramaPath(info.PiloramaInfo.Path) + + var m control.ShardMode + + switch info.Mode { + case mode.ReadWrite: + m = control.ShardMode_READ_WRITE + case mode.ReadOnly: + m = control.ShardMode_READ_ONLY + case mode.Degraded: + m = control.ShardMode_DEGRADED + case mode.DegradedReadOnly: + m = control.ShardMode_DEGRADED_READ_ONLY + default: + m = control.ShardMode_SHARD_MODE_UNDEFINED + } + + shardInfo.SetMode(m) + shardInfo.SetErrorCount(info.ErrorCount) + shardInfo.SetEvacuationInProgress(info.EvacuationInProgress) + shardInfos = append(shardInfos, *shardInfo) + } + + return shardInfos +} diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 97ecf9a8c..a91e15ef2 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -89,6 +89,9 @@ service ControlService { // StartShardRebuild starts shard rebuild process. rpc StartShardRebuild(StartShardRebuildRequest) returns (StartShardRebuildResponse); + + // GetShardByObjectID returns shard info where object is stored. + rpc GetShardByObjectID(GetShardByObjectIDRequest) returns (GetShardByObjectIDResponse); } // Health check request. @@ -729,3 +732,27 @@ message StartShardRebuildResponse { Signature signature = 2; } + +message GetShardByObjectIDRequest { + // Request body structure. + message Body { + // + string oid = 1; + // + string cid = 2; + } + + Body body = 1; + Signature signature = 2; +} + +message GetShardByObjectIDResponse { + // Request body structure. + message Body { + // + repeated ShardInfo shards = 1; + } + + Body body = 1; + Signature signature = 2; +} diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index 0b4e3cf32..7691af501 100644 --- a/pkg/services/control/service_frostfs.pb.go +++ b/pkg/services/control/service_frostfs.pb.go @@ -17303,3 +17303,724 @@ func (x *StartShardRebuildResponse) UnmarshalEasyJSON(in *jlexer.Lexer) { in.Consumed() } } + +type GetShardByObjectIDRequest_Body struct { + Oid string `json:"oid"` + Cid string `json:"cid"` +} + +var ( + _ encoding.ProtoMarshaler = (*GetShardByObjectIDRequest_Body)(nil) + _ encoding.ProtoUnmarshaler = (*GetShardByObjectIDRequest_Body)(nil) + _ json.Marshaler = (*GetShardByObjectIDRequest_Body)(nil) + _ json.Unmarshaler = (*GetShardByObjectIDRequest_Body)(nil) +) + +// StableSize returns the size of x in protobuf format. +// +// Structures with the same field values have the same binary size. +func (x *GetShardByObjectIDRequest_Body) StableSize() (size int) { + if x == nil { + return 0 + } + size += proto.StringSize(1, x.Oid) + size += proto.StringSize(2, x.Cid) + return size +} + +// MarshalProtobuf implements the encoding.ProtoMarshaler interface. +func (x *GetShardByObjectIDRequest_Body) MarshalProtobuf(dst []byte) []byte { + m := pool.MarshalerPool.Get() + defer pool.MarshalerPool.Put(m) + x.EmitProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + return dst +} + +func (x *GetShardByObjectIDRequest_Body) EmitProtobuf(mm *easyproto.MessageMarshaler) { + if x == nil { + return + } + if len(x.Oid) != 0 { + mm.AppendString(1, x.Oid) + } + if len(x.Cid) != 0 { + mm.AppendString(2, x.Cid) + } +} + +// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface. +func (x *GetShardByObjectIDRequest_Body) UnmarshalProtobuf(src []byte) (err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in %s", "GetShardByObjectIDRequest_Body") + } + switch fc.FieldNum { + case 1: // Oid + data, ok := fc.String() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Oid") + } + x.Oid = data + case 2: // Cid + data, ok := fc.String() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Cid") + } + x.Cid = data + } + } + return nil +} +func (x *GetShardByObjectIDRequest_Body) GetOid() string { + if x != nil { + return x.Oid + } + return "" +} +func (x *GetShardByObjectIDRequest_Body) SetOid(v string) { + x.Oid = v +} +func (x *GetShardByObjectIDRequest_Body) GetCid() string { + if x != nil { + return x.Cid + } + return "" +} +func (x *GetShardByObjectIDRequest_Body) SetCid(v string) { + x.Cid = v +} + +// MarshalJSON implements the json.Marshaler interface. +func (x *GetShardByObjectIDRequest_Body) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + x.MarshalEasyJSON(&w) + return w.Buffer.BuildBytes(), w.Error +} +func (x *GetShardByObjectIDRequest_Body) MarshalEasyJSON(out *jwriter.Writer) { + if x == nil { + out.RawString("null") + return + } + first := true + out.RawByte('{') + { + if !first { + out.RawByte(',') + } else { + first = false + } + const prefix string = "\"oid\":" + out.RawString(prefix) + out.String(x.Oid) + } + { + if !first { + out.RawByte(',') + } else { + first = false + } + const prefix string = "\"cid\":" + out.RawString(prefix) + out.String(x.Cid) + } + out.RawByte('}') +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (x *GetShardByObjectIDRequest_Body) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + x.UnmarshalEasyJSON(&r) + return r.Error() +} +func (x *GetShardByObjectIDRequest_Body) UnmarshalEasyJSON(in *jlexer.Lexer) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "oid": + { + var f string + f = in.String() + x.Oid = f + } + case "cid": + { + var f string + f = in.String() + x.Cid = f + } + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} + +type GetShardByObjectIDRequest struct { + Body *GetShardByObjectIDRequest_Body `json:"body"` + Signature *Signature `json:"signature"` +} + +var ( + _ encoding.ProtoMarshaler = (*GetShardByObjectIDRequest)(nil) + _ encoding.ProtoUnmarshaler = (*GetShardByObjectIDRequest)(nil) + _ json.Marshaler = (*GetShardByObjectIDRequest)(nil) + _ json.Unmarshaler = (*GetShardByObjectIDRequest)(nil) +) + +// StableSize returns the size of x in protobuf format. +// +// Structures with the same field values have the same binary size. +func (x *GetShardByObjectIDRequest) StableSize() (size int) { + if x == nil { + return 0 + } + size += proto.NestedStructureSize(1, x.Body) + size += proto.NestedStructureSize(2, x.Signature) + return size +} + +// ReadSignedData fills buf with signed data of x. +// If buffer length is less than x.SignedDataSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same signed data. +func (x *GetShardByObjectIDRequest) SignedDataSize() int { + return x.GetBody().StableSize() +} + +// SignedDataSize returns size of the request signed data in bytes. +// +// Structures with the same field values have the same signed data size. +func (x *GetShardByObjectIDRequest) ReadSignedData(buf []byte) ([]byte, error) { + return x.GetBody().MarshalProtobuf(buf), nil +} + +// MarshalProtobuf implements the encoding.ProtoMarshaler interface. +func (x *GetShardByObjectIDRequest) MarshalProtobuf(dst []byte) []byte { + m := pool.MarshalerPool.Get() + defer pool.MarshalerPool.Put(m) + x.EmitProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + return dst +} + +func (x *GetShardByObjectIDRequest) EmitProtobuf(mm *easyproto.MessageMarshaler) { + if x == nil { + return + } + if x.Body != nil { + x.Body.EmitProtobuf(mm.AppendMessage(1)) + } + if x.Signature != nil { + x.Signature.EmitProtobuf(mm.AppendMessage(2)) + } +} + +// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface. +func (x *GetShardByObjectIDRequest) UnmarshalProtobuf(src []byte) (err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in %s", "GetShardByObjectIDRequest") + } + switch fc.FieldNum { + case 1: // Body + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Body") + } + x.Body = new(GetShardByObjectIDRequest_Body) + if err := x.Body.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + case 2: // Signature + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Signature") + } + x.Signature = new(Signature) + if err := x.Signature.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + } + } + return nil +} +func (x *GetShardByObjectIDRequest) GetBody() *GetShardByObjectIDRequest_Body { + if x != nil { + return x.Body + } + return nil +} +func (x *GetShardByObjectIDRequest) SetBody(v *GetShardByObjectIDRequest_Body) { + x.Body = v +} +func (x *GetShardByObjectIDRequest) GetSignature() *Signature { + if x != nil { + return x.Signature + } + return nil +} +func (x *GetShardByObjectIDRequest) SetSignature(v *Signature) { + x.Signature = v +} + +// MarshalJSON implements the json.Marshaler interface. +func (x *GetShardByObjectIDRequest) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + x.MarshalEasyJSON(&w) + return w.Buffer.BuildBytes(), w.Error +} +func (x *GetShardByObjectIDRequest) MarshalEasyJSON(out *jwriter.Writer) { + if x == nil { + out.RawString("null") + return + } + first := true + out.RawByte('{') + { + if !first { + out.RawByte(',') + } else { + first = false + } + const prefix string = "\"body\":" + out.RawString(prefix) + x.Body.MarshalEasyJSON(out) + } + { + if !first { + out.RawByte(',') + } else { + first = false + } + const prefix string = "\"signature\":" + out.RawString(prefix) + x.Signature.MarshalEasyJSON(out) + } + out.RawByte('}') +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (x *GetShardByObjectIDRequest) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + x.UnmarshalEasyJSON(&r) + return r.Error() +} +func (x *GetShardByObjectIDRequest) UnmarshalEasyJSON(in *jlexer.Lexer) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "body": + { + var f *GetShardByObjectIDRequest_Body + f = new(GetShardByObjectIDRequest_Body) + f.UnmarshalEasyJSON(in) + x.Body = f + } + case "signature": + { + var f *Signature + f = new(Signature) + f.UnmarshalEasyJSON(in) + x.Signature = f + } + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} + +type GetShardByObjectIDResponse_Body struct { + Shards []ShardInfo `json:"shards"` +} + +var ( + _ encoding.ProtoMarshaler = (*GetShardByObjectIDResponse_Body)(nil) + _ encoding.ProtoUnmarshaler = (*GetShardByObjectIDResponse_Body)(nil) + _ json.Marshaler = (*GetShardByObjectIDResponse_Body)(nil) + _ json.Unmarshaler = (*GetShardByObjectIDResponse_Body)(nil) +) + +// StableSize returns the size of x in protobuf format. +// +// Structures with the same field values have the same binary size. +func (x *GetShardByObjectIDResponse_Body) StableSize() (size int) { + if x == nil { + return 0 + } + for i := range x.Shards { + size += proto.NestedStructureSizeUnchecked(1, &x.Shards[i]) + } + return size +} + +// MarshalProtobuf implements the encoding.ProtoMarshaler interface. +func (x *GetShardByObjectIDResponse_Body) MarshalProtobuf(dst []byte) []byte { + m := pool.MarshalerPool.Get() + defer pool.MarshalerPool.Put(m) + x.EmitProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + return dst +} + +func (x *GetShardByObjectIDResponse_Body) EmitProtobuf(mm *easyproto.MessageMarshaler) { + if x == nil { + return + } + for i := range x.Shards { + x.Shards[i].EmitProtobuf(mm.AppendMessage(1)) + } +} + +// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface. +func (x *GetShardByObjectIDResponse_Body) UnmarshalProtobuf(src []byte) (err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in %s", "GetShardByObjectIDResponse_Body") + } + switch fc.FieldNum { + case 1: // Shards + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Shards") + } + x.Shards = append(x.Shards, ShardInfo{}) + ff := &x.Shards[len(x.Shards)-1] + if err := ff.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + } + } + return nil +} +func (x *GetShardByObjectIDResponse_Body) GetShards() []ShardInfo { + if x != nil { + return x.Shards + } + return nil +} +func (x *GetShardByObjectIDResponse_Body) SetShards(v []ShardInfo) { + x.Shards = v +} + +// MarshalJSON implements the json.Marshaler interface. +func (x *GetShardByObjectIDResponse_Body) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + x.MarshalEasyJSON(&w) + return w.Buffer.BuildBytes(), w.Error +} +func (x *GetShardByObjectIDResponse_Body) MarshalEasyJSON(out *jwriter.Writer) { + if x == nil { + out.RawString("null") + return + } + first := true + out.RawByte('{') + { + if !first { + out.RawByte(',') + } else { + first = false + } + const prefix string = "\"shards\":" + out.RawString(prefix) + out.RawByte('[') + for i := range x.Shards { + if i != 0 { + out.RawByte(',') + } + x.Shards[i].MarshalEasyJSON(out) + } + out.RawByte(']') + } + out.RawByte('}') +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (x *GetShardByObjectIDResponse_Body) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + x.UnmarshalEasyJSON(&r) + return r.Error() +} +func (x *GetShardByObjectIDResponse_Body) UnmarshalEasyJSON(in *jlexer.Lexer) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "shards": + { + var f ShardInfo + var list []ShardInfo + in.Delim('[') + for !in.IsDelim(']') { + f = ShardInfo{} + f.UnmarshalEasyJSON(in) + list = append(list, f) + in.WantComma() + } + x.Shards = list + in.Delim(']') + } + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} + +type GetShardByObjectIDResponse struct { + Body *GetShardByObjectIDResponse_Body `json:"body"` + Signature *Signature `json:"signature"` +} + +var ( + _ encoding.ProtoMarshaler = (*GetShardByObjectIDResponse)(nil) + _ encoding.ProtoUnmarshaler = (*GetShardByObjectIDResponse)(nil) + _ json.Marshaler = (*GetShardByObjectIDResponse)(nil) + _ json.Unmarshaler = (*GetShardByObjectIDResponse)(nil) +) + +// StableSize returns the size of x in protobuf format. +// +// Structures with the same field values have the same binary size. +func (x *GetShardByObjectIDResponse) StableSize() (size int) { + if x == nil { + return 0 + } + size += proto.NestedStructureSize(1, x.Body) + size += proto.NestedStructureSize(2, x.Signature) + return size +} + +// ReadSignedData fills buf with signed data of x. +// If buffer length is less than x.SignedDataSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same signed data. +func (x *GetShardByObjectIDResponse) SignedDataSize() int { + return x.GetBody().StableSize() +} + +// SignedDataSize returns size of the request signed data in bytes. +// +// Structures with the same field values have the same signed data size. +func (x *GetShardByObjectIDResponse) ReadSignedData(buf []byte) ([]byte, error) { + return x.GetBody().MarshalProtobuf(buf), nil +} + +// MarshalProtobuf implements the encoding.ProtoMarshaler interface. +func (x *GetShardByObjectIDResponse) MarshalProtobuf(dst []byte) []byte { + m := pool.MarshalerPool.Get() + defer pool.MarshalerPool.Put(m) + x.EmitProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + return dst +} + +func (x *GetShardByObjectIDResponse) EmitProtobuf(mm *easyproto.MessageMarshaler) { + if x == nil { + return + } + if x.Body != nil { + x.Body.EmitProtobuf(mm.AppendMessage(1)) + } + if x.Signature != nil { + x.Signature.EmitProtobuf(mm.AppendMessage(2)) + } +} + +// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface. +func (x *GetShardByObjectIDResponse) UnmarshalProtobuf(src []byte) (err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in %s", "GetShardByObjectIDResponse") + } + switch fc.FieldNum { + case 1: // Body + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Body") + } + x.Body = new(GetShardByObjectIDResponse_Body) + if err := x.Body.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + case 2: // Signature + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Signature") + } + x.Signature = new(Signature) + if err := x.Signature.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + } + } + return nil +} +func (x *GetShardByObjectIDResponse) GetBody() *GetShardByObjectIDResponse_Body { + if x != nil { + return x.Body + } + return nil +} +func (x *GetShardByObjectIDResponse) SetBody(v *GetShardByObjectIDResponse_Body) { + x.Body = v +} +func (x *GetShardByObjectIDResponse) GetSignature() *Signature { + if x != nil { + return x.Signature + } + return nil +} +func (x *GetShardByObjectIDResponse) SetSignature(v *Signature) { + x.Signature = v +} + +// MarshalJSON implements the json.Marshaler interface. +func (x *GetShardByObjectIDResponse) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + x.MarshalEasyJSON(&w) + return w.Buffer.BuildBytes(), w.Error +} +func (x *GetShardByObjectIDResponse) MarshalEasyJSON(out *jwriter.Writer) { + if x == nil { + out.RawString("null") + return + } + first := true + out.RawByte('{') + { + if !first { + out.RawByte(',') + } else { + first = false + } + const prefix string = "\"body\":" + out.RawString(prefix) + x.Body.MarshalEasyJSON(out) + } + { + if !first { + out.RawByte(',') + } else { + first = false + } + const prefix string = "\"signature\":" + out.RawString(prefix) + x.Signature.MarshalEasyJSON(out) + } + out.RawByte('}') +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (x *GetShardByObjectIDResponse) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + x.UnmarshalEasyJSON(&r) + return r.Error() +} +func (x *GetShardByObjectIDResponse) UnmarshalEasyJSON(in *jlexer.Lexer) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "body": + { + var f *GetShardByObjectIDResponse_Body + f = new(GetShardByObjectIDResponse_Body) + f.UnmarshalEasyJSON(in) + x.Body = f + } + case "signature": + { + var f *Signature + f = new(Signature) + f.UnmarshalEasyJSON(in) + x.Signature = f + } + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go index 987e08c59..a9e579c22 100644 --- a/pkg/services/control/service_grpc.pb.go +++ b/pkg/services/control/service_grpc.pb.go @@ -41,6 +41,7 @@ const ( ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache" ControlService_DetachShards_FullMethodName = "/control.ControlService/DetachShards" ControlService_StartShardRebuild_FullMethodName = "/control.ControlService/StartShardRebuild" + ControlService_GetShardByObjectID_FullMethodName = "/control.ControlService/GetShardByObjectID" ) // ControlServiceClient is the client API for ControlService service. @@ -95,6 +96,8 @@ type ControlServiceClient interface { DetachShards(ctx context.Context, in *DetachShardsRequest, opts ...grpc.CallOption) (*DetachShardsResponse, error) // StartShardRebuild starts shard rebuild process. StartShardRebuild(ctx context.Context, in *StartShardRebuildRequest, opts ...grpc.CallOption) (*StartShardRebuildResponse, error) + // GetShardByObjectID returns shard info where object is stored. + GetShardByObjectID(ctx context.Context, in *GetShardByObjectIDRequest, opts ...grpc.CallOption) (*GetShardByObjectIDResponse, error) } type controlServiceClient struct { @@ -303,6 +306,15 @@ func (c *controlServiceClient) StartShardRebuild(ctx context.Context, in *StartS return out, nil } +func (c *controlServiceClient) GetShardByObjectID(ctx context.Context, in *GetShardByObjectIDRequest, opts ...grpc.CallOption) (*GetShardByObjectIDResponse, error) { + out := new(GetShardByObjectIDResponse) + err := c.cc.Invoke(ctx, ControlService_GetShardByObjectID_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ControlServiceServer is the server API for ControlService service. // All implementations should embed UnimplementedControlServiceServer // for forward compatibility @@ -355,6 +367,8 @@ type ControlServiceServer interface { DetachShards(context.Context, *DetachShardsRequest) (*DetachShardsResponse, error) // StartShardRebuild starts shard rebuild process. StartShardRebuild(context.Context, *StartShardRebuildRequest) (*StartShardRebuildResponse, error) + // GetShardByObjectID returns shard info where object is stored. + GetShardByObjectID(context.Context, *GetShardByObjectIDRequest) (*GetShardByObjectIDResponse, error) } // UnimplementedControlServiceServer should be embedded to have forward compatible implementations. @@ -427,6 +441,9 @@ func (UnimplementedControlServiceServer) DetachShards(context.Context, *DetachSh func (UnimplementedControlServiceServer) StartShardRebuild(context.Context, *StartShardRebuildRequest) (*StartShardRebuildResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method StartShardRebuild not implemented") } +func (UnimplementedControlServiceServer) GetShardByObjectID(context.Context, *GetShardByObjectIDRequest) (*GetShardByObjectIDResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetShardByObjectID not implemented") +} // UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to ControlServiceServer will @@ -835,6 +852,24 @@ func _ControlService_StartShardRebuild_Handler(srv interface{}, ctx context.Cont return interceptor(ctx, in, info, handler) } +func _ControlService_GetShardByObjectID_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetShardByObjectIDRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ControlServiceServer).GetShardByObjectID(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ControlService_GetShardByObjectID_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ControlServiceServer).GetShardByObjectID(ctx, req.(*GetShardByObjectIDRequest)) + } + return interceptor(ctx, in, info, handler) +} + // ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -930,6 +965,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{ MethodName: "StartShardRebuild", Handler: _ControlService_StartShardRebuild_Handler, }, + { + MethodName: "GetShardByObjectID", + Handler: _ControlService_GetShardByObjectID_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "pkg/services/control/service.proto",