xk6-frostfs/internal/s3/client.go

306 lines
7.9 KiB
Go
Raw Permalink Normal View History

package s3
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
type (
Client struct {
vu modules.VU
cli *s3.Client
}
PutResponse struct {
Success bool
Error string
}
DeleteResponse struct {
Success bool
Error string
}
GetResponse struct {
Success bool
Error string
}
CreateBucketResponse struct {
Success bool
Error string
}
VerifyHashResponse struct {
Success bool
Error string
}
)
func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
rdr := payload.Reader()
sz := payload.Size()
start := time.Now()
_, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: rdr,
})
if err != nil {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: err.Error()}
}
stats.Report(c.vu, objPutSuccess, 1)
stats.ReportDataSent(c.vu, float64(sz))
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
stats.Report(c.vu, objPutData, float64(sz))
return PutResponse{Success: true}
}
const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB
func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload datagen.Payload) PutResponse {
if objPartSize < multipartUploadMinPartSize {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)}
}
start := time.Now()
uploader := manager.NewUploader(c.cli, func(u *manager.Uploader) {
u.PartSize = int64(objPartSize)
u.Concurrency = concurrency
})
payloadReader := payload.Reader()
sz := payload.Size()
_, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
Body: payloadReader,
})
if err != nil {
stats.Report(c.vu, objPutFails, 1)
return PutResponse{Success: false, Error: err.Error()}
}
stats.Report(c.vu, objPutSuccess, 1)
stats.ReportDataSent(c.vu, float64(sz))
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
stats.Report(c.vu, objPutData, float64(sz))
return PutResponse{Success: true}
}
func (c *Client) Delete(bucket, key string) DeleteResponse {
start := time.Now()
_, err := c.cli.DeleteObject(c.vu.Context(), &s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
stats.Report(c.vu, objDeleteFails, 1)
return DeleteResponse{Success: false, Error: err.Error()}
}
stats.Report(c.vu, objDeleteSuccess, 1)
stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
return DeleteResponse{Success: true}
}
func (c *Client) Get(bucket, key string) GetResponse {
start := time.Now()
objSize := 0
err := get(c.cli, bucket, key, func(chunk []byte) {
objSize += len(chunk)
})
if err != nil {
stats.Report(c.vu, objGetFails, 1)
return GetResponse{Success: false, Error: err.Error()}
}
stats.Report(c.vu, objGetSuccess, 1)
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
stats.ReportDataReceived(c.vu, float64(objSize))
stats.Report(c.vu, objGetData, float64(objSize))
return GetResponse{Success: true}
}
// DeleteObjectVersion deletes object version with specified versionID.
// If version argument is empty, deletes all versions and delete-markers of specified object.
func (c *Client) DeleteObjectVersion(bucket, key, version string) DeleteResponse {
var toDelete []types.ObjectIdentifier
if version != "" {
toDelete = append(toDelete, types.ObjectIdentifier{
Key: aws.String(key),
VersionId: aws.String(version),
})
} else {
versions, err := c.cli.ListObjectVersions(c.vu.Context(), &s3.ListObjectVersionsInput{
Bucket: aws.String(bucket),
Prefix: aws.String(key),
})
if err != nil {
stats.Report(c.vu, objDeleteFails, 1)
return DeleteResponse{Success: false, Error: err.Error()}
}
toDelete = filterObjectVersions(versions, key)
}
if len(toDelete) == 0 {
return c.Delete(bucket, key)
} else {
_, err := c.cli.DeleteObjects(c.vu.Context(), &s3.DeleteObjectsInput{
Bucket: aws.String(bucket),
Delete: &types.Delete{
Objects: toDelete,
Quiet: true,
},
})
if err != nil {
stats.Report(c.vu, objDeleteFails, 1)
return DeleteResponse{Success: false, Error: err.Error()}
}
}
return DeleteResponse{Success: true}
}
func filterObjectVersions(versions *s3.ListObjectVersionsOutput, key string) []types.ObjectIdentifier {
var result []types.ObjectIdentifier
for _, v := range versions.Versions {
if *v.Key == key {
result = append(result, types.ObjectIdentifier{
Key: v.Key,
VersionId: v.VersionId,
})
}
}
for _, marker := range versions.DeleteMarkers {
if *marker.Key == key {
result = append(result, types.ObjectIdentifier{
Key: marker.Key,
VersionId: marker.VersionId,
})
}
}
return result
}
func get(
c *s3.Client,
bucket string,
key string,
onDataChunk func(chunk []byte),
) error {
var buf = make([]byte, 4*1024)
obj, err := c.GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
if err != nil {
return err
}
for {
n, err := obj.Body.Read(buf)
if n > 0 {
onDataChunk(buf[:n])
}
if err != nil {
break
}
}
return nil
}
func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse {
hasher := sha256.New()
err := get(c.cli, bucket, key, func(data []byte) {
hasher.Write(data)
})
if err != nil {
return VerifyHashResponse{Success: false, Error: err.Error()}
}
actualHash := hex.EncodeToString(hasher.Sum(nil))
if actualHash != expectedHash {
return VerifyHashResponse{Success: false, Error: "hash mismatch"}
}
return VerifyHashResponse{Success: true}
}
func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBucketResponse {
var err error
var lockEnabled bool
if lockEnabledStr, ok := params["lock_enabled"]; ok {
if lockEnabled, err = strconv.ParseBool(lockEnabledStr); err != nil {
stats.Report(c.vu, createBucketFails, 1)
return CreateBucketResponse{Success: false, Error: "invalid lock_enabled params"}
}
}
var bucketConfiguration *types.CreateBucketConfiguration
if locationConstraint, ok := params["location_constraint"]; ok {
bucketConfiguration = &types.CreateBucketConfiguration{
LocationConstraint: types.BucketLocationConstraint(locationConstraint),
}
}
start := time.Now()
_, err = c.cli.CreateBucket(c.vu.Context(), &s3.CreateBucketInput{
Bucket: aws.String(bucket),
ACL: types.BucketCannedACL(params["acl"]),
CreateBucketConfiguration: bucketConfiguration,
ObjectLockEnabledForBucket: lockEnabled,
})
if err != nil {
stats.Report(c.vu, createBucketFails, 1)
return CreateBucketResponse{Success: false, Error: err.Error()}
}
var versioning bool
if strVersioned, ok := params["versioning"]; ok {
if versioning, err = strconv.ParseBool(strVersioned); err != nil {
stats.Report(c.vu, createBucketFails, 1)
return CreateBucketResponse{Success: false, Error: err.Error()}
}
}
if versioning {
_, err = c.cli.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{
Bucket: aws.String(bucket),
VersioningConfiguration: &types.VersioningConfiguration{
Status: types.BucketVersioningStatusEnabled,
},
})
if err != nil {
stats.Report(c.vu, createBucketFails, 1)
return CreateBucketResponse{Success: false, Error: err.Error()}
}
}
stats.Report(c.vu, createBucketSuccess, 1)
stats.Report(c.vu, createBucketDuration, metrics.D(time.Since(start)))
return CreateBucketResponse{Success: true}
}