forked from TrueCloudLab/xk6-frostfs
303 lines
7.7 KiB
Go
303 lines
7.7 KiB
Go
package s3
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
|
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
"go.k6.io/k6/js/modules"
|
|
"go.k6.io/k6/metrics"
|
|
)
|
|
|
|
type (
|
|
Client struct {
|
|
vu modules.VU
|
|
cli *s3.Client
|
|
}
|
|
|
|
PutResponse struct {
|
|
Success bool
|
|
Error string
|
|
}
|
|
|
|
DeleteResponse struct {
|
|
Success bool
|
|
Error string
|
|
}
|
|
|
|
GetResponse struct {
|
|
Success bool
|
|
Error string
|
|
}
|
|
|
|
CreateBucketResponse struct {
|
|
Success bool
|
|
Error string
|
|
}
|
|
|
|
VerifyHashResponse struct {
|
|
Success bool
|
|
Error string
|
|
}
|
|
)
|
|
|
|
func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
|
|
rdr := payload.Reader()
|
|
sz := payload.Size()
|
|
|
|
start := time.Now()
|
|
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
|
|
Bucket: aws.String(bucket),
|
|
Key: aws.String(key),
|
|
Body: rdr,
|
|
})
|
|
if err != nil {
|
|
stats.Report(c.vu, objPutFails, 1)
|
|
return PutResponse{Success: false, Error: err.Error()}
|
|
}
|
|
|
|
stats.Report(c.vu, objPutSuccess, 1)
|
|
stats.ReportDataSent(c.vu, float64(sz))
|
|
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
|
|
stats.Report(c.vu, objPutData, float64(sz))
|
|
return PutResponse{Success: true}
|
|
}
|
|
|
|
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
|
|
|
|
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload datagen.Payload) PutResponse {
|
|
if objPartSize < multipartUploadMinPartSize {
|
|
stats.Report(c.vu, objPutFails, 1)
|
|
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)}
|
|
}
|
|
|
|
start := time.Now()
|
|
uploader := manager.NewUploader(c.cli, func(u *manager.Uploader) {
|
|
u.PartSize = int64(objPartSize)
|
|
u.Concurrency = concurrency
|
|
})
|
|
|
|
payloadReader := payload.Reader()
|
|
sz := payload.Size()
|
|
|
|
_, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{
|
|
Bucket: aws.String(bucket),
|
|
Key: aws.String(key),
|
|
Body: payloadReader,
|
|
})
|
|
if err != nil {
|
|
stats.Report(c.vu, objPutFails, 1)
|
|
return PutResponse{Success: false, Error: err.Error()}
|
|
}
|
|
|
|
stats.Report(c.vu, objPutSuccess, 1)
|
|
stats.ReportDataSent(c.vu, float64(sz))
|
|
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
|
|
stats.Report(c.vu, objPutData, float64(sz))
|
|
return PutResponse{Success: true}
|
|
}
|
|
|
|
func (c *Client) Delete(bucket, key string) DeleteResponse {
|
|
start := time.Now()
|
|
|
|
_, err := c.cli.DeleteObject(c.vu.Context(), &s3.DeleteObjectInput{
|
|
Bucket: aws.String(bucket),
|
|
Key: aws.String(key),
|
|
})
|
|
if err != nil {
|
|
stats.Report(c.vu, objDeleteFails, 1)
|
|
return DeleteResponse{Success: false, Error: err.Error()}
|
|
}
|
|
|
|
stats.Report(c.vu, objDeleteSuccess, 1)
|
|
stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
|
|
return DeleteResponse{Success: true}
|
|
}
|
|
|
|
func (c *Client) Get(bucket, key string) GetResponse {
|
|
start := time.Now()
|
|
|
|
objSize := 0
|
|
err := get(c.cli, bucket, key, func(chunk []byte) {
|
|
objSize += len(chunk)
|
|
})
|
|
if err != nil {
|
|
stats.Report(c.vu, objGetFails, 1)
|
|
return GetResponse{Success: false, Error: err.Error()}
|
|
}
|
|
|
|
stats.Report(c.vu, objGetSuccess, 1)
|
|
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
|
|
stats.ReportDataReceived(c.vu, float64(objSize))
|
|
stats.Report(c.vu, objGetData, float64(objSize))
|
|
return GetResponse{Success: true}
|
|
}
|
|
|
|
func (c *Client) DeleteObjectVersion(bucket, key, version string) DeleteResponse {
|
|
var toDelete []types.ObjectIdentifier
|
|
|
|
if version != "" {
|
|
toDelete = append(toDelete, types.ObjectIdentifier{
|
|
Key: aws.String(key),
|
|
VersionId: aws.String(version),
|
|
})
|
|
} else {
|
|
versions, err := c.cli.ListObjectVersions(c.vu.Context(), &s3.ListObjectVersionsInput{
|
|
Bucket: aws.String(bucket),
|
|
Prefix: aws.String(key),
|
|
})
|
|
if err != nil {
|
|
stats.Report(c.vu, objDeleteFails, 1)
|
|
return DeleteResponse{Success: false, Error: err.Error()}
|
|
}
|
|
toDelete = filterObjectVersions(versions, key)
|
|
}
|
|
if len(toDelete) == 0 {
|
|
return c.Delete(bucket, key)
|
|
} else {
|
|
_, err := c.cli.DeleteObjects(c.vu.Context(), &s3.DeleteObjectsInput{
|
|
Bucket: aws.String(bucket),
|
|
Delete: &types.Delete{
|
|
Objects: toDelete,
|
|
Quiet: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
stats.Report(c.vu, objDeleteFails, 1)
|
|
return DeleteResponse{Success: false, Error: err.Error()}
|
|
}
|
|
}
|
|
|
|
return DeleteResponse{Success: true}
|
|
}
|
|
|
|
func filterObjectVersions(versions *s3.ListObjectVersionsOutput, key string) []types.ObjectIdentifier {
|
|
var result []types.ObjectIdentifier
|
|
|
|
for _, v := range versions.Versions {
|
|
if *v.Key == key {
|
|
result = append(result, types.ObjectIdentifier{
|
|
Key: v.Key,
|
|
VersionId: v.VersionId,
|
|
})
|
|
}
|
|
}
|
|
|
|
for _, marker := range versions.DeleteMarkers {
|
|
if *marker.Key == key {
|
|
result = append(result, types.ObjectIdentifier{
|
|
Key: marker.Key,
|
|
VersionId: marker.VersionId,
|
|
})
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func get(
|
|
c *s3.Client,
|
|
bucket string,
|
|
key string,
|
|
onDataChunk func(chunk []byte),
|
|
) error {
|
|
var buf = make([]byte, 4*1024)
|
|
|
|
obj, err := c.GetObject(context.Background(), &s3.GetObjectInput{
|
|
Bucket: aws.String(bucket),
|
|
Key: aws.String(key),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
n, err := obj.Body.Read(buf)
|
|
if n > 0 {
|
|
onDataChunk(buf[:n])
|
|
}
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse {
|
|
hasher := sha256.New()
|
|
err := get(c.cli, bucket, key, func(data []byte) {
|
|
hasher.Write(data)
|
|
})
|
|
if err != nil {
|
|
return VerifyHashResponse{Success: false, Error: err.Error()}
|
|
}
|
|
actualHash := hex.EncodeToString(hasher.Sum(nil))
|
|
if actualHash != expectedHash {
|
|
return VerifyHashResponse{Success: false, Error: "hash mismatch"}
|
|
}
|
|
|
|
return VerifyHashResponse{Success: true}
|
|
}
|
|
|
|
func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBucketResponse {
|
|
var err error
|
|
var lockEnabled bool
|
|
if lockEnabledStr, ok := params["lock_enabled"]; ok {
|
|
if lockEnabled, err = strconv.ParseBool(lockEnabledStr); err != nil {
|
|
stats.Report(c.vu, createBucketFails, 1)
|
|
return CreateBucketResponse{Success: false, Error: "invalid lock_enabled params"}
|
|
}
|
|
}
|
|
|
|
var bucketConfiguration *types.CreateBucketConfiguration
|
|
if locationConstraint, ok := params["location_constraint"]; ok {
|
|
bucketConfiguration = &types.CreateBucketConfiguration{
|
|
LocationConstraint: types.BucketLocationConstraint(locationConstraint),
|
|
}
|
|
}
|
|
|
|
start := time.Now()
|
|
_, err = c.cli.CreateBucket(c.vu.Context(), &s3.CreateBucketInput{
|
|
Bucket: aws.String(bucket),
|
|
ACL: types.BucketCannedACL(params["acl"]),
|
|
CreateBucketConfiguration: bucketConfiguration,
|
|
ObjectLockEnabledForBucket: lockEnabled,
|
|
})
|
|
if err != nil {
|
|
stats.Report(c.vu, createBucketFails, 1)
|
|
return CreateBucketResponse{Success: false, Error: err.Error()}
|
|
}
|
|
|
|
var versioning bool
|
|
if strVersioned, ok := params["versioning"]; ok {
|
|
if versioning, err = strconv.ParseBool(strVersioned); err != nil {
|
|
stats.Report(c.vu, createBucketFails, 1)
|
|
return CreateBucketResponse{Success: false, Error: err.Error()}
|
|
}
|
|
}
|
|
if versioning {
|
|
_, err = c.cli.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{
|
|
Bucket: aws.String(bucket),
|
|
VersioningConfiguration: &types.VersioningConfiguration{
|
|
Status: types.BucketVersioningStatusEnabled,
|
|
},
|
|
})
|
|
if err != nil {
|
|
stats.Report(c.vu, createBucketFails, 1)
|
|
return CreateBucketResponse{Success: false, Error: err.Error()}
|
|
}
|
|
}
|
|
|
|
stats.Report(c.vu, createBucketSuccess, 1)
|
|
stats.Report(c.vu, createBucketDuration, metrics.D(time.Since(start)))
|
|
return CreateBucketResponse{Success: true}
|
|
}
|