support multipart upload

Signed-off-by: Мария Малыгина <m.malygina@yadro.com>
This commit is contained in:
Мария Малыгина 2023-07-18 13:06:38 +03:00
parent 6182d47b43
commit 560c60513e
5 changed files with 280 additions and 70 deletions

31
go.mod
View file

@ -7,9 +7,10 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.27.0-rc.2
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230705125206-769f6eec0565
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/aws/aws-sdk-go-v2 v1.18.1
github.com/aws/aws-sdk-go-v2/config v1.18.27
github.com/aws/aws-sdk-go-v2/service/s3 v1.36.0
github.com/aws/aws-sdk-go-v2 v1.19.0
github.com/aws/aws-sdk-go-v2/config v1.18.28
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.72
github.com/aws/aws-sdk-go-v2/service/s3 v1.37.0
github.com/dop251/goja v0.0.0-20230626124041-ba8a63e79201
github.com/go-loremipsum/loremipsum v1.1.3
github.com/google/uuid v1.3.0
@ -33,19 +34,19 @@ require (
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/aws/aws-sdk-go v1.44.296 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.26 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.26 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.29 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.30 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bluele/gcache v0.0.2 // indirect

BIN
go.sum

Binary file not shown.

View file

@ -5,11 +5,13 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/dop251/goja"
@ -70,6 +72,38 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
return PutResponse{Success: true}
}
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
func (c *Client) Multipart(bucket, key string, objPartSize int, payload goja.ArrayBuffer, concurrency int) PutResponse {
if objPartSize < multipartUploadMinPartSize {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)}
}
if concurrency < 1 {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: fmt.Sprintf("number of parts to upload in parallel must be greater than 0")}
}
start := time.Now()
uploader := manager.NewUploader(c.cli, func(u *manager.Uploader) {
u.PartSize = int64(objPartSize)
u.Concurrency = concurrency
})
_, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: bytes.NewReader(payload.Bytes()),
})
if err != nil {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: err.Error()}
}
size := len(payload.Bytes())
stats.Report(c.vu, objPutTotal, 1)
stats.ReportDataSent(c.vu, float64(size))
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
return PutResponse{Success: true}
}
func (c *Client) Delete(bucket, key string) DeleteResponse {
start := time.Now()

View file

@ -2,6 +2,7 @@
**Note:** you can provide file with all environment variables (system env variables overrides env from file) using
`-e ENV_FILE=.env` (relative path to that file must start from working directory):
```shell
$ ./k6 run -e ENV_FILE=.env some-scenario.js
```
@ -9,21 +10,27 @@ $ ./k6 run -e ENV_FILE=.env some-scenario.js
## Common options for all scenarios:
Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following options:
* `DURATION` - duration of scenario in seconds.
* `READERS` - number of VUs performing read operations.
* `WRITERS` - number of VUs performing write operations.
* `REGISTRY_FILE` - if set, all produced objects will be stored in database for subsequent verification. Database file name will be set to the value of `REGISTRY_FILE`.
* `REGISTRY_FILE` - if set, all produced objects will be stored in database for subsequent verification. Database file
name will be set to the value of `REGISTRY_FILE`.
* `WRITE_OBJ_SIZE` - object size in kb for write(PUT) operations.
* `PREGEN_JSON` - path to json file with pre-generated containers and objects (in case of http scenario we use json pre-generated for grpc scenario).
* `PREGEN_JSON` - path to json file with pre-generated containers and objects (in case of http scenario we use json
pre-generated for grpc scenario).
* `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations.
* `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`:
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected
with `go tool pprof file.prof`:
```shell
$ ./k6 run --out profile (...)
```
The profiles are saved in the current directory as `cpu.prof` and `mem.prof`, respectively.
## Common options for the local scenarios:
@ -41,8 +48,11 @@ The tests will use all pre-created containers for PUT operations and all pre-cre
```shell
$ ./scenarios/preset/preset_grpc.py --size 1024 --containers 1 --out grpc.json --endpoint host1:8080 --preload_obj 500 --policy "REP 2 IN X CBF 1 SELECT 2 FROM * AS X"
```
* `--policy` - container policy. If parameter is omitted, the default value is "REP 1 IN X CBF 1 SELECT 1 FROM * AS X".
* `--update` - container id. Specify the existing container id, if parameter is omitted the new container will be created.
* `--update` - container id. Specify the existing container id, if parameter is omitted the new container will be
created.
2. Execute scenario with options:
```shell
@ -50,9 +60,13 @@ $ ./k6 run -e DURATION=60 -e WRITE_OBJ_SIZE=8192 -e READERS=20 -e WRITERS=20 -e
```
Options (in addition to the common options):
* `GRPC_ENDPOINTS` - GRPC endpoints of FrostFS storage in format `host:port`. To specify multiple endpoints separate them by comma.
* `DELETERS` - number of VUs performing delete operations (using deleters requires that options `DELETE_AGE` and `REGISTRY_FILE` are specified as well).
* `DELETE_AGE` - age of object in seconds before which it can not be deleted. This parameter can be used to control how many objects we have in the system under load.
* `GRPC_ENDPOINTS` - GRPC endpoints of FrostFS storage in format `host:port`. To specify multiple endpoints separate
them by comma.
* `DELETERS` - number of VUs performing delete operations (using deleters requires that options `DELETE_AGE`
and `REGISTRY_FILE` are specified as well).
* `DELETE_AGE` - age of object in seconds before which it can not be deleted. This parameter can be used to control how
many objects we have in the system under load.
* `SLEEP_DELETE` - time interval (in seconds) between deleting VU iterations.
* `DIAL_TIMEOUT` - timeout to connect to a node (in seconds).
* `STREAM_TIMEOUT` - timeout for a single stream message for `PUT`/`GET` operations (in seconds).
@ -61,7 +75,9 @@ Options (in addition to the common options):
1. Create pre-generated containers or objects:
The tests will use all pre-created containers for PUT operations and all pre-created objects for READ operations. There is no dedicated script to preset HTTP scenario, so we use the same script as for gRPC:
The tests will use all pre-created containers for PUT operations and all pre-created objects for READ operations. There
is no dedicated script to preset HTTP scenario, so we use the same script as for gRPC:
```shell
$ ./scenarios/preset/preset_grpc.py --size 1024 --containers 1 --out grpc.json --endpoint host1:8080 --preload_obj 500
```
@ -73,15 +89,20 @@ $ ./k6 run -e DURATION=60 -e WRITE_OBJ_SIZE=8192 -e READERS=20 -e WRITERS=20 -e
```
Options (in addition to the common options):
* `CONFIG_FILE` - path to the local configuration file used for the storage node. Only the storage configuration section is used.
* `DELETERS` - number of VUs performing delete operations (using deleters requires that options `DELETE_AGE` and `REGISTRY_FILE` are specified as well).
* `DELETE_AGE` - age of object in seconds before which it can not be deleted. This parameter can be used to control how many objects we have in the system under load.
* `CONFIG_FILE` - path to the local configuration file used for the storage node. Only the storage configuration section
is used.
* `DELETERS` - number of VUs performing delete operations (using deleters requires that options `DELETE_AGE`
and `REGISTRY_FILE` are specified as well).
* `DELETE_AGE` - age of object in seconds before which it can not be deleted. This parameter can be used to control how
many objects we have in the system under load.
## HTTP
1. Create pre-generated containers or objects:
There is no dedicated script to preset HTTP scenario, so we use the same script as for gRPC:
```shell
$ ./scenarios/preset/preset_grpc.py --size 1024 --containers 1 --out grpc.json --endpoint host1:8080 --preload_obj 500
```
@ -93,7 +114,9 @@ $ ./k6 run -e DURATION=60 -e WRITE_OBJ_SIZE=8192 -e READERS=10 -e WRITERS=20 -e
```
Options (in addition to the common options):
* `HTTP_ENDPOINTS` - endpoints of HTTP gateways in format `host:port`. To specify multiple endpoints separate them by comma.
* `HTTP_ENDPOINTS` - endpoints of HTTP gateways in format `host:port`. To specify multiple endpoints separate them by
comma.
## S3
@ -120,7 +143,9 @@ The tests will use all pre-created buckets for PUT operations and all pre-create
```shell
$ ./scenarios/preset/preset_s3.py --size 1024 --buckets 1 --out s3_1024kb.json --endpoint host1:8084 --preload_obj 500 --location load-1-4
```
* '--location' - specify the name of container policy (from policy.json file). It's important to run 'aws configure' each time when the policy file has been changed to pick up the latest policies.
* '--location' - specify the name of container policy (from policy.json file). It's important to run 'aws configure'
each time when the policy file has been changed to pick up the latest policies.
3. Execute scenario with options:
@ -129,26 +154,56 @@ $ ./k6 run -e DURATION=60 -e WRITE_OBJ_SIZE=8192 -e READERS=20 -e WRITERS=20 -e
```
Options (in addition to the common options):
* `S3_ENDPOINTS` - endpoints of S3 gateways in format `host:port`. To specify multiple endpoints separate them by comma.
* `DELETERS` - number of VUs performing delete operations (using deleters requires that options `DELETE_AGE` and `REGISTRY_FILE` are specified as well).
* `DELETE_AGE` - age of object in seconds before which it can not be deleted. This parameter can be used to control how many objects we have in the system under load.
* `DELETERS` - number of VUs performing delete operations (using deleters requires that options `DELETE_AGE`
and `REGISTRY_FILE` are specified as well).
* `DELETE_AGE` - age of object in seconds before which it can not be deleted. This parameter can be used to control how
many objects we have in the system under load.
* `SLEEP_DELETE` - time interval (in seconds) between deleting VU iterations.
* `OBJ_NAME` - if specified, this name will be used for all write operations instead of random generation.
## S3 Multipart
Perform multipart upload operation, break up large objects, so they can be transferred in multiple parts, in parallel
```shell
$ ./k6 run -e DURATION=600 \
-e WRITERS=400 -e WRITERS_MULTIPART=10 \
-e WRITE_OBJ_SIZE=524288 -e WRITE_OBJ_PART_SIZE=10240 \
-e S3_ENDPOINTS=10.78.70.142:8084,10.78.70.143:8084,10.78.70.144:8084,10.78.70.145:8084 \
-e PREGEN_JSON=/home/service/s3_4kb.json \
scenarios/s3_multipart.js
```
Options:
* `S3_ENDPOINTS` - - endpoints of S3 gateways in format `host:port`. To specify multiple endpoints separate them by
comma.
* `WRITERS` - number of VUs performing write operations.
* `WRITERS_MULTIPART` - number of parts to upload in parallel.
* `WRITE_OBJ_PART_SIZE` - specifies the buffer size, in bytes, of each part to upload. The minimum size per part is 5MB
MiB.
## S3 Local
1. Follow steps 1. and 2. from the normal S3 scenario in order to obtain credentials and a preset file with the information about the buckets and objects that were pre-created.
2. Assuming the preset file was named `pregen.json`, we need to populate the bucket-to-container mapping before running the local S3 scenario:
1. Follow steps 1. and 2. from the normal S3 scenario in order to obtain credentials and a preset file with the
information about the buckets and objects that were pre-created.
2. Assuming the preset file was named `pregen.json`, we need to populate the bucket-to-container mapping before running
the local S3 scenario:
**WARNING**: Be aware that this command will overwrite the `containers` list field in `pregen.json` file. Make a backup if needed beforehand.
**WARNING**: Be aware that this command will overwrite the `containers` list field in `pregen.json` file. Make a backup
if needed beforehand.
```shell
$ ./scenarios/preset/resolve_containers_in_preset.py --endpoint s3host:8080 --preset_file pregen.json
```
After this, the `pregen.json` file will contain a `containers` list field the same length as `buckets`, which is the mapping of bucket name to container ID in the order they appear.
After this, the `pregen.json` file will contain a `containers` list field the same length as `buckets`, which is the
mapping of bucket name to container ID in the order they appear.
3. Execute the scenario with the desired options. For example:
```shell
$ ./k6 run -e DURATION=60 -e WRITE_OBJ_SIZE=8192 -e READERS=20 -e WRITERS=20 -e CONFIG_FILE=/path/to/node/config.yml -e PREGEN_JSON=pregen.json scenarios/s3local.js
```
@ -156,11 +211,14 @@ $ ./k6 run -e DURATION=60 -e WRITE_OBJ_SIZE=8192 -e READERS=20 -e WRITERS=20 -e
Note that the `s3local` scenario currently does not support deleters.
Options (in addition to the common options):
* `OBJ_NAME` - if specified, this name will be used for all write operations instead of random generation.
## Verify
This scenario allows to verify that objects created by a previous run are really stored in the system and their data is not corrupted. Running this scenario assumes that you've already run gRPC or HTTP or S3 scenario with option `REGISTRY_FILE`.
This scenario allows to verify that objects created by a previous run are really stored in the system and their data is
not corrupted. Running this scenario assumes that you've already run gRPC or HTTP or S3 scenario with
option `REGISTRY_FILE`.
To verify stored objects execute scenario with options:
@ -168,16 +226,21 @@ To verify stored objects execute scenario with options:
./k6 run -e CLIENTS=200 -e TIME_LIMIT=120 -e GRPC_ENDPOINTS=host1:8080,host2:8080 -e S3_ENDPOINTS=host1:8084,host2:8084 -e REGISTRY_FILE=registry.bolt scenarios/verify.js
```
Scenario picks up all objects in `created` status. If object is stored correctly, its' status will be changed into `verified`. If object does not exist or its' data is corrupted, then the status will be changed into `invalid`.
Scenario picks up all objects in `created` status. If object is stored correctly, its' status will be changed
into `verified`. If object does not exist or its' data is corrupted, then the status will be changed into `invalid`.
Scenario ends as soon as all objects are checked or time limit is exceeded.
Running `VERIFY` scenario modifies status of objects in `REGISTRY_FILE`. Objects that have been verified once won't be verified again. If you would like to verify the same set of objects multiple times, you can create a copy of `REGISTRY_FILE` produced by the `LOAD` scenario and run `VERIFY` against the copy of the file.
Running `VERIFY` scenario modifies status of objects in `REGISTRY_FILE`. Objects that have been verified once won't be
verified again. If you would like to verify the same set of objects multiple times, you can create a copy
of `REGISTRY_FILE` produced by the `LOAD` scenario and run `VERIFY` against the copy of the file.
Objects produced by HTTP scenario will be verified via gRPC endpoints.
Options:
* `CLIENTS` - number of VUs for verifying objects (VU can handle both GRPC and S3 objects)
* `TIME_LIMIT` - amount of time in seconds that is sufficient to verify all objects. If this time interval ends, then verification process will be interrupted and objects that have not been checked will stay in the `created` state.
* `TIME_LIMIT` - amount of time in seconds that is sufficient to verify all objects. If this time interval ends, then
verification process will be interrupted and objects that have not been checked will stay in the `created` state.
* `REGISTRY_FILE` - database file from which objects for verification should be read.
* `SLEEP` - time interval (in seconds) between VU iterations.
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
@ -187,20 +250,24 @@ Options:
## Verify preset
Check what all preset objects is in cluster and can be got:
```
./scenarios/preset/check_objects_in_preset.py --endpoint az:8080 --preset_file ./scenarios/presets/grpc_1Mb_c1_o100.json
```
Options:
* `--endpoint` - endpoint to get objects
* `--preset_file` - path to preset file
Check what all objects in preset is compliance with container policy and get distribution of keys:
```
./scenarios/preset/check_policy_compliance.py --endpoints "az:8080,buky:8080,vedi:8080,glagoli:8080" --expected_copies 2 --preset_file "./scenarios/presets/grpc_10Mb_c100_o400.json"
```
Options:
* `--endpoints` - list of all live endpoints in cluster (comma separated)
* `--preset_file` - path to preset file
* `--expected_copies` - amount of expected copies for each object

108
scenarios/s3_multipart.js Normal file
View file

@ -0,0 +1,108 @@
import datagen from 'k6/x/frostfs/datagen';
import logging from 'k6/x/frostfs/logging';
import registry from 'k6/x/frostfs/registry';
import s3 from 'k6/x/frostfs/s3';
import {SharedArray} from 'k6/data';
import {sleep} from 'k6';
import {textSummary} from './libs/k6-summary-0.0.2.js';
import {parseEnv} from './libs/env-parser.js';
import {uuidv4} from './libs/k6-utils-1.4.0.js';
parseEnv();
const obj_list = new SharedArray('obj_list', function () {
return JSON.parse(open(__ENV.PREGEN_JSON)).objects;
});
const bucket_list = new SharedArray('bucket_list', function () {
return JSON.parse(open(__ENV.PREGEN_JSON)).buckets;
});
const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random S3 endpoint for current VU
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
const s3_client = s3.connect(`http://${s3_endpoint}`);
const log = logging.new().withField("endpoint", s3_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
const duration = __ENV.DURATION;
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
const scenarios = {};
// write parts in parallel
const write_multipart_vu_count = parseInt(__ENV.WRITERS_MULTIPART || '0');
if (write_multipart_vu_count < 1) {
throw 'number of parts to upload in parallel should be greater than 0';
}
const write_vu_count = parseInt(__ENV.WRITERS || '0');
if (write_vu_count < 1) {
throw 'number of VUs performing write operations should be greater than 0';
}
if (write_vu_count > 0) {
scenarios.write = {
executor: 'constant-vus',
vus: write_vu_count,
duration: `${duration}s`,
exec: 'obj_write_multipart',
gracefulStop: '5s',
};
}
export const options = {
scenarios,
setupTimeout: '5s',
};
export function setup() {
const total_vu_count = write_vu_count * write_multipart_vu_count;
console.log(`Pregenerated buckets: ${bucket_list.length}`);
console.log(`Pregenerated read object size: ${read_size}`);
console.log(`Pregenerated total objects: ${obj_list.length}`);
console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Writing multipart VUs: ${write_multipart_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`);
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
}
export function handleSummary(data) {
return {
'stdout': textSummary(data, {indent: ' ', enableColors: false}),
[summary_json]: JSON.stringify(data),
};
}
const write_multipart_part_size = 1024 * parseInt(__ENV.WRITE_OBJ_PART_SIZE || '0')
export function obj_write_multipart() {
if (__ENV.SLEEP_WRITE) {
sleep(__ENV.SLEEP_WRITE);
}
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
const {payload, hash} = generator.genPayload(registry_enabled);
const resp = s3_client.multipart(bucket, key, write_multipart_part_size, payload, write_multipart_vu_count);
if (!resp.success) {
log.withFields({bucket: bucket, key: key}).error(resp.error);
return;
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
}
}