From 09c66fc93c1818003c313885d3cd9113cd6db8ee Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 29 Nov 2024 15:46:46 +0300 Subject: [PATCH] WIP cli: Allow to upload object parts in parallel Signed-off-by: Evgenii Stratonikov --- cmd/frostfs-cli/internal/client/client.go | 117 ++++++++++++++++++++++ cmd/frostfs-cli/modules/object/put.go | 22 ++++ 2 files changed, 139 insertions(+) diff --git a/cmd/frostfs-cli/internal/client/client.go b/cmd/frostfs-cli/internal/client/client.go index 948d61f36..9b0ddccb9 100644 --- a/cmd/frostfs-cli/internal/client/client.go +++ b/cmd/frostfs-cli/internal/client/client.go @@ -4,6 +4,7 @@ import ( "bytes" "cmp" "context" + "crypto/ecdsa" "errors" "fmt" "io" @@ -13,6 +14,7 @@ import ( "strings" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/accounting" + buffPool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" containerSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" @@ -20,7 +22,9 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" + "golang.org/x/sync/errgroup" ) var errMissingHeaderInResponse = errors.New("missing header in response") @@ -449,6 +453,119 @@ func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) { }, nil } +func PutSingle(ctx context.Context, pk *ecdsa.PrivateKey, threadCount int, maxSize uint64, prm PutObjectPrm) (*PutObjectRes, error) { + res, err := prm.cli.NetworkInfo(ctx, client.PrmNetworkInfo{}) + if err != nil { + return nil, err + } + + if maxSize == 0 { + maxSize = res.Info().MaxObjectSize() + } + if threadCount <= 0 { + threadCount = 1 + } + + it := &internalTarget{client: prm.cli, prm: client.PrmObjectPutSingle{ + XHeaders: prm.xHeaders, + BearerToken: prm.bearerToken, + Local: prm.local, + CopiesNumber: prm.copyNum, + Key: pk, + }} + it.eg.SetLimit(threadCount) + defer it.eg.Wait() + + sz := prm.hdr.PayloadSize() + pool := buffPool.NewBufferPool(uint32(res.Info().MaxObjectSize())) + wrt := transformer.NewPayloadSizeLimiter(transformer.Params{ + Key: pk, + NextTargetInit: func() transformer.ObjectWriter { + return it + }, + NetworkState: epochSource(res.Info().CurrentEpoch()), + MaxSize: maxSize, + WithoutHomomorphicHash: res.Info().HomomorphicHashingDisabled(), + SizeHint: sz, + Pool: &pool, + }) + + if err := wrt.WriteHeader(ctx, prm.hdr); err != nil { + return nil, err + } + if prm.headerCallback != nil { + prm.headerCallback() + } + + if data := prm.hdr.Payload(); len(data) > 0 { + if prm.rdr != nil { + prm.rdr = io.MultiReader(bytes.NewReader(data), prm.rdr) + } else { + prm.rdr = bytes.NewReader(data) + sz = uint64(len(data)) + } + } + + if prm.rdr != nil { + const defaultBufferSizePut = 4 << 20 // Maximum chunk size is 3 MiB in the SDK. + + if sz == 0 || sz > defaultBufferSizePut { + sz = defaultBufferSizePut + } + + buf := make([]byte, sz) + + var n int + + for { + n, err = prm.rdr.Read(buf) + if n > 0 { + if _, err := wrt.Write(ctx, buf[:n]); err != nil { + return nil, err + } + continue + } + + if errors.Is(err, io.EOF) { + break + } + + return nil, fmt.Errorf("read payload: %w", err) + } + } + + if err := it.eg.Wait(); err != nil { + return nil, err + } + + cliRes, err := wrt.Close(ctx) + if err != nil { // here err already carries both status and client errors + return nil, fmt.Errorf("client failure: %w", err) + } + + return &PutObjectRes{id: cliRes.SelfID}, nil +} + +type internalTarget struct { + eg errgroup.Group + client *client.Client + pool *buffPool.BufferPool + prm client.PrmObjectPutSingle +} + +func (it *internalTarget) WriteObject(ctx context.Context, obj *objectSDK.Object) error { + it.eg.Go(func() error { + defer it.pool.Put(&buffPool.Buffer{Data: obj.Payload()}) + + prm := it.prm + prm.Object = obj + + _, err := it.client.ObjectPutSingle(ctx, prm) + return err + }) + return nil +} + // DeleteObjectPrm groups parameters of DeleteObject operation. type DeleteObjectPrm struct { commonObjectPrm diff --git a/cmd/frostfs-cli/modules/object/put.go b/cmd/frostfs-cli/modules/object/put.go index affe9bbba..5a667aa2c 100644 --- a/cmd/frostfs-cli/modules/object/put.go +++ b/cmd/frostfs-cli/modules/object/put.go @@ -27,6 +27,8 @@ const ( notificationFlag = "notify" copiesNumberFlag = "copies-number" prepareLocallyFlag = "prepare-locally" + threadCountFlag = "thread-count" + maxSizeFlag = "max-size-mb" ) var putExpiredOn uint64 @@ -61,6 +63,8 @@ func initObjectPutCmd() { flags.Bool(binaryFlag, false, "Deserialize object structure from given file.") flags.String(copiesNumberFlag, "", "Number of copies of the object to store within the RPC call") + flags.Int(threadCountFlag, 1, "Max objects to put in parallel") + flags.Uint64(maxSizeFlag, 0, "Max object size in mebibytes") } func putObject(cmd *cobra.Command, _ []string) { @@ -107,6 +111,24 @@ func putObject(cmd *cobra.Command, _ []string) { if prepareLocally, _ := cmd.Flags().GetBool(prepareLocallyFlag); prepareLocally { prm.SetClient(internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)) prm.PrepareLocally() + Prepare(cmd, &prm) + prm.SetHeader(obj) + prm.SetPayloadReader(payloadReader) + + copyNum, err := cmd.Flags().GetString(copiesNumberFlag) + commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err) + prm.SetCopiesNumberByVectors(parseCopyNumber(cmd, copyNum)) + + threadCount, _ := cmd.Flags().GetInt(threadCountFlag) + maxSize, _ := cmd.Flags().GetUint64(maxSizeFlag) + maxSize <<= 20 // In mebibytes. + + res, err := internalclient.PutSingle(cmd.Context(), pk, threadCount, maxSize, prm) + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + + cmd.Printf("[%s] Object successfully stored\n", filename) + cmd.Printf(" OID: %s\n CID: %s\n", res.ID(), cnr) + } else { ReadOrOpenSession(cmd, &prm, pk, cnr, nil) }