package object import ( "context" "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/pkg/errors" ) type GetObjectBodyStreamer interface { Recv() (*object.GetResponseBody, error) } type PutObjectBodyStreamer interface { Send(*object.PutRequestBody) error CloseAndRecv() (*object.PutResponseBody, error) } type SearchObjectBodyStreamer interface { Recv() (*object.SearchResponseBody, error) } type GetRangeObjectBodyStreamer interface { Recv() (*object.GetRangeResponseBody, error) } type ServiceExecutor interface { Get(context.Context, *object.GetRequestBody) (GetObjectBodyStreamer, error) Put(context.Context) (PutObjectBodyStreamer, error) Head(context.Context, *object.HeadRequestBody) (*object.HeadResponseBody, error) Search(context.Context, *object.SearchRequestBody) (SearchObjectBodyStreamer, error) Delete(context.Context, *object.DeleteRequestBody) (*object.DeleteResponseBody, error) GetRange(context.Context, *object.GetRangeRequestBody) (GetRangeObjectBodyStreamer, error) GetRangeHash(context.Context, *object.GetRangeHashRequestBody) (*object.GetRangeHashResponseBody, error) } type executorSvc struct { exec ServiceExecutor metaHeader *session.ResponseMetaHeader } type searchStreamer struct { bodyStreamer SearchObjectBodyStreamer metaHdr *session.ResponseMetaHeader } type getStreamer struct { bodyStreamer GetObjectBodyStreamer metaHdr *session.ResponseMetaHeader } type putStreamer struct { bodyStreamer PutObjectBodyStreamer metaHdr *session.ResponseMetaHeader } type rangeStreamer struct { bodyStreamer GetRangeObjectBodyStreamer metaHdr *session.ResponseMetaHeader } // NewExecutionService wraps ServiceExecutor and returns Object Service interface. // // Passed meta header is attached to all responses. func NewExecutionService(exec ServiceExecutor, metaHdr *session.ResponseMetaHeader) object.Service { return &executorSvc{ exec: exec, metaHeader: metaHdr, } } func (s *getStreamer) Recv() (*object.GetResponse, error) { body, err := s.bodyStreamer.Recv() if err != nil { return nil, errors.Wrap(err, "could not receive response body") } resp := new(object.GetResponse) resp.SetBody(body) resp.SetMetaHeader(s.metaHdr) return resp, nil } func (s *executorSvc) Get(ctx context.Context, req *object.GetRequest) (object.GetObjectStreamer, error) { bodyStream, err := s.exec.Get(ctx, req.GetBody()) if err != nil { return nil, errors.Wrap(err, "could not execute Get request") } return &getStreamer{ bodyStreamer: bodyStream, metaHdr: s.metaHeader, }, nil } func (s *putStreamer) Send(req *object.PutRequest) error { return s.bodyStreamer.Send(req.GetBody()) } func (s *putStreamer) CloseAndRecv() (*object.PutResponse, error) { body, err := s.bodyStreamer.CloseAndRecv() if err != nil { return nil, errors.Wrap(err, "could not receive response body") } resp := new(object.PutResponse) resp.SetBody(body) resp.SetMetaHeader(s.metaHdr) return resp, nil } func (s *executorSvc) Put(ctx context.Context) (object.PutObjectStreamer, error) { bodyStream, err := s.exec.Put(ctx) if err != nil { return nil, errors.Wrap(err, "could not execute Put request") } return &putStreamer{ bodyStreamer: bodyStream, metaHdr: s.metaHeader, }, nil } func (s *executorSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { respBody, err := s.exec.Head(ctx, req.GetBody()) if err != nil { return nil, errors.Wrap(err, "could not execute Head request") } resp := new(object.HeadResponse) resp.SetBody(respBody) resp.SetMetaHeader(s.metaHeader) return resp, nil } func (s *searchStreamer) Recv() (*object.SearchResponse, error) { body, err := s.bodyStreamer.Recv() if err != nil { return nil, errors.Wrap(err, "could not receive response body") } resp := new(object.SearchResponse) resp.SetBody(body) resp.SetMetaHeader(s.metaHdr) return resp, nil } func (s *executorSvc) Search(ctx context.Context, req *object.SearchRequest) (object.SearchObjectStreamer, error) { bodyStream, err := s.exec.Search(ctx, req.GetBody()) if err != nil { return nil, errors.Wrap(err, "could not execute Search request") } return &searchStreamer{ bodyStreamer: bodyStream, metaHdr: s.metaHeader, }, nil } func (s *executorSvc) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { respBody, err := s.exec.Delete(ctx, req.GetBody()) if err != nil { return nil, errors.Wrap(err, "could not execute Delete request") } resp := new(object.DeleteResponse) resp.SetBody(respBody) resp.SetMetaHeader(s.metaHeader) return resp, nil } func (s *rangeStreamer) Recv() (*object.GetRangeResponse, error) { body, err := s.bodyStreamer.Recv() if err != nil { return nil, errors.Wrap(err, "could not receive response body") } resp := new(object.GetRangeResponse) resp.SetBody(body) resp.SetMetaHeader(s.metaHdr) return resp, nil } func (s *executorSvc) GetRange(ctx context.Context, req *object.GetRangeRequest) (object.GetRangeObjectStreamer, error) { bodyStream, err := s.exec.GetRange(ctx, req.GetBody()) if err != nil { return nil, errors.Wrap(err, "could not execute GetRange request") } return &rangeStreamer{ bodyStreamer: bodyStream, metaHdr: s.metaHeader, }, nil } func (s *executorSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { respBody, err := s.exec.GetRangeHash(ctx, req.GetBody()) if err != nil { return nil, errors.Wrap(err, "could not execute GetRangeHash request") } resp := new(object.GetRangeHashResponse) resp.SetBody(respBody) resp.SetMetaHeader(s.metaHeader) return resp, nil }