[#483] cli: Allow to split object on the client side
Build / Build Components (1.20) (pull_request) Successful in 3m50s Details
Build / Build Components (1.19) (pull_request) Successful in 4m2s Details
ci/woodpecker/pr/pre-commit Pipeline was successful Details
Tests and linters / Lint (pull_request) Successful in 8m50s Details
Tests and linters / Tests with -race (pull_request) Successful in 5m16s Details
Tests and linters / Tests (1.20) (pull_request) Successful in 6m10s Details
Tests and linters / Tests (1.19) (pull_request) Successful in 13m36s Details
ci/woodpecker/push/pre-commit Pipeline failed Details

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
pull/471/head
Anton Nikiforov 2023-06-29 09:48:32 +03:00
parent 90e9a85acc
commit 4c248d573e
2 changed files with 70 additions and 30 deletions

View File

@ -336,6 +336,8 @@ type PutObjectPrm struct {
rdr io.Reader
headerCallback func(*object.Object)
prepareLocally bool
}
// SetHeader sets object header.
@ -360,6 +362,41 @@ func (x *PutObjectPrm) SetCopiesNumberByVectors(copiesNumbers []uint32) {
x.copyNum = copiesNumbers
}
// PrepareLocally generate object header on the client side.
// For big object - split locally too.
func (x *PutObjectPrm) PrepareLocally() {
x.prepareLocally = true
}
func (x *PutObjectPrm) convertToSDKPrm(ctx context.Context) (client.PrmObjectPutInit, error) {
var putPrm client.PrmObjectPutInit
if !x.prepareLocally && x.sessionToken != nil {
putPrm.WithinSession(*x.sessionToken)
}
if x.bearerToken != nil {
putPrm.WithBearerToken(*x.bearerToken)
}
if x.local {
putPrm.MarkLocal()
}
putPrm.WithXHeaders(x.xHeaders...)
putPrm.SetCopiesNumberByVectors(x.copyNum)
if x.prepareLocally {
res, err := x.cli.NetworkInfo(ctx, client.PrmNetworkInfo{})
if err != nil {
return client.PrmObjectPutInit{}, err
}
putPrm.WithObjectMaxSize(res.Info().MaxObjectSize())
putPrm.WithEpochSource(epochSource(res.Info().CurrentEpoch()))
putPrm.WithoutHomomorphicHash(res.Info().HomomorphicHashingDisabled())
}
return putPrm, nil
}
// PutObjectRes groups the resulting values of PutObject operation.
type PutObjectRes struct {
id oid.ID
@ -370,28 +407,21 @@ func (x PutObjectRes) ID() oid.ID {
return x.id
}
type epochSource uint64
func (s epochSource) CurrentEpoch() uint64 {
return uint64(s)
}
// PutObject saves the object in FrostFS network.
//
// Returns any error which prevented the operation from completing correctly in error return.
func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
var putPrm client.PrmObjectPutInit
if prm.sessionToken != nil {
putPrm.WithinSession(*prm.sessionToken)
sdkPrm, err := prm.convertToSDKPrm(ctx)
if err != nil {
return nil, fmt.Errorf("unable to create parameters of object put operation: %w", err)
}
if prm.bearerToken != nil {
putPrm.WithBearerToken(*prm.bearerToken)
}
if prm.local {
putPrm.MarkLocal()
}
putPrm.WithXHeaders(prm.xHeaders...)
putPrm.SetCopiesNumberByVectors(prm.copyNum)
wrt, err := prm.cli.ObjectPutInit(ctx, putPrm)
wrt, err := prm.cli.ObjectPutInit(ctx, sdkPrm)
if err != nil {
return nil, fmt.Errorf("init object writing: %w", err)
}

View File

@ -23,9 +23,10 @@ import (
)
const (
noProgressFlag = "no-progress"
notificationFlag = "notify"
copiesNumberFlag = "copies-number"
noProgressFlag = "no-progress"
notificationFlag = "notify"
copiesNumberFlag = "copies-number"
prepareLocallyFlag = "prepare-locally"
)
var putExpiredOn uint64
@ -54,6 +55,7 @@ func initObjectPutCmd() {
flags.Bool("disable-timestamp", false, "Do not set well-known timestamp attribute")
flags.Uint64VarP(&putExpiredOn, commonflags.ExpireAt, "e", 0, "The last active epoch in the life of the object")
flags.Bool(noProgressFlag, false, "Do not show progress bar")
flags.Bool(prepareLocallyFlag, false, "Generate object header on the client side (for big object - split locally too)")
flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default")
flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
@ -102,7 +104,11 @@ func putObject(cmd *cobra.Command, _ []string) {
}
var prm internalclient.PutObjectPrm
ReadOrOpenSession(cmd, &prm, pk, cnr, nil)
if prepareLocally, _ := cmd.Flags().GetBool(prepareLocallyFlag); prepareLocally {
prm.PrepareLocally()
} else {
ReadOrOpenSession(cmd, &prm, pk, cnr, nil)
}
Prepare(cmd, &prm)
prm.SetHeader(obj)
@ -121,15 +127,7 @@ func putObject(cmd *cobra.Command, _ []string) {
copyNum, err := cmd.Flags().GetString(copiesNumberFlag)
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
if len(copyNum) > 0 {
var cn []uint32
for _, num := range strings.Split(copyNum, ",") {
val, err := strconv.ParseUint(num, 10, 32)
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
cn = append(cn, uint32(val))
}
prm.SetCopiesNumberByVectors(cn)
}
prm.SetCopiesNumberByVectors(parseCopyNumber(cmd, copyNum))
res, err := internalclient.PutObject(cmd.Context(), prm)
if p != nil {
@ -141,6 +139,18 @@ func putObject(cmd *cobra.Command, _ []string) {
cmd.Printf(" OID: %s\n CID: %s\n", res.ID(), cnr)
}
func parseCopyNumber(cmd *cobra.Command, copyNum string) []uint32 {
var cn []uint32
if len(copyNum) > 0 {
for _, num := range strings.Split(copyNum, ",") {
val, err := strconv.ParseUint(num, 10, 32)
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
cn = append(cn, uint32(val))
}
}
return cn
}
func readFilePayload(filename string, cmd *cobra.Command) (io.Reader, cid.ID, user.ID) {
buf, err := os.ReadFile(filename)
commonCmd.ExitOnErr(cmd, "unable to read given file: %w", err)