package object import ( "context" "errors" "io" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc" objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" ) // Server wraps FrostFS API Object service and // provides gRPC Object service server interface. type Server struct { srv objectSvc.ServiceServer } // New creates, initializes and returns Server instance. func New(c objectSvc.ServiceServer) *Server { return &Server{ srv: c, } } // Patch opens internal Object patch stream and feeds it by the data read from gRPC stream. func (s *Server) Patch(gStream objectGRPC.ObjectService_PatchServer) error { stream, err := s.srv.Patch() if err != nil { return err } for { req, err := gStream.Recv() if err != nil { if errors.Is(err, io.EOF) { resp, err := stream.CloseAndRecv(gStream.Context()) if err != nil { return err } return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse)) } return err } patchReq := new(object.PatchRequest) if err := patchReq.FromGRPCMessage(req); err != nil { return err } if err := stream.Send(gStream.Context(), patchReq); err != nil { if errors.Is(err, util.ErrAbortStream) { resp, err := stream.CloseAndRecv(gStream.Context()) if err != nil { return err } return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse)) } return err } } } // Put opens internal Object service Put stream and overtakes data from gRPC stream to it. func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error { stream, err := s.srv.Put() if err != nil { return err } for { req, err := gStream.Recv() if err != nil { if errors.Is(err, io.EOF) { resp, err := stream.CloseAndRecv(gStream.Context()) if err != nil { return err } return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PutResponse)) } return err } putReq := new(object.PutRequest) if err := putReq.FromGRPCMessage(req); err != nil { return err } if err := stream.Send(gStream.Context(), putReq); err != nil { if errors.Is(err, util.ErrAbortStream) { resp, err := stream.CloseAndRecv(gStream.Context()) if err != nil { return err } return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PutResponse)) } return err } } } // Delete converts gRPC DeleteRequest message and passes it to internal Object service. func (s *Server) Delete(ctx context.Context, req *objectGRPC.DeleteRequest) (*objectGRPC.DeleteResponse, error) { delReq := new(object.DeleteRequest) if err := delReq.FromGRPCMessage(req); err != nil { return nil, err } resp, err := s.srv.Delete(ctx, delReq) if err != nil { return nil, err } return resp.ToGRPCMessage().(*objectGRPC.DeleteResponse), nil } // Head converts gRPC HeadRequest message and passes it to internal Object service. func (s *Server) Head(ctx context.Context, req *objectGRPC.HeadRequest) (*objectGRPC.HeadResponse, error) { searchReq := new(object.HeadRequest) if err := searchReq.FromGRPCMessage(req); err != nil { return nil, err } resp, err := s.srv.Head(ctx, searchReq) if err != nil { return nil, err } return resp.ToGRPCMessage().(*objectGRPC.HeadResponse), nil } // GetRangeHash converts gRPC GetRangeHashRequest message and passes it to internal Object service. func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashRequest) (*objectGRPC.GetRangeHashResponse, error) { hashRngReq := new(object.GetRangeHashRequest) if err := hashRngReq.FromGRPCMessage(req); err != nil { return nil, err } resp, err := s.srv.GetRangeHash(ctx, hashRngReq) if err != nil { return nil, err } return resp.ToGRPCMessage().(*objectGRPC.GetRangeHashResponse), nil } func (s *Server) PutSingle(ctx context.Context, req *objectGRPC.PutSingleRequest) (*objectGRPC.PutSingleResponse, error) { putSingleReq := &object.PutSingleRequest{} if err := putSingleReq.FromGRPCMessage(req); err != nil { return nil, err } resp, err := s.srv.PutSingle(ctx, putSingleReq) if err != nil { return nil, err } return resp.ToGRPCMessage().(*objectGRPC.PutSingleResponse), nil }