// Forked from TrueCloudLab/xk6-frostfs.
// File metadata: 154 lines, 3.7 KiB, Go.
|
package local
|
||
|
|
||
|
import (
|
||
|
"crypto/ecdsa"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||
|
"git.frostfs.info/TrueCloudLab/xk6-frostfs/internal/stats"
|
||
|
"github.com/dop251/goja"
|
||
|
"go.k6.io/k6/js/modules"
|
||
|
"go.k6.io/k6/metrics"
|
||
|
)
|
||
|
|
||
|
type Client struct {
|
||
|
vu modules.VU
|
||
|
key ecdsa.PrivateKey
|
||
|
ng *engine.StorageEngine
|
||
|
}
|
||
|
|
||
|
// PutResponse is the result of Client.Put.
type PutResponse struct {
	// Success is true when the object was stored by the engine.
	Success bool
	// ObjectID is the string-encoded ID of the stored object; it is only
	// filled in on success.
	ObjectID string
	// Error holds the failure message; it is empty on success.
	Error string
}
|
||
|
|
||
|
// GetResponse is the result of Client.Get.
type GetResponse struct {
	// Success is true when the engine returned the object.
	Success bool
	// Error holds the failure message; it is empty on success.
	Error string
}
|
||
|
|
||
|
// DeleteResponse is the result of Client.Delete.
type DeleteResponse struct {
	// Success is true when the engine completed the delete without error.
	Success bool
	// Error holds the failure message; it is empty on success.
	Error string
}
|
||
|
|
||
|
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
|
||
|
sz := len(payload.Bytes())
|
||
|
|
||
|
attrs := make([]object.Attribute, len(headers))
|
||
|
{
|
||
|
ind := 0
|
||
|
for k, v := range headers {
|
||
|
attrs[ind].SetKey(k)
|
||
|
attrs[ind].SetValue(v)
|
||
|
ind++
|
||
|
}
|
||
|
}
|
||
|
|
||
|
ownerID := &user.ID{}
|
||
|
user.IDFromKey(ownerID, c.key.PublicKey)
|
||
|
|
||
|
obj := object.New()
|
||
|
obj.SetContainerID(mustParseContainerID(containerID))
|
||
|
obj.SetOwnerID(ownerID) // needed for metabase bucket name
|
||
|
obj.SetAttributes(attrs...)
|
||
|
obj.SetPayload(payload.Bytes())
|
||
|
obj.SetPayloadSize(uint64(len(payload.Bytes())))
|
||
|
object.CalculateAndSetPayloadChecksum(obj) // needed for metabase key
|
||
|
|
||
|
id, err := object.CalculateID(obj)
|
||
|
if err != nil {
|
||
|
return PutResponse{Error: fmt.Sprintf("calculating id: %v", err)}
|
||
|
}
|
||
|
obj.SetID(id)
|
||
|
|
||
|
if err := object.CalculateAndSetSignature(c.key, obj); err != nil {
|
||
|
return PutResponse{Error: fmt.Sprintf("calculating signature: %v", err)}
|
||
|
}
|
||
|
|
||
|
var req engine.PutPrm
|
||
|
req.WithObject(obj)
|
||
|
|
||
|
start := time.Now()
|
||
|
|
||
|
if _, err := c.ng.Put(req); err != nil {
|
||
|
stats.Report(c.vu, objPutFails, 1)
|
||
|
return PutResponse{Error: err.Error()}
|
||
|
}
|
||
|
|
||
|
stats.Report(c.vu, objPutTotal, 1)
|
||
|
stats.ReportDataSent(c.vu, float64(sz))
|
||
|
stats.Report(c.vu, objPutDuration, metrics.D(time.Since(start)))
|
||
|
|
||
|
return PutResponse{
|
||
|
Success: true,
|
||
|
ObjectID: id.EncodeToString(),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Client) Get(containerID, objectID string) GetResponse {
|
||
|
var addr oid.Address
|
||
|
addr.SetContainer(mustParseContainerID(containerID))
|
||
|
addr.SetObject(mustParseObjectID(objectID))
|
||
|
|
||
|
var req engine.GetPrm
|
||
|
req.WithAddress(addr)
|
||
|
|
||
|
start := time.Now()
|
||
|
resp, err := c.ng.Get(req)
|
||
|
if err != nil {
|
||
|
stats.Report(c.vu, objGetFails, 1)
|
||
|
return GetResponse{Error: err.Error()}
|
||
|
}
|
||
|
|
||
|
stats.Report(c.vu, objGetTotal, 1)
|
||
|
stats.Report(c.vu, objGetDuration, metrics.D(time.Since(start)))
|
||
|
stats.ReportDataReceived(c.vu, float64(len(resp.Object().Payload())))
|
||
|
|
||
|
return GetResponse{Success: true}
|
||
|
}
|
||
|
|
||
|
func (c *Client) Delete(containerID, objectID string) DeleteResponse {
|
||
|
var addr oid.Address
|
||
|
addr.SetContainer(mustParseContainerID(containerID))
|
||
|
addr.SetObject(mustParseObjectID(objectID))
|
||
|
|
||
|
var req engine.DeletePrm
|
||
|
req.WithAddress(addr)
|
||
|
|
||
|
start := time.Now()
|
||
|
if _, err := c.ng.Delete(req); err != nil {
|
||
|
stats.Report(c.vu, objDeleteFails, 1)
|
||
|
return DeleteResponse{Error: err.Error()}
|
||
|
}
|
||
|
|
||
|
stats.Report(c.vu, objDeleteTotal, 1)
|
||
|
stats.Report(c.vu, objDeleteDuration, metrics.D(time.Since(start)))
|
||
|
|
||
|
return DeleteResponse{Success: true}
|
||
|
}
|
||
|
|
||
|
func mustParseContainerID(strContainerID string) cid.ID {
|
||
|
var containerID cid.ID
|
||
|
err := containerID.DecodeString(strContainerID)
|
||
|
if err != nil {
|
||
|
panic(fmt.Sprintf("parsing container id %q: %v", strContainerID, err))
|
||
|
}
|
||
|
return containerID
|
||
|
}
|
||
|
|
||
|
func mustParseObjectID(strObjectID string) oid.ID {
|
||
|
var cliObjectID oid.ID
|
||
|
err := cliObjectID.DecodeString(strObjectID)
|
||
|
if err != nil {
|
||
|
panic(fmt.Sprintf("parsing object id %q: %v", strObjectID, err))
|
||
|
}
|
||
|
return cliObjectID
|
||
|
}
|