From ff6814e15d463f931cf4bc6597cb6fb0d480034e Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Fri, 7 Jul 2023 13:16:54 +0300 Subject: [PATCH] [#72] Add option `--prepare-locally` Signed-off-by: Anton Nikiforov --- README.md | 3 ++- examples/native.js | 2 +- examples/native_onsite.js | 2 +- internal/native/client.go | 54 +++++++++++++++++++++++++-------------- internal/native/native.go | 11 ++++---- scenarios/grpc.js | 5 +++- scenarios/grpc_car.js | 5 +++- scenarios/verify.js | 5 +++- 8 files changed, 57 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 832f66e..afde6b1 100644 --- a/README.md +++ b/README.md @@ -47,10 +47,11 @@ Create native client with `connect` method. Arguments: - hex encoded private key (empty value produces random key) - dial timeout in seconds (0 for the default value) - stream timeout in seconds (0 for the default value) +- generate object header on the client side (for big object - split locally too) ```js import native from 'k6/x/frostfs/native'; -const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0) +const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false) ``` ### Methods diff --git a/examples/native.js b/examples/native.js index 1b9ad26..2ea5e89 100644 --- a/examples/native.js +++ b/examples/native.js @@ -3,7 +3,7 @@ import { fail } from "k6"; import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js'; const payload = open('../go.sum', 'b'); -const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0) +const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false) export const options = { stages: [ diff --git a/examples/native_onsite.js b/examples/native_onsite.js index 218b7e2..ce2e24b 100644 --- a/examples/native_onsite.js +++ b/examples/native_onsite.js @@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js'; const payload = open('../go.sum', 'b'); const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B" -const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0) +const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false) const frostfs_obj = frostfs_cli.onsite(container, payload) export const options = { diff --git a/internal/native/client.go b/internal/native/client.go index 5fbc783..612c59c 100644 --- a/internal/native/client.go +++ b/internal/native/client.go @@ -31,10 +31,11 @@ import ( type ( Client struct { - vu modules.VU - key ecdsa.PrivateKey - tok session.Object - cli *client.Client + vu modules.VU + key ecdsa.PrivateKey + tok session.Object + cli *client.Client + prepareLocally bool } PutResponse struct { @@ -65,12 +66,12 @@ type ( } PreparedObject struct { - vu modules.VU - key ecdsa.PrivateKey - cli *client.Client - - hdr object.Object - payload []byte + vu modules.VU + key ecdsa.PrivateKey + cli *client.Client + hdr object.Object + payload []byte + prepareLocally bool } ) @@ -103,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja o.SetOwnerID(&owner) o.SetAttributes(attrs...) - resp, err := put(c.vu, c.cli, &tok, &o, payload.Bytes(), chunkSize) + resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize) if err != nil { return PutResponse{Success: false, Error: err.Error()} } @@ -367,12 +368,12 @@ func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedOb } return PreparedObject{ - vu: c.vu, - key: c.key, - cli: c.cli, - - hdr: *obj, - payload: data, + vu: c.vu, + key: c.key, + cli: c.cli, + hdr: *obj, + payload: data, + prepareLocally: c.prepareLocally, } } @@ -398,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse { return PutResponse{Success: false, Error: err.Error()} } - _, err = put(p.vu, p.cli, nil, &obj, p.payload, 0) + _, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0) if err != nil { return PutResponse{Success: false, Error: err.Error()} } @@ -406,7 +407,13 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse { return PutResponse{Success: true, ObjectID: id.String()} } -func put(vu modules.VU, cli *client.Client, tok *session.Object, +type epochSource uint64 + +func (s epochSource) CurrentEpoch() uint64 { + return uint64(s) +} + +func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object, hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) { bufSize := defaultBufferSize if chunkSize > 0 { @@ -426,6 +433,15 @@ func put(vu modules.VU, cli *client.Client, tok *session.Object, if chunkSize > 0 { prm.SetGRPCPayloadChunkLen(chunkSize) } + if prepareLocally { + res, err := cli.NetworkInfo(vu.Context(), client.PrmNetworkInfo{}) + if err != nil { + return nil, err + } + prm.WithObjectMaxSize(res.Info().MaxObjectSize()) + prm.WithEpochSource(epochSource(res.Info().CurrentEpoch())) + prm.WithoutHomomorphicHash(true) + } objectWriter, err := cli.ObjectPutInit(vu.Context(), prm) if err != nil { diff --git a/internal/native/native.go b/internal/native/native.go index f3357d2..95a8e7d 100644 --- a/internal/native/native.go +++ b/internal/native/native.go @@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports { return modules.Exports{Default: n} } -func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) { +func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) { var ( cli client.Client pk *keys.PrivateKey @@ -133,9 +133,10 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime cnrPutDuration, _ = registry.NewMetric("frostfs_cnr_put_duration", metrics.Trend, metrics.Time) return &Client{ - vu: n.vu, - key: pk.PrivateKey, - tok: tok, - cli: &cli, + vu: n.vu, + key: pk.PrivateKey, + tok: tok, + cli: &cli, + prepareLocally: prepareLocally, }, nil } diff --git a/scenarios/grpc.js b/scenarios/grpc.js index cfd6790..328aae8 100644 --- a/scenarios/grpc.js +++ b/scenarios/grpc.js @@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json"; // Select random gRPC endpoint for current VU const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(','); const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)]; -const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60); +const grpc_client = native.connect(grpc_endpoint, '', + __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, + __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60, + __ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false); const log = logging.new().withField("endpoint", grpc_endpoint); const registry_enabled = !!__ENV.REGISTRY_FILE; diff --git a/scenarios/grpc_car.js b/scenarios/grpc_car.js index 4fe8589..2214529 100644 --- a/scenarios/grpc_car.js +++ b/scenarios/grpc_car.js @@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json"; // Select random gRPC endpoint for current VU const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(','); const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)]; -const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60); +const grpc_client = native.connect(grpc_endpoint, '', + __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, + __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60, + __ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false); const log = logging.new().withField("endpoint", grpc_endpoint); const registry_enabled = !!__ENV.REGISTRY_FILE; diff --git a/scenarios/verify.js b/scenarios/verify.js index 07a034a..0c27166 100644 --- a/scenarios/verify.js +++ b/scenarios/verify.js @@ -34,7 +34,10 @@ if (__ENV.GRPC_ENDPOINTS) { const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(','); const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)]; log = log.withField("endpoint", grpcEndpoint); - grpc_client = native.connect(grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0); + grpc_client = native.connect(grpcEndpoint, '', + __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0, + __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0, + __ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false, ''); } // Connect to random S3 endpoint