From e890f1b4b17d62cd95fe22d018212f91ff2be95d Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Mon, 12 Aug 2024 17:11:10 +0300 Subject: [PATCH] [#1307] object: Implement `Patch` method Signed-off-by: Airat Arifullin --- cmd/frostfs-node/object.go | 2 +- pkg/services/object/acl/v2/service.go | 20 +- pkg/services/object/acl/v2/util.go | 8 +- pkg/services/object/ape/checker_test.go | 19 +- pkg/services/object/ape/service.go | 20 +- pkg/services/object/audit.go | 19 +- pkg/services/object/common.go | 2 +- pkg/services/object/get/prm.go | 4 + pkg/services/object/metrics.go | 6 +- pkg/services/object/patch/range_provider.go | 63 ++++++ pkg/services/object/patch/service.go | 30 ++- pkg/services/object/patch/streamer.go | 215 +++++++++++++++++++- pkg/services/object/patch/util.go | 53 +++++ pkg/services/object/put/prm.go | 11 + pkg/services/object/put/streamer.go | 23 ++- pkg/services/object/response.go | 4 +- pkg/services/object/server.go | 6 +- pkg/services/object/sign.go | 4 +- pkg/services/object/transport_splitter.go | 2 +- 19 files changed, 430 insertions(+), 81 deletions(-) create mode 100644 pkg/services/object/patch/range_provider.go create mode 100644 pkg/services/object/patch/util.go diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index eef142415..467c5901b 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -74,7 +74,7 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) { return s.put.Put() } -func (s *objectSvc) Patch() (objectService.PatchObjectstream, error) { +func (s *objectSvc) Patch() (objectService.PatchObjectStream, error) { return s.patch.Patch() } diff --git a/pkg/services/object/acl/v2/service.go b/pkg/services/object/acl/v2/service.go index 58557d611..a9ddad7ca 100644 --- a/pkg/services/object/acl/v2/service.go +++ b/pkg/services/object/acl/v2/service.go @@ -249,24 +249,8 @@ func (b Service) Put() (object.PutObjectStream, error) { }, err } -type patchStreamBasicChecker struct { - next object.PatchObjectstream -} - -func (p patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error { - return p.next.Send(ctx, request) -} - -func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { - return p.next.CloseAndRecv(ctx) -} - -func (b Service) Patch() (object.PatchObjectstream, error) { - streamer, err := b.next.Patch() - - return &patchStreamBasicChecker{ - next: streamer, - }, err +func (b Service) Patch() (object.PatchObjectStream, error) { + return b.next.Patch() } func (b Service) Head( diff --git a/pkg/services/object/acl/v2/util.go b/pkg/services/object/acl/v2/util.go index feda6a3cf..76fd9651d 100644 --- a/pkg/services/object/acl/v2/util.go +++ b/pkg/services/object/acl/v2/util.go @@ -174,7 +174,7 @@ func isOwnerFromKey(id user.ID, key *keys.PublicKey) bool { func assertVerb(tok sessionSDK.Object, op acl.Op) bool { switch op { case acl.OpObjectPut: - return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete) + return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete, sessionSDK.VerbObjectPatch) case acl.OpObjectDelete: return tok.AssertVerb(sessionSDK.VerbObjectDelete) case acl.OpObjectGet: @@ -185,11 +185,13 @@ func assertVerb(tok sessionSDK.Object, op acl.Op) bool { sessionSDK.VerbObjectGet, sessionSDK.VerbObjectDelete, sessionSDK.VerbObjectRange, - sessionSDK.VerbObjectRangeHash) + sessionSDK.VerbObjectRangeHash, + sessionSDK.VerbObjectPatch, + ) case acl.OpObjectSearch: return tok.AssertVerb(sessionSDK.VerbObjectSearch, sessionSDK.VerbObjectDelete) case acl.OpObjectRange: - return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash) + return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash, sessionSDK.VerbObjectPatch) case acl.OpObjectHash: return tok.AssertVerb(sessionSDK.VerbObjectRangeHash) } diff --git a/pkg/services/object/ape/checker_test.go b/pkg/services/object/ape/checker_test.go index 090f6a83c..afe19fc51 100644 --- a/pkg/services/object/ape/checker_test.go +++ b/pkg/services/object/ape/checker_test.go @@ -518,7 +518,22 @@ func TestAPECheck_BearerTokenOverrides(t *testing.T) { ls := inmemory.NewInmemoryLocalStorage() ms := inmemory.NewInmemoryMorphRuleChainStorage() - checker := NewChecker(ls, ms, headerProvider, frostfsidProvider, nil, &stMock{}, nil, nil) + node1Key, err := keys.NewPrivateKey() + require.NoError(t, err) + node1 := netmapSDK.NodeInfo{} + node1.SetPublicKey(node1Key.PublicKey().Bytes()) + netmap := &netmapSDK.NetMap{} + netmap.SetEpoch(100) + netmap.SetNodes([]netmapSDK.NodeInfo{node1}) + + nm := &netmapStub{ + currentEpoch: 100, + netmaps: map[uint64]*netmapSDK.NetMap{ + 100: netmap, + }, + } + + checker := NewChecker(ls, ms, headerProvider, frostfsidProvider, nm, &stMock{}, nil, nil) prm := Prm{ Method: method, @@ -541,7 +556,7 @@ func TestAPECheck_BearerTokenOverrides(t *testing.T) { } } - err := checker.CheckAPE(context.Background(), prm) + err = checker.CheckAPE(context.Background(), prm) if test.expectAPEErr { require.Error(t, err) } else { diff --git a/pkg/services/object/ape/service.go b/pkg/services/object/ape/service.go index f005d0873..64dd19c24 100644 --- a/pkg/services/object/ape/service.go +++ b/pkg/services/object/ape/service.go @@ -204,24 +204,8 @@ func (c *Service) Put() (objectSvc.PutObjectStream, error) { }, err } -type patchStreamBasicChecker struct { - next objectSvc.PatchObjectstream -} - -func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error { - return p.next.Send(ctx, request) -} - -func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { - return p.next.CloseAndRecv(ctx) -} - -func (c *Service) Patch() (objectSvc.PatchObjectstream, error) { - streamer, err := c.next.Patch() - - return &patchStreamBasicChecker{ - next: streamer, - }, err +func (c *Service) Patch() (objectSvc.PatchObjectStream, error) { + return c.next.Patch() } func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { diff --git a/pkg/services/object/audit.go b/pkg/services/object/audit.go index 680a96c40..b924386d1 100644 --- a/pkg/services/object/audit.go +++ b/pkg/services/object/audit.go @@ -172,16 +172,18 @@ func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error } type auditPatchStream struct { - stream PatchObjectstream + stream PatchObjectStream log *logger.Logger failed bool key []byte containerID *refs.ContainerID objectID *refs.ObjectID + + nonFirstSend bool } -func (a *auditService) Patch() (PatchObjectstream, error) { +func (a *auditService) Patch() (PatchObjectStream, error) { res, err := a.next.Patch() if !a.enabled.Load() { return res, err @@ -196,7 +198,7 @@ func (a *auditService) Patch() (PatchObjectstream, error) { }, nil } -// CloseAndRecv implements PutObjectStream. +// CloseAndRecv implements PatchObjectStream. func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) { resp, err := a.stream.CloseAndRecv(ctx) if err != nil { @@ -209,11 +211,14 @@ func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchRespo return resp, err } -// Send implements PutObjectStream. +// Send implements PatchObjectStream. func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) error { - a.containerID = req.GetBody().GetAddress().GetContainerID() - a.objectID = req.GetBody().GetAddress().GetObjectID() - a.key = req.GetVerificationHeader().GetBodySignature().GetKey() + if !a.nonFirstSend { + a.containerID = req.GetBody().GetAddress().GetContainerID() + a.objectID = req.GetBody().GetAddress().GetObjectID() + a.key = req.GetVerificationHeader().GetBodySignature().GetKey() + a.nonFirstSend = true + } err := a.stream.Send(ctx, req) if err != nil { diff --git a/pkg/services/object/common.go b/pkg/services/object/common.go index 841a3d021..f48cc5b3d 100644 --- a/pkg/services/object/common.go +++ b/pkg/services/object/common.go @@ -48,7 +48,7 @@ func (x *Common) Put() (PutObjectStream, error) { return x.nextHandler.Put() } -func (x *Common) Patch() (PatchObjectstream, error) { +func (x *Common) Patch() (PatchObjectStream, error) { if x.state.IsMaintenance() { return nil, new(apistatus.NodeUnderMaintenance) } diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index cbdb7a3e2..94c07381c 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -124,6 +124,10 @@ func (p *commonPrm) SetRequestForwarder(f RequestForwarder) { p.forwarder = f } +func (p *commonPrm) SetSignerKey(signerKey *ecdsa.PrivateKey) { + p.signerKey = signerKey +} + // WithAddress sets object address to be read. func (p *commonPrm) WithAddress(addr oid.Address) { p.addr = addr diff --git a/pkg/services/object/metrics.go b/pkg/services/object/metrics.go index b64f879ac..e53b7584f 100644 --- a/pkg/services/object/metrics.go +++ b/pkg/services/object/metrics.go @@ -28,7 +28,7 @@ type ( } patchStreamMetric struct { - stream PatchObjectstream + stream PatchObjectStream metrics MetricRegister start time.Time } @@ -82,7 +82,7 @@ func (m MetricCollector) Put() (PutObjectStream, error) { return m.next.Put() } -func (m MetricCollector) Patch() (PatchObjectstream, error) { +func (m MetricCollector) Patch() (PatchObjectStream, error) { if m.enabled { t := time.Now() @@ -214,7 +214,7 @@ func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, return res, err } func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) error { - s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().Chunk)) + s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().GetChunk())) return s.stream.Send(ctx, req) } diff --git a/pkg/services/object/patch/range_provider.go b/pkg/services/object/patch/range_provider.go new file mode 100644 index 000000000..755c5bf60 --- /dev/null +++ b/pkg/services/object/patch/range_provider.go @@ -0,0 +1,63 @@ +package patchsvc + +import ( + "context" + "crypto/ecdsa" + "io" + + getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" + objectUtil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + patcherSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher" +) + +func (p *pipeChunkWriter) WriteChunk(_ context.Context, chunk []byte) error { + _, err := p.wr.Write(chunk) + return err +} + +type rangeProvider struct { + getSvc *getsvc.Service + + addr oid.Address + + commonPrm *objectUtil.CommonPrm + + localNodeKey *ecdsa.PrivateKey +} + +var _ patcherSDK.RangeProvider = (*rangeProvider)(nil) + +func (r *rangeProvider) GetRange(ctx context.Context, rng *objectSDK.Range) io.Reader { + pipeReader, pipeWriter := io.Pipe() + + var rngPrm getsvc.RangePrm + rngPrm.SetSignerKey(r.localNodeKey) + rngPrm.SetCommonParameters(r.commonPrm) + + rngPrm.WithAddress(r.addr) + rngPrm.SetChunkWriter(&pipeChunkWriter{ + wr: pipeWriter, + }) + rngPrm.SetRange(rng) + + getRangeErr := make(chan error) + + go func() { + defer pipeWriter.Close() + + select { + case <-ctx.Done(): + pipeWriter.CloseWithError(ctx.Err()) + case err := <-getRangeErr: + pipeWriter.CloseWithError(err) + } + }() + + go func() { + getRangeErr <- r.getSvc.GetRange(ctx, rngPrm) + }() + + return pipeReader +} diff --git a/pkg/services/object/patch/service.go b/pkg/services/object/patch/service.go index df6926e84..c4ab15abf 100644 --- a/pkg/services/object/patch/service.go +++ b/pkg/services/object/patch/service.go @@ -9,14 +9,36 @@ import ( // Service implements Put operation of Object service v2. type Service struct { + keyStorage *util.KeyStorage + + getSvc *getsvc.Service + + putSvc *putsvc.Service } // NewService constructs Service instance from provided options. -func NewService(_ *util.KeyStorage, _ *getsvc.Service, _ *putsvc.Service) *Service { - return &Service{} +func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service { + return &Service{ + keyStorage: ks, + + getSvc: getSvc, + + putSvc: putSvc, + } } // Put calls internal service and returns v2 object streamer. -func (s *Service) Patch() (object.PatchObjectstream, error) { - return &Streamer{}, nil +func (s *Service) Patch() (object.PatchObjectStream, error) { + nodeKey, err := s.keyStorage.GetKey(nil) + if err != nil { + return nil, err + } + + return &Streamer{ + getSvc: s.getSvc, + + putSvc: s.putSvc, + + localNodeKey: nodeKey, + }, nil } diff --git a/pkg/services/object/patch/streamer.go b/pkg/services/object/patch/streamer.go index 5d021b9c3..84363530e 100644 --- a/pkg/services/object/patch/streamer.go +++ b/pkg/services/object/patch/streamer.go @@ -2,27 +2,220 @@ package patchsvc import ( "context" + "crypto/ecdsa" + "errors" + "fmt" + "io" - "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" + getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" + putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher" ) -// Streamer for the patch handler is a pipeline that merges two incoming -// streams of patches and original object payload chunks. -// The merged result is fed to Put stream target. -type Streamer struct{} +// Streamer for the patch handler is a pipeline that merges two incoming streams of patches +// and original object payload chunks. The merged result is fed to Put stream target. +type Streamer struct { + // Patcher must be initialized at first Streamer.Send call. + patcher patcher.PatchApplier -func (s *Streamer) Send(ctx context.Context, _ *object.PatchRequest) error { - _, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send") + nonFirstSend bool + + getSvc *getsvc.Service + + putSvc *putsvc.Service + + localNodeKey *ecdsa.PrivateKey +} + +type pipeChunkWriter struct { + wr *io.PipeWriter +} + +type headResponseWriter struct { + body *objectV2.HeadResponseBody +} + +func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error { + w.body.SetHeaderPart(toFullObjectHeader(hdr)) + return nil +} + +func toFullObjectHeader(hdr *objectSDK.Object) objectV2.GetHeaderPart { + obj := hdr.ToV2() + + hs := new(objectV2.HeaderWithSignature) + hs.SetHeader(obj.GetHeader()) + hs.SetSignature(obj.GetSignature()) + + return hs +} + +func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error { + hdrWithSig, addr, err := s.readHeader(ctx, req) + if err != nil { + return err + } + + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return err + } + commonPrm.WithLocalOnly(false) + + rangeProvider := &rangeProvider{ + getSvc: s.getSvc, + + addr: addr, + + commonPrm: commonPrm, + + localNodeKey: s.localNodeKey, + } + + putstm, err := s.putSvc.Put() + if err != nil { + return err + } + + hdr := hdrWithSig.GetHeader() + oV2 := new(objectV2.Object) + hV2 := new(objectV2.Header) + oV2.SetHeader(hV2) + oV2.GetHeader().SetContainerID(hdr.GetContainerID()) + oV2.GetHeader().SetPayloadLength(hdr.GetPayloadLength()) + oV2.GetHeader().SetAttributes(hdr.GetAttributes()) + + ownerID, err := newOwnerID(req.GetVerificationHeader()) + if err != nil { + return err + } + oV2.GetHeader().SetOwnerID(ownerID) + + prm, err := s.putInitPrm(req, oV2) + if err != nil { + return err + } + + err = putstm.Init(ctx, prm) + if err != nil { + return err + } + + patcherPrm := patcher.Params{ + Header: objectSDK.NewFromV2(oV2), + + RangeProvider: rangeProvider, + + ObjectWriter: putstm.Target(), + } + + s.patcher = patcher.New(patcherPrm) + return nil +} + +func (s *Streamer) readHeader(ctx context.Context, req *objectV2.PatchRequest) (hdrWithSig *objectV2.HeaderWithSignature, addr oid.Address, err error) { + addrV2 := req.GetBody().GetAddress() + if addrV2 == nil { + err = errors.New("patch request has nil-address") + return + } + + if err = addr.ReadFromV2(*addrV2); err != nil { + err = fmt.Errorf("read address error: %w", err) + return + } + + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return + } + commonPrm.WithLocalOnly(false) + + var p getsvc.HeadPrm + p.SetSignerKey(s.localNodeKey) + p.SetCommonParameters(commonPrm) + + resp := new(objectV2.HeadResponse) + resp.SetBody(new(objectV2.HeadResponseBody)) + + p.WithAddress(addr) + p.SetHeaderWriter(&headResponseWriter{ + body: resp.GetBody(), + }) + + err = s.getSvc.Head(ctx, p) + if err != nil { + err = fmt.Errorf("get header error: %w", err) + return + } + + var ok bool + hdrPart := resp.GetBody().GetHeaderPart() + if hdrWithSig, ok = hdrPart.(*objectV2.HeaderWithSignature); !ok { + err = fmt.Errorf("unexpected header type: %T", hdrPart) + } + return +} + +func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error { + ctx, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send") defer span.End() + defer func() { + s.nonFirstSend = true + }() + + if !s.nonFirstSend { + if err := s.init(ctx, req); err != nil { + return fmt.Errorf("streamer init error: %w", err) + } + } + + patch := new(objectSDK.Patch) + patch.FromV2(req.GetBody()) + + if !s.nonFirstSend { + err := s.patcher.ApplyAttributesPatch(ctx, patch.NewAttributes, patch.ReplaceAttributes) + if err != nil { + return fmt.Errorf("patch attributes: %w", err) + } + } + + if patch.PayloadPatch != nil { + err := s.patcher.ApplyPayloadPatch(ctx, patch.PayloadPatch) + if err != nil { + return fmt.Errorf("patch payload: %w", err) + } + } else if s.nonFirstSend { + return errors.New("invalid non-first patch: empty payload") + } + return nil } -func (s *Streamer) CloseAndRecv(_ context.Context) (*object.PatchResponse, error) { - return &object.PatchResponse{ - Body: &object.PatchResponseBody{ - ObjectID: nil, +func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { + patcherResp, err := s.patcher.Close(ctx) + if err != nil { + return nil, err + } + + oidV2 := new(refsV2.ObjectID) + + if patcherResp.AccessIdentifiers.ParentID != nil { + patcherResp.AccessIdentifiers.ParentID.WriteToV2(oidV2) + } else { + patcherResp.AccessIdentifiers.SelfID.WriteToV2(oidV2) + } + + return &objectV2.PatchResponse{ + Body: &objectV2.PatchResponseBody{ + ObjectID: oidV2, }, }, nil } diff --git a/pkg/services/object/patch/util.go b/pkg/services/object/patch/util.go new file mode 100644 index 000000000..1218d6694 --- /dev/null +++ b/pkg/services/object/patch/util.go @@ -0,0 +1,53 @@ +package patchsvc + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "errors" + "fmt" + + objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" +) + +// putInitPrm initializes put paramerer for Put stream. +func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) { + commonPrm, err := util.CommonPrmFromV2(req) + if err != nil { + return nil, err + } + + prm := new(putsvc.PutInitPrm) + prm.WithObject(objectSDK.NewFromV2(obj)). + WithCommonPrm(commonPrm). + WithPrivateKey(s.localNodeKey) + + return prm, nil +} + +func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) { + for vh.GetOrigin() != nil { + vh = vh.GetOrigin() + } + sig := vh.GetBodySignature() + if sig == nil { + return nil, errors.New("empty body signature") + } + key, err := keys.NewPublicKeyFromBytes(sig.GetKey(), elliptic.P256()) + if err != nil { + return nil, fmt.Errorf("invalid signature key: %w", err) + } + + var userID user.ID + user.IDFromKey(&userID, (ecdsa.PublicKey)(*key)) + ownID := new(refs.OwnerID) + userID.WriteToV2(ownID) + + return ownID, nil +} diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index 52a7c102c..0c8f12b45 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -2,6 +2,7 @@ package putsvc import ( "context" + "crypto/ecdsa" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -20,6 +21,8 @@ type PutInitPrm struct { traverseOpts []placement.Option relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error + + privateKey *ecdsa.PrivateKey } type PutChunkPrm struct { @@ -65,3 +68,11 @@ func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm { return p } + +func (p *PutInitPrm) WithPrivateKey(v *ecdsa.PrivateKey) *PutInitPrm { + if p != nil { + p.privateKey = v + } + + return p +} diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 4e655ed54..969c8fa19 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -47,6 +47,11 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error { return nil } +// Target accesses underlying target chunked object writer. +func (p *Streamer) Target() transformer.ChunkedObjectWriter { + return p.target +} + // MaxObjectSize returns maximum payload size for the streaming session. // // Must be called after the successful Init. @@ -79,11 +84,15 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error { func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error { p.relay = prm.relay - nodeKey, err := p.cfg.keyStorage.GetKey(nil) - if err != nil { - return err + if prm.privateKey != nil { + p.privateKey = prm.privateKey + } else { + nodeKey, err := p.cfg.keyStorage.GetKey(nil) + if err != nil { + return err + } + p.privateKey = nodeKey } - p.privateKey = nodeKey // prepare untrusted-Put object target p.target = &validatingPreparedTarget{ @@ -136,7 +145,11 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error { } } - p.privateKey = key + if prm.privateKey != nil { + p.privateKey = prm.privateKey + } else { + p.privateKey = key + } p.target = &validatingTarget{ fmt: p.fmtValidator, nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{ diff --git a/pkg/services/object/response.go b/pkg/services/object/response.go index c85259c1f..d7ba9f843 100644 --- a/pkg/services/object/response.go +++ b/pkg/services/object/response.go @@ -38,7 +38,7 @@ type putStreamResponser struct { } type patchStreamResponser struct { - stream PatchObjectstream + stream PatchObjectStream respSvc *response.Service } @@ -109,7 +109,7 @@ func (s *patchStreamResponser) CloseAndRecv(ctx context.Context) (*object.PatchR return r, nil } -func (s *ResponseService) Patch() (PatchObjectstream, error) { +func (s *ResponseService) Patch() (PatchObjectStream, error) { stream, err := s.svc.Patch() if err != nil { return nil, fmt.Errorf("could not create Put object streamer: %w", err) diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index c1b036ab3..da98ce245 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -31,8 +31,8 @@ type PutObjectStream interface { CloseAndRecv(context.Context) (*object.PutResponse, error) } -// PatchObjectstream is an interface of FrostFS API v2 compatible patch streamer. -type PatchObjectstream interface { +// PatchObjectStream is an interface of FrostFS API v2 compatible patch streamer. +type PatchObjectStream interface { Send(context.Context, *object.PatchRequest) error CloseAndRecv(context.Context) (*object.PatchResponse, error) } @@ -42,7 +42,7 @@ type PatchObjectstream interface { type ServiceServer interface { Get(*object.GetRequest, GetObjectStream) error Put() (PutObjectStream, error) - Patch() (PatchObjectstream, error) + Patch() (PatchObjectStream, error) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) Search(*object.SearchRequest, SearchStream) error Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index 631c539af..35367aafe 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -37,7 +37,7 @@ type putStreamSigner struct { type patchStreamSigner struct { sigSvc *util.SignService - stream PatchObjectstream + stream PatchObjectStream err error } @@ -142,7 +142,7 @@ func (s *patchStreamSigner) CloseAndRecv(ctx context.Context) (resp *object.Patc return resp, s.sigSvc.SignResponse(resp, err) } -func (s *SignService) Patch() (PatchObjectstream, error) { +func (s *SignService) Patch() (PatchObjectStream, error) { stream, err := s.svc.Patch() if err != nil { return nil, fmt.Errorf("could not create Put object streamer: %w", err) diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go index 5acfac06b..e560d6d8c 100644 --- a/pkg/services/object/transport_splitter.go +++ b/pkg/services/object/transport_splitter.go @@ -91,7 +91,7 @@ func (c TransportSplitter) Put() (PutObjectStream, error) { return c.next.Put() } -func (c TransportSplitter) Patch() (PatchObjectstream, error) { +func (c TransportSplitter) Patch() (PatchObjectStream, error) { return c.next.Patch() }