xk6-frostfs/scenarios/preset/preset_grpc.py

131 lines
5.3 KiB
Python
Executable file

#!/usr/bin/python3
import argparse
from itertools import cycle
import json
import sys
import tempfile
import time
from argparse import Namespace
from concurrent.futures import ProcessPoolExecutor
from helpers.cmd import random_payload
from helpers.frostfs_cli import create_container, upload_object
ERROR_WRONG_CONTAINERS_COUNT = 1
ERROR_WRONG_OBJECTS_COUNT = 2
MAX_WORKERS = 50
DEFAULT_POLICY = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
parser = argparse.ArgumentParser()
parser.add_argument('--size', help='Upload objects size in kb')
parser.add_argument('--containers', help='Number of containers to create')
parser.add_argument('--retry', default=20, help='Maximum number of retries to create a container')
parser.add_argument('--out', help='JSON file with output')
parser.add_argument('--preload_obj', help='Number of pre-loaded objects')
parser.add_argument('--wallet', help='Wallet file path')
parser.add_argument('--config', help='Wallet config file path')
parser.add_argument(
"--policy",
help=f"Container placement policy. Default is {DEFAULT_POLICY}",
action="append"
)
parser.add_argument('--endpoint', help='Nodes addresses separated by comma.')
parser.add_argument('--update', help='Save existed containers')
parser.add_argument('--ignore-errors', help='Ignore preset errors', action='store_true')
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
parser.add_argument('--sleep', help='Time to sleep between containers creation and objects upload (in seconds), '
'Default = 8', default=8)
parser.add_argument('--local', help='Create containers that store data on provided endpoints. Warning: additional empty containers may be created.', action='store_true')
parser.add_argument('--acl', help='Container ACL. Default is public-read-write.', default='public-read-write')
args: Namespace = parser.parse_args()
print(args)
def main():
containers = []
objects_list = []
endpoints = args.endpoint.split(',')
if not args.policy:
args.policy = [DEFAULT_POLICY]
container_creation_retry = args.retry
wallet = args.wallet
wallet_config = args.config
workers = int(args.workers)
objects_per_container = int(args.preload_obj)
ignore_errors = args.ignore_errors
if args.update:
# Open file
with open(args.out) as f:
data_json = json.load(f)
containers = data_json['containers']
containers_count = len(containers)
else:
containers_count = int(args.containers)
print(f"Create containers: {containers_count}")
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
containers_runs = [executor.submit(create_container, endpoint, policy, container_creation_retry, wallet, wallet_config, args.acl, args.local)
for _, endpoint, policy in
zip(range(containers_count), cycle(endpoints), cycle(args.policy))]
for run in containers_runs:
container_id = run.result()
if container_id:
containers.append(container_id)
print("Create containers: Completed")
print(f" > Containers: {containers}")
if containers_count > 0 and len(containers) != containers_count:
print(f"Containers mismatch in preset: expected {containers_count}, created {len(containers)}")
if not ignore_errors:
sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
if args.sleep != 0:
print(f"Sleep for {args.sleep} seconds")
time.sleep(args.sleep)
print(f"Upload objects to each container: {args.preload_obj} ")
payload_file = tempfile.NamedTemporaryFile()
random_payload(payload_file, args.size)
print(" > Create random payload: Completed")
total_objects = objects_per_container * containers_count
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
objects_runs = [executor.submit(upload_object, container, payload_file.name,
endpoint, wallet, wallet_config)
for _, container, endpoint in
zip(range(total_objects), cycle(containers), cycle(endpoints))]
for run in objects_runs:
result = run.result()
if result:
container_id = result[0]
endpoint = result[1]
object_id = result[2]
objects_list.append({'container': container_id, 'object': object_id})
print(f" > Uploaded object {object_id} for container {container_id} via endpoint {endpoint}.")
print("Upload objects to each container: Completed")
if total_objects > 0 and len(objects_list) != total_objects:
print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
if not ignore_errors:
sys.exit(ERROR_WRONG_OBJECTS_COUNT)
data = {'containers': containers, 'objects': objects_list, 'obj_size': args.size + " Kb"}
with open(args.out, 'w+') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print("Result:")
print(f" > Total Containers has been created: {len(containers)}.")
print(f" > Total Objects has been created: {len(objects_list)}.")
if __name__ == "__main__":
main()