object: Implement Patch method #1307

Merged
fyrchik merged 5 commits from aarifullin/frostfs-node:feat/patch/1 into master 2024-08-16 14:13:12 +00:00
27 changed files with 1083 additions and 14 deletions

View file

@ -2,10 +2,13 @@ package internal
import (
"bytes"
"cmp"
"context"
"errors"
"fmt"
"io"
"os"
"slices"
"sort"
"strings"
@ -869,3 +872,65 @@ func SyncContainerSettings(ctx context.Context, prm SyncContainerPrm) (*SyncCont
return new(SyncContainerRes), nil
}
// PatchObjectPrm groups parameters of PatchObject operation.
type PatchObjectPrm struct {
commonObjectPrm
objectAddressPrm
NewAttributes []objectSDK.Attribute
ReplaceAttribute bool
PayloadPatches []PayloadPatch
}
type PayloadPatch struct {
Range objectSDK.Range
PayloadPath string
}
type PatchRes struct {
OID oid.ID
}
func Patch(ctx context.Context, prm PatchObjectPrm) (*PatchRes, error) {
patchPrm := client.PrmObjectPatch{
XHeaders: prm.xHeaders,
BearerToken: prm.bearerToken,
Session: prm.sessionToken,
Address: prm.objAddr,
}
slices.SortFunc(prm.PayloadPatches, func(a, b PayloadPatch) int {
fyrchik marked this conversation as resolved Outdated

So, basically cmp.Compare? https://pkg.go.dev/cmp#Compare

So, basically `cmp.Compare`? https://pkg.go.dev/cmp#Compare

Fixed

Fixed
return cmp.Compare(a.Range.GetOffset(), b.Range.GetOffset())
})
patcher, err := prm.cli.ObjectPatchInit(ctx, patchPrm)
if err != nil {
return nil, fmt.Errorf("init payload reading: %w", err)
}
if patcher.PatchAttributes(ctx, prm.NewAttributes, prm.ReplaceAttribute) {
for _, pp := range prm.PayloadPatches {
payloadFile, err := os.OpenFile(pp.PayloadPath, os.O_RDONLY, os.ModePerm)
if err != nil {
return nil, err
}
applied := patcher.PatchPayload(ctx, &pp.Range, payloadFile)
_ = payloadFile.Close()
if !applied {
break
}
}
}
res, err := patcher.Close(ctx)
if err != nil {
return nil, err
}
return &PatchRes{
OID: res.ObjectID(),
}, nil
}

View file

@ -0,0 +1,151 @@
package object
import (
"fmt"
"strconv"
"strings"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra"
)
const (
newAttrsFlagName = "new-attrs"
replaceAttrsFlagName = "replace-attrs"
rangeFlagName = "range"
payloadFlagName = "payload"
)
var objectPatchCmd = &cobra.Command{
Use: "patch",
Run: patch,
Short: "Patch FrostFS object",
Long: "Patch FrostFS object. Each range passed to the command requires to pass a corresponding patch payload.",
Example: `
frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid <CID> --oid <OID> --new-attrs 'key1=val1,key2=val2' --replace-attrs
frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid <CID> --oid <OID> --range offX:lnX --payload /path/to/payloadX --range offY:lnY --payload /path/to/payloadY
frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid <CID> --oid <OID> --new-attrs 'key1=val1,key2=val2' --replace-attrs --range offX:lnX --payload /path/to/payload
`,
}
func initObjectPatchCmd() {
commonflags.Init(objectPatchCmd)
initFlagSession(objectPatchCmd, "PATCH")
flags := objectPatchCmd.Flags()
flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
_ = objectRangeCmd.MarkFlagRequired(commonflags.CIDFlag)
flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
_ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag)
flags.String(newAttrsFlagName, "", "New object attributes in form of Key1=Value1,Key2=Value2")
flags.Bool(replaceAttrsFlagName, false, "Replace object attributes by new ones.")
flags.StringSlice(rangeFlagName, []string{}, "Range to which patch payload is applied. Format: offset:length")
flags.StringSlice(payloadFlagName, []string{}, "Path to file with patch payload.")
}
func patch(cmd *cobra.Command, _ []string) {
var cnr cid.ID
var obj oid.ID
objAddr := readObjectAddress(cmd, &cnr, &obj)
ranges, err := getRangeSlice(cmd)
commonCmd.ExitOnErr(cmd, "", err)
payloads := patchPayloadPaths(cmd)
if len(ranges) != len(payloads) {
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("the number of ranges and payloads are not equal: ranges = %d, payloads = %d", len(ranges), len(payloads)))
}
newAttrs, err := parseNewObjectAttrs(cmd)
commonCmd.ExitOnErr(cmd, "can't parse new object attributes: %w", err)
replaceAttrs, _ := cmd.Flags().GetBool(replaceAttrsFlagName)
pk := key.GetOrGenerate(cmd)
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
var prm internalclient.PatchObjectPrm
prm.SetClient(cli)
Prepare(cmd, &prm)
ReadOrOpenSession(cmd, &prm, pk, cnr, nil)
prm.SetAddress(objAddr)
prm.NewAttributes = newAttrs
prm.ReplaceAttribute = replaceAttrs
for i := range ranges {
prm.PayloadPatches = append(prm.PayloadPatches, internalclient.PayloadPatch{
Range: ranges[i],
PayloadPath: payloads[i],
})
}
res, err := internalclient.Patch(cmd.Context(), prm)
if err != nil {
commonCmd.ExitOnErr(cmd, "can't patch the object: %w", err)
}
cmd.Println("Patched object ID: ", res.OID.EncodeToString())
}
func parseNewObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) {
var rawAttrs []string
raw := cmd.Flag(newAttrsFlagName).Value.String()
if len(raw) != 0 {
rawAttrs = strings.Split(raw, ",")
}
attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
for i := range rawAttrs {
k, v, found := strings.Cut(rawAttrs[i], "=")
if !found {
return nil, fmt.Errorf("invalid attribute format: %s", rawAttrs[i])
}
attrs[i].SetKey(k)
attrs[i].SetValue(v)
}
return attrs, nil
}
func getRangeSlice(cmd *cobra.Command) ([]objectSDK.Range, error) {
v, _ := cmd.Flags().GetStringSlice(rangeFlagName)
if len(v) == 0 {
return []objectSDK.Range{}, nil
}
rs := make([]objectSDK.Range, len(v))
for i := range v {
before, after, found := strings.Cut(v[i], rangeSep)
if !found {
return nil, fmt.Errorf("invalid range specifier: %s", v[i])
}
offset, err := strconv.ParseUint(before, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid '%s' range offset specifier: %w", v[i], err)
}
length, err := strconv.ParseUint(after, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid '%s' range length specifier: %w", v[i], err)
}
rs[i].SetOffset(offset)
rs[i].SetLength(length)
}
return rs, nil
}
func patchPayloadPaths(cmd *cobra.Command) []string {
v, _ := cmd.Flags().GetStringSlice(payloadFlagName)
return v
}

