diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 0124bf772d..eef1424153 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -28,6 +28,7 @@ import ( deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2" + patchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/patch" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2" searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search" @@ -54,6 +55,8 @@ type objectSvc struct { get *getsvcV2.Service delete *deletesvcV2.Service + + patch *patchsvc.Service } func (c *cfg) MaxObjectSize() uint64 { @@ -71,6 +74,10 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) { return s.put.Put() } +func (s *objectSvc) Patch() (objectService.PatchObjectstream, error) { + return s.patch.Patch() +} + func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) { return s.put.PutSingle(ctx, req) } @@ -181,10 +188,12 @@ func initObjectService(c *cfg) { sDeleteV2 := createDeleteServiceV2(sDelete) + sPatch := createPatchSvc(sGet, sPut, keyStorage) + // build service pipeline // grpc | audit | | signature | response | acl | ape | split - splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2) + splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch) apeSvc := createAPEService(c, splitSvc) @@ -353,6 +362,10 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2 return putsvcV2.NewService(sPut, keyStorage) } +func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service { + return patchsvc.NewService(keyStorage, sGet, sPut) +} + func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service { ls := c.cfgObject.cfgLocalStorage.localStorage @@ -425,7 +438,7 @@ func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service { } func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service, - sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service, + sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service, sPatch *patchsvc.Service, ) *objectService.TransportSplitter { return objectService.NewTransportSplitter( c.cfgGRPC.maxChunkSize, @@ -435,6 +448,7 @@ func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Servi search: sSearchV2, get: sGetV2, delete: sDeleteV2, + patch: sPatch, }, ) } diff --git a/go.mod b/go.mod index 196b4d4639..93ed7d7505 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,14 @@ go 1.21 require ( code.gitea.io/sdk/gitea v0.17.1 - git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e + git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240813155151-d112a28d382f git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65 - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240813155821-98aabc45a720 git.frostfs.info/TrueCloudLab/hrw v1.2.1 - git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984 + git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 github.com/cheggaaa/pb v1.0.29 diff --git a/go.sum b/go.sum index bd6d858829..803a065c3d 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ code.gitea.io/sdk/gitea v0.17.1 h1:3jCPOG2ojbl8AcfaUCRYLT5MUcBMFwS0OSK2mA5Zok8= code.gitea.io/sdk/gitea v0.17.1/go.mod h1:aCnBqhHpoEWA180gMbaCtdX9Pl6BWBAuuP2miadoTNM= -git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e h1:gEWT+70E/RvGkxtSv+PlyUN2vtJVymhQa1mypvrXukM= -git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e/go.mod h1:OBDSr+DqV1z4VDouoX3YMleNc4DPBVBWTG3WDT2PK1o= +git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240813155151-d112a28d382f h1:xrJqsXOZeSkBFMSyN+PQ9DiCGxVULU3VIN/tuH/vtb8= +git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240813155151-d112a28d382f/go.mod h1:mc7j6Cc1GU1tJZNmDwEYiJJ339biNnU1Bz3wZGogMe0= git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e h1:kcBqZBiFIUBATUqEuvVigtkJJWQ2Gug/eYXn967o3M4= git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e/go.mod h1:F/fe1OoIDKr5Bz99q4sriuHDuf3aZefZy9ZsCqEtgxc= git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk= @@ -10,14 +10,14 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65 h1:PaZ8GpnUoXxUoNsc1qp36bT2u7FU+neU4Jn9cl8AWqI= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65/go.mod h1:6aAX80dvJ3r5fjN9CzzPglRptoiPgIC9KFGGsUA+1Hw= -git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec h1:A09Swh7yogmmiABUf7Ht6MTQXJ07MyGx4+ziUQNelec= -git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec/go.mod h1:DlJmgV4/qkFkx2ab+YWznlMijiF2yZHnrJswJOB7XGs= +git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240813155821-98aabc45a720 h1:15UXpW42bfshIv/X5kww92jG2o0drHgsdFd+UJ6zD7g= +git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240813155821-98aabc45a720/go.mod h1:XRX/bBQsDJKr040N/a0YnDhxJqaUv1XyMVj3qxnb5K0= git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc= git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM= git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928 h1:LK3mCkNZkY48eBA9jnk1N0eQZLsZhOG+XYw4EBoKUjM= git.frostfs.info/TrueCloudLab/neoneo-go v0.106.1-0.20240726093631-5481339d6928/go.mod h1:bZyJexBlrja4ngxiBgo8by5pVHuAbhg9l09/8yVGDyg= -git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984 h1:O3F2Grz07RWZ68mRz1xsYsNPNvZLwY00BM+xoYb1kNk= -git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA= +git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 h1:vgbfkcnIexZUm3vREBBSa/Gv1Whjd1SFCUd0A+IaGPQ= +git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA= git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA= git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0/go.mod h1:okpbKfVYf/BpejtfFTfhZqFP+sZ8rsHrP8Rr/jYPNRc= git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjqZzS4gsb4UA= diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index 7c6b395d5a..d55e3d87f9 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -24,6 +24,48 @@ func New(c objectSvc.ServiceServer) *Server { } } +// Patch opens internal Object patch stream and feeds it by the data read from gRPC stream. +func (s *Server) Patch(gStream objectGRPC.ObjectService_PatchServer) error { + stream, err := s.srv.Patch() + if err != nil { + return err + } + + for { + req, err := gStream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + resp, err := stream.CloseAndRecv(gStream.Context()) + if err != nil { + return err + } + + return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse)) + } + + return err + } + + patchReq := new(object.PatchRequest) + if err := patchReq.FromGRPCMessage(req); err != nil { + return err + } + + if err := stream.Send(gStream.Context(), patchReq); err != nil { + if errors.Is(err, util.ErrAbortStream) { + resp, err := stream.CloseAndRecv(gStream.Context()) + if err != nil { + return err + } + + return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse)) + } + + return err + } + } +} + // Put opens internal Object service Put stream and overtakes data from gRPC stream to it. func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error { stream, err := s.srv.Put() diff --git a/pkg/services/object/acl/v2/service.go b/pkg/services/object/acl/v2/service.go index 3e128836fc..58557d6111 100644 --- a/pkg/services/object/acl/v2/service.go +++ b/pkg/services/object/acl/v2/service.go @@ -249,6 +249,26 @@ func (b Service) Put() (object.PutObjectStream, error) { }, err } +type patchStreamBasicChecker struct { + next object.PatchObjectstream +} + +func (p patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error { + return p.next.Send(ctx, request) +} + +func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { + return p.next.CloseAndRecv(ctx) +} + +func (b Service) Patch() (object.PatchObjectstream, error) { + streamer, err := b.next.Patch() + + return &patchStreamBasicChecker{ + next: streamer, + }, err +} + func (b Service) Head( ctx context.Context, request *objectV2.HeadRequest, diff --git a/pkg/services/object/ape/service.go b/pkg/services/object/ape/service.go index 2adb1b7367..f005d0873a 100644 --- a/pkg/services/object/ape/service.go +++ b/pkg/services/object/ape/service.go @@ -204,6 +204,26 @@ func (c *Service) Put() (objectSvc.PutObjectStream, error) { }, err } +type patchStreamBasicChecker struct { + next objectSvc.PatchObjectstream +} + +func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error { + return p.next.Send(ctx, request) +} + +func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { + return p.next.CloseAndRecv(ctx) +} + +func (c *Service) Patch() (objectSvc.PatchObjectstream, error) { + streamer, err := c.next.Patch() + + return &patchStreamBasicChecker{ + next: streamer, + }, err +} + func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID()) if err != nil { diff --git a/pkg/services/object/audit.go b/pkg/services/object/audit.go index 1305fa0087..680a96c403 100644 --- a/pkg/services/object/audit.go +++ b/pkg/services/object/audit.go @@ -170,3 +170,59 @@ func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error } return err } + +type auditPatchStream struct { + stream PatchObjectstream + log *logger.Logger + + failed bool + key []byte + containerID *refs.ContainerID + objectID *refs.ObjectID +} + +func (a *auditService) Patch() (PatchObjectstream, error) { + res, err := a.next.Patch() + if !a.enabled.Load() { + return res, err + } + if err != nil { + audit.LogRequest(a.log, objectGRPC.ObjectService_Patch_FullMethodName, nil, nil, false) + return res, err + } + return &auditPatchStream{ + stream: res, + log: a.log, + }, nil +} + +// CloseAndRecv implements PutObjectStream. +func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) { + resp, err := a.stream.CloseAndRecv(ctx) + if err != nil { + a.failed = true + } + a.objectID = resp.GetBody().ObjectID + audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key, + audit.TargetFromContainerIDObjectID(a.containerID, a.objectID), + !a.failed) + return resp, err +} + +// Send implements PutObjectStream. +func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) error { + a.containerID = req.GetBody().GetAddress().GetContainerID() + a.objectID = req.GetBody().GetAddress().GetObjectID() + a.key = req.GetVerificationHeader().GetBodySignature().GetKey() + + err := a.stream.Send(ctx, req) + if err != nil { + a.failed = true + } + if !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here + audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key, + audit.TargetFromContainerIDObjectID(a.containerID, a.objectID), + !a.failed) + } + return err +} diff --git a/pkg/services/object/common.go b/pkg/services/object/common.go index 73ee9f81b3..841a3d021b 100644 --- a/pkg/services/object/common.go +++ b/pkg/services/object/common.go @@ -48,6 +48,14 @@ func (x *Common) Put() (PutObjectStream, error) { return x.nextHandler.Put() } +func (x *Common) Patch() (PatchObjectstream, error) { + if x.state.IsMaintenance() { + return nil, new(apistatus.NodeUnderMaintenance) + } + + return x.nextHandler.Patch() +} + func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { if x.state.IsMaintenance() { return nil, new(apistatus.NodeUnderMaintenance) diff --git a/pkg/services/object/metrics.go b/pkg/services/object/metrics.go index f972f43ae6..b64f879ac1 100644 --- a/pkg/services/object/metrics.go +++ b/pkg/services/object/metrics.go @@ -27,6 +27,12 @@ type ( start time.Time } + patchStreamMetric struct { + stream PatchObjectstream + metrics MetricRegister + start time.Time + } + MetricRegister interface { AddRequestDuration(string, time.Duration, bool) AddPayloadSize(string, int) @@ -76,6 +82,24 @@ func (m MetricCollector) Put() (PutObjectStream, error) { return m.next.Put() } +func (m MetricCollector) Patch() (PatchObjectstream, error) { + if m.enabled { + t := time.Now() + + stream, err := m.next.Patch() + if err != nil { + return nil, err + } + + return &patchStreamMetric{ + stream: stream, + metrics: m.metrics, + start: t, + }, nil + } + return m.next.Patch() +} + func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingleRequest) (*object.PutSingleResponse, error) { if m.enabled { t := time.Now() @@ -189,3 +213,16 @@ func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, return res, err } +func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) error { + s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().Chunk)) + + return s.stream.Send(ctx, req) +} + +func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) { + res, err := s.stream.CloseAndRecv(ctx) + + s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil) + + return res, err +} diff --git a/pkg/services/object/patch/service.go b/pkg/services/object/patch/service.go new file mode 100644 index 0000000000..df6926e84b --- /dev/null +++ b/pkg/services/object/patch/service.go @@ -0,0 +1,22 @@ +package patchsvc + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" + getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" + putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" +) + +// Service implements Put operation of Object service v2. +type Service struct { +} + +// NewService constructs Service instance from provided options. +func NewService(_ *util.KeyStorage, _ *getsvc.Service, _ *putsvc.Service) *Service { + return &Service{} +} + +// Put calls internal service and returns v2 object streamer. +func (s *Service) Patch() (object.PatchObjectstream, error) { + return &Streamer{}, nil +} diff --git a/pkg/services/object/patch/streamer.go b/pkg/services/object/patch/streamer.go new file mode 100644 index 0000000000..5d021b9c3e --- /dev/null +++ b/pkg/services/object/patch/streamer.go @@ -0,0 +1,28 @@ +package patchsvc + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" +) + +// Streamer for the patch handler is a pipeline that merges two incoming +// streams of patches and original object payload chunks. +// The merged result is fed to Put stream target. +type Streamer struct{} + +func (s *Streamer) Send(ctx context.Context, _ *object.PatchRequest) error { + _, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send") + defer span.End() + + return nil +} + +func (s *Streamer) CloseAndRecv(_ context.Context) (*object.PatchResponse, error) { + return &object.PatchResponse{ + Body: &object.PatchResponseBody{ + ObjectID: nil, + }, + }, nil +} diff --git a/pkg/services/object/response.go b/pkg/services/object/response.go index a10f26a349..c85259c1f7 100644 --- a/pkg/services/object/response.go +++ b/pkg/services/object/response.go @@ -37,6 +37,11 @@ type putStreamResponser struct { respSvc *response.Service } +type patchStreamResponser struct { + stream PatchObjectstream + respSvc *response.Service +} + // NewResponseService returns object service instance that passes internal service // call to response service. func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *ResponseService { @@ -87,6 +92,35 @@ func (s *ResponseService) Put() (PutObjectStream, error) { }, nil } +func (s *patchStreamResponser) Send(ctx context.Context, req *object.PatchRequest) error { + if err := s.stream.Send(ctx, req); err != nil { + return fmt.Errorf("could not send the request: %w", err) + } + return nil +} + +func (s *patchStreamResponser) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) { + r, err := s.stream.CloseAndRecv(ctx) + if err != nil { + return nil, fmt.Errorf("could not close stream and receive response: %w", err) + } + + s.respSvc.SetMeta(r) + return r, nil +} + +func (s *ResponseService) Patch() (PatchObjectstream, error) { + stream, err := s.svc.Patch() + if err != nil { + return nil, fmt.Errorf("could not create Put object streamer: %w", err) + } + + return &patchStreamResponser{ + stream: stream, + respSvc: s.respSvc, + }, nil +} + func (s *ResponseService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) { resp, err := s.svc.PutSingle(ctx, req) if err != nil { diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 73b88f2339..c1b036ab33 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -31,11 +31,18 @@ type PutObjectStream interface { CloseAndRecv(context.Context) (*object.PutResponse, error) } +// PatchObjectstream is an interface of FrostFS API v2 compatible patch streamer. +type PatchObjectstream interface { + Send(context.Context, *object.PatchRequest) error + CloseAndRecv(context.Context) (*object.PatchResponse, error) +} + // ServiceServer is an interface of utility // serving v2 Object service. type ServiceServer interface { Get(*object.GetRequest, GetObjectStream) error Put() (PutObjectStream, error) + Patch() (PatchObjectstream, error) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) Search(*object.SearchRequest, SearchStream) error Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index 4bf581b787..631c539af3 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -35,6 +35,12 @@ type putStreamSigner struct { err error } +type patchStreamSigner struct { + sigSvc *util.SignService + stream PatchObjectstream + err error +} + type getRangeStreamSigner struct { GetObjectRangeStream sigSvc *util.SignService @@ -112,6 +118,42 @@ func (s *SignService) Put() (PutObjectStream, error) { }, nil } +func (s *patchStreamSigner) Send(ctx context.Context, req *object.PatchRequest) error { + if s.err = s.sigSvc.VerifyRequest(req); s.err != nil { + return util.ErrAbortStream + } + if s.err = s.stream.Send(ctx, req); s.err != nil { + return util.ErrAbortStream + } + return nil +} + +func (s *patchStreamSigner) CloseAndRecv(ctx context.Context) (resp *object.PatchResponse, err error) { + if s.err != nil { + err = s.err + resp = new(object.PatchResponse) + } else { + resp, err = s.stream.CloseAndRecv(ctx) + if err != nil { + return nil, fmt.Errorf("could not close stream and receive response: %w", err) + } + } + + return resp, s.sigSvc.SignResponse(resp, err) +} + +func (s *SignService) Patch() (PatchObjectstream, error) { + stream, err := s.svc.Patch() + if err != nil { + return nil, fmt.Errorf("could not create Put object streamer: %w", err) + } + + return &patchStreamSigner{ + stream: stream, + sigSvc: s.sigSvc, + }, nil +} + func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { if err := s.sigSvc.VerifyRequest(req); err != nil { resp := new(object.HeadResponse) diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go index 54e49cb129..5acfac06b6 100644 --- a/pkg/services/object/transport_splitter.go +++ b/pkg/services/object/transport_splitter.go @@ -91,6 +91,10 @@ func (c TransportSplitter) Put() (PutObjectStream, error) { return c.next.Put() } +func (c TransportSplitter) Patch() (PatchObjectstream, error) { + return c.next.Patch() +} + func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { return c.next.Head(ctx, request) }