[#1307] go.mod: Bump frostfs-sdk-go/frostfs-api-go/v2 versions
* Also, resolve dependencies and conflicts for object service by creating stub for `Patch` method. Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
80ce7c3a00
commit
2802f8c37a
15 changed files with 339 additions and 5 deletions
|
@ -28,6 +28,7 @@ import (
|
||||||
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
||||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
||||||
|
patchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/patch"
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
||||||
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
||||||
|
@ -54,6 +55,8 @@ type objectSvc struct {
|
||||||
get *getsvcV2.Service
|
get *getsvcV2.Service
|
||||||
|
|
||||||
delete *deletesvcV2.Service
|
delete *deletesvcV2.Service
|
||||||
|
|
||||||
|
patch *patchsvc.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) MaxObjectSize() uint64 {
|
func (c *cfg) MaxObjectSize() uint64 {
|
||||||
|
@ -71,6 +74,10 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) {
|
||||||
return s.put.Put()
|
return s.put.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *objectSvc) Patch() (objectService.PatchObjectstream, error) {
|
||||||
|
return s.patch.Patch()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
return s.put.PutSingle(ctx, req)
|
return s.put.PutSingle(ctx, req)
|
||||||
}
|
}
|
||||||
|
@ -181,10 +188,12 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sDeleteV2 := createDeleteServiceV2(sDelete)
|
sDeleteV2 := createDeleteServiceV2(sDelete)
|
||||||
|
|
||||||
|
sPatch := createPatchSvc(sGet, sPut, keyStorage)
|
||||||
|
|
||||||
// build service pipeline
|
// build service pipeline
|
||||||
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
||||||
|
|
||||||
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2)
|
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
|
||||||
|
|
||||||
apeSvc := createAPEService(c, splitSvc)
|
apeSvc := createAPEService(c, splitSvc)
|
||||||
|
|
||||||
|
@ -353,6 +362,10 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2
|
||||||
return putsvcV2.NewService(sPut, keyStorage)
|
return putsvcV2.NewService(sPut, keyStorage)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service {
|
||||||
|
return patchsvc.NewService(keyStorage, sGet, sPut)
|
||||||
|
}
|
||||||
|
|
||||||
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
|
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
|
@ -425,7 +438,7 @@ func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service,
|
func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service,
|
||||||
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service,
|
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service, sPatch *patchsvc.Service,
|
||||||
) *objectService.TransportSplitter {
|
) *objectService.TransportSplitter {
|
||||||
return objectService.NewTransportSplitter(
|
return objectService.NewTransportSplitter(
|
||||||
c.cfgGRPC.maxChunkSize,
|
c.cfgGRPC.maxChunkSize,
|
||||||
|
@ -435,6 +448,7 @@ func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Servi
|
||||||
search: sSearchV2,
|
search: sSearchV2,
|
||||||
get: sGetV2,
|
get: sGetV2,
|
||||||
delete: sDeleteV2,
|
delete: sDeleteV2,
|
||||||
|
patch: sPatch,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
6
go.mod
6
go.mod
|
@ -4,14 +4,14 @@ go 1.21
|
||||||
|
|
||||||
require (
|
require (
|
||||||
code.gitea.io/sdk/gitea v0.17.1
|
code.gitea.io/sdk/gitea v0.17.1
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240813155151-d112a28d382f
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240813155821-98aabc45a720
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||||
github.com/cheggaaa/pb v1.0.29
|
github.com/cheggaaa/pb v1.0.29
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -24,6 +24,48 @@ func New(c objectSvc.ServiceServer) *Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Patch opens internal Object patch stream and feeds it by the data read from gRPC stream.
|
||||||
|
func (s *Server) Patch(gStream objectGRPC.ObjectService_PatchServer) error {
|
||||||
|
stream, err := s.srv.Patch()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
req, err := gStream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
resp, err := stream.CloseAndRecv(gStream.Context())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse))
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
patchReq := new(object.PatchRequest)
|
||||||
|
if err := patchReq.FromGRPCMessage(req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stream.Send(gStream.Context(), patchReq); err != nil {
|
||||||
|
if errors.Is(err, util.ErrAbortStream) {
|
||||||
|
resp, err := stream.CloseAndRecv(gStream.Context())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse))
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Put opens internal Object service Put stream and overtakes data from gRPC stream to it.
|
// Put opens internal Object service Put stream and overtakes data from gRPC stream to it.
|
||||||
func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
|
func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
|
||||||
stream, err := s.srv.Put()
|
stream, err := s.srv.Put()
|
||||||
|
|
|
@ -249,6 +249,26 @@ func (b Service) Put() (object.PutObjectStream, error) {
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type patchStreamBasicChecker struct {
|
||||||
|
next object.PatchObjectstream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
|
||||||
|
return p.next.Send(ctx, request)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
||||||
|
return p.next.CloseAndRecv(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b Service) Patch() (object.PatchObjectstream, error) {
|
||||||
|
streamer, err := b.next.Patch()
|
||||||
|
|
||||||
|
return &patchStreamBasicChecker{
|
||||||
|
next: streamer,
|
||||||
|
}, err
|
||||||
|
}
|
||||||
|
|
||||||
func (b Service) Head(
|
func (b Service) Head(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *objectV2.HeadRequest,
|
request *objectV2.HeadRequest,
|
||||||
|
|
|
@ -204,6 +204,26 @@ func (c *Service) Put() (objectSvc.PutObjectStream, error) {
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type patchStreamBasicChecker struct {
|
||||||
|
next objectSvc.PatchObjectstream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
|
||||||
|
return p.next.Send(ctx, request)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
||||||
|
return p.next.CloseAndRecv(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Service) Patch() (objectSvc.PatchObjectstream, error) {
|
||||||
|
streamer, err := c.next.Patch()
|
||||||
|
|
||||||
|
return &patchStreamBasicChecker{
|
||||||
|
next: streamer,
|
||||||
|
}, err
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
||||||
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -170,3 +170,59 @@ func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type auditPatchStream struct {
|
||||||
|
stream PatchObjectstream
|
||||||
|
log *logger.Logger
|
||||||
|
|
||||||
|
failed bool
|
||||||
|
key []byte
|
||||||
|
containerID *refs.ContainerID
|
||||||
|
objectID *refs.ObjectID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *auditService) Patch() (PatchObjectstream, error) {
|
||||||
|
res, err := a.next.Patch()
|
||||||
|
if !a.enabled.Load() {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
audit.LogRequest(a.log, objectGRPC.ObjectService_Patch_FullMethodName, nil, nil, false)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
return &auditPatchStream{
|
||||||
|
stream: res,
|
||||||
|
log: a.log,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloseAndRecv implements PutObjectStream.
|
||||||
|
func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||||
|
resp, err := a.stream.CloseAndRecv(ctx)
|
||||||
|
if err != nil {
|
||||||
|
a.failed = true
|
||||||
|
}
|
||||||
|
a.objectID = resp.GetBody().ObjectID
|
||||||
|
audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key,
|
||||||
|
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
|
||||||
|
!a.failed)
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send implements PutObjectStream.
|
||||||
|
func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||||
|
a.containerID = req.GetBody().GetAddress().GetContainerID()
|
||||||
|
a.objectID = req.GetBody().GetAddress().GetObjectID()
|
||||||
|
a.key = req.GetVerificationHeader().GetBodySignature().GetKey()
|
||||||
|
|
||||||
|
err := a.stream.Send(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
a.failed = true
|
||||||
|
}
|
||||||
|
if !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
|
||||||
|
audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key,
|
||||||
|
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
|
||||||
|
!a.failed)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -48,6 +48,14 @@ func (x *Common) Put() (PutObjectStream, error) {
|
||||||
return x.nextHandler.Put()
|
return x.nextHandler.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *Common) Patch() (PatchObjectstream, error) {
|
||||||
|
if x.state.IsMaintenance() {
|
||||||
|
return nil, new(apistatus.NodeUnderMaintenance)
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.nextHandler.Patch()
|
||||||
|
}
|
||||||
|
|
||||||
func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
||||||
if x.state.IsMaintenance() {
|
if x.state.IsMaintenance() {
|
||||||
return nil, new(apistatus.NodeUnderMaintenance)
|
return nil, new(apistatus.NodeUnderMaintenance)
|
||||||
|
|
|
@ -27,6 +27,12 @@ type (
|
||||||
start time.Time
|
start time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
patchStreamMetric struct {
|
||||||
|
stream PatchObjectstream
|
||||||
|
metrics MetricRegister
|
||||||
|
start time.Time
|
||||||
|
}
|
||||||
|
|
||||||
MetricRegister interface {
|
MetricRegister interface {
|
||||||
AddRequestDuration(string, time.Duration, bool)
|
AddRequestDuration(string, time.Duration, bool)
|
||||||
AddPayloadSize(string, int)
|
AddPayloadSize(string, int)
|
||||||
|
@ -76,6 +82,24 @@ func (m MetricCollector) Put() (PutObjectStream, error) {
|
||||||
return m.next.Put()
|
return m.next.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m MetricCollector) Patch() (PatchObjectstream, error) {
|
||||||
|
if m.enabled {
|
||||||
|
t := time.Now()
|
||||||
|
|
||||||
|
stream, err := m.next.Patch()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &patchStreamMetric{
|
||||||
|
stream: stream,
|
||||||
|
metrics: m.metrics,
|
||||||
|
start: t,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
return m.next.Patch()
|
||||||
|
}
|
||||||
|
|
||||||
func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
if m.enabled {
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
@ -189,3 +213,16 @@ func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse,
|
||||||
|
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||||
|
s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().Chunk))
|
||||||
|
|
||||||
|
return s.stream.Send(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||||
|
res, err := s.stream.CloseAndRecv(ctx)
|
||||||
|
|
||||||
|
s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
|
||||||
|
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
22
pkg/services/object/patch/service.go
Normal file
22
pkg/services/object/patch/service.go
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
package patchsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||||
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Service implements Put operation of Object service v2.
|
||||||
|
type Service struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewService constructs Service instance from provided options.
|
||||||
|
func NewService(_ *util.KeyStorage, _ *getsvc.Service, _ *putsvc.Service) *Service {
|
||||||
|
return &Service{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put calls internal service and returns v2 object streamer.
|
||||||
|
func (s *Service) Patch() (object.PatchObjectstream, error) {
|
||||||
|
return &Streamer{}, nil
|
||||||
|
}
|
28
pkg/services/object/patch/streamer.go
Normal file
28
pkg/services/object/patch/streamer.go
Normal file
|
@ -0,0 +1,28 @@
|
||||||
|
package patchsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Streamer for the patch handler is a pipeline that merges two incoming
|
||||||
|
// streams of patches and original object payload chunks.
|
||||||
|
// The merged result is fed to Put stream target.
|
||||||
|
type Streamer struct{}
|
||||||
|
|
||||||
|
func (s *Streamer) Send(ctx context.Context, _ *object.PatchRequest) error {
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Streamer) CloseAndRecv(_ context.Context) (*object.PatchResponse, error) {
|
||||||
|
return &object.PatchResponse{
|
||||||
|
Body: &object.PatchResponseBody{
|
||||||
|
ObjectID: nil,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
|
@ -37,6 +37,11 @@ type putStreamResponser struct {
|
||||||
respSvc *response.Service
|
respSvc *response.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type patchStreamResponser struct {
|
||||||
|
stream PatchObjectstream
|
||||||
|
respSvc *response.Service
|
||||||
|
}
|
||||||
|
|
||||||
// NewResponseService returns object service instance that passes internal service
|
// NewResponseService returns object service instance that passes internal service
|
||||||
// call to response service.
|
// call to response service.
|
||||||
func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *ResponseService {
|
func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *ResponseService {
|
||||||
|
@ -87,6 +92,35 @@ func (s *ResponseService) Put() (PutObjectStream, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *patchStreamResponser) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||||
|
if err := s.stream.Send(ctx, req); err != nil {
|
||||||
|
return fmt.Errorf("could not send the request: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *patchStreamResponser) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||||
|
r, err := s.stream.CloseAndRecv(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.respSvc.SetMeta(r)
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ResponseService) Patch() (PatchObjectstream, error) {
|
||||||
|
stream, err := s.svc.Patch()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &patchStreamResponser{
|
||||||
|
stream: stream,
|
||||||
|
respSvc: s.respSvc,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *ResponseService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
func (s *ResponseService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
resp, err := s.svc.PutSingle(ctx, req)
|
resp, err := s.svc.PutSingle(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -31,11 +31,18 @@ type PutObjectStream interface {
|
||||||
CloseAndRecv(context.Context) (*object.PutResponse, error)
|
CloseAndRecv(context.Context) (*object.PutResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PatchObjectstream is an interface of FrostFS API v2 compatible patch streamer.
|
||||||
|
type PatchObjectstream interface {
|
||||||
|
Send(context.Context, *object.PatchRequest) error
|
||||||
|
CloseAndRecv(context.Context) (*object.PatchResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
// ServiceServer is an interface of utility
|
// ServiceServer is an interface of utility
|
||||||
// serving v2 Object service.
|
// serving v2 Object service.
|
||||||
type ServiceServer interface {
|
type ServiceServer interface {
|
||||||
Get(*object.GetRequest, GetObjectStream) error
|
Get(*object.GetRequest, GetObjectStream) error
|
||||||
Put() (PutObjectStream, error)
|
Put() (PutObjectStream, error)
|
||||||
|
Patch() (PatchObjectstream, error)
|
||||||
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
|
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
|
||||||
Search(*object.SearchRequest, SearchStream) error
|
Search(*object.SearchRequest, SearchStream) error
|
||||||
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
|
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
|
||||||
|
|
|
@ -35,6 +35,12 @@ type putStreamSigner struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type patchStreamSigner struct {
|
||||||
|
sigSvc *util.SignService
|
||||||
|
stream PatchObjectstream
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
type getRangeStreamSigner struct {
|
type getRangeStreamSigner struct {
|
||||||
GetObjectRangeStream
|
GetObjectRangeStream
|
||||||
sigSvc *util.SignService
|
sigSvc *util.SignService
|
||||||
|
@ -112,6 +118,42 @@ func (s *SignService) Put() (PutObjectStream, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *patchStreamSigner) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||||
|
if s.err = s.sigSvc.VerifyRequest(req); s.err != nil {
|
||||||
|
return util.ErrAbortStream
|
||||||
|
}
|
||||||
|
if s.err = s.stream.Send(ctx, req); s.err != nil {
|
||||||
|
return util.ErrAbortStream
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *patchStreamSigner) CloseAndRecv(ctx context.Context) (resp *object.PatchResponse, err error) {
|
||||||
|
if s.err != nil {
|
||||||
|
err = s.err
|
||||||
|
resp = new(object.PatchResponse)
|
||||||
|
} else {
|
||||||
|
resp, err = s.stream.CloseAndRecv(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, s.sigSvc.SignResponse(resp, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SignService) Patch() (PatchObjectstream, error) {
|
||||||
|
stream, err := s.svc.Patch()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &patchStreamSigner{
|
||||||
|
stream: stream,
|
||||||
|
sigSvc: s.sigSvc,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||||
resp := new(object.HeadResponse)
|
resp := new(object.HeadResponse)
|
||||||
|
|
|
@ -91,6 +91,10 @@ func (c TransportSplitter) Put() (PutObjectStream, error) {
|
||||||
return c.next.Put()
|
return c.next.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c TransportSplitter) Patch() (PatchObjectstream, error) {
|
||||||
|
return c.next.Patch()
|
||||||
|
}
|
||||||
|
|
||||||
func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
return c.next.Head(ctx, request)
|
return c.next.Head(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue