Compare commits

...

1 commit

Author SHA1 Message Date
09c66fc93c
WIP cli: Allow to upload object parts in parallel
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-29 15:46:46 +03:00
2 changed files with 139 additions and 0 deletions

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"cmp" "cmp"
"context" "context"
"crypto/ecdsa"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -13,6 +14,7 @@ import (
"strings" "strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
buffPool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
@ -20,7 +22,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"golang.org/x/sync/errgroup"
) )
var errMissingHeaderInResponse = errors.New("missing header in response") var errMissingHeaderInResponse = errors.New("missing header in response")
@ -449,6 +453,119 @@ func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
}, nil }, nil
} }
func PutSingle(ctx context.Context, pk *ecdsa.PrivateKey, threadCount int, maxSize uint64, prm PutObjectPrm) (*PutObjectRes, error) {
res, err := prm.cli.NetworkInfo(ctx, client.PrmNetworkInfo{})
if err != nil {
return nil, err
}
if maxSize == 0 {
maxSize = res.Info().MaxObjectSize()
}
if threadCount <= 0 {
threadCount = 1
}
it := &internalTarget{client: prm.cli, prm: client.PrmObjectPutSingle{
XHeaders: prm.xHeaders,
BearerToken: prm.bearerToken,
Local: prm.local,
CopiesNumber: prm.copyNum,
Key: pk,
}}
it.eg.SetLimit(threadCount)
defer it.eg.Wait()
sz := prm.hdr.PayloadSize()
pool := buffPool.NewBufferPool(uint32(res.Info().MaxObjectSize()))
wrt := transformer.NewPayloadSizeLimiter(transformer.Params{
Key: pk,
NextTargetInit: func() transformer.ObjectWriter {
return it
},
NetworkState: epochSource(res.Info().CurrentEpoch()),
MaxSize: maxSize,
WithoutHomomorphicHash: res.Info().HomomorphicHashingDisabled(),
SizeHint: sz,
Pool: &pool,
})
if err := wrt.WriteHeader(ctx, prm.hdr); err != nil {
return nil, err
}
if prm.headerCallback != nil {
prm.headerCallback()
}
if data := prm.hdr.Payload(); len(data) > 0 {
if prm.rdr != nil {
prm.rdr = io.MultiReader(bytes.NewReader(data), prm.rdr)
} else {
prm.rdr = bytes.NewReader(data)
sz = uint64(len(data))
}
}
if prm.rdr != nil {
const defaultBufferSizePut = 4 << 20 // Maximum chunk size is 3 MiB in the SDK.
if sz == 0 || sz > defaultBufferSizePut {
sz = defaultBufferSizePut
}
buf := make([]byte, sz)
var n int
for {
n, err = prm.rdr.Read(buf)
if n > 0 {
if _, err := wrt.Write(ctx, buf[:n]); err != nil {
return nil, err
}
continue
}
if errors.Is(err, io.EOF) {
break
}
return nil, fmt.Errorf("read payload: %w", err)
}
}
if err := it.eg.Wait(); err != nil {
return nil, err
}
cliRes, err := wrt.Close(ctx)
if err != nil { // here err already carries both status and client errors
return nil, fmt.Errorf("client failure: %w", err)
}
return &PutObjectRes{id: cliRes.SelfID}, nil
}
type internalTarget struct {
eg errgroup.Group
client *client.Client
pool *buffPool.BufferPool
prm client.PrmObjectPutSingle
}
func (it *internalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
it.eg.Go(func() error {
defer it.pool.Put(&buffPool.Buffer{Data: obj.Payload()})
prm := it.prm
prm.Object = obj
_, err := it.client.ObjectPutSingle(ctx, prm)
return err
})
return nil
}
// DeleteObjectPrm groups parameters of DeleteObject operation. // DeleteObjectPrm groups parameters of DeleteObject operation.
type DeleteObjectPrm struct { type DeleteObjectPrm struct {
commonObjectPrm commonObjectPrm

View file

@ -27,6 +27,8 @@ const (
notificationFlag = "notify" notificationFlag = "notify"
copiesNumberFlag = "copies-number" copiesNumberFlag = "copies-number"
prepareLocallyFlag = "prepare-locally" prepareLocallyFlag = "prepare-locally"
threadCountFlag = "thread-count"
maxSizeFlag = "max-size-mb"
) )
var putExpiredOn uint64 var putExpiredOn uint64
@ -61,6 +63,8 @@ func initObjectPutCmd() {
flags.Bool(binaryFlag, false, "Deserialize object structure from given file.") flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
flags.String(copiesNumberFlag, "", "Number of copies of the object to store within the RPC call") flags.String(copiesNumberFlag, "", "Number of copies of the object to store within the RPC call")
flags.Int(threadCountFlag, 1, "Max objects to put in parallel")
flags.Uint64(maxSizeFlag, 0, "Max object size in mebibytes")
} }
func putObject(cmd *cobra.Command, _ []string) { func putObject(cmd *cobra.Command, _ []string) {
@ -107,6 +111,24 @@ func putObject(cmd *cobra.Command, _ []string) {
if prepareLocally, _ := cmd.Flags().GetBool(prepareLocallyFlag); prepareLocally { if prepareLocally, _ := cmd.Flags().GetBool(prepareLocallyFlag); prepareLocally {
prm.SetClient(internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)) prm.SetClient(internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC))
prm.PrepareLocally() prm.PrepareLocally()
Prepare(cmd, &prm)
prm.SetHeader(obj)
prm.SetPayloadReader(payloadReader)
copyNum, err := cmd.Flags().GetString(copiesNumberFlag)
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
prm.SetCopiesNumberByVectors(parseCopyNumber(cmd, copyNum))
threadCount, _ := cmd.Flags().GetInt(threadCountFlag)
maxSize, _ := cmd.Flags().GetUint64(maxSizeFlag)
maxSize <<= 20 // In mebibytes.
res, err := internalclient.PutSingle(cmd.Context(), pk, threadCount, maxSize, prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
cmd.Printf("[%s] Object successfully stored\n", filename)
cmd.Printf(" OID: %s\n CID: %s\n", res.ID(), cnr)
} else { } else {
ReadOrOpenSession(cmd, &prm, pk, cnr, nil) ReadOrOpenSession(cmd, &prm, pk, cnr, nil)
} }