cli: Allow to split object on the client side #471
2 changed files with 70 additions and 30 deletions
|
@ -336,6 +336,8 @@ type PutObjectPrm struct {
|
||||||
rdr io.Reader
|
rdr io.Reader
|
||||||
|
|
||||||
headerCallback func(*object.Object)
|
headerCallback func(*object.Object)
|
||||||
|
|
||||||
|
prepareLocally bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetHeader sets object header.
|
// SetHeader sets object header.
|
||||||
|
@ -360,6 +362,41 @@ func (x *PutObjectPrm) SetCopiesNumberByVectors(copiesNumbers []uint32) {
|
||||||
x.copyNum = copiesNumbers
|
x.copyNum = copiesNumbers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PrepareLocally generate object header on the client side.
|
||||||
|
// For big object - split locally too.
|
||||||
|
func (x *PutObjectPrm) PrepareLocally() {
|
||||||
|
x.prepareLocally = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *PutObjectPrm) convertToSDKPrm(ctx context.Context) (client.PrmObjectPutInit, error) {
|
||||||
|
var putPrm client.PrmObjectPutInit
|
||||||
|
if !x.prepareLocally && x.sessionToken != nil {
|
||||||
|
putPrm.WithinSession(*x.sessionToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.bearerToken != nil {
|
||||||
|
putPrm.WithBearerToken(*x.bearerToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
if x.local {
|
||||||
|
putPrm.MarkLocal()
|
||||||
|
}
|
||||||
|
|
||||||
|
putPrm.WithXHeaders(x.xHeaders...)
|
||||||
|
putPrm.SetCopiesNumberByVectors(x.copyNum)
|
||||||
|
|
||||||
|
if x.prepareLocally {
|
||||||
|
res, err := x.cli.NetworkInfo(ctx, client.PrmNetworkInfo{})
|
||||||
|
if err != nil {
|
||||||
|
return client.PrmObjectPutInit{}, err
|
||||||
|
}
|
||||||
|
putPrm.WithObjectMaxSize(res.Info().MaxObjectSize())
|
||||||
|
putPrm.WithEpochSource(epochSource(res.Info().CurrentEpoch()))
|
||||||
|
putPrm.WithoutHomomorphicHash(res.Info().HomomorphicHashingDisabled())
|
||||||
|
}
|
||||||
|
return putPrm, nil
|
||||||
|
}
|
||||||
|
|
||||||
// PutObjectRes groups the resulting values of PutObject operation.
|
// PutObjectRes groups the resulting values of PutObject operation.
|
||||||
type PutObjectRes struct {
|
type PutObjectRes struct {
|
||||||
id oid.ID
|
id oid.ID
|
||||||
|
@ -370,28 +407,21 @@ func (x PutObjectRes) ID() oid.ID {
|
||||||
return x.id
|
return x.id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type epochSource uint64
|
||||||
|
|
||||||
|
func (s epochSource) CurrentEpoch() uint64 {
|
||||||
|
return uint64(s)
|
||||||
|
}
|
||||||
|
|
||||||
// PutObject saves the object in FrostFS network.
|
// PutObject saves the object in FrostFS network.
|
||||||
//
|
//
|
||||||
// Returns any error which prevented the operation from completing correctly in error return.
|
// Returns any error which prevented the operation from completing correctly in error return.
|
||||||
func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
|
func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
|
||||||
var putPrm client.PrmObjectPutInit
|
sdkPrm, err := prm.convertToSDKPrm(ctx)
|
||||||
|
if err != nil {
|
||||||
if prm.sessionToken != nil {
|
return nil, fmt.Errorf("unable to create parameters of object put operation: %w", err)
|
||||||
putPrm.WithinSession(*prm.sessionToken)
|
|
||||||
}
|
}
|
||||||
|
wrt, err := prm.cli.ObjectPutInit(ctx, sdkPrm)
|
||||||
if prm.bearerToken != nil {
|
|
||||||
putPrm.WithBearerToken(*prm.bearerToken)
|
|
||||||
}
|
|
||||||
|
|
||||||
if prm.local {
|
|
||||||
putPrm.MarkLocal()
|
|
||||||
}
|
|
||||||
|
|
||||||
putPrm.WithXHeaders(prm.xHeaders...)
|
|
||||||
putPrm.SetCopiesNumberByVectors(prm.copyNum)
|
|
||||||
|
|
||||||
wrt, err := prm.cli.ObjectPutInit(ctx, putPrm)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("init object writing: %w", err)
|
return nil, fmt.Errorf("init object writing: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,9 +23,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
noProgressFlag = "no-progress"
|
noProgressFlag = "no-progress"
|
||||||
notificationFlag = "notify"
|
notificationFlag = "notify"
|
||||||
copiesNumberFlag = "copies-number"
|
copiesNumberFlag = "copies-number"
|
||||||
|
prepareLocallyFlag = "prepare-locally"
|
||||||
)
|
)
|
||||||
|
|
||||||
var putExpiredOn uint64
|
var putExpiredOn uint64
|
||||||
|
@ -54,6 +55,7 @@ func initObjectPutCmd() {
|
||||||
flags.Bool("disable-timestamp", false, "Do not set well-known timestamp attribute")
|
flags.Bool("disable-timestamp", false, "Do not set well-known timestamp attribute")
|
||||||
flags.Uint64VarP(&putExpiredOn, commonflags.ExpireAt, "e", 0, "The last active epoch in the life of the object")
|
flags.Uint64VarP(&putExpiredOn, commonflags.ExpireAt, "e", 0, "The last active epoch in the life of the object")
|
||||||
flags.Bool(noProgressFlag, false, "Do not show progress bar")
|
flags.Bool(noProgressFlag, false, "Do not show progress bar")
|
||||||
|
flags.Bool(prepareLocallyFlag, false, "Generate object header on the client side (for big object - split locally too)")
|
||||||
|
|
||||||
flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default")
|
flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default")
|
||||||
flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
|
flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
|
||||||
|
@ -102,7 +104,11 @@ func putObject(cmd *cobra.Command, _ []string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm internalclient.PutObjectPrm
|
var prm internalclient.PutObjectPrm
|
||||||
ReadOrOpenSession(cmd, &prm, pk, cnr, nil)
|
if prepareLocally, _ := cmd.Flags().GetBool(prepareLocallyFlag); prepareLocally {
|
||||||
|
prm.PrepareLocally()
|
||||||
|
} else {
|
||||||
|
ReadOrOpenSession(cmd, &prm, pk, cnr, nil)
|
||||||
|
}
|
||||||
Prepare(cmd, &prm)
|
Prepare(cmd, &prm)
|
||||||
prm.SetHeader(obj)
|
prm.SetHeader(obj)
|
||||||
|
|
||||||
|
@ -121,15 +127,7 @@ func putObject(cmd *cobra.Command, _ []string) {
|
||||||
|
|
||||||
copyNum, err := cmd.Flags().GetString(copiesNumberFlag)
|
copyNum, err := cmd.Flags().GetString(copiesNumberFlag)
|
||||||
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
|
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
|
||||||
if len(copyNum) > 0 {
|
prm.SetCopiesNumberByVectors(parseCopyNumber(cmd, copyNum))
|
||||||
var cn []uint32
|
|
||||||
for _, num := range strings.Split(copyNum, ",") {
|
|
||||||
val, err := strconv.ParseUint(num, 10, 32)
|
|
||||||
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
|
|
||||||
cn = append(cn, uint32(val))
|
|
||||||
}
|
|
||||||
prm.SetCopiesNumberByVectors(cn)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := internalclient.PutObject(cmd.Context(), prm)
|
res, err := internalclient.PutObject(cmd.Context(), prm)
|
||||||
if p != nil {
|
if p != nil {
|
||||||
|
@ -141,6 +139,18 @@ func putObject(cmd *cobra.Command, _ []string) {
|
||||||
cmd.Printf(" OID: %s\n CID: %s\n", res.ID(), cnr)
|
cmd.Printf(" OID: %s\n CID: %s\n", res.ID(), cnr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseCopyNumber(cmd *cobra.Command, copyNum string) []uint32 {
|
||||||
|
var cn []uint32
|
||||||
|
if len(copyNum) > 0 {
|
||||||
|
for _, num := range strings.Split(copyNum, ",") {
|
||||||
|
val, err := strconv.ParseUint(num, 10, 32)
|
||||||
|
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
|
||||||
|
cn = append(cn, uint32(val))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cn
|
||||||
|
}
|
||||||
|
|
||||||
func readFilePayload(filename string, cmd *cobra.Command) (io.Reader, cid.ID, user.ID) {
|
func readFilePayload(filename string, cmd *cobra.Command) (io.Reader, cid.ID, user.ID) {
|
||||||
buf, err := os.ReadFile(filename)
|
buf, err := os.ReadFile(filename)
|
||||||
commonCmd.ExitOnErr(cmd, "unable to read given file: %w", err)
|
commonCmd.ExitOnErr(cmd, "unable to read given file: %w", err)
|
||||||
|
|
Loading…
Reference in a new issue