WIP cli: Allow uploading object parts in parallel

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2024-11-29 15:46:46 +03:00
parent 01acec708f
commit 09c66fc93c
Signed by: fyrchik
SSH key fingerprint: SHA256:m/TTwCzjnRkXgnzEx9X92ccxy1CcVeinOgDb3NPWWmg
2 changed files with 139 additions and 0 deletions

View file

@ -4,6 +4,7 @@ import (
"bytes"
"cmp"
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
@ -13,6 +14,7 @@ import (
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting"
buffPool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
@ -20,7 +22,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"golang.org/x/sync/errgroup"
)
var errMissingHeaderInResponse = errors.New("missing header in response")
@ -449,6 +453,119 @@ func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
}, nil
}
// PutSingle stores an object in the network, splitting its payload into
// parts of at most maxSize bytes and uploading up to threadCount parts
// concurrently via ObjectPutSingle.
//
// maxSize == 0 means "use the network's MaxObjectSize"; threadCount <= 0 is
// treated as 1. On success it returns the ID of the stored (parent) object.
func PutSingle(ctx context.Context, pk *ecdsa.PrivateKey, threadCount int, maxSize uint64, prm PutObjectPrm) (*PutObjectRes, error) {
	res, err := prm.cli.NetworkInfo(ctx, client.PrmNetworkInfo{})
	if err != nil {
		return nil, err
	}

	if maxSize == 0 {
		maxSize = res.Info().MaxObjectSize()
	}
	if threadCount <= 0 {
		threadCount = 1
	}

	pool := buffPool.NewBufferPool(uint32(res.Info().MaxObjectSize()))

	it := &internalTarget{
		client: prm.cli,
		// FIX: pool was previously left nil, so WriteObject panicked on
		// it.pool.Put for every uploaded part.
		pool: &pool,
		prm: client.PrmObjectPutSingle{
			XHeaders:     prm.xHeaders,
			BearerToken:  prm.bearerToken,
			Local:        prm.local,
			CopiesNumber: prm.copyNum,
			Key:          pk,
		},
	}
	it.eg.SetLimit(threadCount)
	// Ensure no upload goroutine outlives this call on early error returns;
	// on the happy path the explicit Wait below reports the error.
	defer it.eg.Wait() //nolint:errcheck

	sz := prm.hdr.PayloadSize()

	wrt := transformer.NewPayloadSizeLimiter(transformer.Params{
		Key: pk,
		NextTargetInit: func() transformer.ObjectWriter {
			return it
		},
		NetworkState:           epochSource(res.Info().CurrentEpoch()),
		MaxSize:                maxSize,
		WithoutHomomorphicHash: res.Info().HomomorphicHashingDisabled(),
		SizeHint:               sz,
		Pool:                   &pool,
	})
	if err := wrt.WriteHeader(ctx, prm.hdr); err != nil {
		return nil, err
	}
	if prm.headerCallback != nil {
		prm.headerCallback()
	}

	// An in-header payload, if any, is streamed before the reader's data.
	if data := prm.hdr.Payload(); len(data) > 0 {
		if prm.rdr != nil {
			prm.rdr = io.MultiReader(bytes.NewReader(data), prm.rdr)
		} else {
			prm.rdr = bytes.NewReader(data)
			sz = uint64(len(data))
		}
	}

	if prm.rdr != nil {
		const defaultBufferSizePut = 4 << 20 // Maximum chunk size is 3 MiB in the SDK.

		if sz == 0 || sz > defaultBufferSizePut {
			sz = defaultBufferSizePut
		}

		buf := make([]byte, sz)

		var n int

		for {
			n, err = prm.rdr.Read(buf)
			if n > 0 {
				if _, err := wrt.Write(ctx, buf[:n]); err != nil {
					return nil, err
				}
				continue
			}
			if errors.Is(err, io.EOF) {
				break
			}
			return nil, fmt.Errorf("read payload: %w", err)
		}
	}

	// Close flushes the remaining parts (including the linking object),
	// which spawns more goroutines on it.eg — so Wait MUST happen after
	// Close. The previous order (Wait before Close) silently dropped
	// errors from those final puts into the deferred Wait.
	cliRes, err := wrt.Close(ctx)
	if err != nil { // here err already carries both status and client errors
		return nil, fmt.Errorf("client failure: %w", err)
	}
	if err := it.eg.Wait(); err != nil {
		return nil, err
	}

	return &PutObjectRes{id: cliRes.SelfID}, nil
}
// internalTarget uploads objects produced by the payload size limiter,
// running up to eg's configured limit of ObjectPutSingle calls in parallel.
// Upload errors are collected and surfaced via eg.Wait by the caller.
type internalTarget struct {
	eg     errgroup.Group
	client *client.Client
	pool   *buffPool.BufferPool
	prm    client.PrmObjectPutSingle
}

// WriteObject schedules obj for upload on a separate goroutine and returns
// immediately; it never reports the upload result itself — that is delivered
// through (*errgroup.Group).Wait. When a buffer pool is configured, the
// object's payload buffer is returned to it once the upload attempt ends.
func (it *internalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error {
	it.eg.Go(func() error {
		// Guard against a missing pool: the field is optional and
		// dereferencing it unconditionally would panic.
		if it.pool != nil {
			defer it.pool.Put(&buffPool.Buffer{Data: obj.Payload()})
		}
		prm := it.prm
		prm.Object = obj
		_, err := it.client.ObjectPutSingle(ctx, prm)
		return err
	})
	return nil
}
// DeleteObjectPrm groups parameters of DeleteObject operation.
type DeleteObjectPrm struct {
commonObjectPrm

View file

@ -27,6 +27,8 @@ const (
notificationFlag = "notify"
copiesNumberFlag = "copies-number"
prepareLocallyFlag = "prepare-locally"
threadCountFlag = "thread-count"
maxSizeFlag = "max-size-mb"
)
var putExpiredOn uint64
@ -61,6 +63,8 @@ func initObjectPutCmd() {
flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
flags.String(copiesNumberFlag, "", "Number of copies of the object to store within the RPC call")
flags.Int(threadCountFlag, 1, "Max objects to put in parallel")
flags.Uint64(maxSizeFlag, 0, "Max object size in mebibytes")
}
func putObject(cmd *cobra.Command, _ []string) {
@ -107,6 +111,24 @@ func putObject(cmd *cobra.Command, _ []string) {
if prepareLocally, _ := cmd.Flags().GetBool(prepareLocallyFlag); prepareLocally {
prm.SetClient(internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC))
prm.PrepareLocally()
Prepare(cmd, &prm)
prm.SetHeader(obj)
prm.SetPayloadReader(payloadReader)
copyNum, err := cmd.Flags().GetString(copiesNumberFlag)
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
prm.SetCopiesNumberByVectors(parseCopyNumber(cmd, copyNum))
threadCount, _ := cmd.Flags().GetInt(threadCountFlag)
maxSize, _ := cmd.Flags().GetUint64(maxSizeFlag)
maxSize <<= 20 // In mebibytes.
res, err := internalclient.PutSingle(cmd.Context(), pk, threadCount, maxSize, prm)
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
cmd.Printf("[%s] Object successfully stored\n", filename)
cmd.Printf(" OID: %s\n CID: %s\n", res.ID(), cnr)
} else {
ReadOrOpenSession(cmd, &prm, pk, cnr, nil)
}