From 2802f8c37ada1ad847834e95f3d0d778fff7c759 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Mon, 12 Aug 2024 13:01:57 +0300 Subject: [PATCH 1/5] [#1307] go.mod: Bump frostfs-sdk-go/frostfs-api-go/v2 versions * Also, resolve dependencies and conflicts for object service by creating stub for `Patch` method. Signed-off-by: Airat Arifullin --- cmd/frostfs-node/object.go | 18 +++++- go.mod | 6 +- go.sum | Bin 39512 -> 39512 bytes pkg/network/transport/object/grpc/service.go | 42 ++++++++++++++ pkg/services/object/acl/v2/service.go | 20 +++++++ pkg/services/object/ape/service.go | 20 +++++++ pkg/services/object/audit.go | 56 +++++++++++++++++++ pkg/services/object/common.go | 8 +++ pkg/services/object/metrics.go | 37 ++++++++++++ pkg/services/object/patch/service.go | 22 ++++++++ pkg/services/object/patch/streamer.go | 28 ++++++++++ pkg/services/object/response.go | 34 +++++++++++ pkg/services/object/server.go | 7 +++ pkg/services/object/sign.go | 42 ++++++++++++++ pkg/services/object/transport_splitter.go | 4 ++ 15 files changed, 339 insertions(+), 5 deletions(-) create mode 100644 pkg/services/object/patch/service.go create mode 100644 pkg/services/object/patch/streamer.go diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 0124bf772..eef142415 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -28,6 +28,7 @@ import ( deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2" + patchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/patch" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2" searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search" @@ -54,6 +55,8 @@ type objectSvc struct { get *getsvcV2.Service delete *deletesvcV2.Service + + patch *patchsvc.Service } func (c *cfg) MaxObjectSize() uint64 { @@ -71,6 +74,10 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) { return s.put.Put() } +func (s *objectSvc) Patch() (objectService.PatchObjectstream, error) { + return s.patch.Patch() +} + func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) { return s.put.PutSingle(ctx, req) } @@ -181,10 +188,12 @@ func initObjectService(c *cfg) { sDeleteV2 := createDeleteServiceV2(sDelete) + sPatch := createPatchSvc(sGet, sPut, keyStorage) + // build service pipeline // grpc | audit | | signature | response | acl | ape | split - splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2) + splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch) apeSvc := createAPEService(c, splitSvc) @@ -353,6 +362,10 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2 return putsvcV2.NewService(sPut, keyStorage) } +func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service { + return patchsvc.NewService(keyStorage, sGet, sPut) +} + func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service { ls := c.cfgObject.cfgLocalStorage.localStorage @@ -425,7 +438,7 @@ func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service { } func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service, - sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service, + sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service, sPatch *patchsvc.Service, ) *objectService.TransportSplitter { return objectService.NewTransportSplitter( c.cfgGRPC.maxChunkSize, @@ -435,6 +448,7 @@ func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Servi search: sSearchV2, get: sGetV2, delete: sDeleteV2, + patch: sPatch, }, ) } diff --git a/go.mod b/go.mod index 09a098502..0187d0cd5 100644 --- a/go.mod +++ b/go.mod @@ -4,14 +4,14 @@ go 1.21 require ( code.gitea.io/sdk/gitea v0.17.1 - git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e + git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240813155151-d112a28d382f git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65 - git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec + git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240813155821-98aabc45a720 git.frostfs.info/TrueCloudLab/hrw v1.2.1 - git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984 + git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 github.com/cheggaaa/pb v1.0.29 diff --git a/go.sum b/go.sum index 1034ff61fe818a58da31dd01d20510fac1cbf80a..c15f10ed2d348070865bc8469f11c8360b5e958b 100644 GIT binary patch delta 445 zcmZwBOHRT-007WP1V3976I`)0A=sH|2c|AeEQLaW$|nPb-L!=kNI@u;!YL#i!6U#m zbj{KgTd$xsER5j-FK@Sv?6#5DeU3mLVhk}%Sr8(Ih{Y4sOE~a8n5q*`f9iA_Ln&Wr zEVN{`Cd&P+5;SR<^39^g&0Y$eKQqW0L~mE`@$zwuv&VPt*a}V8%nZ_56DkzW)Q;xL z^IW^Hsyr_meQiXcwB+Xfq1^*(xaIR4lE9QC@+8B)!Y)zTCQK;sx_k7OrM zCye?~UsRd{eiDofTndkg^~2fa<}3Db^79k`0SF5zF^v#rJ!Tq&giqf#dP8$m>;@g` o@_s!hNe#v+K5RJy3pVmuYJtaOktx;MFZp(UbA4Rf)GuPIZy`{Hr2qf` delta 445 zcmZwCOKyTd0D$3Ws(oymG_fm}O^9`77-z7HCK41w1>YdI;EW)Ods*L9fxS zU3TTRS7>q7g%|kp|A&42u#dkVixd$+5k|QTJ2ZR9#O9FXjPw{jjfz@}p`e&;c%tPm zn;uiMgSLXarKASVxZZGy1`)SCCT^p~GuppK!IqqninBATWcjGqXLK!` z+VbReQbIwuJe^C6wN&M4M_l}&>Fx97&F&}p6)RGR00%K(Gl(G0B#^^-4xhH!5)$Vj spbf1{IHwUpf1xjwn&NdUYaIYyW_(xgLrbwv Date: Mon, 12 Aug 2024 17:11:10 +0300 Subject: [PATCH 2/5] [#1307] object: Implement `Patch` method Signed-off-by: Airat Arifullin --- cmd/frostfs-node/object.go | 2 +- pkg/services/object/acl/v2/service.go | 20 +- pkg/services/object/acl/v2/util.go | 8 +- pkg/services/object/ape/checker_test.go | 19 +- pkg/services/object/ape/service.go | 20 +- pkg/services/object/audit.go | 19 +- pkg/services/object/common.go | 2 +- pkg/services/object/get/prm.go | 4 + pkg/services/object/metrics.go | 6 +- pkg/services/object/patch/range_provider.go | 63 ++++++ pkg/services/object/patch/service.go | 30 ++- pkg/services/object/patch/streamer.go | 215 +++++++++++++++++++- pkg/services/object/patch/util.go | 53 +++++ pkg/services/object/put/prm.go | 11 + pkg/services/object/put/streamer.go | 23 ++- pkg/services/object/response.go | 4 +- pkg/services/object/server.go | 6 +- pkg/services/object/sign.go | 4 +- pkg/services/object/transport_splitter.go | 2 +- 19 files changed, 430 insertions(+), 81 deletions(-) create mode 100644 pkg/services/object/patch/range_provider.go create mode 100644 pkg/services/object/patch/util.go diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index eef142415..467c5901b 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -74,7 +74,7 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) { return s.put.Put() } -func (s *objectSvc) Patch() (objectService.PatchObjectstream, error) { +func (s *objectSvc) Patch() (objectService.PatchObjectStream, error) { return s.patch.Patch() } diff --git a/pkg/services/object/acl/v2/service.go b/pkg/services/object/acl/v2/service.go index 58557d611..a9ddad7ca 100644 --- a/pkg/services/object/acl/v2/service.go +++ b/pkg/services/object/acl/v2/service.go @@ -249,24 +249,8 @@ func (b Service) Put() (object.PutObjectStream, error) { }, err } -type patchStreamBasicChecker struct { - next object.PatchObjectstream -} - -func (p patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error { - return p.next.Send(ctx, request) -} - -func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { - return p.next.CloseAndRecv(ctx) -} - -func (b Service) Patch() (object.PatchObjectstream, error) { - streamer, err := b.next.Patch() - - return &patchStreamBasicChecker{ - next: streamer, - }, err +func (b Service) Patch() (object.PatchObjectStream, error) { + return b.next.Patch() } func (b Service) Head( diff --git a/pkg/services/object/acl/v2/util.go b/pkg/services/object/acl/v2/util.go index feda6a3cf..76fd9651d 100644 --- a/pkg/services/object/acl/v2/util.go +++ b/pkg/services/object/acl/v2/util.go @@ -174,7 +174,7 @@ func isOwnerFromKey(id user.ID, key *keys.PublicKey) bool { func assertVerb(tok sessionSDK.Object, op acl.Op) bool { switch op { case acl.OpObjectPut: - return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete) + return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete, sessionSDK.VerbObjectPatch) case acl.OpObjectDelete: return tok.AssertVerb(sessionSDK.VerbObjectDelete) case acl.OpObjectGet: @@ -185,11 +185,13 @@ func assertVerb(tok sessionSDK.Object, op acl.Op) bool { sessionSDK.VerbObjectGet, sessionSDK.VerbObjectDelete, sessionSDK.VerbObjectRange, - sessionSDK.VerbObjectRangeHash) + sessionSDK.VerbObjectRangeHash, + sessionSDK.VerbObjectPatch, + ) case acl.OpObjectSearch: return tok.AssertVerb(sessionSDK.VerbObjectSearch, sessionSDK.VerbObjectDelete) case acl.OpObjectRange: - return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash) + return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash, sessionSDK.VerbObjectPatch) case acl.OpObjectHash: return tok.AssertVerb(sessionSDK.VerbObjectRangeHash) } diff --git a/pkg/services/object/ape/checker_test.go b/pkg/services/object/ape/checker_test.go index 090f6a83c..afe19fc51 100644 --- a/pkg/services/object/ape/checker_test.go +++ b/pkg/services/object/ape/checker_test.go @@ -518,7 +518,22 @@ func TestAPECheck_BearerTokenOverrides(t *testing.T) { ls := inmemory.NewInmemoryLocalStorage() ms := inmemory.NewInmemoryMorphRuleChainStorage() - checker := NewChecker(ls, ms, headerProvider, frostfsidProvider, nil, &stMock{}, nil, nil) + node1Key, err := keys.NewPrivateKey() + require.NoError(t, err) + node1 := netmapSDK.NodeInfo{} + node1.SetPublicKey(node1Key.PublicKey().Bytes()) + netmap := &netmapSDK.NetMap{} + netmap.SetEpoch(100) + netmap.SetNodes([]netmapSDK.NodeInfo{node1}) + + nm := &netmapStub{ + currentEpoch: 100, + netmaps: map[uint64]*netmapSDK.NetMap{ + 100: netmap, + }, + } + + checker := NewChecker(ls, ms, headerProvider, frostfsidProvider, nm, &stMock{}, nil, nil) prm := Prm{ Method: method, @@ -541,7 +556,7 @@ func TestAPECheck_BearerTokenOverrides(t *testing.T) { } } - err := checker.CheckAPE(context.Background(), prm) + err = checker.CheckAPE(context.Background(), prm) if test.expectAPEErr { require.Error(t, err) } else { diff --git a/pkg/services/object/ape/service.go b/pkg/services/object/ape/service.go index f005d0873..64dd19c24 100644 --- a/pkg/services/object/ape/service.go +++ b/pkg/services/object/ape/service.go @@ -204,24 +204,8 @@ func (c *Service) Put() (objectSvc.PutObjectStream, error) { }, err } -type patchStreamBasicChecker struct { - next objectSvc.PatchObjectstream -} - -func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error { - return p.next.Send(ctx, request) -} - -func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { - return p.next.CloseAndRecv(ctx) -} - -func (c *Service) Patch() (objectSvc.PatchObjectstream, error) { - streamer, err := c.next.Patch() - - return &patchStreamBasicChecker{ - next: streamer, - }, err +func (c *Service) Patch() (objectSvc.PatchObjectStream, error) { + return c.next.Patch() } func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { diff --git a/pkg/services/object/audit.go b/pkg/services/object/audit.go index 680a96c40..b924386d1 100644 --- a/pkg/services/object/audit.go +++ b/pkg/services/object/audit.go @@ -172,16 +172,18 @@ func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error } type auditPatchStream struct { - stream PatchObjectstream + stream PatchObjectStream log *logger.Logger failed bool key []byte containerID *refs.ContainerID objectID *refs.ObjectID + + nonFirstSend bool } -func (a *auditService) Patch() (PatchObjectstream, error) { +func (a *auditService) Patch() (PatchObjectStream, error) { res, err := a.next.Patch() if !a.enabled.Load() { return res, err @@ -196,7 +198,7 @@ func (a *auditService) Patch() (PatchObjectstream, error) { }, nil } -// CloseAndRecv implements PutObjectStream. +// CloseAndRecv implements PatchObjectStream. func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) { resp, err := a.stream.CloseAndRecv(ctx) if err != nil { @@ -209,11 +211,14 @@ func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchRespo return resp, err } -// Send implements PutObjectStream. +// Send implements PatchObjectStream. func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) error { - a.containerID = req.GetBody().GetAddress().GetContainerID() - a.objectID = req.GetBody().GetAddress().GetObjectID() - a.key = req.GetVerificationHeader().GetBodySignature().GetKey() + if !a.nonFirstSend { + a.containerID = req.GetBody().GetAddress().GetContainerID() + a.objectID = req.GetBody().GetAddress().GetObjectID() + a.key = req.GetVerificationHeader().GetBodySignature().GetKey() + a.nonFirstSend = true + } err := a.stream.Send(ctx, req) if err != nil { diff --git a/pkg/services/object/common.go b/pkg/services/object/common.go index 841a3d021..f48cc5b3d 100644 --- a/pkg/services/object/common.go +++ b/pkg/services/object/common.go @@ -48,7 +48,7 @@ func (x *Common) Put() (PutObjectStream, error) { return x.nextHandler.Put() } -func (x *Common) Patch() (PatchObjectstream, error) { +func (x *Common) Patch() (PatchObjectStream, error) { if x.state.IsMaintenance() { return nil, new(apistatus.NodeUnderMaintenance) } diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index cbdb7a3e2..94c07381c 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -124,6 +124,10 @@ func (p *commonPrm) SetRequestForwarder(f RequestForwarder) { p.forwarder = f } +func (p *commonPrm) SetSignerKey(signerKey *ecdsa.PrivateKey) { + p.signerKey = signerKey +} + // WithAddress sets object address to be read. func (p *commonPrm) WithAddress(addr oid.Address) { p.addr = addr diff --git a/pkg/services/object/metrics.go b/pkg/services/object/metrics.go index b64f879ac..e53b7584f 100644 --- a/pkg/services/object/metrics.go +++ b/pkg/services/object/metrics.go @@ -28,7 +28,7 @@ type ( } patchStreamMetric struct { - stream PatchObjectstream + stream PatchObjectStream metrics MetricRegister start time.Time } @@ -82,7 +82,7 @@ func (m MetricCollector) Put() (PutObjectStream, error) { return m.next.Put() } -func (m MetricCollector) Patch() (PatchObjectstream, error) { +func (m MetricCollector) Patch() (PatchObjectStream, error) { if m.enabled { t := time.Now() @@ -214,7 +214,7 @@ func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, return res, err } func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) error { - s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().Chunk)) + s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().GetChunk())) return s.stream.Send(ctx, req) } diff --git a/pkg/services/object/patch/range_provider.go b/pkg/services/object/patch/range_provider.go new file mode 100644 index 000000000..755c5bf60 --- /dev/null +++ b/pkg/services/object/patch/range_provider.go @@ -0,0 +1,63 @@ +package patchsvc + +import ( + "context" + "crypto/ecdsa" + "io" + + getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" + objectUtil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + patcherSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher" +) + +func (p *pipeChunkWriter) WriteChunk(_ context.Context, chunk []byte) error { + _, err := p.wr.Write(chunk) + return err +} + +type rangeProvider struct { + getSvc *getsvc.Service + + addr oid.Address + + commonPrm *objectUtil.CommonPrm + + localNodeKey *ecdsa.PrivateKey +} + +var _ patcherSDK.RangeProvider = (*rangeProvider)(nil) + +func (r *rangeProvider) GetRange(ctx context.Context, rng *objectSDK.Range) io.Reader { + pipeReader, pipeWriter := io.Pipe() + + var rngPrm getsvc.RangePrm + rngPrm.SetSignerKey(r.localNodeKey) + rngPrm.SetCommonParameters(r.commonPrm) + + rngPrm.WithAddress(r.addr) + rngPrm.SetChunkWriter(&pipeChunkWriter{ + wr: pipeWriter, + }) + rngPrm.SetRange(rng) + + getRangeErr := make(chan error) + + go func() { + defer pipeWriter.Close() + + select { + case <-ctx.Done(): + pipeWriter.CloseWithError(ctx.Err()) + case err := <-getRangeErr: + pipeWriter.CloseWithError(err) + } + }() + + go func() { + getRangeErr <- r.getSvc.GetRange(ctx, rngPrm) + }() + + return pipeReader +} diff --git a/pkg/services/object/patch/service.go b/pkg/services/object/patch/service.go index df6926e84..c4ab15abf 100644 --- a/pkg/services/object/patch/service.go +++ b/pkg/services/object/patch/service.go @@ -9,14 +9,36 @@ import ( // Service implements Put operation of Object service v2. type Service struct { + keyStorage *util.KeyStorage + + getSvc *getsvc.Service + + putSvc *putsvc.Service } // NewService constructs Service instance from provided options. -func NewService(_ *util.KeyStorage, _ *getsvc.Service, _ *putsvc.Service) *Service { - return &Service{} +func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service { + return &Service{ + keyStorage: ks, + + getSvc: getSvc, + + putSvc: putSvc, + } } // Put calls internal service and returns v2 object streamer. -func (s *Service) Patch() (object.PatchObjectstream, error) { - return &Streamer{}, nil +func (s *Service) Patch() (object.PatchObjectStream, error) { + nodeKey, err := s.keyStorage.GetKey(nil) + if err != nil { + return nil, err + } + + return &Streamer{ + getSvc: s.getSvc, + + putSvc: s.putSvc, + + localNodeKey: nodeKey, + }, nil } diff --git a/pkg/services/object/patch/streamer.go b/pkg/services/object/patch/streamer.go index 5d021b9c3..84363530e 100644 --- a/pkg/services/object/patch/streamer.go +++ b/pkg/services/object/patch/streamer.go @@ -2,27 +2,220 @@ package patchsvc import ( "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" + getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" + putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher" ) -// Streamer for the patch handler is a pipeline that merges two incoming -// streams of patches and original object payload chunks. -// The merged result is fed to Put stream target. -type Streamer struct{} +// Streamer for the patch handler is a pipeline that merges two incoming streams of patches +// and original object payload chunks. The merged result is fed to Put stream target. +type Streamer struct { + // Patcher must be initialized at first Streamer.Send call. + patcher patcher.PatchApplier -func (s *Streamer) Send(ctx context.Context, _ *object.PatchRequest) error { - _, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send") + nonFirstSend bool + + getSvc *getsvc.Service + + putSvc *putsvc.Service + + localNodeKey *ecdsa.PrivateKey +} + +type pipeChunkWriter struct { + wr *io.PipeWriter +} + +type headResponseWriter struct { + body *objectV2.HeadResponseBody +} + +func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error { + w.body.SetHeaderPart(toFullObjectHeader(hdr)) + return nil +} + +func toFullObjectHeader(hdr *objectSDK.Object) objectV2.GetHeaderPart { + obj := hdr.ToV2() + + hs := new(objectV2.HeaderWithSignature) + hs.SetHeader(obj.GetHeader()) + hs.SetSignature(obj.GetSignature()) + + return hs +} + +func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error { + hdrWithSig, addr, err := s.readHeader(ctx, req) + if err != nil { + return err + } + + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return err + } + commonPrm.WithLocalOnly(false) + + rangeProvider := &rangeProvider{ + getSvc: s.getSvc, + + addr: addr, + + commonPrm: commonPrm, + + localNodeKey: s.localNodeKey, + } + + putstm, err := s.putSvc.Put() + if err != nil { + return err + } + + hdr := hdrWithSig.GetHeader() + oV2 := new(objectV2.Object) + hV2 := new(objectV2.Header) + oV2.SetHeader(hV2) + oV2.GetHeader().SetContainerID(hdr.GetContainerID()) + oV2.GetHeader().SetPayloadLength(hdr.GetPayloadLength()) + oV2.GetHeader().SetAttributes(hdr.GetAttributes()) + + ownerID, err := newOwnerID(req.GetVerificationHeader()) + if err != nil { + return err + } + oV2.GetHeader().SetOwnerID(ownerID) + + prm, err := s.putInitPrm(req, oV2) + if err != nil { + return err + } + + err = putstm.Init(ctx, prm) + if err != nil { + return err + } + + patcherPrm := patcher.Params{ + Header: objectSDK.NewFromV2(oV2), + + RangeProvider: rangeProvider, + + ObjectWriter: putstm.Target(), + } + + s.patcher = patcher.New(patcherPrm) + return nil +} + +func (s *Streamer) readHeader(ctx context.Context, req *objectV2.PatchRequest) (hdrWithSig *objectV2.HeaderWithSignature, addr oid.Address, err error) { + addrV2 := req.GetBody().GetAddress() + if addrV2 == nil { + err = errors.New("patch request has nil-address") + return + } + + if err = addr.ReadFromV2(*addrV2); err != nil { + err = fmt.Errorf("read address error: %w", err) + return + } + + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return + } + commonPrm.WithLocalOnly(false) + + var p getsvc.HeadPrm + p.SetSignerKey(s.localNodeKey) + p.SetCommonParameters(commonPrm) + + resp := new(objectV2.HeadResponse) + resp.SetBody(new(objectV2.HeadResponseBody)) + + p.WithAddress(addr) + p.SetHeaderWriter(&headResponseWriter{ + body: resp.GetBody(), + }) + + err = s.getSvc.Head(ctx, p) + if err != nil { + err = fmt.Errorf("get header error: %w", err) + return + } + + var ok bool + hdrPart := resp.GetBody().GetHeaderPart() + if hdrWithSig, ok = hdrPart.(*objectV2.HeaderWithSignature); !ok { + err = fmt.Errorf("unexpected header type: %T", hdrPart) + } + return +} + +func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error { + ctx, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send") defer span.End() + defer func() { + s.nonFirstSend = true + }() + + if !s.nonFirstSend { + if err := s.init(ctx, req); err != nil { + return fmt.Errorf("streamer init error: %w", err) + } + } + + patch := new(objectSDK.Patch) + patch.FromV2(req.GetBody()) + + if !s.nonFirstSend { + err := s.patcher.ApplyAttributesPatch(ctx, patch.NewAttributes, patch.ReplaceAttributes) + if err != nil { + return fmt.Errorf("patch attributes: %w", err) + } + } + + if patch.PayloadPatch != nil { + err := s.patcher.ApplyPayloadPatch(ctx, patch.PayloadPatch) + if err != nil { + return fmt.Errorf("patch payload: %w", err) + } + } else if s.nonFirstSend { + return errors.New("invalid non-first patch: empty payload") + } + return nil } -func (s *Streamer) CloseAndRecv(_ context.Context) (*object.PatchResponse, error) { - return &object.PatchResponse{ - Body: &object.PatchResponseBody{ - ObjectID: nil, +func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { + patcherResp, err := s.patcher.Close(ctx) + if err != nil { + return nil, err + } + + oidV2 := new(refsV2.ObjectID) + + if patcherResp.AccessIdentifiers.ParentID != nil { + patcherResp.AccessIdentifiers.ParentID.WriteToV2(oidV2) + } else { + patcherResp.AccessIdentifiers.SelfID.WriteToV2(oidV2) + } + + return &objectV2.PatchResponse{ + Body: &objectV2.PatchResponseBody{ + ObjectID: oidV2, }, }, nil } diff --git a/pkg/services/object/patch/util.go b/pkg/services/object/patch/util.go new file mode 100644 index 000000000..1218d6694 --- /dev/null +++ b/pkg/services/object/patch/util.go @@ -0,0 +1,53 @@ +package patchsvc + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "errors" + "fmt" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" +) + +// putInitPrm initializes put paramerer for Put stream. +func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) { + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + + prm := new(putsvc.PutInitPrm) + prm.WithObject(objectSDK.NewFromV2(obj)). + WithCommonPrm(commonPrm). + WithPrivateKey(s.localNodeKey) + + return prm, nil +} + +func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) { + for vh.GetOrigin() != nil { + vh = vh.GetOrigin() + } + sig := vh.GetBodySignature() + if sig == nil { + return nil, errors.New("empty body signature") + } + key, err := keys.NewPublicKeyFromBytes(sig.GetKey(), elliptic.P256()) + if err != nil { + return nil, fmt.Errorf("invalid signature key: %w", err) + } + + var userID user.ID + user.IDFromKey(&userID, (ecdsa.PublicKey)(*key)) + ownID := new(refs.OwnerID) + userID.WriteToV2(ownID) + + return ownID, nil +} diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index 52a7c102c..0c8f12b45 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -2,6 +2,7 @@ package putsvc import ( "context" + "crypto/ecdsa" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -20,6 +21,8 @@ type PutInitPrm struct { traverseOpts []placement.Option relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error + + privateKey *ecdsa.PrivateKey } type PutChunkPrm struct { @@ -65,3 +68,11 @@ func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm { return p } + +func (p *PutInitPrm) WithPrivateKey(v *ecdsa.PrivateKey) *PutInitPrm { + if p != nil { + p.privateKey = v + } + + return p +} diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 4e655ed54..969c8fa19 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -47,6 +47,11 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error { return nil } +// Target accesses underlying target chunked object writer. +func (p *Streamer) Target() transformer.ChunkedObjectWriter { + return p.target +} + // MaxObjectSize returns maximum payload size for the streaming session. // // Must be called after the successful Init. @@ -79,11 +84,15 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error { func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error { p.relay = prm.relay - nodeKey, err := p.cfg.keyStorage.GetKey(nil) - if err != nil { - return err + if prm.privateKey != nil { + p.privateKey = prm.privateKey + } else { + nodeKey, err := p.cfg.keyStorage.GetKey(nil) + if err != nil { + return err + } + p.privateKey = nodeKey } - p.privateKey = nodeKey // prepare untrusted-Put object target p.target = &validatingPreparedTarget{ @@ -136,7 +145,11 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { } } - p.privateKey = key + if prm.privateKey != nil { + p.privateKey = prm.privateKey + } else { + p.privateKey = key + } p.target = &validatingTarget{ fmt: p.fmtValidator, nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ diff --git a/pkg/services/object/response.go b/pkg/services/object/response.go index c85259c1f..d7ba9f843 100644 --- a/pkg/services/object/response.go +++ b/pkg/services/object/response.go @@ -38,7 +38,7 @@ type putStreamResponser struct { } type patchStreamResponser struct { - stream PatchObjectstream + stream PatchObjectStream respSvc *response.Service } @@ -109,7 +109,7 @@ func (s *patchStreamResponser) CloseAndRecv(ctx context.Context) (*object.PatchR return r, nil } -func (s *ResponseService) Patch() (PatchObjectstream, error) { +func (s *ResponseService) Patch() (PatchObjectStream, error) { stream, err := s.svc.Patch() if err != nil { return nil, fmt.Errorf("could not create Put object streamer: %w", err) diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index c1b036ab3..da98ce245 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -31,8 +31,8 @@ type PutObjectStream interface { CloseAndRecv(context.Context) (*object.PutResponse, error) } -// PatchObjectstream is an interface of FrostFS API v2 compatible patch streamer. -type PatchObjectstream interface { +// PatchObjectStream is an interface of FrostFS API v2 compatible patch streamer. +type PatchObjectStream interface { Send(context.Context, *object.PatchRequest) error CloseAndRecv(context.Context) (*object.PatchResponse, error) } @@ -42,7 +42,7 @@ type PatchObjectstream interface { type ServiceServer interface { Get(*object.GetRequest, GetObjectStream) error Put() (PutObjectStream, error) - Patch() (PatchObjectstream, error) + Patch() (PatchObjectStream, error) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) Search(*object.SearchRequest, SearchStream) error Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index 631c539af..35367aafe 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -37,7 +37,7 @@ type putStreamSigner struct { type patchStreamSigner struct { sigSvc *util.SignService - stream PatchObjectstream + stream PatchObjectStream err error } @@ -142,7 +142,7 @@ func (s *patchStreamSigner) CloseAndRecv(ctx context.Context) (resp *object.Patc return resp, s.sigSvc.SignResponse(resp, err) } -func (s *SignService) Patch() (PatchObjectstream, error) { +func (s *SignService) Patch() (PatchObjectStream, error) { stream, err := s.svc.Patch() if err != nil { return nil, fmt.Errorf("could not create Put object streamer: %w", err) diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go index 5acfac06b..e560d6d8c 100644 --- a/pkg/services/object/transport_splitter.go +++ b/pkg/services/object/transport_splitter.go @@ -91,7 +91,7 @@ func (c TransportSplitter) Put() (PutObjectStream, error) { return c.next.Put() } -func (c TransportSplitter) Patch() (PatchObjectstream, error) { +func (c TransportSplitter) Patch() (PatchObjectStream, error) { return c.next.Patch() } -- 2.45.2 From c55ab2f0f4aace1f5bc8b041d809fcb8e39f1bd3 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Tue, 13 Aug 2024 15:01:01 +0300 Subject: [PATCH 3/5] [#1307] cli: Introduce `object patch` command Signed-off-by: Airat Arifullin --- cmd/frostfs-cli/internal/client/client.go | 65 ++++++++++ cmd/frostfs-cli/modules/object/patch.go | 151 ++++++++++++++++++++++ cmd/frostfs-cli/modules/object/root.go | 2 + cmd/frostfs-cli/modules/object/util.go | 2 + 4 files changed, 220 insertions(+) create mode 100644 cmd/frostfs-cli/modules/object/patch.go diff --git a/cmd/frostfs-cli/internal/client/client.go b/cmd/frostfs-cli/internal/client/client.go index 215490dbe..a0fa22410 100644 --- a/cmd/frostfs-cli/internal/client/client.go +++ b/cmd/frostfs-cli/internal/client/client.go @@ -2,10 +2,13 @@ package internal import ( "bytes" + "cmp" "context" "errors" "fmt" "io" + "os" + "slices" "sort" "strings" @@ -869,3 +872,65 @@ func SyncContainerSettings(ctx context.Context, prm SyncContainerPrm) (*SyncCont return new(SyncContainerRes), nil } + +// PatchObjectPrm groups parameters of PatchObject operation. +type PatchObjectPrm struct { + commonObjectPrm + objectAddressPrm + + NewAttributes []objectSDK.Attribute + + ReplaceAttribute bool + + PayloadPatches []PayloadPatch +} + +type PayloadPatch struct { + Range objectSDK.Range + + PayloadPath string +} + +type PatchRes struct { + OID oid.ID +} + +func Patch(ctx context.Context, prm PatchObjectPrm) (*PatchRes, error) { + patchPrm := client.PrmObjectPatch{ + XHeaders: prm.xHeaders, + BearerToken: prm.bearerToken, + Session: prm.sessionToken, + Address: prm.objAddr, + } + + slices.SortFunc(prm.PayloadPatches, func(a, b PayloadPatch) int { + return cmp.Compare(a.Range.GetOffset(), b.Range.GetOffset()) + }) + + patcher, err := prm.cli.ObjectPatchInit(ctx, patchPrm) + if err != nil { + return nil, fmt.Errorf("init payload reading: %w", err) + } + + if patcher.PatchAttributes(ctx, prm.NewAttributes, prm.ReplaceAttribute) { + for _, pp := range prm.PayloadPatches { + payloadFile, err := os.OpenFile(pp.PayloadPath, os.O_RDONLY, os.ModePerm) + if err != nil { + return nil, err + } + applied := patcher.PatchPayload(ctx, &pp.Range, payloadFile) + _ = payloadFile.Close() + if !applied { + break + } + } + } + + res, err := patcher.Close(ctx) + if err != nil { + return nil, err + } + return &PatchRes{ + OID: res.ObjectID(), + }, nil +} diff --git a/cmd/frostfs-cli/modules/object/patch.go b/cmd/frostfs-cli/modules/object/patch.go new file mode 100644 index 000000000..8f03885ab --- /dev/null +++ b/cmd/frostfs-cli/modules/object/patch.go @@ -0,0 +1,151 @@ +package object + +import ( + "fmt" + "strconv" + "strings" + + internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/spf13/cobra" +) + +const ( + newAttrsFlagName = "new-attrs" + replaceAttrsFlagName = "replace-attrs" + rangeFlagName = "range" + payloadFlagName = "payload" +) + +var objectPatchCmd = &cobra.Command{ + Use: "patch", + Run: patch, + Short: "Patch FrostFS object", + Long: "Patch FrostFS object. Each range passed to the command requires to pass a corresponding patch payload.", + Example: ` +frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid --oid --new-attrs 'key1=val1,key2=val2' --replace-attrs +frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid --oid --range offX:lnX --payload /path/to/payloadX --range offY:lnY --payload /path/to/payloadY +frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid --oid --new-attrs 'key1=val1,key2=val2' --replace-attrs --range offX:lnX --payload /path/to/payload +`, +} + +func initObjectPatchCmd() { + commonflags.Init(objectPatchCmd) + initFlagSession(objectPatchCmd, "PATCH") + + flags := objectPatchCmd.Flags() + + flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage) + _ = objectRangeCmd.MarkFlagRequired(commonflags.CIDFlag) + + flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage) + _ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag) + + flags.String(newAttrsFlagName, "", "New object attributes in form of Key1=Value1,Key2=Value2") + flags.Bool(replaceAttrsFlagName, false, "Replace object attributes by new ones.") + flags.StringSlice(rangeFlagName, []string{}, "Range to which patch payload is applied. Format: offset:length") + flags.StringSlice(payloadFlagName, []string{}, "Path to file with patch payload.") +} + +func patch(cmd *cobra.Command, _ []string) { + var cnr cid.ID + var obj oid.ID + + objAddr := readObjectAddress(cmd, &cnr, &obj) + + ranges, err := getRangeSlice(cmd) + commonCmd.ExitOnErr(cmd, "", err) + + payloads := patchPayloadPaths(cmd) + + if len(ranges) != len(payloads) { + commonCmd.ExitOnErr(cmd, "", fmt.Errorf("the number of ranges and payloads are not equal: ranges = %d, payloads = %d", len(ranges), len(payloads))) + } + + newAttrs, err := parseNewObjectAttrs(cmd) + commonCmd.ExitOnErr(cmd, "can't parse new object attributes: %w", err) + replaceAttrs, _ := cmd.Flags().GetBool(replaceAttrsFlagName) + + pk := key.GetOrGenerate(cmd) + + cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC) + + var prm internalclient.PatchObjectPrm + prm.SetClient(cli) + Prepare(cmd, &prm) + ReadOrOpenSession(cmd, &prm, pk, cnr, nil) + + prm.SetAddress(objAddr) + prm.NewAttributes = newAttrs + prm.ReplaceAttribute = replaceAttrs + + for i := range ranges { + prm.PayloadPatches = append(prm.PayloadPatches, internalclient.PayloadPatch{ + Range: ranges[i], + PayloadPath: payloads[i], + }) + } + + res, err := internalclient.Patch(cmd.Context(), prm) + if err != nil { + commonCmd.ExitOnErr(cmd, "can't patch the object: %w", err) + } + cmd.Println("Patched object ID: ", res.OID.EncodeToString()) +} + +func parseNewObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) { + var rawAttrs []string + + raw := cmd.Flag(newAttrsFlagName).Value.String() + if len(raw) != 0 { + rawAttrs = strings.Split(raw, ",") + } + + attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes + for i := range rawAttrs { + k, v, found := strings.Cut(rawAttrs[i], "=") + if !found { + return nil, fmt.Errorf("invalid attribute format: %s", rawAttrs[i]) + } + attrs[i].SetKey(k) + attrs[i].SetValue(v) + } + return attrs, nil +} + +func getRangeSlice(cmd *cobra.Command) ([]objectSDK.Range, error) { + v, _ := cmd.Flags().GetStringSlice(rangeFlagName) + if len(v) == 0 { + return []objectSDK.Range{}, nil + } + rs := make([]objectSDK.Range, len(v)) + for i := range v { + before, after, found := strings.Cut(v[i], rangeSep) + if !found { + return nil, fmt.Errorf("invalid range specifier: %s", v[i]) + } + + offset, err := strconv.ParseUint(before, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid '%s' range offset specifier: %w", v[i], err) + } + length, err := strconv.ParseUint(after, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid '%s' range length specifier: %w", v[i], err) + } + + rs[i].SetOffset(offset) + rs[i].SetLength(length) + } + return rs, nil +} + +func patchPayloadPaths(cmd *cobra.Command) []string { + v, _ := cmd.Flags().GetStringSlice(payloadFlagName) + return v +} diff --git a/cmd/frostfs-cli/modules/object/root.go b/cmd/frostfs-cli/modules/object/root.go index 7d8008b10..b808a509e 100644 --- a/cmd/frostfs-cli/modules/object/root.go +++ b/cmd/frostfs-cli/modules/object/root.go @@ -29,6 +29,7 @@ func init() { objectRangeCmd, objectLockCmd, objectNodesCmd, + objectPatchCmd, } Cmd.AddCommand(objectChildCommands...) @@ -39,6 +40,7 @@ func init() { } initObjectPutCmd() + initObjectPatchCmd() initObjectDeleteCmd() initObjectGetCmd() initObjectSearchCmd() diff --git a/cmd/frostfs-cli/modules/object/util.go b/cmd/frostfs-cli/modules/object/util.go index 381c790e9..96b80fe1b 100644 --- a/cmd/frostfs-cli/modules/object/util.go +++ b/cmd/frostfs-cli/modules/object/util.go @@ -306,6 +306,8 @@ func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, ke case *internal.PutObjectPrm: common.PrintVerbose(cmd, "Binding session to object PUT...") tok.ForVerb(session.VerbObjectPut) + case *internal.PatchObjectPrm: + tok.ForVerb(session.VerbObjectPatch) case *internal.DeleteObjectPrm: common.PrintVerbose(cmd, "Binding session to object DELETE...") tok.ForVerb(session.VerbObjectDelete) -- 2.45.2 From 4a7c8581244cfc30a18f246654f533bfc3e0792a Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Wed, 14 Aug 2024 14:38:01 +0300 Subject: [PATCH 4/5] [#1307] object: Add APE check for `Patch` handler Signed-off-by: Airat Arifullin --- pkg/services/object/acl/v2/service.go | 125 +++++++++++++++++++++++- pkg/services/object/acl/v2/util.go | 2 + pkg/services/object/ape/checker_test.go | 19 +--- pkg/services/object/ape/request.go | 3 +- pkg/services/object/ape/service.go | 54 +++++++++- 5 files changed, 183 insertions(+), 20 deletions(-) diff --git a/pkg/services/object/acl/v2/service.go b/pkg/services/object/acl/v2/service.go index a9ddad7ca..5a8e8b065 100644 --- a/pkg/services/object/acl/v2/service.go +++ b/pkg/services/object/acl/v2/service.go @@ -35,6 +35,12 @@ type putStreamBasicChecker struct { next object.PutObjectStream } +type patchStreamBasicChecker struct { + source *Service + next object.PatchObjectStream + nonFirstSend bool +} + type getStreamBasicChecker struct { checker ACLChecker @@ -250,7 +256,12 @@ func (b Service) Put() (object.PutObjectStream, error) { } func (b Service) Patch() (object.PatchObjectStream, error) { - return b.next.Patch() + streamer, err := b.next.Patch() + + return &patchStreamBasicChecker{ + source: &b, + next: streamer, + }, err } func (b Service) Head( @@ -738,6 +749,65 @@ func (g *searchStreamBasicChecker) Send(resp *objectV2.SearchResponse) error { return g.SearchStream.Send(resp) } +func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error { + body := request.GetBody() + if body == nil { + return errEmptyBody + } + + if !p.nonFirstSend { + p.nonFirstSend = true + + cnr, err := getContainerIDFromRequest(request) + if err != nil { + return err + } + + objV2 := request.GetBody().GetAddress().GetObjectID() + if objV2 == nil { + return errors.New("missing oid") + } + obj := new(oid.ID) + err = obj.ReadFromV2(*objV2) + if err != nil { + return err + } + + var sTok *sessionSDK.Object + sTok, err = readSessionToken(cnr, obj, request.GetMetaHeader().GetSessionToken()) + if err != nil { + return err + } + + bTok, err := originalBearerToken(request.GetMetaHeader()) + if err != nil { + return err + } + + req := MetaWithToken{ + vheader: request.GetVerificationHeader(), + token: sTok, + bearer: bTok, + src: request, + } + + reqInfo, err := p.source.findRequestInfoWithoutACLOperationAssert(req, cnr) + if err != nil { + return err + } + + reqInfo.obj = obj + + ctx = requestContext(ctx, reqInfo) + } + + return p.next.Send(ctx, request) +} + +func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { + return p.next.CloseAndRecv(ctx) +} + func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (info RequestInfo, err error) { cnr, err := b.containers.Get(idCnr) // fetch actual container if err != nil { @@ -794,3 +864,56 @@ func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (in return info, nil } + +// findRequestInfoWithoutACLOperationAssert is findRequestInfo without session token verb assert. +func (b Service) findRequestInfoWithoutACLOperationAssert(req MetaWithToken, idCnr cid.ID) (info RequestInfo, err error) { + cnr, err := b.containers.Get(idCnr) // fetch actual container + if err != nil { + return info, err + } + + if req.token != nil { + currentEpoch, err := b.nm.Epoch() + if err != nil { + return info, errors.New("can't fetch current epoch") + } + if req.token.ExpiredAt(currentEpoch) { + return info, new(apistatus.SessionTokenExpired) + } + if req.token.InvalidAt(currentEpoch) { + return info, fmt.Errorf("%s: token is invalid at %d epoch)", + invalidRequestMessage, currentEpoch) + } + } + + // find request role and key + ownerID, ownerKey, err := req.RequestOwner() + if err != nil { + return info, err + } + res, err := b.c.Classify(ownerID, ownerKey, idCnr, cnr.Value) + if err != nil { + return info, err + } + + info.basicACL = cnr.Value.BasicACL() + info.requestRole = res.Role + info.cnrOwner = cnr.Value.Owner() + info.idCnr = idCnr + + cnrNamespace, hasNamespace := strings.CutSuffix(cnrSDK.ReadDomain(cnr.Value).Zone(), ".ns") + if hasNamespace { + info.cnrNamespace = cnrNamespace + } + + // it is assumed that at the moment the key will be valid, + // otherwise the request would not pass validation + info.senderKey = res.Key + + // add bearer token if it is present in request + info.bearer = req.bearer + + info.srcRequest = req.src + + return info, nil +} diff --git a/pkg/services/object/acl/v2/util.go b/pkg/services/object/acl/v2/util.go index 76fd9651d..c5225e8c4 100644 --- a/pkg/services/object/acl/v2/util.go +++ b/pkg/services/object/acl/v2/util.go @@ -46,6 +46,8 @@ func getContainerIDFromRequest(req any) (cid.ID, error) { idV2 = v.GetBody().GetAddress().GetContainerID() case *objectV2.PutSingleRequest: idV2 = v.GetBody().GetObject().GetHeader().GetContainerID() + case *objectV2.PatchRequest: + idV2 = v.GetBody().GetAddress().GetContainerID() default: return cid.ID{}, errors.New("unknown request type") } diff --git a/pkg/services/object/ape/checker_test.go b/pkg/services/object/ape/checker_test.go index afe19fc51..090f6a83c 100644 --- a/pkg/services/object/ape/checker_test.go +++ b/pkg/services/object/ape/checker_test.go @@ -518,22 +518,7 @@ func TestAPECheck_BearerTokenOverrides(t *testing.T) { ls := inmemory.NewInmemoryLocalStorage() ms := inmemory.NewInmemoryMorphRuleChainStorage() - node1Key, err := keys.NewPrivateKey() - require.NoError(t, err) - node1 := netmapSDK.NodeInfo{} - node1.SetPublicKey(node1Key.PublicKey().Bytes()) - netmap := &netmapSDK.NetMap{} - netmap.SetEpoch(100) - netmap.SetNodes([]netmapSDK.NodeInfo{node1}) - - nm := &netmapStub{ - currentEpoch: 100, - netmaps: map[uint64]*netmapSDK.NetMap{ - 100: netmap, - }, - } - - checker := NewChecker(ls, ms, headerProvider, frostfsidProvider, nm, &stMock{}, nil, nil) + checker := NewChecker(ls, ms, headerProvider, frostfsidProvider, nil, &stMock{}, nil, nil) prm := Prm{ Method: method, @@ -556,7 +541,7 @@ func TestAPECheck_BearerTokenOverrides(t *testing.T) { } } - err = checker.CheckAPE(context.Background(), prm) + err := checker.CheckAPE(context.Background(), prm) if test.expectAPEErr { require.Error(t, err) } else { diff --git a/pkg/services/object/ape/request.go b/pkg/services/object/ape/request.go index 1c129f65f..da5307ca7 100644 --- a/pkg/services/object/ape/request.go +++ b/pkg/services/object/ape/request.go @@ -103,7 +103,8 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re nativeschema.MethodHeadObject, nativeschema.MethodRangeObject, nativeschema.MethodHashObject, - nativeschema.MethodDeleteObject: + nativeschema.MethodDeleteObject, + nativeschema.MethodPatchObject: if prm.Object == nil { return defaultRequest, fmt.Errorf("method %s: %w", prm.Method, errMissingOID) } diff --git a/pkg/services/object/ape/service.go b/pkg/services/object/ape/service.go index 64dd19c24..a1634e7c5 100644 --- a/pkg/services/object/ape/service.go +++ b/pkg/services/object/ape/service.go @@ -204,8 +204,60 @@ func (c *Service) Put() (objectSvc.PutObjectStream, error) { }, err } +type patchStreamBasicChecker struct { + apeChecker Checker + + next objectSvc.PatchObjectStream + + nonFirstSend bool +} + +func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error { + if !p.nonFirstSend { + p.nonFirstSend = true + + reqCtx, err := requestContext(ctx) + if err != nil { + return toStatusErr(err) + } + + cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID()) + if err != nil { + return toStatusErr(err) + } + + prm := Prm{ + Namespace: reqCtx.Namespace, + Container: cnrID, + Object: objID, + Method: nativeschema.MethodPatchObject, + SenderKey: hex.EncodeToString(reqCtx.SenderKey), + ContainerOwner: reqCtx.ContainerOwner, + Role: nativeSchemaRole(reqCtx.Role), + SoftAPECheck: reqCtx.SoftAPECheck, + BearerToken: reqCtx.BearerToken, + XHeaders: request.GetMetaHeader().GetXHeaders(), + } + + if err := p.apeChecker.CheckAPE(ctx, prm); err != nil { + return toStatusErr(err) + } + } + + return p.next.Send(ctx, request) +} + +func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { + return p.next.CloseAndRecv(ctx) +} + func (c *Service) Patch() (objectSvc.PatchObjectStream, error) { - return c.next.Patch() + streamer, err := c.next.Patch() + + return &patchStreamBasicChecker{ + apeChecker: c.apeChecker, + next: streamer, + }, err } func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { -- 2.45.2 From d168ba9bebf0624d6f3066544393c0fe06a40f74 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Wed, 14 Aug 2024 14:39:13 +0300 Subject: [PATCH 5/5] [#1307] cli: Make cli process object.patch Signed-off-by: Airat Arifullin --- cmd/frostfs-cli/modules/util/ape.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/frostfs-cli/modules/util/ape.go b/cmd/frostfs-cli/modules/util/ape.go index 532dc0a50..9af57434a 100644 --- a/cmd/frostfs-cli/modules/util/ape.go +++ b/cmd/frostfs-cli/modules/util/ape.go @@ -239,6 +239,8 @@ func parseAction(lexeme string) ([]string, bool, error) { return []string{nativeschema.MethodRangeObject}, true, nil case "object.hash": return []string{nativeschema.MethodHashObject}, true, nil + case "object.patch": + return []string{nativeschema.MethodPatchObject}, true, nil case "object.*": return []string{ nativeschema.MethodPutObject, @@ -248,6 +250,7 @@ func parseAction(lexeme string) ([]string, bool, error) { nativeschema.MethodSearchObject, nativeschema.MethodRangeObject, nativeschema.MethodHashObject, + nativeschema.MethodPatchObject, }, true, nil case "container.put": return []string{nativeschema.MethodPutContainer}, false, nil -- 2.45.2