forked from TrueCloudLab/xk6-frostfs
Set background timeout
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
parent
48a95bc50b
commit
96e0e0188e
2 changed files with 31 additions and 7 deletions
|
@ -23,6 +23,7 @@ type (
|
||||||
Client struct {
|
Client struct {
|
||||||
vu modules.VU
|
vu modules.VU
|
||||||
cli *s3.Client
|
cli *s3.Client
|
||||||
|
to time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
PutResponse struct {
|
PutResponse struct {
|
||||||
|
@ -56,7 +57,11 @@ func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
|
||||||
sz := rdr.Size()
|
sz := rdr.Size()
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), c.to)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err := c.cli.PutObject(ctx, &s3.PutObjectInput{
|
||||||
Bucket: aws.String(bucket),
|
Bucket: aws.String(bucket),
|
||||||
Key: aws.String(key),
|
Key: aws.String(key),
|
||||||
Body: rdr,
|
Body: rdr,
|
||||||
|
@ -89,7 +94,10 @@ func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, pay
|
||||||
payloadReader := bytes.NewReader(payload.Bytes())
|
payloadReader := bytes.NewReader(payload.Bytes())
|
||||||
sz := payloadReader.Len()
|
sz := payloadReader.Len()
|
||||||
|
|
||||||
_, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{
|
ctx, cancel := context.WithTimeout(context.Background(), c.to)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err := uploader.Upload(ctx, &s3.PutObjectInput{
|
||||||
Bucket: aws.String(bucket),
|
Bucket: aws.String(bucket),
|
||||||
Key: aws.String(key),
|
Key: aws.String(key),
|
||||||
Body: payloadReader,
|
Body: payloadReader,
|
||||||
|
@ -108,7 +116,10 @@ func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, pay
|
||||||
func (c *Client) Delete(bucket, key string) DeleteResponse {
|
func (c *Client) Delete(bucket, key string) DeleteResponse {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
_, err := c.cli.DeleteObject(c.vu.Context(), &s3.DeleteObjectInput{
|
ctx, cancel := context.WithTimeout(context.Background(), c.to)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err := c.cli.DeleteObject(ctx, &s3.DeleteObjectInput{
|
||||||
Bucket: aws.String(bucket),
|
Bucket: aws.String(bucket),
|
||||||
Key: aws.String(key),
|
Key: aws.String(key),
|
||||||
})
|
})
|
||||||
|
@ -125,8 +136,11 @@ func (c *Client) Delete(bucket, key string) DeleteResponse {
|
||||||
func (c *Client) Get(bucket, key string) GetResponse {
|
func (c *Client) Get(bucket, key string) GetResponse {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), c.to)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
var objSize = 0
|
var objSize = 0
|
||||||
err := get(c.cli, bucket, key, func(chunk []byte) {
|
err := get(ctx, c.cli, bucket, key, func(chunk []byte) {
|
||||||
objSize += len(chunk)
|
objSize += len(chunk)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -141,6 +155,7 @@ func (c *Client) Get(bucket, key string) GetResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
func get(
|
func get(
|
||||||
|
ctx context.Context,
|
||||||
c *s3.Client,
|
c *s3.Client,
|
||||||
bucket string,
|
bucket string,
|
||||||
key string,
|
key string,
|
||||||
|
@ -148,7 +163,7 @@ func get(
|
||||||
) error {
|
) error {
|
||||||
var buf = make([]byte, 4*1024)
|
var buf = make([]byte, 4*1024)
|
||||||
|
|
||||||
obj, err := c.GetObject(context.Background(), &s3.GetObjectInput{
|
obj, err := c.GetObject(ctx, &s3.GetObjectInput{
|
||||||
Bucket: aws.String(bucket),
|
Bucket: aws.String(bucket),
|
||||||
Key: aws.String(key),
|
Key: aws.String(key),
|
||||||
})
|
})
|
||||||
|
@ -170,7 +185,11 @@ func get(
|
||||||
|
|
||||||
func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse {
|
func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse {
|
||||||
hasher := sha256.New()
|
hasher := sha256.New()
|
||||||
err := get(c.cli, bucket, key, func(data []byte) {
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), c.to)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
err := get(ctx, c.cli, bucket, key, func(data []byte) {
|
||||||
hasher.Write(data)
|
hasher.Write(data)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -202,7 +221,11 @@ func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBuc
|
||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
_, err = c.cli.CreateBucket(c.vu.Context(), &s3.CreateBucketInput{
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), c.to)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err = c.cli.CreateBucket(ctx, &s3.CreateBucketInput{
|
||||||
Bucket: aws.String(bucket),
|
Bucket: aws.String(bucket),
|
||||||
ACL: types.BucketCannedACL(params["acl"]),
|
ACL: types.BucketCannedACL(params["acl"]),
|
||||||
CreateBucketConfiguration: bucketConfiguration,
|
CreateBucketConfiguration: bucketConfiguration,
|
||||||
|
|
|
@ -114,5 +114,6 @@ func (s *S3) Connect(endpoint string, params map[string]string) (*Client, error)
|
||||||
return &Client{
|
return &Client{
|
||||||
vu: s.vu,
|
vu: s.vu,
|
||||||
cli: cli,
|
cli: cli,
|
||||||
|
to: timeout,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue