Compare commits

..

No commits in common. "master" and "bugfix/support-ec" have entirely different histories.

26 changed files with 171 additions and 843 deletions

View file

@ -3,8 +3,8 @@
First, thank you for contributing! We love and encourage pull requests from
everyone. Please follow the guidelines:
- Check the open [issues](https://git.frostfs.info/TrueCloudLab/xk6-frostfs/issues) and
[pull requests](https://git.frostfs.info/TrueCloudLab/xk6-frostfs/pulls) for existing
- Check the open [issues](https://github.com/TrueCloudLab/xk6-frostfs/issues) and
[pull requests](https://github.com/TrueCloudLab/xk6-frostfs/pulls) for existing
discussions.
- Open an issue first, to discuss a new feature or enhancement.
@ -28,18 +28,18 @@ send a pull request. We encourage pull requests to discuss code changes. Here
are the steps in details:
### Set up your GitHub Repository
Fork [xk6-frostfs upstream](https://git.frostfs.info/TrueCloudLab/xk6-frostfs/fork) source
Fork [xk6-frostfs upstream](https://github.com/TrueCloudLab/xk6-frostfs/fork) source
repository to your own personal repository. Copy the URL of your fork (you will
need it for the `git clone` command below).
```sh
$ git clone https://git.frostfs.info/TrueCloudLab/xk6-frostfs
$ git clone https://github.com/TrueCloudLab/xk6-frostfs
```
### Set up git remote as ``upstream``
```sh
$ cd xk6-frostfs
$ git remote add upstream https://git.frostfs.info/TrueCloudLab/xk6-frostfs
$ git remote add upstream https://github.com/TrueCloudLab/xk6-frostfs
$ git fetch upstream
$ git merge upstream/master
...

View file

@ -1,5 +1,5 @@
<p align="center">
<img src="./.forgejo/logo.svg" width="500px" alt="FrostFS logo">
<img src="./.github/logo.svg" width="500px" alt="FrostFS logo">
</p>
<p align="center">
<a href="https://go.k6.io/k6">k6</a> extension to test and benchmark FrostFS related protocols.
@ -48,11 +48,10 @@ Create native client with `connect` method. Arguments:
- dial timeout in seconds (0 for the default value)
- stream timeout in seconds (0 for the default value)
- generate object header on the client side (for big object - split locally too)
- max size for generated object header on the client side (for big object - the size that the object is splitted into)
```js
import native from 'k6/x/frostfs/native';
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false, 0)
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
```
### Methods

View file

@ -1,55 +0,0 @@
package importer
import (
"encoding/json"
"os"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry"
)
type PreGenObj struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
Container string `json:"container"`
}
type PreGenerateInfo struct {
Buckets []string `json:"buckets"`
Containers []string `json:"containers"`
Objects []PreGenObj `json:"objects"`
ObjSize string `json:"obj_size"`
}
// ImportJSONPreGen writes objects from pregenerated JSON file
// to the registry.
// Note that ImportJSONPreGen does not check if object already
// exists in the registry so in case of re-entry the registry
// will have two entities representing the same object.
func ImportJSONPreGen(o *registry.ObjRegistry, filename string) error {
f, err := os.ReadFile(filename)
if err != nil {
return err
}
var pregenInfo PreGenerateInfo
err = json.Unmarshal(f, &pregenInfo)
if err != nil {
return err
}
// AddObject uses DB.Batch to combine concurrent Batch calls
// into a single Bolt transaction. DB.Batch is limited by
// DB.MaxBatchDelay which may affect perfomance.
for _, obj := range pregenInfo.Objects {
if obj.Bucket != "" {
err = o.AddObject("", "", obj.Bucket, obj.Object, "")
} else {
err = o.AddObject(obj.Container, obj.Object, "", "", "")
}
if err != nil {
return err
}
}
return nil
}

View file

@ -1,27 +0,0 @@
package importer
import (
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry"
"github.com/spf13/cobra"
)
// Cmd represents the import command.
var Cmd = &cobra.Command{
Use: "import",
Short: "Import objects into registry",
Long: "Import objects into registry from pregenerated files",
Example: `xk6-registry import registry.bolt preset.json
xk6-registry import --status created registry.bolt preset.json another_preset.json`,
RunE: runCmd,
Args: cobra.MinimumNArgs(2),
}
func runCmd(cmd *cobra.Command, args []string) error {
objRegistry := registry.NewObjRegistry(cmd.Context(), args[0])
for i := 1; i < len(args); i++ {
if err := ImportJSONPreGen(objRegistry, args[i]); err != nil {
return err
}
}
return nil
}

View file

@ -1,18 +0,0 @@
package main
import (
"context"
"os"
"os/signal"
"syscall"
)
func main() {
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
if cmd, err := rootCmd.ExecuteContextC(ctx); err != nil {
cmd.PrintErrln("Error:", err.Error())
cmd.PrintErrf("Run '%v --help' for usage.\n", cmd.CommandPath())
os.Exit(1)
}
}

View file

@ -1,33 +0,0 @@
package main
import (
"runtime"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/cmd/xk6-registry/importer"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/version"
"github.com/spf13/cobra"
)
var rootCmd = &cobra.Command{
Use: "xk6-registry",
Version: version.Version,
Short: "Command Line Tool to work with Registry",
Long: `Registry provides tools to work with object registry for xk6.
It contains command for importing objects in registry from preset`,
SilenceErrors: true,
SilenceUsage: true,
Run: rootCmdRun,
}
func init() {
cobra.AddTemplateFunc("runtimeVersion", runtime.Version)
rootCmd.SetVersionTemplate(`FrostFS xk6-registry
{{printf "Version: %s" .Version }}
GoVersion: {{ runtimeVersion }}
`)
rootCmd.AddCommand(importer.Cmd)
}
func rootCmdRun(cmd *cobra.Command, _ []string) {
_ = cmd.Usage()
}

View file

@ -3,11 +3,11 @@ import { fail } from "k6";
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const payload = open('../go.sum', 'b');
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false, 0)
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false)
export const options = {
stages: [
{ duration: '30s', target: 10 },
{duration: '30s', target: 10},
],
};
@ -24,7 +24,7 @@ export function setup() {
fail(res.error)
}
console.info("created container", res.container_id)
return { container_id: res.container_id }
return {container_id: res.container_id}
}
export default function (data) {

View file

@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const payload = open('../go.sum', 'b');
const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false, 0)
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
const frostfs_obj = frostfs_cli.onsite(container, payload)
export const options = {
@ -14,11 +14,11 @@ export const options = {
export default function () {
let headers = {
'unique_header': uuidv4()
'unique_header': uuidv4()
}
let resp = frostfs_obj.put(headers)
if (resp.success) {
frostfs_cli.get(container, resp.object_id)
frostfs_cli.get(container, resp.object_id)
} else {
console.log(resp.error)
}

View file

@ -1,58 +0,0 @@
package native
import (
"context"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)
const networkCacheTTL = time.Minute
var networkInfoCache = &networkInfoCacheT{}
type networkInfoCacheT struct {
guard sync.RWMutex
current *netmap.NetworkInfo
fetchTS time.Time
}
func (c *networkInfoCacheT) getOrFetch(ctx context.Context, cli *client.Client) (*netmap.NetworkInfo, error) {
if v := c.get(); v != nil {
return v, nil
}
return c.fetch(ctx, cli)
}
func (c *networkInfoCacheT) get() *netmap.NetworkInfo {
c.guard.RLock()
defer c.guard.RUnlock()
if c.current == nil || time.Since(c.fetchTS) > networkCacheTTL {
return nil
}
return c.current
}
func (c *networkInfoCacheT) fetch(ctx context.Context, cli *client.Client) (*netmap.NetworkInfo, error) {
c.guard.Lock()
defer c.guard.Unlock()
if time.Since(c.fetchTS) <= networkCacheTTL {
return c.current, nil
}
res, err := cli.NetworkInfo(ctx, client.PrmNetworkInfo{})
if err != nil {
return nil, err
}
v := res.Info()
c.current = &v
c.fetchTS = time.Now()
return c.current, nil
}

View file

@ -35,7 +35,6 @@ type (
tok session.Object
cli *client.Client
prepareLocally bool
maxObjSize uint64
}
PutResponse struct {
@ -72,7 +71,6 @@ type (
hdr object.Object
payload []byte
prepareLocally bool
maxObjSize uint64
}
)
@ -105,7 +103,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload data
o.SetOwnerID(owner)
o.SetAttributes(attrs...)
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize, c.maxObjSize)
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize)
if err != nil {
return PutResponse{Success: false, Error: err.Error()}
}
@ -242,7 +240,7 @@ func (c *Client) VerifyHash(containerID, objectID, expectedHash string) VerifyHa
}
actualHash := hex.EncodeToString(hasher.Sum(nil))
if actualHash != expectedHash {
return VerifyHashResponse{Success: false, Error: "hash mismatch"}
return VerifyHashResponse{Success: true, Error: "hash mismatch"}
}
return VerifyHashResponse{Success: true}
@ -375,7 +373,6 @@ func (c *Client) Onsite(containerID string, payload datagen.Payload) PreparedObj
hdr: *obj,
payload: data,
prepareLocally: c.prepareLocally,
maxObjSize: c.maxObjSize,
}
}
@ -401,7 +398,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
return PutResponse{Success: false, Error: err.Error()}
}
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0, p.maxObjSize)
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0)
if err != nil {
return PutResponse{Success: false, Error: err.Error()}
}
@ -416,7 +413,7 @@ func (s epochSource) CurrentEpoch() uint64 {
}
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
hdr *object.Object, payload datagen.Payload, chunkSize int, maxObjSize uint64,
hdr *object.Object, payload datagen.Payload, chunkSize int,
) (*client.ResObjectPut, error) {
bufSize := defaultBufferSize
if chunkSize > 0 {
@ -437,16 +434,13 @@ func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Ob
prm.MaxChunkLength = chunkSize
}
if prepareLocally {
ni, err := networkInfoCache.getOrFetch(vu.Context(), cli)
res, err := cli.NetworkInfo(vu.Context(), client.PrmNetworkInfo{})
if err != nil {
return nil, err
}
prm.MaxSize = ni.MaxObjectSize()
prm.EpochSource = epochSource(ni.CurrentEpoch())
prm.MaxSize = res.Info().MaxObjectSize()
prm.EpochSource = epochSource(res.Info().CurrentEpoch())
prm.WithoutHomomorphHash = true
if maxObjSize > 0 {
prm.MaxSize = maxObjSize
}
}
objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)

View file

@ -52,17 +52,13 @@ func (n *Native) Exports() modules.Exports {
return modules.Exports{Default: n}
}
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool, maxObjSize int) (*Client, error) {
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) {
var (
cli client.Client
pk *keys.PrivateKey
err error
)
if maxObjSize < 0 {
return nil, fmt.Errorf("max object size value must be positive")
}
pk, err = keys.NewPrivateKey()
if len(hexPrivateKey) != 0 {
pk, err = keys.NewPrivateKeyFromHex(hexPrivateKey)
@ -118,16 +114,6 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
tok.SetAuthKey(&key)
tok.SetExp(exp)
if prepareLocally && maxObjSize > 0 {
res, err := cli.NetworkInfo(n.vu.Context(), client.PrmNetworkInfo{})
if err != nil {
return nil, err
}
if uint64(maxObjSize) > res.Info().MaxObjectSize() {
return nil, fmt.Errorf("max object size must be not greater than %d bytes", res.Info().MaxObjectSize())
}
}
// register metrics
objPutSuccess, _ = stats.Registry.NewMetric("frostfs_obj_put_success", metrics.Counter)
@ -154,6 +140,5 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
tok: tok,
cli: &cli,
prepareLocally: prepareLocally,
maxObjSize: uint64(maxObjSize),
}, nil
}

View file

@ -10,17 +10,14 @@ type ObjExporter struct {
}
type PreGenerateInfo struct {
Buckets []string `json:"buckets"`
Containers []string `json:"containers"`
Objects []ObjInfo `json:"objects"`
ObjSize string `json:"obj_size"`
Buckets []string `json:"buckets"`
Objects []ObjInfo `json:"objects"`
ObjSize string `json:"obj_size"`
}
type ObjInfo struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
CID string `json:"cid"`
OID string `json:"oid"`
}
func NewObjExporter(selector *ObjSelector) *ObjExporter {
@ -40,7 +37,6 @@ func (o *ObjExporter) ExportJSONPreGen(fileName string) error {
}
bucketMap := make(map[string]struct{})
containerMap := make(map[string]struct{})
count, err := o.selector.Count()
if err != nil {
@ -54,7 +50,7 @@ func (o *ObjExporter) ExportJSONPreGen(fileName string) error {
break
}
if err = writeObjectInfo(comma, info, f); err != nil {
if _, err = f.WriteString(fmt.Sprintf(`%s{"bucket":"%s","object":"%s"}`, comma, info.S3Bucket, info.S3Key)); err != nil {
return err
}
@ -62,54 +58,15 @@ func (o *ObjExporter) ExportJSONPreGen(fileName string) error {
comma = ","
}
if info.S3Bucket != "" {
bucketMap[info.S3Bucket] = struct{}{}
}
if info.CID != "" {
containerMap[info.CID] = struct{}{}
}
bucketMap[info.S3Bucket] = struct{}{}
}
if _, err = f.WriteString(`]`); err != nil {
return err
}
if len(bucketMap) > 0 {
if err = writeContainerInfo("buckets", bucketMap, f); err != nil {
return err
}
}
if len(containerMap) > 0 {
if err = writeContainerInfo("containers", containerMap, f); err != nil {
return err
}
}
if _, err = f.WriteString(`}`); err != nil {
return err
}
return nil
}
func writeObjectInfo(comma string, info *ObjectInfo, f *os.File) (err error) {
var res string
if info.S3Bucket != "" || info.S3Key != "" {
res = fmt.Sprintf(`%s{"bucket":"%s","object":"%s"}`, comma, info.S3Bucket, info.S3Key)
} else {
res = fmt.Sprintf(`%s{"cid":"%s","oid":"%s"}`, comma, info.CID, info.OID)
}
_, err = f.WriteString(res)
return err
}
func writeContainerInfo(attrName string, bucketMap map[string]struct{}, f *os.File) (err error) {
if _, err = f.WriteString(fmt.Sprintf(`,"%s":[`, attrName)); err != nil {
if _, err = f.WriteString(`],"buckets":[`); err != nil {
return err
}
i := 0
comma := ""
comma = ""
for bucket := range bucketMap {
if _, err = f.WriteString(fmt.Sprintf(`%s"%s"`, comma, bucket)); err != nil {
return err
@ -119,6 +76,7 @@ func writeContainerInfo(attrName string, bucketMap map[string]struct{}, f *os.Fi
}
i++
}
_, err = f.WriteString(`]`)
_, err = f.WriteString(`]}`)
return err
}

View file

@ -1,156 +0,0 @@
package registry
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"path/filepath"
"slices"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
type expectedResult struct {
mode string
objects []ObjectInfo
dir string
dbName string
jsonName string
}
func TestObjectExporter(t *testing.T) {
names := []string{"s3", "grpc"}
for _, name := range names {
t.Run(name, runExportTest)
t.Run(name+"-changed", runExportChangedTest)
t.Run(name+"-empty", runExportEmptyTest)
}
}
func runExportTest(t *testing.T) {
expected := getExpectedResult(t)
objReg := getFilledRegistry(t, expected)
objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated}))
require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName))
require.NoError(t, checkExported(expected.objects, expected.jsonName))
}
func runExportChangedTest(t *testing.T) {
expected := getExpectedResult(t)
objReg := getFilledRegistry(t, expected)
newStatus := randString(10)
num := randPositiveInt(1, len(expected.objects))
changedObjects := make([]ObjectInfo, num)
require.Equal(t, num, copy(changedObjects[:], expected.objects[:]))
sel := NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated})
for i := range changedObjects {
changedObjects[i].Status = newStatus
require.NoError(t, objReg.SetObjectStatus(sel.NextObject().Id, statusCreated, newStatus))
}
objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: newStatus}))
require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName))
require.NoError(t, checkExported(changedObjects, expected.jsonName))
}
func runExportEmptyTest(t *testing.T) {
expected := getExpectedResult(t)
expected.objects = make([]ObjectInfo, 0)
objReg := getFilledRegistry(t, expected)
objExp := NewObjExporter(NewObjSelector(objReg, 0, SelectorOneshot, &ObjFilter{Status: statusCreated}))
require.NoError(t, objExp.ExportJSONPreGen(expected.jsonName))
require.NoError(t, checkExported(expected.objects, expected.jsonName))
}
func getExpectedResult(t *testing.T) expectedResult {
num := randPositiveInt(2, 100)
mode := getMode(t.Name())
require.NotEqual(t, "", mode, "test mode should contain either \"s3\" or\"grpc\"")
dir := t.TempDir()
res := expectedResult{
mode: mode,
objects: generateObjectInfo(num, t.Name()),
dir: dir,
dbName: filepath.Join(dir, "registry-"+mode+".db"),
jsonName: filepath.Join(dir, "registry-"+mode+".json"),
}
return res
}
func randPositiveInt(min, max int) int {
return rand.Intn(max-min) + min
}
func getMode(name string) (res string) {
if strings.Contains(name, "s3") {
res = filepath.Base(name)
}
if strings.Contains(name, "grpc") {
res = filepath.Base(name)
}
return res
}
func generateObjectInfo(num int, mode string) []ObjectInfo {
res := make([]ObjectInfo, num)
for i := range res {
res[i] = randomObjectInfo()
if !strings.Contains(mode, "s3") {
res[i].S3Bucket = ""
res[i].S3Key = ""
}
if !strings.Contains(mode, "grpc") {
res[i].CID = ""
res[i].OID = ""
}
}
return res
}
func getFilledRegistry(t *testing.T, expected expectedResult) *ObjRegistry {
objReg := NewObjRegistry(context.Background(), expected.dbName)
for i := range expected.objects {
require.NoError(t, objReg.AddObject(expected.objects[i].CID, expected.objects[i].OID, expected.objects[i].S3Bucket, expected.objects[i].S3Key, expected.objects[i].PayloadHash))
}
return objReg
}
func checkExported(expected []ObjectInfo, fileName string) error {
file, err := os.ReadFile(fileName)
if err != nil {
return err
}
if !json.Valid(file) {
return fmt.Errorf("exported json file %s is invalid", fileName)
}
var actual PreGenerateInfo
if json.Unmarshal(file, &actual) != nil {
return err
}
if len(expected) != len(actual.Objects) {
return fmt.Errorf("expected len(): %v, got len(): %v", len(expected), len(actual.Objects))
}
for i := range expected {
if !slices.ContainsFunc(actual.Objects, func(oi ObjInfo) bool {
compareS3 := oi.Bucket == expected[i].S3Bucket && oi.Object == expected[i].S3Key
comparegRPC := oi.CID == expected[i].CID && oi.OID == expected[i].OID
return compareS3 && comparegRPC
}) {
return fmt.Errorf("object %v not found in exported json file %s", expected[i], fileName)
}
}
return nil
}

View file

@ -101,7 +101,7 @@ func randomObjectInfo() ObjectInfo {
func randString(n int) string {
var sb strings.Builder
for i := 0; i < n; i++ {
sb.WriteRune('a' + rune(rand.Int31())%('z'-'a'+1))
sb.WriteRune('a' + rune(rand.Int())%('z'-'a'+1))
}
return sb.String()
}

View file

@ -75,7 +75,7 @@ func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) e
}
func (o *ObjRegistry) SetObjectStatus(id uint64, oldStatus, newStatus string) error {
return o.boltDB.Batch(func(tx *bbolt.Tx) error {
return o.boltDB.Update(func(tx *bbolt.Tx) error {
oldB := tx.Bucket([]byte(oldStatus))
if oldB == nil {
return fmt.Errorf("bucket doesn't exist: '%s'", oldStatus)
@ -110,7 +110,7 @@ func (o *ObjRegistry) SetObjectStatus(id uint64, oldStatus, newStatus string) er
}
func (o *ObjRegistry) DeleteObject(id uint64) error {
return o.boltDB.Batch(func(tx *bbolt.Tx) error {
return o.boltDB.Update(func(tx *bbolt.Tx) error {
return tx.ForEach(func(_ []byte, b *bbolt.Bucket) error {
return b.Delete(encodeId(id))
})

View file

@ -3,15 +3,12 @@ package registry
import (
"context"
"fmt"
"sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/io"
"go.etcd.io/bbolt"
)
const nextObjectTimeout = 10 * time.Second
type ObjFilter struct {
Status string
Age int
@ -24,8 +21,6 @@ type ObjSelector struct {
filter *ObjFilter
cacheSize int
kind SelectorKind
// Sync synchronizes VU used for deletion.
Sync sync.WaitGroup
}
// objectSelectCache is the default maximum size of a batch to select from DB.
@ -62,16 +57,7 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind,
// - underlying registry context is done, nil objects will be returned on the
// currently blocked and every further NextObject calls.
func (o *ObjSelector) NextObject() *ObjectInfo {
if o.kind == SelectorOneshot {
return <-o.objChan
}
select {
case <-time.After(nextObjectTimeout):
return nil
case obj := <-o.objChan:
return obj
}
return <-o.objChan
}
// Count returns total number of objects that match filter of the selector.

View file

@ -180,7 +180,7 @@ func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse
}
actualHash := hex.EncodeToString(hasher.Sum(nil))
if actualHash != expectedHash {
return VerifyHashResponse{Success: false, Error: "hash mismatch"}
return VerifyHashResponse{Success: true, Error: "hash mismatch"}
}
return VerifyHashResponse{Success: true}

View file

@ -1,25 +1,25 @@
import { sleep } from 'k6';
import { SharedArray } from 'k6/data';
import {sleep} from 'k6';
import {SharedArray} from 'k6/data';
import exec from 'k6/execution';
import logging from 'k6/x/frostfs/logging';
import native from 'k6/x/frostfs/native';
import registry from 'k6/x/frostfs/registry';
import stats from 'k6/x/frostfs/stats';
import { newGenerator } from './libs/datagen.js';
import { parseEnv } from './libs/env-parser.js';
import { textSummary } from './libs/k6-summary-0.0.2.js';
import { uuidv4 } from './libs/k6-utils-1.4.0.js';
import {newGenerator} from './libs/datagen.js';
import {parseEnv} from './libs/env-parser.js';
import {textSummary} from './libs/k6-summary-0.0.2.js';
import {uuidv4} from './libs/k6-utils-1.4.0.js';
parseEnv();
const obj_list = new SharedArray(
'obj_list',
function () { return JSON.parse(open(__ENV.PREGEN_JSON)).objects; });
'obj_list',
function() { return JSON.parse(open(__ENV.PREGEN_JSON)).objects; });
const container_list = new SharedArray(
'container_list',
function () { return JSON.parse(open(__ENV.PREGEN_JSON)).containers; });
'container_list',
function() { return JSON.parse(open(__ENV.PREGEN_JSON)).containers; });
const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
const summary_json = __ENV.SUMMARY_JSON || '/tmp/summary.json';
@ -27,17 +27,17 @@ const summary_json = __ENV.SUMMARY_JSON || '/tmp/summary.json';
// Select random gRPC endpoint for current VU
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpc_endpoint =
grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
const grpc_client = native.connect(
grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' : false,
1024 * parseInt(__ENV.MAX_OBJECT_SIZE || '0'));
grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true'
: false);
const log = logging.new().withField('endpoint', grpc_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry =
registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
const duration = __ENV.DURATION;
@ -49,11 +49,11 @@ const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getLoopedSelector(
__ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status: 'created',
age: read_age,
})
__ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status : 'created',
age : read_age,
})
}
const scenarios = {};
@ -63,11 +63,11 @@ const write_grpc_chunk_size = 1024 * parseInt(__ENV.GRPC_CHUNK_SIZE || '0')
const generator = newGenerator(write_vu_count > 0);
if (write_vu_count > 0) {
scenarios.write = {
executor: 'constant-vus',
vus: write_vu_count,
duration: `${duration}s`,
exec: 'obj_write',
gracefulStop: '5s',
executor : 'constant-vus',
vus : write_vu_count,
duration : `${duration}s`,
exec : 'obj_write',
gracefulStop : '5s',
};
}
@ -78,24 +78,24 @@ if (registry_enabled && delete_age) {
obj_to_delete_exit_on_null = write_vu_count == 0;
let constructor = obj_to_delete_exit_on_null ? registry.getOneshotSelector
: registry.getSelector;
: registry.getSelector;
obj_to_delete_selector =
constructor(__ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status: 'created',
age: delete_age,
});
constructor(__ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status : 'created',
age : delete_age,
});
}
const read_vu_count = parseInt(__ENV.READERS || '0');
if (read_vu_count > 0) {
scenarios.read = {
executor: 'constant-vus',
vus: read_vu_count,
duration: `${duration}s`,
exec: 'obj_read',
gracefulStop: '5s',
executor : 'constant-vus',
vus : read_vu_count,
duration : `${duration}s`,
exec : 'obj_read',
gracefulStop : '5s',
};
}
@ -103,21 +103,21 @@ const delete_vu_count = parseInt(__ENV.DELETERS || '0');
if (delete_vu_count > 0) {
if (!obj_to_delete_selector) {
throw new Error(
'Positive DELETE worker number without a proper object selector');
'Positive DELETE worker number without a proper object selector');
}
scenarios.delete = {
executor: 'constant-vus',
vus: delete_vu_count,
duration: `${duration}s`,
exec: 'obj_delete',
gracefulStop: '5s',
executor : 'constant-vus',
vus : delete_vu_count,
duration : `${duration}s`,
exec : 'obj_delete',
gracefulStop : '5s',
};
}
export const options = {
scenarios,
setupTimeout: '5s',
setupTimeout : '5s',
};
export function setup() {
@ -133,7 +133,7 @@ export function setup() {
const start_timestamp = Date.now()
console.log(
`Load started at: ${Date(start_timestamp).toString()}`)
`Load started at: ${Date(start_timestamp).toString()}`)
}
export function teardown(data) {
@ -142,13 +142,13 @@ export function teardown(data) {
}
const end_timestamp = Date.now()
console.log(
`Load finished at: ${Date(end_timestamp).toString()}`)
`Load finished at: ${Date(end_timestamp).toString()}`)
}
export function handleSummary(data) {
return {
'stdout': textSummary(data, { indent: ' ', enableColors: false }),
[summary_json]: JSON.stringify(data),
'stdout' : textSummary(data, {indent : ' ', enableColors : false}),
[summary_json] : JSON.stringify(data),
};
}
@ -157,13 +157,13 @@ export function obj_write() {
sleep(__ENV.SLEEP_WRITE);
}
const headers = { unique_header: uuidv4() };
const headers = {unique_header : uuidv4()};
const container =
container_list[Math.floor(Math.random() * container_list.length)];
container_list[Math.floor(Math.random() * container_list.length)];
const payload = generator.genPayload();
const resp =
grpc_client.put(container, headers, payload, write_grpc_chunk_size);
grpc_client.put(container, headers, payload, write_grpc_chunk_size);
if (!resp.success) {
log.withField('cid', container).error(resp.error);
return;
@ -186,7 +186,7 @@ export function obj_read() {
}
const resp = grpc_client.get(obj.c_id, obj.o_id)
if (!resp.success) {
log.withFields({ cid: obj.c_id, oid: obj.o_id }).error(resp.error);
log.withFields({cid : obj.c_id, oid : obj.o_id}).error(resp.error);
}
return
}
@ -194,7 +194,7 @@ export function obj_read() {
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = grpc_client.get(obj.container, obj.object)
if (!resp.success) {
log.withFields({ cid: obj.container, oid: obj.object }).error(resp.error);
log.withFields({cid : obj.container, oid : obj.object}).error(resp.error);
}
}
@ -214,7 +214,7 @@ export function obj_delete() {
const resp = grpc_client.delete(obj.c_id, obj.o_id);
if (!resp.success) {
// Log errors except (2052 - object already deleted)
log.withFields({ cid: obj.c_id, oid: obj.o_id }).error(resp.error);
log.withFields({cid : obj.c_id, oid : obj.o_id}).error(resp.error);
return;
}

View file

@ -1,22 +1,22 @@
import { sleep } from 'k6';
import { SharedArray } from 'k6/data';
import {sleep} from 'k6';
import {SharedArray} from 'k6/data';
import logging from 'k6/x/frostfs/logging';
import native from 'k6/x/frostfs/native';
import registry from 'k6/x/frostfs/registry';
import stats from 'k6/x/frostfs/stats';
import { newGenerator } from './libs/datagen.js';
import { parseEnv } from './libs/env-parser.js';
import { textSummary } from './libs/k6-summary-0.0.2.js';
import { uuidv4 } from './libs/k6-utils-1.4.0.js';
import {newGenerator} from './libs/datagen.js';
import {parseEnv} from './libs/env-parser.js';
import {textSummary} from './libs/k6-summary-0.0.2.js';
import {uuidv4} from './libs/k6-utils-1.4.0.js';
parseEnv();
const obj_list = new SharedArray('obj_list', function () {
const obj_list = new SharedArray('obj_list', function() {
return JSON.parse(open(__ENV.PREGEN_JSON)).objects;
});
const container_list = new SharedArray('container_list', function () {
const container_list = new SharedArray('container_list', function() {
return JSON.parse(open(__ENV.PREGEN_JSON)).containers;
});
@ -26,17 +26,17 @@ const summary_json = __ENV.SUMMARY_JSON || '/tmp/summary.json';
// Select random gRPC endpoint for current VU
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpc_endpoint =
grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
const grpc_client = native.connect(
grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' : false,
1024 * parseInt(__ENV.MAX_OBJECT_SIZE || '0'));
grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' :
false);
const log = logging.new().withField('endpoint', grpc_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry =
registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
const duration = __ENV.DURATION;
@ -48,22 +48,22 @@ const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
let obj_to_delete_selector = undefined;
if (registry_enabled && delete_age) {
obj_to_delete_selector = registry.getSelector(
__ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status: 'created',
age: delete_age,
});
__ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status: 'created',
age: delete_age,
});
}
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getLoopedSelector(
__ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status: 'created',
age: read_age,
})
__ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status: 'created',
age: read_age,
})
}
const scenarios = {};
@ -109,7 +109,7 @@ const delete_rate = parseInt(__ENV.DELETE_RATE || '0');
if (delete_rate > 0) {
if (!obj_to_delete_selector) {
throw new Error(
'Positive DELETE worker number without a proper object selector');
'Positive DELETE worker number without a proper object selector');
}
scenarios.delete = {
@ -131,7 +131,7 @@ export const options = {
export function setup() {
const total_pre_allocated_vu_count =
pre_alloc_write_vus + pre_alloc_read_vus + pre_alloc_delete_vus;
pre_alloc_write_vus + pre_alloc_read_vus + pre_alloc_delete_vus;
const total_max_vu_count = max_read_vus + max_write_vus + max_delete_vus
console.log(`Pregenerated containers: ${container_list.length}`);
@ -152,7 +152,7 @@ export function setup() {
const start_timestamp = Date.now()
console.log(
`Load started at: ${Date(start_timestamp).toString()}`)
`Load started at: ${Date(start_timestamp).toString()}`)
}
export function teardown(data) {
@ -161,12 +161,12 @@ export function teardown(data) {
}
const end_timestamp = Date.now()
console.log(
`Load finished at: ${Date(end_timestamp).toString()}`)
`Load finished at: ${Date(end_timestamp).toString()}`)
}
export function handleSummary(data) {
return {
'stdout': textSummary(data, { indent: ' ', enableColors: false }),
'stdout': textSummary(data, {indent: ' ', enableColors: false}),
[summary_json]: JSON.stringify(data),
};
}
@ -176,13 +176,13 @@ export function obj_write() {
sleep(__ENV.SLEEP_WRITE);
}
const headers = { unique_header: uuidv4() };
const headers = {unique_header: uuidv4()};
const container =
container_list[Math.floor(Math.random() * container_list.length)];
container_list[Math.floor(Math.random() * container_list.length)];
const payload = generator.genPayload();
const resp =
grpc_client.put(container, headers, payload, write_grpc_chunk_size);
grpc_client.put(container, headers, payload, write_grpc_chunk_size);
if (!resp.success) {
log.withField('cid', container).error(resp.error);
return;
@ -205,7 +205,7 @@ export function obj_read() {
}
const resp = grpc_client.get(obj.c_id, obj.o_id)
if (!resp.success) {
log.withFields({ cid: obj.c_id, oid: obj.o_id }).error(resp.error);
log.withFields({cid: obj.c_id, oid: obj.o_id}).error(resp.error);
}
return
}
@ -213,7 +213,7 @@ export function obj_read() {
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = grpc_client.get(obj.container, obj.object)
if (!resp.success) {
log.withFields({ cid: obj.container, oid: obj.object }).error(resp.error);
log.withFields({cid: obj.container, oid: obj.object}).error(resp.error);
}
}
@ -230,7 +230,7 @@ export function obj_delete() {
const resp = grpc_client.delete(obj.c_id, obj.o_id);
if (!resp.success) {
// Log errors except (2052 - object already deleted)
log.withFields({ cid: obj.c_id, oid: obj.o_id }).error(resp.error);
log.withFields({cid: obj.c_id, oid: obj.o_id}).error(resp.error);
return;
}

View file

@ -4,16 +4,15 @@ from helpers.cmd import execute_cmd, log
def create_bucket(endpoint, versioning, location, acl, no_verify_ssl):
configuration = ""
if location:
configuration = f"--create-bucket-configuration 'LocationConstraint={location}'"
location = f"--create-bucket-configuration 'LocationConstraint={location}'"
if acl:
acl = f"--acl {acl}"
bucket_name = str(uuid.uuid4())
no_verify_ssl_str = "--no-verify-ssl" if no_verify_ssl else ""
cmd_line = f"aws {no_verify_ssl_str} s3api create-bucket --bucket {bucket_name} " \
f"--endpoint {endpoint} {configuration} {acl} "
f"--endpoint {endpoint} {location} {acl} "
cmd_line_ver = f"aws {no_verify_ssl_str} s3api put-bucket-versioning --bucket {bucket_name} " \
f"--versioning-configuration Status=Enabled --endpoint {endpoint} {acl} "
@ -34,7 +33,7 @@ def create_bucket(endpoint, versioning, location, acl, no_verify_ssl):
else:
log(f"Bucket versioning has been applied for bucket {bucket_name}", endpoint)
log(f"Created bucket: {bucket_name} ({location})", endpoint)
log(f"Created bucket: {bucket_name}", endpoint)
return bucket_name

View file

@ -1,8 +1,8 @@
import re
from helpers.cmd import execute_cmd, log
def create_container(endpoint, policy, container_creation_retry, wallet_path, config, acl, local=False, retry=0):
if retry > int(container_creation_retry):
def create_container(endpoint, policy, wallet_path, config, acl, local=False, depth=0):
if depth > 20:
raise ValueError(f"unable to create container: too many unsuccessful attempts")
if wallet_path:
@ -34,7 +34,7 @@ def create_container(endpoint, policy, container_creation_retry, wallet_path, co
raise ValueError(f"no CID was parsed from command output:\t{fst_str}")
cid = splitted[1]
log(f"Created container: {cid} ({policy})", endpoint)
log(f"Created container {cid}", endpoint)
if not local:
return cid
@ -88,7 +88,7 @@ def create_container(endpoint, policy, container_creation_retry, wallet_path, co
return cid
log(f"Created container {cid} is not stored on {endpoint}, creating another one...", endpoint)
return create_container(endpoint, policy, container_creation_retry, wallet_path, config, acl, local, retry + 1)
return create_container(endpoint, policy, wallet_path, config, acl, local, depth + 1)
def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):

View file

@ -15,20 +15,18 @@ from helpers.frostfs_cli import create_container, upload_object
ERROR_WRONG_CONTAINERS_COUNT = 1
ERROR_WRONG_OBJECTS_COUNT = 2
MAX_WORKERS = 50
DEFAULT_POLICY = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
parser = argparse.ArgumentParser()
parser.add_argument('--size', help='Upload objects size in kb')
parser.add_argument('--containers', help='Number of containers to create')
parser.add_argument('--retry', default=20, help='Maximum number of retries to create a container')
parser.add_argument('--out', help='JSON file with output')
parser.add_argument('--preload_obj', help='Number of pre-loaded objects')
parser.add_argument('--wallet', help='Wallet file path')
parser.add_argument('--config', help='Wallet config file path')
parser.add_argument(
"--policy",
help=f"Container placement policy. Default is {DEFAULT_POLICY}",
action="append"
help="Container placement policy",
default="REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
)
parser.add_argument('--endpoint', help='Nodes addresses separated by comma.')
parser.add_argument('--update', help='Save existed containers')
@ -48,10 +46,7 @@ def main():
objects_list = []
endpoints = args.endpoint.split(',')
if not args.policy:
args.policy = [DEFAULT_POLICY]
container_creation_retry = args.retry
wallet = args.wallet
wallet_config = args.config
workers = int(args.workers)
@ -68,9 +63,9 @@ def main():
containers_count = int(args.containers)
print(f"Create containers: {containers_count}")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
containers_runs = [executor.submit(create_container, endpoint, policy, container_creation_retry, wallet, wallet_config, args.acl, args.local)
for _, endpoint, policy in
zip(range(containers_count), cycle(endpoints), cycle(args.policy))]
containers_runs = [executor.submit(create_container, endpoint, args.policy, wallet, wallet_config, args.acl, args.local)
for _, endpoint in
zip(range(containers_count), cycle(endpoints))]
for run in containers_runs:
container_id = run.result()

View file

@ -11,11 +11,6 @@ from concurrent.futures import ProcessPoolExecutor
from helpers.cmd import random_payload
from helpers.aws_cli import create_bucket, upload_object
ERROR_WRONG_CONTAINERS_COUNT = 1
ERROR_WRONG_OBJECTS_COUNT = 2
MAX_WORKERS = 50
DEFAULT_LOCATION = ""
parser = argparse.ArgumentParser()
parser.add_argument('--size', help='Upload objects size in kb.')
@ -25,7 +20,7 @@ parser.add_argument('--preload_obj', help='Number of pre-loaded objects.')
parser.add_argument('--endpoint', help='S3 Gateways addresses separated by comma.')
parser.add_argument('--update', help='True/False, False by default. Save existed buckets from target file (--out). '
'New buckets will not be created.')
parser.add_argument('--location', help=f'AWS location constraint. Default is "{DEFAULT_LOCATION}"', action="append")
parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
parser.add_argument('--versioning', help='True/False, False by default.')
parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
parser.add_argument('--no-verify-ssl', help='Ignore SSL verifications', action='store_true')
@ -37,6 +32,10 @@ parser.add_argument('--acl', help='Bucket ACL. Default is private. Expected valu
args = parser.parse_args()
print(args)
ERROR_WRONG_CONTAINERS_COUNT = 1
ERROR_WRONG_OBJECTS_COUNT = 2
MAX_WORKERS = 50
def main():
buckets = []
objects_list = []
@ -44,8 +43,6 @@ def main():
no_verify_ssl = args.no_verify_ssl
endpoints = args.endpoint.split(',')
if not args.location:
args.location = [DEFAULT_LOCATION]
workers = int(args.workers)
objects_per_bucket = int(args.preload_obj)
@ -62,9 +59,9 @@ def main():
print(f"Create buckets: {buckets_count}")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
buckets_runs = [executor.submit(create_bucket, endpoint, args.versioning, location, args.acl, no_verify_ssl)
for _, endpoint, location in
zip(range(buckets_count), cycle(endpoints), cycle(args.location))]
buckets_runs = [executor.submit(create_bucket, endpoint, args.versioning, args.location, args.acl, no_verify_ssl)
for _, endpoint in
zip(range(buckets_count), cycle(endpoints))]
for run in buckets_runs:
bucket_name = run.result()

View file

@ -132,10 +132,6 @@ export function setup() {
const start_timestamp = Date.now()
console.log(
`Load started at: ${Date(start_timestamp).toString()}`)
if (delete_vu_count > 0){
obj_to_delete_selector.sync.add(delete_vu_count)
}
}
export function teardown(data) {
@ -208,8 +204,6 @@ export function obj_delete() {
const obj = obj_to_delete_selector.nextObject();
if (!obj) {
if (obj_to_delete_exit_on_null) {
obj_to_delete_selector.sync.done()
obj_to_delete_selector.sync.wait()
exec.test.abort("No more objects to select");
}
return;

View file

@ -1,233 +0,0 @@
import {sleep} from 'k6';
import {SharedArray} from 'k6/data';
import exec from 'k6/execution';
import logging from 'k6/x/frostfs/logging';
import registry from 'k6/x/frostfs/registry';
import s3 from 'k6/x/frostfs/s3';
import stats from 'k6/x/frostfs/stats';
import {newGenerator} from './libs/datagen.js';
import {parseEnv} from './libs/env-parser.js';
import {textSummary} from './libs/k6-summary-0.0.2.js';
import {uuidv4} from './libs/k6-utils-1.4.0.js';
parseEnv();
const obj_list = new SharedArray(
'obj_list',
function() { return JSON.parse(open(__ENV.PREGEN_JSON)).objects; });
const bucket_list = new SharedArray(
'bucket_list',
function() { return JSON.parse(open(__ENV.PREGEN_JSON)).buckets; });
const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
const summary_json = __ENV.SUMMARY_JSON || '/tmp/summary.json';
const no_verify_ssl = __ENV.NO_VERIFY_SSL || 'true';
const connection_args = {
no_verify_ssl : no_verify_ssl
}
// Select random S3 endpoint for current VU
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint =
s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
const s3_client = s3.connect(s3_endpoint, connection_args);
const log = logging.new().withField('endpoint', s3_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry =
registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
const duration = __ENV.DURATION;
if (!!__ENV.METRIC_TAGS) {
stats.setTags(__ENV.METRIC_TAGS)
}
const read_age = __ENV.READ_AGE ? parseInt(__ENV.READ_AGE) : 10;
let obj_to_read_selector = undefined;
if (registry_enabled) {
obj_to_read_selector = registry.getSelector(
__ENV.REGISTRY_FILE, 'obj_to_read',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status : 'created',
age : read_age,
})
}
const scenarios = {};
const write_vu_count = parseInt(__ENV.WRITERS || '0');
const generator = newGenerator(write_vu_count > 0);
if (write_vu_count > 0) {
scenarios.write = {
executor : 'constant-vus',
vus : write_vu_count,
duration : `${duration}s`,
exec : 'obj_write',
gracefulStop : '5s',
};
}
const read_vu_count = parseInt(__ENV.READERS || '0');
if (read_vu_count > 0) {
scenarios.read = {
executor : 'constant-vus',
vus : read_vu_count,
duration : `${duration}s`,
exec : 'obj_read',
gracefulStop : '5s',
};
}
const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
let obj_to_delete_selector = undefined;
let obj_to_delete_exit_on_null = undefined;
if (registry_enabled ) {
obj_to_delete_exit_on_null = (write_vu_count == 0) && (read_vu_count == 0)
let constructor = obj_to_delete_exit_on_null ? registry.getOneshotSelector
: registry.getSelector;
obj_to_delete_selector =
constructor(__ENV.REGISTRY_FILE, 'obj_to_delete',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status : 'read',
age : delete_age,
});
}
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
if (delete_vu_count > 0) {
if (!obj_to_delete_selector) {
throw 'Positive DELETE worker number without a proper object selector';
}
scenarios.delete = {
executor : 'constant-vus',
vus : delete_vu_count,
duration : `${duration}s`,
exec : 'obj_delete',
gracefulStop : '5s',
};
}
export const options = {
scenarios,
setupTimeout : '5s',
};
export function setup() {
const total_vu_count = write_vu_count + read_vu_count + delete_vu_count;
console.log(`Pregenerated buckets: ${bucket_list.length}`);
console.log(`Pregenerated read object size: ${read_size}`);
console.log(`Pregenerated total objects: ${obj_list.length}`);
console.log(`Reading VUs: ${read_vu_count}`);
console.log(`Writing VUs: ${write_vu_count}`);
console.log(`Deleting VUs: ${delete_vu_count}`);
console.log(`Total VUs: ${total_vu_count}`);
const start_timestamp = Date.now()
console.log(
`Load started at: ${Date(start_timestamp).toString()}`)
}
export function teardown(data) {
if (obj_registry) {
obj_registry.close();
}
const end_timestamp = Date.now()
console.log(
`Load finished at: ${Date(end_timestamp).toString()}`)
}
export function handleSummary(data) {
return {
'stdout' : textSummary(data, {indent : ' ', enableColors : false}),
[summary_json] : JSON.stringify(data),
};
}
export function obj_write() {
if (__ENV.SLEEP_WRITE) {
sleep(__ENV.SLEEP_WRITE);
}
const key = __ENV.OBJ_NAME || uuidv4();
const bucket = bucket_list[Math.floor(Math.random() * bucket_list.length)];
const payload = generator.genPayload();
const resp = s3_client.put(bucket, key, payload);
if (!resp.success) {
log.withFields({bucket : bucket, key : key}).error(resp.error);
return;
}
if (obj_registry) {
obj_registry.addObject('', '', bucket, key, payload.hash());
}
}
export function obj_read() {
if (__ENV.SLEEP_READ) {
sleep(__ENV.SLEEP_READ);
}
if (obj_to_read_selector) {
const obj = obj_to_read_selector.nextObject();
if (!obj ) {
return;
}
const resp = s3_client.get(obj.s3_bucket, obj.s3_key)
if (!resp.success) {
log.withFields({bucket : obj.s3_bucket, key : obj.s3_key, status: obj.status, op: `READ`})
.error(resp.error);
} else {
obj_registry.setObjectStatus(obj.id, obj.status, 'read');
}
return
}
const obj = obj_list[Math.floor(Math.random() * obj_list.length)];
const resp = s3_client.get(obj.bucket, obj.object);
if (!resp.success) {
log.withFields({bucket : obj.bucket, key : obj.object}).error(resp.error);
} else {
obj_registry.setObjectStatus(obj.id, obj.status, 'read');
}
}
export function obj_delete() {
if (__ENV.SLEEP_DELETE) {
sleep(__ENV.SLEEP_DELETE);
}
const obj = obj_to_delete_selector.nextObject();
delete_object(obj)
}
export function delete_object(obj) {
if (!obj) {
if (obj_to_delete_exit_on_null) {
exec.test.abort("No more objects to select");
}
return;
}
const resp = s3_client.delete(obj.s3_bucket, obj.s3_key);
if (!resp.success) {
log.withFields({bucket : obj.s3_bucket, key : obj.s3_key, op : 'DELETE'})
.error(resp.error);
return;
}
obj_registry.deleteObject(obj.id);
}

View file

@ -1,13 +1,13 @@
import { sleep } from 'k6';
import { Counter } from 'k6/metrics';
import {sleep} from 'k6';
import {Counter} from 'k6/metrics';
import logging from 'k6/x/frostfs/logging';
import native from 'k6/x/frostfs/native';
import registry from 'k6/x/frostfs/registry';
import s3 from 'k6/x/frostfs/s3';
import stats from 'k6/x/frostfs/stats';
import { parseEnv } from './libs/env-parser.js';
import { textSummary } from './libs/k6-summary-0.0.2.js';
import {parseEnv} from './libs/env-parser.js';
import {textSummary} from './libs/k6-summary-0.0.2.js';
parseEnv();
@ -39,23 +39,24 @@ let grpc_client = undefined;
if (__ENV.GRPC_ENDPOINTS) {
const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpcEndpoint =
grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
log = log.withField('endpoint', grpcEndpoint);
grpc_client = native.connect(
grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' : false,
1024 * parseInt(__ENV.MAX_OBJECT_SIZE || '0'));
grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' :
false,
'');
}
// Connect to random S3 endpoint
let s3_client = undefined;
if (__ENV.S3_ENDPOINTS) {
const no_verify_ssl = __ENV.NO_VERIFY_SSL || 'true';
const connection_args = { no_verify_ssl: no_verify_ssl };
const connection_args = {no_verify_ssl: no_verify_ssl};
const s3_endpoints = __ENV.S3_ENDPOINTS.split(',');
const s3_endpoint =
s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
s3_endpoints[Math.floor(Math.random() * s3_endpoints.length)];
log = log.withField('endpoint', s3_endpoint);
s3_client = s3.connect(s3_endpoint, connection_args);
}
@ -64,10 +65,10 @@ if (__ENV.S3_ENDPOINTS) {
// execute as many iterations as there are objects. Each object will have 3
// retries to be verified
const obj_to_verify_selector = registry.getSelector(
__ENV.REGISTRY_FILE, 'obj_to_verify',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status: 'created',
});
__ENV.REGISTRY_FILE, 'obj_to_verify',
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {
status: 'created',
});
const obj_to_verify_count = obj_to_verify_selector.count();
// Execute at least one iteration (executor shared-iterations can't run 0
// iterations)
@ -96,15 +97,15 @@ export function setup() {
// Populate counters with initial values
for (const [status, counter] of Object.entries(obj_counters)) {
const obj_selector = registry.getSelector(
__ENV.REGISTRY_FILE, status,
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, { status });
__ENV.REGISTRY_FILE, status,
__ENV.SELECTION_SIZE ? parseInt(__ENV.SELECTION_SIZE) : 0, {status});
counter.add(obj_selector.count());
}
}
export function handleSummary(data) {
return {
'stdout': textSummary(data, { indent: ' ', enableColors: false }),
'stdout': textSummary(data, {indent: ' ', enableColors: false}),
[summary_json]: JSON.stringify(data),
};
}
@ -137,19 +138,19 @@ function verify_object_with_retries(obj, attempts) {
// ReferenceError: Cannot access a variable before initialization.
let lg = log;
if (obj.c_id && obj.o_id) {
lg = lg.withFields({ cid: obj.c_id, oid: obj.o_id });
lg = lg.withFields({cid: obj.c_id, oid: obj.o_id});
result = grpc_client.verifyHash(obj.c_id, obj.o_id, obj.payload_hash);
} else if (obj.s3_bucket && obj.s3_key) {
lg = lg.withFields({ bucket: obj.s3_bucket, key: obj.s3_key });
lg = lg.withFields({bucket: obj.s3_bucket, key: obj.s3_key});
result =
s3_client.verifyHash(obj.s3_bucket, obj.s3_key, obj.payload_hash);
s3_client.verifyHash(obj.s3_bucket, obj.s3_key, obj.payload_hash);
} else {
lg.withFields({
cid: obj.c_id,
oid: obj.o_id,
bucket: obj.s3_bucket,
key: obj.s3_key
}).warn(`Object cannot be verified with supported protocols`);
cid: obj.c_id,
oid: obj.o_id,
bucket: obj.s3_bucket,
key: obj.s3_key
}).warn(`Object cannot be verified with supported protocols`);
return 'skipped';
}