Compare commits

...

38 commits

Author SHA1 Message Date
9c866ca6e1 Monitor VU cancel
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-31 14:46:28 +03:00
96e0e0188e Set background timeout
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-31 14:35:50 +03:00
48a95bc50b remove http from s3 multipart upload load scenario, protocol would be set in endpoint parameter
Signed-off-by: m.malygina <m.malygina@yadro.com>
2023-10-27 13:22:38 +03:00
26f5262b3d [#90] Support config folder together with config file
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-10-25 16:48:29 +03:00
95ce6f1162 [#96] .forgejo: Copy tests workflow from node
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-19 11:45:10 +03:00
27db0ac943 [#96] .forgejo: Fix DCO action
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-19 11:43:52 +03:00
e970e52eea [#96] .forgejo: Move workflows folder from .github
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-19 11:43:52 +03:00
1311051f60 [#99] Adding read age param to improve k6 runs stability
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-02 20:08:43 +03:00
7db7751334 [#95] Allow to use wallet from config file for frostfs-cli
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-08-23 15:01:53 +03:00
bf884936a7 [#91] Improve logging for preset
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-08-16 08:25:03 +00:00
108e761639 [#93] go.mod: Update go.k6.io/k6 package to patched version
* The update fixes bug with k6 build

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-11 18:52:14 +03:00
5b1793f248 [#30] report: log start and end time of load scenario
Signed-off-by: Airat Arifullin a.arifullin@yadro.com
2023-07-26 14:54:59 +00:00
4ef3795e04 [#84] preset: fix typo
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-07-21 06:55:40 +00:00
704c0f06bc [#25] selector: Remove next object timeout
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-20 15:01:57 +03:00
0dc0ba1704 [#25] xk6: Read objects from registry for S3 tests
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-20 10:01:41 +03:00
3c26e7c917 [#25] xk6: Read objects from registry for gRPC tests
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-20 10:01:34 +03:00
50e2f55362 [#80] Add dump registry util
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-07-19 15:57:39 +03:00
77d3dd8d6e [#80] Support parallel multipart
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-07-19 10:44:44 +03:00
6182d47b43 [#81] remove schema from preset_s3 and k6 load s3 scenarios 2023-07-14 11:36:10 +00:00
ff6814e15d [#72] Add option --prepare-locally
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-07-07 13:16:54 +03:00
56235f5e90 [#72] Update dependencies
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-07-06 12:14:52 +03:00
f633f9a64a [#79] client: Remove bufSize field
Use constant value instead.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-06 11:27:33 +03:00
42f1881580 [#79] object put: Add chunk size parameter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-06 11:27:33 +03:00
4972bb928e [#79] xk6: Update node and SDK-Go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-05 15:37:06 +03:00
a1f5738d2f [#77] Use writecache in local scenarios
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-06-30 12:50:42 +00:00
8e99d08aa4 [#12] Allow using multiple endpoints for presets
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-06-28 20:21:43 +03:00
ba04c682cb [#13] Allow to use english text in the payload
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-06-27 11:14:05 +00:00
3525d5b4e3 [#15] go.mod: Tidy
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-06-27 11:14:05 +00:00
62d7b78131 [#73] preset: Allow to sleep before putting objects
For large networks block propagation may take some time.
If we do not wait enough, putting objects can fail for some containers.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-06-25 13:14:15 +03:00
153390cedb [#65] go.mod: Move to go1.19
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-24 11:41:02 +03:00
1025e80f11 [#65] go.mod: Update dependencies
See https://github.com/grafana/k6/pull/3075/, it is not yet in any
release.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-24 11:41:02 +03:00
6151005b4d [#67] Fail k6 if preset fails
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-05-24 08:39:37 +00:00
925fe3ec83 [#66] scenarios: Exit if there is nothing to verify
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-23 16:02:12 +03:00
4aa9a359b5 [#64] registry: Delete object from the old bucket
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-23 16:02:03 +03:00
52ed0d6d88 [#63] scenarios: Unify logs in verify script
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-23 16:02:03 +03:00
4c2678077b [#57] preset: Use temporary file for payload
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-22 09:06:10 +00:00
5a1191a1ab [#20] Add pprof extension with support for cpu and mem
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-05-18 15:08:33 +00:00
5c26b4bad4 [#61] scenarios: Fix setObjectStatus parameters
We were providing new status instead of old and expected it to exist in
DB.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-18 09:43:41 +03:00
57 changed files with 1627 additions and 1612 deletions

View file

Before

Width:  |  Height:  |  Size: 5.5 KiB

After

Width:  |  Height:  |  Size: 5.5 KiB

View file

@ -0,0 +1,21 @@
name: DCO action
on: [pull_request]
jobs:
dco:
name: DCO
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Setup Go
uses: actions/setup-go@v3
with:
go-version: '1.21'
- name: Run commit format checker
uses: https://git.frostfs.info/TrueCloudLab/dco-go@v2
with:
from: 'origin/${{ github.event.pull_request.base.ref }}'

View file

@ -0,0 +1,38 @@
name: Tests and linters
on: [pull_request]
jobs:
tests:
name: Tests
runs-on: ubuntu-latest
strategy:
matrix:
go_versions: [ '1.20', '1.21' ]
fail-fast: false
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '${{ matrix.go_versions }}'
cache: true
- name: Run tests
run: make test
tests-race:
name: Tests with -race
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.21'
cache: true
- name: Run tests
run: go test ./... -count=1 -race

View file

@ -1,21 +0,0 @@
name: DCO check
on:
pull_request:
branches:
- master
jobs:
commits_check_job:
runs-on: ubuntu-latest
name: Commits Check
steps:
- name: Get PR Commits
id: 'get-pr-commits'
uses: tim-actions/get-pr-commits@master
with:
token: ${{ secrets.GITHUB_TOKEN }}
- name: DCO Check
uses: tim-actions/dco@master
with:
commits: ${{ steps.get-pr-commits.outputs.commits }}

View file

@ -1,34 +0,0 @@
name: Tests
on:
pull_request:
branches:
- master
types: [opened, synchronize]
paths-ignore:
- '**/*.md'
workflow_dispatch:
jobs:
lint:
name: Lint
runs-on: ubuntu-20.04
steps:
- name: Check out code
uses: actions/checkout@v2
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
version: latest
args: --timeout=2m
tests:
name: Tests
runs-on: ubuntu-20.04
strategy:
matrix:
go_versions: [ '1.17', '1.18', '1.19' ]
fail-fast: false
steps:
- uses: actions/checkout@v3

1
.gitignore vendored
View file

@ -1,3 +1,4 @@
k6 k6
*.bolt *.bolt
presets presets
bin

94
Makefile Normal file
View file

@ -0,0 +1,94 @@
#!/usr/bin/make -f
# Common variables
REPO ?= $(shell go list -m)
VERSION ?= $(shell git describe --tags --dirty --match "v*" --always --abbrev=8 2>/dev/null || cat VERSION 2>/dev/null || echo "develop")
GO_VERSION ?= 1.19
LINT_VERSION ?= 1.49.0
BINDIR = bin
# Binaries to build
CMDS = $(addprefix frostfs-, $(notdir $(wildcard cmd/*)))
BINS = $(addprefix $(BINDIR)/, $(CMDS))
.PHONY: all $(BINS) $(BINDIR) dep docker/ test cover format lint docker/lint pre-commit unpre-commit version clean
# Make all binaries
all: $(BINS)
$(BINS): $(BINDIR) dep
@echo "⇒ Build $@"
CGO_ENABLED=0 \
go build -v -trimpath \
-ldflags "-X $(REPO)/internal/version.Version=$(VERSION)" \
-o $@ ./cmd/$(subst frostfs-,,$(notdir $@))
$(BINDIR):
@echo "⇒ Ensure dir: $@"
@mkdir -p $@
# Pull go dependencies
dep:
@printf "⇒ Download requirements: "
@CGO_ENABLED=0 \
go mod download && echo OK
@printf "⇒ Tidy requirements: "
@CGO_ENABLED=0 \
go mod tidy -v && echo OK
# Run `make %` in Golang container, for more information run `make help.docker/%`
docker/%:
$(if $(filter $*,all $(BINS)), \
@echo "=> Running 'make $*' in clean Docker environment" && \
docker run --rm -t \
-v `pwd`:/src \
-w /src \
-u `stat -c "%u:%g" .` \
--env HOME=/src \
golang:$(GO_VERSION) make $*,\
@echo "supported docker targets: all $(BINS) lint")
# Run tests
test:
@go test ./... -cover
# Run tests with race detection and produce coverage output
cover:
@go test -v -race ./... -coverprofile=coverage.txt -covermode=atomic
@go tool cover -html=coverage.txt -o coverage.html
# Reformat code
format:
@echo "⇒ Processing gofmt check"
@gofmt -s -w ./
# Run linters
lint:
@golangci-lint --timeout=5m run
# Run linters in Docker
docker/lint:
docker run --rm -it \
-v `pwd`:/src \
-u `stat -c "%u:%g" .` \
--env HOME=/src \
golangci/golangci-lint:v$(LINT_VERSION) bash -c 'cd /src/ && make lint'
# Activate pre-commit hooks
pre-commit:
pre-commit install -t pre-commit -t commit-msg
# Deactivate pre-commit hooks
unpre-commit:
pre-commit uninstall -t pre-commit -t commit-msg
# Show current version
version:
@echo $(VERSION)
# Clean up files
clean:
rm -rf .cache
rm -rf $(BINDIR)
include help.mk

View file

@ -47,10 +47,11 @@ Create native client with `connect` method. Arguments:
- hex encoded private key (empty value produces random key) - hex encoded private key (empty value produces random key)
- dial timeout in seconds (0 for the default value) - dial timeout in seconds (0 for the default value)
- stream timeout in seconds (0 for the default value) - stream timeout in seconds (0 for the default value)
- generate object header on the client side (for big object - split locally too)
```js ```js
import native from 'k6/x/frostfs/native'; import native from 'k6/x/frostfs/native';
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0) const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
``` ```
### Methods ### Methods
@ -73,12 +74,13 @@ const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
Create a local client with `connect` method. Arguments: Create a local client with `connect` method. Arguments:
- local path to frostfs storage node configuration file - local path to frostfs storage node configuration file
- local path to frostfs storage node configuration directory
- hex encoded private key (empty value produces random key) - hex encoded private key (empty value produces random key)
- whether to use the debug logger (warning: very verbose) - whether to use the debug logger (warning: very verbose)
```js ```js
import local from 'k6/x/frostfs/local'; import local from 'k6/x/frostfs/local';
const local_client = local.connect("/path/to/config.yaml", "", false) const local_client = local.connect("/path/to/config.yaml", "/path/to/config/dir", "", false)
``` ```
### Methods ### Methods
@ -98,13 +100,13 @@ Credentials are taken from default AWS configuration files and ENVs.
```js ```js
import s3 from 'k6/x/frostfs/s3'; import s3 from 'k6/x/frostfs/s3';
const s3_cli = s3.connect("http://s3.frostfs.devenv:8080") const s3_cli = s3.connect("https://s3.frostfs.devenv:8080")
``` ```
You can also provide additional options: You can also provide additional options:
```js ```js
import s3 from 'k6/x/frostfs/s3'; import s3 from 'k6/x/frostfs/s3';
const s3_cli = s3.connect("http://s3.frostfs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'}) const s3_cli = s3.connect("https://s3.frostfs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'})
``` ```
* `no_verify_ss` - Bool. If `true` - skip verifying the s3 certificate chain and host name (useful if s3 uses self-signed certificates) * `no_verify_ss` - Bool. If `true` - skip verifying the s3 certificate chain and host name (useful if s3 uses self-signed certificates)
@ -122,6 +124,7 @@ const s3_cli = s3.connect("http://s3.frostfs.devenv:8080", {'no_verify_ssl': 'tr
Create local s3 client with `connect` method. Arguments: Create local s3 client with `connect` method. Arguments:
- local path to frostfs storage node configuration file - local path to frostfs storage node configuration file
- local path to frostfs storage node configuration directory
- parameter map with the following options: - parameter map with the following options:
* `hex_key`: private key to use as a hexadecimal string. A random one is created if none is provided. * `hex_key`: private key to use as a hexadecimal string. A random one is created if none is provided.
* `node_position`: position of this node in the node array if loading multiple nodes independently (default: 0). * `node_position`: position of this node in the node array if loading multiple nodes independently (default: 0).
@ -134,7 +137,7 @@ Create local s3 client with `connect` method. Arguments:
import local from 'k6/x/frostfs/local'; import local from 'k6/x/frostfs/local';
const params = {'node_position': 1, 'node_count': 3} const params = {'node_position': 1, 'node_count': 3}
const bucketMapping = {'mytestbucket': 'GBQDDUM1hdodXmiRHV57EUkFWJzuntsG8BG15wFSwam6'} const bucketMapping = {'mytestbucket': 'GBQDDUM1hdodXmiRHV57EUkFWJzuntsG8BG15wFSwam6'}
const local_client = local.connect("/path/to/config.yaml", params, bucketMapping) const local_client = local.connect("/path/to/config.yaml", "/path/to/config/dir", params, bucketMapping)
``` ```
### Methods ### Methods
@ -147,6 +150,41 @@ const local_client = local.connect("/path/to/config.yaml", params, bucketMapping
See native protocol and s3 test suite examples in [examples](./examples) dir. See native protocol and s3 test suite examples in [examples](./examples) dir.
# Command line utils
To build all command line utils just run:
```shell
$ make
```
All binaries will be in `bin` directory.
## Export registry db
You can export registry bolt db to json file, that can be used as pregen for scenarios (see [docs](./scenarios/run_scenarios.md)).
To do this use `frostfs-xk6-registry-exporter`, available flags can be seen in help:
```shell
$ ./bin/frostfs-xk6-registry-exporter -h
Registry exporter for xk6
Usage:
registry-exporter [flags]
Examples:
registry-exporter registry.bolt
registry-exporter --status created --out out.json registry.bolt
Flags:
--age int Object age
--format string Output format (default "json")
-h, --help help for registry-exporter
--out string Path to output file (default "dumped-registry.json")
--status string Object status (default "created")
-v, --version version for registry-exporter
```
# License # License
- [GNU General Public License v3.0](LICENSE) - [GNU General Public License v3.0](LICENSE)

View file

@ -0,0 +1,18 @@
package main
import (
"context"
"os"
"os/signal"
"syscall"
)
func main() {
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
if cmd, err := rootCmd.ExecuteContextC(ctx); err != nil {
cmd.PrintErrln("Error:", err.Error())
cmd.PrintErrf("Run '%v --help' for usage.\n", cmd.CommandPath())
os.Exit(1)
}
}

View file

@ -0,0 +1,89 @@
package main
import (
"fmt"
"runtime"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/version"
"github.com/spf13/cobra"
)
var rootCmd = &cobra.Command{
Use: "registry-exporter",
Version: version.Version,
Short: "Registry exporter",
Long: "Registry exporter for xk6",
Example: `registry-exporter registry.bolt
registry-exporter --status created --out out.json registry.bolt`,
SilenceErrors: true,
SilenceUsage: true,
RunE: rootCmdRun,
}
const (
outFlag = "out"
formatFlag = "format"
statusFlag = "status"
ageFlag = "age"
)
const (
defaultOutPath = "dumped-registry.json"
jsonFormat = "json"
createdStatus = "created"
)
func init() {
rootCmd.Flags().String(outFlag, defaultOutPath, "Path to output file")
rootCmd.Flags().String(formatFlag, jsonFormat, "Output format")
rootCmd.Flags().String(statusFlag, createdStatus, "Object status")
rootCmd.Flags().Int(ageFlag, 0, "Object age")
cobra.AddTemplateFunc("runtimeVersion", runtime.Version)
rootCmd.SetVersionTemplate(`FrostFS xk6 Registry Exporter
{{printf "Version: %s" .Version }}
GoVersion: {{ runtimeVersion }}
`)
}
func rootCmdRun(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return fmt.Errorf("expected exacly one non-flag argumet: path to the registry, got: %s", args)
}
format, err := cmd.Flags().GetString(formatFlag)
if err != nil {
return fmt.Errorf("get '%s' flag: %w", formatFlag, err)
}
if format != jsonFormat {
return fmt.Errorf("unknown format '%s', only '%s' is supported", format, jsonFormat)
}
out, err := cmd.Flags().GetString(outFlag)
if err != nil {
return fmt.Errorf("get '%s' flag: %w", outFlag, err)
}
status, err := cmd.Flags().GetString(statusFlag)
if err != nil {
return fmt.Errorf("get '%s' flag: %w", statusFlag, err)
}
age, err := cmd.Flags().GetInt(ageFlag)
if err != nil {
return fmt.Errorf("get '%s' flag: %w", ageFlag, err)
}
objRegistry := registry.NewObjRegistry(cmd.Context(), args[0])
objSelector := registry.NewObjSelector(objRegistry, 0, &registry.ObjFilter{
Status: status,
Age: age,
})
objExporter := registry.NewObjExporter(objSelector)
cmd.Println("Writing result file:", out)
return objExporter.ExportJSONPreGen(out)
}

View file

@ -2,7 +2,7 @@ import local from 'k6/x/frostfs/local';
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js'; import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const payload = open('../go.sum', 'b'); const payload = open('../go.sum', 'b');
const local_cli = local.connect("/path/to/config.yaml", "", false) const local_cli = local.connect("/path/to/config.yaml", "/path/to/config/dir", "", false)
export const options = { export const options = {
stages: [ stages: [

View file

@ -3,7 +3,7 @@ import { fail } from "k6";
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js'; import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const payload = open('../go.sum', 'b'); const payload = open('../go.sum', 'b');
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0) const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false)
export const options = { export const options = {
stages: [ stages: [

View file

@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const payload = open('../go.sum', 'b'); const payload = open('../go.sum', 'b');
const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B" const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0) const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
const frostfs_obj = frostfs_cli.onsite(container, payload) const frostfs_obj = frostfs_cli.onsite(container, payload)
export const options = { export const options = {

View file

@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const bucket = "testbucket" const bucket = "testbucket"
const payload = open('../go.sum', 'b'); const payload = open('../go.sum', 'b');
const s3local_cli = s3local.connect("path/to/storage/config.yml", {}, { const s3local_cli = s3local.connect("path/to/storage/config.yml", "path/to/storage/config/dir", {}, {
'testbucket': 'GBQDDUM1hdodXmiRHV57EUkFWJzuntsG8BG15wFSwam6', 'testbucket': 'GBQDDUM1hdodXmiRHV57EUkFWJzuntsG8BG15wFSwam6',
}); });

View file

@ -6,6 +6,7 @@ import (
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local" _ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/logging" _ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/logging"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/native" _ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/native"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/profile"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry" _ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/s3" _ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/s3"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/s3local" _ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/s3local"

159
go.mod
View file

@ -1,107 +1,130 @@
module git.frostfs.info/TrueCloudLab/xk6-frostfs module git.frostfs.info/TrueCloudLab/xk6-frostfs
go 1.17 go 1.19
require ( require (
git.frostfs.info/TrueCloudLab/frostfs-node v0.22.2-0.20230313113918-4e244686cf03 git.frostfs.info/TrueCloudLab/frostfs-node v0.22.2-0.20230704155826-b520a3049e6f
git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.24.1-0.20230403110435-01afa1cae425 git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.27.0-rc.2
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230329125804-552219b8e130 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230705125206-769f6eec0565
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/aws/aws-sdk-go-v2 v1.16.3 github.com/aws/aws-sdk-go-v2 v1.19.0
github.com/aws/aws-sdk-go-v2/config v1.15.5 github.com/aws/aws-sdk-go-v2/config v1.18.28
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.9 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.72
github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf github.com/aws/aws-sdk-go-v2/service/s3 v1.37.0
github.com/dop251/goja v0.0.0-20230626124041-ba8a63e79201
github.com/go-loremipsum/loremipsum v1.1.3
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/joho/godotenv v1.5.1 github.com/joho/godotenv v1.5.1
github.com/nspcc-dev/neo-go v0.101.0 github.com/nspcc-dev/neo-go v0.101.2
github.com/panjf2000/ants/v2 v2.5.0 github.com/panjf2000/ants/v2 v2.8.0
github.com/sirupsen/logrus v1.8.1 github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.1 github.com/spf13/cobra v1.7.0
go.etcd.io/bbolt v1.3.6 github.com/stretchr/testify v1.8.4
go.k6.io/k6 v0.38.2 go.etcd.io/bbolt v1.3.7
go.k6.io/k6 v0.45.1
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0
golang.org/x/sys v0.10.0
) )
require ( require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.11.2-0.20230315095236-9dc375346703 // indirect git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230602142716-68021b910acb // indirect
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
git.frostfs.info/TrueCloudLab/hrw v1.2.0 // indirect git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 // indirect
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/aws/aws-sdk-go v1.44.6 // indirect github.com/aws/aws-sdk-go v1.44.296 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.12.0 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.4 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.10 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.4 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.11 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.1 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.5 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.30 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.4 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.4 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.4 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.4 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
github.com/aws/smithy-go v1.11.2 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/bluele/gcache v0.0.2 // indirect github.com/bluele/gcache v0.0.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/fatih/color v1.13.0 // indirect github.com/fatih/color v1.15.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
github.com/gorilla/mux v1.8.0 // indirect github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect github.com/hashicorp/golang-lru/v2 v2.0.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.15.13 // indirect github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect github.com/minio/highwayhash v1.0.2 // indirect
github.com/minio/sio v0.3.0 // indirect github.com/minio/sio v0.3.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd // indirect
github.com/nats-io/jwt/v2 v2.4.1 // indirect github.com/nats-io/jwt/v2 v2.4.1 // indirect
github.com/nats-io/nats.go v1.22.1 // indirect github.com/nats-io/nats.go v1.27.1 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect github.com/onsi/ginkgo v1.16.5 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect github.com/onsi/gomega v1.20.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.13.0 // indirect github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect github.com/prometheus/procfs v0.11.0 // indirect
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/afero v1.9.3 // indirect github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.15.0 // indirect github.com/spf13/viper v1.16.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect github.com/subosito/gotenv v1.4.2 // indirect
go.uber.org/atomic v1.10.0 // indirect github.com/twmb/murmur3 v1.1.8 // indirect
go.uber.org/multierr v1.9.0 // indirect go.opentelemetry.io/otel v1.16.0 // indirect
golang.org/x/crypto v0.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
golang.org/x/exp v0.0.0-20221227203929-1b447090c38c // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
golang.org/x/net v0.8.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0 // indirect
golang.org/x/sync v0.1.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect
golang.org/x/sys v0.6.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect
golang.org/x/text v0.8.0 // indirect go.opentelemetry.io/otel/sdk v1.16.0 // indirect
golang.org/x/time v0.1.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect go.opentelemetry.io/proto/otlp v0.20.0 // indirect
google.golang.org/grpc v1.52.0 // indirect go.uber.org/atomic v1.11.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect go.uber.org/multierr v1.11.0 // indirect
gopkg.in/guregu/null.v3 v3.3.0 // indirect golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/grpc v1.56.1 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/guregu/null.v3 v3.5.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

1397
go.sum

File diff suppressed because it is too large Load diff

22
help.mk Normal file
View file

@ -0,0 +1,22 @@
.PHONY: help
# Show this help prompt
help:
@echo ' Usage:'
@echo ''
@echo ' make <target>'
@echo ''
@echo ' Targets:'
@echo ''
@awk '/^#/{ comment = substr($$0,3) } comment && /^[a-zA-Z][a-zA-Z0-9.%_/-]+ ?:/{ print " ", $$1, comment }' $(MAKEFILE_LIST) | column -t -s ':' | grep -v 'IGNORE' | sort | uniq
# Show help for docker/% IGNORE
help.docker/%:
$(eval TARGETS:=$(notdir all lint) ${BINS})
@echo ' Usage:'
@echo ''
@echo ' make docker/% -- Run `make %` in Golang container'
@echo ''
@echo ' Supported docker targets:'
@echo ''
@$(foreach bin, $(TARGETS), echo ' ' $(bin);)

View file

@ -1,6 +1,8 @@
package datagen package datagen
import ( import (
"strings"
"go.k6.io/k6/js/modules" "go.k6.io/k6/js/modules"
) )
@ -36,7 +38,7 @@ func (d *Datagen) Exports() modules.Exports {
return modules.Exports{Default: d} return modules.Exports{Default: d}
} }
func (d *Datagen) Generator(size int) *Generator { func (d *Datagen) Generator(size int, typ string) *Generator {
g := NewGenerator(d.vu, size) g := NewGenerator(d.vu, size, strings.ToLower(typ))
return &g return &g
} }

View file

@ -1,12 +1,14 @@
package datagen package datagen
import ( import (
"bytes"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"math/rand" "math/rand"
"time" "time"
"github.com/dop251/goja" "github.com/dop251/goja"
"github.com/go-loremipsum/loremipsum"
"go.k6.io/k6/js/modules" "go.k6.io/k6/js/modules"
) )
@ -24,6 +26,7 @@ type (
size int size int
rand *rand.Rand rand *rand.Rand
buf []byte buf []byte
typ string
offset int offset int
} }
@ -36,19 +39,53 @@ type (
// TailSize specifies number of extra random bytes in the buffer tail. // TailSize specifies number of extra random bytes in the buffer tail.
const TailSize = 1024 const TailSize = 1024
func NewGenerator(vu modules.VU, size int) Generator { var payloadTypes = []string{
"text",
"random",
"",
}
func NewGenerator(vu modules.VU, size int, typ string) Generator {
if size <= 0 { if size <= 0 {
panic("size should be positive") panic("size should be positive")
} }
var found bool
for i := range payloadTypes {
if payloadTypes[i] == typ {
found = true
break
}
}
if !found {
vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ)
}
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
buf := make([]byte, size+TailSize) buf := make([]byte, size+TailSize)
r.Read(buf) g := Generator{
return Generator{
vu: vu, vu: vu,
size: size, size: size,
rand: r, rand: r,
buf: buf, buf: buf,
typ: typ,
}
g.fillBuffer()
return g
}
func (g *Generator) fillBuffer() {
switch g.typ {
case "text":
li := loremipsum.New()
b := bytes.NewBuffer(g.buf[:0])
for b.Len() < g.size+TailSize {
b.WriteString(li.Paragraph())
b.WriteRune('\n')
}
g.buf = b.Bytes()
default:
rand.Read(g.buf) // Per docs, err is always nil here
} }
} }
@ -66,9 +103,9 @@ func (g *Generator) GenPayload(calcHash bool) GenPayloadResponse {
} }
func (g *Generator) nextSlice() []byte { func (g *Generator) nextSlice() []byte {
if g.offset >= TailSize { if g.offset+g.size >= len(g.buf) {
g.offset = 0 g.offset = 0
g.rand.Read(g.buf) // Per docs, err is always nil here g.fillBuffer()
} }
result := g.buf[g.offset : g.offset+g.size] result := g.buf[g.offset : g.offset+g.size]

View file

@ -16,25 +16,25 @@ func TestGenerator(t *testing.T) {
t.Run("fails on negative size", func(t *testing.T) { t.Run("fails on negative size", func(t *testing.T) {
require.Panics(t, func() { require.Panics(t, func() {
_ = NewGenerator(vu, -1) _ = NewGenerator(vu, -1, "")
}) })
}) })
t.Run("fails on zero size", func(t *testing.T) { t.Run("fails on zero size", func(t *testing.T) {
require.Panics(t, func() { require.Panics(t, func() {
_ = NewGenerator(vu, 0) _ = NewGenerator(vu, 0, "")
}) })
}) })
t.Run("creates slice of specified size", func(t *testing.T) { t.Run("creates slice of specified size", func(t *testing.T) {
size := 10 size := 10
g := NewGenerator(vu, size) g := NewGenerator(vu, size, "")
slice := g.nextSlice() slice := g.nextSlice()
require.Len(t, slice, size) require.Len(t, slice, size)
}) })
t.Run("creates a different slice on each call", func(t *testing.T) { t.Run("creates a different slice on each call", func(t *testing.T) {
g := NewGenerator(vu, 1000) g := NewGenerator(vu, 1000, "")
slice1 := g.nextSlice() slice1 := g.nextSlice()
slice2 := g.nextSlice() slice2 := g.nextSlice()
// Each slice should be unique (assuming that 1000 random bytes will never coincide // Each slice should be unique (assuming that 1000 random bytes will never coincide
@ -43,7 +43,7 @@ func TestGenerator(t *testing.T) {
}) })
t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) { t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) {
g := NewGenerator(vu, 1000) g := NewGenerator(vu, 1000, "")
initialSlice := g.nextSlice() initialSlice := g.nextSlice()
for i := 0; i < TailSize; i++ { for i := 0; i < TailSize; i++ {
g.nextSlice() g.nextSlice()

View file

@ -7,9 +7,11 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local/rawclient" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local/rawclient"
"github.com/dop251/goja" "github.com/dop251/goja"
"go.k6.io/k6/js/modules"
) )
type Client struct { type Client struct {
vu modules.VU
rc *rawclient.RawClient rc *rawclient.RawClient
} }
@ -30,7 +32,7 @@ type (
) )
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse { func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
id, err := c.rc.Put(mustParseContainerID(containerID), nil, headers, payload.Bytes()) id, err := c.rc.Put(c.vu.Context(), mustParseContainerID(containerID), nil, headers, payload.Bytes())
if err != nil { if err != nil {
return PutResponse{Error: err.Error()} return PutResponse{Error: err.Error()}
} }
@ -41,14 +43,14 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
} }
func (c *Client) Get(containerID, objectID string) GetResponse { func (c *Client) Get(containerID, objectID string) GetResponse {
if _, err := c.rc.Get(mustParseContainerID(containerID), mustParseObjectID(objectID)); err != nil { if _, err := c.rc.Get(c.vu.Context(), mustParseContainerID(containerID), mustParseObjectID(objectID)); err != nil {
return GetResponse{Error: err.Error()} return GetResponse{Error: err.Error()}
} }
return GetResponse{Success: true} return GetResponse{Success: true}
} }
func (c *Client) Delete(containerID, objectID string) DeleteResponse { func (c *Client) Delete(containerID, objectID string) DeleteResponse {
if err := c.rc.Delete(mustParseContainerID(containerID), mustParseObjectID(objectID)); err != nil { if err := c.rc.Delete(c.vu.Context(), mustParseContainerID(containerID), mustParseObjectID(objectID)); err != nil {
return DeleteResponse{Error: err.Error()} return DeleteResponse{Error: err.Error()}
} }
return DeleteResponse{Success: true} return DeleteResponse{Success: true}

View file

@ -40,6 +40,8 @@ type RootModule struct {
mu sync.Mutex mu sync.Mutex
// configFile is the name of the configuration file used during one test. // configFile is the name of the configuration file used during one test.
configFile string configFile string
// configDir is the name of the configuration directory used during one test.
configDir string
// ng is the engine instance used during one test, corresponding to the configFile. Each VU // ng is the engine instance used during one test, corresponding to the configFile. Each VU
// gets the same engine instance. // gets the same engine instance.
ng *engine.StorageEngine ng *engine.StorageEngine
@ -48,7 +50,7 @@ type RootModule struct {
// Local represents an instance of the module for every VU. // Local represents an instance of the module for every VU.
type Local struct { type Local struct {
vu modules.VU vu modules.VU
ResolveEngine func(context.Context, string, bool) (*engine.StorageEngine, error) ResolveEngine func(context.Context, string, string, bool) (*engine.StorageEngine, error)
} }
// Ensure the interfaces are implemented correctly. // Ensure the interfaces are implemented correctly.
@ -71,7 +73,7 @@ func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
return NewLocalModuleInstance(vu, r.GetOrCreateEngine) return NewLocalModuleInstance(vu, r.GetOrCreateEngine)
} }
func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string, bool) (*engine.StorageEngine, error)) *Local { func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string, string, bool) (*engine.StorageEngine, error)) *Local {
return &Local{ return &Local{
vu: vu, vu: vu,
ResolveEngine: resolveEngine, ResolveEngine: resolveEngine,
@ -100,21 +102,22 @@ func checkResourceLimits() error {
return nil return nil
} }
// GetOrCreateEngine returns the current engine instance for the given configuration file, // GetOrCreateEngine returns the current engine instance for the given configuration file or directory,
// creating a new one if none exists. Note that the identity of configuration files is their // creating a new one if none exists. Note that the identity of configuration files is their
// file name for the purposes of test runs. // file name for the purposes of test runs.
func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, debug bool) (*engine.StorageEngine, error) { func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, configDir string, debug bool) (*engine.StorageEngine, error) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
if len(configFile) == 0 { if len(configFile) == 0 && len(configDir) == 0 {
return nil, errors.New("configFile cannot be empty") return nil, errors.New("provide configFile or configDir")
} }
// Create and initialize engine for the given configFile if it doesn't exist already // Create and initialize engine for the given configFile if it doesn't exist already
if r.ng == nil { if r.ng == nil {
r.configFile = configFile r.configFile = configFile
appCfg := config.New(config.Prm{}, config.WithConfigFile(configFile)) r.configDir = configDir
appCfg := config.New(configFile, configDir, "")
ngOpts, shardOpts, err := storageEngineOptionsFromConfig(appCfg, debug) ngOpts, shardOpts, err := storageEngineOptionsFromConfig(appCfg, debug)
if err != nil { if err != nil {
return nil, fmt.Errorf("creating engine options from config: %v", err) return nil, fmt.Errorf("creating engine options from config: %v", err)
@ -131,11 +134,15 @@ func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, d
if err := r.ng.Open(); err != nil { if err := r.ng.Open(); err != nil {
return nil, fmt.Errorf("opening engine: %v", err) return nil, fmt.Errorf("opening engine: %v", err)
} }
if err := r.ng.Init(); err != nil { if err := r.ng.Init(ctx); err != nil {
return nil, fmt.Errorf("initializing engine: %v", err) return nil, fmt.Errorf("initializing engine: %v", err)
} }
} else if configFile != r.configFile { } else if configFile != r.configFile {
return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was initialized: got %q, want %q", configFile, r.configFile) return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was "+
"initialized: got %q, want %q", configFile, r.configFile)
} else if configDir != r.configDir {
return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configDir after engine was "+
"initialized: got %q, want %q", configDir, r.configDir)
} }
return r.ng, nil return r.ng, nil
@ -149,10 +156,10 @@ func (s *Local) Exports() modules.Exports {
func (s *Local) VU() modules.VU { return s.vu } func (s *Local) VU() modules.VU { return s.vu }
func (s *Local) Connect(configFile, hexKey string, debug bool) (*Client, error) { func (s *Local) Connect(configFile, configDir, hexKey string, debug bool) (*Client, error) {
ng, err := s.ResolveEngine(s.VU().Context(), configFile, debug) ng, err := s.ResolveEngine(s.VU().Context(), configFile, configDir, debug)
if err != nil { if err != nil {
return nil, fmt.Errorf("connecting to engine for config %q: %v", configFile, err) return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err)
} }
key, err := ParseOrCreateKey(hexKey) key, err := ParseOrCreateKey(hexKey)
@ -204,7 +211,7 @@ func (s *Local) Connect(configFile, hexKey string, debug bool) (*Client, error)
} }
}), }),
) )
return &Client{rc}, nil return &Client{vu: s.vu, rc: rc}, nil
} }
type epochState struct{} type epochState struct{}
@ -292,17 +299,20 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool) ([]engine.Opti
// write cache // write cache
if wc := sc.WriteCache(); wc.Enabled() { if wc := sc.WriteCache(); wc.Enabled() {
opts = append(opts, shard.WithWriteCacheOptions( opts = append(opts,
writecache.WithPath(wc.Path()), shard.WithWriteCache(true),
writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()), shard.WithWriteCacheOptions(
writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()), writecache.WithPath(wc.Path()),
writecache.WithMaxObjectSize(wc.MaxObjectSize()), writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
writecache.WithSmallObjectSize(wc.SmallObjectSize()), writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
writecache.WithFlushWorkersCount(wc.WorkersNumber()), writecache.WithMaxObjectSize(wc.MaxObjectSize()),
writecache.WithMaxCacheSize(wc.SizeLimit()), writecache.WithSmallObjectSize(wc.SmallObjectSize()),
writecache.WithNoSync(wc.NoSync()), writecache.WithFlushWorkersCount(wc.WorkersNumber()),
writecache.WithLogger(&logger.Logger{Logger: log}), writecache.WithMaxCacheSize(wc.SizeLimit()),
)) writecache.WithNoSync(wc.NoSync()),
writecache.WithLogger(&logger.Logger{Logger: log}),
),
)
} }
// tree // tree

View file

@ -3,6 +3,7 @@
package rawclient package rawclient
import ( import (
"context"
"fmt" "fmt"
"time" "time"
@ -31,7 +32,7 @@ func New(ng *engine.StorageEngine, opts ...Option) *RawClient {
return client return client
} }
func (c *RawClient) Put(containerID cid.ID, ownerID *user.ID, headers map[string]string, payload []byte) (oid.ID, error) { func (c *RawClient) Put(ctx context.Context, containerID cid.ID, ownerID *user.ID, headers map[string]string, payload []byte) (oid.ID, error) {
sz := len(payload) sz := len(payload)
attrs := make([]object.Attribute, len(headers)) attrs := make([]object.Attribute, len(headers))
@ -71,7 +72,7 @@ func (c *RawClient) Put(containerID cid.ID, ownerID *user.ID, headers map[string
req.WithObject(obj) req.WithObject(obj)
start := time.Now() start := time.Now()
_, err = c.ng.Put(req) err = c.ng.Put(ctx, req)
c.onPut(uint64(sz), err, time.Since(start)) c.onPut(uint64(sz), err, time.Since(start))
if err != nil { if err != nil {
return oid.ID{}, err return oid.ID{}, err
@ -80,7 +81,7 @@ func (c *RawClient) Put(containerID cid.ID, ownerID *user.ID, headers map[string
return id, nil return id, nil
} }
func (c *RawClient) Get(containerID cid.ID, objectID oid.ID) (*object.Object, error) { func (c *RawClient) Get(ctx context.Context, containerID cid.ID, objectID oid.ID) (*object.Object, error) {
var addr oid.Address var addr oid.Address
addr.SetContainer(containerID) addr.SetContainer(containerID)
addr.SetObject(objectID) addr.SetObject(objectID)
@ -89,7 +90,7 @@ func (c *RawClient) Get(containerID cid.ID, objectID oid.ID) (*object.Object, er
req.WithAddress(addr) req.WithAddress(addr)
start := time.Now() start := time.Now()
res, err := c.ng.Get(req) res, err := c.ng.Get(ctx, req)
var sz uint64 var sz uint64
obj := res.Object() obj := res.Object()
@ -101,7 +102,7 @@ func (c *RawClient) Get(containerID cid.ID, objectID oid.ID) (*object.Object, er
return obj, nil return obj, nil
} }
func (c *RawClient) Delete(containerID cid.ID, objectID oid.ID) error { func (c *RawClient) Delete(ctx context.Context, containerID cid.ID, objectID oid.ID) error {
var addr oid.Address var addr oid.Address
addr.SetContainer(containerID) addr.SetContainer(containerID)
addr.SetObject(objectID) addr.SetObject(objectID)
@ -110,7 +111,7 @@ func (c *RawClient) Delete(containerID cid.ID, objectID oid.ID) error {
req.WithAddress(addr) req.WithAddress(addr)
start := time.Now() start := time.Now()
_, err := c.ng.Delete(req) _, err := c.ng.Delete(ctx, req)
c.onDelete(err, time.Since(start)) c.onDelete(err, time.Since(start))
return err return err
} }

View file

@ -31,11 +31,11 @@ import (
type ( type (
Client struct { Client struct {
vu modules.VU vu modules.VU
key ecdsa.PrivateKey key ecdsa.PrivateKey
tok session.Object tok session.Object
cli *client.Client cli *client.Client
bufsize int prepareLocally bool
} }
PutResponse struct { PutResponse struct {
@ -66,30 +66,18 @@ type (
} }
PreparedObject struct { PreparedObject struct {
vu modules.VU vu modules.VU
key ecdsa.PrivateKey key ecdsa.PrivateKey
cli *client.Client cli *client.Client
bufsize int hdr object.Object
payload []byte
hdr object.Object prepareLocally bool
payload []byte
} }
) )
const defaultBufferSize = 64 * 1024 const defaultBufferSize = 64 * 1024
func (c *Client) SetBufferSize(size int) { func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse {
if size < 0 {
panic("buffer size must be positive")
}
if size == 0 {
c.bufsize = defaultBufferSize
} else {
c.bufsize = size
}
}
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
cliContainerID := parseContainerID(containerID) cliContainerID := parseContainerID(containerID)
tok := c.tok tok := c.tok
@ -116,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
o.SetOwnerID(&owner) o.SetOwnerID(&owner)
o.SetAttributes(attrs...) o.SetAttributes(attrs...)
resp, err := put(c.vu, c.bufsize, c.cli, &tok, &o, payload.Bytes()) resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
if err != nil { if err != nil {
return PutResponse{Success: false, Error: err.Error()} return PutResponse{Success: false, Error: err.Error()}
} }
@ -176,7 +164,7 @@ func (c *Client) Get(containerID, objectID string) GetResponse {
prm.WithinSession(tok) prm.WithinSession(tok)
var objSize = 0 var objSize = 0
err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) { err = get(c.cli, prm, c.vu.Context(), func(data []byte) {
objSize += len(data) objSize += len(data)
}) })
if err != nil { if err != nil {
@ -194,10 +182,9 @@ func get(
cli *client.Client, cli *client.Client,
prm client.PrmObjectGet, prm client.PrmObjectGet,
ctx context.Context, ctx context.Context,
bufSize int,
onDataChunk func(chunk []byte), onDataChunk func(chunk []byte),
) error { ) error {
var buf = make([]byte, bufSize) var buf = make([]byte, defaultBufferSize)
objectReader, err := cli.ObjectGetInit(ctx, prm) objectReader, err := cli.ObjectGetInit(ctx, prm)
if err != nil { if err != nil {
@ -245,7 +232,7 @@ func (c *Client) VerifyHash(containerID, objectID, expectedHash string) VerifyHa
prm.WithinSession(tok) prm.WithinSession(tok)
hasher := sha256.New() hasher := sha256.New()
err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) { err = get(c.cli, prm, c.vu.Context(), func(data []byte) {
hasher.Write(data) hasher.Write(data)
}) })
if err != nil { if err != nil {
@ -381,13 +368,12 @@ func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedOb
} }
return PreparedObject{ return PreparedObject{
vu: c.vu, vu: c.vu,
key: c.key, key: c.key,
cli: c.cli, cli: c.cli,
bufsize: c.bufsize, hdr: *obj,
payload: data,
hdr: *obj, prepareLocally: c.prepareLocally,
payload: data,
} }
} }
@ -413,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
return PutResponse{Success: false, Error: err.Error()} return PutResponse{Success: false, Error: err.Error()}
} }
_, err = put(p.vu, p.bufsize, p.cli, nil, &obj, p.payload) _, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
if err != nil { if err != nil {
return PutResponse{Success: false, Error: err.Error()} return PutResponse{Success: false, Error: err.Error()}
} }
@ -421,8 +407,18 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
return PutResponse{Success: true, ObjectID: id.String()} return PutResponse{Success: true, ObjectID: id.String()}
} }
func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object, type epochSource uint64
hdr *object.Object, payload []byte) (*client.ResObjectPut, error) {
func (s epochSource) CurrentEpoch() uint64 {
return uint64(s)
}
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) {
bufSize := defaultBufferSize
if chunkSize > 0 {
bufSize = chunkSize
}
buf := make([]byte, bufSize) buf := make([]byte, bufSize)
rdr := bytes.NewReader(payload) rdr := bytes.NewReader(payload)
sz := rdr.Size() sz := rdr.Size()
@ -434,6 +430,18 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
if tok != nil { if tok != nil {
prm.WithinSession(*tok) prm.WithinSession(*tok)
} }
if chunkSize > 0 {
prm.SetGRPCPayloadChunkLen(chunkSize)
}
if prepareLocally {
res, err := cli.NetworkInfo(vu.Context(), client.PrmNetworkInfo{})
if err != nil {
return nil, err
}
prm.WithObjectMaxSize(res.Info().MaxObjectSize())
prm.WithEpochSource(epochSource(res.Info().CurrentEpoch()))
prm.WithoutHomomorphicHash(true)
}
objectWriter, err := cli.ObjectPutInit(vu.Context(), prm) objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)
if err != nil { if err != nil {
@ -441,21 +449,21 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
return nil, err return nil, err
} }
if !objectWriter.WriteHeader(*hdr) { if !objectWriter.WriteHeader(vu.Context(), *hdr) {
stats.Report(vu, objPutFails, 1) stats.Report(vu, objPutFails, 1)
_, err = objectWriter.Close() _, err = objectWriter.Close(vu.Context())
return nil, err return nil, err
} }
n, _ := rdr.Read(buf) n, _ := rdr.Read(buf)
for n > 0 { for n > 0 {
if !objectWriter.WritePayloadChunk(buf[:n]) { if !objectWriter.WritePayloadChunk(vu.Context(), buf[:n]) {
break break
} }
n, _ = rdr.Read(buf) n, _ = rdr.Read(buf)
} }
resp, err := objectWriter.Close() resp, err := objectWriter.Close(vu.Context())
if err != nil { if err != nil {
stats.Report(vu, objPutFails, 1) stats.Report(vu, objPutFails, 1)
return nil, err return nil, err

View file

@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports {
return modules.Exports{Default: n} return modules.Exports{Default: n}
} }
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) { func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) {
var ( var (
cli client.Client cli client.Client
pk *keys.PrivateKey pk *keys.PrivateKey
@ -82,7 +82,7 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
prmDial.SetStreamTimeout(time.Duration(streamTimeout) * time.Second) prmDial.SetStreamTimeout(time.Duration(streamTimeout) * time.Second)
} }
err = cli.Dial(prmDial) err = cli.Dial(n.vu.Context(), prmDial)
if err != nil { if err != nil {
return nil, fmt.Errorf("dial endpoint: %s %w", endpoint, err) return nil, fmt.Errorf("dial endpoint: %s %w", endpoint, err)
} }
@ -133,10 +133,10 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
cnrPutDuration, _ = registry.NewMetric("frostfs_cnr_put_duration", metrics.Trend, metrics.Time) cnrPutDuration, _ = registry.NewMetric("frostfs_cnr_put_duration", metrics.Trend, metrics.Time)
return &Client{ return &Client{
vu: n.vu, vu: n.vu,
key: pk.PrivateKey, key: pk.PrivateKey,
tok: tok, tok: tok,
cli: &cli, cli: &cli,
bufsize: defaultBufferSize, prepareLocally: prepareLocally,
}, nil }, nil
} }

View file

@ -0,0 +1,67 @@
// Package profile provides an extension to generate profile data from k6 itself.
//
// An Output extension is used to leverage the Start and Stop hooks which are
// otherwise inaccessible in a regular module.
package profile
import (
"fmt"
"os"
"runtime"
"runtime/pprof"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
)
const (
cpuProfilePath = "cpu.prof"
memProfilePath = "mem.prof"
)
type profExt struct {
cpuFile *os.File
}
func New(output.Params) (output.Output, error) {
return &profExt{}, nil
}
func (*profExt) Description() string {
return "profile"
}
func (ext *profExt) Start() error {
var err error
ext.cpuFile, err = os.Create(cpuProfilePath)
if err != nil {
return fmt.Errorf("creating cpu profile file: %v", err)
}
if err := pprof.StartCPUProfile(ext.cpuFile); err != nil {
return fmt.Errorf("starting cpu profile: %v", err)
}
return nil
}
func (ext *profExt) Stop() error {
pprof.StopCPUProfile()
if err := ext.cpuFile.Close(); err != nil {
return fmt.Errorf("closing cpu profile file: %v", err)
}
f, err := os.Create(memProfilePath)
if err != nil {
return fmt.Errorf("creating mem profile file: %v", err)
}
defer f.Close()
runtime.GC()
if err := pprof.WriteHeapProfile(f); err != nil {
return fmt.Errorf("writing mem profile: %v", err)
}
return nil
}
func (*profExt) AddMetricSamples([]metrics.SampleContainer) {}
func init() {
output.RegisterExtension("profile", New)
}

View file

@ -0,0 +1,82 @@
package registry
import (
"fmt"
"os"
)
type ObjExporter struct {
selector *ObjSelector
}
type PreGenerateInfo struct {
Buckets []string `json:"buckets"`
Objects []ObjInfo `json:"objects"`
ObjSize string `json:"obj_size"`
}
type ObjInfo struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
}
func NewObjExporter(selector *ObjSelector) *ObjExporter {
return &ObjExporter{selector: selector}
}
func (o *ObjExporter) ExportJSONPreGen(fileName string) error {
f, err := os.Create(fileName)
if err != nil {
return err
}
defer f.Close()
// there can be a lot of object, so manually form json
if _, err = f.WriteString(`{"objects":[`); err != nil {
return err
}
bucketMap := make(map[string]struct{})
count, err := o.selector.Count()
if err != nil {
return err
}
var comma string
for i := 0; i < count; i++ {
info := o.selector.NextObject()
if info == nil {
break
}
if _, err = f.WriteString(fmt.Sprintf(`%s{"bucket":"%s","object":"%s"}`, comma, info.S3Bucket, info.S3Key)); err != nil {
return err
}
if i == 0 {
comma = ","
}
bucketMap[info.S3Bucket] = struct{}{}
}
if _, err = f.WriteString(`],"buckets":[`); err != nil {
return err
}
i := 0
comma = ""
for bucket := range bucketMap {
if _, err = f.WriteString(fmt.Sprintf(`%s"%s"`, comma, bucket)); err != nil {
return err
}
if i == 0 {
comma = ","
}
i++
}
_, err = f.WriteString(`]}`)
return err
}

View file

@ -81,10 +81,14 @@ func (o *ObjRegistry) SetObjectStatus(id uint64, oldStatus, newStatus string) er
return fmt.Errorf("bucket doesn't exist: '%s'", oldStatus) return fmt.Errorf("bucket doesn't exist: '%s'", oldStatus)
} }
objBytes := oldB.Get(encodeId(id)) key := encodeId(id)
objBytes := oldB.Get(key)
if objBytes == nil { if objBytes == nil {
return errors.New("object doesn't exist") return errors.New("object doesn't exist")
} }
if err := oldB.Delete(key); err != nil {
return fmt.Errorf("bucket.Delete: %w", err)
}
obj := new(ObjectInfo) obj := new(ObjectInfo)
if err := obj.Unmarshal(objBytes); err != nil { if err := obj.Unmarshal(objBytes); err != nil {

View file

@ -163,10 +163,11 @@ func (o *ObjSelector) selectLoop() {
if len(cache) != o.cacheSize { if len(cache) != o.cacheSize {
// no more objects, wait a little; the logic could be improved. // no more objects, wait a little; the logic could be improved.
select { select {
case <-time.After(time.Second * time.Duration(o.filter.Age/2)): case <-time.After(time.Second):
case <-o.ctx.Done(): case <-o.ctx.Done():
return return
} }
lastID = 0
} }
// clean handled objects // clean handled objects

View file

@ -94,6 +94,10 @@ func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, fi
return selector return selector
} }
func (r *Registry) GetExporter(selector *ObjSelector) *ObjExporter {
return NewObjExporter(selector)
}
func parseFilter(filter map[string]string) (*ObjFilter, error) { func parseFilter(filter map[string]string) (*ObjFilter, error) {
objFilter := ObjFilter{} objFilter := ObjFilter{}
objFilter.Status = filter["status"] objFilter.Status = filter["status"]

View file

@ -5,11 +5,13 @@ import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"fmt"
"strconv" "strconv"
"time" "time"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/dop251/goja" "github.com/dop251/goja"
@ -21,6 +23,7 @@ type (
Client struct { Client struct {
vu modules.VU vu modules.VU
cli *s3.Client cli *s3.Client
to time.Duration
} }
PutResponse struct { PutResponse struct {
@ -54,7 +57,14 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
sz := rdr.Size() sz := rdr.Size()
start := time.Now() start := time.Now()
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
ch := CancelMonitor(c.vu.Context(), c.vu.State().Logger.WithField("method", "put"))
defer func() { close(ch) }()
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
_, err := c.cli.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
Key: aws.String(key), Key: aws.String(key),
Body: rdr, Body: rdr,
@ -70,10 +80,49 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
return PutResponse{Success: true} return PutResponse{Success: true}
} }
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload goja.ArrayBuffer) PutResponse {
if objPartSize < multipartUploadMinPartSize {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)}
}
start := time.Now()
uploader := manager.NewUploader(c.cli, func(u *manager.Uploader) {
u.PartSize = int64(objPartSize)
u.Concurrency = concurrency
})
payloadReader := bytes.NewReader(payload.Bytes())
sz := payloadReader.Len()
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
_, err := uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: payloadReader,
})
if err != nil {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: err.Error()}
}
stats.Report(c.vu, objPutTotal, 1)
stats.ReportDataSent(c.vu, float64(sz))
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
return PutResponse{Success: true}
}
func (c *Client) Delete(bucket, key string) DeleteResponse { func (c *Client) Delete(bucket, key string) DeleteResponse {
start := time.Now() start := time.Now()
_, err := c.cli.DeleteObject(c.vu.Context(), &s3.DeleteObjectInput{ ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
_, err := c.cli.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
Key: aws.String(key), Key: aws.String(key),
}) })
@ -90,8 +139,14 @@ func (c *Client) Delete(bucket, key string) DeleteResponse {
func (c *Client) Get(bucket, key string) GetResponse { func (c *Client) Get(bucket, key string) GetResponse {
start := time.Now() start := time.Now()
ch := CancelMonitor(c.vu.Context(), c.vu.State().Logger.WithField("method", "get"))
defer func() { close(ch) }()
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
var objSize = 0 var objSize = 0
err := get(c.cli, bucket, key, func(chunk []byte) { err := get(ctx, c.cli, bucket, key, func(chunk []byte) {
objSize += len(chunk) objSize += len(chunk)
}) })
if err != nil { if err != nil {
@ -106,6 +161,7 @@ func (c *Client) Get(bucket, key string) GetResponse {
} }
func get( func get(
ctx context.Context,
c *s3.Client, c *s3.Client,
bucket string, bucket string,
key string, key string,
@ -113,7 +169,7 @@ func get(
) error { ) error {
var buf = make([]byte, 4*1024) var buf = make([]byte, 4*1024)
obj, err := c.GetObject(context.Background(), &s3.GetObjectInput{ obj, err := c.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
Key: aws.String(key), Key: aws.String(key),
}) })
@ -135,7 +191,11 @@ func get(
func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse { func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse {
hasher := sha256.New() hasher := sha256.New()
err := get(c.cli, bucket, key, func(data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
err := get(ctx, c.cli, bucket, key, func(data []byte) {
hasher.Write(data) hasher.Write(data)
}) })
if err != nil { if err != nil {
@ -167,7 +227,11 @@ func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBuc
} }
start := time.Now() start := time.Now()
_, err = c.cli.CreateBucket(c.vu.Context(), &s3.CreateBucketInput{
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
_, err = c.cli.CreateBucket(ctx, &s3.CreateBucketInput{
Bucket: aws.String(bucket), Bucket: aws.String(bucket),
ACL: types.BucketCannedACL(params["acl"]), ACL: types.BucketCannedACL(params["acl"]),
CreateBucketConfiguration: bucketConfiguration, CreateBucketConfiguration: bucketConfiguration,

View file

@ -1,6 +1,7 @@
package s3 package s3
import ( import (
"context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net/http" "net/http"
@ -10,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/sirupsen/logrus"
"go.k6.io/k6/js/modules" "go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics" "go.k6.io/k6/metrics"
) )
@ -114,5 +116,18 @@ func (s *S3) Connect(endpoint string, params map[string]string) (*Client, error)
return &Client{ return &Client{
vu: s.vu, vu: s.vu,
cli: cli, cli: cli,
to: timeout,
}, nil }, nil
} }
func CancelMonitor(ctx context.Context, writer logrus.FieldLogger) chan<- struct{} {
ch := make(chan struct{})
go func() {
select {
case <-ctx.Done():
writer.WithField("error", ctx.Err()).Print("VU context done")
case <-ch:
}
}()
return ch
}

View file

@ -55,7 +55,7 @@ func (*frostfs) DeleteContainer(context.Context, cid.ID, *session.Container) err
} }
func (f *frostfs) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) { func (f *frostfs) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) {
obj, err := f.Get(prm.Container, prm.Object) obj, err := f.Get(ctx, prm.Container, prm.Object)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -78,7 +78,7 @@ func (f *frostfs) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (
for _, attr := range prm.Attributes { for _, attr := range prm.Attributes {
hdrs[attr[0]] = attr[1] hdrs[attr[0]] = attr[1]
} }
return f.Put(prm.Container, &prm.Creator, hdrs, payload) return f.Put(ctx, prm.Container, &prm.Creator, hdrs, payload)
} }
func (f *frostfs) DeleteObject(context.Context, layer.PrmObjectDelete) error { func (f *frostfs) DeleteObject(context.Context, layer.PrmObjectDelete) error {

View file

@ -56,7 +56,7 @@ func (s *Local) Exports() modules.Exports {
return modules.Exports{Default: s} return modules.Exports{Default: s}
} }
func (s *Local) Connect(configFile string, params map[string]string, bucketMapping map[string]string) (*Client, error) { func (s *Local) Connect(configFile string, configDir string, params map[string]string, bucketMapping map[string]string) (*Client, error) {
// Parse configuration flags. // Parse configuration flags.
fs := flag.NewFlagSet("s3local", flag.ContinueOnError) fs := flag.NewFlagSet("s3local", flag.ContinueOnError)
@ -107,16 +107,16 @@ func (s *Local) Connect(configFile string, params map[string]string, bucketMappi
objGetDuration, _ = registry.NewMetric("s3local_obj_get_duration", metrics.Trend, metrics.Time) objGetDuration, _ = registry.NewMetric("s3local_obj_get_duration", metrics.Trend, metrics.Time)
// Create S3 layer backed by local storage engine and tree service. // Create S3 layer backed by local storage engine and tree service.
ng, err := s.l.ResolveEngine(s.l.VU().Context(), configFile, *debugLogger) ng, err := s.l.ResolveEngine(s.l.VU().Context(), configFile, configDir, *debugLogger)
if err != nil { if err != nil {
return nil, fmt.Errorf("connecting to engine for config %q: %v", configFile, err) return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err)
} }
treeSvc := tree.NewTree(treeServiceEngineWrapper{ treeSvc := tree.NewTree(treeServiceEngineWrapper{
ng: ng, ng: ng,
pos: *nodePosition, pos: *nodePosition,
size: *nodeCount, size: *nodeCount,
}) }, zap.L())
rc := rawclient.New(ng, rc := rawclient.New(ng,
rawclient.WithKey(key.PrivateKey), rawclient.WithKey(key.PrivateKey),

View file

@ -49,7 +49,7 @@ func (r nodeResponse) GetParentID() uint64 { return r.parentID }
func (r nodeResponse) GetTimestamp() uint64 { return r.ts } func (r nodeResponse) GetTimestamp() uint64 { return r.ts }
func (s treeServiceEngineWrapper) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) { func (s treeServiceEngineWrapper) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
nodeIDs, err := s.ng.TreeGetByPath(p.BktInfo.CID, p.TreeID, pilorama.AttributeFilename, p.Path, p.LatestOnly) nodeIDs, err := s.ng.TreeGetByPath(ctx, p.BktInfo.CID, p.TreeID, pilorama.AttributeFilename, p.Path, p.LatestOnly)
if err != nil { if err != nil {
if errors.Is(err, pilorama.ErrTreeNotFound) { if errors.Is(err, pilorama.ErrTreeNotFound) {
// This is needed in order for the tree implementation to create the tree/node // This is needed in order for the tree implementation to create the tree/node
@ -62,7 +62,7 @@ func (s treeServiceEngineWrapper) GetNodes(ctx context.Context, p *tree.GetNodes
resps := make([]tree.NodeResponse, 0, len(nodeIDs)) resps := make([]tree.NodeResponse, 0, len(nodeIDs))
for _, nodeID := range nodeIDs { for _, nodeID := range nodeIDs {
m, parentID, err := s.ng.TreeGetMeta(p.BktInfo.CID, p.TreeID, nodeID) m, parentID, err := s.ng.TreeGetMeta(ctx, p.BktInfo.CID, p.TreeID, nodeID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -94,7 +94,7 @@ func (s treeServiceEngineWrapper) GetSubTree(ctx context.Context, bktInfo *data.
var traverse func(nodeID uint64, curDepth uint32) error var traverse func(nodeID uint64, curDepth uint32) error
traverse = func(nodeID uint64, curDepth uint32) error { traverse = func(nodeID uint64, curDepth uint32) error {
m, parentID, err := s.ng.TreeGetMeta(bktInfo.CID, treeID, nodeID) m, parentID, err := s.ng.TreeGetMeta(ctx, bktInfo.CID, treeID, nodeID)
if err != nil { if err != nil {
return fmt.Errorf("getting meta: %v", err) return fmt.Errorf("getting meta: %v", err)
} }
@ -110,7 +110,7 @@ func (s treeServiceEngineWrapper) GetSubTree(ctx context.Context, bktInfo *data.
return nil return nil
} }
children, err := s.ng.TreeGetChildren(bktInfo.CID, treeID, nodeID) children, err := s.ng.TreeGetChildren(ctx, bktInfo.CID, treeID, nodeID)
if err != nil { if err != nil {
return fmt.Errorf("getting children: %v", err) return fmt.Errorf("getting children: %v", err)
} }
@ -135,7 +135,7 @@ func (s treeServiceEngineWrapper) AddNode(ctx context.Context, bktInfo *data.Buc
Position: s.pos, Position: s.pos,
Size: s.size, Size: s.size,
} }
mv, err := s.ng.TreeMove(desc, treeID, &pilorama.Move{ mv, err := s.ng.TreeMove(ctx, desc, treeID, &pilorama.Move{
Parent: parentID, Parent: parentID,
Child: pilorama.RootID, Child: pilorama.RootID,
Meta: pilorama.Meta{Items: mapToKV(meta)}, Meta: pilorama.Meta{Items: mapToKV(meta)},
@ -149,7 +149,7 @@ func (s treeServiceEngineWrapper) AddNodeByPath(ctx context.Context, bktInfo *da
Position: s.pos, Position: s.pos,
Size: s.size, Size: s.size,
} }
mvs, err := s.ng.TreeAddByPath(desc, treeID, pilorama.AttributeFilename, path, mapToKV(meta)) mvs, err := s.ng.TreeAddByPath(ctx, desc, treeID, pilorama.AttributeFilename, path, mapToKV(meta))
if err != nil { if err != nil {
return pilorama.TrashID, err return pilorama.TrashID, err
} }
@ -165,7 +165,7 @@ func (s treeServiceEngineWrapper) MoveNode(ctx context.Context, bktInfo *data.Bu
Position: s.pos, Position: s.pos,
Size: s.size, Size: s.size,
} }
_, err := s.ng.TreeMove(desc, treeID, &pilorama.Move{ _, err := s.ng.TreeMove(ctx, desc, treeID, &pilorama.Move{
Parent: parentID, Parent: parentID,
Child: nodeID, Child: nodeID,
Meta: pilorama.Meta{ Meta: pilorama.Meta{
@ -184,7 +184,7 @@ func (s treeServiceEngineWrapper) RemoveNode(ctx context.Context, bktInfo *data.
Position: s.pos, Position: s.pos,
Size: s.size, Size: s.size,
} }
_, err := s.ng.TreeMove(desc, treeID, &pilorama.Move{ _, err := s.ng.TreeMove(ctx, desc, treeID, &pilorama.Move{
Parent: pilorama.TrashID, Parent: pilorama.TrashID,
Child: nodeID, Child: nodeID,
}) })

View file

@ -9,18 +9,22 @@ import (
func Report(vu modules.VU, metric *metrics.Metric, value float64) { func Report(vu modules.VU, metric *metrics.Metric, value float64) {
metrics.PushIfNotDone(vu.Context(), vu.State().Samples, metrics.Sample{ metrics.PushIfNotDone(vu.Context(), vu.State().Samples, metrics.Sample{
Metric: metric, TimeSeries: metrics.TimeSeries{
Time: time.Now(), Metric: metric,
Value: value, },
Time: time.Now(),
Value: value,
}) })
} }
func ReportDataReceived(vu modules.VU, value float64) { func ReportDataReceived(vu modules.VU, value float64) {
vu.State().BuiltinMetrics.DataReceived.Sink.Add( vu.State().BuiltinMetrics.DataReceived.Sink.Add(
metrics.Sample{ metrics.Sample{
Metric: &metrics.Metric{}, TimeSeries: metrics.TimeSeries{
Value: value, Metric: &metrics.Metric{},
Time: time.Now()}, },
Value: value,
Time: time.Now()},
) )
} }
@ -28,8 +32,10 @@ func ReportDataSent(vu modules.VU, value float64) {
state := vu.State() state := vu.State()
state.BuiltinMetrics.DataSent.Sink.Add( state.BuiltinMetrics.DataSent.Sink.Add(
metrics.Sample{ metrics.Sample{
Metric: &metrics.Metric{}, TimeSeries: metrics.TimeSeries{
Value: value, Metric: &metrics.Metric{},
Time: time.Now()}, },
Value: value,
Time: time.Now()},
) )
} }

View file

@ -0,0 +1,6 @@
package version
var (
// Version is the xk6 command-line utils version.
Version = "dev"
)

View file

@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random gRPC endpoint for current VU // Select random gRPC endpoint for current VU
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(','); const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)]; const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60); const grpc_client = native.connect(grpc_endpoint, '',
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
const log = logging.new().withField("endpoint", grpc_endpoint); const log = logging.new().withField("endpoint", grpc_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE; const registry_enabled = !!__ENV.REGISTRY_FILE;
@ -46,12 +49,27 @@ if (registry_enabled && delete_age) {
); );
} }
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_read",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
age: read_age,
}
)
}
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
const scenarios = {}; const scenarios = {};
const write_vu_count = parseInt(__ENV.WRITERS || '0'); const write_vu_count = parseInt(__ENV.WRITERS || '0');
const write_grpc_chunk_size = 1024 * parseInt(__ENV.GRPC_CHUNK_SIZE || '0')
if (write_vu_count > 0) { if (write_vu_count > 0) {
scenarios.write = { scenarios.write = {
executor: 'constant-vus', executor: 'constant-vus',
@ -103,12 +121,17 @@ export function setup() {
console.log(`Writing VUs: ${write_vu_count}`); console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Deleting VUs: ${delete_vu_count}`); console.log(`Deleting VUs: ${delete_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`); console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
} }
export function teardown(data) { export function teardown(data) {
if (obj_registry) { if (obj_registry) {
obj_registry.close(); obj_registry.close();
} }
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
} }
export function handleSummary(data) { export function handleSummary(data) {
@ -129,7 +152,7 @@ export function obj_write() {
const container = container_list[Math.floor(Math.random() * container_list.length)]; const container = container_list[Math.floor(Math.random() * container_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled); const { payload, hash } = generator.genPayload(registry_enabled);
const resp = grpc_client.put(container, headers, payload); const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
if (!resp.success) { if (!resp.success) {
log.withField("cid", container).error(resp.error); log.withField("cid", container).error(resp.error);
return; return;
@ -145,6 +168,18 @@ export function obj_read() {
sleep(__ENV.SLEEP_READ); sleep(__ENV.SLEEP_READ);
} }
if(obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj) {
return;
}
const resp = grpc_client.get(obj.c_id, obj.o_id)
if (!resp.success) {
log.withFields({cid: obj.c_id, oid: obj.o_id}).error(resp.error);
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)]; const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = grpc_client.get(obj.container, obj.object) const resp = grpc_client.get(obj.container, obj.object)
if (!resp.success) { if (!resp.success) {

View file

@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random gRPC endpoint for current VU // Select random gRPC endpoint for current VU
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(','); const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)]; const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60); const grpc_client = native.connect(grpc_endpoint, '',
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
const log = logging.new().withField("endpoint", grpc_endpoint); const log = logging.new().withField("endpoint", grpc_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE; const registry_enabled = !!__ENV.REGISTRY_FILE;
@ -46,6 +49,19 @@ if (registry_enabled && delete_age) {
); );
} }
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_read",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
age: read_age,
}
)
}
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
@ -55,6 +71,7 @@ const time_unit = __ENV.TIME_UNIT || '1s';
const pre_alloc_write_vus = parseInt(__ENV.PRE_ALLOC_WRITERS || '0'); const pre_alloc_write_vus = parseInt(__ENV.PRE_ALLOC_WRITERS || '0');
const max_write_vus = parseInt(__ENV.MAX_WRITERS || pre_alloc_write_vus); const max_write_vus = parseInt(__ENV.MAX_WRITERS || pre_alloc_write_vus);
const write_rate = parseInt(__ENV.WRITE_RATE || '0'); const write_rate = parseInt(__ENV.WRITE_RATE || '0');
const write_grpc_chunk_size = 1024 * parseInt(__ENV.GRPC_CHUNK_SIZE || '0')
if (write_rate > 0) { if (write_rate > 0) {
scenarios.write = { scenarios.write = {
executor: 'constant-arrival-rate', executor: 'constant-arrival-rate',
@ -128,12 +145,17 @@ export function setup() {
console.log(`Read rate: ${read_rate}`); console.log(`Read rate: ${read_rate}`);
console.log(`Writing rate: ${write_rate}`); console.log(`Writing rate: ${write_rate}`);
console.log(`Delete rate: ${delete_rate}`); console.log(`Delete rate: ${delete_rate}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
} }
export function teardown(data) { export function teardown(data) {
if (obj_registry) { if (obj_registry) {
obj_registry.close(); obj_registry.close();
} }
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
} }
export function handleSummary(data) { export function handleSummary(data) {
@ -154,7 +176,7 @@ export function obj_write() {
const container = container_list[Math.floor(Math.random() * container_list.length)]; const container = container_list[Math.floor(Math.random() * container_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled); const { payload, hash } = generator.genPayload(registry_enabled);
const resp = grpc_client.put(container, headers, payload); const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
if (!resp.success) { if (!resp.success) {
log.withField("cid", container).error(resp.error); log.withField("cid", container).error(resp.error);
return; return;
@ -170,6 +192,18 @@ export function obj_read() {
sleep(__ENV.SLEEP_READ); sleep(__ENV.SLEEP_READ);
} }
if(obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj) {
return;
}
const resp = grpc_client.get(obj.c_id, obj.o_id)
if (!resp.success) {
log.withFields({cid: obj.c_id, oid: obj.o_id}).error(resp.error);
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)]; const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = grpc_client.get(obj.container, obj.object) const resp = grpc_client.get(obj.container, obj.object)
if (!resp.success) { if (!resp.success) {

View file

@ -31,7 +31,7 @@ const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : und
const duration = __ENV.DURATION; const duration = __ENV.DURATION;
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
const scenarios = {}; const scenarios = {};
@ -71,12 +71,17 @@ export function setup() {
console.log(`Reading VUs: ${read_vu_count}`); console.log(`Reading VUs: ${read_vu_count}`);
console.log(`Writing VUs: ${write_vu_count}`); console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`); console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
} }
export function teardown(data) { export function teardown(data) {
if (obj_registry) { if (obj_registry) {
obj_registry.close(); obj_registry.close();
} }
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
} }
export function handleSummary(data) { export function handleSummary(data) {

View file

@ -21,9 +21,10 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json"; const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
const config_file = __ENV.CONFIG_FILE; const config_file = __ENV.CONFIG_FILE;
const config_dir = __ENV.CONFIG_DIR;
const debug_logger = (__ENV.DEBUG_LOGGER || 'false') == 'true'; const debug_logger = (__ENV.DEBUG_LOGGER || 'false') == 'true';
const local_client = local.connect(config_file, '', debug_logger); const local_client = local.connect(config_file, config_dir, '', debug_logger);
const log = logging.new().withField("config", config_file); const log = logging.new().withFields({"config_file": config_file,"config_dir": config_dir});
const registry_enabled = !!__ENV.REGISTRY_FILE; const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined; const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
@ -100,12 +101,17 @@ export function setup() {
console.log(`Writing VUs: ${write_vu_count}`); console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Deleting VUs: ${delete_vu_count}`); console.log(`Deleting VUs: ${delete_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`); console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
} }
export function teardown(data) { export function teardown(data) {
if (obj_registry) { if (obj_registry) {
obj_registry.close(); obj_registry.close();
} }
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
} }
export function handleSummary(data) { export function handleSummary(data) {

View file

@ -1,50 +1,50 @@
import uuid import uuid
from helpers.cmd import execute_cmd from helpers.cmd import execute_cmd, log
def create_bucket(endpoint, versioning, location): def create_bucket(endpoint, versioning, location, no_verify_ssl):
bucket_create_marker = False
if location: if location:
location = f"--create-bucket-configuration 'LocationConstraint={location}'" location = f"--create-bucket-configuration 'LocationConstraint={location}'"
bucket_name = str(uuid.uuid4()) bucket_name = str(uuid.uuid4())
no_verify_ssl_str = "--no-verify-ssl" if no_verify_ssl else ""
cmd_line = f"aws {no_verify_ssl_str} s3api create-bucket --bucket {bucket_name} " \
f"--endpoint {endpoint} {location}"
cmd_line_ver = f"aws {no_verify_ssl_str} s3api put-bucket-versioning --bucket {bucket_name} " \
f"--versioning-configuration Status=Enabled --endpoint {endpoint} "
cmd_line = f"aws --no-verify-ssl s3api create-bucket --bucket {bucket_name} " \ output, success = execute_cmd(cmd_line)
f"--endpoint http://{endpoint} {location}"
cmd_line_ver = f"aws --no-verify-ssl s3api put-bucket-versioning --bucket {bucket_name} " \
f"--versioning-configuration Status=Enabled --endpoint http://{endpoint} "
out, success = execute_cmd(cmd_line) if not success and "succeeded and you already own it" not in output:
log(f"{cmd_line}\n"
f"Bucket {bucket_name} has not been created:\n"
f"Error: {output}", endpoint)
return False
if not success: if versioning == "True":
if "succeeded and you already own it" in out: output, success = execute_cmd(cmd_line_ver)
bucket_create_marker = True
else:
print(f" > Bucket {bucket_name} has not been created:\n{out}")
else:
bucket_create_marker = True
print(f"cmd: {cmd_line}")
if bucket_create_marker and versioning == "True":
out, success = execute_cmd(cmd_line_ver)
if not success: if not success:
print(f" > Bucket versioning has not been applied for bucket {bucket_name}:\n{out}") log(f"{cmd_line_ver}\n"
f"Bucket versioning has not been applied for bucket {bucket_name}\n"
f"Error: {output}", endpoint)
else: else:
print(f" > Bucket versioning has been applied.") log(f"Bucket versioning has been applied for bucket {bucket_name}", endpoint)
log(f"Created bucket: {bucket_name}", endpoint)
return bucket_name return bucket_name
def upload_object(bucket, payload_filepath, endpoint): def upload_object(bucket, payload_filepath, endpoint, no_verify_ssl):
object_name = str(uuid.uuid4()) object_name = str(uuid.uuid4())
no_verify_ssl_str = "--no-verify-ssl" if no_verify_ssl else ""
cmd_line = f"aws --no-verify-ssl s3api put-object --bucket {bucket} --key {object_name} " \ cmd_line = f"aws {no_verify_ssl_str} s3api put-object --bucket {bucket} --key {object_name} " \
f"--body {payload_filepath} --endpoint http://{endpoint}" f"--body {payload_filepath} --endpoint {endpoint}"
out, success = execute_cmd(cmd_line) output, success = execute_cmd(cmd_line)
if not success: if not success:
print(f" > Object {object_name} has not been uploaded.") log(f"{cmd_line}\n"
f"Object {object_name} has not been uploaded\n"
f"Error: {output}", endpoint)
return False return False
else:
return object_name return bucket, endpoint, object_name

View file

@ -1,9 +1,12 @@
import os import os
import shlex import shlex
import sys import sys
from datetime import datetime
from subprocess import check_output, CalledProcessError, STDOUT from subprocess import check_output, CalledProcessError, STDOUT
def log(message, endpoint):
time = datetime.utcnow()
print(f"{time} at {endpoint}: {message}")
def execute_cmd(cmd_line): def execute_cmd(cmd_line):
cmd_args = shlex.split(cmd_line) cmd_args = shlex.split(cmd_line)
@ -19,10 +22,9 @@ def execute_cmd(cmd_line):
return output, success return output, success
def random_payload(payload_filepath, size): def random_payload(file, size):
with open('%s' % payload_filepath, 'w+b') as fout: file.write(os.urandom(1024 * int(size)))
fout.write(os.urandom(1024 * int(size))) file.flush()
class ProgressBar: class ProgressBar:
@staticmethod @staticmethod

View file

@ -1,81 +1,105 @@
import re import re
from helpers.cmd import execute_cmd, log
from helpers.cmd import execute_cmd
def create_container(endpoint, policy, wallet_file, wallet_config): def create_container(endpoint, policy, wallet_file, wallet_config):
cmd_line = f"frostfs-cli --rpc-endpoint {endpoint} container create --wallet {wallet_file} --config {wallet_config} " \ if wallet_file:
wallet_file = "--wallet " + wallet_file
if wallet_config:
wallet_config = "--config " + wallet_config
cmd_line = f"frostfs-cli --rpc-endpoint {endpoint} container create {wallet_file} {wallet_config} " \
f" --policy '{policy}' --basic-acl public-read-write --await" f" --policy '{policy}' --basic-acl public-read-write --await"
output, success = execute_cmd(cmd_line) output, success = execute_cmd(cmd_line)
if not success: if not success:
print(f" > Container has not been created:\n{output}") log(f"{cmd_line}\n"
f"Container has not been created\n"
f"{output}", endpoint)
return False return False
else:
try:
fst_str = output.split('\n')[0]
except Exception:
print(f"Got empty output: {output}")
return False
splitted = fst_str.split(": ")
if len(splitted) != 2:
raise ValueError(f"no CID was parsed from command output: \t{fst_str}")
print(f"Created container: {splitted[1]}") try:
fst_str = output.split('\n')[0]
except Exception:
log(f"{cmd_line}\n"
f"Incorrect output\n"
f"Output: {output or '<empty>'}", endpoint)
return False
splitted = fst_str.split(": ")
if len(splitted) != 2:
raise ValueError(f"no CID was parsed from command output:\t{fst_str}")
return splitted[1] log(f"Created container {splitted[1]}", endpoint)
return splitted[1]
def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config): def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
object_name = "" object_name = ""
cmd_line = f"frostfs-cli --rpc-endpoint {endpoint} object put --file {payload_filepath} --wallet {wallet_file} --config {wallet_config} " \ if wallet_file:
wallet_file = "--wallet " + wallet_file
if wallet_config:
wallet_config = "--config " + wallet_config
cmd_line = f"frostfs-cli --rpc-endpoint {endpoint} object put --file {payload_filepath} {wallet_file} {wallet_config} " \
f"--cid {container} --no-progress" f"--cid {container} --no-progress"
output, success = execute_cmd(cmd_line) output, success = execute_cmd(cmd_line)
if not success: if not success:
print(f" > Object {object_name} has not been uploaded:\n{output}") log(f"{cmd_line}\n"
f"Object {object_name} has not been uploaded\n"
f"Error: {output}", endpoint)
return False return False
else:
try: try:
# taking second string from command output # taking second string from command output
snd_str = output.split('\n')[1] snd_str = output.split('\n')[1]
except Exception: except Exception:
print(f"Got empty input: {output}") log(f"{cmd_line}\n"
return False f"Incorrect output\n"
splitted = snd_str.split(": ") f"Output: {output or '<empty>'}", endpoint)
if len(splitted) != 2: return False
raise Exception(f"no OID was parsed from command output: \t{snd_str}") splitted = snd_str.split(": ")
return splitted[1] if len(splitted) != 2:
raise Exception(f"no OID was parsed from command output: \t{snd_str}")
return container, endpoint, splitted[1]
def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config): def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config):
cmd_line = f"frostfs-cli object get -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} " \ if wallet_file:
wallet_file = "--wallet " + wallet_file
if wallet_config:
wallet_config = "--config " + wallet_config
cmd_line = f"frostfs-cli object get -r {endpoint} --cid {cid} --oid {oid} {wallet_file} {wallet_config} " \
f"--file {out_filepath}" f"--file {out_filepath}"
output, success = execute_cmd(cmd_line) output, success = execute_cmd(cmd_line)
if not success: if not success:
print(f" > Failed to get object {output} from container {cid} \r\n" log(f"{cmd_line}\n"
f" > Error: {output}") f"Failed to get object {oid} from container {cid}\n"
f"Error: {output}", endpoint)
return False return False
return True return True
def search_object_by_id(cid, oid, endpoint, wallet_file, wallet_config, ttl=2): def search_object_by_id(cid, oid, endpoint, wallet_file, wallet_config, ttl=2):
cmd_line = f"frostfs-cli object search --ttl {ttl} -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} " if wallet_file:
wallet_file = "--wallet " + wallet_file
if wallet_config:
wallet_config = "--config " + wallet_config
cmd_line = f"frostfs-cli object search --ttl {ttl} -r {endpoint} --cid {cid} --oid {oid} {wallet_file} {wallet_config} "
output, success = execute_cmd(cmd_line) output, success = execute_cmd(cmd_line)
if not success: if not success:
print(f" > Failed to search object {oid} for container {cid} \r\n" log(f"{cmd_line}\n"
f" > Error: {output}") f"Failed to search object {oid} for container {cid}\n"
f"Error: {output}", endpoint)
return False return False
re_rst = re.search(r'Found (\d+) objects', output) re_rst = re.search(r'Found (\d+) objects', output)
if not re_rst: if not re_rst:
raise Exception("Failed to parce search results") raise Exception("Failed to parse search results")
return re_rst.group(1) return re_rst.group(1)

View file

@ -1,18 +1,19 @@
#!/usr/bin/python3 #!/usr/bin/python3
import argparse import argparse
from itertools import cycle
import json import json
import random
import sys import sys
import tempfile
import time
from argparse import Namespace from argparse import Namespace
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
from helpers.cmd import random_payload from helpers.cmd import random_payload
from helpers.frostfs_cli import create_container, upload_object from helpers.frostfs_cli import create_container, upload_object
ERROR_NO_CONTAINERS = 1 ERROR_WRONG_CONTAINERS_COUNT = 1
ERROR_NO_OBJECTS = 2 ERROR_WRONG_OBJECTS_COUNT = 2
MAX_WORKERS = 50 MAX_WORKERS = 50
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -27,78 +28,95 @@ parser.add_argument(
help="Container placement policy", help="Container placement policy",
default="REP 2 IN X CBF 2 SELECT 2 FROM * AS X" default="REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
) )
parser.add_argument('--endpoint', help='Node address') parser.add_argument('--endpoint', help='Nodes addresses separated by comma.')
parser.add_argument('--update', help='Save existed containers') parser.add_argument('--update', help='Save existed containers')
parser.add_argument('--ignore-errors', help='Ignore preset errors') parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50) parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
parser.add_argument('--sleep', help='Time to sleep between containers creation and objects upload (in seconds), '
'Default = 8', default=8)
args: Namespace = parser.parse_args() args: Namespace = parser.parse_args()
print(args) print(args)
def main(): def main():
container_list = [] containers = []
objects_list = [] objects_list = []
payload_filepath = '/tmp/data_file'
endpoints = args.endpoint.split(',') endpoints = args.endpoint.split(',')
wallet = args.wallet wallet = args.wallet
wallet_config = args.config wallet_config = args.config
workers = int(args.workers) workers = int(args.workers)
ignore_errors = True if args.ignore_errors else False objects_per_container = int(args.preload_obj)
ignore_errors = args.ignore_errors
if args.update: if args.update:
# Open file # Open file
with open(args.out) as f: with open(args.out) as f:
data_json = json.load(f) data_json = json.load(f)
container_list = data_json['containers'] containers = data_json['containers']
containers_count = len(containers)
else: else:
print(f"Create containers: {args.containers}") containers_count = int(args.containers)
print(f"Create containers: {containers_count}")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor: with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))], containers_runs = [executor.submit(create_container, endpoint, args.policy, wallet, wallet_config)
args.policy, wallet, wallet_config): _ for _ in range(int(args.containers))} for _, endpoint in
zip(range(containers_count), cycle(endpoints))]
for run in containers_runs: for run in containers_runs:
if run.result(): container_id = run.result()
container_list.append(run.result()) if container_id:
containers.append(container_id)
print("Create containers: Completed") print("Create containers: Completed")
print(f" > Containers: {container_list}") print(f" > Containers: {containers}")
if not container_list: if containers_count == 0 or len(containers) != containers_count:
print("No containers to work with") print(f"Containers mismatch in preset: expected {containers_count}, created {len(containers)}")
if not ignore_errors: if not ignore_errors:
sys.exit(ERROR_NO_CONTAINERS) sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
if args.sleep != 0:
print(f"Sleep for {args.sleep} seconds")
time.sleep(args.sleep)
print(f"Upload objects to each container: {args.preload_obj} ") print(f"Upload objects to each container: {args.preload_obj} ")
random_payload(payload_filepath, args.size) payload_file = tempfile.NamedTemporaryFile()
random_payload(payload_file, args.size)
print(" > Create random payload: Completed") print(" > Create random payload: Completed")
for container in container_list: total_objects = objects_per_container * containers_count
print(f" > Upload objects for container {container}") with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor: objects_runs = [executor.submit(upload_object, container, payload_file.name,
objects_runs = {executor.submit(upload_object, container, payload_filepath, endpoint, wallet, wallet_config)
endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(int(args.preload_obj))} for _, container, endpoint in
zip(range(total_objects), cycle(containers), cycle(endpoints))]
for run in objects_runs: for run in objects_runs:
if run.result(): result = run.result()
objects_list.append({'container': container, 'object': run.result()}) if result:
print(f" > Upload objects for container {container}: Completed") container_id = result[0]
endpoint = result[1]
object_id = result[2]
objects_list.append({'container': container_id, 'object': object_id})
print(f" > Uploaded object {object_id} for container {container_id} via endpoint {endpoint}.")
print("Upload objects to each container: Completed") print("Upload objects to each container: Completed")
if int(args.preload_obj) > 0 and not objects_list: if total_objects > 0 and len(objects_list) != total_objects:
print("No objects were uploaded") print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
if not ignore_errors: if not ignore_errors:
sys.exit(ERROR_NO_OBJECTS) sys.exit(ERROR_WRONG_OBJECTS_COUNT)
data = {'containers': container_list, 'objects': objects_list, 'obj_size': args.size + " Kb"} data = {'containers': containers, 'objects': objects_list, 'obj_size': args.size + " Kb"}
with open(args.out, 'w+') as f: with open(args.out, 'w+') as f:
json.dump(data, f, ensure_ascii=False, indent=2) json.dump(data, f, ensure_ascii=False, indent=2)
print("Result:") print("Result:")
print(f" > Total Containers has been created: {len(container_list)}.") print(f" > Total Containers has been created: {len(containers)}.")
print(f" > Total Objects has been created: {len(objects_list)}.") print(f" > Total Objects has been created: {len(objects_list)}.")

View file

@ -1,8 +1,11 @@
#!/usr/bin/python3 #!/usr/bin/python3
import argparse import argparse
from itertools import cycle
import json import json
import sys import sys
import tempfile
import time
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
from helpers.cmd import random_payload from helpers.cmd import random_payload
@ -14,82 +17,101 @@ parser.add_argument('--size', help='Upload objects size in kb.')
parser.add_argument('--buckets', help='Number of buckets to create.') parser.add_argument('--buckets', help='Number of buckets to create.')
parser.add_argument('--out', help='JSON file with output.') parser.add_argument('--out', help='JSON file with output.')
parser.add_argument('--preload_obj', help='Number of pre-loaded objects.') parser.add_argument('--preload_obj', help='Number of pre-loaded objects.')
parser.add_argument('--endpoint', help='S3 Gateway address.') parser.add_argument('--endpoint', help='S3 Gateways addresses separated by comma.')
parser.add_argument('--update', help='True/False, False by default. Save existed buckets from target file (--out). ' parser.add_argument('--update', help='True/False, False by default. Save existed buckets from target file (--out). '
'New buckets will not be created.') 'New buckets will not be created.')
parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="") parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
parser.add_argument('--versioning', help='True/False, False by default.') parser.add_argument('--versioning', help='True/False, False by default.')
parser.add_argument('--ignore-errors', help='Ignore preset errors') parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
parser.add_argument('--no-verify-ssl', help='Ignore SSL verifications', action='store_true')
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50) parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
parser.add_argument('--sleep', help='Time to sleep between buckets creation and objects upload (in seconds), '
'Default = 8', default=8)
args = parser.parse_args() args = parser.parse_args()
print(args) print(args)
ERROR_NO_BUCKETS = 1 ERROR_WRONG_CONTAINERS_COUNT = 1
ERROR_NO_OBJECTS = 2 ERROR_WRONG_OBJECTS_COUNT = 2
MAX_WORKERS = 50 MAX_WORKERS = 50
def main(): def main():
bucket_list = [] buckets = []
objects_list = [] objects_list = []
payload_filepath = '/tmp/data_file' ignore_errors = args.ignore_errors
ignore_errors = True if args.ignore_errors else False no_verify_ssl = args.no_verify_ssl
endpoints = args.endpoint.split(',')
workers = int(args.workers) workers = int(args.workers)
objects_per_bucket = int(args.preload_obj)
if args.update: if args.update:
# Open file # Open file
with open(args.out) as f: with open(args.out) as f:
data_json = json.load(f) data_json = json.load(f)
bucket_list = data_json['buckets'] buckets = data_json['buckets']
buckets_count = len(buckets)
# Get CID list # Get CID list
else: else:
print(f"Create buckets: {args.buckets}") buckets_count = int(args.buckets)
print(f"Create buckets: {buckets_count}")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor: with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
buckets_runs = {executor.submit(create_bucket, args.endpoint, args.versioning, buckets_runs = [executor.submit(create_bucket, endpoint, args.versioning, args.location, no_verify_ssl)
args.location): _ for _ in range(int(args.buckets))} for _, endpoint in
zip(range(buckets_count), cycle(endpoints))]
for run in buckets_runs: for run in buckets_runs:
if run.result() is not None: bucket_name = run.result()
bucket_list.append(run.result()) if bucket_name:
buckets.append(bucket_name)
print("Create buckets: Completed") print("Create buckets: Completed")
print(f" > Buckets: {bucket_list}") print(f" > Buckets: {buckets}")
if not bucket_list: if buckets_count == 0 or len(buckets) != buckets_count:
print("No buckets to work with") print(f"Buckets mismatch in preset: expected {buckets_count}, created {len(buckets)}")
if not ignore_errors: if not ignore_errors:
sys.exit(ERROR_NO_BUCKETS) sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
print(f"Upload objects to each bucket: {args.preload_obj} ") if args.sleep != 0:
random_payload(payload_filepath, args.size) print(f"Sleep for {args.sleep} seconds")
time.sleep(args.sleep)
print(f"Upload objects to each bucket: {objects_per_bucket} ")
payload_file = tempfile.NamedTemporaryFile()
random_payload(payload_file, args.size)
print(" > Create random payload: Completed") print(" > Create random payload: Completed")
for bucket in bucket_list: total_objects = objects_per_bucket * buckets_count
print(f" > Upload objects for bucket {bucket}")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
objects_runs = {executor.submit(upload_object, bucket, payload_filepath,
args.endpoint): _ for _ in range(int(args.preload_obj))}
for run in objects_runs: with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
if run.result() is not None: objects_runs = [executor.submit(upload_object, bucket, payload_file.name, endpoint, no_verify_ssl)
objects_list.append({'bucket': bucket, 'object': run.result()}) for _, bucket, endpoint in
print(f" > Upload objects for bucket {bucket}: Completed") zip(range(total_objects), cycle(buckets), cycle(endpoints))]
print("Upload objects to each bucket: Completed") for run in objects_runs:
result = run.result()
if result:
bucket = result[0]
endpoint = result[1]
object_id = result[2]
objects_list.append({'bucket': bucket, 'object': object_id})
print(f" > Uploaded object {object_id} for bucket {bucket} via endpoint {endpoint}.")
if int(args.preload_obj) > 0 and not objects_list: if total_objects > 0 and len(objects_list) != total_objects:
print("No objects were uploaded") print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
if not ignore_errors: if not ignore_errors:
sys.exit(ERROR_NO_OBJECTS) sys.exit(ERROR_WRONG_OBJECTS_COUNT)
data = {'buckets': bucket_list, 'objects': objects_list, 'obj_size': args.size + " Kb"} data = {'buckets': buckets, 'objects': objects_list, 'obj_size': args.size + " Kb"}
with open(args.out, 'w+') as f: with open(args.out, 'w+') as f:
json.dump(data, f, ensure_ascii=False, indent=2) json.dump(data, f, ensure_ascii=False, indent=2)
print("Result:") print("Result:")
print(f" > Total Buckets has been created: {len(bucket_list)}.") print(f" > Total Buckets has been created: {len(buckets)}.")
print(f" > Total Objects has been created: {len(objects_list)}.") print(f" > Total Objects has been created: {len(objects_list)}.")

View file

@ -18,6 +18,13 @@ Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following opt
* `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations. * `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations.
* `SLEEP_READ` - time interval (in seconds) between reading VU iterations. * `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000). * `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`:
```shell
$ ./k6 run --out profile (...)
```
The profiles are saved in the current directory as `cpu.prof` and `mem.prof`, respectively.
## Common options for the local scenarios: ## Common options for the local scenarios:
@ -128,6 +135,31 @@ Options (in addition to the common options):
* `SLEEP_DELETE` - time interval (in seconds) between deleting VU iterations. * `SLEEP_DELETE` - time interval (in seconds) between deleting VU iterations.
* `OBJ_NAME` - if specified, this name will be used for all write operations instead of random generation. * `OBJ_NAME` - if specified, this name will be used for all write operations instead of random generation.
## S3 Multipart
Perform multipart upload operation, break up large objects, so they can be transferred in multiple parts, in parallel
```shell
$ ./k6 run -e DURATION=600 \
-e WRITERS=400 -e WRITERS_MULTIPART=10 \
-e WRITE_OBJ_SIZE=524288 -e WRITE_OBJ_PART_SIZE=10240 \
-e S3_ENDPOINTS=10.78.70.142:8084,10.78.70.143:8084,10.78.70.144:8084,10.78.70.145:8084 \
-e PREGEN_JSON=/home/service/s3_4kb.json \
scenarios/s3_multipart.js
```
Options:
* `DURATION` - duration of scenario in seconds.
* `REGISTRY_FILE` - if set, all produced objects will be stored in database for subsequent verification. Database file name will be set to the value of `REGISTRY_FILE`.
* `PREGEN_JSON` - path to json file with pre-generated containers.
* `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations.
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
* `S3_ENDPOINTS` - - endpoints of S3 gateways in format `host:port`. To specify multiple endpoints separate them by comma.
* `WRITERS` - number of VUs performing upload payload operation
* `WRITERS_MULTIPART` - number of goroutines that will upload parts in parallel
* `WRITE_OBJ_SIZE` - object size in kb for write(PUT) operations.
* `WRITE_OBJ_PART_SIZE` - part size in kb for multipart upload operations (must be greater or equal 5mb).
## S3 Local ## S3 Local
1. Follow steps 1. and 2. from the normal S3 scenario in order to obtain credentials and a preset file with the information about the buckets and objects that were pre-created. 1. Follow steps 1. and 2. from the normal S3 scenario in order to obtain credentials and a preset file with the information about the buckets and objects that were pre-created.

View file

@ -21,10 +21,12 @@ const bucket_list = new SharedArray('bucket_list', function () {
const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size; const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json"; const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
const connection_args = {no_verify_ssl: no_verify_ssl}
// Select random S3 endpoint for current VU // Select random S3 endpoint for current VU
const s3_endpoints = __ENV.S3_ENDPOINTS.split(','); const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)]; const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
const s3_client = s3.connect(`http://${s3_endpoint}`); const s3_client = s3.connect(s3_endpoint, connection_args);
const log = logging.new().withField("endpoint", s3_endpoint); const log = logging.new().withField("endpoint", s3_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE; const registry_enabled = !!__ENV.REGISTRY_FILE;
@ -46,7 +48,21 @@ if (registry_enabled && delete_age) {
); );
} }
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_read",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
age: read_age,
}
)
}
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
const scenarios = {}; const scenarios = {};
@ -102,12 +118,17 @@ export function setup() {
console.log(`Writing VUs: ${write_vu_count}`); console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Deleting VUs: ${delete_vu_count}`); console.log(`Deleting VUs: ${delete_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`); console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
} }
export function teardown(data) { export function teardown(data) {
if (obj_registry) { if (obj_registry) {
obj_registry.close(); obj_registry.close();
} }
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
} }
export function handleSummary(data) { export function handleSummary(data) {
@ -142,6 +163,18 @@ export function obj_read() {
sleep(__ENV.SLEEP_READ); sleep(__ENV.SLEEP_READ);
} }
if(obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj) {
return;
}
const resp = s3_client.get(obj.s3_bucket, obj.s3_key)
if (!resp.success) {
log.withFields({bucket: obj.s3_bucket, key: obj.s3_key}).error(resp.error);
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)]; const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = s3_client.get(obj.bucket, obj.object); const resp = s3_client.get(obj.bucket, obj.object);

View file

@ -24,7 +24,9 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random S3 endpoint for current VU // Select random S3 endpoint for current VU
const s3_endpoints = __ENV.S3_ENDPOINTS.split(','); const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)]; const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
const s3_client = s3.connect(`http://${s3_endpoint}`); const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
const connection_args = {no_verify_ssl: no_verify_ssl}
const s3_client = s3.connect(s3_endpoint, connection_args);
const log = logging.new().withField("endpoint", s3_endpoint); const log = logging.new().withField("endpoint", s3_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE; const registry_enabled = !!__ENV.REGISTRY_FILE;
@ -46,6 +48,20 @@ if (registry_enabled && delete_age) {
); );
} }
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_read",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
age: read_age,
}
)
}
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
const scenarios = {}; const scenarios = {};
@ -129,12 +145,17 @@ export function setup() {
console.log(`Read rate: ${read_rate}`); console.log(`Read rate: ${read_rate}`);
console.log(`Writing rate: ${write_rate}`); console.log(`Writing rate: ${write_rate}`);
console.log(`Delete rate: ${delete_rate}`); console.log(`Delete rate: ${delete_rate}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
} }
export function teardown(data) { export function teardown(data) {
if (obj_registry) { if (obj_registry) {
obj_registry.close(); obj_registry.close();
} }
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
} }
export function handleSummary(data) { export function handleSummary(data) {
@ -169,6 +190,18 @@ export function obj_read() {
sleep(__ENV.SLEEP_READ); sleep(__ENV.SLEEP_READ);
} }
if(obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj) {
return;
}
const resp = s3_client.get(obj.s3_bucket, obj.s3_key)
if (!resp.success) {
log.withFields({bucket: obj.s3_bucket, key: obj.s3_key}).error(resp.error);
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)]; const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = s3_client.get(obj.bucket, obj.object); const resp = s3_client.get(obj.bucket, obj.object);

106
scenarios/s3_multipart.js Normal file
View file

@ -0,0 +1,106 @@
import datagen from 'k6/x/frostfs/datagen';
import logging from 'k6/x/frostfs/logging';
import registry from 'k6/x/frostfs/registry';
import s3 from 'k6/x/frostfs/s3';
import {SharedArray} from 'k6/data';
import {sleep} from 'k6';
import {textSummary} from './libs/k6-summary-0.0.2.js';
import {parseEnv} from './libs/env-parser.js';
import {uuidv4} from './libs/k6-utils-1.4.0.js';
parseEnv();
const bucket_list = new SharedArray('bucket_list', function () {
return JSON.parse(open(__ENV.PREGEN_JSON)).buckets;
});
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random S3 endpoint for current VU
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
const connection_args = {no_verify_ssl: no_verify_ssl}
const s3_client = s3.connect(s3_endpoint, connection_args);
const log = logging.new().withField("endpoint", s3_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
const duration = __ENV.DURATION;
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
const scenarios = {};
const write_vu_count = parseInt(__ENV.WRITERS || '0');
if (write_vu_count < 1) {
throw 'number of VUs (env WRITERS) performing write operations should be greater than 0';
}
const write_multipart_vu_count = parseInt(__ENV.WRITERS_MULTIPART || '0');
if (write_multipart_vu_count < 1) {
throw 'number of parts (env WRITERS_MULTIPART) to upload in parallel should be greater than 0';
}
if (write_vu_count > 0) {
scenarios.write_multipart = {
executor: 'constant-vus',
vus: write_vu_count,
duration: `${duration}s`,
exec: 'obj_write_multipart',
gracefulStop: '5s',
};
}
export const options = {
scenarios,
setupTimeout: '5s',
};
export function setup() {
const total_vu_count = write_vu_count * write_multipart_vu_count;
console.log(`Pregenerated buckets: ${bucket_list.length}`);
console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Writing multipart VUs: ${write_multipart_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`);
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
}
export function handleSummary(data) {
return {
'stdout': textSummary(data, {indent: ' ', enableColors: false}),
[summary_json]: JSON.stringify(data),
};
}
const write_multipart_part_size = 1024 * parseInt(__ENV.WRITE_OBJ_PART_SIZE || '0')
if (write_multipart_part_size < 5 * 1024 * 1024) {
throw 'part size (env WRITE_OBJ_PART_SIZE * 1024) must be greater than (5 MB)';
}
export function obj_write_multipart() {
if (__ENV.SLEEP_WRITE) {
sleep(__ENV.SLEEP_WRITE);
}
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
const {payload, hash} = generator.genPayload(registry_enabled);
const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload);
if (!resp.success) {
log.withFields({bucket: bucket, key: key}).error(resp.error);
return;
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
}
}

View file

@ -36,14 +36,27 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json"; const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
const config_file = __ENV.CONFIG_FILE; const config_file = __ENV.CONFIG_FILE;
const s3_client = s3local.connect(config_file, { const config_dir = __ENV.CONFIG_DIR;
const s3_client = s3local.connect(config_file, config_dir, {
'debug_logger': __ENV.DEBUG_LOGGER || 'false', 'debug_logger': __ENV.DEBUG_LOGGER || 'false',
}, bucket_mapping()); }, bucket_mapping());
const log = logging.new().withField("config", config_file); const log = logging.new().withFields({"config_file": config_file,"config_dir": config_dir});
const registry_enabled = !!__ENV.REGISTRY_FILE; const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined; const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_read",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
}
)
}
const duration = __ENV.DURATION; const duration = __ENV.DURATION;
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE)); const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
@ -86,12 +99,17 @@ export function setup() {
console.log(`Reading VUs: ${read_vu_count}`); console.log(`Reading VUs: ${read_vu_count}`);
console.log(`Writing VUs: ${write_vu_count}`); console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`); console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
} }
export function teardown(data) { export function teardown(data) {
if (obj_registry) { if (obj_registry) {
obj_registry.close(); obj_registry.close();
} }
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
} }
export function handleSummary(data) { export function handleSummary(data) {
@ -118,6 +136,18 @@ export function obj_write() {
} }
export function obj_read() { export function obj_read() {
if(obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj) {
return;
}
const resp = s3_client.get(obj.s3_bucket, obj.s3_key)
if (!resp.success) {
log.withFields({bucket: obj.s3_bucket, key: obj.s3_key}).error(resp.error);
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)]; const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = s3_client.get(obj.bucket, obj.object); const resp = s3_client.get(obj.bucket, obj.object);

View file

@ -1,6 +1,7 @@
import native from 'k6/x/frostfs/native'; import native from 'k6/x/frostfs/native';
import registry from 'k6/x/frostfs/registry'; import registry from 'k6/x/frostfs/registry';
import s3 from 'k6/x/frostfs/s3'; import s3 from 'k6/x/frostfs/s3';
import logging from 'k6/x/frostfs/logging';
import { sleep } from 'k6'; import { sleep } from 'k6';
import { Counter } from 'k6/metrics'; import { Counter } from 'k6/metrics';
import { textSummary } from './libs/k6-summary-0.0.2.js'; import { textSummary } from './libs/k6-summary-0.0.2.js';
@ -25,20 +26,29 @@ const obj_counters = {
invalid: new Counter('invalid_obj'), invalid: new Counter('invalid_obj'),
}; };
let log = logging.new();
// Connect to random gRPC endpoint // Connect to random gRPC endpoint
let grpc_client = undefined; let grpc_client = undefined;
if (__ENV.GRPC_ENDPOINTS) { if (__ENV.GRPC_ENDPOINTS) {
const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(','); const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)]; const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
grpc_client = native.connect(grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0); log = log.withField("endpoint", grpcEndpoint);
grpc_client = native.connect(grpcEndpoint, '',
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false, '');
} }
// Connect to random S3 endpoint // Connect to random S3 endpoint
let s3_client = undefined; let s3_client = undefined;
if (__ENV.S3_ENDPOINTS) { if (__ENV.S3_ENDPOINTS) {
const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
const connection_args = {no_verify_ssl: no_verify_ssl}
const s3_endpoints = __ENV.S3_ENDPOINTS.split(','); const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)]; const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
s3_client = s3.connect(`http://${s3_endpoint}`); log = log.withField("endpoint", s3_endpoint);
s3_client = s3.connect(s3_endpoint, connection_args);
} }
// We will attempt to verify every object in "created" status. The scenario will execute // We will attempt to verify every object in "created" status. The scenario will execute
@ -93,30 +103,45 @@ export function handleSummary(data) {
} }
export function obj_verify() { export function obj_verify() {
if (obj_to_verify_count == 0) {
log.info("Nothing to verify");
return;
}
if (__ENV.SLEEP) { if (__ENV.SLEEP) {
sleep(__ENV.SLEEP); sleep(__ENV.SLEEP);
} }
const obj = obj_to_verify_selector.nextObject(); const obj = obj_to_verify_selector.nextObject();
if (!obj) { if (!obj) {
console.log("All objects have been verified"); log.info("All objects have been verified");
return; return;
} }
const obj_status = verify_object_with_retries(obj, 3); const obj_status = verify_object_with_retries(obj, 3);
obj_counters[obj_status].add(1); obj_counters[obj_status].add(1);
obj_registry.setObjectStatus(obj.id, obj_status); obj_registry.setObjectStatus(obj.id, obj.status, obj_status);
} }
function verify_object_with_retries(obj, attempts) { function verify_object_with_retries(obj, attempts) {
for (let i = 0; i < attempts; i++) { for (let i = 0; i < attempts; i++) {
let result; let result;
// Different name is required.
// ReferenceError: Cannot access a variable before initialization.
let lg = log;
if (obj.c_id && obj.o_id) { if (obj.c_id && obj.o_id) {
lg = lg.withFields({cid: obj.c_id, oid: obj.o_id});
result = grpc_client.verifyHash(obj.c_id, obj.o_id, obj.payload_hash); result = grpc_client.verifyHash(obj.c_id, obj.o_id, obj.payload_hash);
} else if (obj.s3_bucket && obj.s3_key) { } else if (obj.s3_bucket && obj.s3_key) {
lg = lg.withFields({bucket: obj.s3_bucket, key: obj.s3_key});
result = s3_client.verifyHash(obj.s3_bucket, obj.s3_key, obj.payload_hash); result = s3_client.verifyHash(obj.s3_bucket, obj.s3_key, obj.payload_hash);
} else { } else {
console.log(`Object id=${obj.id} cannot be verified with supported protocols`); lg.withFields({
cid: obj.c_id,
oid: obj.o_id,
bucket: obj.s3_bucket,
key: obj.s3_key
}).warn(`Object cannot be verified with supported protocols`);
return "skipped"; return "skipped";
} }
@ -127,7 +152,7 @@ function verify_object_with_retries(obj, attempts) {
} }
// Unless we explicitly saw that there was a hash mismatch, then we will retry after a delay // Unless we explicitly saw that there was a hash mismatch, then we will retry after a delay
console.log(`Verify error on ${obj.id}: ${result.error}. Object will be re-tried`); lg.error(`Verify error: ${result.error}. Object will be re-tried`);
sleep(__ENV.SLEEP); sleep(__ENV.SLEEP);
} }