diff --git a/scenarios/preset/helpers/aws_cli.py b/scenarios/preset/helpers/aws_cli.py
index befd27b..68fb7e2 100644
--- a/scenarios/preset/helpers/aws_cli.py
+++ b/scenarios/preset/helpers/aws_cli.py
@@ -27,7 +27,8 @@ def create_bucket(endpoint, versioning, location):
             print(f" > Bucket versioning has not been applied for bucket {bucket_name}:\n{out}")
         else:
             print(f" > Bucket versioning has been applied.")
-
+
+    print(f"Created bucket: {bucket_name} via endpoint {endpoint}")
     return bucket_name
 
 
@@ -41,5 +42,5 @@ def upload_object(bucket, payload_filepath, endpoint):
     if not success:
         print(f" > Object {object_name} has not been uploaded.")
         return False
-    else:
-        return object_name
+
+    return bucket, endpoint, object_name
diff --git a/scenarios/preset/helpers/frostfs_cli.py b/scenarios/preset/helpers/frostfs_cli.py
index c829a3e..f8bf4a0 100644
--- a/scenarios/preset/helpers/frostfs_cli.py
+++ b/scenarios/preset/helpers/frostfs_cli.py
@@ -12,19 +12,19 @@ def create_container(endpoint, policy, wallet_file, wallet_config):
     if not success:
         print(f" > Container has not been created:\n{output}")
         return False
-    else:
-        try:
-            fst_str = output.split('\n')[0]
-        except Exception:
-            print(f"Got empty output: {output}")
-            return False
-        splitted = fst_str.split(": ")
-        if len(splitted) != 2:
-            raise ValueError(f"no CID was parsed from command output: \t{fst_str}")
+
+    try:
+        fst_str = output.split('\n')[0]
+    except Exception:
+        print(f"Got empty output: {output}")
+        return False
+    splitted = fst_str.split(": ")
+    if len(splitted) != 2:
+        raise ValueError(f"no CID was parsed from command output: \t{fst_str}")
 
-        print(f"Created container: {splitted[1]}")
+    print(f"Created container: {splitted[1]} via endpoint {endpoint}")
 
-        return splitted[1]
+    return splitted[1]
 
 
 def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
@@ -36,17 +36,17 @@ def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
     if not success:
         print(f" > Object {object_name} has not been uploaded:\n{output}")
         return False
-    else:
-        try:
-            # taking second string from command output
-            snd_str = output.split('\n')[1]
-        except Exception:
-            print(f"Got empty input: {output}")
-            return False
-        splitted = snd_str.split(": ")
-        if len(splitted) != 2:
-            raise Exception(f"no OID was parsed from command output: \t{snd_str}")
-        return splitted[1]
+
+    try:
+        # taking second string from command output
+        snd_str = output.split('\n')[1]
+    except Exception:
+        print(f"Got empty input: {output}")
+        return False
+    splitted = snd_str.split(": ")
+    if len(splitted) != 2:
+        raise Exception(f"no OID was parsed from command output: \t{snd_str}")
+    return container, endpoint, splitted[1]
 
 
 def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config):
diff --git a/scenarios/preset/preset_grpc.py b/scenarios/preset/preset_grpc.py
index a7b0b08..c6f19a6 100755
--- a/scenarios/preset/preset_grpc.py
+++ b/scenarios/preset/preset_grpc.py
@@ -1,6 +1,7 @@
 #!/usr/bin/python3
 
 import argparse
+from itertools import cycle
 import json
 import random
 import sys
@@ -29,7 +30,7 @@ parser.add_argument(
     help="Container placement policy",
     default="REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
 )
-parser.add_argument('--endpoint', help='Node address')
+parser.add_argument('--endpoint', help='Nodes addresses separated by comma.')
 parser.add_argument('--update', help='Save existed containers')
 parser.add_argument('--ignore-errors', help='Ignore preset errors')
 parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
@@ -41,7 +42,7 @@ print(args)
 
 
 def main():
-    container_list = []
+    containers = []
     objects_list = []
 
     endpoints = args.endpoint.split(',')
