frostfs-node/pkg/services/object/patch/streamer.go

222 lines
5.1 KiB
Go
Raw Normal View History

package patchsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
)
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
// and original object payload chunks. The merged result is fed to Put stream target.
type Streamer struct {
// Patcher must be initialized at first Streamer.Send call.
patcher patcher.PatchApplier
nonFirstSend bool
getSvc *getsvc.Service
putSvc *putsvc.Service
localNodeKey *ecdsa.PrivateKey
}
type pipeChunkWriter struct {
wr *io.PipeWriter
}
type headResponseWriter struct {
body *objectV2.HeadResponseBody
}
func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error {
w.body.SetHeaderPart(toFullObjectHeader(hdr))
return nil
}
func toFullObjectHeader(hdr *objectSDK.Object) objectV2.GetHeaderPart {
obj := hdr.ToV2()
hs := new(objectV2.HeaderWithSignature)
hs.SetHeader(obj.GetHeader())
hs.SetSignature(obj.GetSignature())
return hs
}
func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
hdrWithSig, addr, err := s.readHeader(ctx, req)
if err != nil {
return err
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return err
}
commonPrm.WithLocalOnly(false)
rangeProvider := &rangeProvider{
getSvc: s.getSvc,
addr: addr,
commonPrm: commonPrm,
localNodeKey: s.localNodeKey,
}
putstm, err := s.putSvc.Put()
if err != nil {
return err
}
hdr := hdrWithSig.GetHeader()
oV2 := new(objectV2.Object)
hV2 := new(objectV2.Header)
oV2.SetHeader(hV2)
oV2.GetHeader().SetContainerID(hdr.GetContainerID())
oV2.GetHeader().SetPayloadLength(hdr.GetPayloadLength())
oV2.GetHeader().SetAttributes(hdr.GetAttributes())
ownerID, err := newOwnerID(req.GetVerificationHeader())
if err != nil {
return err
}
oV2.GetHeader().SetOwnerID(ownerID)
prm, err := s.putInitPrm(req, oV2)
if err != nil {
return err
}
err = putstm.Init(ctx, prm)
if err != nil {
return err
}
patcherPrm := patcher.Params{
Header: objectSDK.NewFromV2(oV2),
RangeProvider: rangeProvider,
ObjectWriter: putstm.Target(),
}
s.patcher = patcher.New(patcherPrm)
return nil
}
func (s *Streamer) readHeader(ctx context.Context, req *objectV2.PatchRequest) (hdrWithSig *objectV2.HeaderWithSignature, addr oid.Address, err error) {
addrV2 := req.GetBody().GetAddress()
if addrV2 == nil {
err = errors.New("patch request has nil-address")
return
}
if err = addr.ReadFromV2(*addrV2); err != nil {
err = fmt.Errorf("read address error: %w", err)
return
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return
}
commonPrm.WithLocalOnly(false)
var p getsvc.HeadPrm
p.SetSignerKey(s.localNodeKey)
p.SetCommonParameters(commonPrm)
resp := new(objectV2.HeadResponse)
resp.SetBody(new(objectV2.HeadResponseBody))
p.WithAddress(addr)
p.SetHeaderWriter(&headResponseWriter{
body: resp.GetBody(),
})
err = s.getSvc.Head(ctx, p)
if err != nil {
err = fmt.Errorf("get header error: %w", err)
return
}
var ok bool
hdrPart := resp.GetBody().GetHeaderPart()
if hdrWithSig, ok = hdrPart.(*objectV2.HeaderWithSignature); !ok {
err = fmt.Errorf("unexpected header type: %T", hdrPart)
}
return
}
func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error {
ctx, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send")
defer span.End()
defer func() {
s.nonFirstSend = true
}()
if !s.nonFirstSend {
if err := s.init(ctx, req); err != nil {
return fmt.Errorf("streamer init error: %w", err)
}
}
patch := new(objectSDK.Patch)
patch.FromV2(req.GetBody())
if !s.nonFirstSend {
err := s.patcher.ApplyAttributesPatch(ctx, patch.NewAttributes, patch.ReplaceAttributes)
if err != nil {
return fmt.Errorf("patch attributes: %w", err)
}
}
if patch.PayloadPatch != nil {
err := s.patcher.ApplyPayloadPatch(ctx, patch.PayloadPatch)
if err != nil {
return fmt.Errorf("patch payload: %w", err)
}
} else if s.nonFirstSend {
return errors.New("invalid non-first patch: empty payload")
}
return nil
}
func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
patcherResp, err := s.patcher.Close(ctx)
if err != nil {
return nil, err
}
oidV2 := new(refsV2.ObjectID)
if patcherResp.AccessIdentifiers.ParentID != nil {
patcherResp.AccessIdentifiers.ParentID.WriteToV2(oidV2)
} else {
patcherResp.AccessIdentifiers.SelfID.WriteToV2(oidV2)
}
return &objectV2.PatchResponse{
Body: &objectV2.PatchResponseBody{
ObjectID: oidV2,
},
}, nil
}