#!/usr/bin/python3
"""Preset script: create containers and pre-load them with objects.

Command-line arguments are parsed at import time into the module-level
``args``. ``main()`` then creates the requested number of containers (or
reuses the ones listed in the ``--out`` file when ``--update`` is given),
uploads a randomly generated payload to them and writes the resulting
container/object identifiers to the ``--out`` file as JSON.
"""

import argparse
import json
import sys
import tempfile
import time
from argparse import Namespace
from concurrent.futures import ProcessPoolExecutor
from itertools import cycle

from helpers.cmd import random_payload
from helpers.frostfs_cli import create_container, upload_object

# Process exit codes used when the preset does not produce what was requested.
ERROR_WRONG_CONTAINERS_COUNT = 1
ERROR_WRONG_OBJECTS_COUNT = 2

# Upper bound for the process pool size, regardless of --workers.
MAX_WORKERS = 50

DEFAULT_POLICY = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
DEFAULT_RULES = ["allow Object.* *"]

parser = argparse.ArgumentParser()
parser.add_argument('--size', help='Upload objects size in kb')
parser.add_argument('--containers', help='Number of containers to create')
parser.add_argument('--retry', default=20, help='Maximum number of retries to create a container')
parser.add_argument('--out', help='JSON file with output')
parser.add_argument('--preload_obj', help='Number of pre-loaded objects')
parser.add_argument('--wallet', help='Wallet file path')
parser.add_argument('--config', help='Wallet config file path')
parser.add_argument(
    "--policy",
    help=f"Container placement policy. Default is {DEFAULT_POLICY}",
    action="append"
)
parser.add_argument('--endpoint', help='Nodes addresses separated by comma.')
parser.add_argument('--update', help='Save existed containers')
parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
parser.add_argument('--sleep', help='Time to sleep between containers creation and objects upload (in seconds), '
                                    'Default = 8', default=8)
parser.add_argument('--local', help='Create containers that store data on provided endpoints. Warning: additional empty containers may be created.', action='store_true')
parser.add_argument(
    '--rule',
    help='Rule attached to created containers. All entries of CONTAINER_ID will be replaced with id of created container.',
    action="append")

args: Namespace = parser.parse_args()
print(args)


def main():
    """Create containers, upload objects to them and dump the result as JSON.

    Reads its configuration from the module-level ``args``. Exits the
    process with ``ERROR_WRONG_CONTAINERS_COUNT`` or
    ``ERROR_WRONG_OBJECTS_COUNT`` when fewer containers/objects than
    expected were produced, unless ``--ignore-errors`` is set.
    """
    containers = []
    objects_list = []

    endpoints = args.endpoint.split(',')
    if not args.policy:
        args.policy = [DEFAULT_POLICY]

    # argparse hands over CLI-supplied values as strings while the defaults
    # are ints, so numeric options are normalized here before use.
    container_creation_retry = int(args.retry)
    sleep_seconds = float(args.sleep)
    workers = int(args.workers)
    objects_per_container = int(args.preload_obj)

    wallet = args.wallet
    wallet_config = args.config
    rules = args.rule or DEFAULT_RULES
    ignore_errors = args.ignore_errors
    pool_size = min(MAX_WORKERS, workers)

    if args.update:
        # Reuse the containers recorded by a previous run instead of creating new ones.
        with open(args.out) as f:
            data_json = json.load(f)
            containers = data_json['containers']
            containers_count = len(containers)
    else:
        containers_count = int(args.containers)
        print(f"Create containers: {containers_count}")
        with ProcessPoolExecutor(max_workers=pool_size) as executor:
            # Endpoints and policies are assigned round-robin to the creation tasks.
            containers_runs = [
                executor.submit(create_container, endpoint, policy, container_creation_retry,
                                wallet, wallet_config, rules, args.local)
                for _, endpoint, policy in zip(range(containers_count), cycle(endpoints), cycle(args.policy))
            ]

            for run in containers_runs:
                container_id = run.result()
                # A falsy result is treated as a failed creation and skipped.
                if container_id:
                    containers.append(container_id)

        print("Create containers: Completed")

    print(f" > Containers: {containers}")
    if containers_count > 0 and len(containers) != containers_count:
        print(f"Containers mismatch in preset: expected {containers_count}, created {len(containers)}")
        if not ignore_errors:
            sys.exit(ERROR_WRONG_CONTAINERS_COUNT)

    if sleep_seconds > 0:
        print(f"Sleep for {args.sleep} seconds")
        time.sleep(sleep_seconds)

    print(f"Upload objects to each container: {args.preload_obj} ")
    # NOTE(review): the expected object count is derived from the requested
    # container count, not from the number of containers actually created.
    total_objects = objects_per_container * containers_count

    # The payload file must stay alive until every upload task has finished,
    # so the whole upload phase runs inside the context manager.
    with tempfile.NamedTemporaryFile() as payload_file:
        random_payload(payload_file, args.size)
        print(" > Create random payload: Completed")

        with ProcessPoolExecutor(max_workers=pool_size) as executor:
            # Containers and endpoints are assigned round-robin to the upload tasks.
            objects_runs = [
                executor.submit(upload_object, container, payload_file.name, endpoint, wallet, wallet_config)
                for _, container, endpoint in zip(range(total_objects), cycle(containers), cycle(endpoints))
            ]

            for run in objects_runs:
                result = run.result()
                if result:
                    container_id = result[0]
                    endpoint = result[1]
                    object_id = result[2]
                    objects_list.append({'container': container_id, 'object': object_id})
                    print(f" > Uploaded object {object_id} for container {container_id} via endpoint {endpoint}.")

    print("Upload objects to each container: Completed")

    if total_objects > 0 and len(objects_list) != total_objects:
        print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
        if not ignore_errors:
            sys.exit(ERROR_WRONG_OBJECTS_COUNT)

    data = {'containers': containers, 'objects': objects_list, 'obj_size': args.size + " Kb"}

    with open(args.out, 'w+') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    print("Result:")
    print(f" > Total Containers has been created: {len(containers)}.")
    print(f" > Total Objects has been created: {len(objects_list)}.")


if __name__ == "__main__":
    main()