[#12] Allow using multiple endpoints for presets #76
4 changed files with 90 additions and 75 deletions
|
@ -28,6 +28,7 @@ def create_bucket(endpoint, versioning, location):
|
||||||
else:
|
else:
|
||||||
print(f" > Bucket versioning has been applied.")
|
print(f" > Bucket versioning has been applied.")
|
||||||
|
|
||||||
|
print(f"Created bucket: {bucket_name} via endpoint {endpoint}")
|
||||||
return bucket_name
|
return bucket_name
|
||||||
|
|
||||||
|
|
||||||
|
@ -41,5 +42,5 @@ def upload_object(bucket, payload_filepath, endpoint):
|
||||||
if not success:
|
if not success:
|
||||||
print(f" > Object {object_name} has not been uploaded.")
|
print(f" > Object {object_name} has not been uploaded.")
|
||||||
return False
|
return False
|
||||||
else:
|
|
||||||
return object_name
|
return bucket, endpoint, object_name
|
||||||
|
|
|
@ -12,19 +12,19 @@ def create_container(endpoint, policy, wallet_file, wallet_config):
|
||||||
if not success:
|
if not success:
|
||||||
print(f" > Container has not been created:\n{output}")
|
print(f" > Container has not been created:\n{output}")
|
||||||
return False
|
return False
|
||||||
else:
|
|
||||||
try:
|
|
||||||
fst_str = output.split('\n')[0]
|
|
||||||
except Exception:
|
|
||||||
print(f"Got empty output: {output}")
|
|
||||||
return False
|
|
||||||
splitted = fst_str.split(": ")
|
|
||||||
if len(splitted) != 2:
|
|
||||||
raise ValueError(f"no CID was parsed from command output: \t{fst_str}")
|
|
||||||
|
|
||||||
print(f"Created container: {splitted[1]}")
|
try:
|
||||||
|
fst_str = output.split('\n')[0]
|
||||||
|
except Exception:
|
||||||
|
print(f"Got empty output: {output}")
|
||||||
|
return False
|
||||||
|
splitted = fst_str.split(": ")
|
||||||
|
if len(splitted) != 2:
|
||||||
|
raise ValueError(f"no CID was parsed from command output: \t{fst_str}")
|
||||||
|
|
||||||
return splitted[1]
|
print(f"Created container: {splitted[1]} via endpoint {endpoint}")
|
||||||
|
|
||||||
|
return splitted[1]
|
||||||
|
|
||||||
|
|
||||||
def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
|
def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_config):
|
||||||
|
@ -36,17 +36,17 @@ def upload_object(container, payload_filepath, endpoint, wallet_file, wallet_con
|
||||||
if not success:
|
if not success:
|
||||||
print(f" > Object {object_name} has not been uploaded:\n{output}")
|
print(f" > Object {object_name} has not been uploaded:\n{output}")
|
||||||
return False
|
return False
|
||||||
else:
|
|
||||||
try:
|
try:
|
||||||
# taking second string from command output
|
# taking second string from command output
|
||||||
snd_str = output.split('\n')[1]
|
snd_str = output.split('\n')[1]
|
||||||
except Exception:
|
except Exception:
|
||||||
print(f"Got empty input: {output}")
|
print(f"Got empty input: {output}")
|
||||||
return False
|
return False
|
||||||
splitted = snd_str.split(": ")
|
splitted = snd_str.split(": ")
|
||||||
if len(splitted) != 2:
|
if len(splitted) != 2:
|
||||||
raise Exception(f"no OID was parsed from command output: \t{snd_str}")
|
raise Exception(f"no OID was parsed from command output: \t{snd_str}")
|
||||||
return splitted[1]
|
return container, endpoint, splitted[1]
|
||||||
|
|
||||||
|
|
||||||
def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config):
|
def get_object(cid, oid, endpoint, out_filepath, wallet_file, wallet_config):
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
from itertools import cycle
|
||||||
import json
|
import json
|
||||||
import random
|
import random
|
||||||
import sys
|
import sys
|
||||||
|
@ -29,7 +30,7 @@ parser.add_argument(
|
||||||
help="Container placement policy",
|
help="Container placement policy",
|
||||||
default="REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
|
default="REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
|
||||||
)
|
)
|
||||||
parser.add_argument('--endpoint', help='Node address')
|
parser.add_argument('--endpoint', help='Nodes addresses separated by comma.')
|
||||||
parser.add_argument('--update', help='Save existed containers')
|
parser.add_argument('--update', help='Save existed containers')
|
||||||
parser.add_argument('--ignore-errors', help='Ignore preset errors')
|
parser.add_argument('--ignore-errors', help='Ignore preset errors')
|
||||||
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
|
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
|
||||||
|
@ -41,7 +42,7 @@ print(args)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
container_list = []
|
containers = []
|
||||||
objects_list = []
|
objects_list = []
|
||||||
|
|
||||||
endpoints = args.endpoint.split(',')
|
endpoints = args.endpoint.split(',')
|
||||||
|
@ -56,24 +57,26 @@ def main():
|
||||||
# Open file
|
# Open file
|
||||||
with open(args.out) as f:
|
with open(args.out) as f:
|
||||||
data_json = json.load(f)
|
data_json = json.load(f)
|
||||||
container_list = data_json['containers']
|
containers = data_json['containers']
|
||||||
containers_count = len(container_list)
|
containers_count = len(containers)
|
||||||
else:
|
else:
|
||||||
containers_count = int(args.containers)
|
containers_count = int(args.containers)
|
||||||
print(f"Create containers: {containers_count}")
|
print(f"Create containers: {containers_count}")
|
||||||
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||||
containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
|
containers_runs = [executor.submit(create_container, endpoint, args.policy, wallet, wallet_config)
|
||||||
args.policy, wallet, wallet_config): _ for _ in range(containers_count)}
|
for _, endpoint in
|
||||||
|
zip(range(containers_count), cycle(endpoints))]
|
||||||
|
|
||||||
for run in containers_runs:
|
for run in containers_runs:
|
||||||
if run.result():
|
container_id = run.result()
|
||||||
container_list.append(run.result())
|
if container_id:
|
||||||
|
containers.append(container_id)
|
||||||
|
|
||||||
print("Create containers: Completed")
|
print("Create containers: Completed")
|
||||||
|
|
||||||
print(f" > Containers: {container_list}")
|
print(f" > Containers: {containers}")
|
||||||
if containers_count == 0 or len(container_list) != containers_count:
|
if containers_count == 0 or len(containers) != containers_count:
|
||||||
print(f"Containers mismatch in preset: expected {containers_count}, created {len(container_list)}")
|
print(f"Containers mismatch in preset: expected {containers_count}, created {len(containers)}")
|
||||||
if not ignore_errors:
|
if not ignore_errors:
|
||||||
sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
|
sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
|
||||||
|
|
||||||
|
@ -86,32 +89,36 @@ def main():
|
||||||
random_payload(payload_file, args.size)
|
random_payload(payload_file, args.size)
|
||||||
print(" > Create random payload: Completed")
|
print(" > Create random payload: Completed")
|
||||||
|
|
||||||
for container in container_list:
|
total_objects = objects_per_container * containers_count
|
||||||
print(f" > Upload objects for container {container}")
|
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||||
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
objects_runs = [executor.submit(upload_object, container, payload_file.name,
|
||||||
objects_runs = {executor.submit(upload_object, container, payload_file.name,
|
endpoint, wallet, wallet_config)
|
||||||
endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(objects_per_container)}
|
for _, container, endpoint in
|
||||||
|
zip(range(total_objects), cycle(containers), cycle(endpoints))]
|
||||||
|
|
||||||
for run in objects_runs:
|
for run in objects_runs:
|
||||||
if run.result():
|
result = run.result()
|
||||||
objects_list.append({'container': container, 'object': run.result()})
|
if run.result:
|
||||||
print(f" > Upload objects for container {container}: Completed")
|
container_id = result[0]
|
||||||
|
endpoint = result[1]
|
||||||
|
object_id = result[2]
|
||||||
|
objects_list.append({'container': container_id, 'object': object_id})
|
||||||
|
print(f" > Uploaded object {object_id} for container {container_id} via endpoint {endpoint}.")
|
||||||
|
|
||||||
print("Upload objects to each container: Completed")
|
print("Upload objects to each container: Completed")
|
||||||
|
|
||||||
total_objects = objects_per_container * containers_count
|
|
||||||
if total_objects > 0 and len(objects_list) != total_objects:
|
if total_objects > 0 and len(objects_list) != total_objects:
|
||||||
print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
|
print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
|
||||||
if not ignore_errors:
|
if not ignore_errors:
|
||||||
sys.exit(ERROR_WRONG_OBJECTS_COUNT)
|
sys.exit(ERROR_WRONG_OBJECTS_COUNT)
|
||||||
|
|
||||||
data = {'containers': container_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
|
data = {'containers': containers, 'objects': objects_list, 'obj_size': args.size + " Kb"}
|
||||||
|
|
||||||
with open(args.out, 'w+') as f:
|
with open(args.out, 'w+') as f:
|
||||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
print("Result:")
|
print("Result:")
|
||||||
print(f" > Total Containers has been created: {len(container_list)}.")
|
print(f" > Total Containers has been created: {len(containers)}.")
|
||||||
print(f" > Total Objects has been created: {len(objects_list)}.")
|
print(f" > Total Objects has been created: {len(objects_list)}.")
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
from itertools import cycle
|
||||||
import json
|
import json
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
|
@ -16,7 +17,7 @@ parser.add_argument('--size', help='Upload objects size in kb.')
|
||||||
parser.add_argument('--buckets', help='Number of buckets to create.')
|
parser.add_argument('--buckets', help='Number of buckets to create.')
|
||||||
parser.add_argument('--out', help='JSON file with output.')
|
parser.add_argument('--out', help='JSON file with output.')
|
||||||
parser.add_argument('--preload_obj', help='Number of pre-loaded objects.')
|
parser.add_argument('--preload_obj', help='Number of pre-loaded objects.')
|
||||||
parser.add_argument('--endpoint', help='S3 Gateway address.')
|
parser.add_argument('--endpoint', help='S3 Gateways addresses separated by comma.')
|
||||||
parser.add_argument('--update', help='True/False, False by default. Save existed buckets from target file (--out). '
|
parser.add_argument('--update', help='True/False, False by default. Save existed buckets from target file (--out). '
|
||||||
'New buckets will not be created.')
|
'New buckets will not be created.')
|
||||||
parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
|
parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
|
||||||
|
@ -34,10 +35,12 @@ ERROR_WRONG_OBJECTS_COUNT = 2
|
||||||
MAX_WORKERS = 50
|
MAX_WORKERS = 50
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
bucket_list = []
|
buckets = []
|
||||||
objects_list = []
|
objects_list = []
|
||||||
ignore_errors = True if args.ignore_errors else False
|
ignore_errors = True if args.ignore_errors else False
|
||||||
|
|
||||||
|
endpoints = args.endpoint.split(',')
|
||||||
|
|
||||||
workers = int(args.workers)
|
workers = int(args.workers)
|
||||||
objects_per_bucket = int(args.preload_obj)
|
objects_per_bucket = int(args.preload_obj)
|
||||||
|
|
||||||
|
@ -45,26 +48,28 @@ def main():
|
||||||
# Open file
|
# Open file
|
||||||
with open(args.out) as f:
|
with open(args.out) as f:
|
||||||
data_json = json.load(f)
|
data_json = json.load(f)
|
||||||
bucket_list = data_json['buckets']
|
buckets = data_json['buckets']
|
||||||
buckets_count = len(bucket_list)
|
buckets_count = len(buckets)
|
||||||
# Get CID list
|
# Get CID list
|
||||||
else:
|
else:
|
||||||
buckets_count = int(args.buckets)
|
buckets_count = int(args.buckets)
|
||||||
print(f"Create buckets: {buckets_count}")
|
print(f"Create buckets: {buckets_count}")
|
||||||
|
|
||||||
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||||
buckets_runs = {executor.submit(create_bucket, args.endpoint, args.versioning,
|
buckets_runs = [executor.submit(create_bucket, endpoint, args.versioning, args.location)
|
||||||
args.location): _ for _ in range(buckets_count)}
|
for _, endpoint in
|
||||||
|
zip(range(buckets_count), cycle(endpoints))]
|
||||||
|
|
||||||
for run in buckets_runs:
|
for run in buckets_runs:
|
||||||
if run.result():
|
bucket_name = run.result()
|
||||||
bucket_list.append(run.result())
|
if bucket_name:
|
||||||
|
buckets.append(bucket_name)
|
||||||
|
|
||||||
print("Create buckets: Completed")
|
print("Create buckets: Completed")
|
||||||
|
|
||||||
print(f" > Buckets: {bucket_list}")
|
print(f" > Buckets: {buckets}")
|
||||||
if buckets_count == 0 or len(bucket_list) != buckets_count:
|
if buckets_count == 0 or len(buckets) != buckets_count:
|
||||||
print(f"Buckets mismatch in preset: expected {buckets_count}, created {len(bucket_list)}")
|
print(f"Buckets mismatch in preset: expected {buckets_count}, created {len(buckets)}")
|
||||||
if not ignore_errors:
|
if not ignore_errors:
|
||||||
sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
|
sys.exit(ERROR_WRONG_CONTAINERS_COUNT)
|
||||||
|
|
||||||
|
@ -77,32 +82,34 @@ def main():
|
||||||
random_payload(payload_file, args.size)
|
random_payload(payload_file, args.size)
|
||||||
print(" > Create random payload: Completed")
|
print(" > Create random payload: Completed")
|
||||||
|
|
||||||
for bucket in bucket_list:
|
|
||||||
print(f" > Upload objects for bucket {bucket}")
|
|
||||||
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
|
||||||
objects_runs = {executor.submit(upload_object, bucket, payload_file.name,
|
|
||||||
args.endpoint): _ for _ in range(objects_per_bucket)}
|
|
||||||
|
|
||||||
for run in objects_runs:
|
|
||||||
if run.result():
|
|
||||||
objects_list.append({'bucket': bucket, 'object': run.result()})
|
|
||||||
print(f" > Upload objects for bucket {bucket}: Completed")
|
|
||||||
|
|
||||||
print("Upload objects to each bucket: Completed")
|
|
||||||
|
|
||||||
total_objects = objects_per_bucket * buckets_count
|
total_objects = objects_per_bucket * buckets_count
|
||||||
|
|
||||||
|
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||||
|
objects_runs = [executor.submit(upload_object, bucket, payload_file.name, endpoint)
|
||||||
|
for _, bucket, endpoint in
|
||||||
|
zip(range(total_objects), cycle(buckets), cycle(endpoints))]
|
||||||
|
|
||||||
|
for run in objects_runs:
|
||||||
|
result = run.result()
|
||||||
|
if run.result:
|
||||||
|
bucket = result[0]
|
||||||
|
endpoint = result[1]
|
||||||
|
object_id = result[2]
|
||||||
|
objects_list.append({'bucket': bucket, 'object': object_id})
|
||||||
|
print(f" > Uploaded object {object_id} for bucket {bucket} via endpoint {endpoint}.")
|
||||||
|
|
||||||
if total_objects > 0 and len(objects_list) != total_objects:
|
if total_objects > 0 and len(objects_list) != total_objects:
|
||||||
print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
|
print(f"Objects mismatch in preset: expected {total_objects}, created {len(objects_list)}")
|
||||||
if not ignore_errors:
|
if not ignore_errors:
|
||||||
sys.exit(ERROR_WRONG_OBJECTS_COUNT)
|
sys.exit(ERROR_WRONG_OBJECTS_COUNT)
|
||||||
|
|
||||||
data = {'buckets': bucket_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
|
data = {'buckets': buckets, 'objects': objects_list, 'obj_size': args.size + " Kb"}
|
||||||
|
|
||||||
with open(args.out, 'w+') as f:
|
with open(args.out, 'w+') as f:
|
||||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
print("Result:")
|
print("Result:")
|
||||||
print(f" > Total Buckets has been created: {len(bucket_list)}.")
|
print(f" > Total Buckets has been created: {len(buckets)}.")
|
||||||
print(f" > Total Objects has been created: {len(objects_list)}.")
|
print(f" > Total Objects has been created: {len(objects_list)}.")
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue