diff --git a/go.mod b/go.mod index 810a01b..c74cfb3 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,10 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.27.0-rc.2 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230705125206-769f6eec0565 git.frostfs.info/TrueCloudLab/tzhash v1.8.0 - github.com/aws/aws-sdk-go-v2 v1.18.1 - github.com/aws/aws-sdk-go-v2/config v1.18.27 - github.com/aws/aws-sdk-go-v2/service/s3 v1.36.0 + github.com/aws/aws-sdk-go-v2 v1.19.0 + github.com/aws/aws-sdk-go-v2/config v1.18.28 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.72 + github.com/aws/aws-sdk-go-v2/service/s3 v1.37.0 github.com/dop251/goja v0.0.0-20230626124041-ba8a63e79201 github.com/go-loremipsum/loremipsum v1.1.3 github.com/google/uuid v1.3.0 diff --git a/internal/s3/client.go b/internal/s3/client.go index 0c05d57..23d6b86 100644 --- a/internal/s3/client.go +++ b/internal/s3/client.go @@ -5,11 +5,13 @@ import ( "context" "crypto/sha256" "encoding/hex" + "fmt" "strconv" "time" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/dop251/goja" @@ -70,6 +72,39 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse { return PutResponse{Success: true} } +const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB + +func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload goja.ArrayBuffer) PutResponse { + if objPartSize < multipartUploadMinPartSize { + stats.Report(c.vu, objPutFails, 1) + return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)} + } + + start := time.Now() + uploader := manager.NewUploader(c.cli, func(u *manager.Uploader) { + u.PartSize = int64(objPartSize) + u.Concurrency = concurrency + }) + + payloadReader := bytes.NewReader(payload.Bytes()) + sz := payloadReader.Len() + + _, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: payloadReader, + }) + if err != nil { + stats.Report(c.vu, objPutFails, 1) + return PutResponse{Success: false, Error: err.Error()} + } + + stats.Report(c.vu, objPutTotal, 1) + stats.ReportDataSent(c.vu, float64(sz)) + stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start))) + return PutResponse{Success: true} +} + func (c *Client) Delete(bucket, key string) DeleteResponse { start := time.Now() diff --git a/scenarios/run_scenarios.md b/scenarios/run_scenarios.md index ba76c79..15999ea 100644 --- a/scenarios/run_scenarios.md +++ b/scenarios/run_scenarios.md @@ -135,6 +135,31 @@ Options (in addition to the common options): * `SLEEP_DELETE` - time interval (in seconds) between deleting VU iterations. * `OBJ_NAME` - if specified, this name will be used for all write operations instead of random generation. +## S3 Multipart + +Perform multipart upload operation, break up large objects, so they can be transferred in multiple parts, in parallel + +```shell +$ ./k6 run -e DURATION=600 \ +-e WRITERS=400 -e WRITERS_MULTIPART=10 \ +-e WRITE_OBJ_SIZE=524288 -e WRITE_OBJ_PART_SIZE=10240 \ +-e S3_ENDPOINTS=10.78.70.142:8084,10.78.70.143:8084,10.78.70.144:8084,10.78.70.145:8084 \ +-e PREGEN_JSON=/home/service/s3_4kb.json \ +scenarios/s3_multipart.js +``` + +Options: +* `DURATION` - duration of scenario in seconds. +* `REGISTRY_FILE` - if set, all produced objects will be stored in database for subsequent verification. Database file name will be set to the value of `REGISTRY_FILE`. +* `PREGEN_JSON` - path to json file with pre-generated containers. +* `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations. +* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random"). +* `S3_ENDPOINTS` - - endpoints of S3 gateways in format `host:port`. To specify multiple endpoints separate them by comma. +* `WRITERS` - number of VUs performing upload payload operation +* `WRITERS_MULTIPART` - number of goroutines that will upload parts in parallel +* `WRITE_OBJ_SIZE` - object size in kb for write(PUT) operations. +* `WRITE_OBJ_PART_SIZE` - part size in kb for multipart upload operations (must be greater or equal 5mb). + ## S3 Local 1. Follow steps 1. and 2. from the normal S3 scenario in order to obtain credentials and a preset file with the information about the buckets and objects that were pre-created. diff --git a/scenarios/s3_multipart.js b/scenarios/s3_multipart.js new file mode 100644 index 0000000..39e943f --- /dev/null +++ b/scenarios/s3_multipart.js @@ -0,0 +1,104 @@ +import datagen from 'k6/x/frostfs/datagen'; +import logging from 'k6/x/frostfs/logging'; +import registry from 'k6/x/frostfs/registry'; +import s3 from 'k6/x/frostfs/s3'; +import {SharedArray} from 'k6/data'; +import {sleep} from 'k6'; +import {textSummary} from './libs/k6-summary-0.0.2.js'; +import {parseEnv} from './libs/env-parser.js'; +import {uuidv4} from './libs/k6-utils-1.4.0.js'; + +parseEnv(); + +const bucket_list = new SharedArray('bucket_list', function () { + return JSON.parse(open(__ENV.PREGEN_JSON)).buckets; +}); + +const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json"; + +// Select random S3 endpoint for current VU +const s3_endpoints = __ENV.S3_ENDPOINTS.split(','); +const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)]; +const s3_client = s3.connect(`http://${s3_endpoint}`); +const log = logging.new().withField("endpoint", s3_endpoint); + +const registry_enabled = !!__ENV.REGISTRY_FILE; +const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined; + +const duration = __ENV.DURATION; + +const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || ""); + +const scenarios = {}; + +const write_vu_count = parseInt(__ENV.WRITERS || '0'); +if (write_vu_count < 1) { + throw 'number of VUs (env WRITERS) performing write operations should be greater than 0'; +} + +const write_multipart_vu_count = parseInt(__ENV.WRITERS_MULTIPART || '0'); +if (write_multipart_vu_count < 1) { + throw 'number of parts (env WRITERS_MULTIPART) to upload in parallel should be greater than 0'; +} + +if (write_vu_count > 0) { + scenarios.write_multipart = { + executor: 'constant-vus', + vus: write_vu_count, + duration: `${duration}s`, + exec: 'obj_write_multipart', + gracefulStop: '5s', + }; +} + +export const options = { + scenarios, + setupTimeout: '5s', +}; + +export function setup() { + const total_vu_count = write_vu_count * write_multipart_vu_count; + + console.log(`Pregenerated buckets: ${bucket_list.length}`); + console.log(`Writing VUs: ${write_vu_count}`); + console.log(`Writing multipart VUs: ${write_multipart_vu_count}`); + console.log(`Total VUs: ${total_vu_count}`); +} + +export function teardown(data) { + if (obj_registry) { + obj_registry.close(); + } +} + +export function handleSummary(data) { + return { + 'stdout': textSummary(data, {indent: ' ', enableColors: false}), + [summary_json]: JSON.stringify(data), + }; +} + +const write_multipart_part_size = 1024 * parseInt(__ENV.WRITE_OBJ_PART_SIZE || '0') +if (write_multipart_part_size < 5 * 1024 * 1024) { + throw 'part size (env WRITE_OBJ_PART_SIZE * 1024) must be greater than (5 MB)'; +} + +export function obj_write_multipart() { + if (__ENV.SLEEP_WRITE) { + sleep(__ENV.SLEEP_WRITE); + } + + const key = __ENV.OBJ_NAME || uuidv4(); + const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)]; + + const {payload, hash} = generator.genPayload(registry_enabled); + const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload); + if (!resp.success) { + log.withFields({bucket: bucket, key: key}).error(resp.error); + return; + } + + if (obj_registry) { + obj_registry.addObject("", "", bucket, key, hash); + } +}