Compare commits


10 commits

Author SHA1 Message Date
3bf41e626e fix container_creation_retry
Signed-off-by: Liza <e.chichindaeva@yadro.com>
2024-08-30 16:18:58 +03:00
6d3ecb6528 [#154] Add registry import cli utility
* Currently, objects created in a preset are never deleted:
  k6 deletes only objects listed in the registry, and if no
  registry file is provided the k6 delete load fails.
* Added a CLI utility that imports objects created in a preset
  into the registry so k6 can delete them normally (usage shown below).
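
For reference, the new subcommand documents its own invocation in its `Example` string (quoted from the diff below); the first argument is the registry file, the remaining arguments are preset files:

```
xk6-registry import registry.bolt preset.json
xk6-registry import --status created registry.bolt preset.json another_preset.json
```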

Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-08-23 13:41:01 +03:00
75f670b392 [#159] preset: Add optional max number of retries to create a container instead of hard-coded number 20
Signed-off-by: s.makhov <s.makhov@yadro.com>
2024-08-02 12:22:52 +03:00
9b9db46a07 [#152] Allow to set mix of policies for containers and buckets
Signed-off-by: a.berezin <a.berezin@yadro.com>
2024-07-02 20:45:31 +03:00
335c45c578 [#149] selector: Add read timeout
If there are no objects for 10 seconds, return nil.
This prevents a VU iteration from hanging when no
objects have been pushed to the registry (see the
selector diff below).

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-06-24 13:59:12 +03:00
e7d4dd404a [#150] scenarios: Use SelectorAwaiting for read and delete load in s3_dar.js, make delete_age optional
Signed-off-by: m.malygina <m.malygina@yadro.com>
2024-06-21 15:52:57 +03:00
0a9aeab47c [#150] Use SelectorAwaiting in case we are running both read and delete load
Signed-off-by: m.malygina <m.malygina@yadro.com>
2024-06-21 10:55:18 +03:00
3bc1229062 [#146] native: Add NetworkInfo cache
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-06-17 09:32:20 +03:00
e92ce668a8 [#145] scenarios: Format js files with clang
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-06-14 11:26:01 +03:00
6d1e7eb49e [#145] native: Allow to specify max_obj_size
For locally prepared objects it is now possible to
specify the cut size (see the README diff below).

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-06-14 10:57:30 +03:00
19 changed files with 398 additions and 163 deletions

View file

@@ -48,10 +48,11 @@ Create native client with `connect` method. Arguments:
 - dial timeout in seconds (0 for the default value)
 - stream timeout in seconds (0 for the default value)
 - generate object header on the client side (for big object - split locally too)
+- max size for generated object header on the client side (for big object - the size that the object is split into)

 ```js
 import native from 'k6/x/frostfs/native';
-const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
+const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false, 0)
 ```

 ### Methods
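
A minimal sketch of the new argument in use, assuming the devenv endpoint from this README; `prepareLocally = true` and the 1 MiB cap are illustrative values, and `0` keeps the previous behavior:

```js
import native from 'k6/x/frostfs/native';

// Sixth argument: client-side max object (part) size in bytes.
// 0 falls back to the network's MaxObjectSize; here parts are capped at 1 MiB.
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, true, 1024 * 1024);
```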

View file

@@ -0,0 +1,55 @@
package importer

import (
	"encoding/json"
	"os"

	"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry"
)

type PreGenObj struct {
	Bucket    string `json:"bucket"`
	Object    string `json:"object"`
	Container string `json:"container"`
}

type PreGenerateInfo struct {
	Buckets    []string    `json:"buckets"`
	Containers []string    `json:"containers"`
	Objects    []PreGenObj `json:"objects"`
	ObjSize    string      `json:"obj_size"`
}

// ImportJSONPreGen writes objects from a pregenerated JSON file
// to the registry.
// Note that ImportJSONPreGen does not check whether an object already
// exists in the registry, so on re-import the registry will contain
// two entries representing the same object.
func ImportJSONPreGen(o *registry.ObjRegistry, filename string) error {
	f, err := os.ReadFile(filename)
	if err != nil {
		return err
	}

	var pregenInfo PreGenerateInfo
	err = json.Unmarshal(f, &pregenInfo)
	if err != nil {
		return err
	}

	// AddObject uses DB.Batch to combine concurrent Batch calls
	// into a single Bolt transaction. DB.Batch is limited by
	// DB.MaxBatchDelay, which may affect performance.
	for _, obj := range pregenInfo.Objects {
		if obj.Bucket != "" {
			err = o.AddObject("", "", obj.Bucket, obj.Object, "")
		} else {
			err = o.AddObject(obj.Container, obj.Object, "", "", "")
		}
		if err != nil {
			return err
		}
	}
	return nil
}

View file

@@ -0,0 +1,27 @@
package importer

import (
	"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/registry"
	"github.com/spf13/cobra"
)

// Cmd represents the import command.
var Cmd = &cobra.Command{
	Use:   "import",
	Short: "Import objects into registry",
	Long:  "Import objects into registry from pregenerated files",
	Example: `xk6-registry import registry.bolt preset.json
xk6-registry import --status created registry.bolt preset.json another_preset.json`,
	RunE: runCmd,
	Args: cobra.MinimumNArgs(2),
}

func runCmd(cmd *cobra.Command, args []string) error {
	objRegistry := registry.NewObjRegistry(cmd.Context(), args[0])
	for i := 1; i < len(args); i++ {
		if err := ImportJSONPreGen(objRegistry, args[i]); err != nil {
			return err
		}
	}
	return nil
}

cmd/xk6-registry/main.go (new file, 18 lines)
View file

@@ -0,0 +1,18 @@
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

	if cmd, err := rootCmd.ExecuteContextC(ctx); err != nil {
		cmd.PrintErrln("Error:", err.Error())
		cmd.PrintErrf("Run '%v --help' for usage.\n", cmd.CommandPath())
		os.Exit(1)
	}
}

cmd/xk6-registry/root.go (new file, 33 lines)
View file

@@ -0,0 +1,33 @@
package main

import (
	"runtime"

	"git.frostfs.info/TrueCloudLab/xk6-frostfs/cmd/xk6-registry/importer"
	"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/version"
	"github.com/spf13/cobra"
)

var rootCmd = &cobra.Command{
	Use:     "xk6-registry",
	Version: version.Version,
	Short:   "Command Line Tool to work with Registry",
	Long: `Registry provides tools to work with object registry for xk6.
It contains command for importing objects in registry from preset`,
	SilenceErrors: true,
	SilenceUsage:  true,
	Run:           rootCmdRun,
}

func init() {
	cobra.AddTemplateFunc("runtimeVersion", runtime.Version)
	rootCmd.SetVersionTemplate(`FrostFS xk6-registry
{{printf "Version: %s" .Version }}
GoVersion: {{ runtimeVersion }}
`)
	rootCmd.AddCommand(importer.Cmd)
}

func rootCmdRun(cmd *cobra.Command, _ []string) {
	_ = cmd.Usage()
}

View file

@@ -3,7 +3,7 @@ import { fail } from "k6";
 import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';

 const payload = open('../go.sum', 'b');
-const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false)
+const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false, 0)

 export const options = {
   stages: [

View file

@@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';

 const payload = open('../go.sum', 'b');
 const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
-const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
+const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false, 0)
 const frostfs_obj = frostfs_cli.onsite(container, payload)

 export const options = {

internal/native/cache.go (new file, 58 lines)
View file

@@ -0,0 +1,58 @@
package native

import (
	"context"
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

const networkCacheTTL = time.Minute

var networkInfoCache = &networkInfoCacheT{}

type networkInfoCacheT struct {
	guard   sync.RWMutex
	current *netmap.NetworkInfo
	fetchTS time.Time
}

func (c *networkInfoCacheT) getOrFetch(ctx context.Context, cli *client.Client) (*netmap.NetworkInfo, error) {
	if v := c.get(); v != nil {
		return v, nil
	}
	return c.fetch(ctx, cli)
}

func (c *networkInfoCacheT) get() *netmap.NetworkInfo {
	c.guard.RLock()
	defer c.guard.RUnlock()

	if c.current == nil || time.Since(c.fetchTS) > networkCacheTTL {
		return nil
	}

	return c.current
}

func (c *networkInfoCacheT) fetch(ctx context.Context, cli *client.Client) (*netmap.NetworkInfo, error) {
	c.guard.Lock()
	defer c.guard.Unlock()

	if time.Since(c.fetchTS) <= networkCacheTTL {
		return c.current, nil
	}

	res, err := cli.NetworkInfo(ctx, client.PrmNetworkInfo{})
	if err != nil {
		return nil, err
	}

	v := res.Info()
	c.current = &v
	c.fetchTS = time.Now()

	return c.current, nil
}
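
Design note: `get` takes only the read lock and treats entries older than `networkCacheTTL` as missing, while `fetch` re-checks the timestamp under the write lock, so concurrent VUs trigger at most one `NetworkInfo` call per TTL window.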

View file

@@ -35,6 +35,7 @@ type (
 		tok            session.Object
 		cli            *client.Client
 		prepareLocally bool
+		maxObjSize     uint64
 	}

 	PutResponse struct {
@@ -71,6 +72,7 @@ type (
 		hdr            object.Object
 		payload        []byte
 		prepareLocally bool
+		maxObjSize     uint64
 	}
 )
@@ -103,7 +105,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload data
 	o.SetOwnerID(owner)
 	o.SetAttributes(attrs...)

-	resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize)
+	resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize, c.maxObjSize)
 	if err != nil {
 		return PutResponse{Success: false, Error: err.Error()}
 	}
@@ -373,6 +375,7 @@ func (c *Client) Onsite(containerID string, payload datagen.Payload) PreparedObj
 		hdr:            *obj,
 		payload:        data,
 		prepareLocally: c.prepareLocally,
+		maxObjSize:     c.maxObjSize,
 	}
 }
@@ -398,7 +401,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
 		return PutResponse{Success: false, Error: err.Error()}
 	}

-	_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0)
+	_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0, p.maxObjSize)
 	if err != nil {
 		return PutResponse{Success: false, Error: err.Error()}
 	}
@@ -413,7 +416,7 @@ func (s epochSource) CurrentEpoch() uint64 {
 }

 func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
-	hdr *object.Object, payload datagen.Payload, chunkSize int,
+	hdr *object.Object, payload datagen.Payload, chunkSize int, maxObjSize uint64,
 ) (*client.ResObjectPut, error) {
 	bufSize := defaultBufferSize
 	if chunkSize > 0 {
@@ -434,13 +437,16 @@ func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Ob
 		prm.MaxChunkLength = chunkSize
 	}

 	if prepareLocally {
-		res, err := cli.NetworkInfo(vu.Context(), client.PrmNetworkInfo{})
+		ni, err := networkInfoCache.getOrFetch(vu.Context(), cli)
 		if err != nil {
 			return nil, err
 		}
-		prm.MaxSize = res.Info().MaxObjectSize()
-		prm.EpochSource = epochSource(res.Info().CurrentEpoch())
+		prm.MaxSize = ni.MaxObjectSize()
+		prm.EpochSource = epochSource(ni.CurrentEpoch())
 		prm.WithoutHomomorphHash = true
+		if maxObjSize > 0 {
+			prm.MaxSize = maxObjSize
+		}
 	}

 	objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)

View file

@@ -52,13 +52,17 @@ func (n *Native) Exports() modules.Exports {
 	return modules.Exports{Default: n}
 }

-func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) {
+func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool, maxObjSize int) (*Client, error) {
 	var (
 		cli client.Client
 		pk  *keys.PrivateKey
 		err error
 	)

+	if maxObjSize < 0 {
+		return nil, fmt.Errorf("max object size value must be positive")
+	}
+
 	pk, err = keys.NewPrivateKey()
 	if len(hexPrivateKey) != 0 {
 		pk, err = keys.NewPrivateKeyFromHex(hexPrivateKey)
@@ -114,6 +118,16 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
 	tok.SetAuthKey(&key)
 	tok.SetExp(exp)

+	if prepareLocally && maxObjSize > 0 {
+		res, err := cli.NetworkInfo(n.vu.Context(), client.PrmNetworkInfo{})
+		if err != nil {
+			return nil, err
+		}
+		if uint64(maxObjSize) > res.Info().MaxObjectSize() {
+			return nil, fmt.Errorf("max object size must be not greater than %d bytes", res.Info().MaxObjectSize())
+		}
+	}
+
 	// register metrics
 	objPutSuccess, _ = stats.Registry.NewMetric("frostfs_obj_put_success", metrics.Counter)
@@ -140,5 +154,6 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
 		tok:            tok,
 		cli:            &cli,
 		prepareLocally: prepareLocally,
+		maxObjSize:     uint64(maxObjSize),
 	}, nil
 }
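
A hedged sketch of the resulting connect-time validation from the k6 side; the devenv endpoint and the 1 MiB cap are illustrative:

```js
import native from 'k6/x/frostfs/native';

// A negative max object size is rejected outright:
// native.connect("s01.frostfs.devenv:8080", "", 0, 0, false, -1); // error

// With prepareLocally enabled, a cap above the network's MaxObjectSize
// fails at connect time; a modest cap such as 1 MiB should be accepted.
const cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, true, 1024 * 1024);
```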

View file

@@ -9,6 +9,8 @@ import (
 	"go.etcd.io/bbolt"
 )

+const nextObjectTimeout = 10 * time.Second
+
 type ObjFilter struct {
 	Status string
 	Age    int
@@ -57,9 +59,18 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind,
 // - underlying registry context is done, nil objects will be returned on the
 // currently blocked and every further NextObject calls.
 func (o *ObjSelector) NextObject() *ObjectInfo {
+	if o.kind == SelectorOneshot {
 		return <-o.objChan
 	}
+
+	select {
+	case <-time.After(nextObjectTimeout):
+		return nil
+	case obj := <-o.objChan:
+		return obj
+	}
+}

 // Count returns total number of objects that match filter of the selector.
 func (o *ObjSelector) Count() (int, error) {
 	count := 0
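
For scenario code this means `nextObject()` may now return a null object after the 10-second timeout instead of blocking a VU forever. A defensive sketch, assuming k6's usual camelCase mapping of Go methods and the selector variable from `s3_dar.js` below:

```js
const obj = obj_to_delete_selector.nextObject();
if (!obj) {
  // Nothing arrived within nextObjectTimeout; skip this iteration.
  return;
}
```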

View file

@@ -31,8 +31,8 @@ const grpc_endpoint =
 const grpc_client = native.connect(
     grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
     __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
-    __ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true'
-                          : false);
+    __ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' : false,
+    1024 * parseInt(__ENV.MAX_OBJECT_SIZE || '0'));
 const log = logging.new().withField('endpoint', grpc_endpoint);

 const registry_enabled = !!__ENV.REGISTRY_FILE;

View file

@@ -30,8 +30,8 @@ const grpc_endpoint =
 const grpc_client = native.connect(
     grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
     __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
-    __ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' :
-    false);
+    __ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' : false,
+    1024 * parseInt(__ENV.MAX_OBJECT_SIZE || '0'));
 const log = logging.new().withField('endpoint', grpc_endpoint);

 const registry_enabled = !!__ENV.REGISTRY_FILE;

View file

@@ -4,15 +4,16 @@ from helpers.cmd import execute_cmd, log

 def create_bucket(endpoint, versioning, location, acl, no_verify_ssl):
+    configuration = ""
     if location:
-        location = f"--create-bucket-configuration 'LocationConstraint={location}'"
+        configuration = f"--create-bucket-configuration 'LocationConstraint={location}'"
     if acl:
         acl = f"--acl {acl}"

     bucket_name = str(uuid.uuid4())
     no_verify_ssl_str = "--no-verify-ssl" if no_verify_ssl else ""
     cmd_line = f"aws {no_verify_ssl_str} s3api create-bucket --bucket {bucket_name} " \
-               f"--endpoint {endpoint} {location} {acl} "
+               f"--endpoint {endpoint} {configuration} {acl} "
     cmd_line_ver = f"aws {no_verify_ssl_str} s3api put-bucket-versioning --bucket {bucket_name} " \
                    f"--versioning-configuration Status=Enabled --endpoint {endpoint} {acl} "
@@ -33,7 +34,7 @@ def create_bucket(endpoint, versioning, location, acl, no_verify_ssl):
     else:
         log(f"Bucket versioning has been applied for bucket {bucket_name}", endpoint)

-    log(f"Created bucket: {bucket_name}", endpoint)
+    log(f"Created bucket: {bucket_name} ({location})", endpoint)
     return bucket_name

View file

@@ -1,8 +1,8 @@
 import re
 from helpers.cmd import execute_cmd, log

-def create_container(endpoint, policy, wallet_path, config, acl, local=False, depth=0):
-    if depth > 20:
+def create_container(endpoint, policy, container_creation_retry, wallet_path, config, acl, local=False, retry=0):
+    if retry > int(container_creation_retry):
         raise ValueError(f"unable to create container: too many unsuccessful attempts")

     if wallet_path:
@@ -34,7 +34,7 @@ def create_container(endpoint, policy, wallet_path, config, acl, local=False, de
         raise ValueError(f"no CID was parsed from command output:\t{fst_str}")
     cid = splitted[1]

-    log(f"Created container {cid}", endpoint)
+    log(f"Created container: {cid} ({policy})", endpoint)

     if not local:
         return cid
@@ -88,7 +88,7 @@ def create_container(endpoint, policy, wallet_path, config, acl, local=False, de
         return cid

     log(f"Created container {cid} is not stored on {endpoint}, creating another one...", endpoint)
-    return create_container(endpoint, policy, wallet_path, config, acl, local, depth + 1)
+    return create_container(endpoint, policy, container_creation_retry, wallet_path, config, acl, local, retry + 1)

 def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):

View file

@@ -15,18 +15,20 @@ from helpers.frostfs_cli import create_container, upload_object
 ERROR_WRONG_CONTAINERS_COUNT = 1
 ERROR_WRONG_OBJECTS_COUNT = 2
 MAX_WORKERS = 50
+DEFAULT_POLICY = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"

 parser = argparse.ArgumentParser()
 parser.add_argument('--size', help='Upload objects size in kb')
 parser.add_argument('--containers', help='Number of containers to create')
+parser.add_argument('--retry', default=20, help='Maximum number of retries to create a container')
 parser.add_argument('--out', help='JSON file with output')
 parser.add_argument('--preload_obj', help='Number of pre-loaded objects')
 parser.add_argument('--wallet', help='Wallet file path')
 parser.add_argument('--config', help='Wallet config file path')
 parser.add_argument(
     "--policy",
-    help="Container placement policy",
-    default="REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
+    help=f"Container placement policy. Default is {DEFAULT_POLICY}",
+    action="append"
 )
 parser.add_argument('--endpoint', help='Nodes addresses separated by comma.')
 parser.add_argument('--update', help='Save existed containers')
@@ -46,7 +48,10 @@ def main():
     objects_list = []

     endpoints = args.endpoint.split(',')
+    if not args.policy:
+        args.policy = [DEFAULT_POLICY]
+    container_creation_retry = args.retry
     wallet = args.wallet
     wallet_config = args.config
     workers = int(args.workers)
@@ -63,9 +68,9 @@ def main():
     containers_count = int(args.containers)
     print(f"Create containers: {containers_count}")

     with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
-        containers_runs = [executor.submit(create_container, endpoint, args.policy, wallet, wallet_config, args.acl, args.local)
-                           for _, endpoint in
-                           zip(range(containers_count), cycle(endpoints))]
+        containers_runs = [executor.submit(create_container, endpoint, policy, container_creation_retry, wallet, wallet_config, args.acl, args.local)
+                           for _, endpoint, policy in
+                           zip(range(containers_count), cycle(endpoints), cycle(args.policy))]

     for run in containers_runs:
         container_id = run.result()
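
With `action="append"`, `--policy` may now be passed several times (for example `--policy "REP 1 ..." --policy "REP 2 ..."` with full placement expressions), and `cycle(args.policy)` assigns the policies to containers round-robin; omitting the flag keeps the single old default.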

View file

@@ -11,6 +11,11 @@ from concurrent.futures import ProcessPoolExecutor
 from helpers.cmd import random_payload
 from helpers.aws_cli import create_bucket, upload_object

+ERROR_WRONG_CONTAINERS_COUNT = 1
+ERROR_WRONG_OBJECTS_COUNT = 2
+MAX_WORKERS = 50
+DEFAULT_LOCATION = ""
+
 parser = argparse.ArgumentParser()

 parser.add_argument('--size', help='Upload objects size in kb.')
@@ -20,7 +25,7 @@ parser.add_argument('--preload_obj', help='Number of pre-loaded objects.')
 parser.add_argument('--endpoint', help='S3 Gateways addresses separated by comma.')
 parser.add_argument('--update', help='True/False, False by default. Save existed buckets from target file (--out). '
                                      'New buckets will not be created.')
-parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
+parser.add_argument('--location', help=f'AWS location constraint. Default is "{DEFAULT_LOCATION}"', action="append")
 parser.add_argument('--versioning', help='True/False, False by default.')
 parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
 parser.add_argument('--no-verify-ssl', help='Ignore SSL verifications', action='store_true')
@@ -32,10 +37,6 @@ parser.add_argument('--acl', help='Bucket ACL. Default is private. Expected valu
 args = parser.parse_args()
 print(args)

-ERROR_WRONG_CONTAINERS_COUNT = 1
-ERROR_WRONG_OBJECTS_COUNT = 2
-MAX_WORKERS = 50
-
 def main():
     buckets = []
     objects_list = []
@@ -43,6 +44,8 @@ def main():
     no_verify_ssl = args.no_verify_ssl
     endpoints = args.endpoint.split(',')

+    if not args.location:
+        args.location = [DEFAULT_LOCATION]
+
     workers = int(args.workers)
     objects_per_bucket = int(args.preload_obj)
@@ -59,9 +62,9 @@ def main():
     print(f"Create buckets: {buckets_count}")

     with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
-        buckets_runs = [executor.submit(create_bucket, endpoint, args.versioning, args.location, args.acl, no_verify_ssl)
-                        for _, endpoint in
-                        zip(range(buckets_count), cycle(endpoints))]
+        buckets_runs = [executor.submit(create_bucket, endpoint, args.versioning, location, args.acl, no_verify_ssl)
+                        for _, endpoint, location in
+                        zip(range(buckets_count), cycle(endpoints), cycle(args.location))]

     for run in buckets_runs:
         bucket_name = run.result()
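
`--location` now works the same way: repeating the flag distributes buckets across several location constraints via `cycle(args.location)`, and leaving it out keeps the empty default constraint.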

View file

@@ -71,11 +71,23 @@ if (write_vu_count > 0) {
   };
 }

+const read_vu_count = parseInt(__ENV.READERS || '0');
+if (read_vu_count > 0) {
+  scenarios.read = {
+    executor : 'constant-vus',
+    vus : read_vu_count,
+    duration : `${duration}s`,
+    exec : 'obj_read',
+    gracefulStop : '5s',
+  };
+}
+
 const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
 let obj_to_delete_selector = undefined;
 let obj_to_delete_exit_on_null = undefined;
-if (registry_enabled && delete_age) {
-  obj_to_delete_exit_on_null = write_vu_count == 0;
+if (registry_enabled) {
+  obj_to_delete_exit_on_null = (write_vu_count == 0) && (read_vu_count == 0)

   let constructor = obj_to_delete_exit_on_null ? registry.getOneshotSelector
                                                : registry.getSelector;
@@ -88,16 +100,7 @@ if (registry_enabled && delete_age) {
   });
 }

-const read_vu_count = parseInt(__ENV.READERS || '0');
-if (read_vu_count > 0) {
-  scenarios.read = {
-    executor : 'constant-vus',
-    vus : read_vu_count,
-    duration : `${duration}s`,
-    exec : 'obj_read',
-    gracefulStop : '5s',
-  };
-}
-
 const delete_vu_count = parseInt(__ENV.DELETERS || '0');
 if (delete_vu_count > 0) {
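
Net effect of the reshuffle: the read scenario block is declared before the delete selector, so `read_vu_count` is in scope when the selector is chosen; the selector is now created whenever the registry is enabled (the `delete_age` guard is gone, making `DELETE_AGE` optional); and the oneshot selector is used only when neither writers nor readers are running.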

View file

@@ -44,9 +44,8 @@ if (__ENV.GRPC_ENDPOINTS) {
   grpc_client = native.connect(
       grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
       __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
-      __ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' :
-      false,
-      '');
+      __ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' : false,
+      1024 * parseInt(__ENV.MAX_OBJECT_SIZE || '0'));
 }

 // Connect to random S3 endpoint