package patchsvc import ( "context" "crypto/ecdsa" "errors" "fmt" "io" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target" objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer" getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher" ) // Streamer for the patch handler is a pipeline that merges two incoming streams of patches // and original object payload chunks. The merged result is fed to Put stream target. type Streamer struct { *objectwriter.Config // Patcher must be initialized at first Streamer.Send call. patcher patcher.PatchApplier nonFirstSend bool getSvc *getsvc.Service localNodeKey *ecdsa.PrivateKey } type pipeChunkWriter struct { wr *io.PipeWriter } type headResponseWriter struct { body *objectV2.HeadResponseBody } func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error { w.body.SetHeaderPart(toFullObjectHeader(hdr)) return nil } func toFullObjectHeader(hdr *objectSDK.Object) objectV2.GetHeaderPart { obj := hdr.ToV2() hs := new(objectV2.HeaderWithSignature) hs.SetHeader(obj.GetHeader()) hs.SetSignature(obj.GetSignature()) return hs } func isLinkObject(hdr *objectV2.HeaderWithSignature) bool { split := hdr.GetHeader().GetSplit() return len(split.GetChildren()) > 0 && split.GetParent() != nil } func isComplexObjectPart(hdr *objectV2.HeaderWithSignature) bool { return hdr.GetHeader().GetEC() != nil || hdr.GetHeader().GetSplit() != nil } func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error { hdrWithSig, addr, err := s.readHeader(ctx, req) if err != nil { return err } if hdrWithSig.GetHeader().GetObjectType() != objectV2.TypeRegular { return errors.New("non-regular object can't be patched") } if isLinkObject(hdrWithSig) { return errors.New("linking object can't be patched") } if isComplexObjectPart(hdrWithSig) { return errors.New("complex object parts can't be patched") } commonPrm, err := util.CommonPrmFromV2(req) if err != nil { return err } commonPrm.WithLocalOnly(false) rangeProvider := &rangeProvider{ getSvc: s.getSvc, addr: addr, commonPrm: commonPrm, localNodeKey: s.localNodeKey, } hdr := hdrWithSig.GetHeader() oV2 := new(objectV2.Object) hV2 := new(objectV2.Header) oV2.SetHeader(hV2) oV2.GetHeader().SetContainerID(hdr.GetContainerID()) oV2.GetHeader().SetPayloadLength(hdr.GetPayloadLength()) oV2.GetHeader().SetAttributes(hdr.GetAttributes()) ownerID, err := newOwnerID(req.GetVerificationHeader()) if err != nil { return err } oV2.GetHeader().SetOwnerID(ownerID) target, err := target.New(objectwriter.Params{ Config: s.Config, Common: commonPrm, Header: objectSDK.NewFromV2(oV2), SignRequestPrivateKey: s.localNodeKey, }) if err != nil { return fmt.Errorf("target creation: %w", err) } patcherPrm := patcher.Params{ Header: objectSDK.NewFromV2(oV2), RangeProvider: rangeProvider, ObjectWriter: target, } s.patcher = patcher.New(patcherPrm) return nil } func (s *Streamer) readHeader(ctx context.Context, req *objectV2.PatchRequest) (hdrWithSig *objectV2.HeaderWithSignature, addr oid.Address, err error) { addrV2 := req.GetBody().GetAddress() if addrV2 == nil { err = errors.New("patch request has nil-address") return } if err = addr.ReadFromV2(*addrV2); err != nil { err = fmt.Errorf("read address error: %w", err) return } commonPrm, err := util.CommonPrmFromV2(req) if err != nil { return } commonPrm.WithLocalOnly(false) var p getsvc.HeadPrm p.SetSignerKey(s.localNodeKey) p.SetCommonParameters(commonPrm) resp := new(objectV2.HeadResponse) resp.SetBody(new(objectV2.HeadResponseBody)) p.WithAddress(addr) p.SetHeaderWriter(&headResponseWriter{ body: resp.GetBody(), }) err = s.getSvc.Head(ctx, p) if err != nil { err = fmt.Errorf("get header error: %w", err) return } var ok bool hdrPart := resp.GetBody().GetHeaderPart() if hdrWithSig, ok = hdrPart.(*objectV2.HeaderWithSignature); !ok { err = fmt.Errorf("unexpected header type: %T", hdrPart) } return } func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error { ctx, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send") defer span.End() defer func() { s.nonFirstSend = true }() if !s.nonFirstSend { if err := s.init(ctx, req); err != nil { return fmt.Errorf("streamer init error: %w", err) } } patch := new(objectSDK.Patch) patch.FromV2(req.GetBody()) if !s.nonFirstSend { err := s.patcher.ApplyAttributesPatch(ctx, patch.NewAttributes, patch.ReplaceAttributes) if err != nil { return fmt.Errorf("patch attributes: %w", err) } } if patch.PayloadPatch != nil { err := s.patcher.ApplyPayloadPatch(ctx, patch.PayloadPatch) if err != nil { return fmt.Errorf("patch payload: %w", err) } } else if s.nonFirstSend { return errors.New("invalid non-first patch: empty payload") } return nil } func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) { patcherResp, err := s.patcher.Close(ctx) if err != nil { return nil, err } oidV2 := new(refsV2.ObjectID) if patcherResp.AccessIdentifiers.ParentID != nil { patcherResp.AccessIdentifiers.ParentID.WriteToV2(oidV2) } else { patcherResp.AccessIdentifiers.SelfID.WriteToV2(oidV2) } return &objectV2.PatchResponse{ Body: &objectV2.PatchResponseBody{ ObjectID: oidV2, }, }, nil }