Sync commits #4
9 changed files with 71 additions and 19 deletions
|
@ -79,6 +79,15 @@ import s3 from 'k6/x/neofs/s3';
|
|||
const s3_cli = s3.connect("http://s3.neofs.devenv:8080")
|
||||
```
|
||||
|
||||
You can also provide additional options:
|
||||
```js
|
||||
import s3 from 'k6/x/neofs/s3';
|
||||
const s3_cli = s3.connect("http://s3.neofs.devenv:8080", {'no_verify_ssl': 'true', 'timeout': '60s'})
|
||||
```
|
||||
|
||||
* `no_verify_ss` - Bool. If `true` - skip verifying the s3 certificate chain and host name (useful if s3 uses self-signed certificates)
|
||||
* `timeout` - Duration. Set timeout for requests (in http client). If omitted or zero - timeout is infinite.
|
||||
|
||||
### Methods
|
||||
- `createBucket(bucket, params)`. Returns dictionary with `success` boolean flag
|
||||
and `error` string. The `params` is a dictionary (e.g. `{acl:'private',lock_enabled:'true',location_constraint:'ru'}`)
|
||||
|
|
|
@ -4,7 +4,7 @@ import s3 from 'k6/x/neofs/s3';
|
|||
|
||||
const payload = open('../go.sum', 'b');
|
||||
const bucket = "cats"
|
||||
const s3_cli = s3.connect("http://s3.neofs.devenv:8080")
|
||||
const s3_cli = s3.connect("https://s3.neofs.devenv:8080", {'no_verify_ssl': 'true'})
|
||||
|
||||
export const options = {
|
||||
stages: [
|
||||
|
|
|
@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports {
|
|||
return modules.Exports{Default: n}
|
||||
}
|
||||
|
||||
func (n *Native) Connect(endpoint, hexPrivateKey string) (*Client, error) {
|
||||
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) {
|
||||
var (
|
||||
cli client.Client
|
||||
pk *keys.PrivateKey
|
||||
|
@ -73,7 +73,9 @@ func (n *Native) Connect(endpoint, hexPrivateKey string) (*Client, error) {
|
|||
|
||||
var prmDial client.PrmDial
|
||||
prmDial.SetServerURI(endpoint)
|
||||
prmDial.SetTimeout(5 * time.Second)
|
||||
|
||||
prmDial.SetTimeout(time.Duration(dialTimeout) * time.Second)
|
||||
prmDial.SetStreamTimeout(time.Duration(streamTimeout) * time.Second)
|
||||
|
||||
err = cli.Dial(prmDial)
|
||||
if err != nil {
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
package s3
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
|
@ -47,7 +51,7 @@ func (s *S3) Exports() modules.Exports {
|
|||
return modules.Exports{Default: s}
|
||||
}
|
||||
|
||||
func (s *S3) Connect(endpoint string) (*Client, error) {
|
||||
func (s *S3) Connect(endpoint string, params map[string]string) (*Client, error) {
|
||||
resolver := aws.EndpointResolverWithOptionsFunc(func(_, _ string, _ ...interface{}) (aws.Endpoint, error) {
|
||||
return aws.Endpoint{
|
||||
URL: endpoint,
|
||||
|
@ -59,11 +63,34 @@ func (s *S3) Connect(endpoint string) (*Client, error) {
|
|||
return nil, fmt.Errorf("configuration error: %w", err)
|
||||
}
|
||||
|
||||
var noVerifySSL bool
|
||||
if noVerifySSLStr, ok := params["no_verify_ssl"]; ok {
|
||||
if noVerifySSL, err = strconv.ParseBool(noVerifySSLStr); err != nil {
|
||||
return nil, fmt.Errorf("invalid value for 'no_verify_ssl': '%s'", noVerifySSLStr)
|
||||
}
|
||||
}
|
||||
|
||||
var timeout time.Duration
|
||||
if timeoutStr, ok := params["timeout"]; ok {
|
||||
if timeout, err = time.ParseDuration(timeoutStr); err != nil {
|
||||
return nil, fmt.Errorf("invalid value for 'timeout': '%s'", timeoutStr)
|
||||
}
|
||||
}
|
||||
|
||||
cli := s3.NewFromConfig(cfg, func(options *s3.Options) {
|
||||
// use 'domain/bucket/key' instead of default 'bucket.domain/key' scheme
|
||||
options.UsePathStyle = true
|
||||
// do not retry failed requests, by default client does up to 3 retry
|
||||
options.Retryer = aws.NopRetryer{}
|
||||
// s3 sometimes use self-signed certs
|
||||
options.HTTPClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: noVerifySSL,
|
||||
},
|
||||
},
|
||||
Timeout: timeout,
|
||||
}
|
||||
})
|
||||
|
||||
// register metrics
|
||||
|
|
|
@ -17,7 +17,7 @@ const read_size = JSON.parse(open(__ENV.PREGEN_JSON)).obj_size;
|
|||
// Select random gRPC endpoint for current VU
|
||||
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
||||
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
|
||||
const grpc_client = native.connect(grpc_endpoint, '');
|
||||
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 15);
|
||||
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
const obj_registry = registry_enabled ? registry.open(__ENV.REGISTRY_FILE) : undefined;
|
||||
|
|
|
@ -7,6 +7,8 @@ from helpers.neofs_cli import get_object
|
|||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--endpoint', help='Node address')
|
||||
parser.add_argument('--wallet', help='Wallet file path')
|
||||
parser.add_argument('--config', help='Wallet config file path')
|
||||
parser.add_argument('--preset_file', help='JSON file path with preset')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
@ -21,11 +23,14 @@ def main():
|
|||
success_objs = 0
|
||||
failed_objs = 0
|
||||
|
||||
wallet = args.wallet
|
||||
wallet_config = args.config
|
||||
|
||||
for obj in preset.get('objects'):
|
||||
oid = obj.get('object')
|
||||
cid = obj.get('container')
|
||||
|
||||
rst = get_object(cid, oid, args.endpoint, "/dev/null")
|
||||
rst = get_object(cid, oid, args.endpoint, "/dev/null", wallet, wallet_config)
|
||||
|
||||
if rst:
|
||||
success_objs += 1
|
||||
|
|
|
@ -16,6 +16,8 @@ parser.add_argument('--expected_copies', help="Expected amount of object copies"
|
|||
parser.add_argument('--preset_file', help='JSON file path with preset')
|
||||
parser.add_argument('--max_workers', help='Max workers in parallel', default=50)
|
||||
parser.add_argument('--print_failed', help='Print failed objects', default=False)
|
||||
parser.add_argument('--wallet', help='Wallet file path')
|
||||
parser.add_argument('--config', help='Wallet config file path')
|
||||
|
||||
|
||||
args: Namespace = parser.parse_args()
|
||||
|
@ -35,12 +37,14 @@ def main():
|
|||
objs_len = len(objs)
|
||||
|
||||
endpoints = args.endpoints.split(',')
|
||||
wallet = args.wallet
|
||||
wallet_config = args.config
|
||||
|
||||
final_discrubution = Counter(dict.fromkeys(endpoints, 0))
|
||||
|
||||
with ProcessPoolExecutor(max_workers=50) as executor:
|
||||
search_runs = {executor.submit(check_object_amounts, obj.get('container'), obj.get('object'), endpoints,
|
||||
int(args.expected_copies)): obj for obj in objs}
|
||||
int(args.expected_copies), wallet, wallet_config): obj for obj in objs}
|
||||
|
||||
ProgressBar.start()
|
||||
|
||||
|
@ -64,13 +68,13 @@ def main():
|
|||
print(f'{endpoint}: {final_discrubution[endpoint]}')
|
||||
|
||||
|
||||
def check_object_amounts(cid, oid, endpoints, expected_copies):
|
||||
def check_object_amounts(cid, oid, endpoints, expected_copies, wallet, wallet_config):
|
||||
distribution = Counter(dict.fromkeys(endpoints, 0))
|
||||
|
||||
copies_in_cluster = 0
|
||||
|
||||
for endpoint in endpoints:
|
||||
copy_on_endpoint = search_object_by_id(cid, oid, endpoint, ttl=1)
|
||||
copy_on_endpoint = search_object_by_id(cid, oid, endpoint, wallet, wallet_config, ttl=1)
|
||||
|
||||
copies_in_cluster += int(copy_on_endpoint)
|
||||
|
||||
|
|
|
@ -3,8 +3,8 @@ import re
|
|||
from helpers.cmd import execute_cmd
|
||||
|
||||
|
||||
def create_container(endpoint, policy):
|
||||
cmd_line = f"neofs-cli --rpc-endpoint {endpoint} container create -g" \
|
||||
def create_container(endpoint, policy, wallet_file, wallet_config):
|
||||
cmd_line = f"neofs-cli --rpc-endpoint {endpoint} container create --wallet {wallet_file} --config {wallet_config} " \
|
||||
f" --policy '{policy}' --basic-acl public-read-write --await"
|
||||
|
||||
output, success = execute_cmd(cmd_line)
|
||||
|
@ -27,9 +27,9 @@ def create_container(endpoint, policy):
|
|||
return splitted[1]
|
||||
|
||||
|
||||
def upload_object(container, payload_filepath, endpoint):
|
||||
def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
|
||||
object_name = ""
|
||||
cmd_line = f"neofs-cli --rpc-endpoint {endpoint} object put -g --file {payload_filepath} " \
|
||||
cmd_line = f"neofs-cli --rpc-endpoint {endpoint} object put --file {payload_filepath} --wallet {wallet_file} --config {wallet_config} " \
|
||||
f"--cid {container} --no-progress"
|
||||
output, success = execute_cmd(cmd_line)
|
||||
|
||||
|
@ -49,8 +49,8 @@ def upload_object(container, payload_filepath, endpoint):
|
|||
return splitted[1]
|
||||
|
||||
|
||||
def get_object(cid, oid, endpoint, out_filepath):
|
||||
cmd_line = f"neofs-cli object get -r {endpoint} -g --cid {cid} --oid {oid} " \
|
||||
def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config):
|
||||
cmd_line = f"neofs-cli object get -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} " \
|
||||
f"--file {out_filepath}"
|
||||
|
||||
output, success = execute_cmd(cmd_line)
|
||||
|
@ -63,8 +63,8 @@ def get_object(cid, oid, endpoint, out_filepath):
|
|||
return True
|
||||
|
||||
|
||||
def search_object_by_id(cid, oid, endpoint, ttl=2):
|
||||
cmd_line = f"neofs-cli object search --ttl {ttl} -r {endpoint} -g --cid {cid} --oid {oid}"
|
||||
def search_object_by_id(cid, oid, endpoint, wallet_file, wallet_config, ttl=2):
|
||||
cmd_line = f"neofs-cli object search --ttl {ttl} -r {endpoint} --cid {cid} --oid {oid} --wallet {wallet_file} --config {wallet_config} "
|
||||
|
||||
output, success = execute_cmd(cmd_line)
|
||||
|
||||
|
|
|
@ -15,6 +15,8 @@ parser.add_argument('--size', help='Upload objects size in kb')
|
|||
parser.add_argument('--containers', help='Number of containers to create')
|
||||
parser.add_argument('--out', help='JSON file with output')
|
||||
parser.add_argument('--preload_obj', help='Number of pre-loaded objects')
|
||||
parser.add_argument('--wallet', help='Wallet file path')
|
||||
parser.add_argument('--config', help='Wallet config file path')
|
||||
parser.add_argument(
|
||||
"--policy",
|
||||
help="Container placement policy",
|
||||
|
@ -34,6 +36,9 @@ def main():
|
|||
|
||||
endpoints = args.endpoint.split(',')
|
||||
|
||||
wallet = args.wallet
|
||||
wallet_config = args.config
|
||||
|
||||
if args.update:
|
||||
# Open file
|
||||
with open(args.out) as f:
|
||||
|
@ -43,7 +48,7 @@ def main():
|
|||
print(f"Create containers: {args.containers}")
|
||||
with ProcessPoolExecutor(max_workers=50) as executor:
|
||||
containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
|
||||
args.policy): _ for _ in range(int(args.containers))}
|
||||
args.policy, wallet, wallet_config): _ for _ in range(int(args.containers))}
|
||||
|
||||
for run in containers_runs:
|
||||
if run.result():
|
||||
|
@ -63,7 +68,7 @@ def main():
|
|||
print(f" > Upload objects for container {container}")
|
||||
with ProcessPoolExecutor(max_workers=50) as executor:
|
||||
objects_runs = {executor.submit(upload_object, container, payload_filepath,
|
||||
endpoints[random.randrange(len(endpoints))]): _ for _ in range(int(args.preload_obj))}
|
||||
endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(int(args.preload_obj))}
|
||||
|
||||
for run in objects_runs:
|
||||
if run.result():
|
||||
|
|
Loading…
Reference in a new issue