From 22e97339c97a994ec1bce6ebcdad75dd55b12380 Mon Sep 17 00:00:00 2001
From: "a.chetaev" <alex.chetaev@gmail.com>
Date: Fri, 18 Nov 2022 13:27:57 +0300
Subject: [PATCH] Add script to check policy complience

Signed-off-by: a.chetaev <alex.chetaev@gmail.com>
---
 scenarios/preset/check_objects_in_preset.py |  4 +-
 scenarios/preset/check_policy_compliance.py | 86 +++++++++++++++++++++
 scenarios/preset/helpers/__init__.py        |  0
 scenarios/preset/helpers/cmd.py             | 19 +++++
 scenarios/preset/helpers/neofs_cli.py       | 34 ++++++--
 scenarios/preset/preset_grpc.py             | 15 ++--
 6 files changed, 143 insertions(+), 15 deletions(-)
 create mode 100755 scenarios/preset/check_policy_compliance.py
 create mode 100644 scenarios/preset/helpers/__init__.py

diff --git a/scenarios/preset/check_objects_in_preset.py b/scenarios/preset/check_objects_in_preset.py
index 4c03592..a68ed6c 100755
--- a/scenarios/preset/check_objects_in_preset.py
+++ b/scenarios/preset/check_objects_in_preset.py
@@ -7,13 +7,13 @@ from helpers.neofs_cli import get_object
 
 parser = argparse.ArgumentParser()
 parser.add_argument('--endpoint', help='Node address')
-parser.add_argument('--preset_json', help='JSON file path with preset')
+parser.add_argument('--preset_file', help='JSON file path with preset')
 
 args = parser.parse_args()
 
 
 def main():
-    with open(args.preset_json) as f:
+    with open(args.preset_file) as f:
         preset_text = f.read()
 
     preset = json.loads(preset_text)
diff --git a/scenarios/preset/check_policy_compliance.py b/scenarios/preset/check_policy_compliance.py
new file mode 100755
index 0000000..89c45f9
--- /dev/null
+++ b/scenarios/preset/check_policy_compliance.py
@@ -0,0 +1,86 @@
+#!/usr/bin/python3
+
+import argparse
+import json
+
+from argparse import Namespace
+from collections import Counter
+from concurrent.futures import ProcessPoolExecutor
+
+from helpers.cmd import ProgressBar
+from helpers.neofs_cli import search_object_by_id
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--endpoints', help='Node address')
+parser.add_argument('--expected_copies', help="Expected amount of object copies")
+parser.add_argument('--preset_file', help='JSON file path with preset')
+parser.add_argument('--max_workers', help='Max workers in parallel', default=50)
+parser.add_argument('--print_failed', help='Print failed objects', default=False)
+
+
+args: Namespace = parser.parse_args()
+print(args)
+
+
+def main():
+    success_objs = 0
+    failed_objs = 0
+
+    with open(args.preset_file) as f:
+        preset_text = f.read()
+
+    preset_json = json.loads(preset_text)
+
+    objs = preset_json.get('objects')
+    objs_len = len(objs)
+
+    endpoints = args.endpoints.split(',')
+
+    final_discrubution = Counter(dict.fromkeys(endpoints, 0))
+
+    with ProcessPoolExecutor(max_workers=50) as executor:
+        search_runs = {executor.submit(check_object_amounts, obj.get('container'), obj.get('object'), endpoints,
+                                       int(args.expected_copies)): obj for obj in objs}
+
+        ProgressBar.start()
+
+        for run in search_runs:
+            result, distribution = run.result()
+
+            if result:
+                success_objs += 1
+            else:
+                failed_objs += 1
+
+            final_discrubution += distribution
+
+            ProgressBar.print(success_objs + failed_objs, objs_len)
+
+        ProgressBar.end()
+
+    print(f'Success objects: {success_objs}')
+    print(f'Failed objects: {failed_objs}')
+    for endpoint in endpoints:
+        print(f'{endpoint}: {final_discrubution[endpoint]}')
+
+
+def check_object_amounts(cid, oid, endpoints, expected_copies):
+    distribution = Counter(dict.fromkeys(endpoints, 0))
+
+    copies_in_cluster = 0
+
+    for endpoint in endpoints:
+        copy_on_endpoint = search_object_by_id(cid, oid, endpoint, ttl=1)
+
+        copies_in_cluster += int(copy_on_endpoint)
+
+        distribution[endpoint] += int(copy_on_endpoint)
+
+    if copies_in_cluster != expected_copies and args.print_failed:
+        print(f' > Wrong copies for object {oid} in container {cid}. Copies: {copies_in_cluster}')
+
+    return copies_in_cluster == expected_copies, distribution
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scenarios/preset/helpers/__init__.py b/scenarios/preset/helpers/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/scenarios/preset/helpers/cmd.py b/scenarios/preset/helpers/cmd.py
index 229d8b1..19986eb 100644
--- a/scenarios/preset/helpers/cmd.py
+++ b/scenarios/preset/helpers/cmd.py
@@ -1,5 +1,6 @@
 import os
 import shlex
+import sys
 
 from subprocess import check_output, CalledProcessError, STDOUT
 
