xk6 improvements #6
5 changed files with 59 additions and 29 deletions
|
@ -137,7 +137,6 @@ func (c *Client) Delete(containerID string, objectID string) DeleteResponse {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stats.Report(c.vu, objDeleteTotal, 1)
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
var prm client.PrmObjectDelete
|
var prm client.PrmObjectDelete
|
||||||
|
@ -151,6 +150,7 @@ func (c *Client) Delete(containerID string, objectID string) DeleteResponse {
|
||||||
return DeleteResponse{Success: false, Error: err.Error()}
|
return DeleteResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats.Report(c.vu, objDeleteTotal, 1)
|
||||||
stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
|
stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
|
||||||
return DeleteResponse{Success: true}
|
return DeleteResponse{Success: true}
|
||||||
}
|
}
|
||||||
|
@ -168,7 +168,6 @@ func (c *Client) Get(containerID, objectID string) GetResponse {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stats.Report(c.vu, objGetTotal, 1)
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
var prm client.PrmObjectGet
|
var prm client.PrmObjectGet
|
||||||
|
@ -185,6 +184,7 @@ func (c *Client) Get(containerID, objectID string) GetResponse {
|
||||||
return GetResponse{Success: false, Error: err.Error()}
|
return GetResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats.Report(c.vu, objGetTotal, 1)
|
||||||
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
|
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
|
||||||
stats.ReportDataReceived(c.vu, float64(objSize))
|
stats.ReportDataReceived(c.vu, float64(objSize))
|
||||||
return GetResponse{Success: true}
|
return GetResponse{Success: true}
|
||||||
|
@ -428,7 +428,6 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
|
||||||
sz := rdr.Size()
|
sz := rdr.Size()
|
||||||
|
|
||||||
// starting upload
|
// starting upload
|
||||||
stats.Report(vu, objPutTotal, 1)
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
var prm client.PrmObjectPutInit
|
var prm client.PrmObjectPutInit
|
||||||
|
@ -462,10 +461,11 @@ func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats.Report(vu, objPutTotal, 1)
|
||||||
stats.ReportDataSent(vu, float64(sz))
|
stats.ReportDataSent(vu, float64(sz))
|
||||||
stats.Report(vu, objPutDuration, metrics.D(time.Since(start)))
|
stats.Report(vu, objPutDuration, metrics.D(time.Since(start)))
|
||||||
|
|
||||||
return resp, err
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseNetworkInfo(ctx context.Context, cli *client.Client) (maxObjSize, epoch uint64, hhDisabled bool, err error) {
|
func parseNetworkInfo(ctx context.Context, cli *client.Client) (maxObjSize, epoch uint64, hhDisabled bool, err error) {
|
||||||
|
|
|
@ -79,7 +79,7 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
|
||||||
|
|
||||||
err = cli.Dial(prmDial)
|
err = cli.Dial(prmDial)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("dial endpoint: %w", err)
|
return nil, fmt.Errorf("dial endpoint: %v %w", endpoint, err)
|
||||||
}
|
}
|
||||||
https://github.com/TrueCloudLab/xk6-frostfs/pull/7
|
|||||||
|
|
||||||
// generate session token
|
// generate session token
|
||||||
|
@ -88,7 +88,7 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
|
||||||
prmSessionCreate.SetExp(exp)
|
prmSessionCreate.SetExp(exp)
|
||||||
sessionResp, err := cli.SessionCreate(n.vu.Context(), prmSessionCreate)
|
sessionResp, err := cli.SessionCreate(n.vu.Context(), prmSessionCreate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("dial endpoint: %w", err)
|
return nil, fmt.Errorf("create session: %v %w", endpoint, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var id uuid.UUID
|
var id uuid.UUID
|
||||||
|
|
|
@ -53,8 +53,6 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
||||||
rdr := bytes.NewReader(payload.Bytes())
|
rdr := bytes.NewReader(payload.Bytes())
|
||||||
sz := rdr.Size()
|
sz := rdr.Size()
|
||||||
|
|
||||||
stats.Report(c.vu, objPutTotal, 1)
|
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
|
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
|
||||||
Bucket: aws.String(bucket),
|
Bucket: aws.String(bucket),
|
||||||
|
@ -66,13 +64,13 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats.Report(c.vu, objPutTotal, 1)
|
||||||
stats.ReportDataSent(c.vu, float64(sz))
|
stats.ReportDataSent(c.vu, float64(sz))
|
||||||
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
|
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
|
||||||
return PutResponse{Success: true}
|
return PutResponse{Success: true}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Delete(bucket, key string) DeleteResponse {
|
func (c *Client) Delete(bucket, key string) DeleteResponse {
|
||||||
stats.Report(c.vu, objDeleteTotal, 1)
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
_, err := c.cli.DeleteObject(c.vu.Context(), &s3.DeleteObjectInput{
|
_, err := c.cli.DeleteObject(c.vu.Context(), &s3.DeleteObjectInput{
|
||||||
|
@ -84,12 +82,12 @@ func (c *Client) Delete(bucket, key string) DeleteResponse {
|
||||||
return DeleteResponse{Success: false, Error: err.Error()}
|
return DeleteResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats.Report(c.vu, objDeleteTotal, 1)
|
||||||
stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
|
stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
|
||||||
return DeleteResponse{Success: true}
|
return DeleteResponse{Success: true}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Get(bucket, key string) GetResponse {
|
func (c *Client) Get(bucket, key string) GetResponse {
|
||||||
stats.Report(c.vu, objGetTotal, 1)
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
var objSize = 0
|
var objSize = 0
|
||||||
|
@ -101,6 +99,7 @@ func (c *Client) Get(bucket, key string) GetResponse {
|
||||||
return GetResponse{Success: false, Error: err.Error()}
|
return GetResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats.Report(c.vu, objGetTotal, 1)
|
||||||
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
|
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
|
||||||
stats.ReportDataReceived(c.vu, float64(objSize))
|
stats.ReportDataReceived(c.vu, float64(objSize))
|
||||||
return GetResponse{Success: true}
|
return GetResponse{Success: true}
|
||||||
|
@ -151,8 +150,6 @@ func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBucketResponse {
|
func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBucketResponse {
|
||||||
stats.Report(c.vu, createBucketTotal, 1)
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var lockEnabled bool
|
var lockEnabled bool
|
||||||
if lockEnabledStr, ok := params["lock_enabled"]; ok {
|
if lockEnabledStr, ok := params["lock_enabled"]; ok {
|
||||||
|
@ -181,6 +178,7 @@ func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBuc
|
||||||
return CreateBucketResponse{Success: false, Error: err.Error()}
|
return CreateBucketResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stats.Report(c.vu, createBucketTotal, 1)
|
||||||
stats.Report(c.vu, createBucketDuration, metrics.D(time.Since(start)))
|
stats.Report(c.vu, createBucketDuration, metrics.D(time.Since(start)))
|
||||||
return CreateBucketResponse{Success: true}
|
return CreateBucketResponse{Success: true}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
import random
|
import random
|
||||||
|
import sys
|
||||||
|
|
||||||
from argparse import Namespace
|
from argparse import Namespace
|
||||||
from concurrent.futures import ProcessPoolExecutor
|
from concurrent.futures import ProcessPoolExecutor
|
||||||
|
@ -10,6 +11,10 @@ from concurrent.futures import ProcessPoolExecutor
|
||||||
from helpers.cmd import random_payload
|
from helpers.cmd import random_payload
|
||||||
from helpers.frostfs_cli import create_container, upload_object
|
from helpers.frostfs_cli import create_container, upload_object
|
||||||
|
|
||||||
|
ERROR_NO_CONTAINERS = 1
|
||||||
|
ERROR_NO_OBJECTS = 2
|
||||||
|
MAX_WORKERS = 50
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument('--size', help='Upload objects size in kb')
|
parser.add_argument('--size', help='Upload objects size in kb')
|
||||||
parser.add_argument('--containers', help='Number of containers to create')
|
parser.add_argument('--containers', help='Number of containers to create')
|
||||||
|
@ -24,6 +29,8 @@ parser.add_argument(
|
||||||
)
|
)
|
||||||
parser.add_argument('--endpoint', help='Node address')
|
parser.add_argument('--endpoint', help='Node address')
|
||||||
parser.add_argument('--update', help='Save existed containers')
|
parser.add_argument('--update', help='Save existed containers')
|
||||||
|
parser.add_argument('--ignore-errors', help='Ignore preset errors')
|
||||||
|
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
|
||||||
|
|
||||||
args: Namespace = parser.parse_args()
|
args: Namespace = parser.parse_args()
|
||||||
print(args)
|
print(args)
|
||||||
|
@ -31,14 +38,15 @@ print(args)
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
container_list = []
|
container_list = []
|
||||||
objects_struct = []
|
objects_list = []
|
||||||
payload_filepath = '/tmp/data_file'
|
payload_filepath = '/tmp/data_file'
|
||||||
|
|
||||||
endpoints = args.endpoint.split(',')
|
endpoints = args.endpoint.split(',')
|
||||||
|
|
||||||
wallet = args.wallet
|
wallet = args.wallet
|
||||||
wallet_config = args.config
|
wallet_config = args.config
|
||||||
|
workers = int(args.workers)
|
||||||
|
ignore_errors = True if args.ignore_errors else False
|
||||||
if args.update:
|
if args.update:
|
||||||
# Open file
|
# Open file
|
||||||
with open(args.out) as f:
|
with open(args.out) as f:
|
||||||
|
@ -46,7 +54,7 @@ def main():
|
||||||
container_list = data_json['containers']
|
container_list = data_json['containers']
|
||||||
else:
|
else:
|
||||||
print(f"Create containers: {args.containers}")
|
print(f"Create containers: {args.containers}")
|
||||||
with ProcessPoolExecutor(max_workers=50) as executor:
|
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||||
containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
|
containers_runs = {executor.submit(create_container, endpoints[random.randrange(len(endpoints))],
|
||||||
args.policy, wallet, wallet_config): _ for _ in range(int(args.containers))}
|
args.policy, wallet, wallet_config): _ for _ in range(int(args.containers))}
|
||||||
|
|
||||||
|
@ -58,7 +66,9 @@ def main():
|
||||||
|
|
||||||
print(f" > Containers: {container_list}")
|
print(f" > Containers: {container_list}")
|
||||||
if not container_list:
|
if not container_list:
|
||||||
return
|
print("No containers to work with")
|
||||||
|
if not ignore_errors:
|
||||||
|
sys.exit(ERROR_NO_CONTAINERS)
|
||||||
|
|
||||||
print(f"Upload objects to each container: {args.preload_obj} ")
|
print(f"Upload objects to each container: {args.preload_obj} ")
|
||||||
random_payload(payload_filepath, args.size)
|
random_payload(payload_filepath, args.size)
|
||||||
|
@ -66,25 +76,30 @@ def main():
|
||||||
|
|
||||||
for container in container_list:
|
for container in container_list:
|
||||||
print(f" > Upload objects for container {container}")
|
print(f" > Upload objects for container {container}")
|
||||||
with ProcessPoolExecutor(max_workers=50) as executor:
|
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||||
objects_runs = {executor.submit(upload_object, container, payload_filepath,
|
objects_runs = {executor.submit(upload_object, container, payload_filepath,
|
||||||
endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(int(args.preload_obj))}
|
endpoints[random.randrange(len(endpoints))], wallet, wallet_config): _ for _ in range(int(args.preload_obj))}
|
||||||
|
|
||||||
for run in objects_runs:
|
for run in objects_runs:
|
||||||
if run.result():
|
if run.result():
|
||||||
objects_struct.append({'container': container, 'object': run.result()})
|
objects_list.append({'container': container, 'object': run.result()})
|
||||||
print(f" > Upload objects for container {container}: Completed")
|
print(f" > Upload objects for container {container}: Completed")
|
||||||
|
|
||||||
print("Upload objects to each container: Completed")
|
print("Upload objects to each container: Completed")
|
||||||
|
|
||||||
data = {'containers': container_list, 'objects': objects_struct, 'obj_size': args.size + " Kb"}
|
if int(args.preload_obj) > 0 and not objects_list:
|
||||||
|
print("No objects were uploaded")
|
||||||
|
if not ignore_errors:
|
||||||
|
sys.exit(ERROR_NO_OBJECTS)
|
||||||
|
|
||||||
|
data = {'containers': container_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
|
||||||
|
|
||||||
with open(args.out, 'w+') as f:
|
with open(args.out, 'w+') as f:
|
||||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
print(f"Result:")
|
print("Result:")
|
||||||
print(f" > Total Containers has been created: {len(container_list)}.")
|
print(f" > Total Containers has been created: {len(container_list)}.")
|
||||||
print(f" > Total Objects has been created: {len(objects_struct)}.")
|
print(f" > Total Objects has been created: {len(objects_list)}.")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
|
import sys
|
||||||
from concurrent.futures import ProcessPoolExecutor
|
from concurrent.futures import ProcessPoolExecutor
|
||||||
|
|
||||||
from helpers.cmd import random_payload
|
from helpers.cmd import random_payload
|
||||||
|
@ -18,16 +19,23 @@ parser.add_argument('--update', help='True/False, False by default. Save existed
|
||||||
'New buckets will not be created.')
|
'New buckets will not be created.')
|
||||||
parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
|
parser.add_argument('--location', help='AWS location. Will be empty, if has not be declared.', default="")
|
||||||
parser.add_argument('--versioning', help='True/False, False by default.')
|
parser.add_argument('--versioning', help='True/False, False by default.')
|
||||||
|
parser.add_argument('--ignore-errors', help='Ignore preset errors')
|
||||||
|
parser.add_argument('--workers', help='Count of workers in preset. Max = 50, Default = 50', default=50)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
print(args)
|
print(args)
|
||||||
|
|
||||||
|
ERROR_NO_BUCKETS = 1
|
||||||
|
ERROR_NO_OBJECTS = 2
|
||||||
|
MAX_WORKERS = 50
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
bucket_list = []
|
bucket_list = []
|
||||||
objects_struct = []
|
objects_list = []
|
||||||
payload_filepath = '/tmp/data_file'
|
payload_filepath = '/tmp/data_file'
|
||||||
|
ignore_errors = True if args.ignore_errors else False
|
||||||
|
|
||||||
|
workers = int(args.workers)
|
||||||
if args.update:
|
if args.update:
|
||||||
# Open file
|
# Open file
|
||||||
with open(args.out) as f:
|
with open(args.out) as f:
|
||||||
|
@ -37,7 +45,7 @@ def main():
|
||||||
else:
|
else:
|
||||||
print(f"Create buckets: {args.buckets}")
|
print(f"Create buckets: {args.buckets}")
|
||||||
|
|
||||||
with ProcessPoolExecutor(max_workers=10) as executor:
|
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||||
buckets_runs = {executor.submit(create_bucket, args.endpoint, args.versioning,
|
buckets_runs = {executor.submit(create_bucket, args.endpoint, args.versioning,
|
||||||
args.location): _ for _ in range(int(args.buckets))}
|
args.location): _ for _ in range(int(args.buckets))}
|
||||||
|
|
||||||
|
@ -48,6 +56,10 @@ def main():
|
||||||
print("Create buckets: Completed")
|
print("Create buckets: Completed")
|
||||||
|
|
||||||
print(f" > Buckets: {bucket_list}")
|
print(f" > Buckets: {bucket_list}")
|
||||||
|
if not bucket_list:
|
||||||
|
print("No buckets to work with")
|
||||||
|
if not ignore_errors:
|
||||||
|
sys.exit(ERROR_NO_BUCKETS)
|
||||||
|
|
||||||
print(f"Upload objects to each bucket: {args.preload_obj} ")
|
print(f"Upload objects to each bucket: {args.preload_obj} ")
|
||||||
random_payload(payload_filepath, args.size)
|
random_payload(payload_filepath, args.size)
|
||||||
|
@ -55,25 +67,30 @@ def main():
|
||||||
|
|
||||||
for bucket in bucket_list:
|
for bucket in bucket_list:
|
||||||
print(f" > Upload objects for bucket {bucket}")
|
print(f" > Upload objects for bucket {bucket}")
|
||||||
with ProcessPoolExecutor(max_workers=50) as executor:
|
with ProcessPoolExecutor(max_workers=min(MAX_WORKERS, workers)) as executor:
|
||||||
objects_runs = {executor.submit(upload_object, bucket, payload_filepath,
|
objects_runs = {executor.submit(upload_object, bucket, payload_filepath,
|
||||||
args.endpoint): _ for _ in range(int(args.preload_obj))}
|
args.endpoint): _ for _ in range(int(args.preload_obj))}
|
||||||
|
|
||||||
for run in objects_runs:
|
for run in objects_runs:
|
||||||
if run.result() is not None:
|
if run.result() is not None:
|
||||||
objects_struct.append({'bucket': bucket, 'object': run.result()})
|
objects_list.append({'bucket': bucket, 'object': run.result()})
|
||||||
print(f" > Upload objects for bucket {bucket}: Completed")
|
print(f" > Upload objects for bucket {bucket}: Completed")
|
||||||
|
|
||||||
print("Upload objects to each bucket: Completed")
|
print("Upload objects to each bucket: Completed")
|
||||||
|
|
||||||
data = {'buckets': bucket_list, 'objects': objects_struct, 'obj_size': args.size + " Kb"}
|
if int(args.preload_obj) > 0 and not objects_list:
|
||||||
|
print("No objects were uploaded")
|
||||||
|
if not ignore_errors:
|
||||||
|
sys.exit(ERROR_NO_OBJECTS)
|
||||||
|
|
||||||
|
data = {'buckets': bucket_list, 'objects': objects_list, 'obj_size': args.size + " Kb"}
|
||||||
|
|
||||||
with open(args.out, 'w+') as f:
|
with open(args.out, 'w+') as f:
|
||||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||||
|
|
||||||
print(f"Result:")
|
print("Result:")
|
||||||
print(f" > Total Buckets has been created: {len(bucket_list)}.")
|
print(f" > Total Buckets has been created: {len(bucket_list)}.")
|
||||||
print(f" > Total Objects has been created: {len(objects_struct)}.")
|
print(f" > Total Objects has been created: {len(objects_list)}.")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
Loading…
Reference in a new issue
This line should have been:
return nil, fmt.Errorf("dial endpoint: %v %w", endpoint, err)
(`%v %w` instead of `%w %w` — `endpoint` is a string, and the `%w` verb only wraps values implementing the `error` interface; applying it to a string produces `%!w(string=...)` in the formatted message).