From 2da51e4aa28059cfdbd2fb11a940241b0558477f Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 23 May 2022 14:42:48 +0300 Subject: [PATCH] [#6] native: Support onsite object preparation and uploading Signed-off-by: Alex Vanin --- README.md | 4 + examples/native_onsite.js | 25 ++++++ internal/native/client.go | 162 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 191 insertions(+) create mode 100644 examples/native_onsite.js diff --git a/README.md b/README.md index 32715cd..22d0aa3 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,10 @@ const neofsCli = native.connect("s01.neofs.devenv:8080", "") boolean flag, `object_id` string, and `error` string. - `get(container_id, object_id)`. Returns dictionary with `success` boolean flag, and `error` string. +- `onsite(container_id, object_id)`. Returns NeoFS object instance with prepared + headers. Invoke `put(headers)` method on this object to upload it into NeoFS. + It returns dicrionary with `success` boolean flag, `object_id` string and + `error` string. ### S3 diff --git a/examples/native_onsite.js b/examples/native_onsite.js new file mode 100644 index 0000000..8b81cda --- /dev/null +++ b/examples/native_onsite.js @@ -0,0 +1,25 @@ +import { uuidv4 } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js'; +import native from 'k6/x/neofs/native'; + +const payload = open('../go.sum', 'b'); +const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B" +const neofs_cli = native.connect("s01.neofs.devenv:8080", "") +const neofs_obj = neofs_cli.onsite(container, payload) + +export const options = { + stages: [ + { duration: '30s', target: 10 }, + ], +}; + +export default function () { + let headers = { + 'unique_header': uuidv4() + } + let resp = neofs_obj.put(headers) + if (resp.success) { + neofs_cli.get(container, resp.object_id) + } else { + console.log(resp.error) + } +} diff --git a/internal/native/client.go b/internal/native/client.go index 09ad910..b69e259 100644 --- a/internal/native/client.go +++ b/internal/native/client.go @@ -2,17 +2,27 @@ package native import ( "bytes" + "context" "crypto/ecdsa" + "crypto/sha256" + "encoding/binary" + "errors" + "fmt" "time" "github.com/dop251/goja" + "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" + "github.com/nspcc-dev/neofs-sdk-go/checksum" "github.com/nspcc-dev/neofs-sdk-go/client" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/neofs-sdk-go/version" + "github.com/nspcc-dev/tzhash/tz" "github.com/nspcc-dev/xk6-neofs/internal/stats" "go.k6.io/k6/js/modules" "go.k6.io/k6/metrics" @@ -36,6 +46,15 @@ type ( Success bool Error string } + + PreparedObject struct { + vu modules.VU + key ecdsa.PrivateKey + cli *client.Client + + hdr object.Object + payload []byte + } ) func (c *Client) Put(inputContainerID string, headers map[string]string, payload goja.ArrayBuffer) PutResponse { @@ -186,3 +205,146 @@ func (c *Client) Get(inputContainerID, inputObjectID string) GetResponse { stats.ReportDataReceived(c.vu, float64(sz)) return GetResponse{Success: true} } + +func (c *Client) Onsite(inputContainerID string, payload goja.ArrayBuffer) PreparedObject { + maxObjectSize, epoch, hhDisabled, err := parseNetworkInfo(c.vu.Context(), c.cli) + if err != nil { + panic(err) + } + data := payload.Bytes() + ln := len(data) + if ln > int(maxObjectSize) { + // not sure if load test needs object transformation + // with parent-child relation; if needs, then replace + // this code with the usage of object transformer from + // neofs-loader or distribution. + msg := fmt.Sprintf("payload size %d is bigger than network limit %d", ln, maxObjectSize) + panic(msg) + } + + var containerID cid.ID + err = containerID.DecodeString(inputContainerID) + if err != nil { + panic(err) + } + + var owner user.ID + user.IDFromKey(&owner, c.key.PublicKey) + + apiVersion := version.Current() + + obj := object.New() + obj.SetVersion(&apiVersion) + obj.SetType(object.TypeRegular) + obj.SetContainerID(containerID) + obj.SetOwnerID(&owner) + obj.SetPayloadSize(uint64(ln)) + obj.SetCreationEpoch(epoch) + + var sha, hh checksum.Checksum + sha.SetSHA256(sha256.Sum256(data)) + obj.SetPayloadChecksum(sha) + if !hhDisabled { + hh.SetTillichZemor(tz.Sum(data)) + obj.SetPayloadHomomorphicHash(hh) + } + + return PreparedObject{ + vu: c.vu, + key: c.key, + cli: c.cli, + + hdr: *obj, + payload: data, + } +} + +func (p PreparedObject) Put(headers map[string]string) PutResponse { + obj := p.hdr + + attrs := make([]object.Attribute, len(headers)) + ind := 0 + for k, v := range headers { + attrs[ind].SetKey(k) + attrs[ind].SetValue(v) + ind++ + } + obj.SetAttributes(attrs...) + + id, err := object.CalculateID(&obj) + if err != nil { + return PutResponse{Success: false, Error: err.Error()} + } + obj.SetID(id) + + if err = object.CalculateAndSetSignature(p.key, &obj); err != nil { + return PutResponse{Success: false, Error: err.Error()} + } + + buf := make([]byte, 4*1024) + rdr := bytes.NewReader(p.payload) + + // starting upload + // TODO(alexvanin): factor uploading code of Put() methods + stats.Report(p.vu, objPutTotal, 1) + start := time.Now() + + objectWriter, err := p.cli.ObjectPutInit(p.vu.Context(), client.PrmObjectPutInit{}) + if err != nil { + stats.Report(p.vu, objPutFails, 1) + return PutResponse{Success: false, Error: err.Error()} + } + + if !objectWriter.WriteHeader(obj) { + stats.Report(p.vu, objPutFails, 1) + _, err := objectWriter.Close() + return PutResponse{Success: false, Error: err.Error()} + } + + n, _ := rdr.Read(buf) + for n > 0 { + if !objectWriter.WritePayloadChunk(buf[:n]) { + break + } + n, _ = rdr.Read(buf) + } + + _, err = objectWriter.Close() + if err != nil { + stats.Report(p.vu, objPutFails, 1) + return PutResponse{Success: false, Error: err.Error()} + } + + stats.ReportDataSent(p.vu, float64(obj.PayloadSize())) + stats.Report(p.vu, objPutDuration, metrics.D(time.Since(start))) + + return PutResponse{Success: true, ObjectID: id.String()} +} + +func parseNetworkInfo(ctx context.Context, cli *client.Client) (maxObjSize, epoch uint64, hhDisabled bool, err error) { + ni, err := cli.NetworkInfo(ctx, client.PrmNetworkInfo{}) + if err != nil { + return 0, 0, false, err + } + + epoch = ni.Info().CurrentEpoch() + err = errors.New("network configuration misses max object size value") + + ni.Info().NetworkConfig().IterateParameters(func(parameter *netmap.NetworkParameter) bool { + switch string(parameter.Key()) { + case "MaxObjectSize": + buf := make([]byte, 8) + copy(buf[:], parameter.Value()) + maxObjSize = binary.LittleEndian.Uint64(buf) + err = nil + case "HomomorphicHashingDisabled": + arr := stackitem.NewByteArray(parameter.Value()) + hhDisabled, err = arr.TryBool() + if err != nil { + return true + } + } + return false + }) + return maxObjSize, epoch, hhDisabled, err +}