diff --git a/README.md b/README.md index c38c751..19b200b 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,15 @@ import s3 from 'k6/x/neofs/s3'; const s3_cli = s3.connect("http://s3.neofs.devenv:8080") ``` +You can also provide additional options: +```js +import s3 from 'k6/x/neofs/s3'; +const s3_cli = s3.connect("http://s3.neofs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'}) +``` + +* `no_verify_ss` - Bool. If `true` - skip verifying the s3 certificate chain and host name (useful if s3 uses self-signed certificates) +* `timeout` - Duration. Set timeout for requests (in http client). If omitted or zero - timeout is infinite. + ### Methods - `createBucket(bucket, params)`. Returns dictionary with `success` boolean flag and `error` string. The `params` is a dictionary (e.g. `{acl:'private',lock_enabled:'true',location_constraint:'ru'}`) diff --git a/examples/s3.js b/examples/s3.js index 0ad3d49..8830548 100644 --- a/examples/s3.js +++ b/examples/s3.js @@ -4,7 +4,7 @@ import s3 from 'k6/x/neofs/s3'; const payload = open('../go.sum', 'b'); const bucket = "cats" -const s3_cli = s3.connect("http://s3.neofs.devenv:8080") +const s3_cli = s3.connect("https://s3.neofs.devenv:8080", {'no_verify_ssl': 'true'}) export const options = { stages: [ diff --git a/internal/native/native.go b/internal/native/native.go index 287bef2..e7ae4dc 100644 --- a/internal/native/native.go +++ b/internal/native/native.go @@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports { return modules.Exports{Default: n} } -func (n *Native) Connect(endpoint, hexPrivateKey string) (*Client, error) { +func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) { var ( cli client.Client pk *keys.PrivateKey @@ -73,7 +73,9 @@ func (n *Native) Connect(endpoint, hexPrivateKey string) (*Client, error) { var prmDial client.PrmDial prmDial.SetServerURI(endpoint) - prmDial.SetTimeout(5 * time.Second) + + prmDial.SetTimeout(time.Duration(dialTimeout) * time.Second) + prmDial.SetStreamTimeout(time.Duration(streamTimeout) * time.Second) err = cli.Dial(prmDial) if err != nil { diff --git a/internal/s3/s3.go b/internal/s3/s3.go index 242f3b4..c2da5e7 100644 --- a/internal/s3/s3.go +++ b/internal/s3/s3.go @@ -1,7 +1,11 @@ package s3 import ( + "crypto/tls" "fmt" + "net/http" + "strconv" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -47,7 +51,7 @@ func (s *S3) Exports() modules.Exports { return modules.Exports{Default: s} } -func (s *S3) Connect(endpoint string) (*Client, error) { +func (s *S3) Connect(endpoint string, params map[string]string) (*Client, error) { resolver := aws.EndpointResolverWithOptionsFunc(func(_, _ string, _ ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ URL: endpoint, @@ -59,11 +63,34 @@ func (s *S3) Connect(endpoint string) (*Client, error) { return nil, fmt.Errorf("configuration error: %w", err) } + var noVerifySSL bool + if noVerifySSLStr, ok := params["no_verify_ssl"]; ok { + if noVerifySSL, err = strconv.ParseBool(noVerifySSLStr); err != nil { + return nil, fmt.Errorf("invalid value for 'no_verify_ssl': '%s'", noVerifySSLStr) + } + } + + var timeout time.Duration + if timeoutStr, ok := params["timeout"]; ok { + if timeout, err = time.ParseDuration(timeoutStr); err != nil { + return nil, fmt.Errorf("invalid value for 'timeout': '%s'", timeoutStr) + } + } + cli := s3.NewFromConfig(cfg, func(options *s3.Options) { // use 'domain/bucket/key' instead of default 'bucket.domain/key' scheme options.UsePathStyle = true // do not retry failed requests, by default client does up to 3 retry options.Retryer = aws.NopRetryer{} + // s3 sometimes use self-signed certs + options.HTTPClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: noVerifySSL, + }, + }, + Timeout: timeout, + } }) // register metrics diff --git a/scenarios/grpc.js b/scenarios/grpc.js index 0e19f53..e731171 100644 --- a/scenarios/grpc.js +++ b/scenarios/grpc.js @@ -17,7 +17,7 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size; // Select random gRPC endpoint for current VU const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(','); const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)]; -const grpc_client = native.connect(grpc_endpoint, ''); +const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 15); const registry_enabled = !!__ENV.REGISTRY_FILE; const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined; diff --git a/scenarios/preset/check_objects_in_preset.py b/scenarios/preset/check_objects_in_preset.py index a68ed6c..1e84d57 100755 --- a/scenarios/preset/check_objects_in_preset.py +++ b/scenarios/preset/check_objects_in_preset.py @@ -7,6 +7,8 @@ from helpers.neofs_cli import get_object parser = argparse.ArgumentParser() parser.add_argument('--endpoint', help='Node address') +parser.add_argument('--wallet', help='Wallet file path') +parser.add_argument('--config', help='Wallet config file path') parser.add_argument('--preset_file', help='JSON file path with preset') args = parser.parse_args() @@ -21,11 +23,14 @@ def main(): success_objs = 0 failed_objs = 0 + wallet = args.wallet + wallet_config = args.config + for obj in preset.get('objects'): oid = obj.get('object') cid = obj.get('container') - rst = get_object(cid, oid, args.endpoint, "/dev/null") + rst = get_object(cid, oid, args.endpoint, "/dev/null", wallet, wallet_config) if rst: success_objs += 1 diff --git a/scenarios/preset/check_policy_compliance.py b/scenarios/preset/check_policy_compliance.py index 89c45f9..54297b1 100755 --- a/scenarios/preset/check_policy_compliance.py +++ b/scenarios/preset/check_policy_compliance.py @@ -16,6 +16,8 @@ parser.add_argument('--expected_copies', help="Expected amount of object copies" parser.add_argument('--preset_file', help='JSON file path with preset') parser.add_argument('--max_workers', help='Max workers in parallel', default=50) parser.add_argument('--print_failed', help='Print failed objects', default=False) +parser.add_argument('--wallet', help='Wallet file path') +parser.add_argument('--config', help='Wallet config file path') args: Namespace = parser.parse_args() @@ -35,12 +37,14 @@ def main(): objs_len = len(objs) endpoints = args.endpoints.split(',') + wallet = args.wallet + wallet_config = args.config final_discrubution = Counter(dict.fromkeys(endpoints, 0)) with ProcessPoolExecutor(max_workers=50) as executor: search_runs = {executor.submit(check_object_amounts, obj.get('container'), obj.get('object'), endpoints, - int(args.expected_copies)): obj for obj in objs} + int(args.expected_copies), wallet, wallet_config): obj for obj in objs} ProgressBar.start() @@ -64,13 +68,13 @@ def main(): print(f'{endpoint}: {final_discrubution[endpoint]}') -def check_object_amounts(cid, oid, endpoints, expected_copies): +def check_object_amounts(cid, oid, endpoints, expected_copies, wallet, wallet_config): distribution = Counter(dict.fromkeys(endpoints, 0)) copies_in_cluster = 0 for endpoint in endpoints: - copy_on_endpoint = search_object_by_id(cid, oid, endpoint, ttl=1) + copy_on_endpoint = search_object_by_id(cid, oid, endpoint, wallet, wallet_config, ttl=1) copies_in_cluster += int(copy_on_endpoint) diff --git a/scenarios/preset/helpers/neofs_cli.py b/scenarios/preset/helpers/neofs_cli.py index 4ba7ba4..76c242b 100644 --- a/scenarios/preset/helpers/neofs_cli.py +++ b/scenarios/preset/helpers/neofs_cli.py @@ -3,8 +3,8 @@ import re from helpers.cmd import execute_cmd -def create_container(endpoint, policy): - cmd_line = f"neofs-cli --rpc-endpoint {endpoint} container create -g" \ +def create_container(endpoint, policy, wallet_file, wallet_config): + cmd_line = f"neofs-cli --rpc-endpoint {endpoint} container create --wallet {wallet_file} --config {wallet_config} " \ f" --policy '{policy}' --basic-acl public-read-write --await" output, success = execute_cmd(cmd_line) @@ -27,9 +27,9 @@ def create_container(endpoint, policy): return splitted[1] -def upload_object(container, payload_filepath, endpoint): +def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config): object_name = "" - cmd_line = f"neofs-cli --rpc-endpoint {endpoint} object put -g --file {payload_filepath} " \ + cmd_line = f"neofs-cli --rpc-endpoint {endpoint} object put --file {payload_filepath} --wallet {wallet_file} --config {wallet_config} " \ f"--cid {container} --no-progress" output, success = execute_cmd(cmd_line) @@ -49,8 +49,8 @@ def upload_object(container, payload_filepath, endpoint): return splitted[1] -def get_object(cid, oid, endpoint, out_filepath): - cmd_line = f"neofs-cli object get -r {endpoint} -g --cid {cid} --oid {oid} " \ +def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config): + cmd_line = f"neofs-cli object get -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} " \ f"--file {out_filepath}" output, success = execute_cmd(cmd_line) @@ -63,8 +63,8 @@ def get_object(cid, oid, endpoint, out_filepath): return True -def search_object_by_id(cid, oid, endpoint, ttl=2): - cmd_line = f"neofs-cli object search --ttl {ttl} -r {endpoint} -g --cid {cid} --oid {oid}" +def search_object_by_id(cid, oid, endpoint, wallet_file, wallet_config, ttl=2): + cmd_line = f"neofs-cli object search --ttl {ttl} -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} " output, success = execute_cmd(cmd_line) diff --git a/scenarios/preset/preset_grpc.py b/scenarios/preset/preset_grpc.py index a6a87fe..bc61de3 100755 --- a/scenarios/preset/preset_grpc.py +++ b/scenarios/preset/preset_grpc.py @@ -15,6 +15,8 @@ parser.add_argument('--size', help='Upload objects size in kb') parser.add_argument('--containers', help='Number of containers to create') parser.add_argument('--out', help='JSON file with output') parser.add_argument('--preload_obj', help='Number of pre-loaded objects') +parser.add_argument('--wallet', help='Wallet file path') +parser.add_argument('--config', help='Wallet config file path') parser.add_argument( "--policy", help="Container placement policy", @@ -34,6 +36,9 @@ def main(): endpoints = args.endpoint.split(',') + wallet = args.wallet + wallet_config = args.config + if args.update: # Open file with open(args.out) as f: @@ -43,7 +48,7 @@ def main(): print(f"Create containers: {args.containers}") with ProcessPoolExecutor(max_workers=50) as executor: containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))], - args.policy): _ for _ in range(int(args.containers))} + args.policy, wallet, wallet_config): _ for _ in range(int(args.containers))} for run in containers_runs: if run.result(): @@ -63,7 +68,7 @@ def main(): print(f" > Upload objects for container {container}") with ProcessPoolExecutor(max_workers=50) as executor: objects_runs = {executor.submit(upload_object, container, payload_filepath, - endpoints[random.randrange(len(endpoints))]): _ for _ in range(int(args.preload_obj))} + endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(int(args.preload_obj))} for run in objects_runs: if run.result():