forked from TrueCloudLab/frostfs-node
[#1307] object: Implement Patch
method
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
2802f8c37a
commit
19caf3913e
19 changed files with 430 additions and 81 deletions
|
@ -74,7 +74,7 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) {
|
||||||
return s.put.Put()
|
return s.put.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *objectSvc) Patch() (objectService.PatchObjectstream, error) {
|
func (s *objectSvc) Patch() (objectService.PatchObjectStream, error) {
|
||||||
return s.patch.Patch()
|
return s.patch.Patch()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -249,24 +249,8 @@ func (b Service) Put() (object.PutObjectStream, error) {
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
type patchStreamBasicChecker struct {
|
func (b Service) Patch() (object.PatchObjectStream, error) {
|
||||||
next object.PatchObjectstream
|
return b.next.Patch()
|
||||||
}
|
|
||||||
|
|
||||||
func (p patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
|
|
||||||
return p.next.Send(ctx, request)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
|
||||||
return p.next.CloseAndRecv(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b Service) Patch() (object.PatchObjectstream, error) {
|
|
||||||
streamer, err := b.next.Patch()
|
|
||||||
|
|
||||||
return &patchStreamBasicChecker{
|
|
||||||
next: streamer,
|
|
||||||
}, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b Service) Head(
|
func (b Service) Head(
|
||||||
|
|
|
@ -174,7 +174,7 @@ func isOwnerFromKey(id user.ID, key *keys.PublicKey) bool {
|
||||||
func assertVerb(tok sessionSDK.Object, op acl.Op) bool {
|
func assertVerb(tok sessionSDK.Object, op acl.Op) bool {
|
||||||
switch op {
|
switch op {
|
||||||
case acl.OpObjectPut:
|
case acl.OpObjectPut:
|
||||||
return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete)
|
return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete, sessionSDK.VerbObjectPatch)
|
||||||
case acl.OpObjectDelete:
|
case acl.OpObjectDelete:
|
||||||
return tok.AssertVerb(sessionSDK.VerbObjectDelete)
|
return tok.AssertVerb(sessionSDK.VerbObjectDelete)
|
||||||
case acl.OpObjectGet:
|
case acl.OpObjectGet:
|
||||||
|
@ -185,11 +185,13 @@ func assertVerb(tok sessionSDK.Object, op acl.Op) bool {
|
||||||
sessionSDK.VerbObjectGet,
|
sessionSDK.VerbObjectGet,
|
||||||
sessionSDK.VerbObjectDelete,
|
sessionSDK.VerbObjectDelete,
|
||||||
sessionSDK.VerbObjectRange,
|
sessionSDK.VerbObjectRange,
|
||||||
sessionSDK.VerbObjectRangeHash)
|
sessionSDK.VerbObjectRangeHash,
|
||||||
|
sessionSDK.VerbObjectPatch,
|
||||||
|
)
|
||||||
case acl.OpObjectSearch:
|
case acl.OpObjectSearch:
|
||||||
return tok.AssertVerb(sessionSDK.VerbObjectSearch, sessionSDK.VerbObjectDelete)
|
return tok.AssertVerb(sessionSDK.VerbObjectSearch, sessionSDK.VerbObjectDelete)
|
||||||
case acl.OpObjectRange:
|
case acl.OpObjectRange:
|
||||||
return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash)
|
return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash, sessionSDK.VerbObjectPatch)
|
||||||
case acl.OpObjectHash:
|
case acl.OpObjectHash:
|
||||||
return tok.AssertVerb(sessionSDK.VerbObjectRangeHash)
|
return tok.AssertVerb(sessionSDK.VerbObjectRangeHash)
|
||||||
}
|
}
|
||||||
|
|
|
@ -518,7 +518,22 @@ func TestAPECheck_BearerTokenOverrides(t *testing.T) {
|
||||||
ls := inmemory.NewInmemoryLocalStorage()
|
ls := inmemory.NewInmemoryLocalStorage()
|
||||||
ms := inmemory.NewInmemoryMorphRuleChainStorage()
|
ms := inmemory.NewInmemoryMorphRuleChainStorage()
|
||||||
|
|
||||||
checker := NewChecker(ls, ms, headerProvider, frostfsidProvider, nil, &stMock{}, nil, nil)
|
node1Key, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
node1 := netmapSDK.NodeInfo{}
|
||||||
|
node1.SetPublicKey(node1Key.PublicKey().Bytes())
|
||||||
|
netmap := &netmapSDK.NetMap{}
|
||||||
|
netmap.SetEpoch(100)
|
||||||
|
netmap.SetNodes([]netmapSDK.NodeInfo{node1})
|
||||||
|
|
||||||
|
nm := &netmapStub{
|
||||||
|
currentEpoch: 100,
|
||||||
|
netmaps: map[uint64]*netmapSDK.NetMap{
|
||||||
|
100: netmap,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
checker := NewChecker(ls, ms, headerProvider, frostfsidProvider, nm, &stMock{}, nil, nil)
|
||||||
|
|
||||||
prm := Prm{
|
prm := Prm{
|
||||||
Method: method,
|
Method: method,
|
||||||
|
@ -541,7 +556,7 @@ func TestAPECheck_BearerTokenOverrides(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := checker.CheckAPE(context.Background(), prm)
|
err = checker.CheckAPE(context.Background(), prm)
|
||||||
if test.expectAPEErr {
|
if test.expectAPEErr {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -204,24 +204,8 @@ func (c *Service) Put() (objectSvc.PutObjectStream, error) {
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
type patchStreamBasicChecker struct {
|
func (c *Service) Patch() (objectSvc.PatchObjectStream, error) {
|
||||||
next objectSvc.PatchObjectstream
|
return c.next.Patch()
|
||||||
}
|
|
||||||
|
|
||||||
func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
|
|
||||||
return p.next.Send(ctx, request)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
|
||||||
return p.next.CloseAndRecv(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Service) Patch() (objectSvc.PatchObjectstream, error) {
|
|
||||||
streamer, err := c.next.Patch()
|
|
||||||
|
|
||||||
return &patchStreamBasicChecker{
|
|
||||||
next: streamer,
|
|
||||||
}, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
||||||
|
|
|
@ -172,16 +172,18 @@ func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type auditPatchStream struct {
|
type auditPatchStream struct {
|
||||||
stream PatchObjectstream
|
stream PatchObjectStream
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
|
||||||
failed bool
|
failed bool
|
||||||
key []byte
|
key []byte
|
||||||
containerID *refs.ContainerID
|
containerID *refs.ContainerID
|
||||||
objectID *refs.ObjectID
|
objectID *refs.ObjectID
|
||||||
|
|
||||||
|
nonFirstSend bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *auditService) Patch() (PatchObjectstream, error) {
|
func (a *auditService) Patch() (PatchObjectStream, error) {
|
||||||
res, err := a.next.Patch()
|
res, err := a.next.Patch()
|
||||||
if !a.enabled.Load() {
|
if !a.enabled.Load() {
|
||||||
return res, err
|
return res, err
|
||||||
|
@ -196,7 +198,7 @@ func (a *auditService) Patch() (PatchObjectstream, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CloseAndRecv implements PutObjectStream.
|
// CloseAndRecv implements PatchObjectStream.
|
||||||
func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||||
resp, err := a.stream.CloseAndRecv(ctx)
|
resp, err := a.stream.CloseAndRecv(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -209,11 +211,14 @@ func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchRespo
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send implements PutObjectStream.
|
// Send implements PatchObjectStream.
|
||||||
func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) error {
|
func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||||
a.containerID = req.GetBody().GetAddress().GetContainerID()
|
if !a.nonFirstSend {
|
||||||
a.objectID = req.GetBody().GetAddress().GetObjectID()
|
a.containerID = req.GetBody().GetAddress().GetContainerID()
|
||||||
a.key = req.GetVerificationHeader().GetBodySignature().GetKey()
|
a.objectID = req.GetBody().GetAddress().GetObjectID()
|
||||||
|
a.key = req.GetVerificationHeader().GetBodySignature().GetKey()
|
||||||
|
a.nonFirstSend = true
|
||||||
|
}
|
||||||
|
|
||||||
err := a.stream.Send(ctx, req)
|
err := a.stream.Send(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -48,7 +48,7 @@ func (x *Common) Put() (PutObjectStream, error) {
|
||||||
return x.nextHandler.Put()
|
return x.nextHandler.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *Common) Patch() (PatchObjectstream, error) {
|
func (x *Common) Patch() (PatchObjectStream, error) {
|
||||||
if x.state.IsMaintenance() {
|
if x.state.IsMaintenance() {
|
||||||
return nil, new(apistatus.NodeUnderMaintenance)
|
return nil, new(apistatus.NodeUnderMaintenance)
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,6 +124,10 @@ func (p *commonPrm) SetRequestForwarder(f RequestForwarder) {
|
||||||
p.forwarder = f
|
p.forwarder = f
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *commonPrm) SetSignerKey(signerKey *ecdsa.PrivateKey) {
|
||||||
|
p.signerKey = signerKey
|
||||||
|
}
|
||||||
|
|
||||||
// WithAddress sets object address to be read.
|
// WithAddress sets object address to be read.
|
||||||
func (p *commonPrm) WithAddress(addr oid.Address) {
|
func (p *commonPrm) WithAddress(addr oid.Address) {
|
||||||
p.addr = addr
|
p.addr = addr
|
||||||
|
|
|
@ -28,7 +28,7 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
patchStreamMetric struct {
|
patchStreamMetric struct {
|
||||||
stream PatchObjectstream
|
stream PatchObjectStream
|
||||||
metrics MetricRegister
|
metrics MetricRegister
|
||||||
start time.Time
|
start time.Time
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ func (m MetricCollector) Put() (PutObjectStream, error) {
|
||||||
return m.next.Put()
|
return m.next.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m MetricCollector) Patch() (PatchObjectstream, error) {
|
func (m MetricCollector) Patch() (PatchObjectStream, error) {
|
||||||
if m.enabled {
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
|
||||||
|
@ -214,7 +214,7 @@ func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse,
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) error {
|
func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||||
s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().Chunk))
|
s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().GetChunk()))
|
||||||
|
|
||||||
return s.stream.Send(ctx, req)
|
return s.stream.Send(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
63
pkg/services/object/patch/range_provider.go
Normal file
63
pkg/services/object/patch/range_provider.go
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
package patchsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
objectUtil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
patcherSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (p *pipeChunkWriter) WriteChunk(_ context.Context, chunk []byte) error {
|
||||||
|
_, err := p.wr.Write(chunk)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type rangeProvider struct {
|
||||||
|
getSvc *getsvc.Service
|
||||||
|
|
||||||
|
addr oid.Address
|
||||||
|
|
||||||
|
commonPrm *objectUtil.CommonPrm
|
||||||
|
|
||||||
|
localNodeKey *ecdsa.PrivateKey
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ patcherSDK.RangeProvider = (*rangeProvider)(nil)
|
||||||
|
|
||||||
|
func (r *rangeProvider) GetRange(ctx context.Context, rng *objectSDK.Range) io.Reader {
|
||||||
|
pipeReader, pipeWriter := io.Pipe()
|
||||||
|
|
||||||
|
var rngPrm getsvc.RangePrm
|
||||||
|
rngPrm.SetSignerKey(r.localNodeKey)
|
||||||
|
rngPrm.SetCommonParameters(r.commonPrm)
|
||||||
|
|
||||||
|
rngPrm.WithAddress(r.addr)
|
||||||
|
rngPrm.SetChunkWriter(&pipeChunkWriter{
|
||||||
|
wr: pipeWriter,
|
||||||
|
})
|
||||||
|
rngPrm.SetRange(rng)
|
||||||
|
|
||||||
|
getRangeErr := make(chan error)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer pipeWriter.Close()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
pipeWriter.CloseWithError(ctx.Err())
|
||||||
|
case err := <-getRangeErr:
|
||||||
|
pipeWriter.CloseWithError(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
getRangeErr <- r.getSvc.GetRange(ctx, rngPrm)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return pipeReader
|
||||||
|
}
|
|
@ -9,14 +9,36 @@ import (
|
||||||
|
|
||||||
// Service implements Put operation of Object service v2.
|
// Service implements Put operation of Object service v2.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
|
keyStorage *util.KeyStorage
|
||||||
|
|
||||||
|
getSvc *getsvc.Service
|
||||||
|
|
||||||
|
putSvc *putsvc.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService constructs Service instance from provided options.
|
// NewService constructs Service instance from provided options.
|
||||||
func NewService(_ *util.KeyStorage, _ *getsvc.Service, _ *putsvc.Service) *Service {
|
func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service {
|
||||||
return &Service{}
|
return &Service{
|
||||||
|
keyStorage: ks,
|
||||||
|
|
||||||
|
getSvc: getSvc,
|
||||||
|
|
||||||
|
putSvc: putSvc,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put calls internal service and returns v2 object streamer.
|
// Put calls internal service and returns v2 object streamer.
|
||||||
func (s *Service) Patch() (object.PatchObjectstream, error) {
|
func (s *Service) Patch() (object.PatchObjectStream, error) {
|
||||||
return &Streamer{}, nil
|
nodeKey, err := s.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Streamer{
|
||||||
|
getSvc: s.getSvc,
|
||||||
|
|
||||||
|
putSvc: s.putSvc,
|
||||||
|
|
||||||
|
localNodeKey: nodeKey,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,27 +2,220 @@ package patchsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||||
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Streamer for the patch handler is a pipeline that merges two incoming
|
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
|
||||||
// streams of patches and original object payload chunks.
|
// and original object payload chunks. The merged result is fed to Put stream target.
|
||||||
// The merged result is fed to Put stream target.
|
type Streamer struct {
|
||||||
type Streamer struct{}
|
// Patcher must be initialized at first Streamer.Send call.
|
||||||
|
patcher patcher.PatchApplier
|
||||||
|
|
||||||
func (s *Streamer) Send(ctx context.Context, _ *object.PatchRequest) error {
|
nonFirstSend bool
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send")
|
|
||||||
|
getSvc *getsvc.Service
|
||||||
|
|
||||||
|
putSvc *putsvc.Service
|
||||||
|
|
||||||
|
localNodeKey *ecdsa.PrivateKey
|
||||||
|
}
|
||||||
|
|
||||||
|
type pipeChunkWriter struct {
|
||||||
|
wr *io.PipeWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
type headResponseWriter struct {
|
||||||
|
body *objectV2.HeadResponseBody
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error {
|
||||||
|
w.body.SetHeaderPart(toFullObjectHeader(hdr))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func toFullObjectHeader(hdr *objectSDK.Object) objectV2.GetHeaderPart {
|
||||||
|
obj := hdr.ToV2()
|
||||||
|
|
||||||
|
hs := new(objectV2.HeaderWithSignature)
|
||||||
|
hs.SetHeader(obj.GetHeader())
|
||||||
|
hs.SetSignature(obj.GetSignature())
|
||||||
|
|
||||||
|
return hs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
|
||||||
|
hdrWithSig, addr, err := s.readHeader(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
commonPrm, err := util.CommonPrmFromV2(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
commonPrm.WithLocalOnly(false)
|
||||||
|
|
||||||
|
rangeProvider := &rangeProvider{
|
||||||
|
getSvc: s.getSvc,
|
||||||
|
|
||||||
|
addr: addr,
|
||||||
|
|
||||||
|
commonPrm: commonPrm,
|
||||||
|
|
||||||
|
localNodeKey: s.localNodeKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
putstm, err := s.putSvc.Put()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
hdr := hdrWithSig.GetHeader()
|
||||||
|
oV2 := new(objectV2.Object)
|
||||||
|
hV2 := new(objectV2.Header)
|
||||||
|
oV2.SetHeader(hV2)
|
||||||
|
oV2.GetHeader().SetContainerID(hdr.GetContainerID())
|
||||||
|
oV2.GetHeader().SetPayloadLength(hdr.GetPayloadLength())
|
||||||
|
oV2.GetHeader().SetAttributes(hdr.GetAttributes())
|
||||||
|
|
||||||
|
ownerID, err := newOwnerID(req.GetVerificationHeader())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
oV2.GetHeader().SetOwnerID(ownerID)
|
||||||
|
|
||||||
|
prm, err := s.putInitPrm(req, oV2)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = putstm.Init(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
patcherPrm := patcher.Params{
|
||||||
|
Header: objectSDK.NewFromV2(oV2),
|
||||||
|
|
||||||
|
RangeProvider: rangeProvider,
|
||||||
|
|
||||||
|
ObjectWriter: putstm.Target(),
|
||||||
|
}
|
||||||
|
|
||||||
|
s.patcher = patcher.New(patcherPrm)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Streamer) readHeader(ctx context.Context, req *objectV2.PatchRequest) (hdrWithSig *objectV2.HeaderWithSignature, addr oid.Address, err error) {
|
||||||
|
addrV2 := req.GetBody().GetAddress()
|
||||||
|
if addrV2 == nil {
|
||||||
|
err = errors.New("patch request has nil-address")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = addr.ReadFromV2(*addrV2); err != nil {
|
||||||
|
err = fmt.Errorf("read address error: %w", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
commonPrm, err := util.CommonPrmFromV2(req)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
commonPrm.WithLocalOnly(false)
|
||||||
|
|
||||||
|
var p getsvc.HeadPrm
|
||||||
|
p.SetSignerKey(s.localNodeKey)
|
||||||
|
p.SetCommonParameters(commonPrm)
|
||||||
|
|
||||||
|
resp := new(objectV2.HeadResponse)
|
||||||
|
resp.SetBody(new(objectV2.HeadResponseBody))
|
||||||
|
|
||||||
|
p.WithAddress(addr)
|
||||||
|
p.SetHeaderWriter(&headResponseWriter{
|
||||||
|
body: resp.GetBody(),
|
||||||
|
})
|
||||||
|
|
||||||
|
err = s.getSvc.Head(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("get header error: %w", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var ok bool
|
||||||
|
hdrPart := resp.GetBody().GetHeaderPart()
|
||||||
|
if hdrWithSig, ok = hdrPart.(*objectV2.HeaderWithSignature); !ok {
|
||||||
|
err = fmt.Errorf("unexpected header type: %T", hdrPart)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
s.nonFirstSend = true
|
||||||
|
}()
|
||||||
|
|
||||||
|
if !s.nonFirstSend {
|
||||||
|
if err := s.init(ctx, req); err != nil {
|
||||||
|
return fmt.Errorf("streamer init error: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
patch := new(objectSDK.Patch)
|
||||||
|
patch.FromV2(req.GetBody())
|
||||||
|
|
||||||
|
if !s.nonFirstSend {
|
||||||
|
err := s.patcher.ApplyAttributesPatch(ctx, patch.NewAttributes, patch.ReplaceAttributes)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("patch attributes: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if patch.PayloadPatch != nil {
|
||||||
|
err := s.patcher.ApplyPayloadPatch(ctx, patch.PayloadPatch)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("patch payload: %w", err)
|
||||||
|
}
|
||||||
|
} else if s.nonFirstSend {
|
||||||
|
return errors.New("invalid non-first patch: empty payload")
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Streamer) CloseAndRecv(_ context.Context) (*object.PatchResponse, error) {
|
func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
||||||
return &object.PatchResponse{
|
patcherResp, err := s.patcher.Close(ctx)
|
||||||
Body: &object.PatchResponseBody{
|
if err != nil {
|
||||||
ObjectID: nil,
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
oidV2 := new(refsV2.ObjectID)
|
||||||
|
|
||||||
|
if patcherResp.AccessIdentifiers.ParentID != nil {
|
||||||
|
patcherResp.AccessIdentifiers.ParentID.WriteToV2(oidV2)
|
||||||
|
} else {
|
||||||
|
patcherResp.AccessIdentifiers.SelfID.WriteToV2(oidV2)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &objectV2.PatchResponse{
|
||||||
|
Body: &objectV2.PatchResponseBody{
|
||||||
|
ObjectID: oidV2,
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
53
pkg/services/object/patch/util.go
Normal file
53
pkg/services/object/patch/util.go
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
package patchsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"crypto/elliptic"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
)
|
||||||
|
|
||||||
|
// putInitPrm initializes put paramerer for Put stream.
|
||||||
|
func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) {
|
||||||
|
commonPrm, err := util.CommonPrmFromV2(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := new(putsvc.PutInitPrm)
|
||||||
|
prm.WithObject(objectSDK.NewFromV2(obj)).
|
||||||
|
WithCommonPrm(commonPrm).
|
||||||
|
WithPrivateKey(s.localNodeKey)
|
||||||
|
|
||||||
|
return prm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) {
|
||||||
|
for vh.GetOrigin() != nil {
|
||||||
|
vh = vh.GetOrigin()
|
||||||
|
}
|
||||||
|
sig := vh.GetBodySignature()
|
||||||
|
if sig == nil {
|
||||||
|
return nil, errors.New("empty body signature")
|
||||||
|
}
|
||||||
|
key, err := keys.NewPublicKeyFromBytes(sig.GetKey(), elliptic.P256())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid signature key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var userID user.ID
|
||||||
|
user.IDFromKey(&userID, (ecdsa.PublicKey)(*key))
|
||||||
|
ownID := new(refs.OwnerID)
|
||||||
|
userID.WriteToV2(ownID)
|
||||||
|
|
||||||
|
return ownID, nil
|
||||||
|
}
|
|
@ -2,6 +2,7 @@ package putsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
@ -20,6 +21,8 @@ type PutInitPrm struct {
|
||||||
traverseOpts []placement.Option
|
traverseOpts []placement.Option
|
||||||
|
|
||||||
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||||
|
|
||||||
|
privateKey *ecdsa.PrivateKey
|
||||||
}
|
}
|
||||||
|
|
||||||
type PutChunkPrm struct {
|
type PutChunkPrm struct {
|
||||||
|
@ -65,3 +68,11 @@ func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
|
||||||
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PutInitPrm) WithPrivateKey(v *ecdsa.PrivateKey) *PutInitPrm {
|
||||||
|
if p != nil {
|
||||||
|
p.privateKey = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
|
@ -47,6 +47,11 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Target accesses underlying target chunked object writer.
|
||||||
|
func (p *Streamer) Target() transformer.ChunkedObjectWriter {
|
||||||
|
return p.target
|
||||||
|
}
|
||||||
|
|
||||||
// MaxObjectSize returns maximum payload size for the streaming session.
|
// MaxObjectSize returns maximum payload size for the streaming session.
|
||||||
//
|
//
|
||||||
// Must be called after the successful Init.
|
// Must be called after the successful Init.
|
||||||
|
@ -79,11 +84,15 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
||||||
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
||||||
p.relay = prm.relay
|
p.relay = prm.relay
|
||||||
|
|
||||||
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
|
if prm.privateKey != nil {
|
||||||
if err != nil {
|
p.privateKey = prm.privateKey
|
||||||
return err
|
} else {
|
||||||
|
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.privateKey = nodeKey
|
||||||
}
|
}
|
||||||
p.privateKey = nodeKey
|
|
||||||
|
|
||||||
// prepare untrusted-Put object target
|
// prepare untrusted-Put object target
|
||||||
p.target = &validatingPreparedTarget{
|
p.target = &validatingPreparedTarget{
|
||||||
|
@ -136,7 +145,11 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.privateKey = key
|
if prm.privateKey != nil {
|
||||||
|
p.privateKey = prm.privateKey
|
||||||
|
} else {
|
||||||
|
p.privateKey = key
|
||||||
|
}
|
||||||
p.target = &validatingTarget{
|
p.target = &validatingTarget{
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||||
|
|
|
@ -38,7 +38,7 @@ type putStreamResponser struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type patchStreamResponser struct {
|
type patchStreamResponser struct {
|
||||||
stream PatchObjectstream
|
stream PatchObjectStream
|
||||||
respSvc *response.Service
|
respSvc *response.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,7 +109,7 @@ func (s *patchStreamResponser) CloseAndRecv(ctx context.Context) (*object.PatchR
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ResponseService) Patch() (PatchObjectstream, error) {
|
func (s *ResponseService) Patch() (PatchObjectStream, error) {
|
||||||
stream, err := s.svc.Patch()
|
stream, err := s.svc.Patch()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
||||||
|
|
|
@ -31,8 +31,8 @@ type PutObjectStream interface {
|
||||||
CloseAndRecv(context.Context) (*object.PutResponse, error)
|
CloseAndRecv(context.Context) (*object.PutResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PatchObjectstream is an interface of FrostFS API v2 compatible patch streamer.
|
// PatchObjectStream is an interface of FrostFS API v2 compatible patch streamer.
|
||||||
type PatchObjectstream interface {
|
type PatchObjectStream interface {
|
||||||
Send(context.Context, *object.PatchRequest) error
|
Send(context.Context, *object.PatchRequest) error
|
||||||
CloseAndRecv(context.Context) (*object.PatchResponse, error)
|
CloseAndRecv(context.Context) (*object.PatchResponse, error)
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ type PatchObjectstream interface {
|
||||||
type ServiceServer interface {
|
type ServiceServer interface {
|
||||||
Get(*object.GetRequest, GetObjectStream) error
|
Get(*object.GetRequest, GetObjectStream) error
|
||||||
Put() (PutObjectStream, error)
|
Put() (PutObjectStream, error)
|
||||||
Patch() (PatchObjectstream, error)
|
Patch() (PatchObjectStream, error)
|
||||||
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
|
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
|
||||||
Search(*object.SearchRequest, SearchStream) error
|
Search(*object.SearchRequest, SearchStream) error
|
||||||
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
|
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
|
||||||
|
|
|
@ -37,7 +37,7 @@ type putStreamSigner struct {
|
||||||
|
|
||||||
type patchStreamSigner struct {
|
type patchStreamSigner struct {
|
||||||
sigSvc *util.SignService
|
sigSvc *util.SignService
|
||||||
stream PatchObjectstream
|
stream PatchObjectStream
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ func (s *patchStreamSigner) CloseAndRecv(ctx context.Context) (resp *object.Patc
|
||||||
return resp, s.sigSvc.SignResponse(resp, err)
|
return resp, s.sigSvc.SignResponse(resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SignService) Patch() (PatchObjectstream, error) {
|
func (s *SignService) Patch() (PatchObjectStream, error) {
|
||||||
stream, err := s.svc.Patch()
|
stream, err := s.svc.Patch()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
||||||
|
|
|
@ -91,7 +91,7 @@ func (c TransportSplitter) Put() (PutObjectStream, error) {
|
||||||
return c.next.Put()
|
return c.next.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c TransportSplitter) Patch() (PatchObjectstream, error) {
|
func (c TransportSplitter) Patch() (PatchObjectStream, error) {
|
||||||
return c.next.Patch()
|
return c.next.Patch()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue