// Source: xk6-frostfs/internal/local/client.go (Go, 154 lines, 3.7 KiB)

package local
import (
"crypto/ecdsa"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
"github.com/dop251/goja"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
// Client runs object operations directly against a local storage engine
// and reports the outcome of each operation as k6 metrics.
type Client struct {
	// vu is the k6 virtual user handed to every stats reporting call.
	vu modules.VU
	// key is used to derive the object owner ID and to sign objects in Put.
	key ecdsa.PrivateKey
	// ng is the storage engine that serves Put, Get and Delete.
	ng *engine.StorageEngine
}
// PutResponse is the result of Client.Put.
type PutResponse struct {
	// Success is true only when the object was stored by the engine.
	Success bool
	// ObjectID is the encoded ID of the stored object; set on success.
	ObjectID string
	// Error holds the failure description; empty on success.
	Error string
}
// GetResponse is the result of Client.Get.
type GetResponse struct {
	// Success is true only when the engine returned the object.
	Success bool
	// Error holds the failure description; empty on success.
	Error string
}
// DeleteResponse is the result of Client.Delete.
type DeleteResponse struct {
	// Success is true only when the engine completed the delete request.
	Success bool
	// Error holds the failure description; empty on success.
	Error string
}
// Put builds an object in the given container from headers (stored as
// object attributes) and payload, computes its payload checksum, ID and
// signature with the client's key, and stores it through the storage engine.
//
// On success the response carries the encoded object ID, and the put counter,
// sent-bytes and put-duration metrics are reported. If the ID or signature
// cannot be calculated an error response is returned without touching the
// engine. If the engine rejects the object, the put-failure counter is
// reported and the engine error is returned in the response.
//
// Put panics if containerID cannot be parsed (see mustParseContainerID).
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
	// Read the buffer once: the same bytes feed the object payload, its
	// declared size and the sent-bytes metric.
	data := payload.Bytes()

	attrs := make([]object.Attribute, len(headers))
	i := 0
	for k, v := range headers {
		attrs[i].SetKey(k)
		attrs[i].SetValue(v)
		i++
	}

	ownerID := &user.ID{}
	user.IDFromKey(ownerID, c.key.PublicKey)

	obj := object.New()
	obj.SetContainerID(mustParseContainerID(containerID))
	obj.SetOwnerID(ownerID) // needed for metabase bucket name
	obj.SetAttributes(attrs...)
	obj.SetPayload(data)
	obj.SetPayloadSize(uint64(len(data)))
	object.CalculateAndSetPayloadChecksum(obj) // needed for metabase key

	id, err := object.CalculateID(obj)
	if err != nil {
		return PutResponse{Error: fmt.Sprintf("calculating id: %v", err)}
	}
	obj.SetID(id)

	if err := object.CalculateAndSetSignature(c.key, obj); err != nil {
		return PutResponse{Error: fmt.Sprintf("calculating signature: %v", err)}
	}

	var req engine.PutPrm
	req.WithObject(obj)

	// Take the latency sample immediately after the engine call so that time
	// spent emitting the other metrics below is not counted as put duration.
	start := time.Now()
	_, err = c.ng.Put(req)
	elapsed := time.Since(start)
	if err != nil {
		stats.Report(c.vu, objPutFails, 1)
		return PutResponse{Error: err.Error()}
	}

	stats.Report(c.vu, objPutTotal, 1)
	stats.ReportDataSent(c.vu, float64(len(data)))
	stats.Report(c.vu, objPutDuration, metrics.D(elapsed))

	return PutResponse{
		Success:  true,
		ObjectID: id.EncodeToString(),
	}
}
// Get fetches the object addressed by containerID and objectID from the
// storage engine.
//
// On success the get counter, get-duration and received-bytes (payload
// length) metrics are reported. On engine failure the get-failure counter is
// reported and the engine error is returned in the response.
//
// Get panics if containerID or objectID cannot be parsed (see
// mustParseContainerID and mustParseObjectID).
func (c *Client) Get(containerID, objectID string) GetResponse {
	var addr oid.Address
	addr.SetContainer(mustParseContainerID(containerID))
	addr.SetObject(mustParseObjectID(objectID))

	var req engine.GetPrm
	req.WithAddress(addr)

	// Take the latency sample immediately after the engine call so that time
	// spent emitting the other metrics below is not counted as get duration.
	start := time.Now()
	resp, err := c.ng.Get(req)
	elapsed := time.Since(start)
	if err != nil {
		stats.Report(c.vu, objGetFails, 1)
		return GetResponse{Error: err.Error()}
	}

	stats.Report(c.vu, objGetTotal, 1)
	stats.Report(c.vu, objGetDuration, metrics.D(elapsed))
	stats.ReportDataReceived(c.vu, float64(len(resp.Object().Payload())))

	return GetResponse{Success: true}
}
// Delete issues a delete request to the storage engine for the object
// addressed by containerID and objectID.
//
// On success the delete counter and delete-duration metrics are reported. On
// engine failure the delete-failure counter is reported and the engine error
// is returned in the response.
//
// Delete panics if containerID or objectID cannot be parsed (see
// mustParseContainerID and mustParseObjectID).
func (c *Client) Delete(containerID, objectID string) DeleteResponse {
	var addr oid.Address
	addr.SetContainer(mustParseContainerID(containerID))
	addr.SetObject(mustParseObjectID(objectID))

	var req engine.DeletePrm
	req.WithAddress(addr)

	// Take the latency sample immediately after the engine call so that time
	// spent emitting the counter below is not counted as delete duration.
	start := time.Now()
	_, err := c.ng.Delete(req)
	elapsed := time.Since(start)
	if err != nil {
		stats.Report(c.vu, objDeleteFails, 1)
		return DeleteResponse{Error: err.Error()}
	}

	stats.Report(c.vu, objDeleteTotal, 1)
	stats.Report(c.vu, objDeleteDuration, metrics.D(elapsed))

	return DeleteResponse{Success: true}
}
// mustParseContainerID decodes the string form of a container ID.
// It panics with a descriptive message if the string cannot be decoded.
func mustParseContainerID(strContainerID string) cid.ID {
	var parsed cid.ID
	if err := parsed.DecodeString(strContainerID); err != nil {
		panic(fmt.Sprintf("parsing container id %q: %v", strContainerID, err))
	}
	return parsed
}
// mustParseObjectID decodes the string form of an object ID.
// It panics with a descriptive message if the string cannot be decoded.
func mustParseObjectID(strObjectID string) oid.ID {
	var parsed oid.ID
	if err := parsed.DecodeString(strObjectID); err != nil {
		panic(fmt.Sprintf("parsing object id %q: %v", strObjectID, err))
	}
	return parsed
}