xk6-frostfs/internal/s3local/client.go
Dmitrii Stepanov bc47d66316
All checks were successful
DCO action / DCO (pull_request) Successful in 1m8s
Tests and linters / Tests (1.21) (pull_request) Successful in 1m49s
Tests and linters / Tests (1.20) (pull_request) Successful in 2m1s
Tests and linters / Tests with -race (pull_request) Successful in 3m7s
[#106] xk6: Allow to set max total size in local scenarios
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-12-15 14:07:35 +03:00

130 lines
3 KiB
Go

package s3local
import (
"bytes"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
// Client is the object exposed to k6 scripts for running S3 put/get
// operations against a local layer.Client instead of a remote gateway.
type Client struct {
// vu is the k6 virtual user; it supplies the context for every layer call
// and is the target of the metrics reported by Put and Get.
vu modules.VU
// l is the S3 layer that actually stores and reads objects.
l layer.Client
// ownerID is copied into the BucketInfo.Owner of every request.
ownerID *user.ID
// resolver maps a bucket name to the container ID placed in BucketInfo.CID.
resolver layer.BucketResolver
// limiter is consulted before each Put; when it reports full the put is
// rejected with an abort response.
limiter local.Limiter
}
// SuccessOrErrorResponse is the common result shape returned to k6 scripts by
// the client operations.
type SuccessOrErrorResponse struct {
	// Success is true when the operation completed without error.
	Success bool
	// Abort is set when the operation was refused outright (Put sets it when
	// the size limiter is full).
	Abort bool
	// Error carries the error text of a failed or aborted operation.
	Error string
}

// Per-operation response types. Each one has exactly the fields of
// SuccessOrErrorResponse.
type (
	// CreateBucketResponse is the result of a bucket creation.
	CreateBucketResponse SuccessOrErrorResponse
	// PutResponse is the result of Client.Put.
	PutResponse SuccessOrErrorResponse
	// DeleteResponse is the result of an object deletion.
	DeleteResponse SuccessOrErrorResponse
	// GetResponse is the result of Client.Get.
	GetResponse SuccessOrErrorResponse
)
// Put stores payload under key in the given bucket through the local S3 layer.
//
// If the limiter reports that it is full, nothing is written and the returned
// response has Abort set. A failure to resolve the bucket or to store the
// object is counted in objPutFails and returned in the Error field. On success
// the time spent in PutObject, the put counter and the number of bytes sent
// are reported and Success is set.
func (c *Client) Put(bucket, key string, payload goja.ArrayBuffer) PutResponse {
	if c.limiter.IsFull() {
		return PutResponse{Abort: true, Error: "engine size limit reached"}
	}

	ctx := c.vu.Context()

	// failed counts one put failure and converts err into a response.
	failed := func(err error) PutResponse {
		stats.Report(c.vu, objPutFails, 1)
		return PutResponse{Error: err.Error()}
	}

	containerID, err := c.resolver.Resolve(ctx, bucket)
	if err != nil {
		return failed(err)
	}

	bucketInfo := &data.BucketInfo{
		Name:    bucket,
		CID:     containerID,
		Owner:   *c.ownerID,
		Created: time.Now(),
	}
	body := payload.Bytes()
	params := &layer.PutObjectParams{
		BktInfo: bucketInfo,
		Header:  map[string]string{},
		Object:  key,
		Size:    int64(len(body)),
		Reader:  bytes.NewReader(body),
	}

	// Only the PutObject call itself is timed; bucket resolution is excluded.
	begin := time.Now()
	_, err = c.l.PutObject(ctx, params)
	if err != nil {
		return failed(err)
	}

	stats.Report(c.vu, objPutDuration, metrics.D(time.Since(begin)))
	stats.Report(c.vu, objPutTotal, 1)
	stats.ReportDataSent(c.vu, float64(params.Size))

	return PutResponse{Success: true}
}
// Get reads the object stored under key in the given bucket through the local
// S3 layer and discards its payload, keeping only the byte count.
//
// A failure to resolve the bucket, to fetch the object info or to read the
// payload is counted in objGetFails and returned in the Error field. On
// success the elapsed time, the get counter and the number of bytes received
// are reported and Success is set.
func (c *Client) Get(bucket, key string) GetResponse {
	ctx := c.vu.Context()

	// failed counts one get failure and converts err into a response.
	failed := func(err error) GetResponse {
		stats.Report(c.vu, objGetFails, 1)
		return GetResponse{Error: err.Error()}
	}

	containerID, err := c.resolver.Resolve(ctx, bucket)
	if err != nil {
		return failed(err)
	}

	// The timer starts after bucket resolution and covers both the object
	// info lookup and the payload read.
	begin := time.Now()

	bucketInfo := &data.BucketInfo{
		Name:  bucket,
		CID:   containerID,
		Owner: *c.ownerID,
	}

	extended, err := c.l.GetExtendedObjectInfo(ctx, &layer.HeadObjectParams{
		BktInfo: bucketInfo,
		Object:  key,
	})
	if err != nil {
		return failed(err)
	}

	sink := &recvDataReporter{}
	// NOTE(review): End is set to the object size itself; confirm against the
	// layer package whether RangeParams.End is inclusive or exclusive.
	wholeObject := &layer.RangeParams{
		Start: 0,
		End:   uint64(extended.ObjectInfo.Size),
	}
	err = c.l.GetObject(ctx, &layer.GetObjectParams{
		BucketInfo: bucketInfo,
		ObjectInfo: extended.ObjectInfo,
		Range:      wholeObject,
		Writer:     sink,
	})
	if err != nil {
		return failed(err)
	}

	stats.Report(c.vu, objGetDuration, metrics.D(time.Since(begin)))
	stats.Report(c.vu, objGetTotal, 1)
	stats.ReportDataReceived(c.vu, sink.total)

	return GetResponse{Success: true}
}
// recvDataReporter is a write sink that drops the bytes it is given and only
// keeps a running total of how many were written.
type recvDataReporter struct {
	total float64
}

// Write adds len(p) to the running total and reports the whole slice as
// written. It never returns an error.
func (r *recvDataReporter) Write(p []byte) (int, error) {
	n := len(p)
	r.total += float64(n)
	return n, nil
}