From 5d77a526d042b59112b8934a14f8cad424408a29 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Fri, 17 Jun 2022 15:50:24 +0300 Subject: [PATCH] [#11] Add container creation Signed-off-by: Denis Kirillov --- README.md | 6 +- examples/native.js | 34 +++++++--- go.mod | 1 + go.sum | 1 + internal/native/client.go | 133 +++++++++++++++++++++++++++++++++++++- internal/native/native.go | 11 +++- 6 files changed, 171 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 7408521..a683004 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ xk6 build --with github.com/nspcc-dev/xk6-neofs=. Create native client with `connect` method. Arguments: - neofs storage node endpoint -- WIF (empty value produces random key) +- hex encoded private key (empty value produces random key) ```js import native from 'k6/x/neofs/native'; @@ -52,6 +52,10 @@ const neofs_cli = native.connect("s01.neofs.devenv:8080", "") ``` ### Methods +- `putContainer(params)`. The `params` is a dictionary (e.g. + `{acl:'public-read-write',placement_policy:'REP 3',name:'container-name',name_global_scope:'false'}`). + Returns dictionary with `success` + boolean flag, `container_id` string, and `error` string. - `setBufferSize(size)`. Sets internal buffer size for data upload and download. Default is 64 KiB. - `put(container_id, headers, payload)`. Returns dictionary with `success` diff --git a/examples/native.js b/examples/native.js index 27f1f53..92e4475 100644 --- a/examples/native.js +++ b/examples/native.js @@ -1,23 +1,39 @@ -import { uuidv4 } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js'; +import {uuidv4} from 'https://jslib.k6.io/k6-utils/1.2.0/index.js'; +import {fail} from "k6"; import native from 'k6/x/neofs/native'; const payload = open('../go.sum', 'b'); -const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B" -const neofs_cli = native.connect("s01.neofs.devenv:8080", "") +const neofs_cli = native.connect("s01.neofs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb") export const options = { stages: [ - { duration: '30s', target: 10 }, + {duration: '30s', target: 10}, ], }; -export default function () { - let headers = { - 'unique_header': uuidv4() +export function setup() { + const params = { + acl: 'public-read-write', + placement_policy: 'REP 3', + name: 'container-name', + name_global_scope: 'false' } - let resp = neofs_cli.put(container, headers, payload) + + const res = neofs_cli.putContainer(params) + if (!res.success) { + fail(res.error) + } + console.info("created container", res.container_id) + return {container_id: res.container_id} +} + +export default function (data) { + let headers = { + 'unique_header': uuidv4() + } + let resp = neofs_cli.put(data.container_id, headers, payload) if (resp.success) { - neofs_cli.get(container, resp.object_id) + neofs_cli.get(data.container_id, resp.object_id) } else { console.log(resp.error) } diff --git a/go.mod b/go.mod index 3028f95..531e504 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( ) require ( + github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521073959-f0d4d129b7f1 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.12.0 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.4 // indirect diff --git a/go.sum b/go.sum index 4791028..3f3ff6d 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,7 @@ github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521073959-f0d4d129b7f1 h1:zFRi26YWd7NIorBXe8UkevRl0dIvk/AnXHWaAiZG+Yk= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210521073959-f0d4d129b7f1/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= github.com/aws/aws-sdk-go-v2 v1.16.3 h1:0W1TSJ7O6OzwuEvIXAtJGvOeQ0SGAhcpxPN2/NK5EhM= github.com/aws/aws-sdk-go-v2 v1.16.3/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU= diff --git a/internal/native/client.go b/internal/native/client.go index f02ffe2..dec5912 100644 --- a/internal/native/client.go +++ b/internal/native/client.go @@ -8,17 +8,21 @@ import ( "encoding/binary" "errors" "fmt" + "strconv" "time" "github.com/dop251/goja" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" + "github.com/nspcc-dev/neofs-sdk-go/acl" "github.com/nspcc-dev/neofs-sdk-go/checksum" "github.com/nspcc-dev/neofs-sdk-go/client" + "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/policy" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" @@ -48,6 +52,12 @@ type ( Error string } + PutContainerResponse struct { + Success bool + ContainerID string + Error string + } + PreparedObject struct { vu modules.VU key ecdsa.PrivateKey @@ -164,8 +174,11 @@ func (c *Client) Get(inputContainerID, inputObjectID string) GetResponse { var o object.Object if !objectReader.ReadHeader(&o) { stats.Report(c.vu, objGetFails, 1) - _, err := objectReader.Close() - return GetResponse{Success: false, Error: err.Error()} + var errorStr string + if _, err = objectReader.Close(); err != nil { + errorStr = err.Error() + } + return GetResponse{Success: false, Error: errorStr} } n, _ := objectReader.Read(buf) @@ -185,6 +198,78 @@ func (c *Client) Get(inputContainerID, inputObjectID string) GetResponse { return GetResponse{Success: true} } +func (c *Client) putCnrErrorResponse(err error) PutContainerResponse { + stats.Report(c.vu, cnrPutFails, 1) + return PutContainerResponse{Success: false, Error: err.Error()} +} + +func (c *Client) PutContainer(params map[string]string) PutContainerResponse { + stats.Report(c.vu, cnrPutTotal, 1) + + opts := []container.Option{ + container.WithAttribute(container.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10)), + container.WithOwnerPublicKey(&c.key.PublicKey), + } + + if basicACLStr, ok := params["acl"]; ok { + basicACL, err := acl.ParseBasicACL(basicACLStr) + if err != nil { + return c.putCnrErrorResponse(err) + } + opts = append(opts, container.WithCustomBasicACL(basicACL)) + } + + placementPolicyStr, ok := params["placement_policy"] + if ok { + placementPolicy, err := policy.Parse(placementPolicyStr) + if err != nil { + return c.putCnrErrorResponse(err) + } + opts = append(opts, container.WithPolicy(placementPolicy)) + } + + containerName, hasName := params["name"] + if hasName { + opts = append(opts, container.WithAttribute(container.AttributeName, containerName)) + } + + cnr := container.New(opts...) + + var err error + var nameScopeGlobal bool + if nameScopeGlobalStr, ok := params["name_scope_global"]; ok { + if nameScopeGlobal, err = strconv.ParseBool(nameScopeGlobalStr); err != nil { + return c.putCnrErrorResponse(fmt.Errorf("invalid name_scope_global param: %w", err)) + } + } + + if nameScopeGlobal { + if !hasName { + return c.putCnrErrorResponse(errors.New("you must provide container name if name_scope_global param is set")) + } + container.SetNativeName(cnr, containerName) + } + + start := time.Now() + var prm client.PrmContainerPut + prm.SetContainer(*cnr) + + res, err := c.cli.ContainerPut(c.vu.Context(), prm) + if err != nil { + return c.putCnrErrorResponse(err) + } + + var wp waitParams + wp.setDefaults() + + if err = c.waitForContainerPresence(c.vu.Context(), res.ID(), &wp); err != nil { + return c.putCnrErrorResponse(err) + } + + stats.Report(c.vu, cnrPutDuration, metrics.D(time.Since(start))) + return PutContainerResponse{Success: true, ContainerID: res.ID().EncodeToString()} +} + func (c *Client) Onsite(inputContainerID string, payload goja.ArrayBuffer) PreparedObject { maxObjectSize, epoch, hhDisabled, err := parseNetworkInfo(c.vu.Context(), c.cli) if err != nil { @@ -342,3 +427,47 @@ func parseNetworkInfo(ctx context.Context, cli *client.Client) (maxObjSize, epoc }) return maxObjSize, epoch, hhDisabled, err } + +type waitParams struct { + timeout time.Duration + pollInterval time.Duration +} + +func (x *waitParams) setDefaults() { + x.timeout = 120 * time.Second + x.pollInterval = 5 * time.Second +} + +func (c *Client) waitForContainerPresence(ctx context.Context, cnrID *cid.ID, wp *waitParams) error { + return waitFor(ctx, wp, func(ctx context.Context) bool { + var prm client.PrmContainerGet + if cnrID != nil { + prm.SetContainer(*cnrID) + } + + _, err := c.cli.ContainerGet(ctx, prm) + return err == nil + }) +} + +func waitFor(ctx context.Context, params *waitParams, condition func(context.Context) bool) error { + wctx, cancel := context.WithTimeout(ctx, params.timeout) + defer cancel() + ticker := time.NewTimer(params.pollInterval) + defer ticker.Stop() + wdone := wctx.Done() + done := ctx.Done() + for { + select { + case <-done: + return ctx.Err() + case <-wdone: + return wctx.Err() + case <-ticker.C: + if condition(ctx) { + return nil + } + ticker.Reset(params.pollInterval) + } + } +} diff --git a/internal/native/native.go b/internal/native/native.go index ebabc4d..51c6659 100644 --- a/internal/native/native.go +++ b/internal/native/native.go @@ -30,6 +30,7 @@ var ( objPutTotal, objPutFails, objPutDuration *metrics.Metric objGetTotal, objGetFails, objGetDuration *metrics.Metric + cnrPutTotal, cnrPutFails, cnrPutDuration *metrics.Metric ) func init() { @@ -49,7 +50,7 @@ func (n *Native) Exports() modules.Exports { return modules.Exports{Default: n} } -func (n *Native) Connect(endpoint, wif string) (*Client, error) { +func (n *Native) Connect(endpoint, hexPrivateKey string) (*Client, error) { var ( cli client.Client pk *keys.PrivateKey @@ -57,8 +58,8 @@ func (n *Native) Connect(endpoint, wif string) (*Client, error) { ) pk, err = keys.NewPrivateKey() - if len(wif) != 0 { - pk, err = keys.NewPrivateKeyFromWIF(wif) + if len(hexPrivateKey) != 0 { + pk, err = keys.NewPrivateKeyFromHex(hexPrivateKey) } if err != nil { return nil, fmt.Errorf("invalid key: %w", err) @@ -115,6 +116,10 @@ func (n *Native) Connect(endpoint, wif string) (*Client, error) { objGetFails, _ = registry.NewMetric("neofs_obj_get_fails", metrics.Counter) objGetDuration, _ = registry.NewMetric("neofs_obj_get_duration", metrics.Trend, metrics.Time) + cnrPutTotal, _ = registry.NewMetric("neofs_cnr_put_total", metrics.Counter) + cnrPutFails, _ = registry.NewMetric("neofs_cnr_put_fails", metrics.Counter) + cnrPutDuration, _ = registry.NewMetric("neofs_cnr_put_duration", metrics.Trend, metrics.Time) + return &Client{ vu: n.vu, key: pk.PrivateKey,