View file

@ -29,6 +29,7 @@ func init() {
objectRangeCmd,
objectLockCmd,
objectNodesCmd,
objectPatchCmd,
}
Cmd.AddCommand(objectChildCommands...)
@ -39,6 +40,7 @@ func init() {
}
initObjectPutCmd()
initObjectPatchCmd()
initObjectDeleteCmd()
initObjectGetCmd()
initObjectSearchCmd()

View file

@ -306,6 +306,8 @@ func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, ke
case *internal.PutObjectPrm:
common.PrintVerbose(cmd, "Binding session to object PUT...")
tok.ForVerb(session.VerbObjectPut)
case *internal.PatchObjectPrm:
tok.ForVerb(session.VerbObjectPatch)
case *internal.DeleteObjectPrm:
common.PrintVerbose(cmd, "Binding session to object DELETE...")
tok.ForVerb(session.VerbObjectDelete)

View file

@ -239,6 +239,8 @@ func parseAction(lexeme string) ([]string, bool, error) {
return []string{nativeschema.MethodRangeObject}, true, nil
case "object.hash":
return []string{nativeschema.MethodHashObject}, true, nil
case "object.patch":
return []string{nativeschema.MethodPatchObject}, true, nil
case "object.*":
return []string{
nativeschema.MethodPutObject,
@ -248,6 +250,7 @@ func parseAction(lexeme string) ([]string, bool, error) {
nativeschema.MethodSearchObject,
nativeschema.MethodRangeObject,
nativeschema.MethodHashObject,
nativeschema.MethodPatchObject,
}, true, nil
case "container.put":
return []string{nativeschema.MethodPutContainer}, false, nil

View file

@ -28,6 +28,7 @@ import (
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
patchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/patch"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
@ -54,6 +55,8 @@ type objectSvc struct {
get *getsvcV2.Service
delete *deletesvcV2.Service
patch *patchsvc.Service
}
func (c *cfg) MaxObjectSize() uint64 {
@ -71,6 +74,10 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) {
return s.put.Put()
}
func (s *objectSvc) Patch() (objectService.PatchObjectStream, error) {
return s.patch.Patch()
}
func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
return s.put.PutSingle(ctx, req)
}
@ -181,10 +188,12 @@ func initObjectService(c *cfg) {
sDeleteV2 := createDeleteServiceV2(sDelete)
sPatch := createPatchSvc(sGet, sPut, keyStorage)
// build service pipeline
// grpc | audit | <metrics> | signature | response | acl | ape | split
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2)
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
apeSvc := createAPEService(c, splitSvc)
@ -353,6 +362,10 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2
return putsvcV2.NewService(sPut, keyStorage)
}
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service {
return patchsvc.NewService(keyStorage, sGet, sPut)
}
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
ls := c.cfgObject.cfgLocalStorage.localStorage
@ -425,7 +438,7 @@ func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service {
}
func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service,
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service,
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service, sPatch *patchsvc.Service,
) *objectService.TransportSplitter {
return objectService.NewTransportSplitter(
c.cfgGRPC.maxChunkSize,
@ -435,6 +448,7 @@ func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Servi
search: sSearchV2,
get: sGetV2,
delete: sDeleteV2,
patch: sPatch,
},
)
}

6
go.mod
View file

@ -4,14 +4,14 @@ go 1.21
require (
code.gitea.io/sdk/gitea v0.17.1
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240813155151-d112a28d382f
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240813155821-98aabc45a720
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
github.com/cheggaaa/pb v1.0.29

BIN
go.sum

Binary file not shown.

View file

@ -24,6 +24,48 @@ func New(c objectSvc.ServiceServer) *Server {
}
}
// Patch opens internal Object patch stream and feeds it by the data read from gRPC stream.
func (s *Server) Patch(gStream objectGRPC.ObjectService_PatchServer) error {
stream, err := s.srv.Patch()
if err != nil {
return err
}
for {
req, err := gStream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
resp, err := stream.CloseAndRecv(gStream.Context())
if err != nil {
return err
}
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse))
}
return err
}
patchReq := new(object.PatchRequest)
if err := patchReq.FromGRPCMessage(req); err != nil {
return err
}
if err := stream.Send(gStream.Context(), patchReq); err != nil {
if errors.Is(err, util.ErrAbortStream) {
resp, err := stream.CloseAndRecv(gStream.Context())
if err != nil {
return err
}
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse))
}
return err
}
}
}
// Put opens internal Object service Put stream and overtakes data from gRPC stream to it.
func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
stream, err := s.srv.Put()