@@ -21,3 +22,21 @@ def execute_cmd(cmd_line):
 def random_payload(payload_filepath, size):
     with open('%s' % payload_filepath, 'w+b') as fout:
         fout.write(os.urandom(1024 * int(size)))
+
+
+class ProgressBar:
+    @staticmethod
+    def start():
+        sys.stdout.write('\r\n\r\n')
+
+    @staticmethod
+    def print(current, goal):
+        finish_percent = current / goal
+        sys.stdout.write('\r')
+        sys.stdout.write(f" > Progress: [{'=' * int(30 * finish_percent)}{' ' * (29 - int(30 * finish_percent))}>]"
+                         f" {current}/{goal}")
+        sys.stdout.flush()
+
+    @staticmethod
+    def end():
+        sys.stdout.write('\r\n\r\n')
diff --git a/scenarios/preset/helpers/neofs_cli.py b/scenarios/preset/helpers/neofs_cli.py
index b45038d..4ba7ba4 100644
--- a/scenarios/preset/helpers/neofs_cli.py
+++ b/scenarios/preset/helpers/neofs_cli.py
@@ -1,3 +1,5 @@
+import re
+
 from helpers.cmd import execute_cmd
 
 
@@ -29,17 +31,17 @@ def upload_object(container, payload_filepath, endpoint):
     object_name = ""
     cmd_line = f"neofs-cli --rpc-endpoint {endpoint} object put -g --file {payload_filepath} " \
                f"--cid {container} --no-progress"
-    out, success = execute_cmd(cmd_line)
+    output, success = execute_cmd(cmd_line)
 
     if not success:
-        print(f" > Object {object_name} has not been uploaded:\n{out}")
+        print(f" > Object {object_name} has not been uploaded:\n{output}")
         return False
     else:
         try:
             # taking second string from command output
-            snd_str = out.split('\n')[1]
+            snd_str = output.split('\n')[1]
         except Exception:
-            print(f"Got empty input: {out}")
+            print(f"Got empty input: {output}")
             return False
         splitted = snd_str.split(": ")
         if len(splitted) != 2:
@@ -51,11 +53,29 @@ def get_object(cid, oid, endpoint, out_filepath):
     cmd_line = f"neofs-cli object get -r {endpoint} -g --cid {cid} --oid {oid} " \
                f"--file {out_filepath}"
 
-    out, success = execute_cmd(cmd_line)
+    output, success = execute_cmd(cmd_line)
 
     if not success:
-        print(f" > Failed to get object {oid} from container {cid} \r\n"
-              f" > Error: {out}")
+        print(f" > Failed to get object {output} from container {cid} \r\n"
+              f" > Error: {output}")
         return False
 
     return True
+
+
+def search_object_by_id(cid, oid, endpoint, ttl=2):
+    cmd_line = f"neofs-cli object search --ttl {ttl} -r {endpoint} -g --cid {cid} --oid {oid}"
+
+    output, success = execute_cmd(cmd_line)
+
+    if not success:
+        print(f" > Failed to search object {oid} for container {cid} \r\n"
+              f" > Error: {output}")
+        return False
+
+    re_rst = re.search(r'Found (\d+) objects', output)
+
+    if not re_rst:
+        raise Exception("Failed to parce search results")
+
+    return re_rst.group(1)
diff --git a/scenarios/preset/preset_grpc.py b/scenarios/preset/preset_grpc.py
index 8635464..a6a87fe 100755
--- a/scenarios/preset/preset_grpc.py
+++ b/scenarios/preset/preset_grpc.py
@@ -2,8 +2,9 @@
 
 import argparse
 import json
-from argparse import Namespace
+import random
 
+from argparse import Namespace
 from concurrent.futures import ProcessPoolExecutor
 
 from helpers.cmd import random_payload
@@ -31,6 +32,8 @@ def main():
     objects_struct = []
     payload_filepath = '/tmp/data_file'
 
+    endpoints = args.endpoint.split(',')
+
     if args.update:
         # Open file
         with open(args.out) as f:
@@ -38,9 +41,9 @@ def main():
             container_list = data_json['containers']
     else:
         print(f"Create containers: {args.containers}")
-        with ProcessPoolExecutor(max_workers=10) as executor:
-            containers_runs = {executor.submit(create_container, args.endpoint, args.policy): _ for _ in
-                               range(int(args.containers))}
+        with ProcessPoolExecutor(max_workers=50) as executor:
+            containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
+                                               args.policy): _ for _ in range(int(args.containers))}
 
         for run in containers_runs:
             if run.result():
@@ -59,8 +62,8 @@ def main():
     for container in container_list:
         print(f" > Upload objects for container {container}")
         with ProcessPoolExecutor(max_workers=50) as executor:
-            objects_runs = {executor.submit(upload_object, container, payload_filepath, args.endpoint): _ for _ in
-                            range(int(args.preload_obj))}
+            objects_runs = {executor.submit(upload_object, container, payload_filepath,
+                              endpoints[random.randrange(len(endpoints))]): _ for _ in range(int(args.preload_obj))}
 
         for run in objects_runs:
             if run.result():