diff --git a/internal/native/client.go b/internal/native/client.go
index b6eec45..c249521 100644
--- a/internal/native/client.go
+++ b/internal/native/client.go
@@ -137,7 +137,6 @@ func (c *Client) Delete(containerID string, objectID string) DeleteResponse {
 		panic(err)
 	}
 
-	stats.Report(c.vu, objDeleteTotal, 1)
 	start := time.Now()
 
 	var prm client.PrmObjectDelete
@@ -151,6 +150,7 @@ func (c *Client) Delete(containerID string, objectID string) DeleteResponse {
 		return DeleteResponse{Success: false, Error: err.Error()}
 	}
 
+	stats.Report(c.vu, objDeleteTotal, 1)
 	stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
 	return DeleteResponse{Success: true}
 }
@@ -168,7 +168,6 @@ func (c *Client) Get(containerID, objectID string) GetResponse {
 		panic(err)
 	}
 
-	stats.Report(c.vu, objGetTotal, 1)
 	start := time.Now()
 
 	var prm client.PrmObjectGet
@@ -185,6 +184,7 @@ func (c *Client) Get(containerID, objectID string) GetResponse {
 		return GetResponse{Success: false, Error: err.Error()}
 	}
 
+	stats.Report(c.vu, objGetTotal, 1)
 	stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
 	stats.ReportDataReceived(c.vu, float64(objSize))
 	return GetResponse{Success: true}
@@ -428,7 +428,6 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
 	sz := rdr.Size()
 
 	// starting upload
-	stats.Report(vu, objPutTotal, 1)
 	start := time.Now()
 
 	var prm client.PrmObjectPutInit
@@ -462,10 +461,11 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
 		return nil, err
 	}
 
+	stats.Report(vu, objPutTotal, 1)
 	stats.ReportDataSent(vu, float64(sz))
 	stats.Report(vu, objPutDuration, metrics.D(time.Since(start)))
 
-	return resp, err
+	return resp, nil
 }
 
 func parseNetworkInfo(ctx context.Context, cli *client.Client) (maxObjSize, epoch uint64, hhDisabled bool, err error) {
diff --git a/internal/native/native.go b/internal/native/native.go
index 1f792fa..9f0ce17 100644
--- a/internal/native/native.go
+++ b/internal/native/native.go
@@ -79,7 +79,7 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
 	err = cli.Dial(prmDial)
 	if err != nil {
-		return nil, fmt.Errorf("dial endpoint: %w", err)
+		return nil, fmt.Errorf("dial endpoint %s: %w", endpoint, err)
 	}
 
 	// generate session token
@@ -88,7 +88,7 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
 	prmSessionCreate.SetExp(exp)
 	sessionResp, err := cli.SessionCreate(n.vu.Context(), prmSessionCreate)
 	if err != nil {
-		return nil, fmt.Errorf("dial endpoint: %w", err)
+		return nil, fmt.Errorf("create session %s: %w", endpoint, err)
 	}
 
 	var id uuid.UUID
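The change above is the core of this patch: each `obj*Total` counter is now reported after the error check instead of before the request, so the totals count only operations that actually succeeded and the duration trends never record failed attempts; `put` also returns an explicit `nil` instead of the leftover `err`. Below is a minimal, self-contained sketch of the pattern, with plain variables standing in for the k6 metrics and a hypothetical `doDelete` standing in for the SDK call:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Plain variables stand in for the k6 Counter/Trend metrics used in the diff.
var (
	objDeleteTotal    int
	objDeleteDuration time.Duration
)

// doDelete is a hypothetical stand-in for the real object-delete RPC.
func doDelete(fail bool) error {
	if fail {
		return errors.New("object delete: timeout")
	}
	return nil
}

func deleteObject(fail bool) error {
	start := time.Now()
	if err := doDelete(fail); err != nil {
		// Failed attempts report nothing, so the total reflects successes only.
		return err
	}
	objDeleteTotal++ // incremented only after the call succeeded
	objDeleteDuration += time.Since(start)
	return nil
}

func main() {
	_ = deleteObject(true)  // failure: neither metric is touched
	_ = deleteObject(false) // success: both metrics are reported
	fmt.Println("successful deletes:", objDeleteTotal)
}
```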
diff --git a/internal/s3/client.go b/internal/s3/client.go
index 285bec6..13aec02 100644
--- a/internal/s3/client.go
+++ b/internal/s3/client.go
@@ -53,8 +53,6 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
 	rdr := bytes.NewReader(payload.Bytes())
 	sz := rdr.Size()
 
-	stats.Report(c.vu, objPutTotal, 1)
-
 	start := time.Now()
 	_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
 		Bucket: aws.String(bucket),
@@ -66,13 +64,13 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
 		return PutResponse{Success: false, Error: err.Error()}
 	}
 
+	stats.Report(c.vu, objPutTotal, 1)
 	stats.ReportDataSent(c.vu, float64(sz))
 	stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
 	return PutResponse{Success: true}
 }
 
 func (c *Client) Delete(bucket, key string) DeleteResponse {
-	stats.Report(c.vu, objDeleteTotal, 1)
 	start := time.Now()
 
 	_, err := c.cli.DeleteObject(c.vu.Context(), &s3.DeleteObjectInput{
@@ -84,12 +82,12 @@ func (c *Client) Delete(bucket, key string) DeleteResponse {
 		return DeleteResponse{Success: false, Error: err.Error()}
 	}
 
+	stats.Report(c.vu, objDeleteTotal, 1)
 	stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
 	return DeleteResponse{Success: true}
 }
 
 func (c *Client) Get(bucket, key string) GetResponse {
-	stats.Report(c.vu, objGetTotal, 1)
 	start := time.Now()
 
 	var objSize = 0
@@ -101,6 +99,7 @@ func (c *Client) Get(bucket, key string) GetResponse {
 		return GetResponse{Success: false, Error: err.Error()}
 	}
 
+	stats.Report(c.vu, objGetTotal, 1)
 	stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
 	stats.ReportDataReceived(c.vu, float64(objSize))
 	return GetResponse{Success: true}
@@ -151,8 +150,6 @@ func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse {
 }
 
 func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBucketResponse {
-	stats.Report(c.vu, createBucketTotal, 1)
-
 	var err error
 	var lockEnabled bool
 	if lockEnabledStr, ok := params["lock_enabled"]; ok {
@@ -181,6 +178,7 @@ func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBucketResponse {
 		return CreateBucketResponse{Success: false, Error: err.Error()}
 	}
 
+	stats.Report(c.vu, createBucketTotal, 1)
 	stats.Report(c.vu, createBucketDuration, metrics.D(time.Since(start)))
 	return CreateBucketResponse{Success: true}
 }
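The S3 client gets the identical treatment: `Put`, `Delete`, `Get`, and `CreateBucket` all now follow the shape start timer → call → early return on error → report. If the repetition ever becomes a maintenance burden, the guard could be factored into a small wrapper; the sketch below only illustrates that idea and is not code from this repo (the `timed` helper and `reportFn` type are invented here):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// reportFn is a stand-in for the repo's stats.Report(vu, metric, value) helper.
type reportFn func(value float64)

// timed runs op and invokes the total/duration reporters only when op succeeds,
// mirroring how Put/Get/Delete/CreateBucket now report after the error check.
func timed(op func() error, reportTotal, reportDuration reportFn) error {
	start := time.Now()
	if err := op(); err != nil {
		return err // nothing is reported for failed attempts
	}
	reportTotal(1)
	reportDuration(float64(time.Since(start).Milliseconds()))
	return nil
}

func main() {
	var total, durMs float64
	count := func(v float64) { total += v }
	dur := func(v float64) { durMs += v }

	_ = timed(func() error { return errors.New("put failed") }, count, dur)
	_ = timed(func() error { return nil }, count, dur)

	fmt.Printf("successful ops: %v, cumulative duration: %vms\n", total, durMs)
}
```

Note that with this ordering the `*_total` metrics no longer approximate attempts, so an error rate has to be derived from another source, such as k6 checks or the iteration count.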
diff --git a/scenarios/preset/preset_grpc.py b/scenarios/preset/preset_grpc.py
index fd09fa7..5bf6dae 100755
--- a/scenarios/preset/preset_grpc.py
+++ b/scenarios/preset/preset_grpc.py
@@ -3,6 +3,7 @@
 import argparse
 import json
 import random
+import sys
 from argparse import Namespace
 from concurrent.futures import ProcessPoolExecutor
 
@@ -10,6 +11,10 @@ from concurrent.futures import ProcessPoolExecutor
 from helpers.cmd import random_payload
 from helpers.frostfs_cli import create_container, upload_object
 
+ERROR_NO_CONTAINERS = 1
+ERROR_NO_OBJECTS = 2
+MAX_WORKERS = 50
+
 parser = argparse.ArgumentParser()
 parser.add_argument('--size', help='Upload objects size in kb')
 parser.add_argument('--containers', help='Number of containers to create')
@@ -24,6 +29,8 @@ parser.add_argument(
 )
 parser.add_argument('--endpoint', help='Node address')
 parser.add_argument('--update', help='Save existed containers')
+parser.add_argument('--ignore-errors', help='Ignore preset errors')
+parser.add_argument('--workers', help='Number of workers in preset. Max = 50, Default = 50', default=50)
 
 args: Namespace = parser.parse_args()
 print(args)
@@ -31,14 +38,15 @@ print(args)
 
 def main():
     container_list = []
-    objects_struct = []
+    objects_list = []
     payload_filepath = '/tmp/data_file'
 
     endpoints = args.endpoint.split(',')
 
     wallet = args.wallet
     wallet_config = args.config
-
+    workers = int(args.workers)
+    ignore_errors = bool(args.ignore_errors)
     if args.update:
         # Open file
         with open(args.out) as f:
@@ -46,7 +54,7 @@ def main():
             container_list = data_json['containers']
     else:
         print(f"Create containers: {args.containers}")
-        with ProcessPoolExecutor(max_workers=50) as executor:
+        with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
             containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
                                                args.policy, wallet, wallet_config): _ for _ in range(int(args.containers))}
 
@@ -58,7 +66,9 @@ def main():
     print(f" > Containers: {container_list}")
 
     if not container_list:
-        return
+        print("No containers to work with")
+        if not ignore_errors:
+            sys.exit(ERROR_NO_CONTAINERS)
 
     print(f"Upload objects to each container: {args.preload_obj} ")
     random_payload(payload_filepath, args.size)
@@ -66,25 +76,30 @@ def main():
 
     for container in container_list:
         print(f" > Upload objects for container {container}")
-        with ProcessPoolExecutor(max_workers=50) as executor:
+        with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
             objects_runs = {executor.submit(upload_object, container, payload_filepath,
-                                 endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(int(args.preload_obj))}
+                                            endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(int(args.preload_obj))}
 
         for run in objects_runs:
             if run.result():
-                objects_struct.append({'container': container, 'object': run.result()})
+                objects_list.append({'container': container, 'object': run.result()})
         print(f" > Upload objects for container {container}: Completed")
 
     print("Upload objects to each container: Completed")
 
-    data = {'containers': container_list, 'objects': objects_struct, 'obj_size': args.size + " Kb"}
+    if int(args.preload_obj) > 0 and not objects_list:
+        print("No objects were uploaded")
+        if not ignore_errors:
+            sys.exit(ERROR_NO_OBJECTS)
+
+    data = {'containers': container_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
 
     with open(args.out, 'w+') as f:
         json.dump(data, f, ensure_ascii=False, indent=2)
 
-    print(f"Result:")
+    print("Result:")
     print(f" > Total Containers has been created: {len(container_list)}.")
-    print(f" > Total Objects has been created: {len(objects_struct)}.")
+    print(f" > Total Objects have been created: {len(objects_list)}.")
 
 
 if __name__ == "__main__":
diff --git a/scenarios/preset/preset_s3.py b/scenarios/preset/preset_s3.py
index d1a374d..c28f47b 100755
--- a/scenarios/preset/preset_s3.py
+++ b/scenarios/preset/preset_s3.py
@@ -2,6 +2,7 @@
 
 import argparse
 import json
+import sys
 from concurrent.futures import ProcessPoolExecutor
 
 from helpers.cmd import random_payload
@@ -18,16 +19,23 @@ parser.add_argument('--update', help='True/False, False by default. Save existed
                     'New buckets will not be created.')
 parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
 parser.add_argument('--versioning', help='True/False, False by default.')
+parser.add_argument('--ignore-errors', help='Ignore preset errors')
+parser.add_argument('--workers', help='Number of workers in preset. Max = 50, Default = 50', default=50)
 
 args = parser.parse_args()
 print(args)
 
+ERROR_NO_BUCKETS = 1
+ERROR_NO_OBJECTS = 2
+MAX_WORKERS = 50
 
 def main():
     bucket_list = []
-    objects_struct = []
+    objects_list = []
     payload_filepath = '/tmp/data_file'
+    ignore_errors = bool(args.ignore_errors)
+    workers = int(args.workers)
 
     if args.update:
         # Open file
         with open(args.out) as f:
@@ -37,7 +45,7 @@ def main():
     else:
         print(f"Create buckets: {args.buckets}")
 
-        with ProcessPoolExecutor(max_workers=10) as executor:
+        with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
             buckets_runs = {executor.submit(create_bucket, args.endpoint, args.versioning,
                                             args.location): _ for _ in range(int(args.buckets))}
 
@@ -48,6 +56,10 @@ def main():
     print("Create buckets: Completed")
     print(f" > Buckets: {bucket_list}")
+    if not bucket_list:
+        print("No buckets to work with")
+        if not ignore_errors:
+            sys.exit(ERROR_NO_BUCKETS)
 
     print(f"Upload objects to each bucket: {args.preload_obj} ")
     random_payload(payload_filepath, args.size)
@@ -55,25 +67,30 @@ def main():
 
     for bucket in bucket_list:
         print(f" > Upload objects for bucket {bucket}")
-        with ProcessPoolExecutor(max_workers=50) as executor:
+        with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
             objects_runs = {executor.submit(upload_object, bucket, payload_filepath,
                                             args.endpoint): _ for _ in range(int(args.preload_obj))}
 
         for run in objects_runs:
             if run.result() is not None:
-                objects_struct.append({'bucket': bucket, 'object': run.result()})
+                objects_list.append({'bucket': bucket, 'object': run.result()})
         print(f" > Upload objects for bucket {bucket}: Completed")
 
     print("Upload objects to each bucket: Completed")
 
-    data = {'buckets': bucket_list, 'objects': objects_struct, 'obj_size': args.size + " Kb"}
+    if int(args.preload_obj) > 0 and not objects_list:
+        print("No objects were uploaded")
+        if not ignore_errors:
+            sys.exit(ERROR_NO_OBJECTS)
+
+    data = {'buckets': bucket_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
 
     with open(args.out, 'w+') as f:
         json.dump(data, f, ensure_ascii=False, indent=2)
 
-    print(f"Result:")
+    print("Result:")
     print(f" > Total Buckets has been created: {len(bucket_list)}.")
-    print(f" > Total Objects has been created: {len(objects_struct)}.")
+    print(f" > Total Objects have been created: {len(objects_list)}.")
 
 
 if __name__ == "__main__":
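Both preset scripts now share the same two knobs: `--workers` (capped at `MAX_WORKERS = 50` via `min(MAX_WORKERS, workers)`) and `--ignore-errors`, plus distinct exit codes when the preset produces nothing to test against. A hypothetical invocation is sketched below; the paths, endpoints, wallet, and policy values are illustrative only, and since `--ignore-errors` is a plain string option rather than `store_true`, any non-empty value enables it:

```
# preset_grpc.py: cap the pool at 20 workers, keep going on preset errors.
# Wallet and placement policy values are environment-specific.
./scenarios/preset/preset_grpc.py --containers 10 --preload_obj 100 --size 16 \
    --out grpc.json --endpoint host1:8080,host2:8080 \
    --wallet wallet.json --policy "REP 1" --workers 20 --ignore-errors 1

# preset_s3.py: same knobs; without --ignore-errors it exits with
# code 1 when no buckets were created and code 2 when no objects were uploaded.
./scenarios/preset/preset_s3.py --buckets 10 --preload_obj 100 --size 16 \
    --out s3.json --endpoint s3.example.com --workers 20
```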