forked from TrueCloudLab/xk6-frostfs
[#80] Support parallel multipart
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
6182d47b43
commit
77d3dd8d6e
4 changed files with 168 additions and 3 deletions
7
go.mod
7
go.mod
|
@ -7,9 +7,10 @@ require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.27.0-rc.2
|
git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.27.0-rc.2
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230705125206-769f6eec0565
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230705125206-769f6eec0565
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
|
||||||
github.com/aws/aws-sdk-go-v2 v1.18.1
|
github.com/aws/aws-sdk-go-v2 v1.19.0
|
||||||
github.com/aws/aws-sdk-go-v2/config v1.18.27
|
github.com/aws/aws-sdk-go-v2/config v1.18.28
|
||||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.36.0
|
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.72
|
||||||
|
github.com/aws/aws-sdk-go-v2/service/s3 v1.37.0
|
||||||
github.com/dop251/goja v0.0.0-20230626124041-ba8a63e79201
|
github.com/dop251/goja v0.0.0-20230626124041-ba8a63e79201
|
||||||
github.com/go-loremipsum/loremipsum v1.1.3
|
github.com/go-loremipsum/loremipsum v1.1.3
|
||||||
github.com/google/uuid v1.3.0
|
github.com/google/uuid v1.3.0
|
||||||
|
|
|
@ -5,11 +5,13 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
||||||
"github.com/aws/aws-sdk-go-v2/aws"
|
"github.com/aws/aws-sdk-go-v2/aws"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||||
"github.com/dop251/goja"
|
"github.com/dop251/goja"
|
||||||
|
@ -70,6 +72,39 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
||||||
return PutResponse{Success: true}
|
return PutResponse{Success: true}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
|
||||||
|
|
||||||
|
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload goja.ArrayBuffer) PutResponse {
|
||||||
|
if objPartSize < multipartUploadMinPartSize {
|
||||||
|
stats.Report(c.vu, objPutFails, 1)
|
||||||
|
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)}
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
uploader := manager.NewUploader(c.cli, func(u *manager.Uploader) {
|
||||||
|
u.PartSize = int64(objPartSize)
|
||||||
|
u.Concurrency = concurrency
|
||||||
|
})
|
||||||
|
|
||||||
|
payloadReader := bytes.NewReader(payload.Bytes())
|
||||||
|
sz := payloadReader.Len()
|
||||||
|
|
||||||
|
_, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{
|
||||||
|
Bucket: aws.String(bucket),
|
||||||
|
Key: aws.String(key),
|
||||||
|
Body: payloadReader,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
stats.Report(c.vu, objPutFails, 1)
|
||||||
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
|
}
|
||||||
|
|
||||||
|
stats.Report(c.vu, objPutTotal, 1)
|
||||||
|
stats.ReportDataSent(c.vu, float64(sz))
|
||||||
|
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
|
||||||
|
return PutResponse{Success: true}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) Delete(bucket, key string) DeleteResponse {
|
func (c *Client) Delete(bucket, key string) DeleteResponse {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
|
|
|
@ -135,6 +135,31 @@ Options (in addition to the common options):
|
||||||
* `SLEEP_DELETE` - time interval (in seconds) between deleting VU iterations.
|
* `SLEEP_DELETE` - time interval (in seconds) between deleting VU iterations.
|
||||||
* `OBJ_NAME` - if specified, this name will be used for all write operations instead of random generation.
|
* `OBJ_NAME` - if specified, this name will be used for all write operations instead of random generation.
|
||||||
|
|
||||||
|
## S3 Multipart
|
||||||
|
|
||||||
|
Perform multipart upload operation, break up large objects, so they can be transferred in multiple parts, in parallel
|
||||||
|
|
||||||
|
```shell
|
||||||
|
$ ./k6 run -e DURATION=600 \
|
||||||
|
-e WRITERS=400 -e WRITERS_MULTIPART=10 \
|
||||||
|
-e WRITE_OBJ_SIZE=524288 -e WRITE_OBJ_PART_SIZE=10240 \
|
||||||
|
-e S3_ENDPOINTS=10.78.70.142:8084,10.78.70.143:8084,10.78.70.144:8084,10.78.70.145:8084 \
|
||||||
|
-e PREGEN_JSON=/home/service/s3_4kb.json \
|
||||||
|
scenarios/s3_multipart.js
|
||||||
|
```
|
||||||
|
|
||||||
|
Options:
|
||||||
|
* `DURATION` - duration of scenario in seconds.
|
||||||
|
* `REGISTRY_FILE` - if set, all produced objects will be stored in database for subsequent verification. Database file name will be set to the value of `REGISTRY_FILE`.
|
||||||
|
* `PREGEN_JSON` - path to json file with pre-generated containers.
|
||||||
|
* `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations.
|
||||||
|
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
|
||||||
|
* `S3_ENDPOINTS` - - endpoints of S3 gateways in format `host:port`. To specify multiple endpoints separate them by comma.
|
||||||
|
* `WRITERS` - number of VUs performing upload payload operation
|
||||||
|
* `WRITERS_MULTIPART` - number of goroutines that will upload parts in parallel
|
||||||
|
* `WRITE_OBJ_SIZE` - object size in kb for write(PUT) operations.
|
||||||
|
* `WRITE_OBJ_PART_SIZE` - part size in kb for multipart upload operations (must be greater or equal 5mb).
|
||||||
|
|
||||||
## S3 Local
|
## S3 Local
|
||||||
|
|
||||||
1. Follow steps 1. and 2. from the normal S3 scenario in order to obtain credentials and a preset file with the information about the buckets and objects that were pre-created.
|
1. Follow steps 1. and 2. from the normal S3 scenario in order to obtain credentials and a preset file with the information about the buckets and objects that were pre-created.
|
||||||
|
|
104
scenarios/s3_multipart.js
Normal file
104
scenarios/s3_multipart.js
Normal file
|
@ -0,0 +1,104 @@
|
||||||
|
import datagen from 'k6/x/frostfs/datagen';
|
||||||
|
import logging from 'k6/x/frostfs/logging';
|
||||||
|
import registry from 'k6/x/frostfs/registry';
|
||||||
|
import s3 from 'k6/x/frostfs/s3';
|
||||||
|
import {SharedArray} from 'k6/data';
|
||||||
|
import {sleep} from 'k6';
|
||||||
|
import {textSummary} from './libs/k6-summary-0.0.2.js';
|
||||||
|
import {parseEnv} from './libs/env-parser.js';
|
||||||
|
import {uuidv4} from './libs/k6-utils-1.4.0.js';
|
||||||
|
|
||||||
|
parseEnv();
|
||||||
|
|
||||||
|
const bucket_list = new SharedArray('bucket_list', function () {
|
||||||
|
return JSON.parse(open(__ENV.PREGEN_JSON)).buckets;
|
||||||
|
});
|
||||||
|
|
||||||
|
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
||||||
|
|
||||||
|
// Select random S3 endpoint for current VU
|
||||||
|
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
|
||||||
|
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
|
||||||
|
const s3_client = s3.connect(`http://${s3_endpoint}`);
|
||||||
|
const log = logging.new().withField("endpoint", s3_endpoint);
|
||||||
|
|
||||||
|
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||||
|
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
|
||||||
|
|
||||||
|
const duration = __ENV.DURATION;
|
||||||
|
|
||||||
|
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
|
||||||
|
|
||||||
|
const scenarios = {};
|
||||||
|
|
||||||
|
const write_vu_count = parseInt(__ENV.WRITERS || '0');
|
||||||
|
if (write_vu_count < 1) {
|
||||||
|
throw 'number of VUs (env WRITERS) performing write operations should be greater than 0';
|
||||||
|
}
|
||||||
|
|
||||||
|
const write_multipart_vu_count = parseInt(__ENV.WRITERS_MULTIPART || '0');
|
||||||
|
if (write_multipart_vu_count < 1) {
|
||||||
|
throw 'number of parts (env WRITERS_MULTIPART) to upload in parallel should be greater than 0';
|
||||||
|
}
|
||||||
|
|
||||||
|
if (write_vu_count > 0) {
|
||||||
|
scenarios.write_multipart = {
|
||||||
|
executor: 'constant-vus',
|
||||||
|
vus: write_vu_count,
|
||||||
|
duration: `${duration}s`,
|
||||||
|
exec: 'obj_write_multipart',
|
||||||
|
gracefulStop: '5s',
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export const options = {
|
||||||
|
scenarios,
|
||||||
|
setupTimeout: '5s',
|
||||||
|
};
|
||||||
|
|
||||||
|
export function setup() {
|
||||||
|
const total_vu_count = write_vu_count * write_multipart_vu_count;
|
||||||
|
|
||||||
|
console.log(`Pregenerated buckets: ${bucket_list.length}`);
|
||||||
|
console.log(`Writing VUs: ${write_vu_count}`);
|
||||||
|
console.log(`Writing multipart VUs: ${write_multipart_vu_count}`);
|
||||||
|
console.log(`Total VUs: ${total_vu_count}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function teardown(data) {
|
||||||
|
if (obj_registry) {
|
||||||
|
obj_registry.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function handleSummary(data) {
|
||||||
|
return {
|
||||||
|
'stdout': textSummary(data, {indent: ' ', enableColors: false}),
|
||||||
|
[summary_json]: JSON.stringify(data),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const write_multipart_part_size = 1024 * parseInt(__ENV.WRITE_OBJ_PART_SIZE || '0')
|
||||||
|
if (write_multipart_part_size < 5 * 1024 * 1024) {
|
||||||
|
throw 'part size (env WRITE_OBJ_PART_SIZE * 1024) must be greater than (5 MB)';
|
||||||
|
}
|
||||||
|
|
||||||
|
export function obj_write_multipart() {
|
||||||
|
if (__ENV.SLEEP_WRITE) {
|
||||||
|
sleep(__ENV.SLEEP_WRITE);
|
||||||
|
}
|
||||||
|
|
||||||
|
const key = __ENV.OBJ_NAME || uuidv4();
|
||||||
|
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
|
||||||
|
|
||||||
|
const {payload, hash} = generator.genPayload(registry_enabled);
|
||||||
|
const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload);
|
||||||
|
if (!resp.success) {
|
||||||
|
log.withFields({bucket: bucket, key: key}).error(resp.error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (obj_registry) {
|
||||||
|
obj_registry.addObject("", "", bucket, key, hash);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue