xk6-frostfs/internal/s3local/client.go

137 lines
3.3 KiB
Go
Raw Normal View History

package s3local
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/datagen"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/local"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
// Client performs S3-style object operations against a local storage layer
// on behalf of a single k6 virtual user and reports per-operation metrics.
type Client struct {
// vu supplies the context for every layer call and is the target that
// stats are reported against.
vu modules.VU
// l is the S3 gateway layer client used for PutObject,
// GetExtendedObjectInfo and GetObject.
l layer.Client
// ownerID is dereferenced into BucketInfo.Owner on every request.
// NOTE(review): a nil ownerID would panic in Put/Get — presumably the
// constructor always sets it; confirm.
ownerID *user.ID
// resolver maps a bucket name to the container ID placed in BucketInfo.CID.
resolver layer.BucketResolver
// limiter is consulted before each Put; when it reports full, the put is
// refused with an aborting response.
limiter local.Limiter
}
// SuccessOrErrorResponse is the common result shape shared by every
// operation of the local S3 client.
type SuccessOrErrorResponse struct {
	// Success is true only when the operation completed without error.
	Success bool
	// Abort is set when the operation was refused because the engine size
	// limit was reached (see Client.Put).
	Abort bool
	// Error carries the failure text; it is empty on success.
	Error string
}

// CreateBucketResponse is the result of a bucket creation request.
type CreateBucketResponse SuccessOrErrorResponse

// PutResponse is the result of Client.Put.
type PutResponse SuccessOrErrorResponse

// DeleteResponse is the result of an object deletion request.
type DeleteResponse SuccessOrErrorResponse

// GetResponse is the result of Client.Get.
type GetResponse SuccessOrErrorResponse
// Put stores payload under key in bucket through the local S3 layer.
//
// If the limiter reports the engine as full, nothing is written and the
// response has Abort set. Otherwise the bucket name is resolved to a
// container ID and the object is handed to the layer client. A failure in
// either step increments objPutFails and is returned in the response's Error
// field. On success the put duration (measured around the PutObject call
// only), the success counter and the number of bytes sent are reported.
func (c *Client) Put(bucket, key string, payload datagen.Payload) PutResponse {
	if c.limiter.IsFull() {
		return PutResponse{Abort: true, Error: "engine size limit reached"}
	}

	// failed records one put failure and turns err into a response.
	failed := func(err error) PutResponse {
		stats.Report(c.vu, objPutFails, 1)
		return PutResponse{Error: err.Error()}
	}

	containerID, err := c.resolver.Resolve(c.vu.Context(), bucket)
	if err != nil {
		return failed(err)
	}

	bkt := &data.BucketInfo{
		Name:    bucket,
		CID:     containerID,
		Owner:   *c.ownerID,
		Created: time.Now(),
	}
	params := &layer.PutObjectParams{
		BktInfo: bkt,
		Header:  map[string]string{},
		Object:  key,
		Size:    uint64(payload.Size()),
		Reader:  payload.Reader(),
	}

	begin := time.Now()
	_, err = c.l.PutObject(c.vu.Context(), params)
	if err != nil {
		return failed(err)
	}
	stats.Report(c.vu, objPutDuration, metrics.D(time.Since(begin)))
	stats.Report(c.vu, objPutSuccess, 1)

	// Read the size back from params only after the call, so that whatever
	// value the params hold at this point is the one reported.
	sent := float64(params.Size)
	stats.ReportDataSent(c.vu, sent)
	stats.Report(c.vu, objPutData, sent)

	return PutResponse{Success: true}
}
// Get reads the object key from bucket through the local S3 layer and
// discards its payload, counting only the number of bytes received.
//
// The bucket name is resolved to a container ID, the object's extended info
// is fetched, and then the payload is requested and streamed into a counting
// sink. A failure at any step increments objGetFails and is returned in the
// response's Error field. On success the duration (measured from just after
// bucket resolution until streaming completes), the success counter and the
// number of bytes received are reported.
func (c *Client) Get(bucket, key string) GetResponse {
	// failed records one get failure and turns err into a response.
	failed := func(err error) GetResponse {
		stats.Report(c.vu, objGetFails, 1)
		return GetResponse{Error: err.Error()}
	}

	containerID, err := c.resolver.Resolve(c.vu.Context(), bucket)
	if err != nil {
		return failed(err)
	}

	begin := time.Now()
	bkt := &data.BucketInfo{Name: bucket, CID: containerID, Owner: *c.ownerID}

	extended, err := c.l.GetExtendedObjectInfo(c.vu.Context(), &layer.HeadObjectParams{
		BktInfo: bkt,
		Object:  key,
	})
	if err != nil {
		return failed(err)
	}

	sink := &recvDataReporter{}
	// NOTE(review): End is set to the object size rather than size-1; confirm
	// whether layer.RangeParams.End is inclusive.
	wholeObject := &layer.RangeParams{
		Start: 0,
		End:   uint64(extended.ObjectInfo.Size),
	}
	body, err := c.l.GetObject(c.vu.Context(), &layer.GetObjectParams{
		BucketInfo: bkt,
		ObjectInfo: extended.ObjectInfo,
		Range:      wholeObject,
	})
	if err != nil {
		return failed(err)
	}

	if err := body.StreamTo(sink); err != nil {
		return failed(err)
	}

	stats.Report(c.vu, objGetDuration, metrics.D(time.Since(begin)))
	stats.Report(c.vu, objGetSuccess, 1)
	stats.ReportDataReceived(c.vu, sink.total)
	stats.Report(c.vu, objGetData, sink.total)

	return GetResponse{Success: true}
}
// recvDataReporter is a write sink that throws the data away and keeps only
// a running count of how many bytes were written to it.
type recvDataReporter struct {
	// total is the number of bytes written so far, kept as float64 so it can
	// be passed to the stats reporters without conversion.
	total float64
}

// Write adds len(p) to the running total. It never fails and always reports
// the whole slice as consumed.
func (r *recvDataReporter) Write(p []byte) (int, error) {
	n := len(p)
	r.total += float64(n)
	return n, nil
}