Sync commits #4

Merged
alexvanin merged 3 commits from sync into master 2022-12-29 14:35:14 +00:00
9 changed files with 71 additions and 19 deletions

View file

@ -79,6 +79,15 @@ import s3 from 'k6/x/neofs/s3';
const s3_cli = s3.connect("http://s3.neofs.devenv:8080") const s3_cli = s3.connect("http://s3.neofs.devenv:8080")
``` ```
You can also provide additional options:
```js
import s3 from 'k6/x/neofs/s3';
const s3_cli = s3.connect("http://s3.neofs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'})
```
* `no_verify_ss` - Bool. If `true` - skip verifying the s3 certificate chain and host name (useful if s3 uses self-signed certificates)
* `timeout` - Duration. Set timeout for requests (in http client). If omitted or zero - timeout is infinite.
### Methods ### Methods
- `createBucket(bucket, params)`. Returns dictionary with `success` boolean flag - `createBucket(bucket, params)`. Returns dictionary with `success` boolean flag
and `error` string. The `params` is a dictionary (e.g. `{acl:'private',lock_enabled:'true',location_constraint:'ru'}`) and `error` string. The `params` is a dictionary (e.g. `{acl:'private',lock_enabled:'true',location_constraint:'ru'}`)

View file

@ -4,7 +4,7 @@ import s3 from 'k6/x/neofs/s3';
const payload = open('../go.sum', 'b'); const payload = open('../go.sum', 'b');
const bucket = "cats" const bucket = "cats"
const s3_cli = s3.connect("http://s3.neofs.devenv:8080") const s3_cli = s3.connect("https://s3.neofs.devenv:8080", {'no_verify_ssl': 'true'})
export const options = { export const options = {
stages: [ stages: [

View file

@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports {
return modules.Exports{Default: n} return modules.Exports{Default: n}
} }
func (n *Native) Connect(endpoint, hexPrivateKey string) (*Client, error) { func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) {
var ( var (
cli client.Client cli client.Client
pk *keys.PrivateKey pk *keys.PrivateKey
@ -73,7 +73,9 @@ func (n *Native) Connect(endpoint, hexPrivateKey string) (*Client, error) {
var prmDial client.PrmDial var prmDial client.PrmDial
prmDial.SetServerURI(endpoint) prmDial.SetServerURI(endpoint)
prmDial.SetTimeout(5 * time.Second)
prmDial.SetTimeout(time.Duration(dialTimeout) * time.Second)
prmDial.SetStreamTimeout(time.Duration(streamTimeout) * time.Second)
err = cli.Dial(prmDial) err = cli.Dial(prmDial)
if err != nil { if err != nil {

View file

@ -1,7 +1,11 @@
package s3 package s3
import ( import (
"crypto/tls"
"fmt" "fmt"
"net/http"
"strconv"
"time"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/config"
@ -47,7 +51,7 @@ func (s *S3) Exports() modules.Exports {
return modules.Exports{Default: s} return modules.Exports{Default: s}
} }
func (s *S3) Connect(endpoint string) (*Client, error) { func (s *S3) Connect(endpoint string, params map[string]string) (*Client, error) {
resolver := aws.EndpointResolverWithOptionsFunc(func(_, _ string, _ ...interface{}) (aws.Endpoint, error) { resolver := aws.EndpointResolverWithOptionsFunc(func(_, _ string, _ ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{ return aws.Endpoint{
URL: endpoint, URL: endpoint,
@ -59,11 +63,34 @@ func (s *S3) Connect(endpoint string) (*Client, error) {
return nil, fmt.Errorf("configuration error: %w", err) return nil, fmt.Errorf("configuration error: %w", err)
} }
var noVerifySSL bool
if noVerifySSLStr, ok := params["no_verify_ssl"]; ok {
if noVerifySSL, err = strconv.ParseBool(noVerifySSLStr); err != nil {
return nil, fmt.Errorf("invalid value for 'no_verify_ssl': '%s'", noVerifySSLStr)
}
}
var timeout time.Duration
if timeoutStr, ok := params["timeout"]; ok {
if timeout, err = time.ParseDuration(timeoutStr); err != nil {
return nil, fmt.Errorf("invalid value for 'timeout': '%s'", timeoutStr)
}
}
cli := s3.NewFromConfig(cfg, func(options *s3.Options) { cli := s3.NewFromConfig(cfg, func(options *s3.Options) {
// use 'domain/bucket/key' instead of default 'bucket.domain/key' scheme // use 'domain/bucket/key' instead of default 'bucket.domain/key' scheme
options.UsePathStyle = true options.UsePathStyle = true
// do not retry failed requests, by default client does up to 3 retry // do not retry failed requests, by default client does up to 3 retry
options.Retryer = aws.NopRetryer{} options.Retryer = aws.NopRetryer{}
// s3 sometimes use self-signed certs
options.HTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: noVerifySSL,
},
},
Timeout: timeout,
}
}) })
// register metrics // register metrics

View file

@ -17,7 +17,7 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
// Select random gRPC endpoint for current VU // Select random gRPC endpoint for current VU
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(','); const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)]; const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
const grpc_client = native.connect(grpc_endpoint, ''); const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 15);
const registry_enabled = !!__ENV.REGISTRY_FILE; const registry_enabled = !!__ENV.REGISTRY_FILE;
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined; const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;

View file

@ -7,6 +7,8 @@ from helpers.neofs_cli import get_object
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--endpoint', help='Node address') parser.add_argument('--endpoint', help='Node address')
parser.add_argument('--wallet', help='Wallet file path')
parser.add_argument('--config', help='Wallet config file path')
parser.add_argument('--preset_file', help='JSON file path with preset') parser.add_argument('--preset_file', help='JSON file path with preset')
args = parser.parse_args() args = parser.parse_args()
@ -21,11 +23,14 @@ def main():
success_objs = 0 success_objs = 0
failed_objs = 0 failed_objs = 0
wallet = args.wallet
wallet_config = args.config
for obj in preset.get('objects'): for obj in preset.get('objects'):
oid = obj.get('object') oid = obj.get('object')
cid = obj.get('container') cid = obj.get('container')
rst = get_object(cid, oid, args.endpoint, "/dev/null") rst = get_object(cid, oid, args.endpoint, "/dev/null", wallet, wallet_config)
if rst: if rst:
success_objs += 1 success_objs += 1

View file

@ -16,6 +16,8 @@ parser.add_argument('--expected_copies', help="Expected amount of object copies"
parser.add_argument('--preset_file', help='JSON file path with preset') parser.add_argument('--preset_file', help='JSON file path with preset')
parser.add_argument('--max_workers', help='Max workers in parallel', default=50) parser.add_argument('--max_workers', help='Max workers in parallel', default=50)
parser.add_argument('--print_failed', help='Print failed objects', default=False) parser.add_argument('--print_failed', help='Print failed objects', default=False)
parser.add_argument('--wallet', help='Wallet file path')
parser.add_argument('--config', help='Wallet config file path')
args: Namespace = parser.parse_args() args: Namespace = parser.parse_args()
@ -35,12 +37,14 @@ def main():
objs_len = len(objs) objs_len = len(objs)
endpoints = args.endpoints.split(',') endpoints = args.endpoints.split(',')
wallet = args.wallet
wallet_config = args.config
final_discrubution = Counter(dict.fromkeys(endpoints, 0)) final_discrubution = Counter(dict.fromkeys(endpoints, 0))
with ProcessPoolExecutor(max_workers=50) as executor: with ProcessPoolExecutor(max_workers=50) as executor:
search_runs = {executor.submit(check_object_amounts, obj.get('container'), obj.get('object'), endpoints, search_runs = {executor.submit(check_object_amounts, obj.get('container'), obj.get('object'), endpoints,
int(args.expected_copies)): obj for obj in objs} int(args.expected_copies), wallet, wallet_config): obj for obj in objs}
ProgressBar.start() ProgressBar.start()
@ -64,13 +68,13 @@ def main():
print(f'{endpoint}: {final_discrubution[endpoint]}') print(f'{endpoint}: {final_discrubution[endpoint]}')
def check_object_amounts(cid, oid, endpoints, expected_copies): def check_object_amounts(cid, oid, endpoints, expected_copies, wallet, wallet_config):
distribution = Counter(dict.fromkeys(endpoints, 0)) distribution = Counter(dict.fromkeys(endpoints, 0))
copies_in_cluster = 0 copies_in_cluster = 0
for endpoint in endpoints: for endpoint in endpoints:
copy_on_endpoint = search_object_by_id(cid, oid, endpoint, ttl=1) copy_on_endpoint = search_object_by_id(cid, oid, endpoint, wallet, wallet_config, ttl=1)
copies_in_cluster += int(copy_on_endpoint) copies_in_cluster += int(copy_on_endpoint)

View file

@ -3,8 +3,8 @@ import re
from helpers.cmd import execute_cmd from helpers.cmd import execute_cmd
def create_container(endpoint, policy): def create_container(endpoint, policy, wallet_file, wallet_config):
cmd_line = f"neofs-cli --rpc-endpoint {endpoint} container create -g" \ cmd_line = f"neofs-cli --rpc-endpoint {endpoint} container create --wallet {wallet_file} --config {wallet_config} " \
f" --policy '{policy}' --basic-acl public-read-write --await" f" --policy '{policy}' --basic-acl public-read-write --await"
output, success = execute_cmd(cmd_line) output, success = execute_cmd(cmd_line)
@ -27,9 +27,9 @@ def create_container(endpoint, policy):
return splitted[1] return splitted[1]
def upload_object(container, payload_filepath, endpoint): def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
object_name = "" object_name = ""
cmd_line = f"neofs-cli --rpc-endpoint {endpoint} object put -g --file {payload_filepath} " \ cmd_line = f"neofs-cli --rpc-endpoint {endpoint} object put --file {payload_filepath} --wallet {wallet_file} --config {wallet_config} " \
f"--cid {container} --no-progress" f"--cid {container} --no-progress"
output, success = execute_cmd(cmd_line) output, success = execute_cmd(cmd_line)
@ -49,8 +49,8 @@ def upload_object(container, payload_filepath, endpoint):
return splitted[1] return splitted[1]
def get_object(cid, oid, endpoint, out_filepath): def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config):
cmd_line = f"neofs-cli object get -r {endpoint} -g --cid {cid} --oid {oid} " \ cmd_line = f"neofs-cli object get -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} " \
f"--file {out_filepath}" f"--file {out_filepath}"
output, success = execute_cmd(cmd_line) output, success = execute_cmd(cmd_line)
@ -63,8 +63,8 @@ def get_object(cid, oid, endpoint, out_filepath):
return True return True
def search_object_by_id(cid, oid, endpoint, ttl=2): def search_object_by_id(cid, oid, endpoint, wallet_file, wallet_config, ttl=2):
cmd_line = f"neofs-cli object search --ttl {ttl} -r {endpoint} -g --cid {cid} --oid {oid}" cmd_line = f"neofs-cli object search --ttl {ttl} -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} "
output, success = execute_cmd(cmd_line) output, success = execute_cmd(cmd_line)

View file

@ -15,6 +15,8 @@ parser.add_argument('--size', help='Upload objects size in kb')
parser.add_argument('--containers', help='Number of containers to create') parser.add_argument('--containers', help='Number of containers to create')
parser.add_argument('--out', help='JSON file with output') parser.add_argument('--out', help='JSON file with output')
parser.add_argument('--preload_obj', help='Number of pre-loaded objects') parser.add_argument('--preload_obj', help='Number of pre-loaded objects')
parser.add_argument('--wallet', help='Wallet file path')
parser.add_argument('--config', help='Wallet config file path')
parser.add_argument( parser.add_argument(
"--policy", "--policy",
help="Container placement policy", help="Container placement policy",
@ -34,6 +36,9 @@ def main():
endpoints = args.endpoint.split(',') endpoints = args.endpoint.split(',')
wallet = args.wallet
wallet_config = args.config
if args.update: if args.update:
# Open file # Open file
with open(args.out) as f: with open(args.out) as f:
@ -43,7 +48,7 @@ def main():
print(f"Create containers: {args.containers}") print(f"Create containers: {args.containers}")
with ProcessPoolExecutor(max_workers=50) as executor: with ProcessPoolExecutor(max_workers=50) as executor:
containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))], containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
args.policy): _ for _ in range(int(args.containers))} args.policy, wallet, wallet_config): _ for _ in range(int(args.containers))}
for run in containers_runs: for run in containers_runs:
if run.result(): if run.result():
@ -63,7 +68,7 @@ def main():
print(f" > Upload objects for container {container}") print(f" > Upload objects for container {container}")
with ProcessPoolExecutor(max_workers=50) as executor: with ProcessPoolExecutor(max_workers=50) as executor:
objects_runs = {executor.submit(upload_object, container, payload_filepath, objects_runs = {executor.submit(upload_object, container, payload_filepath,
endpoints[random.randrange(len(endpoints))]): _ for _ in range(int(args.preload_obj))} endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(int(args.preload_obj))}
for run in objects_runs: for run in objects_runs:
if run.result(): if run.result():