object: Implement Patch
method #1307
27 changed files with 1083 additions and 14 deletions
|
@ -2,10 +2,13 @@ package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"cmp"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
|
"slices"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
@ -869,3 +872,65 @@ func SyncContainerSettings(ctx context.Context, prm SyncContainerPrm) (*SyncCont
|
||||||
|
|
||||||
return new(SyncContainerRes), nil
|
return new(SyncContainerRes), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PatchObjectPrm groups parameters of PatchObject operation.
|
||||||
|
type PatchObjectPrm struct {
|
||||||
|
commonObjectPrm
|
||||||
|
objectAddressPrm
|
||||||
|
|
||||||
|
NewAttributes []objectSDK.Attribute
|
||||||
|
|
||||||
|
ReplaceAttribute bool
|
||||||
|
|
||||||
|
PayloadPatches []PayloadPatch
|
||||||
|
}
|
||||||
|
|
||||||
|
type PayloadPatch struct {
|
||||||
|
Range objectSDK.Range
|
||||||
|
|
||||||
|
PayloadPath string
|
||||||
|
}
|
||||||
|
|
||||||
|
type PatchRes struct {
|
||||||
|
OID oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
func Patch(ctx context.Context, prm PatchObjectPrm) (*PatchRes, error) {
|
||||||
|
patchPrm := client.PrmObjectPatch{
|
||||||
|
XHeaders: prm.xHeaders,
|
||||||
|
BearerToken: prm.bearerToken,
|
||||||
|
Session: prm.sessionToken,
|
||||||
|
Address: prm.objAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
slices.SortFunc(prm.PayloadPatches, func(a, b PayloadPatch) int {
|
||||||
|
return cmp.Compare(a.Range.GetOffset(), b.Range.GetOffset())
|
||||||
|
})
|
||||||
|
|
||||||
|
patcher, err := prm.cli.ObjectPatchInit(ctx, patchPrm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("init payload reading: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if patcher.PatchAttributes(ctx, prm.NewAttributes, prm.ReplaceAttribute) {
|
||||||
|
for _, pp := range prm.PayloadPatches {
|
||||||
|
payloadFile, err := os.OpenFile(pp.PayloadPath, os.O_RDONLY, os.ModePerm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
applied := patcher.PatchPayload(ctx, &pp.Range, payloadFile)
|
||||||
|
_ = payloadFile.Close()
|
||||||
|
if !applied {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := patcher.Close(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &PatchRes{
|
||||||
|
OID: res.ObjectID(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
151
cmd/frostfs-cli/modules/object/patch.go
Normal file
151
cmd/frostfs-cli/modules/object/patch.go
Normal file
|
@ -0,0 +1,151 @@
|
||||||
|
package object
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
newAttrsFlagName = "new-attrs"
|
||||||
|
replaceAttrsFlagName = "replace-attrs"
|
||||||
|
rangeFlagName = "range"
|
||||||
|
payloadFlagName = "payload"
|
||||||
|
)
|
||||||
|
|
||||||
|
var objectPatchCmd = &cobra.Command{
|
||||||
|
Use: "patch",
|
||||||
|
Run: patch,
|
||||||
|
Short: "Patch FrostFS object",
|
||||||
|
Long: "Patch FrostFS object. Each range passed to the command requires to pass a corresponding patch payload.",
|
||||||
|
Example: `
|
||||||
|
frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid <CID> --oid <OID> --new-attrs 'key1=val1,key2=val2' --replace-attrs
|
||||||
|
frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid <CID> --oid <OID> --range offX:lnX --payload /path/to/payloadX --range offY:lnY --payload /path/to/payloadY
|
||||||
|
frostfs-cli -c config.yml -r 127.0.0.1:8080 object patch --cid <CID> --oid <OID> --new-attrs 'key1=val1,key2=val2' --replace-attrs --range offX:lnX --payload /path/to/payload
|
||||||
|
`,
|
||||||
|
}
|
||||||
|
|
||||||
|
func initObjectPatchCmd() {
|
||||||
|
commonflags.Init(objectPatchCmd)
|
||||||
|
initFlagSession(objectPatchCmd, "PATCH")
|
||||||
|
|
||||||
|
flags := objectPatchCmd.Flags()
|
||||||
|
|
||||||
|
flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||||
|
_ = objectRangeCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||||
|
|
||||||
|
flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
|
||||||
|
_ = objectRangeCmd.MarkFlagRequired(commonflags.OIDFlag)
|
||||||
|
|
||||||
|
flags.String(newAttrsFlagName, "", "New object attributes in form of Key1=Value1,Key2=Value2")
|
||||||
|
flags.Bool(replaceAttrsFlagName, false, "Replace object attributes by new ones.")
|
||||||
|
flags.StringSlice(rangeFlagName, []string{}, "Range to which patch payload is applied. Format: offset:length")
|
||||||
|
flags.StringSlice(payloadFlagName, []string{}, "Path to file with patch payload.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func patch(cmd *cobra.Command, _ []string) {
|
||||||
|
var cnr cid.ID
|
||||||
|
var obj oid.ID
|
||||||
|
|
||||||
|
objAddr := readObjectAddress(cmd, &cnr, &obj)
|
||||||
|
|
||||||
|
ranges, err := getRangeSlice(cmd)
|
||||||
|
commonCmd.ExitOnErr(cmd, "", err)
|
||||||
|
|
||||||
|
payloads := patchPayloadPaths(cmd)
|
||||||
|
|
||||||
|
if len(ranges) != len(payloads) {
|
||||||
|
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("the number of ranges and payloads are not equal: ranges = %d, payloads = %d", len(ranges), len(payloads)))
|
||||||
|
}
|
||||||
|
|
||||||
|
newAttrs, err := parseNewObjectAttrs(cmd)
|
||||||
|
commonCmd.ExitOnErr(cmd, "can't parse new object attributes: %w", err)
|
||||||
|
replaceAttrs, _ := cmd.Flags().GetBool(replaceAttrsFlagName)
|
||||||
|
|
||||||
|
pk := key.GetOrGenerate(cmd)
|
||||||
|
|
||||||
|
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||||
|
|
||||||
|
var prm internalclient.PatchObjectPrm
|
||||||
|
prm.SetClient(cli)
|
||||||
|
Prepare(cmd, &prm)
|
||||||
|
ReadOrOpenSession(cmd, &prm, pk, cnr, nil)
|
||||||
|
|
||||||
|
prm.SetAddress(objAddr)
|
||||||
|
prm.NewAttributes = newAttrs
|
||||||
|
prm.ReplaceAttribute = replaceAttrs
|
||||||
|
|
||||||
|
for i := range ranges {
|
||||||
|
prm.PayloadPatches = append(prm.PayloadPatches, internalclient.PayloadPatch{
|
||||||
|
Range: ranges[i],
|
||||||
|
PayloadPath: payloads[i],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := internalclient.Patch(cmd.Context(), prm)
|
||||||
|
if err != nil {
|
||||||
|
commonCmd.ExitOnErr(cmd, "can't patch the object: %w", err)
|
||||||
|
}
|
||||||
|
cmd.Println("Patched object ID: ", res.OID.EncodeToString())
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseNewObjectAttrs(cmd *cobra.Command) ([]objectSDK.Attribute, error) {
|
||||||
|
var rawAttrs []string
|
||||||
|
|
||||||
|
raw := cmd.Flag(newAttrsFlagName).Value.String()
|
||||||
|
if len(raw) != 0 {
|
||||||
|
rawAttrs = strings.Split(raw, ",")
|
||||||
|
}
|
||||||
|
|
||||||
|
attrs := make([]objectSDK.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
|
||||||
|
for i := range rawAttrs {
|
||||||
|
k, v, found := strings.Cut(rawAttrs[i], "=")
|
||||||
|
if !found {
|
||||||
|
return nil, fmt.Errorf("invalid attribute format: %s", rawAttrs[i])
|
||||||
|
}
|
||||||
|
attrs[i].SetKey(k)
|
||||||
|
attrs[i].SetValue(v)
|
||||||
|
}
|
||||||
|
return attrs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getRangeSlice(cmd *cobra.Command) ([]objectSDK.Range, error) {
|
||||||
|
v, _ := cmd.Flags().GetStringSlice(rangeFlagName)
|
||||||
|
if len(v) == 0 {
|
||||||
|
return []objectSDK.Range{}, nil
|
||||||
|
}
|
||||||
|
rs := make([]objectSDK.Range, len(v))
|
||||||
|
for i := range v {
|
||||||
|
before, after, found := strings.Cut(v[i], rangeSep)
|
||||||
|
if !found {
|
||||||
|
return nil, fmt.Errorf("invalid range specifier: %s", v[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
offset, err := strconv.ParseUint(before, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid '%s' range offset specifier: %w", v[i], err)
|
||||||
|
}
|
||||||
|
length, err := strconv.ParseUint(after, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid '%s' range length specifier: %w", v[i], err)
|
||||||
|
}
|
||||||
|
|
||||||
|
rs[i].SetOffset(offset)
|
||||||
|
rs[i].SetLength(length)
|
||||||
|
}
|
||||||
|
return rs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func patchPayloadPaths(cmd *cobra.Command) []string {
|
||||||
|
v, _ := cmd.Flags().GetStringSlice(payloadFlagName)
|
||||||
|
return v
|
||||||
|
}
|
|
@ -29,6 +29,7 @@ func init() {
|
||||||
objectRangeCmd,
|
objectRangeCmd,
|
||||||
objectLockCmd,
|
objectLockCmd,
|
||||||
objectNodesCmd,
|
objectNodesCmd,
|
||||||
|
objectPatchCmd,
|
||||||
}
|
}
|
||||||
|
|
||||||
Cmd.AddCommand(objectChildCommands...)
|
Cmd.AddCommand(objectChildCommands...)
|
||||||
|
@ -39,6 +40,7 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
initObjectPutCmd()
|
initObjectPutCmd()
|
||||||
|
initObjectPatchCmd()
|
||||||
initObjectDeleteCmd()
|
initObjectDeleteCmd()
|
||||||
initObjectGetCmd()
|
initObjectGetCmd()
|
||||||
initObjectSearchCmd()
|
initObjectSearchCmd()
|
||||||
|
|
|
@ -306,6 +306,8 @@ func finalizeSession(cmd *cobra.Command, dst SessionPrm, tok *session.Object, ke
|
||||||
case *internal.PutObjectPrm:
|
case *internal.PutObjectPrm:
|
||||||
common.PrintVerbose(cmd, "Binding session to object PUT...")
|
common.PrintVerbose(cmd, "Binding session to object PUT...")
|
||||||
tok.ForVerb(session.VerbObjectPut)
|
tok.ForVerb(session.VerbObjectPut)
|
||||||
|
case *internal.PatchObjectPrm:
|
||||||
|
tok.ForVerb(session.VerbObjectPatch)
|
||||||
case *internal.DeleteObjectPrm:
|
case *internal.DeleteObjectPrm:
|
||||||
common.PrintVerbose(cmd, "Binding session to object DELETE...")
|
common.PrintVerbose(cmd, "Binding session to object DELETE...")
|
||||||
tok.ForVerb(session.VerbObjectDelete)
|
tok.ForVerb(session.VerbObjectDelete)
|
||||||
|
|
|
@ -239,6 +239,8 @@ func parseAction(lexeme string) ([]string, bool, error) {
|
||||||
return []string{nativeschema.MethodRangeObject}, true, nil
|
return []string{nativeschema.MethodRangeObject}, true, nil
|
||||||
case "object.hash":
|
case "object.hash":
|
||||||
return []string{nativeschema.MethodHashObject}, true, nil
|
return []string{nativeschema.MethodHashObject}, true, nil
|
||||||
|
case "object.patch":
|
||||||
|
return []string{nativeschema.MethodPatchObject}, true, nil
|
||||||
case "object.*":
|
case "object.*":
|
||||||
return []string{
|
return []string{
|
||||||
nativeschema.MethodPutObject,
|
nativeschema.MethodPutObject,
|
||||||
|
@ -248,6 +250,7 @@ func parseAction(lexeme string) ([]string, bool, error) {
|
||||||
nativeschema.MethodSearchObject,
|
nativeschema.MethodSearchObject,
|
||||||
nativeschema.MethodRangeObject,
|
nativeschema.MethodRangeObject,
|
||||||
nativeschema.MethodHashObject,
|
nativeschema.MethodHashObject,
|
||||||
|
nativeschema.MethodPatchObject,
|
||||||
}, true, nil
|
}, true, nil
|
||||||
case "container.put":
|
case "container.put":
|
||||||
return []string{nativeschema.MethodPutContainer}, false, nil
|
return []string{nativeschema.MethodPutContainer}, false, nil
|
||||||
|
|
|
@ -28,6 +28,7 @@ import (
|
||||||
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
deletesvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/delete/v2"
|
||||||
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
getsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get/v2"
|
||||||
|
patchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/patch"
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
putsvcV2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put/v2"
|
||||||
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
searchsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/search"
|
||||||
|
@ -54,6 +55,8 @@ type objectSvc struct {
|
||||||
get *getsvcV2.Service
|
get *getsvcV2.Service
|
||||||
|
|
||||||
delete *deletesvcV2.Service
|
delete *deletesvcV2.Service
|
||||||
|
|
||||||
|
patch *patchsvc.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) MaxObjectSize() uint64 {
|
func (c *cfg) MaxObjectSize() uint64 {
|
||||||
|
@ -71,6 +74,10 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) {
|
||||||
return s.put.Put()
|
return s.put.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *objectSvc) Patch() (objectService.PatchObjectStream, error) {
|
||||||
|
return s.patch.Patch()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
return s.put.PutSingle(ctx, req)
|
return s.put.PutSingle(ctx, req)
|
||||||
}
|
}
|
||||||
|
@ -181,10 +188,12 @@ func initObjectService(c *cfg) {
|
||||||
|
|
||||||
sDeleteV2 := createDeleteServiceV2(sDelete)
|
sDeleteV2 := createDeleteServiceV2(sDelete)
|
||||||
|
|
||||||
|
sPatch := createPatchSvc(sGet, sPut, keyStorage)
|
||||||
|
|
||||||
// build service pipeline
|
// build service pipeline
|
||||||
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
// grpc | audit | <metrics> | signature | response | acl | ape | split
|
||||||
|
|
||||||
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2)
|
splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2, sPatch)
|
||||||
|
|
||||||
apeSvc := createAPEService(c, splitSvc)
|
apeSvc := createAPEService(c, splitSvc)
|
||||||
|
|
||||||
|
@ -353,6 +362,10 @@ func createPutSvcV2(sPut *putsvc.Service, keyStorage *util.KeyStorage) *putsvcV2
|
||||||
return putsvcV2.NewService(sPut, keyStorage)
|
return putsvcV2.NewService(sPut, keyStorage)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createPatchSvc(sGet *getsvc.Service, sPut *putsvc.Service, keyStorage *util.KeyStorage) *patchsvc.Service {
|
||||||
|
return patchsvc.NewService(keyStorage, sGet, sPut)
|
||||||
|
}
|
||||||
|
|
||||||
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
|
func createSearchSvc(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.TraverserGenerator, coreConstructor *cache.ClientCache) *searchsvc.Service {
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
|
@ -425,7 +438,7 @@ func createDeleteServiceV2(sDelete *deletesvc.Service) *deletesvcV2.Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service,
|
func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Service,
|
||||||
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service,
|
sSearchV2 *searchsvcV2.Service, sDeleteV2 *deletesvcV2.Service, sPatch *patchsvc.Service,
|
||||||
) *objectService.TransportSplitter {
|
) *objectService.TransportSplitter {
|
||||||
return objectService.NewTransportSplitter(
|
return objectService.NewTransportSplitter(
|
||||||
c.cfgGRPC.maxChunkSize,
|
c.cfgGRPC.maxChunkSize,
|
||||||
|
@ -435,6 +448,7 @@ func createSplitService(c *cfg, sPutV2 *putsvcV2.Service, sGetV2 *getsvcV2.Servi
|
||||||
search: sSearchV2,
|
search: sSearchV2,
|
||||||
get: sGetV2,
|
get: sGetV2,
|
||||||
delete: sDeleteV2,
|
delete: sDeleteV2,
|
||||||
|
patch: sPatch,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
6
go.mod
6
go.mod
|
@ -4,14 +4,14 @@ go 1.21
|
||||||
|
|
||||||
require (
|
require (
|
||||||
code.gitea.io/sdk/gitea v0.17.1
|
code.gitea.io/sdk/gitea v0.17.1
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240813155151-d112a28d382f
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20231101111734-b3ad3335ff65
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240726111349-9da46f566fec
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240813155821-98aabc45a720
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240712081403-2628f6184984
|
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240814080254-96225afacb88
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||||
github.com/cheggaaa/pb v1.0.29
|
github.com/cheggaaa/pb v1.0.29
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -24,6 +24,48 @@ func New(c objectSvc.ServiceServer) *Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Patch opens internal Object patch stream and feeds it by the data read from gRPC stream.
|
||||||
|
func (s *Server) Patch(gStream objectGRPC.ObjectService_PatchServer) error {
|
||||||
|
stream, err := s.srv.Patch()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
req, err := gStream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
resp, err := stream.CloseAndRecv(gStream.Context())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse))
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
patchReq := new(object.PatchRequest)
|
||||||
|
if err := patchReq.FromGRPCMessage(req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stream.Send(gStream.Context(), patchReq); err != nil {
|
||||||
|
if errors.Is(err, util.ErrAbortStream) {
|
||||||
|
resp, err := stream.CloseAndRecv(gStream.Context())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return gStream.SendAndClose(resp.ToGRPCMessage().(*objectGRPC.PatchResponse))
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Put opens internal Object service Put stream and overtakes data from gRPC stream to it.
|
// Put opens internal Object service Put stream and overtakes data from gRPC stream to it.
|
||||||
func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
|
func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
|
||||||
stream, err := s.srv.Put()
|
stream, err := s.srv.Put()
|
||||||
|
|
|
@ -35,6 +35,12 @@ type putStreamBasicChecker struct {
|
||||||
next object.PutObjectStream
|
next object.PutObjectStream
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type patchStreamBasicChecker struct {
|
||||||
|
source *Service
|
||||||
|
next object.PatchObjectStream
|
||||||
|
nonFirstSend bool
|
||||||
|
}
|
||||||
|
|
||||||
type getStreamBasicChecker struct {
|
type getStreamBasicChecker struct {
|
||||||
checker ACLChecker
|
checker ACLChecker
|
||||||
|
|
||||||
|
@ -249,6 +255,15 @@ func (b Service) Put() (object.PutObjectStream, error) {
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b Service) Patch() (object.PatchObjectStream, error) {
|
||||||
|
streamer, err := b.next.Patch()
|
||||||
|
|
||||||
|
return &patchStreamBasicChecker{
|
||||||
|
source: &b,
|
||||||
|
next: streamer,
|
||||||
|
}, err
|
||||||
|
}
|
||||||
|
|
||||||
func (b Service) Head(
|
func (b Service) Head(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *objectV2.HeadRequest,
|
request *objectV2.HeadRequest,
|
||||||
|
@ -734,6 +749,65 @@ func (g *searchStreamBasicChecker) Send(resp *objectV2.SearchResponse) error {
|
||||||
return g.SearchStream.Send(resp)
|
return g.SearchStream.Send(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
|
||||||
|
body := request.GetBody()
|
||||||
|
if body == nil {
|
||||||
|
return errEmptyBody
|
||||||
|
}
|
||||||
|
|
||||||
|
if !p.nonFirstSend {
|
||||||
|
p.nonFirstSend = true
|
||||||
|
|
||||||
|
cnr, err := getContainerIDFromRequest(request)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
objV2 := request.GetBody().GetAddress().GetObjectID()
|
||||||
|
if objV2 == nil {
|
||||||
|
return errors.New("missing oid")
|
||||||
|
}
|
||||||
|
obj := new(oid.ID)
|
||||||
|
err = obj.ReadFromV2(*objV2)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var sTok *sessionSDK.Object
|
||||||
|
sTok, err = readSessionToken(cnr, obj, request.GetMetaHeader().GetSessionToken())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
bTok, err := originalBearerToken(request.GetMetaHeader())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
req := MetaWithToken{
|
||||||
|
vheader: request.GetVerificationHeader(),
|
||||||
|
token: sTok,
|
||||||
|
bearer: bTok,
|
||||||
|
src: request,
|
||||||
|
}
|
||||||
|
|
||||||
|
reqInfo, err := p.source.findRequestInfoWithoutACLOperationAssert(req, cnr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reqInfo.obj = obj
|
||||||
|
|
||||||
|
ctx = requestContext(ctx, reqInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.next.Send(ctx, request)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
||||||
|
return p.next.CloseAndRecv(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (info RequestInfo, err error) {
|
func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (info RequestInfo, err error) {
|
||||||
cnr, err := b.containers.Get(idCnr) // fetch actual container
|
cnr, err := b.containers.Get(idCnr) // fetch actual container
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -790,3 +864,56 @@ func (b Service) findRequestInfo(req MetaWithToken, idCnr cid.ID, op acl.Op) (in
|
||||||
|
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// findRequestInfoWithoutACLOperationAssert is findRequestInfo without session token verb assert.
|
||||||
|
func (b Service) findRequestInfoWithoutACLOperationAssert(req MetaWithToken, idCnr cid.ID) (info RequestInfo, err error) {
|
||||||
|
cnr, err := b.containers.Get(idCnr) // fetch actual container
|
||||||
|
if err != nil {
|
||||||
|
return info, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.token != nil {
|
||||||
|
currentEpoch, err := b.nm.Epoch()
|
||||||
|
if err != nil {
|
||||||
|
return info, errors.New("can't fetch current epoch")
|
||||||
|
}
|
||||||
|
if req.token.ExpiredAt(currentEpoch) {
|
||||||
|
return info, new(apistatus.SessionTokenExpired)
|
||||||
|
}
|
||||||
|
if req.token.InvalidAt(currentEpoch) {
|
||||||
|
return info, fmt.Errorf("%s: token is invalid at %d epoch)",
|
||||||
|
invalidRequestMessage, currentEpoch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// find request role and key
|
||||||
|
ownerID, ownerKey, err := req.RequestOwner()
|
||||||
|
if err != nil {
|
||||||
|
return info, err
|
||||||
|
}
|
||||||
|
res, err := b.c.Classify(ownerID, ownerKey, idCnr, cnr.Value)
|
||||||
|
if err != nil {
|
||||||
|
return info, err
|
||||||
|
}
|
||||||
|
|
||||||
|
info.basicACL = cnr.Value.BasicACL()
|
||||||
|
info.requestRole = res.Role
|
||||||
|
info.cnrOwner = cnr.Value.Owner()
|
||||||
|
info.idCnr = idCnr
|
||||||
|
|
||||||
|
cnrNamespace, hasNamespace := strings.CutSuffix(cnrSDK.ReadDomain(cnr.Value).Zone(), ".ns")
|
||||||
|
if hasNamespace {
|
||||||
|
info.cnrNamespace = cnrNamespace
|
||||||
|
}
|
||||||
|
|
||||||
|
// it is assumed that at the moment the key will be valid,
|
||||||
|
// otherwise the request would not pass validation
|
||||||
|
info.senderKey = res.Key
|
||||||
|
|
||||||
|
// add bearer token if it is present in request
|
||||||
|
info.bearer = req.bearer
|
||||||
|
|
||||||
|
info.srcRequest = req.src
|
||||||
|
|
||||||
|
return info, nil
|
||||||
|
}
|
||||||
|
|
|
@ -46,6 +46,8 @@ func getContainerIDFromRequest(req any) (cid.ID, error) {
|
||||||
idV2 = v.GetBody().GetAddress().GetContainerID()
|
idV2 = v.GetBody().GetAddress().GetContainerID()
|
||||||
case *objectV2.PutSingleRequest:
|
case *objectV2.PutSingleRequest:
|
||||||
idV2 = v.GetBody().GetObject().GetHeader().GetContainerID()
|
idV2 = v.GetBody().GetObject().GetHeader().GetContainerID()
|
||||||
|
case *objectV2.PatchRequest:
|
||||||
|
idV2 = v.GetBody().GetAddress().GetContainerID()
|
||||||
default:
|
default:
|
||||||
return cid.ID{}, errors.New("unknown request type")
|
return cid.ID{}, errors.New("unknown request type")
|
||||||
}
|
}
|
||||||
|
@ -174,7 +176,7 @@ func isOwnerFromKey(id user.ID, key *keys.PublicKey) bool {
|
||||||
func assertVerb(tok sessionSDK.Object, op acl.Op) bool {
|
func assertVerb(tok sessionSDK.Object, op acl.Op) bool {
|
||||||
switch op {
|
switch op {
|
||||||
case acl.OpObjectPut:
|
case acl.OpObjectPut:
|
||||||
return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete)
|
return tok.AssertVerb(sessionSDK.VerbObjectPut, sessionSDK.VerbObjectDelete, sessionSDK.VerbObjectPatch)
|
||||||
case acl.OpObjectDelete:
|
case acl.OpObjectDelete:
|
||||||
return tok.AssertVerb(sessionSDK.VerbObjectDelete)
|
return tok.AssertVerb(sessionSDK.VerbObjectDelete)
|
||||||
case acl.OpObjectGet:
|
case acl.OpObjectGet:
|
||||||
|
@ -185,11 +187,13 @@ func assertVerb(tok sessionSDK.Object, op acl.Op) bool {
|
||||||
sessionSDK.VerbObjectGet,
|
sessionSDK.VerbObjectGet,
|
||||||
sessionSDK.VerbObjectDelete,
|
sessionSDK.VerbObjectDelete,
|
||||||
sessionSDK.VerbObjectRange,
|
sessionSDK.VerbObjectRange,
|
||||||
sessionSDK.VerbObjectRangeHash)
|
sessionSDK.VerbObjectRangeHash,
|
||||||
|
sessionSDK.VerbObjectPatch,
|
||||||
|
)
|
||||||
case acl.OpObjectSearch:
|
case acl.OpObjectSearch:
|
||||||
return tok.AssertVerb(sessionSDK.VerbObjectSearch, sessionSDK.VerbObjectDelete)
|
return tok.AssertVerb(sessionSDK.VerbObjectSearch, sessionSDK.VerbObjectDelete)
|
||||||
case acl.OpObjectRange:
|
case acl.OpObjectRange:
|
||||||
return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash)
|
return tok.AssertVerb(sessionSDK.VerbObjectRange, sessionSDK.VerbObjectRangeHash, sessionSDK.VerbObjectPatch)
|
||||||
case acl.OpObjectHash:
|
case acl.OpObjectHash:
|
||||||
return tok.AssertVerb(sessionSDK.VerbObjectRangeHash)
|
return tok.AssertVerb(sessionSDK.VerbObjectRangeHash)
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,7 +103,8 @@ func (c *checkerImpl) newAPERequest(ctx context.Context, prm Prm) (aperequest.Re
|
||||||
nativeschema.MethodHeadObject,
|
nativeschema.MethodHeadObject,
|
||||||
nativeschema.MethodRangeObject,
|
nativeschema.MethodRangeObject,
|
||||||
nativeschema.MethodHashObject,
|
nativeschema.MethodHashObject,
|
||||||
nativeschema.MethodDeleteObject:
|
nativeschema.MethodDeleteObject,
|
||||||
|
nativeschema.MethodPatchObject:
|
||||||
if prm.Object == nil {
|
if prm.Object == nil {
|
||||||
return defaultRequest, fmt.Errorf("method %s: %w", prm.Method, errMissingOID)
|
return defaultRequest, fmt.Errorf("method %s: %w", prm.Method, errMissingOID)
|
||||||
}
|
}
|
||||||
|
|
|
@ -204,6 +204,62 @@ func (c *Service) Put() (objectSvc.PutObjectStream, error) {
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type patchStreamBasicChecker struct {
|
||||||
|
apeChecker Checker
|
||||||
|
|
||||||
|
next objectSvc.PatchObjectStream
|
||||||
|
|
||||||
|
nonFirstSend bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
|
||||||
|
if !p.nonFirstSend {
|
||||||
|
p.nonFirstSend = true
|
||||||
|
|
||||||
|
reqCtx, err := requestContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return toStatusErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
||||||
|
if err != nil {
|
||||||
|
return toStatusErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := Prm{
|
||||||
|
Namespace: reqCtx.Namespace,
|
||||||
|
Container: cnrID,
|
||||||
|
Object: objID,
|
||||||
|
Method: nativeschema.MethodPatchObject,
|
||||||
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||||
|
ContainerOwner: reqCtx.ContainerOwner,
|
||||||
|
Role: nativeSchemaRole(reqCtx.Role),
|
||||||
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
||||||
|
BearerToken: reqCtx.BearerToken,
|
||||||
|
XHeaders: request.GetMetaHeader().GetXHeaders(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p.apeChecker.CheckAPE(ctx, prm); err != nil {
|
||||||
|
return toStatusErr(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.next.Send(ctx, request)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
||||||
|
return p.next.CloseAndRecv(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Service) Patch() (objectSvc.PatchObjectStream, error) {
|
||||||
|
streamer, err := c.next.Patch()
|
||||||
|
|
||||||
|
return &patchStreamBasicChecker{
|
||||||
|
apeChecker: c.apeChecker,
|
||||||
|
next: streamer,
|
||||||
|
}, err
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
||||||
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -170,3 +170,64 @@ func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type auditPatchStream struct {
|
||||||
|
stream PatchObjectStream
|
||||||
|
log *logger.Logger
|
||||||
|
|
||||||
|
failed bool
|
||||||
|
key []byte
|
||||||
|
containerID *refs.ContainerID
|
||||||
|
objectID *refs.ObjectID
|
||||||
|
|
||||||
|
nonFirstSend bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *auditService) Patch() (PatchObjectStream, error) {
|
||||||
|
res, err := a.next.Patch()
|
||||||
|
if !a.enabled.Load() {
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
audit.LogRequest(a.log, objectGRPC.ObjectService_Patch_FullMethodName, nil, nil, false)
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
return &auditPatchStream{
|
||||||
|
stream: res,
|
||||||
|
log: a.log,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloseAndRecv implements PatchObjectStream.
|
||||||
|
func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||||
|
resp, err := a.stream.CloseAndRecv(ctx)
|
||||||
|
if err != nil {
|
||||||
|
a.failed = true
|
||||||
|
}
|
||||||
|
a.objectID = resp.GetBody().ObjectID
|
||||||
|
audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key,
|
||||||
|
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
|
||||||
|
!a.failed)
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send implements PatchObjectStream.
|
||||||
|
func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||||
|
if !a.nonFirstSend {
|
||||||
|
a.containerID = req.GetBody().GetAddress().GetContainerID()
|
||||||
|
a.objectID = req.GetBody().GetAddress().GetObjectID()
|
||||||
|
a.key = req.GetVerificationHeader().GetBodySignature().GetKey()
|
||||||
|
a.nonFirstSend = true
|
||||||
|
}
|
||||||
|
|
||||||
|
err := a.stream.Send(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
a.failed = true
|
||||||
|
}
|
||||||
|
if !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
|
||||||
|
audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key,
|
||||||
|
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
|
||||||
|
!a.failed)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -48,6 +48,14 @@ func (x *Common) Put() (PutObjectStream, error) {
|
||||||
return x.nextHandler.Put()
|
return x.nextHandler.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *Common) Patch() (PatchObjectStream, error) {
|
||||||
|
if x.state.IsMaintenance() {
|
||||||
|
return nil, new(apistatus.NodeUnderMaintenance)
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.nextHandler.Patch()
|
||||||
|
}
|
||||||
|
|
||||||
func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
||||||
if x.state.IsMaintenance() {
|
if x.state.IsMaintenance() {
|
||||||
return nil, new(apistatus.NodeUnderMaintenance)
|
return nil, new(apistatus.NodeUnderMaintenance)
|
||||||
|
|
|
@ -124,6 +124,10 @@ func (p *commonPrm) SetRequestForwarder(f RequestForwarder) {
|
||||||
p.forwarder = f
|
p.forwarder = f
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *commonPrm) SetSignerKey(signerKey *ecdsa.PrivateKey) {
|
||||||
|
p.signerKey = signerKey
|
||||||
|
}
|
||||||
|
|
||||||
// WithAddress sets object address to be read.
|
// WithAddress sets object address to be read.
|
||||||
func (p *commonPrm) WithAddress(addr oid.Address) {
|
func (p *commonPrm) WithAddress(addr oid.Address) {
|
||||||
p.addr = addr
|
p.addr = addr
|
||||||
|
|
|
@ -27,6 +27,12 @@ type (
|
||||||
start time.Time
|
start time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
patchStreamMetric struct {
|
||||||
|
stream PatchObjectStream
|
||||||
|
metrics MetricRegister
|
||||||
|
start time.Time
|
||||||
|
}
|
||||||
|
|
||||||
MetricRegister interface {
|
MetricRegister interface {
|
||||||
AddRequestDuration(string, time.Duration, bool)
|
AddRequestDuration(string, time.Duration, bool)
|
||||||
AddPayloadSize(string, int)
|
AddPayloadSize(string, int)
|
||||||
|
@ -76,6 +82,24 @@ func (m MetricCollector) Put() (PutObjectStream, error) {
|
||||||
return m.next.Put()
|
return m.next.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m MetricCollector) Patch() (PatchObjectStream, error) {
|
||||||
|
if m.enabled {
|
||||||
|
t := time.Now()
|
||||||
|
|
||||||
|
stream, err := m.next.Patch()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &patchStreamMetric{
|
||||||
|
stream: stream,
|
||||||
|
metrics: m.metrics,
|
||||||
|
start: t,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
return m.next.Patch()
|
||||||
|
}
|
||||||
|
|
||||||
func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
if m.enabled {
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
@ -189,3 +213,16 @@ func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse,
|
||||||
|
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
func (s patchStreamMetric) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||||
|
s.metrics.AddPayloadSize("Patch", len(req.GetBody().GetPatch().GetChunk()))
|
||||||
|
|
||||||
|
return s.stream.Send(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s patchStreamMetric) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||||
|
res, err := s.stream.CloseAndRecv(ctx)
|
||||||
|
|
||||||
|
s.metrics.AddRequestDuration("Patch", time.Since(s.start), err == nil)
|
||||||
|
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
63
pkg/services/object/patch/range_provider.go
Normal file
63
pkg/services/object/patch/range_provider.go
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
package patchsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
objectUtil "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
patcherSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (p *pipeChunkWriter) WriteChunk(_ context.Context, chunk []byte) error {
|
||||||
|
_, err := p.wr.Write(chunk)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type rangeProvider struct {
|
||||||
|
getSvc *getsvc.Service
|
||||||
|
|
||||||
|
addr oid.Address
|
||||||
|
|
||||||
|
commonPrm *objectUtil.CommonPrm
|
||||||
|
|
||||||
|
localNodeKey *ecdsa.PrivateKey
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ patcherSDK.RangeProvider = (*rangeProvider)(nil)
|
||||||
|
|
||||||
|
func (r *rangeProvider) GetRange(ctx context.Context, rng *objectSDK.Range) io.Reader {
|
||||||
|
pipeReader, pipeWriter := io.Pipe()
|
||||||
|
|
||||||
|
var rngPrm getsvc.RangePrm
|
||||||
|
rngPrm.SetSignerKey(r.localNodeKey)
|
||||||
|
rngPrm.SetCommonParameters(r.commonPrm)
|
||||||
|
|
||||||
|
rngPrm.WithAddress(r.addr)
|
||||||
|
rngPrm.SetChunkWriter(&pipeChunkWriter{
|
||||||
|
wr: pipeWriter,
|
||||||
|
})
|
||||||
|
rngPrm.SetRange(rng)
|
||||||
|
|
||||||
|
getRangeErr := make(chan error)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer pipeWriter.Close()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
pipeWriter.CloseWithError(ctx.Err())
|
||||||
|
case err := <-getRangeErr:
|
||||||
|
pipeWriter.CloseWithError(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
getRangeErr <- r.getSvc.GetRange(ctx, rngPrm)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return pipeReader
|
||||||
|
}
|
44
pkg/services/object/patch/service.go
Normal file
44
pkg/services/object/patch/service.go
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
package patchsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||||
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Service implements Put operation of Object service v2.
|
||||||
|
type Service struct {
|
||||||
|
keyStorage *util.KeyStorage
|
||||||
|
|
||||||
|
getSvc *getsvc.Service
|
||||||
|
|
||||||
|
putSvc *putsvc.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewService constructs Service instance from provided options.
|
||||||
|
func NewService(ks *util.KeyStorage, getSvc *getsvc.Service, putSvc *putsvc.Service) *Service {
|
||||||
|
return &Service{
|
||||||
|
keyStorage: ks,
|
||||||
|
|
||||||
|
getSvc: getSvc,
|
||||||
|
|
||||||
|
putSvc: putSvc,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put calls internal service and returns v2 object streamer.
|
||||||
|
func (s *Service) Patch() (object.PatchObjectStream, error) {
|
||||||
|
nodeKey, err := s.keyStorage.GetKey(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Streamer{
|
||||||
|
getSvc: s.getSvc,
|
||||||
|
|
||||||
|
putSvc: s.putSvc,
|
||||||
|
|
||||||
|
localNodeKey: nodeKey,
|
||||||
|
}, nil
|
||||||
|
}
|
221
pkg/services/object/patch/streamer.go
Normal file
221
pkg/services/object/patch/streamer.go
Normal file
|
@ -0,0 +1,221 @@
|
||||||
|
package patchsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
refsV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||||
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
||||||
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/patcher"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Streamer for the patch handler is a pipeline that merges two incoming streams of patches
|
||||||
|
// and original object payload chunks. The merged result is fed to Put stream target.
|
||||||
|
type Streamer struct {
|
||||||
|
// Patcher must be initialized at first Streamer.Send call.
|
||||||
|
patcher patcher.PatchApplier
|
||||||
|
|
||||||
|
nonFirstSend bool
|
||||||
|
|
||||||
|
getSvc *getsvc.Service
|
||||||
|
|
||||||
|
putSvc *putsvc.Service
|
||||||
|
|
||||||
|
localNodeKey *ecdsa.PrivateKey
|
||||||
|
}
|
||||||
|
|
||||||
|
type pipeChunkWriter struct {
|
||||||
|
wr *io.PipeWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
type headResponseWriter struct {
|
||||||
|
body *objectV2.HeadResponseBody
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *objectSDK.Object) error {
|
||||||
|
w.body.SetHeaderPart(toFullObjectHeader(hdr))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func toFullObjectHeader(hdr *objectSDK.Object) objectV2.GetHeaderPart {
|
||||||
|
obj := hdr.ToV2()
|
||||||
|
|
||||||
|
hs := new(objectV2.HeaderWithSignature)
|
||||||
|
hs.SetHeader(obj.GetHeader())
|
||||||
|
hs.SetSignature(obj.GetSignature())
|
||||||
|
|
||||||
|
return hs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Streamer) init(ctx context.Context, req *objectV2.PatchRequest) error {
|
||||||
|
hdrWithSig, addr, err := s.readHeader(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
commonPrm, err := util.CommonPrmFromV2(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
commonPrm.WithLocalOnly(false)
|
||||||
|
|
||||||
|
rangeProvider := &rangeProvider{
|
||||||
|
getSvc: s.getSvc,
|
||||||
|
|
||||||
|
addr: addr,
|
||||||
|
|
||||||
|
commonPrm: commonPrm,
|
||||||
|
|
||||||
|
localNodeKey: s.localNodeKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
putstm, err := s.putSvc.Put()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
hdr := hdrWithSig.GetHeader()
|
||||||
|
oV2 := new(objectV2.Object)
|
||||||
|
hV2 := new(objectV2.Header)
|
||||||
|
oV2.SetHeader(hV2)
|
||||||
|
oV2.GetHeader().SetContainerID(hdr.GetContainerID())
|
||||||
|
oV2.GetHeader().SetPayloadLength(hdr.GetPayloadLength())
|
||||||
|
oV2.GetHeader().SetAttributes(hdr.GetAttributes())
|
||||||
|
|
||||||
|
ownerID, err := newOwnerID(req.GetVerificationHeader())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
oV2.GetHeader().SetOwnerID(ownerID)
|
||||||
|
|
||||||
|
prm, err := s.putInitPrm(req, oV2)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = putstm.Init(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
patcherPrm := patcher.Params{
|
||||||
|
Header: objectSDK.NewFromV2(oV2),
|
||||||
|
|
||||||
|
RangeProvider: rangeProvider,
|
||||||
|
|
||||||
|
ObjectWriter: putstm.Target(),
|
||||||
|
}
|
||||||
|
|
||||||
|
s.patcher = patcher.New(patcherPrm)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Streamer) readHeader(ctx context.Context, req *objectV2.PatchRequest) (hdrWithSig *objectV2.HeaderWithSignature, addr oid.Address, err error) {
|
||||||
|
addrV2 := req.GetBody().GetAddress()
|
||||||
|
if addrV2 == nil {
|
||||||
|
err = errors.New("patch request has nil-address")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = addr.ReadFromV2(*addrV2); err != nil {
|
||||||
|
err = fmt.Errorf("read address error: %w", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
commonPrm, err := util.CommonPrmFromV2(req)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
commonPrm.WithLocalOnly(false)
|
||||||
|
|
||||||
|
var p getsvc.HeadPrm
|
||||||
|
p.SetSignerKey(s.localNodeKey)
|
||||||
|
p.SetCommonParameters(commonPrm)
|
||||||
|
|
||||||
|
resp := new(objectV2.HeadResponse)
|
||||||
|
resp.SetBody(new(objectV2.HeadResponseBody))
|
||||||
|
|
||||||
|
p.WithAddress(addr)
|
||||||
|
p.SetHeaderWriter(&headResponseWriter{
|
||||||
|
body: resp.GetBody(),
|
||||||
|
})
|
||||||
|
|
||||||
|
err = s.getSvc.Head(ctx, p)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("get header error: %w", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var ok bool
|
||||||
|
hdrPart := resp.GetBody().GetHeaderPart()
|
||||||
|
if hdrWithSig, ok = hdrPart.(*objectV2.HeaderWithSignature); !ok {
|
||||||
|
err = fmt.Errorf("unexpected header type: %T", hdrPart)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Streamer) Send(ctx context.Context, req *objectV2.PatchRequest) error {
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "patch.streamer.Send")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
s.nonFirstSend = true
|
||||||
|
}()
|
||||||
|
|
||||||
|
if !s.nonFirstSend {
|
||||||
|
if err := s.init(ctx, req); err != nil {
|
||||||
|
return fmt.Errorf("streamer init error: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
patch := new(objectSDK.Patch)
|
||||||
|
patch.FromV2(req.GetBody())
|
||||||
|
|
||||||
|
if !s.nonFirstSend {
|
||||||
|
err := s.patcher.ApplyAttributesPatch(ctx, patch.NewAttributes, patch.ReplaceAttributes)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("patch attributes: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if patch.PayloadPatch != nil {
|
||||||
|
err := s.patcher.ApplyPayloadPatch(ctx, patch.PayloadPatch)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("patch payload: %w", err)
|
||||||
|
}
|
||||||
|
} else if s.nonFirstSend {
|
||||||
|
return errors.New("invalid non-first patch: empty payload")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Streamer) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
||||||
|
patcherResp, err := s.patcher.Close(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
oidV2 := new(refsV2.ObjectID)
|
||||||
|
|
||||||
|
if patcherResp.AccessIdentifiers.ParentID != nil {
|
||||||
|
patcherResp.AccessIdentifiers.ParentID.WriteToV2(oidV2)
|
||||||
|
} else {
|
||||||
|
patcherResp.AccessIdentifiers.SelfID.WriteToV2(oidV2)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &objectV2.PatchResponse{
|
||||||
|
Body: &objectV2.PatchResponseBody{
|
||||||
|
ObjectID: oidV2,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
53
pkg/services/object/patch/util.go
Normal file
53
pkg/services/object/patch/util.go
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
package patchsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"crypto/elliptic"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
)
|
||||||
|
|
||||||
|
// putInitPrm initializes put paramerer for Put stream.
|
||||||
|
func (s *Streamer) putInitPrm(req *objectV2.PatchRequest, obj *objectV2.Object) (*putsvc.PutInitPrm, error) {
|
||||||
|
commonPrm, err := util.CommonPrmFromV2(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := new(putsvc.PutInitPrm)
|
||||||
|
prm.WithObject(objectSDK.NewFromV2(obj)).
|
||||||
|
WithCommonPrm(commonPrm).
|
||||||
|
WithPrivateKey(s.localNodeKey)
|
||||||
|
|
||||||
|
return prm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newOwnerID(vh *session.RequestVerificationHeader) (*refs.OwnerID, error) {
|
||||||
|
for vh.GetOrigin() != nil {
|
||||||
|
vh = vh.GetOrigin()
|
||||||
|
}
|
||||||
|
sig := vh.GetBodySignature()
|
||||||
|
if sig == nil {
|
||||||
|
return nil, errors.New("empty body signature")
|
||||||
|
}
|
||||||
|
key, err := keys.NewPublicKeyFromBytes(sig.GetKey(), elliptic.P256())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid signature key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var userID user.ID
|
||||||
|
user.IDFromKey(&userID, (ecdsa.PublicKey)(*key))
|
||||||
|
ownID := new(refs.OwnerID)
|
||||||
|
userID.WriteToV2(ownID)
|
||||||
|
|
||||||
|
return ownID, nil
|
||||||
|
}
|
|
@ -2,6 +2,7 @@ package putsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
@ -20,6 +21,8 @@ type PutInitPrm struct {
|
||||||
traverseOpts []placement.Option
|
traverseOpts []placement.Option
|
||||||
|
|
||||||
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
|
||||||
|
|
||||||
|
privateKey *ecdsa.PrivateKey
|
||||||
}
|
}
|
||||||
|
|
||||||
type PutChunkPrm struct {
|
type PutChunkPrm struct {
|
||||||
|
@ -65,3 +68,11 @@ func (p *PutChunkPrm) WithChunk(v []byte) *PutChunkPrm {
|
||||||
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PutInitPrm) WithPrivateKey(v *ecdsa.PrivateKey) *PutInitPrm {
|
||||||
|
if p != nil {
|
||||||
|
p.privateKey = v
|
||||||
|
}
|
||||||
|
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
|
@ -47,6 +47,11 @@ func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Target accesses underlying target chunked object writer.
|
||||||
|
func (p *Streamer) Target() transformer.ChunkedObjectWriter {
|
||||||
|
return p.target
|
||||||
|
}
|
||||||
|
|
||||||
// MaxObjectSize returns maximum payload size for the streaming session.
|
// MaxObjectSize returns maximum payload size for the streaming session.
|
||||||
//
|
//
|
||||||
// Must be called after the successful Init.
|
// Must be called after the successful Init.
|
||||||
|
@ -79,11 +84,15 @@ func (p *Streamer) initTarget(prm *PutInitPrm) error {
|
||||||
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
func (p *Streamer) initUntrustedTarget(prm *PutInitPrm) error {
|
||||||
p.relay = prm.relay
|
p.relay = prm.relay
|
||||||
|
|
||||||
|
if prm.privateKey != nil {
|
||||||
|
p.privateKey = prm.privateKey
|
||||||
|
} else {
|
||||||
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
|
nodeKey, err := p.cfg.keyStorage.GetKey(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
p.privateKey = nodeKey
|
p.privateKey = nodeKey
|
||||||
|
}
|
||||||
|
|
||||||
// prepare untrusted-Put object target
|
// prepare untrusted-Put object target
|
||||||
p.target = &validatingPreparedTarget{
|
p.target = &validatingPreparedTarget{
|
||||||
|
@ -136,7 +145,11 @@ func (p *Streamer) initTrustedTarget(prm *PutInitPrm) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if prm.privateKey != nil {
|
||||||
|
p.privateKey = prm.privateKey
|
||||||
|
} else {
|
||||||
p.privateKey = key
|
p.privateKey = key
|
||||||
|
}
|
||||||
p.target = &validatingTarget{
|
p.target = &validatingTarget{
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
nextTarget: transformer.NewPayloadSizeLimiter(transformer.Params{
|
||||||
|
|
|
@ -37,6 +37,11 @@ type putStreamResponser struct {
|
||||||
respSvc *response.Service
|
respSvc *response.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type patchStreamResponser struct {
|
||||||
|
stream PatchObjectStream
|
||||||
|
respSvc *response.Service
|
||||||
|
}
|
||||||
|
|
||||||
// NewResponseService returns object service instance that passes internal service
|
// NewResponseService returns object service instance that passes internal service
|
||||||
// call to response service.
|
// call to response service.
|
||||||
func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *ResponseService {
|
func NewResponseService(objSvc ServiceServer, respSvc *response.Service) *ResponseService {
|
||||||
|
@ -87,6 +92,35 @@ func (s *ResponseService) Put() (PutObjectStream, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *patchStreamResponser) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||||
|
if err := s.stream.Send(ctx, req); err != nil {
|
||||||
|
return fmt.Errorf("could not send the request: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *patchStreamResponser) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
|
||||||
|
r, err := s.stream.CloseAndRecv(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.respSvc.SetMeta(r)
|
||||||
|
return r, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ResponseService) Patch() (PatchObjectStream, error) {
|
||||||
|
stream, err := s.svc.Patch()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &patchStreamResponser{
|
||||||
|
stream: stream,
|
||||||
|
respSvc: s.respSvc,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *ResponseService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
func (s *ResponseService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
resp, err := s.svc.PutSingle(ctx, req)
|
resp, err := s.svc.PutSingle(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -31,11 +31,18 @@ type PutObjectStream interface {
|
||||||
CloseAndRecv(context.Context) (*object.PutResponse, error)
|
CloseAndRecv(context.Context) (*object.PutResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PatchObjectStream is an interface of FrostFS API v2 compatible patch streamer.
|
||||||
|
type PatchObjectStream interface {
|
||||||
|
Send(context.Context, *object.PatchRequest) error
|
||||||
|
CloseAndRecv(context.Context) (*object.PatchResponse, error)
|
||||||
|
}
|
||||||
|
|
||||||
// ServiceServer is an interface of utility
|
// ServiceServer is an interface of utility
|
||||||
// serving v2 Object service.
|
// serving v2 Object service.
|
||||||
type ServiceServer interface {
|
type ServiceServer interface {
|
||||||
Get(*object.GetRequest, GetObjectStream) error
|
Get(*object.GetRequest, GetObjectStream) error
|
||||||
Put() (PutObjectStream, error)
|
Put() (PutObjectStream, error)
|
||||||
|
Patch() (PatchObjectStream, error)
|
||||||
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
|
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
|
||||||
Search(*object.SearchRequest, SearchStream) error
|
Search(*object.SearchRequest, SearchStream) error
|
||||||
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
|
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
|
||||||
|
|
|
@ -35,6 +35,12 @@ type putStreamSigner struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type patchStreamSigner struct {
|
||||||
|
sigSvc *util.SignService
|
||||||
|
stream PatchObjectStream
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
type getRangeStreamSigner struct {
|
type getRangeStreamSigner struct {
|
||||||
GetObjectRangeStream
|
GetObjectRangeStream
|
||||||
sigSvc *util.SignService
|
sigSvc *util.SignService
|
||||||
|
@ -112,6 +118,42 @@ func (s *SignService) Put() (PutObjectStream, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *patchStreamSigner) Send(ctx context.Context, req *object.PatchRequest) error {
|
||||||
|
if s.err = s.sigSvc.VerifyRequest(req); s.err != nil {
|
||||||
|
return util.ErrAbortStream
|
||||||
|
}
|
||||||
|
if s.err = s.stream.Send(ctx, req); s.err != nil {
|
||||||
|
return util.ErrAbortStream
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *patchStreamSigner) CloseAndRecv(ctx context.Context) (resp *object.PatchResponse, err error) {
|
||||||
|
if s.err != nil {
|
||||||
|
err = s.err
|
||||||
|
resp = new(object.PatchResponse)
|
||||||
|
} else {
|
||||||
|
resp, err = s.stream.CloseAndRecv(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not close stream and receive response: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, s.sigSvc.SignResponse(resp, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SignService) Patch() (PatchObjectStream, error) {
|
||||||
|
stream, err := s.svc.Patch()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not create Put object streamer: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &patchStreamSigner{
|
||||||
|
stream: stream,
|
||||||
|
sigSvc: s.sigSvc,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||||
resp := new(object.HeadResponse)
|
resp := new(object.HeadResponse)
|
||||||
|
|
|
@ -91,6 +91,10 @@ func (c TransportSplitter) Put() (PutObjectStream, error) {
|
||||||
return c.next.Put()
|
return c.next.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c TransportSplitter) Patch() (PatchObjectStream, error) {
|
||||||
|
return c.next.Patch()
|
||||||
|
}
|
||||||
|
|
||||||
func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
return c.next.Head(ctx, request)
|
return c.next.Head(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue