xk6-frostfs/internal/native/client.go
Vladimir Domnich 37e27f6791 [#23] Implement deletion of objects
1. Added a simple lock mechanism to reset the object selector. This prevents
   most concurrency issues when multiple VUs try to reset the selector.
2. Added logic to delete objects to grpc and s3 scenarios.
3. Added registry support to http scenario.
4. Deletion logic was not implemented for the http scenario, because
   the http gateway does not provide a web method to delete objects.

Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
2022-10-03 17:34:20 +03:00

562 lines
13 KiB
Go

package native
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"strconv"
"time"
"github.com/dop251/goja"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neofs-sdk-go/acl"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/nspcc-dev/neofs-sdk-go/object/address"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/policy"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
"github.com/nspcc-dev/tzhash/tz"
"github.com/nspcc-dev/xk6-neofs/internal/stats"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/metrics"
)
// Client performs object and container operations through the SDK client on
// behalf of a single k6 virtual user.
type Client struct {
	// vu is the virtual user whose context and metrics are used for every call.
	vu modules.VU
	// key signs session tokens and identifies the owner of created objects.
	key ecdsa.PrivateKey
	// tok is the session token template; each operation copies it, binds the
	// copy to a verb and address, and signs it.
	tok session.Object
	// cli is the underlying SDK client.
	cli *client.Client
	// bufsize is the size in bytes of the chunk buffer used to stream payloads.
	bufsize int
}

// PutResponse is the outcome of an object upload.
type PutResponse struct {
	// Success is true when the upload completed without error.
	Success bool
	// ObjectID is the string form of the stored object's ID (set on success).
	ObjectID string
	// Error holds the error text on failure.
	Error string
}

// DeleteResponse is the outcome of an object deletion.
type DeleteResponse struct {
	// Success is true when the deletion completed without error.
	Success bool
	// Error holds the error text on failure.
	Error string
}

// GetResponse is the outcome of an object download.
type GetResponse struct {
	// Success is true when the object was read without error.
	Success bool
	// Error holds the error text on failure.
	Error string
}

// VerifyHashResponse is the outcome of a payload hash verification.
type VerifyHashResponse struct {
	// Success reports the verification result.
	Success bool
	// Error holds the error text, or "hash mismatch" when hashes differ.
	Error string
}

// PutContainerResponse is the outcome of a container creation.
type PutContainerResponse struct {
	// Success is true when the container was created and became readable.
	Success bool
	// ContainerID is the encoded ID of the created container (set on success).
	ContainerID string
	// Error holds the error text on failure.
	Error string
}