View file

@ -35,6 +35,12 @@ type putStreamBasicChecker struct {
next object.PutObjectStream
}
type patchStreamBasicChecker struct {
source *Service
next object.PatchObjectStream
nonFirstSend bool
}
type getStreamBasicChecker struct {
checker ACLChecker
@ -249,6 +255,15 @@ func (b Service) Put() (object.PutObjectStream, error) {
}, err
}
func (b Service) Patch() (object.PatchObjectStream, error) {
streamer, err := b.next.Patch()
return &patchStreamBasicChecker{
source: &b,
next: streamer,
}, err
}
func (b Service) Head(
ctx context.Context,
request *objectV2.HeadRequest,
@ -734,6 +749,65 @@ func (g *searchStreamBasicChecker) Send(resp *objectV2.SearchResponse) error {
return g.SearchStream.Send(resp)
}
func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
body := request.GetBody()
if body == nil {
return errEmptyBody
}
if !p.nonFirstSend {
p.nonFirstSend = true
cnr, err := getContainerIDFromRequest(request)
if err != nil {
return err
}
objV2 := request.GetBody().GetAddress().GetObjectID()
if objV2 == nil {
return errors.New("missing oid")
}
obj := new(oid.ID)
err = obj.ReadFromV2(*objV2)
fyrchik marked this conversation as resolved Outdated

*objV2?

`*objV2`?

Sure! Fixed

Sure! Fixed
if err != nil {
return err
}
var sTok *sessionSDK.Object
sTok, err = readSessionToken(cnr, obj, request.GetMetaHeader().GetSessionToken())
if err != nil {
return err
}
bTok, err := originalBearerToken(request.GetMetaHeader())
if err != nil {
return err
}
req := MetaWithToken{
vheader: request.GetVerificationHeader(),
token: sTok,
bearer: bTok,
src: request,
}
reqInfo, err := p.source.findRequestInfoWithoutACLOperationAssert(req, cnr)
if err != nil {
return err
}
reqInfo.obj = obj
ctx = requestContext(ctx, reqInfo)
}
return p.next.Send(ctx, request)
}
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
return p.next.CloseAndRecv(ctx)
}
func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (info RequestInfo, err error) {
cnr, err := b.containers.Get(idCnr) // fetch actual container
if err != nil {
@ -790,3 +864,56 @@ func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (in
return info, nil
}
// findRequestInfoWithoutACLOperationAssert is findRequestInfo without session token verb assert.
func (b Service) findRequestInfoWithoutACLOperationAssert(req MetaWithToken, idCnr cid.ID) (info RequestInfo, err error) {
cnr, err := b.containers.Get(idCnr) // fetch actual container
if err != nil {
return info, err
}
if req.token != nil {
currentEpoch, err := b.nm.Epoch()
if err != nil {
return info, errors.New("can't fetch current epoch")
}
if req.token.ExpiredAt(currentEpoch) {
return info, new(apistatus.SessionTokenExpired)
}
if req.token.InvalidAt(currentEpoch) {
return info, fmt.Errorf("%s: token is invalid at %d epoch)",
invalidRequestMessage, currentEpoch)
}
}
// find request role and key
ownerID, ownerKey, err := req.RequestOwner()
if err != nil {
return info, err
}
res, err := b.c.Classify(ownerID, ownerKey, idCnr, cnr.Value)
if err != nil {
return info, err
}
info.basicACL = cnr.Value.BasicACL()
info.requestRole = res.Role
info.cnrOwner = cnr.Value.Owner()
info.idCnr = idCnr
cnrNamespace, hasNamespace := strings.CutSuffix(cnrSDK.ReadDomain(cnr.Value).Zone(), ".ns")
if hasNamespace {
info.cnrNamespace = cnrNamespace
}
// it is assumed that at the moment the key will be valid,
// otherwise the request would not pass validation
info.senderKey = res.Key
// add bearer token if it is present in request
info.bearer = req.bearer
info.srcRequest = req.src
return info, nil
}

View file

@ -46,6 +46,8 @@ func getContainerIDFromRequest(req any) (cid.ID, error) {
idV2 = v.GetBody().GetAddress().GetContainerID()
case *objectV2.PutSingleRequest:
idV2 = v.GetBody().GetObject().GetHeader().GetContainerID()
case *objectV2.PatchRequest:
idV2 = v.GetBody().GetAddress().GetContainerID()
default:
return cid.ID{}, errors.New("unknown request type")
}
@ -174,7 +176,7 @@ func isOwnerFromKey(id user.ID, key *keys.PublicKey) bool {
func assertVerb(tok sessionSDK.Object, op acl.Op) bool {
switch op {
case acl.OpObjectPut:
return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete)
return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete, sessionSDK.VerbObjectPatch)
case acl.OpObjectDelete:
return tok.AssertVerb(sessionSDK.VerbObjectDelete)
case acl.OpObjectGet:
@ -185,11 +187,13 @@ func assertVerb(tok sessionSDK.Object, op acl.Op) bool {
sessionSDK.VerbObjectGet,
sessionSDK.VerbObjectDelete,
sessionSDK.VerbObjectRange,
sessionSDK.VerbObjectRangeHash)
sessionSDK.VerbObjectRangeHash,
sessionSDK.VerbObjectPatch,
)
case acl.OpObjectSearch:
return tok.AssertVerb(sessionSDK.VerbObjectSearch, sessionSDK.VerbObjectDelete)
case acl.OpObjectRange:
return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash)
return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash, sessionSDK.VerbObjectPatch)
case acl.OpObjectHash:
return tok.AssertVerb(sessionSDK.VerbObjectRangeHash)
}

View file

@ -103,7 +103,8 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
nativeschema.MethodHeadObject,
nativeschema.MethodRangeObject,
nativeschema.MethodHashObject,
nativeschema.MethodDeleteObject:
nativeschema.MethodDeleteObject,
nativeschema.MethodPatchObject:
if prm.Object == nil {
return defaultRequest, fmt.Errorf("method %s: %w", prm.Method, errMissingOID)
}

View file

@ -204,6 +204,62 @@ func (c *Service) Put() (objectSvc.PutObjectStream, error) {
}, err
}
type patchStreamBasicChecker struct {
fyrchik marked this conversation as resolved Outdated

This seems like a useless wrapper currently.
Why do we need this?

This seems like a useless wrapper currently. Why do we need this?

Removed

Removed

Now other methods have some APE checks defined here.
Where are Patch() APE checks processed?

Now other methods have some APE checks defined here. Where are `Patch()` APE checks processed?

I supposed to support APE checks in next PR as I need to introduce the method in policy-engine package. Currently, this is the stub but I can leave TODO for a while

I supposed to support `APE` checks in next PR as I need to introduce the method in `policy-engine` package. Currently, this is the stub but I can leave `TODO` for a while

If they will be eventually introduced, OK, let's remove this wrapper.
I just do not see any reason not to do it now.

If they will be eventually introduced, OK, let's remove this wrapper. I just do not see any reason not to do it now.
apeChecker Checker
next objectSvc.PatchObjectStream
nonFirstSend bool
}
func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
if !p.nonFirstSend {
p.nonFirstSend = true
reqCtx, err := requestContext(ctx)
if err != nil {
return toStatusErr(err)
}
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
if err != nil {
return toStatusErr(err)
}
prm := Prm{
Namespace: reqCtx.Namespace,
Container: cnrID,
Object: objID,
Method: nativeschema.MethodPatchObject,
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
ContainerOwner: reqCtx.ContainerOwner,
Role: nativeSchemaRole(reqCtx.Role),
SoftAPECheck: reqCtx.SoftAPECheck,
BearerToken: reqCtx.BearerToken,
XHeaders: request.GetMetaHeader().GetXHeaders(),
}
if err := p.apeChecker.CheckAPE(ctx, prm); err != nil {
return toStatusErr(err)
}
}
return p.next.Send(ctx, request)
}
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
return p.next.CloseAndRecv(ctx)
}
func (c *Service) Patch() (objectSvc.PatchObjectStream, error) {
streamer, err := c.next.Patch()
return &patchStreamBasicChecker{
apeChecker: c.apeChecker,
next: streamer,
}, err
}
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
if err != nil {

View file

@ -170,3 +170,64 @@ func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error
}
return err
}
type auditPatchStream struct {
stream PatchObjectStream
log *logger.Logger
failed bool
key []byte
containerID *refs.ContainerID
objectID *refs.ObjectID
nonFirstSend bool
}
func (a *auditService) Patch() (PatchObjectStream, error) {
res, err := a.next.Patch()
if !a.enabled.Load() {
return res, err
}
if err != nil {
audit.LogRequest(a.log, objectGRPC.ObjectService_Patch_FullMethodName, nil, nil, false)
return res, err
}
return &auditPatchStream{
stream: res,
log: a.log,
}, nil
}
// CloseAndRecv implements PatchObjectStream.
func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
resp, err := a.stream.CloseAndRecv(ctx)
if err != nil {
a.failed = true
}
a.objectID = resp.GetBody().ObjectID
audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key,
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
!a.failed)
return resp, err
}
// Send implements PatchObjectStream.
fyrchik marked this conversation as resolved Outdated

This should be similar to put, GetAddress can return nil on the non-first send, right?

This should be similar to `put`, `GetAddress` can return nil on the non-first send, right?

This should be similar to put

True. However, Patch has no init message but that's can be fixed anyway

> This should be similar to put True. However, `Patch` has no `init` message but that's can be fixed anyway

Fixed

Fixed
func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) error {
if !a.nonFirstSend {
a.containerID = req.GetBody().GetAddress().GetContainerID()
a.objectID = req.GetBody().GetAddress().GetObjectID()
a.key = req.GetVerificationHeader().GetBodySignature().GetKey()
a.nonFirstSend = true
}
err := a.stream.Send(ctx, req)
if err != nil {
a.failed = true
}
if !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key,
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
!a.failed)
}
return err
}

View file

@ -48,6 +48,14 @@ func (x *Common) Put() (PutObjectStream, error) {
return x.nextHandler.Put()
}
func (x *Common) Patch() (PatchObjectStream, error) {
if x.state.IsMaintenance() {
return nil, new(apistatus.NodeUnderMaintenance)
}
return x.nextHandler.Patch()
}
func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
if x.state.IsMaintenance() {
return nil, new(apistatus.NodeUnderMaintenance)

View file

@ -124,6 +124,10 @@ func (p *commonPrm) SetRequestForwarder(f RequestForwarder) {
p.forwarder = f
}
func (p *commonPrm) SetSignerKey(signerKey *ecdsa.PrivateKey) {
p.signerKey = signerKey
}
// WithAddress sets object address to be read.
func (p *commonPrm) WithAddress(addr oid.Address) {
p.addr = addr

View file

@ -27,6 +27,12 @@ type (
start time.Time
}
patchStreamMetric struct {
stream PatchObjectStream
metrics MetricRegister
start time.Time
}
MetricRegister interface {
AddRequestDuration(string, time.Duration, bool)
AddPayloadSize(string, int)
@ -76,6 +82,24 @@ func (m MetricCollector) Put() (PutObjectStream, error) {
return m.next.Put()
}
func (m MetricCollector) Patch() (PatchObjectStream, error) {
if m.enabled {
t := time.Now()
stream, err := m.next.Patch()
if err != nil {
return nil, err
}
return &patchStreamMetric{
stream: stream,
metrics: m.metrics,
start: t,
}, nil
}
return m.next.Patch()
}
func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingleRequest) (*object.PutSingleResponse, error) {
if m.enabled {
t := time.Now()
@ -189,3 +213,16 @@ func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse,
return res, err
}
func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) error {
s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().GetChunk()))
fyrchik marked this conversation as resolved Outdated

GetPatch().GetChunk() ?
It seems this way len(nil) will get us zero and we have no need to branch with if.

`GetPatch().GetChunk()` ? It seems this way `len(nil)` will get us zero and we have no need to branch with `if`.

Sorry, I need to fix frostfs-api-go/v2 because I forgot to add the getter

Sorry, I need to fix `frostfs-api-go/v2` because I forgot to add the getter

Fixed

Fixed
return s.stream.Send(ctx, req)
}
func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
res, err := s.stream.CloseAndRecv(ctx)
s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
return res, err
}

View file

@ -0,0 +1,63 @@
package patchsvc
import (
"context"
"crypto/ecdsa"
"io"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
objectUtil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
patcherSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
)
func (p *pipeChunkWriter) WriteChunk(_ context.Context, chunk []byte) error {
_, err := p.wr.Write(chunk)
return err
}
type rangeProvider struct {
getSvc *getsvc.Service
addr oid.Address
commonPrm *objectUtil.CommonPrm
localNodeKey *ecdsa.PrivateKey
}
var _ patcherSDK.RangeProvider = (*rangeProvider)(nil)
func (r *rangeProvider) GetRange(ctx context.Context, rng *objectSDK.Range) io.Reader {
pipeReader, pipeWriter := io.Pipe()
var rngPrm getsvc.RangePrm
rngPrm.SetSignerKey(r.localNodeKey)
rngPrm.SetCommonParameters(r.commonPrm)
rngPrm.WithAddress(r.addr)
rngPrm.SetChunkWriter(&pipeChunkWriter{
wr: pipeWriter,
})
rngPrm.SetRange(rng)

What is the easiest way to see why this goroutine won't leak?

What is the easiest way to see why this goroutine won't leak?

If I get your question correctly, then the easiest way to check this - to cancel ctx.
When I was implementing GetRange I relied on this fact that r.getSvc.GetRange(ctx, rngPrm) returns an error by the cancelled job. But if it doesn't, the goroutine is screwed up.

This way is way better:

func (r *rangeProvider) GetRange(ctx context.Context, rng *objectSDK.Range) io.Reader {
	pipeReader, pipeWriter := io.Pipe()

	var rngPrm getsvc.RangePrm
	rngPrm.SetCommonParameters(r.commonPrm)

	rngPrm.WithAddress(r.addr)
	rngPrm.WithRawFlag(true)
	rngPrm.SetChunkWriter(&pipeChunkWriter{
		wr: pipeWriter,
	})
	rngPrm.SetRange(rng)

	getRangeErr := make(chan error)

	go func() {
		defer pipeWriter.Close()

		select {
		case <-ctx.Done():
			pipeWriter.CloseWithError(ctx.Err())
		case err := <-getRangeErr:
			pipeWriter.CloseWithError(err) // pipeWriter.CloseWithError(nil) -> pipeWriter.Close()
		}
	}()

	go func() {
		getRangeErr <- r.getSvc.GetRange(ctx, rngPrm)
	}()

	return pipeReader
}
If I get your question correctly, then the easiest way to check this - to cancel `ctx`. When I was implementing `GetRange` I relied on this fact that `r.getSvc.GetRange(ctx, rngPrm)` returns an error by the cancelled job. But if it doesn't, the goroutine is screwed up. This way is way better: ```go func (r *rangeProvider) GetRange(ctx context.Context, rng *objectSDK.Range) io.Reader { pipeReader, pipeWriter := io.Pipe() var rngPrm getsvc.RangePrm rngPrm.SetCommonParameters(r.commonPrm) rngPrm.WithAddress(r.addr) rngPrm.WithRawFlag(true) rngPrm.SetChunkWriter(&pipeChunkWriter{ wr: pipeWriter, }) rngPrm.SetRange(rng) getRangeErr := make(chan error) go func() { defer pipeWriter.Close() select { case <-ctx.Done(): pipeWriter.CloseWithError(ctx.Err()) case err := <-getRangeErr: pipeWriter.CloseWithError(err) // pipeWriter.CloseWithError(nil) -> pipeWriter.Close() } }() go func() { getRangeErr <- r.getSvc.GetRange(ctx, rngPrm) }() return pipeReader } ```
getRangeErr := make(chan error)
go func() {
defer pipeWriter.Close()
select {

What is so special about this error?

What is so special about this error?

You're right. From the patcher's POV it is the inability to get the original object's payloads

You're right. From the patcher's POV it is the inability to get the original object's payloads

This error is only returned on raw=true GET of a big object.
Do we currently handle big objects correctly?

This error is only returned on raw=true GET of a big object. Do we currently handle big objects correctly?
case <-ctx.Done():
pipeWriter.CloseWithError(ctx.Err())
case err := <-getRangeErr:
pipeWriter.CloseWithError(err)
fyrchik marked this conversation as resolved Outdated

Where will we handle this error in code?

Where will we handle this error in code?

This is done implicitly. pipeWriter closes the pipe with the error and the reader within patcher will get an error by doing Read. So, streamer's Send gets this error and returns

func (p *patcher) copyRange(ctx context.Context, rng *objectSDK.Range) error {
	rdr := p.rangeProvider.GetRange(ctx, rng)
	for {
		buffOrigPayload := make([]byte, p.readerBuffSize)
		n, readErr := rdr.Read(buffOrigPayload)
		if readErr != nil {
			if readErr != io.EOF {
				return fmt.Errorf("read: %w", readErr)
			}
		}
		_, wrErr := p.objectWriter.Write(ctx, buffOrigPayload[:n])
		if wrErr != nil {
			return fmt.Errorf("write: %w", wrErr)
		}
		if readErr == io.EOF {
			break
		}
	}
	return nil
}

I already checked this trying to send incorrect object address to the handler and reading the range failed with error

This is done implicitly. `pipeWriter` closes the pipe with the error and the reader within `patcher` will get an error by doing `Read`. So, streamer's `Send` gets this error and returns ```go func (p *patcher) copyRange(ctx context.Context, rng *objectSDK.Range) error { rdr := p.rangeProvider.GetRange(ctx, rng) for { buffOrigPayload := make([]byte, p.readerBuffSize) n, readErr := rdr.Read(buffOrigPayload) if readErr != nil { if readErr != io.EOF { return fmt.Errorf("read: %w", readErr) } } _, wrErr := p.objectWriter.Write(ctx, buffOrigPayload[:n]) if wrErr != nil { return fmt.Errorf("write: %w", wrErr) } if readErr == io.EOF { break } } return nil } ``` I already checked this trying to send incorrect object address to the handler and reading the range failed with error
}
}()
go func() {
getRangeErr <- r.getSvc.GetRange(ctx, rngPrm)
}()
return pipeReader
}

View file

@ -0,0 +1,44 @@
package patchsvc
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
)
// Service implements Put operation of Object service v2.
type Service struct {
keyStorage *util.KeyStorage
getSvc *getsvc.Service
putSvc *putsvc.Service
}
// NewService constructs Service instance from provided options.
func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service {
return &Service{
keyStorage: ks,
getSvc: getSvc,
putSvc: putSvc,
}
}
// Put calls internal service and returns v2 object streamer.
func (s *Service) Patch() (object.PatchObjectStream, error) {
nodeKey, err := s.keyStorage.GetKey(nil)
if err != nil {
return nil, err
}
return &Streamer{
getSvc: s.getSvc,
putSvc: s.putSvc,
localNodeKey: nodeKey,
}, nil
}

View file

@ -0,0 +1,221 @@
package patchsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
)
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
// and original object payload chunks. The merged result is fed to Put stream target.
type Streamer struct {
// Patcher must be initialized at first Streamer.Send call.
patcher patcher.PatchApplier
nonFirstSend bool
getSvc *getsvc.Service
putSvc *putsvc.Service
localNodeKey *ecdsa.PrivateKey
}
type pipeChunkWriter struct {
wr *io.PipeWriter
}
type headResponseWriter struct {
body *objectV2.HeadResponseBody
}
func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error {
w.body.SetHeaderPart(toFullObjectHeader(hdr))
return nil
}
func toFullObjectHeader(hdr *objectSDK.Object) objectV2.GetHeaderPart {
obj := hdr.ToV2()
hs := new(objectV2.HeaderWithSignature)
hs.SetHeader(obj.GetHeader())
hs.SetSignature(obj.GetSignature())
return hs
}
func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
hdrWithSig, addr, err := s.readHeader(ctx, req)
if err != nil {
return err
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return err
}
commonPrm.WithLocalOnly(false)
rangeProvider := &rangeProvider{
getSvc: s.getSvc,
addr: addr,
fyrchik marked this conversation as resolved Outdated

Will this stream will perform additional APE checks on everything?

Will this stream will perform additional APE checks on everything?

Fixed

Fixed
commonPrm: commonPrm,
localNodeKey: s.localNodeKey,
}
putstm, err := s.putSvc.Put()
if err != nil {
return err
}
hdr := hdrWithSig.GetHeader()
oV2 := new(objectV2.Object)
hV2 := new(objectV2.Header)
oV2.SetHeader(hV2)
oV2.GetHeader().SetContainerID(hdr.GetContainerID())
oV2.GetHeader().SetPayloadLength(hdr.GetPayloadLength())
oV2.GetHeader().SetAttributes(hdr.GetAttributes())
ownerID, err := newOwnerID(req.GetVerificationHeader())
if err != nil {
return err
}
oV2.GetHeader().SetOwnerID(ownerID)
prm, err := s.putInitPrm(req, oV2)
fyrchik marked this conversation as resolved Outdated

It seems you initialize putstm just to violate incapsulation and get the target.
Please, let's make this target construction explicit instead.
If it is too much trouble now, please, create a task to fix this.

It seems you initialize `putstm` just to violate incapsulation and get the target. Please, let's make this target construction explicit instead. If it is too much trouble now, please, create a task to fix this.

If it is too much trouble now

The reason why I have violated the encapsulation - to make the Put logic the same. So, this requires a code refactor to move the logic to common package. Otherwise, we would get two put approaches and each requires a support that will lead to errors for sure because any change in PutSvc must be reflected to PatchSvc

> If it is too much trouble now The reason why I have violated the encapsulation - to make the `Put` logic the same. So, this requires a code refactor to move the logic to common package. Otherwise, we would get two put approaches and each requires a support that will lead to errors for sure because any change in `PutSvc` must be reflected to `PatchSvc`

Common code reuse is exactly what I was looking at. Not necessary in a different package.
Could you create a task for this?

Common code reuse is exactly what I was looking at. Not necessary in a different package. Could you create a task for this?
https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/1310
if err != nil {
return err
}
err = putstm.Init(ctx, prm)
if err != nil {
return err
}
patcherPrm := patcher.Params{
Header: objectSDK.NewFromV2(oV2),
RangeProvider: rangeProvider,
ObjectWriter: putstm.Target(),
}
s.patcher = patcher.New(patcherPrm)
return nil
}
func (s *Streamer) readHeader(ctx context.Context, req *objectV2.PatchRequest) (hdrWithSig *objectV2.HeaderWithSignature, addr oid.Address, err error) {
addrV2 := req.GetBody().GetAddress()
if addrV2 == nil {
err = errors.New("patch request has nil-address")
return
}
if err = addr.ReadFromV2(*addrV2); err != nil {
err = fmt.Errorf("read address error: %w", err)
return
}
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return
}
commonPrm.WithLocalOnly(false)
var p getsvc.HeadPrm
p.SetSignerKey(s.localNodeKey)
p.SetCommonParameters(commonPrm)
resp := new(objectV2.HeadResponse)
resp.SetBody(new(objectV2.HeadResponseBody))
p.WithAddress(addr)
p.SetHeaderWriter(&headResponseWriter{
body: resp.GetBody(),
})
err = s.getSvc.Head(ctx, p)
if err != nil {
err = fmt.Errorf("get header error: %w", err)
return
}
var ok bool
hdrPart := resp.GetBody().GetHeaderPart()
if hdrWithSig, ok = hdrPart.(*objectV2.HeaderWithSignature); !ok {
err = fmt.Errorf("unexpected header type: %T", hdrPart)
}
return
}
fyrchik marked this conversation as resolved Outdated

I expected that no such logic was needed in node, why do we have this if?

I expected that no such logic was needed in node, why do we have this `if`?

As we have no init message - we need to somehow track the first request message that can set attribute patch. I've replaced this with nonFirstSend flag

As we have no `init` message - we need to somehow track the first request message that can set attribute patch. I've replaced this with `nonFirstSend` flag
func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error {
ctx, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send")
defer span.End()
defer func() {
s.nonFirstSend = true
fyrchik marked this conversation as resolved Outdated

If both fields are nil (attributes and payload), this should be an error, is it?

If both fields are nil (attributes and payload), this should be an error, is it?

The check has been added

The check has been added
fyrchik marked this conversation as resolved Outdated

remove error? it will occur in each level of wrapping (and in log message most likely)

remove `error`? it will occur in each level of wrapping (and in log message most likely)

True

True

Fixed

Fixed
}()
if !s.nonFirstSend {
if err := s.init(ctx, req); err != nil {
return fmt.Errorf("streamer init error: %w", err)
}
}
patch := new(objectSDK.Patch)
patch.FromV2(req.GetBody())
if !s.nonFirstSend {
err := s.patcher.ApplyAttributesPatch(ctx, patch.NewAttributes, patch.ReplaceAttributes)
if err != nil {
return fmt.Errorf("patch attributes: %w", err)
}
}
if patch.PayloadPatch != nil {
err := s.patcher.ApplyPayloadPatch(ctx, patch.PayloadPatch)
if err != nil {
return fmt.Errorf("patch payload: %w", err)
}
} else if s.nonFirstSend {
return errors.New("invalid non-first patch: empty payload")
}
return nil
}
func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
patcherResp, err := s.patcher.Close(ctx)
if err != nil {
return nil, err
}
oidV2 := new(refsV2.ObjectID)
if patcherResp.AccessIdentifiers.ParentID != nil {
patcherResp.AccessIdentifiers.ParentID.WriteToV2(oidV2)
} else {
patcherResp.AccessIdentifiers.SelfID.WriteToV2(oidV2)
}
return &objectV2.PatchResponse{
Body: &objectV2.PatchResponseBody{
ObjectID: oidV2,
},
}, nil
}

View file

@ -0,0 +1,53 @@
package patchsvc
import (
"crypto/ecdsa"
"crypto/elliptic"
"errors"
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
// putInitPrm initializes put paramerer for Put stream.
func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) {
commonPrm, err := util.CommonPrmFromV2(req)
if err != nil {
return nil, err
}
prm := new(putsvc.PutInitPrm)
prm.WithObject(objectSDK.NewFromV2(obj)).
WithCommonPrm(commonPrm).
WithPrivateKey(s.localNodeKey)
return prm, nil
}
func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) {
for vh.GetOrigin() != nil {
vh = vh.GetOrigin()
}
sig := vh.GetBodySignature()
if sig == nil {
return nil, errors.New("empty body signature")
}
key, err := keys.NewPublicKeyFromBytes(sig.GetKey(), elliptic.P256())
if err != nil {
return nil, fmt.Errorf("invalid signature key: %w", err)
}
var userID user.ID
user.IDFromKey(&userID, (ecdsa.PublicKey)(*key))
ownID := new(refs.OwnerID)
userID.WriteToV2(ownID)
return ownID, nil
}

View file

@ -2,6 +2,7 @@ package putsvc
import (
"context"
"crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -20,6 +21,8 @@ type PutInitPrm struct {
traverseOpts []placement.Option
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
privateKey *ecdsa.PrivateKey
}
type PutChunkPrm struct {
@ -65,3 +68,11 @@ func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
return p
}
func (p *PutInitPrm) WithPrivateKey(v *ecdsa.PrivateKey) *PutInitPrm {
if p != nil {
p.privateKey = v
}
return p
}

View file

@ -47,6 +47,11 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
return nil
}
// Target accesses underlying target chunked object writer.
func (p *Streamer) Target() transformer.ChunkedObjectWriter {
return p.target
}
// MaxObjectSize returns maximum payload size for the streaming session.
//
// Must be called after the successful Init.
@ -79,11 +84,15 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
p.relay = prm.relay
if prm.privateKey != nil {
p.privateKey = prm.privateKey
} else {
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
if err != nil {
return err
}
p.privateKey = nodeKey
}
// prepare untrusted-Put object target
p.target = &validatingPreparedTarget{
@ -136,7 +145,11 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
}
}
if prm.privateKey != nil {
p.privateKey = prm.privateKey
} else {
p.privateKey = key
}
p.target = &validatingTarget{
fmt: p.fmtValidator,
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{

View file

@ -37,6 +37,11 @@ type putStreamResponser struct {
respSvc *response.Service
}
type patchStreamResponser struct {
stream PatchObjectStream
respSvc *response.Service
}
// NewResponseService returns object service instance that passes internal service
// call to response service.
func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *ResponseService {
@ -87,6 +92,35 @@ func (s *ResponseService) Put() (PutObjectStream, error) {
}, nil
}
func (s *patchStreamResponser) Send(ctx context.Context, req *object.PatchRequest) error {
if err := s.stream.Send(ctx, req); err != nil {
return fmt.Errorf("could not send the request: %w", err)
}
return nil
}
func (s *patchStreamResponser) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
r, err := s.stream.CloseAndRecv(ctx)
if err != nil {
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
}
s.respSvc.SetMeta(r)
return r, nil
}
func (s *ResponseService) Patch() (PatchObjectStream, error) {
stream, err := s.svc.Patch()
if err != nil {
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
}
return &patchStreamResponser{
stream: stream,
respSvc: s.respSvc,
}, nil
}
func (s *ResponseService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
resp, err := s.svc.PutSingle(ctx, req)
if err != nil {

View file

@ -31,11 +31,18 @@ type PutObjectStream interface {
CloseAndRecv(context.Context) (*object.PutResponse, error)
}
// PatchObjectStream is an interface of FrostFS API v2 compatible patch streamer.
type PatchObjectStream interface {
Send(context.Context, *object.PatchRequest) error
CloseAndRecv(context.Context) (*object.PatchResponse, error)
}
// ServiceServer is an interface of utility
// serving v2 Object service.
type ServiceServer interface {
Get(*object.GetRequest, GetObjectStream) error
Put() (PutObjectStream, error)
Patch() (PatchObjectStream, error)
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
Search(*object.SearchRequest, SearchStream) error
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)

View file

@ -35,6 +35,12 @@ type putStreamSigner struct {
err error
}
type patchStreamSigner struct {
sigSvc *util.SignService
stream PatchObjectStream
err error
}
type getRangeStreamSigner struct {
GetObjectRangeStream
sigSvc *util.SignService
@ -112,6 +118,42 @@ func (s *SignService) Put() (PutObjectStream, error) {
}, nil
}
func (s *patchStreamSigner) Send(ctx context.Context, req *object.PatchRequest) error {
if s.err = s.sigSvc.VerifyRequest(req); s.err != nil {
return util.ErrAbortStream
}
if s.err = s.stream.Send(ctx, req); s.err != nil {
return util.ErrAbortStream
}
return nil
}
func (s *patchStreamSigner) CloseAndRecv(ctx context.Context) (resp *object.PatchResponse, err error) {
if s.err != nil {
err = s.err
resp = new(object.PatchResponse)
} else {
resp, err = s.stream.CloseAndRecv(ctx)
if err != nil {
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
}
}
return resp, s.sigSvc.SignResponse(resp, err)
}
func (s *SignService) Patch() (PatchObjectStream, error) {
stream, err := s.svc.Patch()
if err != nil {
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
}
return &patchStreamSigner{
stream: stream,
sigSvc: s.sigSvc,
}, nil
}
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
if err := s.sigSvc.VerifyRequest(req); err != nil {
resp := new(object.HeadResponse)

View file

@ -91,6 +91,10 @@ func (c TransportSplitter) Put() (PutObjectStream, error) {
return c.next.Put()
}
func (c TransportSplitter) Patch() (PatchObjectStream, error) {
return c.next.Patch()
}
func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
return c.next.Head(ctx, request)
}