forked from TrueCloudLab/frostfs-node
Airat Arifullin
a4a1c3f18b
* Also, resolve dependencies and conflicts for object service by creating stub for `Patch` method. Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
168 lines
4.2 KiB
Go
168 lines
4.2 KiB
Go
package object
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc"
|
|
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
|
)
|
|
|
|
// Server wraps FrostFS API Object service and
|
|
// provides gRPC Object service server interface.
|
|
type Server struct {
|
|
srv objectSvc.ServiceServer
|
|
}
|
|
|
|
// New creates, initializes and returns Server instance.
|
|
func New(c objectSvc.ServiceServer) *Server {
|
|
return &Server{
|
|
srv: c,
|
|
}
|
|
}
|
|
|
|
// Patch opens internal Object patch stream and feeds it by the data read from gRPC stream.
|
|
func (s *Server) Patch(gStream objectGRPC.ObjectService_PatchServer) error {
|
|
stream, err := s.srv.Patch()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
req, err := gStream.Recv()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
resp, err := stream.CloseAndRecv(gStream.Context())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse))
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
patchReq := new(object.PatchRequest)
|
|
if err := patchReq.FromGRPCMessage(req); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := stream.Send(gStream.Context(), patchReq); err != nil {
|
|
if errors.Is(err, util.ErrAbortStream) {
|
|
resp, err := stream.CloseAndRecv(gStream.Context())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse))
|
|
}
|
|
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Put opens internal Object service Put stream and overtakes data from gRPC stream to it.
|
|
func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
|
|
stream, err := s.srv.Put()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
req, err := gStream.Recv()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
resp, err := stream.CloseAndRecv(gStream.Context())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PutResponse))
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
putReq := new(object.PutRequest)
|
|
if err := putReq.FromGRPCMessage(req); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := stream.Send(gStream.Context(), putReq); err != nil {
|
|
if errors.Is(err, util.ErrAbortStream) {
|
|
resp, err := stream.CloseAndRecv(gStream.Context())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PutResponse))
|
|
}
|
|
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delete converts gRPC DeleteRequest message and passes it to internal Object service.
|
|
func (s *Server) Delete(ctx context.Context, req *objectGRPC.DeleteRequest) (*objectGRPC.DeleteResponse, error) {
|
|
delReq := new(object.DeleteRequest)
|
|
if err := delReq.FromGRPCMessage(req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := s.srv.Delete(ctx, delReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp.ToGRPCMessage().(*objectGRPC.DeleteResponse), nil
|
|
}
|
|
|
|
// Head converts gRPC HeadRequest message and passes it to internal Object service.
|
|
func (s *Server) Head(ctx context.Context, req *objectGRPC.HeadRequest) (*objectGRPC.HeadResponse, error) {
|
|
searchReq := new(object.HeadRequest)
|
|
if err := searchReq.FromGRPCMessage(req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := s.srv.Head(ctx, searchReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp.ToGRPCMessage().(*objectGRPC.HeadResponse), nil
|
|
}
|
|
|
|
// GetRangeHash converts gRPC GetRangeHashRequest message and passes it to internal Object service.
|
|
func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) {
|
|
hashRngReq := new(object.GetRangeHashRequest)
|
|
if err := hashRngReq.FromGRPCMessage(req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := s.srv.GetRangeHash(ctx, hashRngReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp.ToGRPCMessage().(*objectGRPC.GetRangeHashResponse), nil
|
|
}
|
|
|
|
func (s *Server) PutSingle(ctx context.Context, req *objectGRPC.PutSingleRequest) (*objectGRPC.PutSingleResponse, error) {
|
|
putSingleReq := &object.PutSingleRequest{}
|
|
if err := putSingleReq.FromGRPCMessage(req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := s.srv.PutSingle(ctx, putSingleReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp.ToGRPCMessage().(*objectGRPC.PutSingleResponse), nil
|
|
}
|