[#1307] object: Implement Patch method

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
Airat Arifullin 2024-08-12 17:11:10 +03:00 committed by Evgenii Stratonikov
parent a4a1c3f18b
commit e890f1b4b1
19 changed files with 430 additions and 81 deletions

View file

@ -0,0 +1,63 @@
package patchsvc
import (
"context"
"crypto/ecdsa"
"io"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
objectUtil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
patcherSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
)
func (p *pipeChunkWriter) WriteChunk(_ context.Context, chunk []byte) error {
_, err := p.wr.Write(chunk)
return err
}
type rangeProvider struct {
getSvc *getsvc.Service
addr oid.Address
commonPrm *objectUtil.CommonPrm
localNodeKey *ecdsa.PrivateKey
}
var _ patcherSDK.RangeProvider = (*rangeProvider)(nil)
func (r *rangeProvider) GetRange(ctx context.Context, rng *objectSDK.Range) io.Reader {
pipeReader, pipeWriter := io.Pipe()
var rngPrm getsvc.RangePrm
rngPrm.SetSignerKey(r.localNodeKey)
rngPrm.SetCommonParameters(r.commonPrm)
rngPrm.WithAddress(r.addr)
rngPrm.SetChunkWriter(&pipeChunkWriter{
wr: pipeWriter,
})
rngPrm.SetRange(rng)
getRangeErr := make(chan error)
go func() {
defer pipeWriter.Close()
select {
case <-ctx.Done():
pipeWriter.CloseWithError(ctx.Err())
case err := <-getRangeErr:
pipeWriter.CloseWithError(err)
}
}()
go func() {
getRangeErr <- r.getSvc.GetRange(ctx, rngPrm)
}()
return pipeReader
}

View file

@ -9,14 +9,36 @@ import (
// Service implements Put operation of Object service v2.
type Service struct {
keyStorage *util.KeyStorage
getSvc *getsvc.Service
putSvc *putsvc.Service
}
// NewService constructs Service instance from provided options.
func NewService(_ *util.KeyStorage, _ *getsvc.Service, _ *putsvc.Service) *Service {
return &Service{}
func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service {
return &Service{
keyStorage: ks,
getSvc: getSvc,
putSvc: putSvc,
}
}
// Put calls internal service and returns v2 object streamer.
func (s *Service) Patch() (object.PatchObjectstream, error) {
return &Streamer{}, nil
func (s *Service) Patch() (object.PatchObjectStream, error) {
nodeKey, err := s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}
return &Streamer{
getSvc: s.getSvc,
putSvc: s.putSvc,
localNodeKey: nodeKey,
}, nil
}

View file

@ -2,27 +2,220 @@ package patchsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
)
// Streamer for the patch handler is a pipeline that merges two incoming
// streams of patches and original object payload chunks.
// The merged result is fed to Put stream target.
type Streamer struct{}
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
// and original object payload chunks. The merged result is fed to Put stream target.
type Streamer struct {
// Patcher must be initialized at first Streamer.Send call.
patcher patcher.PatchApplier
func (s *Streamer) Send(ctx context.Context, _ *object.PatchRequest) error {
_, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send")
nonFirstSend bool
getSvc *getsvc.Service
putSvc *putsvc.Service
localNodeKey *ecdsa.PrivateKey
}
type pipeChunkWriter struct {
wr *io.PipeWriter
}
type headResponseWriter struct {
body *objectV2.HeadResponseBody
}
func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error {
w.body.SetHeaderPart(toFullObjectHeader(hdr))
return nil
}
func toFullObjectHeader(hdr *objectSDK.Object) objectV2.GetHeaderPart {
obj := hdr.ToV2()
hs := new(objectV2.HeaderWithSignature)
hs.SetHeader(obj.GetHeader())
hs.SetSignature(obj.GetSignature())
return hs
}
func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
hdrWithSig, addr, err := s.readHeader(ctx, req)
if err != nil {
return err
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return err
}
commonPrm.WithLocalOnly(false)
rangeProvider := &rangeProvider{
getSvc: s.getSvc,
addr: addr,
commonPrm: commonPrm,
localNodeKey: s.localNodeKey,
}
putstm, err := s.putSvc.Put()
if err != nil {
return err
}
hdr := hdrWithSig.GetHeader()
oV2 := new(objectV2.Object)
hV2 := new(objectV2.Header)
oV2.SetHeader(hV2)
oV2.GetHeader().SetContainerID(hdr.GetContainerID())
oV2.GetHeader().SetPayloadLength(hdr.GetPayloadLength())
oV2.GetHeader().SetAttributes(hdr.GetAttributes())
ownerID, err := newOwnerID(req.GetVerificationHeader())
if err != nil {
return err
}
oV2.GetHeader().SetOwnerID(ownerID)
prm, err := s.putInitPrm(req, oV2)
if err != nil {
return err
}
err = putstm.Init(ctx, prm)
if err != nil {
return err
}
patcherPrm := patcher.Params{
Header: objectSDK.NewFromV2(oV2),
RangeProvider: rangeProvider,
ObjectWriter: putstm.Target(),
}
s.patcher = patcher.New(patcherPrm)
return nil
}
func (s *Streamer) readHeader(ctx context.Context, req *objectV2.PatchRequest) (hdrWithSig *objectV2.HeaderWithSignature, addr oid.Address, err error) {
addrV2 := req.GetBody().GetAddress()
if addrV2 == nil {
err = errors.New("patch request has nil-address")
return
}
if err = addr.ReadFromV2(*addrV2); err != nil {
err = fmt.Errorf("read address error: %w", err)
return
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return
}
commonPrm.WithLocalOnly(false)
var p getsvc.HeadPrm
p.SetSignerKey(s.localNodeKey)
p.SetCommonParameters(commonPrm)
resp := new(objectV2.HeadResponse)
resp.SetBody(new(objectV2.HeadResponseBody))
p.WithAddress(addr)
p.SetHeaderWriter(&headResponseWriter{
body: resp.GetBody(),
})
err = s.getSvc.Head(ctx, p)
if err != nil {
err = fmt.Errorf("get header error: %w", err)
return
}
var ok bool
hdrPart := resp.GetBody().GetHeaderPart()
if hdrWithSig, ok = hdrPart.(*objectV2.HeaderWithSignature); !ok {
err = fmt.Errorf("unexpected header type: %T", hdrPart)
}
return
}
func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error {
ctx, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send")
defer span.End()
defer func() {
s.nonFirstSend = true
}()
if !s.nonFirstSend {
if err := s.init(ctx, req); err != nil {
return fmt.Errorf("streamer init error: %w", err)
}
}
patch := new(objectSDK.Patch)
patch.FromV2(req.GetBody())
if !s.nonFirstSend {
err := s.patcher.ApplyAttributesPatch(ctx, patch.NewAttributes, patch.ReplaceAttributes)
if err != nil {
return fmt.Errorf("patch attributes: %w", err)
}
}
if patch.PayloadPatch != nil {
err := s.patcher.ApplyPayloadPatch(ctx, patch.PayloadPatch)
if err != nil {
return fmt.Errorf("patch payload: %w", err)
}
} else if s.nonFirstSend {
return errors.New("invalid non-first patch: empty payload")
}
return nil
}
func (s *Streamer) CloseAndRecv(_ context.Context) (*object.PatchResponse, error) {
return &object.PatchResponse{
Body: &object.PatchResponseBody{
ObjectID: nil,
func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
patcherResp, err := s.patcher.Close(ctx)
if err != nil {
return nil, err
}
oidV2 := new(refsV2.ObjectID)
if patcherResp.AccessIdentifiers.ParentID != nil {
patcherResp.AccessIdentifiers.ParentID.WriteToV2(oidV2)
} else {
patcherResp.AccessIdentifiers.SelfID.WriteToV2(oidV2)
}
return &objectV2.PatchResponse{
Body: &objectV2.PatchResponseBody{
ObjectID: oidV2,
},
}, nil
}

View file

@ -0,0 +1,53 @@
package patchsvc
import (
"crypto/ecdsa"
"crypto/elliptic"
"errors"
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
// putInitPrm initializes put paramerer for Put stream.
func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) {
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
prm := new(putsvc.PutInitPrm)
prm.WithObject(objectSDK.NewFromV2(obj)).
WithCommonPrm(commonPrm).
WithPrivateKey(s.localNodeKey)
return prm, nil
}
func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) {
for vh.GetOrigin() != nil {
vh = vh.GetOrigin()
}
sig := vh.GetBodySignature()
if sig == nil {
return nil, errors.New("empty body signature")
}
key, err := keys.NewPublicKeyFromBytes(sig.GetKey(), elliptic.P256())
if err != nil {
return nil, fmt.Errorf("invalid signature key: %w", err)
}
var userID user.ID
user.IDFromKey(&userID, (ecdsa.PublicKey)(*key))
ownID := new(refs.OwnerID)
userID.WriteToV2(ownID)
return ownID, nil
}