@@ -56,24 +57,26 @@ def main():
         # Open file
         with open(args.out) as f:
             data_json = json.load(f)
-            container_list = data_json['containers']
-            containers_count = len(container_list)
+            containers = data_json['containers']
+            containers_count = len(containers)
     else:
         containers_count = int(args.containers)
     print(f"Create containers: {containers_count}")
 
     with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
-        containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
-                                           args.policy, wallet, wallet_config): _ for _ in range(containers_count)}
+        containers_runs = [executor.submit(create_container, endpoint, args.policy, wallet, wallet_config)
+                           for _, endpoint in
+                           zip(range(containers_count), cycle(endpoints))]
 
         for run in containers_runs:
-            if run.result():
-                container_list.append(run.result())
+            container_id = run.result()
+            if container_id:
+                containers.append(container_id)
     print("Create containers: Completed")
-    print(f" > Containers: {container_list}")
+    print(f" > Containers: {containers}")
 
-    if containers_count == 0 or len(container_list) != containers_count:
-        print(f"Containers mismatch in preset: expected {containers_count}, created {len(container_list)}")
+    if containers_count == 0 or len(containers) != containers_count:
+        print(f"Containers mismatch in preset: expected {containers_count}, created {len(containers)}")
         if not ignore_errors:
             sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
 
@@ -86,32 +89,36 @@ def main():
         random_payload(payload_file, args.size)
         print(" > Create random payload: Completed")
 
-    for container in container_list:
-        print(f" > Upload objects for container {container}")
-        with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
-            objects_runs = {executor.submit(upload_object, container, payload_file.name,
-                                            endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(objects_per_container)}
-
-            for run in objects_runs:
-                if run.result():
-                    objects_list.append({'container': container, 'object': run.result()})
-        print(f" > Upload objects for container {container}: Completed")
+    total_objects = objects_per_container * containers_count
+    with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
+        objects_runs = [executor.submit(upload_object, container, payload_file.name,
+                                        endpoint, wallet, wallet_config)
+                        for _, container, endpoint in
+                        zip(range(total_objects), cycle(containers), cycle(endpoints))]
+
+        for run in objects_runs:
+            result = run.result()
+            if result:
+                container_id = result[0]
+                endpoint = result[1]
+                object_id = result[2]
+                objects_list.append({'container': container_id, 'object': object_id})
+                print(f" > Uploaded object {object_id} for container {container_id} via endpoint {endpoint}.")
 
     print("Upload objects to each container: Completed")
-    total_objects = objects_per_container * containers_count
 
     if total_objects > 0 and len(objects_list) != total_objects:
        print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
        if not ignore_errors:
            sys.exit(ERROR_WRONG_OBJECTS_COUNT)
 
-    data = {'containers': container_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
+    data = {'containers': containers, 'objects': objects_list, 'obj_size': args.size + " Kb"}
     with open(args.out, 'w+') as f:
         json.dump(data, f, ensure_ascii=False, indent=2)
 
     print("Result:")
-    print(f" > Total Containers has been created: {len(container_list)}.")
+    print(f" > Total Containers has been created: {len(containers)}.")
     print(f" > Total Objects has been created: {len(objects_list)}.")
 
 
 if __name__ == "__main__":
diff --git a/scenarios/preset/preset_s3.py b/scenarios/preset/preset_s3.py
index 174e62c..16205cc 100755
--- a/scenarios/preset/preset_s3.py
+++ b/scenarios/preset/preset_s3.py
@@ -1,6 +1,7 @@
 #!/usr/bin/python3
 
 import argparse
+from itertools import cycle
 import json
 import sys
 import tempfile
@@ -16,7 +17,7 @@ parser.add_argument('--size', help='Upload objects size in kb.')
 parser.add_argument('--buckets', help='Number of buckets to create.')
 parser.add_argument('--out', help='JSON file with output.')
 parser.add_argument('--preload_obj', help='Number of pre-loaded objects.')
-parser.add_argument('--endpoint', help='S3 Gateway address.')
+parser.add_argument('--endpoint', help='S3 Gateways addresses separated by comma.')
 parser.add_argument('--update', help='True/False, False by default. Save existed buckets from target file (--out). '
                                      'New buckets will not be created.')
 parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
@@ -34,10 +35,12 @@ ERROR_WRONG_OBJECTS_COUNT = 2
 MAX_WORKERS = 50
 
 
 def main():
-    bucket_list = []
+    buckets = []
     objects_list = []
 
     ignore_errors = True if args.ignore_errors else False
 
+    endpoints = args.endpoint.split(',')
+
     workers = int(args.workers)
@@ -45,26 +48,28 @@ def main():
     if args.update:
         # Open file
         with open(args.out) as f:
             data_json = json.load(f)
-            bucket_list = data_json['buckets']
-            buckets_count = len(bucket_list)
+            buckets = data_json['buckets']
+            buckets_count = len(buckets)
     # Get CID list
     else:
         buckets_count = int(args.buckets)
     print(f"Create buckets: {buckets_count}")
 
     with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
-        buckets_runs = {executor.submit(create_bucket, args.endpoint, args.versioning,
-                                        args.location): _ for _ in range(buckets_count)}
+        buckets_runs = [executor.submit(create_bucket, endpoint, args.versioning, args.location)
+                        for _, endpoint in
+                        zip(range(buckets_count), cycle(endpoints))]
 
         for run in buckets_runs:
-            if run.result():
-                bucket_list.append(run.result())
+            bucket_name = run.result()
+            if bucket_name:
+                buckets.append(bucket_name)
     print("Create buckets: Completed")
-    print(f" > Buckets: {bucket_list}")
+    print(f" > Buckets: {buckets}")
 
-    if buckets_count == 0 or len(bucket_list) != buckets_count:
-        print(f"Buckets mismatch in preset: expected {buckets_count}, created {len(bucket_list)}")
+    if buckets_count == 0 or len(buckets) != buckets_count:
+        print(f"Buckets mismatch in preset: expected {buckets_count}, created {len(buckets)}")
         if not ignore_errors:
             sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
 
@@ -77,32 +82,34 @@ def main():
         random_payload(payload_file, args.size)
         print(" > Create random payload: Completed")
 
-    for bucket in bucket_list:
-        print(f" > Upload objects for bucket {bucket}")
-        with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
-            objects_runs = {executor.submit(upload_object, bucket, payload_file.name,
-                                            args.endpoint): _ for _ in range(objects_per_bucket)}
-
-            for run in objects_runs:
-                if run.result():
-                    objects_list.append({'bucket': bucket, 'object': run.result()})
-        print(f" > Upload objects for bucket {bucket}: Completed")
-
-    print("Upload objects to each bucket: Completed")
-    total_objects = objects_per_bucket * buckets_count
+    total_objects = objects_per_bucket * buckets_count
+
+    with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
+        objects_runs = [executor.submit(upload_object, bucket, payload_file.name, endpoint)
+                        for _, bucket, endpoint in
+                        zip(range(total_objects), cycle(buckets), cycle(endpoints))]
+
+        for run in objects_runs:
+            result = run.result()
+            if result:
+                bucket = result[0]
+                endpoint = result[1]
+                object_id = result[2]
+                objects_list.append({'bucket': bucket, 'object': object_id})
+                print(f" > Uploaded object {object_id} for bucket {bucket} via endpoint {endpoint}.")
 
     if total_objects > 0 and len(objects_list) != total_objects:
         print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
         if not ignore_errors:
             sys.exit(ERROR_WRONG_OBJECTS_COUNT)
 
-    data = {'buckets': bucket_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
+    data = {'buckets': buckets, 'objects': objects_list, 'obj_size': args.size + " Kb"}
     with open(args.out, 'w+') as f:
         json.dump(data, f, ensure_ascii=False, indent=2)
 
     print("Result:")
-    print(f" > Total Buckets has been created: {len(bucket_list)}.")
+    print(f" > Total Buckets has been created: {len(buckets)}.")
     print(f" > Total Objects has been created: {len(objects_list)}.")
 
 
 if __name__ == "__main__":