forked from TrueCloudLab/frostfs-node
[#460] services/util: Remove CreateRequestStreamer
There is no need in a wrapper with many from-`interface{}` conversions. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
a64dc9ad70
commit
40eae22109
2 changed files with 11 additions and 61 deletions
|
@ -5,7 +5,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -34,7 +33,8 @@ type getRangeStreamResponser struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type putStreamResponser struct {
|
type putStreamResponser struct {
|
||||||
stream *response.ClientMessageStreamer
|
stream PutObjectStream
|
||||||
|
respSvc *response.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewResponseService returns object service instance that passes internal service
|
// NewResponseService returns object service instance that passes internal service
|
||||||
|
@ -59,16 +59,20 @@ func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) er
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *putStreamResponser) Send(ctx context.Context, req *object.PutRequest) error {
|
func (s *putStreamResponser) Send(ctx context.Context, req *object.PutRequest) error {
|
||||||
return s.stream.Send(ctx, req)
|
if err := s.stream.Send(ctx, req); err != nil {
|
||||||
|
return fmt.Errorf("could not send the request: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *putStreamResponser) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
|
func (s *putStreamResponser) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
|
||||||
r, err := s.stream.CloseAndRecv(ctx)
|
r, err := s.stream.CloseAndRecv(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not receive response: %w", s, err)
|
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return r.(*object.PutResponse), nil
|
s.respSvc.SetMeta(r)
|
||||||
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ResponseService) Put() (PutObjectStream, error) {
|
func (s *ResponseService) Put() (PutObjectStream, error) {
|
||||||
|
@ -78,14 +82,8 @@ func (s *ResponseService) Put() (PutObjectStream, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &putStreamResponser{
|
return &putStreamResponser{
|
||||||
stream: s.respSvc.CreateRequestStreamer(
|
stream: stream,
|
||||||
func(ctx context.Context, req any) error {
|
respSvc: s.respSvc,
|
||||||
return stream.Send(ctx, req.(*object.PutRequest))
|
|
||||||
},
|
|
||||||
func(ctx context.Context) (util.ResponseMessage, error) {
|
|
||||||
return stream.CloseAndRecv(ctx)
|
|
||||||
},
|
|
||||||
),
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,48 +0,0 @@
|
||||||
package response
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ClientMessageStreamer represents client-side message streamer
|
|
||||||
// that sets meta values to the response.
|
|
||||||
type ClientMessageStreamer struct {
|
|
||||||
srv *Service
|
|
||||||
|
|
||||||
send util.RequestMessageWriter
|
|
||||||
|
|
||||||
close util.ClientStreamCloser
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send calls send method of internal streamer.
|
|
||||||
func (s *ClientMessageStreamer) Send(ctx context.Context, req any) error {
|
|
||||||
if err := s.send(ctx, req); err != nil {
|
|
||||||
return fmt.Errorf("(%T) could not send the request: %w", s, err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CloseAndRecv closes internal stream, receivers the response,
|
|
||||||
// sets meta values and returns the result.
|
|
||||||
func (s *ClientMessageStreamer) CloseAndRecv(ctx context.Context) (util.ResponseMessage, error) {
|
|
||||||
resp, err := s.close(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.srv.SetMeta(resp)
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreateRequestStreamer wraps stream methods and returns ClientMessageStreamer instance.
|
|
||||||
func (s *Service) CreateRequestStreamer(sender util.RequestMessageWriter, closer util.ClientStreamCloser) *ClientMessageStreamer {
|
|
||||||
return &ClientMessageStreamer{
|
|
||||||
srv: s,
|
|
||||||
send: sender,
|
|
||||||
close: closer,
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue