object: Implement Patch
method #1307
|
@ -2,10 +2,13 @@ package internal
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
|
@ -869,3 +872,65 @@ func SyncContainerSettings(ctx context.Context, prm SyncContainerPrm) (*SyncCont
|
|||
|
||||
return new(SyncContainerRes), nil
|
||||
}
|
||||
|
||||
// PatchObjectPrm groups parameters of PatchObject operation.
|
||||
type PatchObjectPrm struct {
|
||||
commonObjectPrm
|
||||
objectAddressPrm
|
||||
|
||||
NewAttributes []objectSDK.Attribute
|
||||
|
||||
ReplaceAttribute bool
|
||||
|
||||
PayloadPatches []PayloadPatch
|
||||
}
|
||||
|
||||
type PayloadPatch struct {
|
||||
Range objectSDK.Range
|
||||
|
||||
PayloadPath string
|
||||
}
|
||||
|
||||
type PatchRes struct {
|
||||
OID oid.ID
|
||||
}
|
||||
|
||||
func Patch(ctx context.Context, prm PatchObjectPrm) (*PatchRes, error) {
|
||||
patchPrm := client.PrmObjectPatch{
|
||||
XHeaders: prm.xHeaders,
|
||||
BearerToken: prm.bearerToken,
|
||||
Session: prm.sessionToken,
|
||||
Address: prm.objAddr,
|
||||
}
|
||||
|
||||
slices.SortFunc(prm.PayloadPatches, func(a, b PayloadPatch) int {
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
return cmp.Compare(a.Range.GetOffset(), b.Range.GetOffset())
|
||||
})
|
||||
|
||||
patcher, err := prm.cli.ObjectPatchInit(ctx, patchPrm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("init payload reading: %w", err)
|
||||
}
|
||||
|
||||
if patcher.PatchAttributes(ctx, prm.NewAttributes, prm.ReplaceAttribute) {
|
||||
for _, pp := range prm.PayloadPatches {
|
||||
payloadFile, err := os.OpenFile(pp.PayloadPath, os.O_RDONLY, os.ModePerm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
applied := patcher.PatchPayload(ctx, &pp.Range, payloadFile)
|
||||
_ = payloadFile.Close()
|
||||
if !applied {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res, err := patcher.Close(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &PatchRes{
|
||||
OID: res.ObjectID(),
|
||||
}, nil
|
||||
}
|
||||
|
|
151
cmd/frostfs-cli/modules/object/patch.go
Normal file
|
@ -0,0 +1,151 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
const (
|
||||
newAttrsFlagName = "new-attrs"
|
||||
replaceAttrsFlagName = "replace-attrs"
|
||||
rangeFlagName = "range"
|
||||
payloadFlagName = "payload"
|
||||
)
|
||||
|
||||
var objectPatchCmd = &cobra.Command{
|
||||
Use: "patch",
|
||||
Run: patch,
|
||||
Short: "Patch FrostFS object",
|
||||
Long: "Patch FrostFS object. Each range passed to the command requires to pass a corresponding patch payload.",
|
||||
Example: `
|
||||
frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid <CID> --oid <OID> --new-attrs 'key1=val1,key2=val2' --replace-attrs
|
||||
frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid <CID> --oid <OID> --range offX:lnX --payload /path/to/payloadX --range offY:lnY --payload /path/to/payloadY
|
||||
frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid <CID> --oid <OID> --new-attrs 'key1=val1,key2=val2' --replace-attrs --range offX:lnX --payload /path/to/payload
|
||||
`,
|
||||
}
|
||||
|
||||
func initObjectPatchCmd() {
|
||||
commonflags.Init(objectPatchCmd)
|
||||
initFlagSession(objectPatchCmd, "PATCH")
|
||||
|
||||
flags := objectPatchCmd.Flags()
|
||||
|
||||
flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||
_ = objectRangeCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||
|
||||
flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
|
||||
_ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag)
|
||||
|
||||
flags.String(newAttrsFlagName, "", "New object attributes in form of Key1=Value1,Key2=Value2")
|
||||
flags.Bool(replaceAttrsFlagName, false, "Replace object attributes by new ones.")
|
||||
flags.StringSlice(rangeFlagName, []string{}, "Range to which patch payload is applied. Format: offset:length")
|
||||
flags.StringSlice(payloadFlagName, []string{}, "Path to file with patch payload.")
|
||||
}
|
||||
|
||||
func patch(cmd *cobra.Command, _ []string) {
|
||||
var cnr cid.ID
|
||||
var obj oid.ID
|
||||
|
||||
objAddr := readObjectAddress(cmd, &cnr, &obj)
|
||||
|
||||
ranges, err := getRangeSlice(cmd)
|
||||
commonCmd.ExitOnErr(cmd, "", err)
|
||||
|
||||
payloads := patchPayloadPaths(cmd)
|
||||
|
||||
if len(ranges) != len(payloads) {
|
||||
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("the number of ranges and payloads are not equal: ranges = %d, payloads = %d", len(ranges), len(payloads)))
|
||||
}
|
||||
|
||||
newAttrs, err := parseNewObjectAttrs(cmd)
|
||||
commonCmd.ExitOnErr(cmd, "can't parse new object attributes: %w", err)
|
||||
replaceAttrs, _ := cmd.Flags().GetBool(replaceAttrsFlagName)
|
||||
|
||||
pk := key.GetOrGenerate(cmd)
|
||||
|
||||
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||
|
||||
var prm internalclient.PatchObjectPrm
|
||||
prm.SetClient(cli)
|
||||
Prepare(cmd, &prm)
|
||||
ReadOrOpenSession(cmd, &prm, pk, cnr, nil)
|
||||
|
||||
prm.SetAddress(objAddr)
|
||||
prm.NewAttributes = newAttrs
|
||||
prm.ReplaceAttribute = replaceAttrs
|
||||
|
||||
for i := range ranges {
|
||||
prm.PayloadPatches = append(prm.PayloadPatches, internalclient.PayloadPatch{
|
||||
Range: ranges[i],
|
||||
PayloadPath: payloads[i],
|
||||
})
|
||||
}
|
||||
|
||||
res, err := internalclient.Patch(cmd.Context(), prm)
|
||||
if err != nil {
|
||||
commonCmd.ExitOnErr(cmd, "can't patch the object: %w", err)
|
||||
}
|
||||
cmd.Println("Patched object ID: ", res.OID.EncodeToString())
|
||||
}
|
||||
|
||||
func parseNewObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) {
|
||||
var rawAttrs []string
|
||||
|
||||
raw := cmd.Flag(newAttrsFlagName).Value.String()
|
||||
if len(raw) != 0 {
|
||||
rawAttrs = strings.Split(raw, ",")
|
||||
}
|
||||
|
||||
attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
|
||||
for i := range rawAttrs {
|
||||
k, v, found := strings.Cut(rawAttrs[i], "=")
|
||||
if !found {
|
||||
return nil, fmt.Errorf("invalid attribute format: %s", rawAttrs[i])
|
||||
}
|
||||
attrs[i].SetKey(k)
|
||||
attrs[i].SetValue(v)
|
||||
}
|
||||
return attrs, nil
|
||||
}
|
||||
|
||||
func getRangeSlice(cmd *cobra.Command) ([]objectSDK.Range, error) {
|
||||
v, _ := cmd.Flags().GetStringSlice(rangeFlagName)
|
||||
if len(v) == 0 {
|
||||
return []objectSDK.Range{}, nil
|
||||
}
|
||||
rs := make([]objectSDK.Range, len(v))
|
||||
for i := range v {
|
||||
before, after, found := strings.Cut(v[i], rangeSep)
|
||||
if !found {
|
||||
return nil, fmt.Errorf("invalid range specifier: %s", v[i])
|
||||
}
|
||||
|
||||
offset, err := strconv.ParseUint(before, 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid '%s' range offset specifier: %w", v[i], err)
|
||||
}
|
||||
length, err := strconv.ParseUint(after, 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid '%s' range length specifier: %w", v[i], err)
|
||||
}
|
||||
|
||||
rs[i].SetOffset(offset)
|
||||
rs[i].SetLength(length)
|
||||
}
|
||||
return rs, nil
|
||||
}
|
||||
|
||||
func patchPayloadPaths(cmd *cobra.Command) []string {
|
||||
v, _ := cmd.Flags().GetStringSlice(payloadFlagName)
|
||||
return v
|
||||
}
|
|
@ -29,6 +29,7 @@ func init() {
|
|||
objectRangeCmd,
|
||||
objectLockCmd,
|
||||
objectNodesCmd,
|
||||
objectPatchCmd,
|
||||
}
|
||||
|
||||
Cmd.AddCommand(objectChildCommands...)
|
||||
|
@ -39,6 +40,7 @@ func init() {
|
|||
}
|
||||
|
||||
initObjectPutCmd()
|
||||
initObjectPatchCmd()
|
||||
initObjectDeleteCmd()
|
||||
initObjectGetCmd()
|
||||
initObjectSearchCmd()
|
||||
|
|
|
@ -306,6 +306,8 @@ func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, ke
|
|||
case *internal.PutObjectPrm:
|
||||
common.PrintVerbose(cmd, "Binding session to object PUT...")
|
||||
tok.ForVerb(session.VerbObjectPut)
|
||||
case *internal.PatchObjectPrm:
|
||||
tok.ForVerb(session.VerbObjectPatch)
|
||||
case *internal.DeleteObjectPrm:
|
||||
common.PrintVerbose(cmd, "Binding session to object DELETE...")
|
||||
tok.ForVerb(session.VerbObjectDelete)
|
||||
|
|
|
@ -239,6 +239,8 @@ func parseAction(lexeme string) ([]string, bool, error) {
|
|||
return []string{nativeschema.MethodRangeObject}, true, nil
|
||||
case "object.hash":
|
||||
return []string{nativeschema.MethodHashObject}, true, nil
|
||||
case "object.patch":
|
||||
return []string{nativeschema.MethodPatchObject}, true, nil
|
||||
case "object.*":
|
||||
return []string{
|
||||
nativeschema.MethodPutObject,
|
||||
|
@ -248,6 +250,7 @@ func parseAction(lexeme string) ([]string, bool, error) {
|
|||
nativeschema.MethodSearchObject,
|
||||
nativeschema.MethodRangeObject,
|
||||
nativeschema.MethodHashObject,
|
||||
nativeschema.MethodPatchObject,
|
||||
}, true, nil
|
||||
case "container.put":
|
||||
return []string{nativeschema.MethodPutContainer}, false, nil
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
||||
patchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/patch"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
||||
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
||||
|
@ -54,6 +55,8 @@ type objectSvc struct {
|
|||
get *getsvcV2.Service
|
||||
|
||||
delete *deletesvcV2.Service
|
||||
|
||||
patch *patchsvc.Service
|
||||
}
|
||||
|
||||
func (c *cfg) MaxObjectSize() uint64 {
|
||||
|
@ -71,6 +74,10 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) {
|
|||
return s.put.Put()
|
||||
}
|
||||
|
||||
func (s *objectSvc) Patch() (objectService.PatchObjectStream, error) {
|
||||
return s.patch.Patch()
|
||||
}
|
||||
|
||||
func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||
return s.put.PutSingle(ctx, req)
|
||||
}
|
||||
|
@ -181,10 +188,12 @@ func initObjectService(c *cfg) {
|
|||
|
||||
sDeleteV2 := createDeleteServiceV2(sDelete)
|
||||
|
||||
sPatch := createPatchSvc(sGet, sPut, keyStorage)
|
||||
|
||||
// build service pipeline
|
||||
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
||||
|
||||
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2)
|
||||
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
|
||||
|
||||
apeSvc := createAPEService(c, splitSvc)
|
||||
|
||||
|
@ -353,6 +362,10 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2
|
|||
return putsvcV2.NewService(sPut, keyStorage)
|
||||
}
|
||||
|
||||
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service {
|
||||
return patchsvc.NewService(keyStorage, sGet, sPut)
|
||||
}
|
||||
|
||||
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
|
||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
|
@ -425,7 +438,7 @@ func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service {
|
|||
}
|
||||
|
||||
func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service,
|
||||
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service,
|
||||
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service, sPatch *patchsvc.Service,
|
||||
) *objectService.TransportSplitter {
|
||||
return objectService.NewTransportSplitter(
|
||||
c.cfgGRPC.maxChunkSize,
|
||||
|
@ -435,6 +448,7 @@ func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Servi
|
|||
search: sSearchV2,
|
||||
get: sGetV2,
|
||||
delete: sDeleteV2,
|
||||
patch: sPatch,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
|
6
go.mod
|
@ -4,14 +4,14 @@ go 1.21
|
|||
|
||||
require (
|
||||
code.gitea.io/sdk/gitea v0.17.1
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240813155151-d112a28d382f
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240813155821-98aabc45a720
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
github.com/cheggaaa/pb v1.0.29
|
||||
|
|
BIN
go.sum
|
@ -24,6 +24,48 @@ func New(c objectSvc.ServiceServer) *Server {
|
|||
}
|
||||
}
|
||||
|
||||
// Patch opens internal Object patch stream and feeds it by the data read from gRPC stream.
|
||||
func (s *Server) Patch(gStream objectGRPC.ObjectService_PatchServer) error {
|
||||
stream, err := s.srv.Patch()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
req, err := gStream.Recv()
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
resp, err := stream.CloseAndRecv(gStream.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
patchReq := new(object.PatchRequest)
|
||||
if err := patchReq.FromGRPCMessage(req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := stream.Send(gStream.Context(), patchReq); err != nil {
|
||||
if errors.Is(err, util.ErrAbortStream) {
|
||||
resp, err := stream.CloseAndRecv(gStream.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Put opens internal Object service Put stream and overtakes data from gRPC stream to it.
|
||||
func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
|
||||
stream, err := s.srv.Put()
|
||||
|
|
|
@ -35,6 +35,12 @@ type putStreamBasicChecker struct {
|
|||
next object.PutObjectStream
|
||||
}
|
||||
|
||||
type patchStreamBasicChecker struct {
|
||||
source *Service
|
||||
next object.PatchObjectStream
|
||||
nonFirstSend bool
|
||||
}
|
||||
|
||||
type getStreamBasicChecker struct {
|
||||
checker ACLChecker
|
||||
|
||||
|
@ -249,6 +255,15 @@ func (b Service) Put() (object.PutObjectStream, error) {
|
|||
}, err
|
||||
}
|
||||
|
||||
func (b Service) Patch() (object.PatchObjectStream, error) {
|
||||
streamer, err := b.next.Patch()
|
||||
|
||||
return &patchStreamBasicChecker{
|
||||
source: &b,
|
||||
next: streamer,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (b Service) Head(
|
||||
ctx context.Context,
|
||||
request *objectV2.HeadRequest,
|
||||
|
@ -734,6 +749,65 @@ func (g *searchStreamBasicChecker) Send(resp *objectV2.SearchResponse) error {
|
|||
return g.SearchStream.Send(resp)
|
||||
}
|
||||
|
||||
func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
|
||||
body := request.GetBody()
|
||||
if body == nil {
|
||||
return errEmptyBody
|
||||
}
|
||||
|
||||
if !p.nonFirstSend {
|
||||
p.nonFirstSend = true
|
||||
|
||||
cnr, err := getContainerIDFromRequest(request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
objV2 := request.GetBody().GetAddress().GetObjectID()
|
||||
if objV2 == nil {
|
||||
return errors.New("missing oid")
|
||||
}
|
||||
obj := new(oid.ID)
|
||||
err = obj.ReadFromV2(*objV2)
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`*objV2`?
aarifullin
commented
Sure! Fixed Sure! Fixed
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var sTok *sessionSDK.Object
|
||||
sTok, err = readSessionToken(cnr, obj, request.GetMetaHeader().GetSessionToken())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bTok, err := originalBearerToken(request.GetMetaHeader())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req := MetaWithToken{
|
||||
vheader: request.GetVerificationHeader(),
|
||||
token: sTok,
|
||||
bearer: bTok,
|
||||
src: request,
|
||||
}
|
||||
|
||||
reqInfo, err := p.source.findRequestInfoWithoutACLOperationAssert(req, cnr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reqInfo.obj = obj
|
||||
|
||||
ctx = requestContext(ctx, reqInfo)
|
||||
}
|
||||
|
||||
return p.next.Send(ctx, request)
|
||||
}
|
||||
|
||||
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
||||
return p.next.CloseAndRecv(ctx)
|
||||
}
|
||||
|
||||
func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (info RequestInfo, err error) {
|
||||
cnr, err := b.containers.Get(idCnr) // fetch actual container
|
||||
if err != nil {
|
||||
|
@ -790,3 +864,56 @@ func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (in
|
|||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// findRequestInfoWithoutACLOperationAssert is findRequestInfo without session token verb assert.
|
||||
func (b Service) findRequestInfoWithoutACLOperationAssert(req MetaWithToken, idCnr cid.ID) (info RequestInfo, err error) {
|
||||
cnr, err := b.containers.Get(idCnr) // fetch actual container
|
||||
if err != nil {
|
||||
return info, err
|
||||
}
|
||||
|
||||
if req.token != nil {
|
||||
currentEpoch, err := b.nm.Epoch()
|
||||
if err != nil {
|
||||
return info, errors.New("can't fetch current epoch")
|
||||
}
|
||||
if req.token.ExpiredAt(currentEpoch) {
|
||||
return info, new(apistatus.SessionTokenExpired)
|
||||
}
|
||||
if req.token.InvalidAt(currentEpoch) {
|
||||
return info, fmt.Errorf("%s: token is invalid at %d epoch)",
|
||||
invalidRequestMessage, currentEpoch)
|
||||
}
|
||||
}
|
||||
|
||||
// find request role and key
|
||||
ownerID, ownerKey, err := req.RequestOwner()
|
||||
if err != nil {
|
||||
return info, err
|
||||
}
|
||||
res, err := b.c.Classify(ownerID, ownerKey, idCnr, cnr.Value)
|
||||
if err != nil {
|
||||
return info, err
|
||||
}
|
||||
|
||||
info.basicACL = cnr.Value.BasicACL()
|
||||
info.requestRole = res.Role
|
||||
info.cnrOwner = cnr.Value.Owner()
|
||||
info.idCnr = idCnr
|
||||
|
||||
cnrNamespace, hasNamespace := strings.CutSuffix(cnrSDK.ReadDomain(cnr.Value).Zone(), ".ns")
|
||||
if hasNamespace {
|
||||
info.cnrNamespace = cnrNamespace
|
||||
}
|
||||
|
||||
// it is assumed that at the moment the key will be valid,
|
||||
// otherwise the request would not pass validation
|
||||
info.senderKey = res.Key
|
||||
|
||||
// add bearer token if it is present in request
|
||||
info.bearer = req.bearer
|
||||
|
||||
info.srcRequest = req.src
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
|
|
@ -46,6 +46,8 @@ func getContainerIDFromRequest(req any) (cid.ID, error) {
|
|||
idV2 = v.GetBody().GetAddress().GetContainerID()
|
||||
case *objectV2.PutSingleRequest:
|
||||
idV2 = v.GetBody().GetObject().GetHeader().GetContainerID()
|
||||
case *objectV2.PatchRequest:
|
||||
idV2 = v.GetBody().GetAddress().GetContainerID()
|
||||
default:
|
||||
return cid.ID{}, errors.New("unknown request type")
|
||||
}
|
||||
|
@ -174,7 +176,7 @@ func isOwnerFromKey(id user.ID, key *keys.PublicKey) bool {
|
|||
func assertVerb(tok sessionSDK.Object, op acl.Op) bool {
|
||||
switch op {
|
||||
case acl.OpObjectPut:
|
||||
return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete)
|
||||
return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete, sessionSDK.VerbObjectPatch)
|
||||
case acl.OpObjectDelete:
|
||||
return tok.AssertVerb(sessionSDK.VerbObjectDelete)
|
||||
case acl.OpObjectGet:
|
||||
|
@ -185,11 +187,13 @@ func assertVerb(tok sessionSDK.Object, op acl.Op) bool {
|
|||
sessionSDK.VerbObjectGet,
|
||||
sessionSDK.VerbObjectDelete,
|
||||
sessionSDK.VerbObjectRange,
|
||||
sessionSDK.VerbObjectRangeHash)
|
||||
sessionSDK.VerbObjectRangeHash,
|
||||
sessionSDK.VerbObjectPatch,
|
||||
)
|
||||
case acl.OpObjectSearch:
|
||||
return tok.AssertVerb(sessionSDK.VerbObjectSearch, sessionSDK.VerbObjectDelete)
|
||||
case acl.OpObjectRange:
|
||||
return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash)
|
||||
return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash, sessionSDK.VerbObjectPatch)
|
||||
case acl.OpObjectHash:
|
||||
return tok.AssertVerb(sessionSDK.VerbObjectRangeHash)
|
||||
}
|
||||
|
|
|
@ -103,7 +103,8 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
|
|||
nativeschema.MethodHeadObject,
|
||||
nativeschema.MethodRangeObject,
|
||||
nativeschema.MethodHashObject,
|
||||
nativeschema.MethodDeleteObject:
|
||||
nativeschema.MethodDeleteObject,
|
||||
nativeschema.MethodPatchObject:
|
||||
if prm.Object == nil {
|
||||
return defaultRequest, fmt.Errorf("method %s: %w", prm.Method, errMissingOID)
|
||||
}
|
||||
|
|
|
@ -204,6 +204,62 @@ func (c *Service) Put() (objectSvc.PutObjectStream, error) {
|
|||
}, err
|
||||
}
|
||||
|
||||
type patchStreamBasicChecker struct {
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
This seems like a useless wrapper currently. This seems like a useless wrapper currently.
Why do we need this?
aarifullin
commented
Removed Removed
fyrchik
commented
Now other methods have some APE checks defined here. Now other methods have some APE checks defined here.
Where are `Patch()` APE checks processed?
aarifullin
commented
I supposed to support I supposed to support `APE` checks in next PR as I need to introduce the method in `policy-engine` package. Currently, this is the stub but I can leave `TODO` for a while
fyrchik
commented
If they will be eventually introduced, OK, let's remove this wrapper. If they will be eventually introduced, OK, let's remove this wrapper.
I just do not see any reason not to do it now.
|
||||
apeChecker Checker
|
||||
|
||||
next objectSvc.PatchObjectStream
|
||||
|
||||
nonFirstSend bool
|
||||
}
|
||||
|
||||
func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
|
||||
if !p.nonFirstSend {
|
||||
p.nonFirstSend = true
|
||||
|
||||
reqCtx, err := requestContext(ctx)
|
||||
if err != nil {
|
||||
return toStatusErr(err)
|
||||
}
|
||||
|
||||
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
||||
if err != nil {
|
||||
return toStatusErr(err)
|
||||
}
|
||||
|
||||
prm := Prm{
|
||||
Namespace: reqCtx.Namespace,
|
||||
Container: cnrID,
|
||||
Object: objID,
|
||||
Method: nativeschema.MethodPatchObject,
|
||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||
ContainerOwner: reqCtx.ContainerOwner,
|
||||
Role: nativeSchemaRole(reqCtx.Role),
|
||||
SoftAPECheck: reqCtx.SoftAPECheck,
|
||||
BearerToken: reqCtx.BearerToken,
|
||||
XHeaders: request.GetMetaHeader().GetXHeaders(),
|
||||
}
|
||||
|
||||
if err := p.apeChecker.CheckAPE(ctx, prm); err != nil {
|
||||
return toStatusErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
return p.next.Send(ctx, request)
|
||||
}
|
||||
|
||||
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
||||
return p.next.CloseAndRecv(ctx)
|
||||
}
|
||||
|
||||
func (c *Service) Patch() (objectSvc.PatchObjectStream, error) {
|
||||
streamer, err := c.next.Patch()
|
||||
|
||||
return &patchStreamBasicChecker{
|
||||
apeChecker: c.apeChecker,
|
||||
next: streamer,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
||||
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
||||
if err != nil {
|
||||
|
|
|
@ -170,3 +170,64 @@ func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
type auditPatchStream struct {
|
||||
stream PatchObjectStream
|
||||
log *logger.Logger
|
||||
|
||||
failed bool
|
||||
key []byte
|
||||
containerID *refs.ContainerID
|
||||
objectID *refs.ObjectID
|
||||
|
||||
nonFirstSend bool
|
||||
}
|
||||
|
||||
func (a *auditService) Patch() (PatchObjectStream, error) {
|
||||
res, err := a.next.Patch()
|
||||
if !a.enabled.Load() {
|
||||
return res, err
|
||||
}
|
||||
if err != nil {
|
||||
audit.LogRequest(a.log, objectGRPC.ObjectService_Patch_FullMethodName, nil, nil, false)
|
||||
return res, err
|
||||
}
|
||||
return &auditPatchStream{
|
||||
stream: res,
|
||||
log: a.log,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CloseAndRecv implements PatchObjectStream.
|
||||
func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||
resp, err := a.stream.CloseAndRecv(ctx)
|
||||
if err != nil {
|
||||
a.failed = true
|
||||
}
|
||||
a.objectID = resp.GetBody().ObjectID
|
||||
audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key,
|
||||
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
|
||||
!a.failed)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Send implements PatchObjectStream.
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
This should be similar to This should be similar to `put`, `GetAddress` can return nil on the non-first send, right?
aarifullin
commented
True. However, > This should be similar to put
True. However, `Patch` has no `init` message but that's can be fixed anyway
aarifullin
commented
Fixed Fixed
|
||||
func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||
if !a.nonFirstSend {
|
||||
a.containerID = req.GetBody().GetAddress().GetContainerID()
|
||||
a.objectID = req.GetBody().GetAddress().GetObjectID()
|
||||
a.key = req.GetVerificationHeader().GetBodySignature().GetKey()
|
||||
a.nonFirstSend = true
|
||||
}
|
||||
|
||||
err := a.stream.Send(ctx, req)
|
||||
if err != nil {
|
||||
a.failed = true
|
||||
}
|
||||
if !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
|
||||
audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key,
|
||||
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
|
||||
!a.failed)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -48,6 +48,14 @@ func (x *Common) Put() (PutObjectStream, error) {
|
|||
return x.nextHandler.Put()
|
||||
}
|
||||
|
||||
func (x *Common) Patch() (PatchObjectStream, error) {
|
||||
if x.state.IsMaintenance() {
|
||||
return nil, new(apistatus.NodeUnderMaintenance)
|
||||
}
|
||||
|
||||
return x.nextHandler.Patch()
|
||||
}
|
||||
|
||||
func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
||||
if x.state.IsMaintenance() {
|
||||
return nil, new(apistatus.NodeUnderMaintenance)
|
||||
|
|
|
@ -124,6 +124,10 @@ func (p *commonPrm) SetRequestForwarder(f RequestForwarder) {
|
|||
p.forwarder = f
|
||||
}
|
||||
|
||||
func (p *commonPrm) SetSignerKey(signerKey *ecdsa.PrivateKey) {
|
||||
p.signerKey = signerKey
|
||||
}
|
||||
|
||||
// WithAddress sets object address to be read.
|
||||
func (p *commonPrm) WithAddress(addr oid.Address) {
|
||||
p.addr = addr
|
||||
|
|
|
@ -27,6 +27,12 @@ type (
|
|||
start time.Time
|
||||
}
|
||||
|
||||
patchStreamMetric struct {
|
||||
stream PatchObjectStream
|
||||
metrics MetricRegister
|
||||
start time.Time
|
||||
}
|
||||
|
||||
MetricRegister interface {
|
||||
AddRequestDuration(string, time.Duration, bool)
|
||||
AddPayloadSize(string, int)
|
||||
|
@ -76,6 +82,24 @@ func (m MetricCollector) Put() (PutObjectStream, error) {
|
|||
return m.next.Put()
|
||||
}
|
||||
|
||||
func (m MetricCollector) Patch() (PatchObjectStream, error) {
|
||||
if m.enabled {
|
||||
t := time.Now()
|
||||
|
||||
stream, err := m.next.Patch()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &patchStreamMetric{
|
||||
stream: stream,
|
||||
metrics: m.metrics,
|
||||
start: t,
|
||||
}, nil
|
||||
}
|
||||
return m.next.Patch()
|
||||
}
|
||||
|
||||
func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||
if m.enabled {
|
||||
t := time.Now()
|
||||
|
@ -189,3 +213,16 @@ func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse,
|
|||
|
||||
return res, err
|
||||
}
|
||||
func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||
s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().GetChunk()))
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
`GetPatch().GetChunk()` ?
It seems this way `len(nil)` will get us zero and we have no need to branch with `if`.
aarifullin
commented
Sorry, I need to fix Sorry, I need to fix `frostfs-api-go/v2` because I forgot to add the getter
aarifullin
commented
Fixed Fixed
|
||||
return s.stream.Send(ctx, req)
|
||||
}
|
||||
|
||||
func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||
res, err := s.stream.CloseAndRecv(ctx)
|
||||
|
||||
s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
|
63
pkg/services/object/patch/range_provider.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package patchsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"io"
|
||||
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
objectUtil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
patcherSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
|
||||
)
|
||||
|
||||
func (p *pipeChunkWriter) WriteChunk(_ context.Context, chunk []byte) error {
|
||||
_, err := p.wr.Write(chunk)
|
||||
return err
|
||||
}
|
||||
|
||||
type rangeProvider struct {
|
||||
getSvc *getsvc.Service
|
||||
|
||||
addr oid.Address
|
||||
|
||||
commonPrm *objectUtil.CommonPrm
|
||||
|
||||
localNodeKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
var _ patcherSDK.RangeProvider = (*rangeProvider)(nil)
|
||||
|
||||
func (r *rangeProvider) GetRange(ctx context.Context, rng *objectSDK.Range) io.Reader {
|
||||
pipeReader, pipeWriter := io.Pipe()
|
||||
|
||||
var rngPrm getsvc.RangePrm
|
||||
rngPrm.SetSignerKey(r.localNodeKey)
|
||||
rngPrm.SetCommonParameters(r.commonPrm)
|
||||
|
||||
rngPrm.WithAddress(r.addr)
|
||||
rngPrm.SetChunkWriter(&pipeChunkWriter{
|
||||
wr: pipeWriter,
|
||||
})
|
||||
rngPrm.SetRange(rng)
|
||||
fyrchik
commented
What is the easiest way to see why this goroutine won't leak? What is the easiest way to see why this goroutine won't leak?
aarifullin
commented
If I get your question correctly, then the easiest way to check this - to cancel This way is way better:
If I get your question correctly, then the easiest way to check this - to cancel `ctx`.
When I was implementing `GetRange` I relied on this fact that `r.getSvc.GetRange(ctx, rngPrm)` returns an error by the cancelled job. But if it doesn't, the goroutine is screwed up.
This way is way better:
```go
func (r *rangeProvider) GetRange(ctx context.Context, rng *objectSDK.Range) io.Reader {
pipeReader, pipeWriter := io.Pipe()
var rngPrm getsvc.RangePrm
rngPrm.SetCommonParameters(r.commonPrm)
rngPrm.WithAddress(r.addr)
rngPrm.WithRawFlag(true)
rngPrm.SetChunkWriter(&pipeChunkWriter{
wr: pipeWriter,
})
rngPrm.SetRange(rng)
getRangeErr := make(chan error)
go func() {
defer pipeWriter.Close()
select {
case <-ctx.Done():
pipeWriter.CloseWithError(ctx.Err())
case err := <-getRangeErr:
pipeWriter.CloseWithError(err) // pipeWriter.CloseWithError(nil) -> pipeWriter.Close()
}
}()
go func() {
getRangeErr <- r.getSvc.GetRange(ctx, rngPrm)
}()
return pipeReader
}
```
|
||||
|
||||
getRangeErr := make(chan error)
|
||||
|
||||
go func() {
|
||||
defer pipeWriter.Close()
|
||||
|
||||
select {
|
||||
fyrchik
commented
What is so special about this error? What is so special about this error?
aarifullin
commented
You're right. From the patcher's POV it is the inability to get the original object's payloads You're right. From the patcher's POV it is the inability to get the original object's payloads
fyrchik
commented
This error is only returned on raw=true GET of a big object. This error is only returned on raw=true GET of a big object.
Do we currently handle big objects correctly?
|
||||
case <-ctx.Done():
|
||||
pipeWriter.CloseWithError(ctx.Err())
|
||||
case err := <-getRangeErr:
|
||||
pipeWriter.CloseWithError(err)
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Where will we handle this error in code? Where will we handle this error in code?
aarifullin
commented
This is done implicitly.
I already checked this trying to send incorrect object address to the handler and reading the range failed with error This is done implicitly. `pipeWriter` closes the pipe with the error and the reader within `patcher` will get an error by doing `Read`. So, streamer's `Send` gets this error and returns
```go
func (p *patcher) copyRange(ctx context.Context, rng *objectSDK.Range) error {
rdr := p.rangeProvider.GetRange(ctx, rng)
for {
buffOrigPayload := make([]byte, p.readerBuffSize)
n, readErr := rdr.Read(buffOrigPayload)
if readErr != nil {
if readErr != io.EOF {
return fmt.Errorf("read: %w", readErr)
}
}
_, wrErr := p.objectWriter.Write(ctx, buffOrigPayload[:n])
if wrErr != nil {
return fmt.Errorf("write: %w", wrErr)
}
if readErr == io.EOF {
break
}
}
return nil
}
```
I already checked this trying to send incorrect object address to the handler and reading the range failed with error
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
getRangeErr <- r.getSvc.GetRange(ctx, rngPrm)
|
||||
}()
|
||||
|
||||
return pipeReader
|
||||
}
|
44
pkg/services/object/patch/service.go
Normal file
|
@ -0,0 +1,44 @@
|
|||
package patchsvc
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
)
|
||||
|
||||
// Service implements Put operation of Object service v2.
|
||||
type Service struct {
|
||||
keyStorage *util.KeyStorage
|
||||
|
||||
getSvc *getsvc.Service
|
||||
|
||||
putSvc *putsvc.Service
|
||||
}
|
||||
|
||||
// NewService constructs Service instance from provided options.
|
||||
func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service {
|
||||
return &Service{
|
||||
keyStorage: ks,
|
||||
|
||||
getSvc: getSvc,
|
||||
|
||||
putSvc: putSvc,
|
||||
}
|
||||
}
|
||||
|
||||
// Put calls internal service and returns v2 object streamer.
|
||||
func (s *Service) Patch() (object.PatchObjectStream, error) {
|
||||
nodeKey, err := s.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Streamer{
|
||||
getSvc: s.getSvc,
|
||||
|
||||
putSvc: s.putSvc,
|
||||
|
||||
localNodeKey: nodeKey,
|
||||
}, nil
|
||||
}
|
221
pkg/services/object/patch/streamer.go
Normal file
|
@ -0,0 +1,221 @@
|
|||
package patchsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
|
||||
)
|
||||
|
||||
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
|
||||
// and original object payload chunks. The merged result is fed to Put stream target.
|
||||
type Streamer struct {
|
||||
// Patcher must be initialized at first Streamer.Send call.
|
||||
patcher patcher.PatchApplier
|
||||
|
||||
nonFirstSend bool
|
||||
|
||||
getSvc *getsvc.Service
|
||||
|
||||
putSvc *putsvc.Service
|
||||
|
||||
localNodeKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
type pipeChunkWriter struct {
|
||||
wr *io.PipeWriter
|
||||
}
|
||||
|
||||
type headResponseWriter struct {
|
||||
body *objectV2.HeadResponseBody
|
||||
}
|
||||
|
||||
func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error {
|
||||
w.body.SetHeaderPart(toFullObjectHeader(hdr))
|
||||
return nil
|
||||
}
|
||||
|
||||
func toFullObjectHeader(hdr *objectSDK.Object) objectV2.GetHeaderPart {
|
||||
obj := hdr.ToV2()
|
||||
|
||||
hs := new(objectV2.HeaderWithSignature)
|
||||
hs.SetHeader(obj.GetHeader())
|
||||
hs.SetSignature(obj.GetSignature())
|
||||
|
||||
return hs
|
||||
}
|
||||
|
||||
func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
|
||||
hdrWithSig, addr, err := s.readHeader(ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
commonPrm.WithLocalOnly(false)
|
||||
|
||||
rangeProvider := &rangeProvider{
|
||||
getSvc: s.getSvc,
|
||||
|
||||
addr: addr,
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Will this stream will perform additional APE checks on everything? Will this stream will perform additional APE checks on everything?
aarifullin
commented
Fixed Fixed
|
||||
commonPrm: commonPrm,
|
||||
|
||||
localNodeKey: s.localNodeKey,
|
||||
}
|
||||
|
||||
putstm, err := s.putSvc.Put()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hdr := hdrWithSig.GetHeader()
|
||||
oV2 := new(objectV2.Object)
|
||||
hV2 := new(objectV2.Header)
|
||||
oV2.SetHeader(hV2)
|
||||
oV2.GetHeader().SetContainerID(hdr.GetContainerID())
|
||||
oV2.GetHeader().SetPayloadLength(hdr.GetPayloadLength())
|
||||
oV2.GetHeader().SetAttributes(hdr.GetAttributes())
|
||||
|
||||
ownerID, err := newOwnerID(req.GetVerificationHeader())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
oV2.GetHeader().SetOwnerID(ownerID)
|
||||
|
||||
prm, err := s.putInitPrm(req, oV2)
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It seems you initialize It seems you initialize `putstm` just to violate incapsulation and get the target.
Please, let's make this target construction explicit instead.
If it is too much trouble now, please, create a task to fix this.
aarifullin
commented
The reason why I have violated the encapsulation - to make the > If it is too much trouble now
The reason why I have violated the encapsulation - to make the `Put` logic the same. So, this requires a code refactor to move the logic to common package. Otherwise, we would get two put approaches and each requires a support that will lead to errors for sure because any change in `PutSvc` must be reflected to `PatchSvc`
fyrchik
commented
Common code reuse is exactly what I was looking at. Not necessary in a different package. Common code reuse is exactly what I was looking at. Not necessary in a different package.
Could you create a task for this?
aarifullin
commented
https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/1310
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = putstm.Init(ctx, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
patcherPrm := patcher.Params{
|
||||
Header: objectSDK.NewFromV2(oV2),
|
||||
|
||||
RangeProvider: rangeProvider,
|
||||
|
||||
ObjectWriter: putstm.Target(),
|
||||
}
|
||||
|
||||
s.patcher = patcher.New(patcherPrm)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Streamer) readHeader(ctx context.Context, req *objectV2.PatchRequest) (hdrWithSig *objectV2.HeaderWithSignature, addr oid.Address, err error) {
|
||||
addrV2 := req.GetBody().GetAddress()
|
||||
if addrV2 == nil {
|
||||
err = errors.New("patch request has nil-address")
|
||||
return
|
||||
}
|
||||
|
||||
if err = addr.ReadFromV2(*addrV2); err != nil {
|
||||
err = fmt.Errorf("read address error: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
commonPrm.WithLocalOnly(false)
|
||||
|
||||
var p getsvc.HeadPrm
|
||||
p.SetSignerKey(s.localNodeKey)
|
||||
p.SetCommonParameters(commonPrm)
|
||||
|
||||
resp := new(objectV2.HeadResponse)
|
||||
resp.SetBody(new(objectV2.HeadResponseBody))
|
||||
|
||||
p.WithAddress(addr)
|
||||
p.SetHeaderWriter(&headResponseWriter{
|
||||
body: resp.GetBody(),
|
||||
})
|
||||
|
||||
err = s.getSvc.Head(ctx, p)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("get header error: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
var ok bool
|
||||
hdrPart := resp.GetBody().GetHeaderPart()
|
||||
if hdrWithSig, ok = hdrPart.(*objectV2.HeaderWithSignature); !ok {
|
||||
err = fmt.Errorf("unexpected header type: %T", hdrPart)
|
||||
}
|
||||
return
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
I expected that no such logic was needed in node, why do we have this I expected that no such logic was needed in node, why do we have this `if`?
aarifullin
commented
As we have no As we have no `init` message - we need to somehow track the first request message that can set attribute patch. I've replaced this with `nonFirstSend` flag
|
||||
|
||||
func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send")
|
||||
defer span.End()
|
||||
|
||||
defer func() {
|
||||
s.nonFirstSend = true
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
If both fields are nil (attributes and payload), this should be an error, is it? If both fields are nil (attributes and payload), this should be an error, is it?
aarifullin
commented
The check has been added The check has been added
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
remove remove `error`? it will occur in each level of wrapping (and in log message most likely)
aarifullin
commented
True True
aarifullin
commented
Fixed Fixed
|
||||
}()
|
||||
|
||||
if !s.nonFirstSend {
|
||||
if err := s.init(ctx, req); err != nil {
|
||||
return fmt.Errorf("streamer init error: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
patch := new(objectSDK.Patch)
|
||||
patch.FromV2(req.GetBody())
|
||||
|
||||
if !s.nonFirstSend {
|
||||
err := s.patcher.ApplyAttributesPatch(ctx, patch.NewAttributes, patch.ReplaceAttributes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("patch attributes: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if patch.PayloadPatch != nil {
|
||||
err := s.patcher.ApplyPayloadPatch(ctx, patch.PayloadPatch)
|
||||
if err != nil {
|
||||
return fmt.Errorf("patch payload: %w", err)
|
||||
}
|
||||
} else if s.nonFirstSend {
|
||||
return errors.New("invalid non-first patch: empty payload")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
||||
patcherResp, err := s.patcher.Close(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
oidV2 := new(refsV2.ObjectID)
|
||||
|
||||
if patcherResp.AccessIdentifiers.ParentID != nil {
|
||||
patcherResp.AccessIdentifiers.ParentID.WriteToV2(oidV2)
|
||||
} else {
|
||||
patcherResp.AccessIdentifiers.SelfID.WriteToV2(oidV2)
|
||||
}
|
||||
|
||||
return &objectV2.PatchResponse{
|
||||
Body: &objectV2.PatchResponseBody{
|
||||
ObjectID: oidV2,
|
||||
},
|
||||
}, nil
|
||||
}
|
53
pkg/services/object/patch/util.go
Normal file
|
@ -0,0 +1,53 @@
|
|||
package patchsvc
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
)
|
||||
|
||||
// putInitPrm initializes put paramerer for Put stream.
|
||||
func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) {
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prm := new(putsvc.PutInitPrm)
|
||||
prm.WithObject(objectSDK.NewFromV2(obj)).
|
||||
WithCommonPrm(commonPrm).
|
||||
WithPrivateKey(s.localNodeKey)
|
||||
|
||||
return prm, nil
|
||||
}
|
||||
|
||||
func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) {
|
||||
for vh.GetOrigin() != nil {
|
||||
vh = vh.GetOrigin()
|
||||
}
|
||||
sig := vh.GetBodySignature()
|
||||
if sig == nil {
|
||||
return nil, errors.New("empty body signature")
|
||||
}
|
||||
key, err := keys.NewPublicKeyFromBytes(sig.GetKey(), elliptic.P256())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid signature key: %w", err)
|
||||
}
|
||||
|
||||
var userID user.ID
|
||||
user.IDFromKey(&userID, (ecdsa.PublicKey)(*key))
|
||||
ownID := new(refs.OwnerID)
|
||||
userID.WriteToV2(ownID)
|
||||
|
||||
return ownID, nil
|
||||
}
|
|
@ -2,6 +2,7 @@ package putsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
|
@ -20,6 +21,8 @@ type PutInitPrm struct {
|
|||
traverseOpts []placement.Option
|
||||
|
||||
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||
|
||||
privateKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
type PutChunkPrm struct {
|
||||
|
@ -65,3 +68,11 @@ func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
|
|||
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *PutInitPrm) WithPrivateKey(v *ecdsa.PrivateKey) *PutInitPrm {
|
||||
if p != nil {
|
||||
p.privateKey = v
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
|
|
@ -47,6 +47,11 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Target accesses underlying target chunked object writer.
|
||||
func (p *Streamer) Target() transformer.ChunkedObjectWriter {
|
||||
return p.target
|
||||
}
|
||||
|
||||
// MaxObjectSize returns maximum payload size for the streaming session.
|
||||
//
|
||||
// Must be called after the successful Init.
|
||||
|
@ -79,11 +84,15 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
|||
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
||||
p.relay = prm.relay
|
||||
|
||||
if prm.privateKey != nil {
|
||||
p.privateKey = prm.privateKey
|
||||
} else {
|
||||
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.privateKey = nodeKey
|
||||
}
|
||||
|
||||
// prepare untrusted-Put object target
|
||||
p.target = &validatingPreparedTarget{
|
||||
|
@ -136,7 +145,11 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
|||
}
|
||||
}
|
||||
|
||||
if prm.privateKey != nil {
|
||||
p.privateKey = prm.privateKey
|
||||
} else {
|
||||
p.privateKey = key
|
||||
}
|
||||
p.target = &validatingTarget{
|
||||
fmt: p.fmtValidator,
|
||||
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||
|
|
|
@ -37,6 +37,11 @@ type putStreamResponser struct {
|
|||
respSvc *response.Service
|
||||
}
|
||||
|
||||
type patchStreamResponser struct {
|
||||
stream PatchObjectStream
|
||||
respSvc *response.Service
|
||||
}
|
||||
|
||||
// NewResponseService returns object service instance that passes internal service
|
||||
// call to response service.
|
||||
func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *ResponseService {
|
||||
|
@ -87,6 +92,35 @@ func (s *ResponseService) Put() (PutObjectStream, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (s *patchStreamResponser) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||
if err := s.stream.Send(ctx, req); err != nil {
|
||||
return fmt.Errorf("could not send the request: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *patchStreamResponser) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||
r, err := s.stream.CloseAndRecv(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
|
||||
}
|
||||
|
||||
s.respSvc.SetMeta(r)
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (s *ResponseService) Patch() (PatchObjectStream, error) {
|
||||
stream, err := s.svc.Patch()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
||||
}
|
||||
|
||||
return &patchStreamResponser{
|
||||
stream: stream,
|
||||
respSvc: s.respSvc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *ResponseService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||
resp, err := s.svc.PutSingle(ctx, req)
|
||||
if err != nil {
|
||||
|
|
|
@ -31,11 +31,18 @@ type PutObjectStream interface {
|
|||
CloseAndRecv(context.Context) (*object.PutResponse, error)
|
||||
}
|
||||
|
||||
// PatchObjectStream is an interface of FrostFS API v2 compatible patch streamer.
|
||||
type PatchObjectStream interface {
|
||||
Send(context.Context, *object.PatchRequest) error
|
||||
CloseAndRecv(context.Context) (*object.PatchResponse, error)
|
||||
}
|
||||
|
||||
// ServiceServer is an interface of utility
|
||||
// serving v2 Object service.
|
||||
type ServiceServer interface {
|
||||
Get(*object.GetRequest, GetObjectStream) error
|
||||
Put() (PutObjectStream, error)
|
||||
Patch() (PatchObjectStream, error)
|
||||
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
|
||||
Search(*object.SearchRequest, SearchStream) error
|
||||
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
|
||||
|
|
|
@ -35,6 +35,12 @@ type putStreamSigner struct {
|
|||
err error
|
||||
}
|
||||
|
||||
type patchStreamSigner struct {
|
||||
sigSvc *util.SignService
|
||||
stream PatchObjectStream
|
||||
err error
|
||||
}
|
||||
|
||||
type getRangeStreamSigner struct {
|
||||
GetObjectRangeStream
|
||||
sigSvc *util.SignService
|
||||
|
@ -112,6 +118,42 @@ func (s *SignService) Put() (PutObjectStream, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (s *patchStreamSigner) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||
if s.err = s.sigSvc.VerifyRequest(req); s.err != nil {
|
||||
return util.ErrAbortStream
|
||||
}
|
||||
if s.err = s.stream.Send(ctx, req); s.err != nil {
|
||||
return util.ErrAbortStream
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *patchStreamSigner) CloseAndRecv(ctx context.Context) (resp *object.PatchResponse, err error) {
|
||||
if s.err != nil {
|
||||
err = s.err
|
||||
resp = new(object.PatchResponse)
|
||||
} else {
|
||||
resp, err = s.stream.CloseAndRecv(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
|
||||
func (s *SignService) Patch() (PatchObjectStream, error) {
|
||||
stream, err := s.svc.Patch()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
||||
}
|
||||
|
||||
return &patchStreamSigner{
|
||||
stream: stream,
|
||||
sigSvc: s.sigSvc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||
resp := new(object.HeadResponse)
|
||||
|
|
|
@ -91,6 +91,10 @@ func (c TransportSplitter) Put() (PutObjectStream, error) {
|
|||
return c.next.Put()
|
||||
}
|
||||
|
||||
func (c TransportSplitter) Patch() (PatchObjectStream, error) {
|
||||
return c.next.Patch()
|
||||
}
|
||||
|
||||
func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
||||
return c.next.Head(ctx, request)
|
||||
}
|
||||
|
|
So, basically
cmp.Compare
? https://pkg.go.dev/cmp#CompareFixed