Compare commits

...

38 commits

Author SHA1 Message Date
9c866ca6e1 Monitor VU cancel
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-31 14:46:28 +03:00
96e0e0188e Set background timeout
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2023-10-31 14:35:50 +03:00
48a95bc50b remove http from s3 multipart upload load scenario, protocol would be set in endpoint parameter
Signed-off-by: m.malygina <m.malygina@yadro.com>
2023-10-27 13:22:38 +03:00
26f5262b3d [#90] Support config folder together with config file
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-10-25 16:48:29 +03:00
95ce6f1162 [#96] .forgejo: Copy tests workflow from node
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-19 11:45:10 +03:00
27db0ac943 [#96] .forgejo: Fix DCO action
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-19 11:43:52 +03:00
e970e52eea [#96] .forgejo: Move workflows folder from .github
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-10-19 11:43:52 +03:00
1311051f60 [#99] Adding read age param to improve k6 runs stability
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-02 20:08:43 +03:00
7db7751334 [#95] Allow to use wallet from config file for frostfs-cli
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-08-23 15:01:53 +03:00
bf884936a7 [#91] Improve logging for preset
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-08-16 08:25:03 +00:00
108e761639 [#93] go.mod: Update go.k6.io/k6 package to patched version
* The update fixes bug with k6 build

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2023-08-11 18:52:14 +03:00
5b1793f248 [#30] report: log start and end time of load scenario
Signed-off-by: Airat Arifullin a.arifullin@yadro.com
2023-07-26 14:54:59 +00:00
4ef3795e04 [#84] preset: fix typo
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-07-21 06:55:40 +00:00
704c0f06bc [#25] selector: Remove next object timeout
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-20 15:01:57 +03:00
0dc0ba1704 [#25] xk6: Read objects from registry for S3 tests
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-20 10:01:41 +03:00
3c26e7c917 [#25] xk6: Read objects from registry for gRPC tests
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-20 10:01:34 +03:00
50e2f55362 [#80] Add dump registry util
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-07-19 15:57:39 +03:00
77d3dd8d6e [#80] Support parallel multipart
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-07-19 10:44:44 +03:00
6182d47b43 [#81] remove schema from preset_s3 and k6 load s3 scenarios 2023-07-14 11:36:10 +00:00
ff6814e15d [#72] Add option --prepare-locally
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-07-07 13:16:54 +03:00
56235f5e90 [#72] Update dependencies
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2023-07-06 12:14:52 +03:00
f633f9a64a [#79] client: Remove bufSize field
Use constant value instead.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-06 11:27:33 +03:00
42f1881580 [#79] object put: Add chunk size parameter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-06 11:27:33 +03:00
4972bb928e [#79] xk6: Update node and SDK-Go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-05 15:37:06 +03:00
a1f5738d2f [#77] Use writecache in local scenarios
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-06-30 12:50:42 +00:00
8e99d08aa4 [#12] Allow using multiple endpoints for presets
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-06-28 20:21:43 +03:00
ba04c682cb [#13] Allow to use english text in the payload
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-06-27 11:14:05 +00:00
3525d5b4e3 [#15] go.mod: Tidy
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-06-27 11:14:05 +00:00
62d7b78131 [#73] preset: Allow to sleep before putting objects
For large networks block propagation may take some time.
If we do not wait enough, putting objects can fail for some containers.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-06-25 13:14:15 +03:00
153390cedb [#65] go.mod: Move to go1.19
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-24 11:41:02 +03:00
1025e80f11 [#65] go.mod: Update dependencies
See https://github.com/grafana/k6/pull/3075/, it is not yet in any
release.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-24 11:41:02 +03:00
6151005b4d [#67] Fail k6 if preset fails
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-05-24 08:39:37 +00:00
925fe3ec83 [#66] scenarios: Exit if there is nothing to verify
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-23 16:02:12 +03:00
4aa9a359b5 [#64] registry: Delete object from the old bucket
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-23 16:02:03 +03:00
52ed0d6d88 [#63] scenarios: Unify logs in verify script
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-23 16:02:03 +03:00
4c2678077b [#57] preset: Use temporary file for payload
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-22 09:06:10 +00:00
5a1191a1ab [#20] Add pprof extension with support for cpu and mem
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-05-18 15:08:33 +00:00
5c26b4bad4 [#61] scenarios: Fix setObjectStatus parameters
We were providing new status instead of old and expected it to exist in
DB.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2023-05-18 09:43:41 +03:00
57 changed files with 1627 additions and 1612 deletions

View file

Before

Width:  |  Height:  |  Size: 5.5 KiB

After

Width:  |  Height:  |  Size: 5.5 KiB

View file

@ -0,0 +1,21 @@
name: DCO action
on: [pull_request]
jobs:
dco:
name: DCO
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Setup Go
uses: actions/setup-go@v3
with:
go-version: '1.21'
- name: Run commit format checker
uses: https://git.frostfs.info/TrueCloudLab/dco-go@v2
with:
from: 'origin/${{ github.event.pull_request.base.ref }}'

View file

@ -0,0 +1,38 @@
name: Tests and linters
on: [pull_request]
jobs:
tests:
name: Tests
runs-on: ubuntu-latest
strategy:
matrix:
go_versions: [ '1.20', '1.21' ]
fail-fast: false
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '${{ matrix.go_versions }}'
cache: true
- name: Run tests
run: make test
tests-race:
name: Tests with -race
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: '1.21'
cache: true
- name: Run tests
run: go test ./... -count=1 -race

View file

@ -1,21 +0,0 @@
name: DCO check
on:
pull_request:
branches:
- master
jobs:
commits_check_job:
runs-on: ubuntu-latest
name: Commits Check
steps:
- name: Get PR Commits
id: 'get-pr-commits'
uses: tim-actions/get-pr-commits@master
with:
token: ${{ secrets.GITHUB_TOKEN }}
- name: DCO Check
uses: tim-actions/dco@master
with:
commits: ${{ steps.get-pr-commits.outputs.commits }}

View file

@ -1,34 +0,0 @@
name: Tests
on:
pull_request:
branches:
- master
types: [opened, synchronize]
paths-ignore:
- '**/*.md'
workflow_dispatch:
jobs:
lint:
name: Lint
runs-on: ubuntu-20.04
steps:
- name: Check out code
uses: actions/checkout@v2
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
version: latest
args: --timeout=2m
tests:
name: Tests
runs-on: ubuntu-20.04
strategy:
matrix:
go_versions: [ '1.17', '1.18', '1.19' ]
fail-fast: false
steps:
- uses: actions/checkout@v3

1
.gitignore vendored
View file

@ -1,3 +1,4 @@
k6
*.bolt
presets
bin

94
Makefile Normal file
View file

@ -0,0 +1,94 @@
#!/usr/bin/make -f
# Common variables
REPO ?= $(shell go list -m)
VERSION ?= $(shell git describe --tags --dirty --match "v*" --always --abbrev=8 2>/dev/null || cat VERSION 2>/dev/null || echo "develop")
GO_VERSION ?= 1.19
LINT_VERSION ?= 1.49.0
BINDIR = bin
# Binaries to build
CMDS = $(addprefix frostfs-, $(notdir $(wildcard cmd/*)))
BINS = $(addprefix $(BINDIR)/, $(CMDS))
.PHONY: all $(BINS) $(BINDIR) dep docker/ test cover format lint docker/lint pre-commit unpre-commit version clean
# Make all binaries
all: $(BINS)
$(BINS): $(BINDIR) dep
@echo "⇒ Build $@"
CGO_ENABLED=0 \
go build -v -trimpath \
-ldflags "-X $(REPO)/internal/version.Version=$(VERSION)" \
-o $@ ./cmd/$(subst frostfs-,,$(notdir $@))
$(BINDIR):
@echo "⇒ Ensure dir: $@"
@mkdir -p $@
# Pull go dependencies
dep:
@printf "⇒ Download requirements: "
@CGO_ENABLED=0 \
go mod download && echo OK
@printf "⇒ Tidy requirements: "
@CGO_ENABLED=0 \
go mod tidy -v && echo OK
# Run `make %` in Golang container, for more information run `make help.docker/%`
docker/%:
$(if $(filter $*,all $(BINS)), \
@echo "=> Running 'make $*' in clean Docker environment" && \
docker run --rm -t \
-v `pwd`:/src \
-w /src \
-u `stat -c "%u:%g" .` \
--env HOME=/src \
golang:$(GO_VERSION) make $*,\
@echo "supported docker targets: all $(BINS) lint")
# Run tests
test:
@go test ./... -cover
# Run tests with race detection and produce coverage output
cover:
@go test -v -race ./... -coverprofile=coverage.txt -covermode=atomic
@go tool cover -html=coverage.txt -o coverage.html
# Reformat code
format:
@echo "⇒ Processing gofmt check"
@gofmt -s -w ./
# Run linters
lint:
@golangci-lint --timeout=5m run
# Run linters in Docker
docker/lint:
docker run --rm -it \
-v `pwd`:/src \
-u `stat -c "%u:%g" .` \
--env HOME=/src \
golangci/golangci-lint:v$(LINT_VERSION) bash -c 'cd /src/ && make lint'
# Activate pre-commit hooks
pre-commit:
pre-commit install -t pre-commit -t commit-msg
# Deactivate pre-commit hooks
unpre-commit:
pre-commit uninstall -t pre-commit -t commit-msg
# Show current version
version:
@echo $(VERSION)
# Clean up files
clean:
rm -rf .cache
rm -rf $(BINDIR)
include help.mk

View file

@ -47,10 +47,11 @@ Create native client with `connect` method. Arguments:
- hex encoded private key (empty value produces random key)
- dial timeout in seconds (0 for the default value)
- stream timeout in seconds (0 for the default value)
- generate object header on the client side (for big object - split locally too)
```js
import native from 'k6/x/frostfs/native';
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
```
### Methods
@ -73,12 +74,13 @@ const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
Create a local client with `connect` method. Arguments:
- local path to frostfs storage node configuration file
- local path to frostfs storage node configuration directory
- hex encoded private key (empty value produces random key)
- whether to use the debug logger (warning: very verbose)
```js
import local from 'k6/x/frostfs/local';
const local_client = local.connect("/path/to/config.yaml", "", false)
const local_client = local.connect("/path/to/config.yaml", "/path/to/config/dir", "", false)
```
### Methods
@ -98,13 +100,13 @@ Credentials are taken from default AWS configuration files and ENVs.
```js
import s3 from 'k6/x/frostfs/s3';
const s3_cli = s3.connect("http://s3.frostfs.devenv:8080")
const s3_cli = s3.connect("https://s3.frostfs.devenv:8080")
```
You can also provide additional options:
```js
import s3 from 'k6/x/frostfs/s3';
const s3_cli = s3.connect("http://s3.frostfs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'})
const s3_cli = s3.connect("https://s3.frostfs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'})
```
* `no_verify_ss` - Bool. If `true` - skip verifying the s3 certificate chain and host name (useful if s3 uses self-signed certificates)
@ -122,6 +124,7 @@ const s3_cli = s3.connect("http://s3.frostfs.devenv:8080", {'no_verify_ssl': 'tr
Create local s3 client with `connect` method. Arguments:
- local path to frostfs storage node configuration file
- local path to frostfs storage node configuration directory
- parameter map with the following options:
* `hex_key`: private key to use as a hexadecimal string. A random one is created if none is provided.
* `node_position`: position of this node in the node array if loading multiple nodes independently (default: 0).
@ -134,7 +137,7 @@ Create local s3 client with `connect` method. Arguments:
import local from 'k6/x/frostfs/local';
const params = {'node_position': 1, 'node_count': 3}
const bucketMapping = {'mytestbucket': 'GBQDDUM1hdodXmiRHV57EUkFWJzuntsG8BG15wFSwam6'}
const local_client = local.connect("/path/to/config.yaml", params, bucketMapping)
const local_client = local.connect("/path/to/config.yaml", "/path/to/config/dir", params, bucketMapping)
```
### Methods
@ -147,6 +150,41 @@ const local_client = local.connect("/path/to/config.yaml", params, bucketMapping
See native protocol and s3 test suite examples in [examples](./examples) dir.
# Command line utils
To build all command line utils just run:
```shell
$ make
```
All binaries will be in `bin` directory.
## Export registry db
You can export registry bolt db to json file, that can be used as pregen for scenarios (see [docs](./scenarios/run_scenarios.md)).
To do this use `frostfs-xk6-registry-exporter`, available flags can be seen in help:
```shell
$ ./bin/frostfs-xk6-registry-exporter -h
Registry exporter for xk6
Usage:
registry-exporter [flags]
Examples:
registry-exporter registry.bolt
registry-exporter --status created --out out.json registry.bolt
Flags:
--age int Object age
--format string Output format (default "json")
-h, --help help for registry-exporter
--out string Path to output file (default "dumped-registry.json")
--status string Object status (default "created")
-v, --version version for registry-exporter
```
# License
- [GNU General Public License v3.0](LICENSE)

View file

@ -0,0 +1,18 @@
package main
import (
"context"
"os"
"os/signal"
"syscall"
)
func main() {
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
if cmd, err := rootCmd.ExecuteContextC(ctx); err != nil {
cmd.PrintErrln("Error:", err.Error())
cmd.PrintErrf("Run '%v --help' for usage.\n", cmd.CommandPath())
os.Exit(1)
}
}

View file

@ -0,0 +1,89 @@
package main
import (
"fmt"
"runtime"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/version"
"github.com/spf13/cobra"
)
var rootCmd = &cobra.Command{
Use: "registry-exporter",
Version: version.Version,
Short: "Registry exporter",
Long: "Registry exporter for xk6",
Example: `registry-exporter registry.bolt
registry-exporter --status created --out out.json registry.bolt`,
SilenceErrors: true,
SilenceUsage: true,
RunE: rootCmdRun,
}
const (
outFlag = "out"
formatFlag = "format"
statusFlag = "status"
ageFlag = "age"
)
const (
defaultOutPath = "dumped-registry.json"
jsonFormat = "json"
createdStatus = "created"
)
func init() {
rootCmd.Flags().String(outFlag, defaultOutPath, "Path to output file")
rootCmd.Flags().String(formatFlag, jsonFormat, "Output format")
rootCmd.Flags().String(statusFlag, createdStatus, "Object status")
rootCmd.Flags().Int(ageFlag, 0, "Object age")
cobra.AddTemplateFunc("runtimeVersion", runtime.Version)
rootCmd.SetVersionTemplate(`FrostFS xk6 Registry Exporter
{{printf "Version: %s" .Version }}
GoVersion: {{ runtimeVersion }}
`)
}
func rootCmdRun(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return fmt.Errorf("expected exacly one non-flag argumet: path to the registry, got: %s", args)
}
format, err := cmd.Flags().GetString(formatFlag)
if err != nil {
return fmt.Errorf("get '%s' flag: %w", formatFlag, err)
}
if format != jsonFormat {
return fmt.Errorf("unknown format '%s', only '%s' is supported", format, jsonFormat)
}
out, err := cmd.Flags().GetString(outFlag)
if err != nil {
return fmt.Errorf("get '%s' flag: %w", outFlag, err)
}
status, err := cmd.Flags().GetString(statusFlag)
if err != nil {
return fmt.Errorf("get '%s' flag: %w", statusFlag, err)
}
age, err := cmd.Flags().GetInt(ageFlag)
if err != nil {
return fmt.Errorf("get '%s' flag: %w", ageFlag, err)
}
objRegistry := registry.NewObjRegistry(cmd.Context(), args[0])
objSelector := registry.NewObjSelector(objRegistry, 0, &registry.ObjFilter{
Status: status,
Age: age,
})
objExporter := registry.NewObjExporter(objSelector)
cmd.Println("Writing result file:", out)
return objExporter.ExportJSONPreGen(out)
}

View file

@ -2,7 +2,7 @@ import local from 'k6/x/frostfs/local';
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const payload = open('../go.sum', 'b');
const local_cli = local.connect("/path/to/config.yaml", "", false)
const local_cli = local.connect("/path/to/config.yaml", "/path/to/config/dir", "", false)
export const options = {
stages: [

View file

@ -3,7 +3,7 @@ import { fail } from "k6";
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const payload = open('../go.sum', 'b');
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0)
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false)
export const options = {
stages: [

View file

@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const payload = open('../go.sum', 'b');
const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
const frostfs_obj = frostfs_cli.onsite(container, payload)
export const options = {

View file

@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const bucket = "testbucket"
const payload = open('../go.sum', 'b');
const s3local_cli = s3local.connect("path/to/storage/config.yml", {}, {
const s3local_cli = s3local.connect("path/to/storage/config.yml", "path/to/storage/config/dir", {}, {
'testbucket': 'GBQDDUM1hdodXmiRHV57EUkFWJzuntsG8BG15wFSwam6',
});

View file

@ -6,6 +6,7 @@ import (
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/logging"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/native"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/profile"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/s3"
_ "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/s3local"

159
go.mod
View file

@ -1,107 +1,130 @@
module git.frostfs.info/TrueCloudLab/xk6-frostfs
go 1.17
go 1.19
require (
git.frostfs.info/TrueCloudLab/frostfs-node v0.22.2-0.20230313113918-4e244686cf03
git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.24.1-0.20230403110435-01afa1cae425
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230329125804-552219b8e130
git.frostfs.info/TrueCloudLab/frostfs-node v0.22.2-0.20230704155826-b520a3049e6f
git.frostfs.info/TrueCloudLab/frostfs-s3-gw v0.27.0-rc.2
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230705125206-769f6eec0565
git.frostfs.info/TrueCloudLab/tzhash v1.8.0
github.com/aws/aws-sdk-go-v2 v1.16.3
github.com/aws/aws-sdk-go-v2/config v1.15.5
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.9
github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf
github.com/aws/aws-sdk-go-v2 v1.19.0
github.com/aws/aws-sdk-go-v2/config v1.18.28
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.72
github.com/aws/aws-sdk-go-v2/service/s3 v1.37.0
github.com/dop251/goja v0.0.0-20230626124041-ba8a63e79201
github.com/go-loremipsum/loremipsum v1.1.3
github.com/google/uuid v1.3.0
github.com/joho/godotenv v1.5.1
github.com/nspcc-dev/neo-go v0.101.0
github.com/panjf2000/ants/v2 v2.5.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.1
go.etcd.io/bbolt v1.3.6
go.k6.io/k6 v0.38.2
github.com/nspcc-dev/neo-go v0.101.2
github.com/panjf2000/ants/v2 v2.8.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.4
go.etcd.io/bbolt v1.3.7
go.k6.io/k6 v0.45.1
go.uber.org/zap v1.24.0
golang.org/x/sys v0.10.0
)
require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.11.2-0.20230315095236-9dc375346703 // indirect
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230602142716-68021b910acb // indirect
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
git.frostfs.info/TrueCloudLab/hrw v1.2.0 // indirect
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 // indirect
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12 // indirect
github.com/aws/aws-sdk-go v1.44.6 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.4 // indirect
github.com/aws/smithy-go v1.11.2 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/aws/aws-sdk-go v1.44.296 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.30 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bluele/gcache v0.0.2 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dlclark/regexp2 v1.10.0 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/minio/sio v0.3.0 // indirect
github.com/minio/sio v0.3.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd // indirect
github.com/nats-io/jwt/v2 v2.4.1 // indirect
github.com/nats-io/nats.go v1.22.1 // indirect
github.com/nats-io/nats.go v1.27.1 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.20.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.13.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.15.0 // indirect
github.com/spf13/viper v1.16.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/exp v0.0.0-20221227203929-1b447090c38c // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/time v0.1.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
google.golang.org/grpc v1.52.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/guregu/null.v3 v3.3.0 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.opentelemetry.io/proto/otlp v0.20.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/grpc v1.56.1 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/guregu/null.v3 v3.5.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

1397
go.sum

File diff suppressed because it is too large Load diff

22
help.mk Normal file
View file

@ -0,0 +1,22 @@
.PHONY: help
# Show this help prompt
help:
@echo ' Usage:'
@echo ''
@echo ' make <target>'
@echo ''
@echo ' Targets:'
@echo ''
@awk '/^#/{ comment = substr($$0,3) } comment && /^[a-zA-Z][a-zA-Z0-9.%_/-]+ ?:/{ print " ", $$1, comment }' $(MAKEFILE_LIST) | column -t -s ':' | grep -v 'IGNORE' | sort | uniq
# Show help for docker/% IGNORE
help.docker/%:
$(eval TARGETS:=$(notdir all lint) ${BINS})
@echo ' Usage:'
@echo ''
@echo ' make docker/% -- Run `make %` in Golang container'
@echo ''
@echo ' Supported docker targets:'
@echo ''
@$(foreach bin, $(TARGETS), echo ' ' $(bin);)

View file

@ -1,6 +1,8 @@
package datagen
import (
"strings"
"go.k6.io/k6/js/modules"
)
@ -36,7 +38,7 @@ func (d *Datagen) Exports() modules.Exports {
return modules.Exports{Default: d}
}
func (d *Datagen) Generator(size int) *Generator {
g := NewGenerator(d.vu, size)
func (d *Datagen) Generator(size int, typ string) *Generator {
g := NewGenerator(d.vu, size, strings.ToLower(typ))
return &g
}

View file

@ -1,12 +1,14 @@
package datagen
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"math/rand"
"time"
"github.com/dop251/goja"
"github.com/go-loremipsum/loremipsum"
"go.k6.io/k6/js/modules"
)
@ -24,6 +26,7 @@ type (
size int
rand *rand.Rand
buf []byte
typ string
offset int
}
@ -36,19 +39,53 @@ type (
// TailSize specifies number of extra random bytes in the buffer tail.
const TailSize = 1024
func NewGenerator(vu modules.VU, size int) Generator {
var payloadTypes = []string{
"text",
"random",
"",
}
func NewGenerator(vu modules.VU, size int, typ string) Generator {
if size <= 0 {
panic("size should be positive")
}
var found bool
for i := range payloadTypes {
if payloadTypes[i] == typ {
found = true
break
}
}
if !found {
vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ)
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
buf := make([]byte, size+TailSize)
r.Read(buf)
return Generator{
g := Generator{
vu: vu,
size: size,
rand: r,
buf: buf,
typ: typ,
}
g.fillBuffer()
return g
}
func (g *Generator) fillBuffer() {
switch g.typ {
case "text":
li := loremipsum.New()
b := bytes.NewBuffer(g.buf[:0])
for b.Len() < g.size+TailSize {
b.WriteString(li.Paragraph())
b.WriteRune('\n')
}
g.buf = b.Bytes()
default:
rand.Read(g.buf) // Per docs, err is always nil here
}
}
@ -66,9 +103,9 @@ func (g *Generator) GenPayload(calcHash bool) GenPayloadResponse {
}
func (g *Generator) nextSlice() []byte {
if g.offset >= TailSize {
if g.offset+g.size >= len(g.buf) {
g.offset = 0
g.rand.Read(g.buf) // Per docs, err is always nil here
g.fillBuffer()
}
result := g.buf[g.offset : g.offset+g.size]

View file

@ -16,25 +16,25 @@ func TestGenerator(t *testing.T) {
t.Run("fails on negative size", func(t *testing.T) {
require.Panics(t, func() {
_ = NewGenerator(vu, -1)
_ = NewGenerator(vu, -1, "")
})
})
t.Run("fails on zero size", func(t *testing.T) {
require.Panics(t, func() {
_ = NewGenerator(vu, 0)
_ = NewGenerator(vu, 0, "")
})
})
t.Run("creates slice of specified size", func(t *testing.T) {
size := 10
g := NewGenerator(vu, size)
g := NewGenerator(vu, size, "")
slice := g.nextSlice()
require.Len(t, slice, size)
})
t.Run("creates a different slice on each call", func(t *testing.T) {
g := NewGenerator(vu, 1000)
g := NewGenerator(vu, 1000, "")
slice1 := g.nextSlice()
slice2 := g.nextSlice()
// Each slice should be unique (assuming that 1000 random bytes will never coincide
@ -43,7 +43,7 @@ func TestGenerator(t *testing.T) {
})
t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) {
g := NewGenerator(vu, 1000)
g := NewGenerator(vu, 1000, "")
initialSlice := g.nextSlice()
for i := 0; i < TailSize; i++ {
g.nextSlice()

View file

@ -7,9 +7,11 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local/rawclient"
"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
)
type Client struct {
vu modules.VU
rc *rawclient.RawClient
}
@ -30,7 +32,7 @@ type (
)
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
id, err := c.rc.Put(mustParseContainerID(containerID), nil, headers, payload.Bytes())
id, err := c.rc.Put(c.vu.Context(), mustParseContainerID(containerID), nil, headers, payload.Bytes())
if err != nil {
return PutResponse{Error: err.Error()}
}
@ -41,14 +43,14 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
}
func (c *Client) Get(containerID, objectID string) GetResponse {
if _, err := c.rc.Get(mustParseContainerID(containerID), mustParseObjectID(objectID)); err != nil {
if _, err := c.rc.Get(c.vu.Context(), mustParseContainerID(containerID), mustParseObjectID(objectID)); err != nil {
return GetResponse{Error: err.Error()}
}
return GetResponse{Success: true}
}
func (c *Client) Delete(containerID, objectID string) DeleteResponse {
if err := c.rc.Delete(mustParseContainerID(containerID), mustParseObjectID(objectID)); err != nil {
if err := c.rc.Delete(c.vu.Context(), mustParseContainerID(containerID), mustParseObjectID(objectID)); err != nil {
return DeleteResponse{Error: err.Error()}
}
return DeleteResponse{Success: true}

View file

@ -40,6 +40,8 @@ type RootModule struct {
mu sync.Mutex
// configFile is the name of the configuration file used during one test.
configFile string
// configDir is the name of the configuration directory used during one test.
configDir string
// ng is the engine instance used during one test, corresponding to the configFile. Each VU
// gets the same engine instance.
ng *engine.StorageEngine
@ -48,7 +50,7 @@ type RootModule struct {
// Local represents an instance of the module for every VU.
type Local struct {
vu modules.VU
ResolveEngine func(context.Context, string, bool) (*engine.StorageEngine, error)
ResolveEngine func(context.Context, string, string, bool) (*engine.StorageEngine, error)
}
// Ensure the interfaces are implemented correctly.
@ -71,7 +73,7 @@ func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
return NewLocalModuleInstance(vu, r.GetOrCreateEngine)
}
func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string, bool) (*engine.StorageEngine, error)) *Local {
func NewLocalModuleInstance(vu modules.VU, resolveEngine func(context.Context, string, string, bool) (*engine.StorageEngine, error)) *Local {
return &Local{
vu: vu,
ResolveEngine: resolveEngine,
@ -100,21 +102,22 @@ func checkResourceLimits() error {
return nil
}
// GetOrCreateEngine returns the current engine instance for the given configuration file,
// GetOrCreateEngine returns the current engine instance for the given configuration file or directory,
// creating a new one if none exists. Note that the identity of configuration files is their
// file name for the purposes of test runs.
func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, debug bool) (*engine.StorageEngine, error) {
func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, configDir string, debug bool) (*engine.StorageEngine, error) {
r.mu.Lock()
defer r.mu.Unlock()
if len(configFile) == 0 {
return nil, errors.New("configFile cannot be empty")
if len(configFile) == 0 && len(configDir) == 0 {
return nil, errors.New("provide configFile or configDir")
}
// Create and initialize engine for the given configFile if it doesn't exist already
if r.ng == nil {
r.configFile = configFile
appCfg := config.New(config.Prm{}, config.WithConfigFile(configFile))
r.configDir = configDir
appCfg := config.New(configFile, configDir, "")
ngOpts, shardOpts, err := storageEngineOptionsFromConfig(appCfg, debug)
if err != nil {
return nil, fmt.Errorf("creating engine options from config: %v", err)
@ -131,11 +134,15 @@ func (r *RootModule) GetOrCreateEngine(ctx context.Context, configFile string, d
if err := r.ng.Open(); err != nil {
return nil, fmt.Errorf("opening engine: %v", err)
}
if err := r.ng.Init(); err != nil {
if err := r.ng.Init(ctx); err != nil {
return nil, fmt.Errorf("initializing engine: %v", err)
}
} else if configFile != r.configFile {
return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was initialized: got %q, want %q", configFile, r.configFile)
return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configFile after engine was "+
"initialized: got %q, want %q", configFile, r.configFile)
} else if configDir != r.configDir {
return nil, fmt.Errorf("GetOrCreateEngine called with mismatching configDir after engine was "+
"initialized: got %q, want %q", configDir, r.configDir)
}
return r.ng, nil
@ -149,10 +156,10 @@ func (s *Local) Exports() modules.Exports {
func (s *Local) VU() modules.VU { return s.vu }
func (s *Local) Connect(configFile, hexKey string, debug bool) (*Client, error) {
ng, err := s.ResolveEngine(s.VU().Context(), configFile, debug)
func (s *Local) Connect(configFile, configDir, hexKey string, debug bool) (*Client, error) {
ng, err := s.ResolveEngine(s.VU().Context(), configFile, configDir, debug)
if err != nil {
return nil, fmt.Errorf("connecting to engine for config %q: %v", configFile, err)
return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err)
}
key, err := ParseOrCreateKey(hexKey)
@ -204,7 +211,7 @@ func (s *Local) Connect(configFile, hexKey string, debug bool) (*Client, error)
}
}),
)
return &Client{rc}, nil
return &Client{vu: s.vu, rc: rc}, nil
}
type epochState struct{}
@ -292,17 +299,20 @@ func storageEngineOptionsFromConfig(c *config.Config, debug bool) ([]engine.Opti
// write cache
if wc := sc.WriteCache(); wc.Enabled() {
opts = append(opts, shard.WithWriteCacheOptions(
writecache.WithPath(wc.Path()),
writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
writecache.WithMaxObjectSize(wc.MaxObjectSize()),
writecache.WithSmallObjectSize(wc.SmallObjectSize()),
writecache.WithFlushWorkersCount(wc.WorkersNumber()),
writecache.WithMaxCacheSize(wc.SizeLimit()),
writecache.WithNoSync(wc.NoSync()),
writecache.WithLogger(&logger.Logger{Logger: log}),
))
opts = append(opts,
shard.WithWriteCache(true),
shard.WithWriteCacheOptions(
writecache.WithPath(wc.Path()),
writecache.WithMaxBatchSize(wc.BoltDB().MaxBatchSize()),
writecache.WithMaxBatchDelay(wc.BoltDB().MaxBatchDelay()),
writecache.WithMaxObjectSize(wc.MaxObjectSize()),
writecache.WithSmallObjectSize(wc.SmallObjectSize()),
writecache.WithFlushWorkersCount(wc.WorkersNumber()),
writecache.WithMaxCacheSize(wc.SizeLimit()),
writecache.WithNoSync(wc.NoSync()),
writecache.WithLogger(&logger.Logger{Logger: log}),
),
)
}
// tree

View file

@ -3,6 +3,7 @@
package rawclient
import (
"context"
"fmt"
"time"
@ -31,7 +32,7 @@ func New(ng *engine.StorageEngine, opts ...Option) *RawClient {
return client
}
func (c *RawClient) Put(containerID cid.ID, ownerID *user.ID, headers map[string]string, payload []byte) (oid.ID, error) {
func (c *RawClient) Put(ctx context.Context, containerID cid.ID, ownerID *user.ID, headers map[string]string, payload []byte) (oid.ID, error) {
sz := len(payload)
attrs := make([]object.Attribute, len(headers))
@ -71,7 +72,7 @@ func (c *RawClient) Put(containerID cid.ID, ownerID *user.ID, headers map[string
req.WithObject(obj)
start := time.Now()
_, err = c.ng.Put(req)
err = c.ng.Put(ctx, req)
c.onPut(uint64(sz), err, time.Since(start))
if err != nil {
return oid.ID{}, err
@ -80,7 +81,7 @@ func (c *RawClient) Put(containerID cid.ID, ownerID *user.ID, headers map[string
return id, nil
}
func (c *RawClient) Get(containerID cid.ID, objectID oid.ID) (*object.Object, error) {
func (c *RawClient) Get(ctx context.Context, containerID cid.ID, objectID oid.ID) (*object.Object, error) {
var addr oid.Address
addr.SetContainer(containerID)
addr.SetObject(objectID)
@ -89,7 +90,7 @@ func (c *RawClient) Get(containerID cid.ID, objectID oid.ID) (*object.Object, er
req.WithAddress(addr)
start := time.Now()
res, err := c.ng.Get(req)
res, err := c.ng.Get(ctx, req)
var sz uint64
obj := res.Object()
@ -101,7 +102,7 @@ func (c *RawClient) Get(containerID cid.ID, objectID oid.ID) (*object.Object, er
return obj, nil
}
func (c *RawClient) Delete(containerID cid.ID, objectID oid.ID) error {
func (c *RawClient) Delete(ctx context.Context, containerID cid.ID, objectID oid.ID) error {
var addr oid.Address
addr.SetContainer(containerID)
addr.SetObject(objectID)
@ -110,7 +111,7 @@ func (c *RawClient) Delete(containerID cid.ID, objectID oid.ID) error {
req.WithAddress(addr)
start := time.Now()
_, err := c.ng.Delete(req)
_, err := c.ng.Delete(ctx, req)
c.onDelete(err, time.Since(start))
return err
}

View file

@ -31,11 +31,11 @@ import (
type (
Client struct {
vu modules.VU
key ecdsa.PrivateKey
tok session.Object
cli *client.Client
bufsize int
vu modules.VU
key ecdsa.PrivateKey
tok session.Object
cli *client.Client
prepareLocally bool
}
PutResponse struct {
@ -66,30 +66,18 @@ type (
}
PreparedObject struct {
vu modules.VU
key ecdsa.PrivateKey
cli *client.Client
bufsize int
hdr object.Object
payload []byte
vu modules.VU
key ecdsa.PrivateKey
cli *client.Client
hdr object.Object
payload []byte
prepareLocally bool
}
)
const defaultBufferSize = 64 * 1024
func (c *Client) SetBufferSize(size int) {
if size < 0 {
panic("buffer size must be positive")
}
if size == 0 {
c.bufsize = defaultBufferSize
} else {
c.bufsize = size
}
}
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer, chunkSize int) PutResponse {
cliContainerID := parseContainerID(containerID)
tok := c.tok
@ -116,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
o.SetOwnerID(&owner)
o.SetAttributes(attrs...)
resp, err := put(c.vu, c.bufsize, c.cli, &tok, &o, payload.Bytes())
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
if err != nil {
return PutResponse{Success: false, Error: err.Error()}
}
@ -176,7 +164,7 @@ func (c *Client) Get(containerID, objectID string) GetResponse {
prm.WithinSession(tok)
var objSize = 0
err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
err = get(c.cli, prm, c.vu.Context(), func(data []byte) {
objSize += len(data)
})
if err != nil {
@ -194,10 +182,9 @@ func get(
cli *client.Client,
prm client.PrmObjectGet,
ctx context.Context,
bufSize int,
onDataChunk func(chunk []byte),
) error {
var buf = make([]byte, bufSize)
var buf = make([]byte, defaultBufferSize)
objectReader, err := cli.ObjectGetInit(ctx, prm)
if err != nil {
@ -245,7 +232,7 @@ func (c *Client) VerifyHash(containerID, objectID, expectedHash string) VerifyHa
prm.WithinSession(tok)
hasher := sha256.New()
err = get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
err = get(c.cli, prm, c.vu.Context(), func(data []byte) {
hasher.Write(data)
})
if err != nil {
@ -381,13 +368,12 @@ func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedOb
}
return PreparedObject{
vu: c.vu,
key: c.key,
cli: c.cli,
bufsize: c.bufsize,
hdr: *obj,
payload: data,
vu: c.vu,
key: c.key,
cli: c.cli,
hdr: *obj,
payload: data,
prepareLocally: c.prepareLocally,
}
}
@ -413,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
return PutResponse{Success: false, Error: err.Error()}
}
_, err = put(p.vu, p.bufsize, p.cli, nil, &obj, p.payload)
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
if err != nil {
return PutResponse{Success: false, Error: err.Error()}
}
@ -421,8 +407,18 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
return PutResponse{Success: true, ObjectID: id.String()}
}
func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
hdr *object.Object, payload []byte) (*client.ResObjectPut, error) {
type epochSource uint64
func (s epochSource) CurrentEpoch() uint64 {
return uint64(s)
}
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) {
bufSize := defaultBufferSize
if chunkSize > 0 {
bufSize = chunkSize
}
buf := make([]byte, bufSize)
rdr := bytes.NewReader(payload)
sz := rdr.Size()
@ -434,6 +430,18 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
if tok != nil {
prm.WithinSession(*tok)
}
if chunkSize > 0 {
prm.SetGRPCPayloadChunkLen(chunkSize)
}
if prepareLocally {
res, err := cli.NetworkInfo(vu.Context(), client.PrmNetworkInfo{})
if err != nil {
return nil, err
}
prm.WithObjectMaxSize(res.Info().MaxObjectSize())
prm.WithEpochSource(epochSource(res.Info().CurrentEpoch()))
prm.WithoutHomomorphicHash(true)
}
objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)
if err != nil {
@ -441,21 +449,21 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
return nil, err
}
if !objectWriter.WriteHeader(*hdr) {
if !objectWriter.WriteHeader(vu.Context(), *hdr) {
stats.Report(vu, objPutFails, 1)
_, err = objectWriter.Close()
_, err = objectWriter.Close(vu.Context())
return nil, err
}
n, _ := rdr.Read(buf)
for n > 0 {
if !objectWriter.WritePayloadChunk(buf[:n]) {
if !objectWriter.WritePayloadChunk(vu.Context(), buf[:n]) {
break
}
n, _ = rdr.Read(buf)
}
resp, err := objectWriter.Close()
resp, err := objectWriter.Close(vu.Context())
if err != nil {
stats.Report(vu, objPutFails, 1)
return nil, err

View file

@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports {
return modules.Exports{Default: n}
}
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) {
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) {
var (
cli client.Client
pk *keys.PrivateKey
@ -82,7 +82,7 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
prmDial.SetStreamTimeout(time.Duration(streamTimeout) * time.Second)
}
err = cli.Dial(prmDial)
err = cli.Dial(n.vu.Context(), prmDial)
if err != nil {
return nil, fmt.Errorf("dial endpoint: %s %w", endpoint, err)
}
@ -133,10 +133,10 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
cnrPutDuration, _ = registry.NewMetric("frostfs_cnr_put_duration", metrics.Trend, metrics.Time)
return &Client{
vu: n.vu,
key: pk.PrivateKey,
tok: tok,
cli: &cli,
bufsize: defaultBufferSize,
vu: n.vu,
key: pk.PrivateKey,
tok: tok,
cli: &cli,
prepareLocally: prepareLocally,
}, nil
}

View file

@ -0,0 +1,67 @@
// Package profile provides an extension to generate profile data from k6 itself.
//
// An Output extension is used to leverage the Start and Stop hooks which are
// otherwise inaccessible in a regular module.
package profile
import (
"fmt"
"os"
"runtime"
"runtime/pprof"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
)
const (
cpuProfilePath = "cpu.prof"
memProfilePath = "mem.prof"
)
type profExt struct {
cpuFile *os.File
}
func New(output.Params) (output.Output, error) {
return &profExt{}, nil
}
func (*profExt) Description() string {
return "profile"
}
func (ext *profExt) Start() error {
var err error
ext.cpuFile, err = os.Create(cpuProfilePath)
if err != nil {
return fmt.Errorf("creating cpu profile file: %v", err)
}
if err := pprof.StartCPUProfile(ext.cpuFile); err != nil {
return fmt.Errorf("starting cpu profile: %v", err)
}
return nil
}
func (ext *profExt) Stop() error {
pprof.StopCPUProfile()
if err := ext.cpuFile.Close(); err != nil {
return fmt.Errorf("closing cpu profile file: %v", err)
}
f, err := os.Create(memProfilePath)
if err != nil {
return fmt.Errorf("creating mem profile file: %v", err)
}
defer f.Close()
runtime.GC()
if err := pprof.WriteHeapProfile(f); err != nil {
return fmt.Errorf("writing mem profile: %v", err)
}
return nil
}
func (*profExt) AddMetricSamples([]metrics.SampleContainer) {}
func init() {
output.RegisterExtension("profile", New)
}

View file

@ -0,0 +1,82 @@
package registry
import (
"fmt"
"os"
)
type ObjExporter struct {
selector *ObjSelector
}
type PreGenerateInfo struct {
Buckets []string `json:"buckets"`
Objects []ObjInfo `json:"objects"`
ObjSize string `json:"obj_size"`
}
type ObjInfo struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
}
func NewObjExporter(selector *ObjSelector) *ObjExporter {
return &ObjExporter{selector: selector}
}
func (o *ObjExporter) ExportJSONPreGen(fileName string) error {
f, err := os.Create(fileName)
if err != nil {
return err
}
defer f.Close()
// there can be a lot of object, so manually form json
if _, err = f.WriteString(`{"objects":[`); err != nil {
return err
}
bucketMap := make(map[string]struct{})
count, err := o.selector.Count()
if err != nil {
return err
}
var comma string
for i := 0; i < count; i++ {
info := o.selector.NextObject()
if info == nil {
break
}
if _, err = f.WriteString(fmt.Sprintf(`%s{"bucket":"%s","object":"%s"}`, comma, info.S3Bucket, info.S3Key)); err != nil {
return err
}
if i == 0 {
comma = ","
}
bucketMap[info.S3Bucket] = struct{}{}
}
if _, err = f.WriteString(`],"buckets":[`); err != nil {
return err
}
i := 0
comma = ""
for bucket := range bucketMap {
if _, err = f.WriteString(fmt.Sprintf(`%s"%s"`, comma, bucket)); err != nil {
return err
}
if i == 0 {
comma = ","
}
i++
}
_, err = f.WriteString(`]}`)
return err
}

View file

@ -81,10 +81,14 @@ func (o *ObjRegistry) SetObjectStatus(id uint64, oldStatus, newStatus string) er
return fmt.Errorf("bucket doesn't exist: '%s'", oldStatus)
}
objBytes := oldB.Get(encodeId(id))
key := encodeId(id)
objBytes := oldB.Get(key)
if objBytes == nil {
return errors.New("object doesn't exist")
}
if err := oldB.Delete(key); err != nil {
return fmt.Errorf("bucket.Delete: %w", err)
}
obj := new(ObjectInfo)
if err := obj.Unmarshal(objBytes); err != nil {

View file

@ -163,10 +163,11 @@ func (o *ObjSelector) selectLoop() {
if len(cache) != o.cacheSize {
// no more objects, wait a little; the logic could be improved.
select {
case <-time.After(time.Second * time.Duration(o.filter.Age/2)):
case <-time.After(time.Second):
case <-o.ctx.Done():
return
}
lastID = 0
}
// clean handled objects

View file

@ -94,6 +94,10 @@ func (r *Registry) GetSelector(dbFilePath string, name string, cacheSize int, fi
return selector
}
func (r *Registry) GetExporter(selector *ObjSelector) *ObjExporter {
return NewObjExporter(selector)
}
func parseFilter(filter map[string]string) (*ObjFilter, error) {
objFilter := ObjFilter{}
objFilter.Status = filter["status"]

View file

@ -5,11 +5,13 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/dop251/goja"
@ -21,6 +23,7 @@ type (
Client struct {
vu modules.VU
cli *s3.Client
to time.Duration
}
PutResponse struct {
@ -54,7 +57,14 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
sz := rdr.Size()
start := time.Now()
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
ch := CancelMonitor(c.vu.Context(), c.vu.State().Logger.WithField("method", "put"))
defer func() { close(ch) }()
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
_, err := c.cli.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: rdr,
@ -70,10 +80,49 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
return PutResponse{Success: true}
}
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload goja.ArrayBuffer) PutResponse {
if objPartSize < multipartUploadMinPartSize {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)}
}
start := time.Now()
uploader := manager.NewUploader(c.cli, func(u *manager.Uploader) {
u.PartSize = int64(objPartSize)
u.Concurrency = concurrency
})
payloadReader := bytes.NewReader(payload.Bytes())
sz := payloadReader.Len()
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
_, err := uploader.Upload(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: payloadReader,
})
if err != nil {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: err.Error()}
}
stats.Report(c.vu, objPutTotal, 1)
stats.ReportDataSent(c.vu, float64(sz))
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
return PutResponse{Success: true}
}
func (c *Client) Delete(bucket, key string) DeleteResponse {
start := time.Now()
_, err := c.cli.DeleteObject(c.vu.Context(), &s3.DeleteObjectInput{
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
_, err := c.cli.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
@ -90,8 +139,14 @@ func (c *Client) Delete(bucket, key string) DeleteResponse {
func (c *Client) Get(bucket, key string) GetResponse {
start := time.Now()
ch := CancelMonitor(c.vu.Context(), c.vu.State().Logger.WithField("method", "get"))
defer func() { close(ch) }()
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
var objSize = 0
err := get(c.cli, bucket, key, func(chunk []byte) {
err := get(ctx, c.cli, bucket, key, func(chunk []byte) {
objSize += len(chunk)
})
if err != nil {
@ -106,6 +161,7 @@ func (c *Client) Get(bucket, key string) GetResponse {
}
func get(
ctx context.Context,
c *s3.Client,
bucket string,
key string,
@ -113,7 +169,7 @@ func get(
) error {
var buf = make([]byte, 4*1024)
obj, err := c.GetObject(context.Background(), &s3.GetObjectInput{
obj, err := c.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
@ -135,7 +191,11 @@ func get(
func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse {
hasher := sha256.New()
err := get(c.cli, bucket, key, func(data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
err := get(ctx, c.cli, bucket, key, func(data []byte) {
hasher.Write(data)
})
if err != nil {
@ -167,7 +227,11 @@ func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBuc
}
start := time.Now()
_, err = c.cli.CreateBucket(c.vu.Context(), &s3.CreateBucketInput{
ctx, cancel := context.WithTimeout(context.Background(), c.to)
defer cancel()
_, err = c.cli.CreateBucket(ctx, &s3.CreateBucketInput{
Bucket: aws.String(bucket),
ACL: types.BucketCannedACL(params["acl"]),
CreateBucketConfiguration: bucketConfiguration,

View file

@ -1,6 +1,7 @@
package s3
import (
"context"
"crypto/tls"
"fmt"
"net/http"
@ -10,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/sirupsen/logrus"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
@ -114,5 +116,18 @@ func (s *S3) Connect(endpoint string, params map[string]string) (*Client, error)
return &Client{
vu: s.vu,
cli: cli,
to: timeout,
}, nil
}
func CancelMonitor(ctx context.Context, writer logrus.FieldLogger) chan<- struct{} {
ch := make(chan struct{})
go func() {
select {
case <-ctx.Done():
writer.WithField("error", ctx.Err()).Print("VU context done")
case <-ch:
}
}()
return ch
}

View file

@ -55,7 +55,7 @@ func (*frostfs) DeleteContainer(context.Context, cid.ID, *session.Container) err
}
func (f *frostfs) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) {
obj, err := f.Get(prm.Container, prm.Object)
obj, err := f.Get(ctx, prm.Container, prm.Object)
if err != nil {
return nil, err
}
@ -78,7 +78,7 @@ func (f *frostfs) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (
for _, attr := range prm.Attributes {
hdrs[attr[0]] = attr[1]
}
return f.Put(prm.Container, &prm.Creator, hdrs, payload)
return f.Put(ctx, prm.Container, &prm.Creator, hdrs, payload)
}
func (f *frostfs) DeleteObject(context.Context, layer.PrmObjectDelete) error {

View file

@ -56,7 +56,7 @@ func (s *Local) Exports() modules.Exports {
return modules.Exports{Default: s}
}
func (s *Local) Connect(configFile string, params map[string]string, bucketMapping map[string]string) (*Client, error) {
func (s *Local) Connect(configFile string, configDir string, params map[string]string, bucketMapping map[string]string) (*Client, error) {
// Parse configuration flags.
fs := flag.NewFlagSet("s3local", flag.ContinueOnError)
@ -107,16 +107,16 @@ func (s *Local) Connect(configFile string, params map[string]string, bucketMappi
objGetDuration, _ = registry.NewMetric("s3local_obj_get_duration", metrics.Trend, metrics.Time)
// Create S3 layer backed by local storage engine and tree service.
ng, err := s.l.ResolveEngine(s.l.VU().Context(), configFile, *debugLogger)
ng, err := s.l.ResolveEngine(s.l.VU().Context(), configFile, configDir, *debugLogger)
if err != nil {
return nil, fmt.Errorf("connecting to engine for config %q: %v", configFile, err)
return nil, fmt.Errorf("connecting to engine for config - file %q dir %q: %v", configFile, configDir, err)
}
treeSvc := tree.NewTree(treeServiceEngineWrapper{
ng: ng,
pos: *nodePosition,
size: *nodeCount,
})
}, zap.L())
rc := rawclient.New(ng,
rawclient.WithKey(key.PrivateKey),

View file

@ -49,7 +49,7 @@ func (r nodeResponse) GetParentID() uint64 { return r.parentID }
func (r nodeResponse) GetTimestamp() uint64 { return r.ts }
func (s treeServiceEngineWrapper) GetNodes(ctx context.Context, p *tree.GetNodesParams) ([]tree.NodeResponse, error) {
nodeIDs, err := s.ng.TreeGetByPath(p.BktInfo.CID, p.TreeID, pilorama.AttributeFilename, p.Path, p.LatestOnly)
nodeIDs, err := s.ng.TreeGetByPath(ctx, p.BktInfo.CID, p.TreeID, pilorama.AttributeFilename, p.Path, p.LatestOnly)
if err != nil {
if errors.Is(err, pilorama.ErrTreeNotFound) {
// This is needed in order for the tree implementation to create the tree/node
@ -62,7 +62,7 @@ func (s treeServiceEngineWrapper) GetNodes(ctx context.Context, p *tree.GetNodes
resps := make([]tree.NodeResponse, 0, len(nodeIDs))
for _, nodeID := range nodeIDs {
m, parentID, err := s.ng.TreeGetMeta(p.BktInfo.CID, p.TreeID, nodeID)
m, parentID, err := s.ng.TreeGetMeta(ctx, p.BktInfo.CID, p.TreeID, nodeID)
if err != nil {
return nil, err
}
@ -94,7 +94,7 @@ func (s treeServiceEngineWrapper) GetSubTree(ctx context.Context, bktInfo *data.
var traverse func(nodeID uint64, curDepth uint32) error
traverse = func(nodeID uint64, curDepth uint32) error {
m, parentID, err := s.ng.TreeGetMeta(bktInfo.CID, treeID, nodeID)
m, parentID, err := s.ng.TreeGetMeta(ctx, bktInfo.CID, treeID, nodeID)
if err != nil {
return fmt.Errorf("getting meta: %v", err)
}
@ -110,7 +110,7 @@ func (s treeServiceEngineWrapper) GetSubTree(ctx context.Context, bktInfo *data.
return nil
}
children, err := s.ng.TreeGetChildren(bktInfo.CID, treeID, nodeID)
children, err := s.ng.TreeGetChildren(ctx, bktInfo.CID, treeID, nodeID)
if err != nil {
return fmt.Errorf("getting children: %v", err)
}
@ -135,7 +135,7 @@ func (s treeServiceEngineWrapper) AddNode(ctx context.Context, bktInfo *data.Buc
Position: s.pos,
Size: s.size,
}
mv, err := s.ng.TreeMove(desc, treeID, &pilorama.Move{
mv, err := s.ng.TreeMove(ctx, desc, treeID, &pilorama.Move{
Parent: parentID,
Child: pilorama.RootID,
Meta: pilorama.Meta{Items: mapToKV(meta)},
@ -149,7 +149,7 @@ func (s treeServiceEngineWrapper) AddNodeByPath(ctx context.Context, bktInfo *da
Position: s.pos,
Size: s.size,
}
mvs, err := s.ng.TreeAddByPath(desc, treeID, pilorama.AttributeFilename, path, mapToKV(meta))
mvs, err := s.ng.TreeAddByPath(ctx, desc, treeID, pilorama.AttributeFilename, path, mapToKV(meta))
if err != nil {
return pilorama.TrashID, err
}
@ -165,7 +165,7 @@ func (s treeServiceEngineWrapper) MoveNode(ctx context.Context, bktInfo *data.Bu
Position: s.pos,
Size: s.size,
}
_, err := s.ng.TreeMove(desc, treeID, &pilorama.Move{
_, err := s.ng.TreeMove(ctx, desc, treeID, &pilorama.Move{
Parent: parentID,
Child: nodeID,
Meta: pilorama.Meta{
@ -184,7 +184,7 @@ func (s treeServiceEngineWrapper) RemoveNode(ctx context.Context, bktInfo *data.
Position: s.pos,
Size: s.size,
}
_, err := s.ng.TreeMove(desc, treeID, &pilorama.Move{
_, err := s.ng.TreeMove(ctx, desc, treeID, &pilorama.Move{
Parent: pilorama.TrashID,
Child: nodeID,
})

View file

@ -9,18 +9,22 @@ import (
func Report(vu modules.VU, metric *metrics.Metric, value float64) {
metrics.PushIfNotDone(vu.Context(), vu.State().Samples, metrics.Sample{
Metric: metric,
Time: time.Now(),
Value: value,
TimeSeries: metrics.TimeSeries{
Metric: metric,
},
Time: time.Now(),
Value: value,
})
}
func ReportDataReceived(vu modules.VU, value float64) {
vu.State().BuiltinMetrics.DataReceived.Sink.Add(
metrics.Sample{
Metric: &metrics.Metric{},
Value: value,
Time: time.Now()},
TimeSeries: metrics.TimeSeries{
Metric: &metrics.Metric{},
},
Value: value,
Time: time.Now()},
)
}
@ -28,8 +32,10 @@ func ReportDataSent(vu modules.VU, value float64) {
state := vu.State()
state.BuiltinMetrics.DataSent.Sink.Add(
metrics.Sample{
Metric: &metrics.Metric{},
Value: value,
Time: time.Now()},
TimeSeries: metrics.TimeSeries{
Metric: &metrics.Metric{},
},
Value: value,
Time: time.Now()},
)
}

View file

@ -0,0 +1,6 @@
package version
var (
// Version is the xk6 command-line utils version.
Version = "dev"
)

View file

@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random gRPC endpoint for current VU
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
const grpc_client = native.connect(grpc_endpoint, '',
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
const log = logging.new().withField("endpoint", grpc_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;
@ -46,12 +49,27 @@ if (registry_enabled && delete_age) {
);
}
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_read",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
age: read_age,
}
)
}
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
const scenarios = {};
const write_vu_count = parseInt(__ENV.WRITERS || '0');
const write_grpc_chunk_size = 1024 * parseInt(__ENV.GRPC_CHUNK_SIZE || '0')
if (write_vu_count > 0) {
scenarios.write = {
executor: 'constant-vus',
@ -103,12 +121,17 @@ export function setup() {
console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Deleting VUs: ${delete_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
}
export function handleSummary(data) {
@ -129,7 +152,7 @@ export function obj_write() {
const container = container_list[Math.floor(Math.random() * container_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const resp = grpc_client.put(container, headers, payload);
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
if (!resp.success) {
log.withField("cid", container).error(resp.error);
return;
@ -145,6 +168,18 @@ export function obj_read() {
sleep(__ENV.SLEEP_READ);
}
if(obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj) {
return;
}
const resp = grpc_client.get(obj.c_id, obj.o_id)
if (!resp.success) {
log.withFields({cid: obj.c_id, oid: obj.o_id}).error(resp.error);
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = grpc_client.get(obj.container, obj.object)
if (!resp.success) {

View file

@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random gRPC endpoint for current VU
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
const grpc_client = native.connect(grpc_endpoint, '',
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
const log = logging.new().withField("endpoint", grpc_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;
@ -46,6 +49,19 @@ if (registry_enabled && delete_age) {
);
}
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_read",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
age: read_age,
}
)
}
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
@ -55,6 +71,7 @@ const time_unit = __ENV.TIME_UNIT || '1s';
const pre_alloc_write_vus = parseInt(__ENV.PRE_ALLOC_WRITERS || '0');
const max_write_vus = parseInt(__ENV.MAX_WRITERS || pre_alloc_write_vus);
const write_rate = parseInt(__ENV.WRITE_RATE || '0');
const write_grpc_chunk_size = 1024 * parseInt(__ENV.GRPC_CHUNK_SIZE || '0')
if (write_rate > 0) {
scenarios.write = {
executor: 'constant-arrival-rate',
@ -128,12 +145,17 @@ export function setup() {
console.log(`Read rate: ${read_rate}`);
console.log(`Writing rate: ${write_rate}`);
console.log(`Delete rate: ${delete_rate}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
}
export function handleSummary(data) {
@ -154,7 +176,7 @@ export function obj_write() {
const container = container_list[Math.floor(Math.random() * container_list.length)];
const { payload, hash } = generator.genPayload(registry_enabled);
const resp = grpc_client.put(container, headers, payload);
const resp = grpc_client.put(container, headers, payload, write_grpc_chunk_size);
if (!resp.success) {
log.withField("cid", container).error(resp.error);
return;
@ -170,6 +192,18 @@ export function obj_read() {
sleep(__ENV.SLEEP_READ);
}
if(obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj) {
return;
}
const resp = grpc_client.get(obj.c_id, obj.o_id)
if (!resp.success) {
log.withFields({cid: obj.c_id, oid: obj.o_id}).error(resp.error);
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = grpc_client.get(obj.container, obj.object)
if (!resp.success) {

View file

@ -31,7 +31,7 @@ const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : und
const duration = __ENV.DURATION;
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
const scenarios = {};
@ -71,12 +71,17 @@ export function setup() {
console.log(`Reading VUs: ${read_vu_count}`);
console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
}
export function handleSummary(data) {

View file

@ -21,9 +21,10 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
const config_file = __ENV.CONFIG_FILE;
const config_dir = __ENV.CONFIG_DIR;
const debug_logger = (__ENV.DEBUG_LOGGER || 'false') == 'true';
const local_client = local.connect(config_file, '', debug_logger);
const log = logging.new().withField("config", config_file);
const local_client = local.connect(config_file, config_dir, '', debug_logger);
const log = logging.new().withFields({"config_file": config_file,"config_dir": config_dir});
const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
@ -100,12 +101,17 @@ export function setup() {
console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Deleting VUs: ${delete_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
}
export function handleSummary(data) {

View file

@ -1,50 +1,50 @@
import uuid
from helpers.cmd import execute_cmd
from helpers.cmd import execute_cmd, log
def create_bucket(endpoint, versioning, location):
bucket_create_marker = False
def create_bucket(endpoint, versioning, location, no_verify_ssl):
if location:
location = f"--create-bucket-configuration 'LocationConstraint={location}'"
bucket_name = str(uuid.uuid4())
no_verify_ssl_str = "--no-verify-ssl" if no_verify_ssl else ""
cmd_line = f"aws {no_verify_ssl_str} s3api create-bucket --bucket {bucket_name} " \
f"--endpoint {endpoint} {location}"
cmd_line_ver = f"aws {no_verify_ssl_str} s3api put-bucket-versioning --bucket {bucket_name} " \
f"--versioning-configuration Status=Enabled --endpoint {endpoint} "
cmd_line = f"aws --no-verify-ssl s3api create-bucket --bucket {bucket_name} " \
f"--endpoint http://{endpoint} {location}"
cmd_line_ver = f"aws --no-verify-ssl s3api put-bucket-versioning --bucket {bucket_name} " \
f"--versioning-configuration Status=Enabled --endpoint http://{endpoint} "
output, success = execute_cmd(cmd_line)
out, success = execute_cmd(cmd_line)
if not success and "succeeded and you already own it" not in output:
log(f"{cmd_line}\n"
f"Bucket {bucket_name} has not been created:\n"
f"Error: {output}", endpoint)
return False
if not success:
if "succeeded and you already own it" in out:
bucket_create_marker = True
else:
print(f" > Bucket {bucket_name} has not been created:\n{out}")
else:
bucket_create_marker = True
print(f"cmd: {cmd_line}")
if bucket_create_marker and versioning == "True":
out, success = execute_cmd(cmd_line_ver)
if versioning == "True":
output, success = execute_cmd(cmd_line_ver)
if not success:
print(f" > Bucket versioning has not been applied for bucket {bucket_name}:\n{out}")
log(f"{cmd_line_ver}\n"
f"Bucket versioning has not been applied for bucket {bucket_name}\n"
f"Error: {output}", endpoint)
else:
print(f" > Bucket versioning has been applied.")
log(f"Bucket versioning has been applied for bucket {bucket_name}", endpoint)
log(f"Created bucket: {bucket_name}", endpoint)
return bucket_name
def upload_object(bucket, payload_filepath, endpoint):
def upload_object(bucket, payload_filepath, endpoint, no_verify_ssl):
object_name = str(uuid.uuid4())
cmd_line = f"aws --no-verify-ssl s3api put-object --bucket {bucket} --key {object_name} " \
f"--body {payload_filepath} --endpoint http://{endpoint}"
out, success = execute_cmd(cmd_line)
no_verify_ssl_str = "--no-verify-ssl" if no_verify_ssl else ""
cmd_line = f"aws {no_verify_ssl_str} s3api put-object --bucket {bucket} --key {object_name} " \
f"--body {payload_filepath} --endpoint {endpoint}"
output, success = execute_cmd(cmd_line)
if not success:
print(f" > Object {object_name} has not been uploaded.")
log(f"{cmd_line}\n"
f"Object {object_name} has not been uploaded\n"
f"Error: {output}", endpoint)
return False
else:
return object_name
return bucket, endpoint, object_name

View file

@ -1,9 +1,12 @@
import os
import shlex
import sys
from datetime import datetime
from subprocess import check_output, CalledProcessError, STDOUT
def log(message, endpoint):
time = datetime.utcnow()
print(f"{time} at {endpoint}: {message}")
def execute_cmd(cmd_line):
cmd_args = shlex.split(cmd_line)
@ -19,10 +22,9 @@ def execute_cmd(cmd_line):
return output, success
def random_payload(payload_filepath, size):
with open('%s' % payload_filepath, 'w+b') as fout:
fout.write(os.urandom(1024 * int(size)))
def random_payload(file, size):
file.write(os.urandom(1024 * int(size)))
file.flush()
class ProgressBar:
@staticmethod

View file

@ -1,81 +1,105 @@
import re
from helpers.cmd import execute_cmd
from helpers.cmd import execute_cmd, log
def create_container(endpoint, policy, wallet_file, wallet_config):
cmd_line = f"frostfs-cli --rpc-endpoint {endpoint} container create --wallet {wallet_file} --config {wallet_config} " \
if wallet_file:
wallet_file = "--wallet " + wallet_file
if wallet_config:
wallet_config = "--config " + wallet_config
cmd_line = f"frostfs-cli --rpc-endpoint {endpoint} container create {wallet_file} {wallet_config} " \
f" --policy '{policy}' --basic-acl public-read-write --await"
output, success = execute_cmd(cmd_line)
if not success:
print(f" > Container has not been created:\n{output}")
log(f"{cmd_line}\n"
f"Container has not been created\n"
f"{output}", endpoint)
return False
else:
try:
fst_str = output.split('\n')[0]
except Exception:
print(f"Got empty output: {output}")
return False
splitted = fst_str.split(": ")
if len(splitted) != 2:
raise ValueError(f"no CID was parsed from command output: \t{fst_str}")
try:
fst_str = output.split('\n')[0]
except Exception:
log(f"{cmd_line}\n"
f"Incorrect output\n"
f"Output: {output or '<empty>'}", endpoint)
return False
splitted = fst_str.split(": ")
if len(splitted) != 2:
raise ValueError(f"no CID was parsed from command output:\t{fst_str}")
print(f"Created container: {splitted[1]}")
log(f"Created container {splitted[1]}", endpoint)
return splitted[1]
return splitted[1]
def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
object_name = ""
cmd_line = f"frostfs-cli --rpc-endpoint {endpoint} object put --file {payload_filepath} --wallet {wallet_file} --config {wallet_config} " \
if wallet_file:
wallet_file = "--wallet " + wallet_file
if wallet_config:
wallet_config = "--config " + wallet_config
cmd_line = f"frostfs-cli --rpc-endpoint {endpoint} object put --file {payload_filepath} {wallet_file} {wallet_config} " \
f"--cid {container} --no-progress"
output, success = execute_cmd(cmd_line)
if not success:
print(f" > Object {object_name} has not been uploaded:\n{output}")
log(f"{cmd_line}\n"
f"Object {object_name} has not been uploaded\n"
f"Error: {output}", endpoint)
return False
else:
try:
# taking second string from command output
snd_str = output.split('\n')[1]
except Exception:
print(f"Got empty input: {output}")
return False
splitted = snd_str.split(": ")
if len(splitted) != 2:
raise Exception(f"no OID was parsed from command output: \t{snd_str}")
return splitted[1]
try:
# taking second string from command output
snd_str = output.split('\n')[1]
except Exception:
log(f"{cmd_line}\n"
f"Incorrect output\n"
f"Output: {output or '<empty>'}", endpoint)
return False
splitted = snd_str.split(": ")
if len(splitted) != 2:
raise Exception(f"no OID was parsed from command output: \t{snd_str}")
return container, endpoint, splitted[1]
def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config):
cmd_line = f"frostfs-cli object get -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} " \
if wallet_file:
wallet_file = "--wallet " + wallet_file
if wallet_config:
wallet_config = "--config " + wallet_config
cmd_line = f"frostfs-cli object get -r {endpoint} --cid {cid} --oid {oid} {wallet_file} {wallet_config} " \
f"--file {out_filepath}"
output, success = execute_cmd(cmd_line)
if not success:
print(f" > Failed to get object {output} from container {cid} \r\n"
f" > Error: {output}")
log(f"{cmd_line}\n"
f"Failed to get object {oid} from container {cid}\n"
f"Error: {output}", endpoint)
return False
return True
def search_object_by_id(cid, oid, endpoint, wallet_file, wallet_config, ttl=2):
cmd_line = f"frostfs-cli object search --ttl {ttl} -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} "
if wallet_file:
wallet_file = "--wallet " + wallet_file
if wallet_config:
wallet_config = "--config " + wallet_config
cmd_line = f"frostfs-cli object search --ttl {ttl} -r {endpoint} --cid {cid} --oid {oid} {wallet_file} {wallet_config} "
output, success = execute_cmd(cmd_line)
if not success:
print(f" > Failed to search object {oid} for container {cid} \r\n"
f" > Error: {output}")
log(f"{cmd_line}\n"
f"Failed to search object {oid} for container {cid}\n"
f"Error: {output}", endpoint)
return False
re_rst = re.search(r'Found (\d+) objects', output)
if not re_rst:
raise Exception("Failed to parce search results")
raise Exception("Failed to parse search results")
return re_rst.group(1)

View file

@ -1,18 +1,19 @@
#!/usr/bin/python3
import argparse
from itertools import cycle
import json
import random
import sys
import tempfile
import time
from argparse import Namespace
from concurrent.futures import ProcessPoolExecutor
from helpers.cmd import random_payload
from helpers.frostfs_cli import create_container, upload_object
ERROR_NO_CONTAINERS = 1
ERROR_NO_OBJECTS = 2
ERROR_WRONG_CONTAINERS_COUNT = 1
ERROR_WRONG_OBJECTS_COUNT = 2
MAX_WORKERS = 50
parser = argparse.ArgumentParser()
@ -27,78 +28,95 @@ parser.add_argument(
help="Container placement policy",
default="REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
)
parser.add_argument('--endpoint', help='Node address')
parser.add_argument('--endpoint', help='Nodes addresses separated by comma.')
parser.add_argument('--update', help='Save existed containers')
parser.add_argument('--ignore-errors', help='Ignore preset errors')
parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
parser.add_argument('--sleep', help='Time to sleep between containers creation and objects upload (in seconds), '
'Default = 8', default=8)
args: Namespace = parser.parse_args()
print(args)
def main():
container_list = []
containers = []
objects_list = []
payload_filepath = '/tmp/data_file'
endpoints = args.endpoint.split(',')
wallet = args.wallet
wallet_config = args.config
workers = int(args.workers)
ignore_errors = True if args.ignore_errors else False
objects_per_container = int(args.preload_obj)
ignore_errors = args.ignore_errors
if args.update:
# Open file
with open(args.out) as f:
data_json = json.load(f)
container_list = data_json['containers']
containers = data_json['containers']
containers_count = len(containers)
else:
print(f"Create containers: {args.containers}")
containers_count = int(args.containers)
print(f"Create containers: {containers_count}")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
args.policy, wallet, wallet_config): _ for _ in range(int(args.containers))}
containers_runs = [executor.submit(create_container, endpoint, args.policy, wallet, wallet_config)
for _, endpoint in
zip(range(containers_count), cycle(endpoints))]
for run in containers_runs:
if run.result():
container_list.append(run.result())
container_id = run.result()
if container_id:
containers.append(container_id)
print("Create containers: Completed")
print(f" > Containers: {container_list}")
if not container_list:
print("No containers to work with")
print(f" > Containers: {containers}")
if containers_count == 0 or len(containers) != containers_count:
print(f"Containers mismatch in preset: expected {containers_count}, created {len(containers)}")
if not ignore_errors:
sys.exit(ERROR_NO_CONTAINERS)
sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
if args.sleep != 0:
print(f"Sleep for {args.sleep} seconds")
time.sleep(args.sleep)
print(f"Upload objects to each container: {args.preload_obj} ")
random_payload(payload_filepath, args.size)
payload_file = tempfile.NamedTemporaryFile()
random_payload(payload_file, args.size)
print(" > Create random payload: Completed")
for container in container_list:
print(f" > Upload objects for container {container}")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
objects_runs = {executor.submit(upload_object, container, payload_filepath,
endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(int(args.preload_obj))}
for run in objects_runs:
if run.result():
objects_list.append({'container': container, 'object': run.result()})
print(f" > Upload objects for container {container}: Completed")
total_objects = objects_per_container * containers_count
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
objects_runs = [executor.submit(upload_object, container, payload_file.name,
endpoint, wallet, wallet_config)
for _, container, endpoint in
zip(range(total_objects), cycle(containers), cycle(endpoints))]
for run in objects_runs:
result = run.result()
if result:
container_id = result[0]
endpoint = result[1]
object_id = result[2]
objects_list.append({'container': container_id, 'object': object_id})
print(f" > Uploaded object {object_id} for container {container_id} via endpoint {endpoint}.")
print("Upload objects to each container: Completed")
if int(args.preload_obj) > 0 and not objects_list:
print("No objects were uploaded")
if total_objects > 0 and len(objects_list) != total_objects:
print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
if not ignore_errors:
sys.exit(ERROR_NO_OBJECTS)
sys.exit(ERROR_WRONG_OBJECTS_COUNT)
data = {'containers': container_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
data = {'containers': containers, 'objects': objects_list, 'obj_size': args.size + " Kb"}
with open(args.out, 'w+') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print("Result:")
print(f" > Total Containers has been created: {len(container_list)}.")
print(f" > Total Containers has been created: {len(containers)}.")
print(f" > Total Objects has been created: {len(objects_list)}.")

View file

@ -1,8 +1,11 @@
#!/usr/bin/python3
import argparse
from itertools import cycle
import json
import sys
import tempfile
import time
from concurrent.futures import ProcessPoolExecutor
from helpers.cmd import random_payload
@ -14,82 +17,101 @@ parser.add_argument('--size', help='Upload objects size in kb.')
parser.add_argument('--buckets', help='Number of buckets to create.')
parser.add_argument('--out', help='JSON file with output.')
parser.add_argument('--preload_obj', help='Number of pre-loaded objects.')
parser.add_argument('--endpoint', help='S3 Gateway address.')
parser.add_argument('--endpoint', help='S3 Gateways addresses separated by comma.')
parser.add_argument('--update', help='True/False, False by default. Save existed buckets from target file (--out). '
'New buckets will not be created.')
parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
parser.add_argument('--versioning', help='True/False, False by default.')
parser.add_argument('--ignore-errors', help='Ignore preset errors')
parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
parser.add_argument('--no-verify-ssl', help='Ignore SSL verifications', action='store_true')
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
parser.add_argument('--sleep', help='Time to sleep between buckets creation and objects upload (in seconds), '
'Default = 8', default=8)
args = parser.parse_args()
print(args)
ERROR_NO_BUCKETS = 1
ERROR_NO_OBJECTS = 2
ERROR_WRONG_CONTAINERS_COUNT = 1
ERROR_WRONG_OBJECTS_COUNT = 2
MAX_WORKERS = 50
def main():
bucket_list = []
buckets = []
objects_list = []
payload_filepath = '/tmp/data_file'
ignore_errors = True if args.ignore_errors else False
ignore_errors = args.ignore_errors
no_verify_ssl = args.no_verify_ssl
endpoints = args.endpoint.split(',')
workers = int(args.workers)
objects_per_bucket = int(args.preload_obj)
if args.update:
# Open file
with open(args.out) as f:
data_json = json.load(f)
bucket_list = data_json['buckets']
buckets = data_json['buckets']
buckets_count = len(buckets)
# Get CID list
else:
print(f"Create buckets: {args.buckets}")
buckets_count = int(args.buckets)
print(f"Create buckets: {buckets_count}")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
buckets_runs = {executor.submit(create_bucket, args.endpoint, args.versioning,
args.location): _ for _ in range(int(args.buckets))}
buckets_runs = [executor.submit(create_bucket, endpoint, args.versioning, args.location, no_verify_ssl)
for _, endpoint in
zip(range(buckets_count), cycle(endpoints))]
for run in buckets_runs:
if run.result() is not None:
bucket_list.append(run.result())
bucket_name = run.result()
if bucket_name:
buckets.append(bucket_name)
print("Create buckets: Completed")
print(f" > Buckets: {bucket_list}")
if not bucket_list:
print("No buckets to work with")
print(f" > Buckets: {buckets}")
if buckets_count == 0 or len(buckets) != buckets_count:
print(f"Buckets mismatch in preset: expected {buckets_count}, created {len(buckets)}")
if not ignore_errors:
sys.exit(ERROR_NO_BUCKETS)
sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
print(f"Upload objects to each bucket: {args.preload_obj} ")
random_payload(payload_filepath, args.size)
if args.sleep != 0:
print(f"Sleep for {args.sleep} seconds")
time.sleep(args.sleep)
print(f"Upload objects to each bucket: {objects_per_bucket} ")
payload_file = tempfile.NamedTemporaryFile()
random_payload(payload_file, args.size)
print(" > Create random payload: Completed")
for bucket in bucket_list:
print(f" > Upload objects for bucket {bucket}")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
objects_runs = {executor.submit(upload_object, bucket, payload_filepath,
args.endpoint): _ for _ in range(int(args.preload_obj))}
total_objects = objects_per_bucket * buckets_count
for run in objects_runs:
if run.result() is not None:
objects_list.append({'bucket': bucket, 'object': run.result()})
print(f" > Upload objects for bucket {bucket}: Completed")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
objects_runs = [executor.submit(upload_object, bucket, payload_file.name, endpoint, no_verify_ssl)
for _, bucket, endpoint in
zip(range(total_objects), cycle(buckets), cycle(endpoints))]
for run in objects_runs:
result = run.result()
if result:
bucket = result[0]
endpoint = result[1]
object_id = result[2]
objects_list.append({'bucket': bucket, 'object': object_id})
print(f" > Uploaded object {object_id} for bucket {bucket} via endpoint {endpoint}.")
print("Upload objects to each bucket: Completed")
if int(args.preload_obj) > 0 and not objects_list:
print("No objects were uploaded")
if total_objects > 0 and len(objects_list) != total_objects:
print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
if not ignore_errors:
sys.exit(ERROR_NO_OBJECTS)
sys.exit(ERROR_WRONG_OBJECTS_COUNT)
data = {'buckets': bucket_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
data = {'buckets': buckets, 'objects': objects_list, 'obj_size': args.size + " Kb"}
with open(args.out, 'w+') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print("Result:")
print(f" > Total Buckets has been created: {len(bucket_list)}.")
print(f" > Total Buckets has been created: {len(buckets)}.")
print(f" > Total Objects has been created: {len(objects_list)}.")

View file

@ -18,6 +18,13 @@ Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following opt
* `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations.
* `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`:
```shell
$ ./k6 run --out profile (...)
```
The profiles are saved in the current directory as `cpu.prof` and `mem.prof`, respectively.
## Common options for the local scenarios:
@ -128,6 +135,31 @@ Options (in addition to the common options):
* `SLEEP_DELETE` - time interval (in seconds) between deleting VU iterations.
* `OBJ_NAME` - if specified, this name will be used for all write operations instead of random generation.
## S3 Multipart
Perform multipart upload operation, break up large objects, so they can be transferred in multiple parts, in parallel
```shell
$ ./k6 run -e DURATION=600 \
-e WRITERS=400 -e WRITERS_MULTIPART=10 \
-e WRITE_OBJ_SIZE=524288 -e WRITE_OBJ_PART_SIZE=10240 \
-e S3_ENDPOINTS=10.78.70.142:8084,10.78.70.143:8084,10.78.70.144:8084,10.78.70.145:8084 \
-e PREGEN_JSON=/home/service/s3_4kb.json \
scenarios/s3_multipart.js
```
Options:
* `DURATION` - duration of scenario in seconds.
* `REGISTRY_FILE` - if set, all produced objects will be stored in database for subsequent verification. Database file name will be set to the value of `REGISTRY_FILE`.
* `PREGEN_JSON` - path to json file with pre-generated containers.
* `SLEEP_WRITE` - time interval (in seconds) between writing VU iterations.
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
* `S3_ENDPOINTS` - - endpoints of S3 gateways in format `host:port`. To specify multiple endpoints separate them by comma.
* `WRITERS` - number of VUs performing upload payload operation
* `WRITERS_MULTIPART` - number of goroutines that will upload parts in parallel
* `WRITE_OBJ_SIZE` - object size in kb for write(PUT) operations.
* `WRITE_OBJ_PART_SIZE` - part size in kb for multipart upload operations (must be greater or equal 5mb).
## S3 Local
1. Follow steps 1. and 2. from the normal S3 scenario in order to obtain credentials and a preset file with the information about the buckets and objects that were pre-created.

View file

@ -21,10 +21,12 @@ const bucket_list = new SharedArray('bucket_list', function () {
const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
const connection_args = {no_verify_ssl: no_verify_ssl}
// Select random S3 endpoint for current VU
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
const s3_client = s3.connect(`http://${s3_endpoint}`);
const s3_client = s3.connect(s3_endpoint, connection_args);
const log = logging.new().withField("endpoint", s3_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;
@ -46,7 +48,21 @@ if (registry_enabled && delete_age) {
);
}
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_read",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
age: read_age,
}
)
}
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
const scenarios = {};
@ -102,12 +118,17 @@ export function setup() {
console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Deleting VUs: ${delete_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
}
export function handleSummary(data) {
@ -142,6 +163,18 @@ export function obj_read() {
sleep(__ENV.SLEEP_READ);
}
if(obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj) {
return;
}
const resp = s3_client.get(obj.s3_bucket, obj.s3_key)
if (!resp.success) {
log.withFields({bucket: obj.s3_bucket, key: obj.s3_key}).error(resp.error);
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = s3_client.get(obj.bucket, obj.object);

View file

@ -24,7 +24,9 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random S3 endpoint for current VU
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
const s3_client = s3.connect(`http://${s3_endpoint}`);
const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
const connection_args = {no_verify_ssl: no_verify_ssl}
const s3_client = s3.connect(s3_endpoint, connection_args);
const log = logging.new().withField("endpoint", s3_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;
@ -46,6 +48,20 @@ if (registry_enabled && delete_age) {
);
}
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_read",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
age: read_age,
}
)
}
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
const scenarios = {};
@ -129,12 +145,17 @@ export function setup() {
console.log(`Read rate: ${read_rate}`);
console.log(`Writing rate: ${write_rate}`);
console.log(`Delete rate: ${delete_rate}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
}
export function handleSummary(data) {
@ -169,6 +190,18 @@ export function obj_read() {
sleep(__ENV.SLEEP_READ);
}
if(obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj) {
return;
}
const resp = s3_client.get(obj.s3_bucket, obj.s3_key)
if (!resp.success) {
log.withFields({bucket: obj.s3_bucket, key: obj.s3_key}).error(resp.error);
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = s3_client.get(obj.bucket, obj.object);

106
scenarios/s3_multipart.js Normal file
View file

@ -0,0 +1,106 @@
import datagen from 'k6/x/frostfs/datagen';
import logging from 'k6/x/frostfs/logging';
import registry from 'k6/x/frostfs/registry';
import s3 from 'k6/x/frostfs/s3';
import {SharedArray} from 'k6/data';
import {sleep} from 'k6';
import {textSummary} from './libs/k6-summary-0.0.2.js';
import {parseEnv} from './libs/env-parser.js';
import {uuidv4} from './libs/k6-utils-1.4.0.js';
parseEnv();
const bucket_list = new SharedArray('bucket_list', function () {
return JSON.parse(open(__ENV.PREGEN_JSON)).buckets;
});
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random S3 endpoint for current VU
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
const connection_args = {no_verify_ssl: no_verify_ssl}
const s3_client = s3.connect(s3_endpoint, connection_args);
const log = logging.new().withField("endpoint", s3_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
const duration = __ENV.DURATION;
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "");
const scenarios = {};
const write_vu_count = parseInt(__ENV.WRITERS || '0');
if (write_vu_count < 1) {
throw 'number of VUs (env WRITERS) performing write operations should be greater than 0';
}
const write_multipart_vu_count = parseInt(__ENV.WRITERS_MULTIPART || '0');
if (write_multipart_vu_count < 1) {
throw 'number of parts (env WRITERS_MULTIPART) to upload in parallel should be greater than 0';
}
if (write_vu_count > 0) {
scenarios.write_multipart = {
executor: 'constant-vus',
vus: write_vu_count,
duration: `${duration}s`,
exec: 'obj_write_multipart',
gracefulStop: '5s',
};
}
export const options = {
scenarios,
setupTimeout: '5s',
};
export function setup() {
const total_vu_count = write_vu_count * write_multipart_vu_count;
console.log(`Pregenerated buckets: ${bucket_list.length}`);
console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Writing multipart VUs: ${write_multipart_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`);
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
}
export function handleSummary(data) {
return {
'stdout': textSummary(data, {indent: ' ', enableColors: false}),
[summary_json]: JSON.stringify(data),
};
}
const write_multipart_part_size = 1024 * parseInt(__ENV.WRITE_OBJ_PART_SIZE || '0')
if (write_multipart_part_size < 5 * 1024 * 1024) {
throw 'part size (env WRITE_OBJ_PART_SIZE * 1024) must be greater than (5 MB)';
}
export function obj_write_multipart() {
if (__ENV.SLEEP_WRITE) {
sleep(__ENV.SLEEP_WRITE);
}
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
const {payload, hash} = generator.genPayload(registry_enabled);
const resp = s3_client.multipart(bucket, key, write_multipart_part_size, write_multipart_vu_count, payload);
if (!resp.success) {
log.withFields({bucket: bucket, key: key}).error(resp.error);
return;
}
if (obj_registry) {
obj_registry.addObject("", "", bucket, key, hash);
}
}

View file

@ -36,14 +36,27 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
const config_file = __ENV.CONFIG_FILE;
const s3_client = s3local.connect(config_file, {
const config_dir = __ENV.CONFIG_DIR;
const s3_client = s3local.connect(config_file, config_dir, {
'debug_logger': __ENV.DEBUG_LOGGER || 'false',
}, bucket_mapping());
const log = logging.new().withField("config", config_file);
const log = logging.new().withFields({"config_file": config_file,"config_dir": config_dir});
const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE,
"obj_to_read",
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0,
{
status: "created",
}
)
}
const duration = __ENV.DURATION;
const generator = datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE));
@ -86,12 +99,17 @@ export function setup() {
console.log(`Reading VUs: ${read_vu_count}`);
console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(`Load started at: ${Date(start_timestamp).toString()}`)
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
const end_timestamp = Date.now()
console.log(`Load finished at: ${Date(end_timestamp).toString()}`)
}
export function handleSummary(data) {
@ -118,6 +136,18 @@ export function obj_write() {
}
export function obj_read() {
if(obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj) {
return;
}
const resp = s3_client.get(obj.s3_bucket, obj.s3_key)
if (!resp.success) {
log.withFields({bucket: obj.s3_bucket, key: obj.s3_key}).error(resp.error);
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = s3_client.get(obj.bucket, obj.object);

View file

@ -1,6 +1,7 @@
import native from 'k6/x/frostfs/native';
import registry from 'k6/x/frostfs/registry';
import s3 from 'k6/x/frostfs/s3';
import logging from 'k6/x/frostfs/logging';
import { sleep } from 'k6';
import { Counter } from 'k6/metrics';
import { textSummary } from './libs/k6-summary-0.0.2.js';
@ -25,20 +26,29 @@ const obj_counters = {
invalid: new Counter('invalid_obj'),
};
let log = logging.new();
// Connect to random gRPC endpoint
let grpc_client = undefined;
if (__ENV.GRPC_ENDPOINTS) {
const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
grpc_client = native.connect(grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0);
log = log.withField("endpoint", grpcEndpoint);
grpc_client = native.connect(grpcEndpoint, '',
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false, '');
}
// Connect to random S3 endpoint
let s3_client = undefined;
if (__ENV.S3_ENDPOINTS) {
const no_verify_ssl = __ENV.NO_VERIFY_SSL || "true";
const connection_args = {no_verify_ssl: no_verify_ssl}
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint = s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
s3_client = s3.connect(`http://${s3_endpoint}`);
log = log.withField("endpoint", s3_endpoint);
s3_client = s3.connect(s3_endpoint, connection_args);
}
// We will attempt to verify every object in "created" status. The scenario will execute
@ -93,30 +103,45 @@ export function handleSummary(data) {
}
export function obj_verify() {
if (obj_to_verify_count == 0) {
log.info("Nothing to verify");
return;
}
if (__ENV.SLEEP) {
sleep(__ENV.SLEEP);
}
const obj = obj_to_verify_selector.nextObject();
if (!obj) {
console.log("All objects have been verified");
log.info("All objects have been verified");
return;
}
const obj_status = verify_object_with_retries(obj, 3);
obj_counters[obj_status].add(1);
obj_registry.setObjectStatus(obj.id, obj_status);
obj_registry.setObjectStatus(obj.id, obj.status, obj_status);
}
function verify_object_with_retries(obj, attempts) {
for (let i = 0; i < attempts; i++) {
let result;
// Different name is required.
// ReferenceError: Cannot access a variable before initialization.
let lg = log;
if (obj.c_id && obj.o_id) {
lg = lg.withFields({cid: obj.c_id, oid: obj.o_id});
result = grpc_client.verifyHash(obj.c_id, obj.o_id, obj.payload_hash);
} else if (obj.s3_bucket && obj.s3_key) {
lg = lg.withFields({bucket: obj.s3_bucket, key: obj.s3_key});
result = s3_client.verifyHash(obj.s3_bucket, obj.s3_key, obj.payload_hash);
} else {
console.log(`Object id=${obj.id} cannot be verified with supported protocols`);
lg.withFields({
cid: obj.c_id,
oid: obj.o_id,
bucket: obj.s3_bucket,
key: obj.s3_key
}).warn(`Object cannot be verified with supported protocols`);
return "skipped";
}
@ -127,7 +152,7 @@ function verify_object_with_retries(obj, attempts) {
}
// Unless we explicitly saw that there was a hash mismatch, then we will retry after a delay
console.log(`Verify error on ${obj.id}: ${result.error}. Object will be re-tried`);
lg.error(`Verify error: ${result.error}. Object will be re-tried`);
sleep(__ENV.SLEEP);
}