package s3 import ( "context" "crypto/sha256" "encoding/hex" "fmt" "strconv" "time" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen" "git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "go.k6.io/k6/js/modules" "go.k6.io/k6/metrics" ) type ( Client struct { vu modules.VU cli *s3.Client } PutResponse struct { Success bool Error string } DeleteResponse struct { Success bool Error string } GetResponse struct { Success bool Error string } CreateBucketResponse struct { Success bool Error string } VerifyHashResponse struct { Success bool Error string } ) func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse { rdr := payload.Reader() sz := payload.Size() start := time.Now() _, err := c.cli.PutObject(c.vu.Context(), &s3.PutObjectInput{ Bucket: aws.String(bucket), Key: aws.String(key), Body: rdr, }) if err != nil { stats.Report(c.vu, objPutFails, 1) return PutResponse{Success: false, Error: err.Error()} } stats.Report(c.vu, objPutSuccess, 1) stats.ReportDataSent(c.vu, float64(sz)) stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start))) stats.Report(c.vu, objPutData, float64(sz)) return PutResponse{Success: true} } const multipartUploadMinPartSize = 5 * 1024 * 1024 // 5MB func (c *Client) Multipart(bucket, key string, objPartSize, concurrency int, payload datagen.Payload) PutResponse { if objPartSize < multipartUploadMinPartSize { stats.Report(c.vu, objPutFails, 1) return PutResponse{Success: false, Error: fmt.Sprintf("part size '%d' must be greater than '%d'(5 MB)", objPartSize, multipartUploadMinPartSize)} } start := time.Now() uploader := manager.NewUploader(c.cli, func(u *manager.Uploader) { u.PartSize = int64(objPartSize) u.Concurrency = concurrency }) payloadReader := payload.Reader() sz := payload.Size() _, err := uploader.Upload(c.vu.Context(), &s3.PutObjectInput{ Bucket: aws.String(bucket), Key: aws.String(key), Body: payloadReader, }) if err != nil { stats.Report(c.vu, objPutFails, 1) return PutResponse{Success: false, Error: err.Error()} } stats.Report(c.vu, objPutSuccess, 1) stats.ReportDataSent(c.vu, float64(sz)) stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start))) stats.Report(c.vu, objPutData, float64(sz)) return PutResponse{Success: true} } func (c *Client) Delete(bucket, key string) DeleteResponse { start := time.Now() _, err := c.cli.DeleteObject(c.vu.Context(), &s3.DeleteObjectInput{ Bucket: aws.String(bucket), Key: aws.String(key), }) if err != nil { stats.Report(c.vu, objDeleteFails, 1) return DeleteResponse{Success: false, Error: err.Error()} } stats.Report(c.vu, objDeleteSuccess, 1) stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start))) return DeleteResponse{Success: true} } func (c *Client) Get(bucket, key string) GetResponse { start := time.Now() objSize := 0 err := get(c.cli, bucket, key, func(chunk []byte) { objSize += len(chunk) }) if err != nil { stats.Report(c.vu, objGetFails, 1) return GetResponse{Success: false, Error: err.Error()} } stats.Report(c.vu, objGetSuccess, 1) stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start))) stats.ReportDataReceived(c.vu, float64(objSize)) stats.Report(c.vu, objGetData, float64(objSize)) return GetResponse{Success: true} } // DeleteObjectVersion deletes object version with specified versionID. // If version argument is empty, deletes all versions and delete-markers of specified object. func (c *Client) DeleteObjectVersion(bucket, key, version string) DeleteResponse { var toDelete []types.ObjectIdentifier if version != "" { toDelete = append(toDelete, types.ObjectIdentifier{ Key: aws.String(key), VersionId: aws.String(version), }) } else { versions, err := c.cli.ListObjectVersions(c.vu.Context(), &s3.ListObjectVersionsInput{ Bucket: aws.String(bucket), Prefix: aws.String(key), }) if err != nil { stats.Report(c.vu, objDeleteFails, 1) return DeleteResponse{Success: false, Error: err.Error()} } toDelete = filterObjectVersions(versions, key) } if len(toDelete) == 0 { return c.Delete(bucket, key) } else { _, err := c.cli.DeleteObjects(c.vu.Context(), &s3.DeleteObjectsInput{ Bucket: aws.String(bucket), Delete: &types.Delete{ Objects: toDelete, Quiet: true, }, }) if err != nil { stats.Report(c.vu, objDeleteFails, 1) return DeleteResponse{Success: false, Error: err.Error()} } } return DeleteResponse{Success: true} } func filterObjectVersions(versions *s3.ListObjectVersionsOutput, key string) []types.ObjectIdentifier { var result []types.ObjectIdentifier for _, v := range versions.Versions { if *v.Key == key { result = append(result, types.ObjectIdentifier{ Key: v.Key, VersionId: v.VersionId, }) } } for _, marker := range versions.DeleteMarkers { if *marker.Key == key { result = append(result, types.ObjectIdentifier{ Key: marker.Key, VersionId: marker.VersionId, }) } } return result } func get( c *s3.Client, bucket string, key string, onDataChunk func(chunk []byte), ) error { var buf = make([]byte, 4*1024) obj, err := c.GetObject(context.Background(), &s3.GetObjectInput{ Bucket: aws.String(bucket), Key: aws.String(key), }) if err != nil { return err } for { n, err := obj.Body.Read(buf) if n > 0 { onDataChunk(buf[:n]) } if err != nil { break } } return nil } func (c *Client) VerifyHash(bucket, key, expectedHash string) VerifyHashResponse { hasher := sha256.New() err := get(c.cli, bucket, key, func(data []byte) { hasher.Write(data) }) if err != nil { return VerifyHashResponse{Success: false, Error: err.Error()} } actualHash := hex.EncodeToString(hasher.Sum(nil)) if actualHash != expectedHash { return VerifyHashResponse{Success: false, Error: "hash mismatch"} } return VerifyHashResponse{Success: true} } func (c *Client) CreateBucket(bucket string, params map[string]string) CreateBucketResponse { var err error var lockEnabled bool if lockEnabledStr, ok := params["lock_enabled"]; ok { if lockEnabled, err = strconv.ParseBool(lockEnabledStr); err != nil { stats.Report(c.vu, createBucketFails, 1) return CreateBucketResponse{Success: false, Error: "invalid lock_enabled params"} } } var bucketConfiguration *types.CreateBucketConfiguration if locationConstraint, ok := params["location_constraint"]; ok { bucketConfiguration = &types.CreateBucketConfiguration{ LocationConstraint: types.BucketLocationConstraint(locationConstraint), } } start := time.Now() _, err = c.cli.CreateBucket(c.vu.Context(), &s3.CreateBucketInput{ Bucket: aws.String(bucket), ACL: types.BucketCannedACL(params["acl"]), CreateBucketConfiguration: bucketConfiguration, ObjectLockEnabledForBucket: lockEnabled, }) if err != nil { stats.Report(c.vu, createBucketFails, 1) return CreateBucketResponse{Success: false, Error: err.Error()} } var versioning bool if strVersioned, ok := params["versioning"]; ok { if versioning, err = strconv.ParseBool(strVersioned); err != nil { stats.Report(c.vu, createBucketFails, 1) return CreateBucketResponse{Success: false, Error: err.Error()} } } if versioning { _, err = c.cli.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{ Bucket: aws.String(bucket), VersioningConfiguration: &types.VersioningConfiguration{ Status: types.BucketVersioningStatusEnabled, }, }) if err != nil { stats.Report(c.vu, createBucketFails, 1) return CreateBucketResponse{Success: false, Error: err.Error()} } } stats.Report(c.vu, createBucketSuccess, 1) stats.Report(c.vu, createBucketDuration, metrics.D(time.Since(start))) return CreateBucketResponse{Success: true} }