Sync commits #4
9 changed files with 71 additions and 19 deletions
````diff
@@ -79,6 +79,15 @@ import s3 from 'k6/x/neofs/s3';
 const s3_cli = s3.connect("http://s3.neofs.devenv:8080")
 ```
 
+You can also provide additional options:
+```js
+import s3 from 'k6/x/neofs/s3';
+
+const s3_cli = s3.connect("http://s3.neofs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'})
+```
+
+* `no_verify_ssl` - Bool. If `true` - skip verifying the s3 certificate chain and host name (useful if s3 uses self-signed certificates)
+* `timeout` - Duration. Set timeout for requests (in http client). If omitted or zero - timeout is infinite.
 ### Methods
 - `createBucket(bucket, params)`. Returns dictionary with `success` boolean flag
 and `error` string. The `params` is a dictionary (e.g. `{acl:'private',lock_enabled:'true',location_constraint:'ru'}`)
````
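For readers of the updated README, here is a minimal sketch of how the new connect options combine with the documented `createBucket` method. The bucket name and option values are illustrative, and the `export default function` wrapper is just the standard k6 scenario entry point, not something introduced by this change:

```js
import s3 from 'k6/x/neofs/s3';

// Options map introduced by this change: both values are passed as strings.
const s3_cli = s3.connect("http://s3.neofs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'});

export default function () {
    // createBucket returns a dictionary with a `success` flag and an `error` string.
    const res = s3_cli.createBucket("cats", {acl: 'private', lock_enabled: 'true', location_constraint: 'ru'});
    if (!res.success) {
        console.log(res.error);
    }
}
```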
```diff
@@ -4,7 +4,7 @@ import s3 from 'k6/x/neofs/s3';
 
 const payload = open('../go.sum', 'b');
 const bucket = "cats"
-const s3_cli = s3.connect("http://s3.neofs.devenv:8080")
+const s3_cli = s3.connect("https://s3.neofs.devenv:8080", {'no_verify_ssl': 'true'})
 
 export const options = {
     stages: [
```
```diff
@@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports {
     return modules.Exports{Default: n}
 }
 
-func (n *Native) Connect(endpoint, hexPrivateKey string) (*Client, error) {
+func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) {
     var (
         cli client.Client
         pk  *keys.PrivateKey
@@ -73,7 +73,9 @@ func (n *Native) Connect(endpoint, hexPrivateKey string) (*Client, error) {
 
     var prmDial client.PrmDial
     prmDial.SetServerURI(endpoint)
-    prmDial.SetTimeout(5 * time.Second)
+    prmDial.SetTimeout(time.Duration(dialTimeout) * time.Second)
+    prmDial.SetStreamTimeout(time.Duration(streamTimeout) * time.Second)
 
     err = cli.Dial(prmDial)
     if err != nil {
```
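A script-side view of the new `Connect` signature, as exercised by the scenario change further below. The import path and endpoint here are placeholders (this diff does not show the native module's README), and the `5`/`15` values mirror the defaults used in the updated scenario:

```js
// Assumed import path, mirroring the s3 module import shown above.
import native from 'k6/x/neofs/native';

// The third and fourth arguments are the new dial and stream timeouts in seconds;
// the Go side converts them with time.Duration(...) * time.Second.
const grpc_client = native.connect("grpc.neofs.devenv:8802", '', 5, 15);
```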
```diff
@@ -1,7 +1,11 @@
 package s3
 
 import (
+    "crypto/tls"
     "fmt"
+    "net/http"
+    "strconv"
+    "time"
 
     "github.com/aws/aws-sdk-go-v2/aws"
     "github.com/aws/aws-sdk-go-v2/config"
@@ -47,7 +51,7 @@ func (s *S3) Exports() modules.Exports {
     return modules.Exports{Default: s}
 }
 
-func (s *S3) Connect(endpoint string) (*Client, error) {
+func (s *S3) Connect(endpoint string, params map[string]string) (*Client, error) {
     resolver := aws.EndpointResolverWithOptionsFunc(func(_, _ string, _ ...interface{}) (aws.Endpoint, error) {
         return aws.Endpoint{
             URL: endpoint,
@@ -59,11 +63,34 @@ func (s *S3) Connect(endpoint string) (*Client, error) {
         return nil, fmt.Errorf("configuration error: %w", err)
     }
 
+    var noVerifySSL bool
+    if noVerifySSLStr, ok := params["no_verify_ssl"]; ok {
+        if noVerifySSL, err = strconv.ParseBool(noVerifySSLStr); err != nil {
+            return nil, fmt.Errorf("invalid value for 'no_verify_ssl': '%s'", noVerifySSLStr)
+        }
+    }
+
+    var timeout time.Duration
+    if timeoutStr, ok := params["timeout"]; ok {
+        if timeout, err = time.ParseDuration(timeoutStr); err != nil {
+            return nil, fmt.Errorf("invalid value for 'timeout': '%s'", timeoutStr)
+        }
+    }
+
     cli := s3.NewFromConfig(cfg, func(options *s3.Options) {
         // use 'domain/bucket/key' instead of default 'bucket.domain/key' scheme
         options.UsePathStyle = true
         // do not retry failed requests, by default client does up to 3 retry
         options.Retryer = aws.NopRetryer{}
+        // s3 sometimes use self-signed certs
+        options.HTTPClient = &http.Client{
+            Transport: &http.Transport{
+                TLSClientConfig: &tls.Config{
+                    InsecureSkipVerify: noVerifySSL,
+                },
+            },
+            Timeout: timeout,
+        }
     })
 
     // register metrics
```
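Tying the Go changes above back to script usage, here is a sketch of connecting to an HTTPS endpoint with both new parameters set; the values are illustrative. On the Go side, `no_verify_ssl` is parsed with `strconv.ParseBool` and `timeout` with `time.ParseDuration`, so an unparsable value makes `Connect` fail with an "invalid value for ..." error.

```js
import s3 from 'k6/x/neofs/s3';

// Skip certificate verification for a self-signed endpoint and cap each request at 30s.
// Omitting 'timeout' (or passing '0') leaves the HTTP client without a timeout.
const s3_cli = s3.connect("https://s3.neofs.devenv:8080", {
    'no_verify_ssl': 'true',
    'timeout': '30s',
});
```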
```diff
@@ -17,7 +17,7 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
 // Select random gRPC endpoint for current VU
 const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
 const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
-const grpc_client = native.connect(grpc_endpoint, '');
+const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 15);
 
 const registry_enabled = !!__ENV.REGISTRY_FILE;
 const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
```
```diff
@@ -7,6 +7,8 @@ from helpers.neofs_cli import get_object
 
 parser = argparse.ArgumentParser()
 parser.add_argument('--endpoint', help='Node address')
+parser.add_argument('--wallet', help='Wallet file path')
+parser.add_argument('--config', help='Wallet config file path')
 parser.add_argument('--preset_file', help='JSON file path with preset')
 
 args = parser.parse_args()
@@ -21,11 +23,14 @@ def main():
     success_objs = 0
     failed_objs = 0
 
+    wallet = args.wallet
+    wallet_config = args.config
+
     for obj in preset.get('objects'):
         oid = obj.get('object')
         cid = obj.get('container')
 
-        rst = get_object(cid, oid, args.endpoint, "/dev/null")
+        rst = get_object(cid, oid, args.endpoint, "/dev/null", wallet, wallet_config)
 
         if rst:
             success_objs += 1
```
```diff
@@ -16,6 +16,8 @@ parser.add_argument('--expected_copies', help="Expected amount of object copies"
 parser.add_argument('--preset_file', help='JSON file path with preset')
 parser.add_argument('--max_workers', help='Max workers in parallel', default=50)
 parser.add_argument('--print_failed', help='Print failed objects', default=False)
+parser.add_argument('--wallet', help='Wallet file path')
+parser.add_argument('--config', help='Wallet config file path')
 
 
 args: Namespace = parser.parse_args()
@@ -35,12 +37,14 @@ def main():
     objs_len = len(objs)
 
     endpoints = args.endpoints.split(',')
+    wallet = args.wallet
+    wallet_config = args.config
 
     final_discrubution = Counter(dict.fromkeys(endpoints, 0))
 
     with ProcessPoolExecutor(max_workers=50) as executor:
         search_runs = {executor.submit(check_object_amounts, obj.get('container'), obj.get('object'), endpoints,
-                                       int(args.expected_copies)): obj for obj in objs}
+                                       int(args.expected_copies), wallet, wallet_config): obj for obj in objs}
 
     ProgressBar.start()
@@ -64,13 +68,13 @@ def main():
         print(f'{endpoint}: {final_discrubution[endpoint]}')
 
 
-def check_object_amounts(cid, oid, endpoints, expected_copies):
+def check_object_amounts(cid, oid, endpoints, expected_copies, wallet, wallet_config):
     distribution = Counter(dict.fromkeys(endpoints, 0))
 
     copies_in_cluster = 0
 
     for endpoint in endpoints:
-        copy_on_endpoint = search_object_by_id(cid, oid, endpoint, ttl=1)
+        copy_on_endpoint = search_object_by_id(cid, oid, endpoint, wallet, wallet_config, ttl=1)
 
         copies_in_cluster += int(copy_on_endpoint)
```
```diff
@@ -3,8 +3,8 @@ import re
 from helpers.cmd import execute_cmd
 
 
-def create_container(endpoint, policy):
-    cmd_line = f"neofs-cli --rpc-endpoint {endpoint} container create -g" \
+def create_container(endpoint, policy, wallet_file, wallet_config):
+    cmd_line = f"neofs-cli --rpc-endpoint {endpoint} container create --wallet {wallet_file} --config {wallet_config} " \
               f" --policy '{policy}' --basic-acl public-read-write --await"
 
     output, success = execute_cmd(cmd_line)
@@ -27,9 +27,9 @@ def create_container(endpoint, policy):
     return splitted[1]
 
 
-def upload_object(container, payload_filepath, endpoint):
+def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
     object_name = ""
-    cmd_line = f"neofs-cli --rpc-endpoint {endpoint} object put -g --file {payload_filepath} " \
+    cmd_line = f"neofs-cli --rpc-endpoint {endpoint} object put --file {payload_filepath} --wallet {wallet_file} --config {wallet_config} " \
               f"--cid {container} --no-progress"
     output, success = execute_cmd(cmd_line)
 
@@ -49,8 +49,8 @@ def upload_object(container, payload_filepath, endpoint):
     return splitted[1]
 
 
-def get_object(cid, oid, endpoint, out_filepath):
-    cmd_line = f"neofs-cli object get -r {endpoint} -g --cid {cid} --oid {oid} " \
+def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config):
+    cmd_line = f"neofs-cli object get -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} " \
               f"--file {out_filepath}"
 
     output, success = execute_cmd(cmd_line)
@@ -63,8 +63,8 @@ def get_object(cid, oid, endpoint, out_filepath):
     return True
 
 
-def search_object_by_id(cid, oid, endpoint, ttl=2):
-    cmd_line = f"neofs-cli object search --ttl {ttl} -r {endpoint} -g --cid {cid} --oid {oid}"
+def search_object_by_id(cid, oid, endpoint, wallet_file, wallet_config, ttl=2):
+    cmd_line = f"neofs-cli object search --ttl {ttl} -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} "
 
     output, success = execute_cmd(cmd_line)
```
```diff
@@ -15,6 +15,8 @@ parser.add_argument('--size', help='Upload objects size in kb')
 parser.add_argument('--containers', help='Number of containers to create')
 parser.add_argument('--out', help='JSON file with output')
 parser.add_argument('--preload_obj', help='Number of pre-loaded objects')
+parser.add_argument('--wallet', help='Wallet file path')
+parser.add_argument('--config', help='Wallet config file path')
 parser.add_argument(
     "--policy",
     help="Container placement policy",
@@ -34,6 +36,9 @@ def main():
 
     endpoints = args.endpoint.split(',')
 
+    wallet = args.wallet
+    wallet_config = args.config
+
     if args.update:
         # Open file
         with open(args.out) as f:
@@ -43,7 +48,7 @@ def main():
     print(f"Create containers: {args.containers}")
     with ProcessPoolExecutor(max_workers=50) as executor:
         containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
-                                           args.policy): _ for _ in range(int(args.containers))}
+                                           args.policy, wallet, wallet_config): _ for _ in range(int(args.containers))}
 
         for run in containers_runs:
             if run.result():
@@ -63,7 +68,7 @@ def main():
     print(f" > Upload objects for container {container}")
     with ProcessPoolExecutor(max_workers=50) as executor:
         objects_runs = {executor.submit(upload_object, container, payload_filepath,
-                                        endpoints[random.randrange(len(endpoints))]): _ for _ in range(int(args.preload_obj))}
+                                        endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(int(args.preload_obj))}
 
         for run in objects_runs:
             if run.result():
```