// PreparedObject is an object whose header was built on the client side by
// Client.Onsite; its Put method finalizes, signs and uploads it.
type PreparedObject struct {
	// vu is the virtual user whose context and metrics are used for the upload.
	vu modules.VU
	// key signs the finalized object.
	key ecdsa.PrivateKey
	// cli is the underlying SDK client.
	cli *client.Client
	// bufsize is the size in bytes of the chunk buffer used to stream the payload.
	bufsize int

	// hdr is the prepared object header.
	hdr object.Object
	// payload is the object payload to upload.
	payload []byte
}
// defaultBufferSize is the payload chunk buffer size, in bytes (64 KiB), that
// SetBufferSize applies when it is called with a size of zero.
const defaultBufferSize = 64 * 1024
// SetBufferSize sets the size, in bytes, of the buffer used to stream object
// payloads in chunks. A size of zero selects defaultBufferSize. A negative
// size panics and leaves the current setting untouched.
func (c *Client) SetBufferSize(size int) {
	switch {
	case size < 0:
		panic("buffer size must be positive")
	case size == 0:
		c.bufsize = defaultBufferSize
	default:
		c.bufsize = size
	}
}
// Put uploads payload as a new object into the given container, attaching
// headers as object attributes.
//
// It returns a PutResponse carrying the stored object ID on success or the
// error text on failure. It panics if containerID cannot be decoded or the
// session token cannot be signed.
func (c *Client) Put(containerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse {
	cnrID := parseContainerID(containerID)

	// Bind a copy of the session token to the PUT verb and the target
	// container, then sign it with the client's key.
	var target address.Address
	target.SetContainerID(cnrID)

	sessionTok := c.tok
	sessionTok.ForVerb(session.VerbObjectPut)
	sessionTok.ApplyTo(target)
	if err := sessionTok.Sign(c.key); err != nil {
		panic(err)
	}

	var ownerID user.ID
	user.IDFromKey(&ownerID, c.key.PublicKey)

	// One attribute per header; map iteration order is unspecified.
	attributes := make([]object.Attribute, 0, len(headers))
	for name, value := range headers {
		var attr object.Attribute
		attr.SetKey(name)
		attr.SetValue(value)
		attributes = append(attributes, attr)
	}

	var hdr object.Object
	hdr.SetContainerID(cnrID)
	hdr.SetOwnerID(&ownerID)
	hdr.SetAttributes(attributes...)

	res, err := put(c.vu, c.bufsize, c.cli, &sessionTok, &hdr, payload.Bytes())
	if err != nil {
		return PutResponse{Success: false, Error: err.Error()}
	}

	var storedID oid.ID
	res.ReadStoredObjectID(&storedID)

	return PutResponse{Success: true, ObjectID: storedID.String()}
}
// Delete removes the object identified by objectID from the given container.
//
// It returns a DeleteResponse with Success set, or with the error text when
// the delete call fails. It panics if either ID cannot be decoded or the
// session token cannot be signed.
func (c *Client) Delete(containerID string, objectID string) DeleteResponse {
	cnrID := parseContainerID(containerID)
	objID := parseObjectID(objectID)

	// Bind a copy of the session token to the DELETE verb and the object
	// address, then sign it with the client's key.
	var target address.Address
	target.SetContainerID(cnrID)
	target.SetObjectID(objID)

	sessionTok := c.tok
	sessionTok.ForVerb(session.VerbObjectDelete)
	sessionTok.ApplyTo(target)
	if err := sessionTok.Sign(c.key); err != nil {
		panic(err)
	}

	var delPrm client.PrmObjectDelete
	delPrm.ByID(objID)
	delPrm.FromContainer(cnrID)
	delPrm.WithinSession(sessionTok)

	if _, err := c.cli.ObjectDelete(c.vu.Context(), delPrm); err != nil {
		return DeleteResponse{Success: false, Error: err.Error()}
	}

	return DeleteResponse{Success: true}
}
// Get downloads the object identified by objectID from the given container,
// discarding the payload and only counting the received bytes.
//
// It reports the total/fails/duration metrics and the amount of data
// received for the virtual user. It panics if either ID cannot be decoded or
// the session token cannot be signed.
func (c *Client) Get(containerID, objectID string) GetResponse {
	cnrID := parseContainerID(containerID)
	objID := parseObjectID(objectID)

	// Bind a copy of the session token to the GET verb and the object
	// address, then sign it with the client's key.
	var target address.Address
	target.SetContainerID(cnrID)
	target.SetObjectID(objID)

	sessionTok := c.tok
	sessionTok.ForVerb(session.VerbObjectGet)
	sessionTok.ApplyTo(target)
	if err := sessionTok.Sign(c.key); err != nil {
		panic(err)
	}

	stats.Report(c.vu, objGetTotal, 1)
	startedAt := time.Now()

	var getPrm client.PrmObjectGet
	getPrm.ByID(objID)
	getPrm.FromContainer(cnrID)
	getPrm.WithinSession(sessionTok)

	received := 0
	countBytes := func(chunk []byte) {
		received += len(chunk)
	}

	if err := get(c.cli, getPrm, c.vu.Context(), c.bufsize, countBytes); err != nil {
		stats.Report(c.vu, objGetFails, 1)
		return GetResponse{Success: false, Error: err.Error()}
	}

	stats.Report(c.vu, objGetDuration, metrics.D(time.Since(startedAt)))
	stats.ReportDataReceived(c.vu, float64(received))

	return GetResponse{Success: true}
}
// get opens an object read stream with the given parameters, reads the
// header, and feeds the payload to onDataChunk in pieces of at most bufSize
// bytes. The slice passed to onDataChunk is reused between calls and must not
// be retained by the callback.
//
// It returns the error from opening or closing the stream, or a generic error
// when the header cannot be read and closing the stream reports nothing.
func get(
	cli *client.Client,
	prm client.PrmObjectGet,
	ctx context.Context,
	bufSize int,
	onDataChunk func(chunk []byte),
) error {
	chunk := make([]byte, bufSize)

	rdr, err := cli.ObjectGetInit(ctx, prm)
	if err != nil {
		return err
	}

	var hdr object.Object
	if !rdr.ReadHeader(&hdr) {
		if _, closeErr := rdr.Close(); closeErr != nil {
			return closeErr
		}
		return errors.New("can't read object header")
	}

	for {
		// The read error is intentionally not inspected here; the loop ends
		// on the first empty read and the stream status is taken from Close.
		n, _ := rdr.Read(chunk)
		if n <= 0 {
			break
		}
		onDataChunk(chunk[:n])
	}

	_, err = rdr.Close()
	return err
}
// VerifyHash downloads the object identified by objectID from the given
// container and compares the hex-encoded SHA-256 of its payload with
// expectedHash.
//
// The response has Success set only when the object was read and the hashes
// are equal. When the hashes differ, Success is false and Error is exactly
// "hash mismatch", which lets callers tell a corrupted object apart from a
// read failure (whose Error carries the underlying error text).
//
// It panics if either ID cannot be decoded or the session token cannot be
// signed.
func (c *Client) VerifyHash(containerID, objectID, expectedHash string) VerifyHashResponse {
	cliContainerID := parseContainerID(containerID)
	cliObjectID := parseObjectID(objectID)

	// Bind a copy of the session token to the GET verb and the object
	// address, then sign it with the client's key.
	var addr address.Address
	addr.SetContainerID(cliContainerID)
	addr.SetObjectID(cliObjectID)

	tok := c.tok
	tok.ForVerb(session.VerbObjectGet)
	tok.ApplyTo(addr)
	if err := tok.Sign(c.key); err != nil {
		panic(err)
	}

	var prm client.PrmObjectGet
	prm.ByID(cliObjectID)
	prm.FromContainer(cliContainerID)
	prm.WithinSession(tok)

	hasher := sha256.New()
	err := get(c.cli, prm, c.vu.Context(), c.bufsize, func(data []byte) {
		// hash.Hash.Write never returns an error.
		hasher.Write(data)
	})
	if err != nil {
		return VerifyHashResponse{Success: false, Error: err.Error()}
	}

	actualHash := hex.EncodeToString(hasher.Sum(nil))
	if actualHash != expectedHash {
		// A mismatch is a failed verification, so it must not be reported
		// as a success.
		return VerifyHashResponse{Success: false, Error: "hash mismatch"}
	}

	return VerifyHashResponse{Success: true}
}
// putCnrErrorResponse counts a failed container creation in the metrics and
// wraps err into an unsuccessful PutContainerResponse.
func (c *Client) putCnrErrorResponse(err error) PutContainerResponse {
	stats.Report(c.vu, cnrPutFails, 1)

	var resp PutContainerResponse
	resp.Error = err.Error()
	return resp
}
// PutContainer creates a container and waits until it can be read back.
//
// Recognized keys of params (all optional):
//   - "acl": basic ACL string;
//   - "placement_policy": placement policy string;
//   - "name": container name attribute;
//   - "name_scope_global": boolean string; when true, "name" is required and
//     is also set as the container's native name.
//
// Every failure is counted in the metrics and returned as an unsuccessful
// response with the error text. On success the creation duration (including
// the wait for presence) is reported and the encoded container ID returned.
func (c *Client) PutContainer(params map[string]string) PutContainerResponse {
	stats.Report(c.vu, cnrPutTotal, 1)

	createdAt := strconv.FormatInt(time.Now().Unix(), 10)
	cnrOpts := []container.Option{
		container.WithAttribute(container.AttributeTimestamp, createdAt),
		container.WithOwnerPublicKey(&c.key.PublicKey),
	}

	if aclStr, found := params["acl"]; found {
		parsedACL, err := acl.ParseBasicACL(aclStr)
		if err != nil {
			return c.putCnrErrorResponse(err)
		}
		cnrOpts = append(cnrOpts, container.WithCustomBasicACL(parsedACL))
	}

	if policyStr, found := params["placement_policy"]; found {
		parsedPolicy, err := policy.Parse(policyStr)
		if err != nil {
			return c.putCnrErrorResponse(err)
		}
		cnrOpts = append(cnrOpts, container.WithPolicy(parsedPolicy))
	}

	name, named := params["name"]
	if named {
		cnrOpts = append(cnrOpts, container.WithAttribute(container.AttributeName, name))
	}

	cnr := container.New(cnrOpts...)

	globalScope := false
	if scopeStr, found := params["name_scope_global"]; found {
		parsed, err := strconv.ParseBool(scopeStr)
		if err != nil {
			return c.putCnrErrorResponse(fmt.Errorf("invalid name_scope_global param: %w", err))
		}
		globalScope = parsed
	}

	if globalScope {
		if !named {
			return c.putCnrErrorResponse(errors.New("you must provide container name if name_scope_global param is set"))
		}
		container.SetNativeName(cnr, name)
	}

	startedAt := time.Now()

	var putPrm client.PrmContainerPut
	putPrm.SetContainer(*cnr)

	res, err := c.cli.ContainerPut(c.vu.Context(), putPrm)
	if err != nil {
		return c.putCnrErrorResponse(err)
	}

	// The put call returning is not enough: poll until the container can
	// actually be fetched.
	var wp waitParams
	wp.setDefaults()
	if err = c.waitForContainerPresence(c.vu.Context(), res.ID(), &wp); err != nil {
		return c.putCnrErrorResponse(err)
	}

	stats.Report(c.vu, cnrPutDuration, metrics.D(time.Since(startedAt)))

	return PutContainerResponse{Success: true, ContainerID: res.ID().EncodeToString()}
}
// Onsite prepares an object for the given container on the client side: it
// queries the network info, checks the payload against the network's maximum
// object size, and builds a header carrying the API version, regular object
// type, container, owner, payload size, creation epoch and SHA-256 payload
// checksum. Unless homomorphic hashing is disabled in the network
// configuration, the Tillich-Zemor hash of the payload is set as well.
//
// The returned PreparedObject can be uploaded with its Put method.
//
// It panics if the network info cannot be obtained, if the payload is larger
// than the network limit, or if containerID cannot be decoded.
func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedObject {
	maxObjectSize, epoch, hhDisabled, err := parseNetworkInfo(c.vu.Context(), c.cli)
	if err != nil {
		panic(err)
	}

	data := payload.Bytes()

	// Compare in uint64: converting the network limit to int could wrap to a
	// negative or truncated value, while len(data) always fits into uint64.
	size := uint64(len(data))
	if size > maxObjectSize {
		// not sure if load test needs object transformation
		// with parent-child relation; if needs, then replace
		// this code with the usage of object transformer from
		// neofs-loader or distribution.
		msg := fmt.Sprintf("payload size %d is bigger than network limit %d", size, maxObjectSize)
		panic(msg)
	}

	cliContainerID := parseContainerID(containerID)

	var owner user.ID
	user.IDFromKey(&owner, c.key.PublicKey)

	apiVersion := version.Current()

	obj := object.New()
	obj.SetVersion(&apiVersion)
	obj.SetType(object.TypeRegular)
	obj.SetContainerID(cliContainerID)
	obj.SetOwnerID(&owner)
	obj.SetPayloadSize(size)
	obj.SetCreationEpoch(epoch)

	var sha, hh checksum.Checksum
	sha.SetSHA256(sha256.Sum256(data))
	obj.SetPayloadChecksum(sha)
	if !hhDisabled {
		hh.SetTillichZemor(tz.Sum(data))
		obj.SetPayloadHomomorphicHash(hh)
	}

	return PreparedObject{
		vu:      c.vu,
		key:     c.key,
		cli:     c.cli,
		bufsize: c.bufsize,

		hdr:     *obj,
		payload: data,
	}
}
// Put finalizes the prepared object and uploads it without a session token:
// headers become the object attributes, then the object ID is calculated and
// set, the object is signed with the stored key, and the payload is sent.
//
// It returns a PutResponse carrying the calculated object ID on success or
// the error text of the first failing step.
func (p PreparedObject) Put(headers map[string]string) PutResponse {
	failure := func(err error) PutResponse {
		return PutResponse{Success: false, Error: err.Error()}
	}

	hdr := p.hdr

	// One attribute per header; map iteration order is unspecified.
	attributes := make([]object.Attribute, 0, len(headers))
	for name, value := range headers {
		var attr object.Attribute
		attr.SetKey(name)
		attr.SetValue(value)
		attributes = append(attributes, attr)
	}
	hdr.SetAttributes(attributes...)

	objID, err := object.CalculateID(&hdr)
	if err != nil {
		return failure(err)
	}
	hdr.SetID(objID)

	if err = object.CalculateAndSetSignature(p.key, &hdr); err != nil {
		return failure(err)
	}

	if _, err = put(p.vu, p.bufsize, p.cli, nil, &hdr, p.payload); err != nil {
		return failure(err)
	}

	return PutResponse{Success: true, ObjectID: objID.String()}
}
// put streams an object (header followed by payload) to the storage and
// reports upload metrics for the virtual user.
//
// The payload is sent in chunks of at most bufSize bytes. When tok is not
// nil, the upload is performed within that session. The total counter is
// incremented on every call; the fails counter on every error; the sent-data
// and duration metrics only on success.
//
// On error the returned response is always nil and the error is always
// non-nil, so callers may rely on a non-nil response whenever err == nil.
func put(vu modules.VU, bufSize int, cli *client.Client, tok *session.Object,
	hdr *object.Object, payload []byte) (*client.ResObjectPut, error) {
	buf := make([]byte, bufSize)
	rdr := bytes.NewReader(payload)
	sz := rdr.Size()

	// starting upload
	stats.Report(vu, objPutTotal, 1)
	start := time.Now()

	objectWriter, err := cli.ObjectPutInit(vu.Context(), client.PrmObjectPutInit{})
	if err != nil {
		stats.Report(vu, objPutFails, 1)
		return nil, err
	}

	if tok != nil {
		objectWriter.WithinSession(*tok)
	}

	if !objectWriter.WriteHeader(*hdr) {
		stats.Report(vu, objPutFails, 1)
		// Close is expected to explain the failure; if it reports nothing,
		// still return an error so the caller never sees (nil, nil).
		if _, err = objectWriter.Close(); err == nil {
			err = errors.New("can't write object header")
		}
		return nil, err
	}

	for {
		// A read from bytes.Reader can only fail with io.EOF, in which case
		// n is zero, so the error itself carries no extra information.
		n, _ := rdr.Read(buf)
		if n == 0 {
			break
		}
		// A failed chunk write ends the loop; its cause is taken from Close.
		if !objectWriter.WritePayloadChunk(buf[:n]) {
			break
		}
	}

	resp, err := objectWriter.Close()
	if err != nil {
		stats.Report(vu, objPutFails, 1)
		return nil, err
	}

	stats.ReportDataSent(vu, float64(sz))
	stats.Report(vu, objPutDuration, metrics.D(time.Since(start)))

	return resp, nil
}
// parseNetworkInfo requests the network info and extracts from it the
// maximum object size, the current epoch and whether homomorphic hashing is
// disabled.
//
// The "MaxObjectSize" parameter is decoded as a little-endian unsigned
// integer from at most the first 8 bytes of its value. The
// "HomomorphicHashingDisabled" parameter is optional and decoded as a
// boolean; hhDisabled stays false when it is absent.
//
// An error is returned when the request fails, when the
// "HomomorphicHashingDisabled" value cannot be decoded, or when the network
// configuration has no "MaxObjectSize" parameter. The presence of
// "MaxObjectSize" is tracked separately from parse errors, so the outcome
// does not depend on the order in which parameters are iterated. All other
// results are zero when err is not nil.
func parseNetworkInfo(ctx context.Context, cli *client.Client) (maxObjSize, epoch uint64, hhDisabled bool, err error) {
	ni, err := cli.NetworkInfo(ctx, client.PrmNetworkInfo{})
	if err != nil {
		return 0, 0, false, err
	}

	epoch = ni.Info().CurrentEpoch()

	var (
		sizeFound bool
		hhErr     error
	)

	ni.Info().NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool {
		switch string(parameter.Key()) {
		case "MaxObjectSize":
			buf := make([]byte, 8)
			copy(buf, parameter.Value())
			maxObjSize = binary.LittleEndian.Uint64(buf)
			sizeFound = true
		case "HomomorphicHashingDisabled":
			arr := stackitem.NewByteArray(parameter.Value())
			hhDisabled, hhErr = arr.TryBool()
			if hhErr != nil {
				// Give up on a malformed value. Returning true presumably
				// stops the iteration, as the previous implementation
				// relied on.
				return true
			}
		}
		return false
	})

	switch {
	case hhErr != nil:
		return 0, 0, false, fmt.Errorf("invalid HomomorphicHashingDisabled network parameter: %w", hhErr)
	case !sizeFound:
		return 0, 0, false, errors.New("network configuration misses max object size value")
	}

	return maxObjSize, epoch, hhDisabled, nil
}
// waitParams configures the polling performed by waitFor.
type waitParams struct {
// timeout bounds the total time waitFor keeps polling.
timeout time.Duration
// pollInterval is the delay before the first check and between later checks.
pollInterval time.Duration
}
// setDefaults resets x to the default polling settings: a 120-second overall
// timeout with a check every 5 seconds.
func (x *waitParams) setDefaults() {
	*x = waitParams{
		timeout:      120 * time.Second,
		pollInterval: 5 * time.Second,
	}
}
// waitForContainerPresence polls, as configured by wp, until a container get
// request for cnrID succeeds. A nil cnrID leaves the request's container
// unset. It returns the error produced by waitFor when waiting is aborted.
func (c *Client) waitForContainerPresence(ctx context.Context, cnrID *cid.ID, wp *waitParams) error {
	containerExists := func(ctx context.Context) bool {
		var getPrm client.PrmContainerGet
		if cnrID != nil {
			getPrm.SetContainer(*cnrID)
		}

		if _, err := c.cli.ContainerGet(ctx, getPrm); err != nil {
			return false
		}
		return true
	}

	return waitFor(ctx, wp, containerExists)
}
// waitFor polls condition until it returns true, ctx is done, or
// params.timeout elapses, whichever happens first.
//
// The first check happens params.pollInterval after the call, and every
// following check params.pollInterval after the previous one returned.
//
// condition receives a context that is cancelled when ctx is done or the
// timeout expires, so a check that honours its context cannot keep waitFor
// blocked past the deadline.
//
// It returns nil once condition reports true, and otherwise the error of the
// context that ended the wait (context.DeadlineExceeded on timeout, or the
// parent's error when ctx is done).
func waitFor(ctx context.Context, params *waitParams, condition func(context.Context) bool) error {
	// wctx is derived from ctx, so its Done channel also fires when the
	// parent is done and its Err then carries the parent's error; a separate
	// case for ctx.Done() is not needed.
	wctx, cancel := context.WithTimeout(ctx, params.timeout)
	defer cancel()

	// A one-shot timer re-armed after each check, so that the interval is
	// measured from the end of one check to the start of the next.
	timer := time.NewTimer(params.pollInterval)
	defer timer.Stop()

	for {
		select {
		case <-wctx.Done():
			return wctx.Err()
		case <-timer.C:
			if condition(wctx) {
				return nil
			}
			timer.Reset(params.pollInterval)
		}
	}
}
// parseContainerID decodes the string form of a container ID.
// It panics if the string cannot be decoded.
func parseContainerID(strContainerID string) cid.ID {
	var id cid.ID
	if err := id.DecodeString(strContainerID); err != nil {
		panic(err)
	}
	return id
}
// parseObjectID decodes the string form of an object ID.
// It panics if the string cannot be decoded.
func parseObjectID(strObjectID string) oid.ID {
	var id oid.ID
	if err := id.DecodeString(strObjectID); err != nil {
		panic(err)
	}
	return id
}