From 4c248d573eb7951abca58e6ef89556f6d9f0dc1a Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Thu, 29 Jun 2023 09:48:32 +0300 Subject: [PATCH] [#483] cli: Allow to split object on the client side Signed-off-by: Anton Nikiforov --- cmd/frostfs-cli/internal/client/client.go | 64 +++++++++++++++++------ cmd/frostfs-cli/modules/object/put.go | 36 ++++++++----- 2 files changed, 70 insertions(+), 30 deletions(-) diff --git a/cmd/frostfs-cli/internal/client/client.go b/cmd/frostfs-cli/internal/client/client.go index d0bbc9cf..9545c60c 100644 --- a/cmd/frostfs-cli/internal/client/client.go +++ b/cmd/frostfs-cli/internal/client/client.go @@ -336,6 +336,8 @@ type PutObjectPrm struct { rdr io.Reader headerCallback func(*object.Object) + + prepareLocally bool } // SetHeader sets object header. @@ -360,6 +362,41 @@ func (x *PutObjectPrm) SetCopiesNumberByVectors(copiesNumbers []uint32) { x.copyNum = copiesNumbers } +// PrepareLocally generate object header on the client side. +// For big object - split locally too. +func (x *PutObjectPrm) PrepareLocally() { + x.prepareLocally = true +} + +func (x *PutObjectPrm) convertToSDKPrm(ctx context.Context) (client.PrmObjectPutInit, error) { + var putPrm client.PrmObjectPutInit + if !x.prepareLocally && x.sessionToken != nil { + putPrm.WithinSession(*x.sessionToken) + } + + if x.bearerToken != nil { + putPrm.WithBearerToken(*x.bearerToken) + } + + if x.local { + putPrm.MarkLocal() + } + + putPrm.WithXHeaders(x.xHeaders...) + putPrm.SetCopiesNumberByVectors(x.copyNum) + + if x.prepareLocally { + res, err := x.cli.NetworkInfo(ctx, client.PrmNetworkInfo{}) + if err != nil { + return client.PrmObjectPutInit{}, err + } + putPrm.WithObjectMaxSize(res.Info().MaxObjectSize()) + putPrm.WithEpochSource(epochSource(res.Info().CurrentEpoch())) + putPrm.WithoutHomomorphicHash(res.Info().HomomorphicHashingDisabled()) + } + return putPrm, nil +} + // PutObjectRes groups the resulting values of PutObject operation. type PutObjectRes struct { id oid.ID @@ -370,28 +407,21 @@ func (x PutObjectRes) ID() oid.ID { return x.id } +type epochSource uint64 + +func (s epochSource) CurrentEpoch() uint64 { + return uint64(s) +} + // PutObject saves the object in FrostFS network. // // Returns any error which prevented the operation from completing correctly in error return. func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) { - var putPrm client.PrmObjectPutInit - - if prm.sessionToken != nil { - putPrm.WithinSession(*prm.sessionToken) + sdkPrm, err := prm.convertToSDKPrm(ctx) + if err != nil { + return nil, fmt.Errorf("unable to create parameters of object put operation: %w", err) } - - if prm.bearerToken != nil { - putPrm.WithBearerToken(*prm.bearerToken) - } - - if prm.local { - putPrm.MarkLocal() - } - - putPrm.WithXHeaders(prm.xHeaders...) - putPrm.SetCopiesNumberByVectors(prm.copyNum) - - wrt, err := prm.cli.ObjectPutInit(ctx, putPrm) + wrt, err := prm.cli.ObjectPutInit(ctx, sdkPrm) if err != nil { return nil, fmt.Errorf("init object writing: %w", err) } diff --git a/cmd/frostfs-cli/modules/object/put.go b/cmd/frostfs-cli/modules/object/put.go index af9e9fd0..19edbb97 100644 --- a/cmd/frostfs-cli/modules/object/put.go +++ b/cmd/frostfs-cli/modules/object/put.go @@ -23,9 +23,10 @@ import ( ) const ( - noProgressFlag = "no-progress" - notificationFlag = "notify" - copiesNumberFlag = "copies-number" + noProgressFlag = "no-progress" + notificationFlag = "notify" + copiesNumberFlag = "copies-number" + prepareLocallyFlag = "prepare-locally" ) var putExpiredOn uint64 @@ -54,6 +55,7 @@ func initObjectPutCmd() { flags.Bool("disable-timestamp", false, "Do not set well-known timestamp attribute") flags.Uint64VarP(&putExpiredOn, commonflags.ExpireAt, "e", 0, "The last active epoch in the life of the object") flags.Bool(noProgressFlag, false, "Do not show progress bar") + flags.Bool(prepareLocallyFlag, false, "Generate object header on the client side (for big object - split locally too)") flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default") flags.Bool(binaryFlag, false, "Deserialize object structure from given file.") @@ -102,7 +104,11 @@ func putObject(cmd *cobra.Command, _ []string) { } var prm internalclient.PutObjectPrm - ReadOrOpenSession(cmd, &prm, pk, cnr, nil) + if prepareLocally, _ := cmd.Flags().GetBool(prepareLocallyFlag); prepareLocally { + prm.PrepareLocally() + } else { + ReadOrOpenSession(cmd, &prm, pk, cnr, nil) + } Prepare(cmd, &prm) prm.SetHeader(obj) @@ -121,15 +127,7 @@ func putObject(cmd *cobra.Command, _ []string) { copyNum, err := cmd.Flags().GetString(copiesNumberFlag) commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err) - if len(copyNum) > 0 { - var cn []uint32 - for _, num := range strings.Split(copyNum, ",") { - val, err := strconv.ParseUint(num, 10, 32) - commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err) - cn = append(cn, uint32(val)) - } - prm.SetCopiesNumberByVectors(cn) - } + prm.SetCopiesNumberByVectors(parseCopyNumber(cmd, copyNum)) res, err := internalclient.PutObject(cmd.Context(), prm) if p != nil { @@ -141,6 +139,18 @@ func putObject(cmd *cobra.Command, _ []string) { cmd.Printf(" OID: %s\n CID: %s\n", res.ID(), cnr) } +func parseCopyNumber(cmd *cobra.Command, copyNum string) []uint32 { + var cn []uint32 + if len(copyNum) > 0 { + for _, num := range strings.Split(copyNum, ",") { + val, err := strconv.ParseUint(num, 10, 32) + commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err) + cn = append(cn, uint32(val)) + } + } + return cn +} + func readFilePayload(filename string, cmd *cobra.Command) (io.Reader, cid.ID, user.ID) { buf, err := os.ReadFile(filename) commonCmd.ExitOnErr(cmd, "unable to read given file: %w", err)