[#72] Add option --prepare-locally

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2023-07-07 13:16:54 +03:00
parent 56235f5e90
commit ff6814e15d
8 changed files with 57 additions and 30 deletions

View file

@ -47,10 +47,11 @@ Create native client with `connect` method. Arguments:
- hex encoded private key (empty value produces random key)
- dial timeout in seconds (0 for the default value)
- stream timeout in seconds (0 for the default value)
- generate object header on the client side (for big object - split locally too)
```js
import native from 'k6/x/frostfs/native';
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
```
### Methods

View file

@ -3,7 +3,7 @@ import { fail } from "k6";
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const payload = open('../go.sum', 'b');
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0)
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false)
export const options = {
stages: [

View file

@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
const payload = open('../go.sum', 'b');
const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
const frostfs_obj = frostfs_cli.onsite(container, payload)
export const options = {

View file

@ -31,10 +31,11 @@ import (
type (
Client struct {
vu modules.VU
key ecdsa.PrivateKey
tok session.Object
cli *client.Client
vu modules.VU
key ecdsa.PrivateKey
tok session.Object
cli *client.Client
prepareLocally bool
}
PutResponse struct {
@ -65,12 +66,12 @@ type (
}
PreparedObject struct {
vu modules.VU
key ecdsa.PrivateKey
cli *client.Client
hdr object.Object
payload []byte
vu modules.VU
key ecdsa.PrivateKey
cli *client.Client
hdr object.Object
payload []byte
prepareLocally bool
}
)
@ -103,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
o.SetOwnerID(&owner)
o.SetAttributes(attrs...)
resp, err := put(c.vu, c.cli, &tok, &o, payload.Bytes(), chunkSize)
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
if err != nil {
return PutResponse{Success: false, Error: err.Error()}
}
@ -367,12 +368,12 @@ func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedOb
}
return PreparedObject{
vu: c.vu,
key: c.key,
cli: c.cli,
hdr: *obj,
payload: data,
vu: c.vu,
key: c.key,
cli: c.cli,
hdr: *obj,
payload: data,
prepareLocally: c.prepareLocally,
}
}
@ -398,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
return PutResponse{Success: false, Error: err.Error()}
}
_, err = put(p.vu, p.cli, nil, &obj, p.payload, 0)
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
if err != nil {
return PutResponse{Success: false, Error: err.Error()}
}
@ -406,7 +407,13 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
return PutResponse{Success: true, ObjectID: id.String()}
}
func put(vu modules.VU, cli *client.Client, tok *session.Object,
type epochSource uint64
func (s epochSource) CurrentEpoch() uint64 {
return uint64(s)
}
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) {
bufSize := defaultBufferSize
if chunkSize > 0 {
@ -426,6 +433,15 @@ func put(vu modules.VU, cli *client.Client, tok *session.Object,
if chunkSize > 0 {
prm.SetGRPCPayloadChunkLen(chunkSize)
}
if prepareLocally {
res, err := cli.NetworkInfo(vu.Context(), client.PrmNetworkInfo{})
if err != nil {
return nil, err
}
prm.WithObjectMaxSize(res.Info().MaxObjectSize())
prm.WithEpochSource(epochSource(res.Info().CurrentEpoch()))
prm.WithoutHomomorphicHash(true)
}
objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)
if err != nil {

View file

@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports {
return modules.Exports{Default: n}
}
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) {
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) {
var (
cli client.Client
pk *keys.PrivateKey
@ -133,9 +133,10 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
cnrPutDuration, _ = registry.NewMetric("frostfs_cnr_put_duration", metrics.Trend, metrics.Time)
return &Client{
vu: n.vu,
key: pk.PrivateKey,
tok: tok,
cli: &cli,
vu: n.vu,
key: pk.PrivateKey,
tok: tok,
cli: &cli,
prepareLocally: prepareLocally,
}, nil
}

View file

@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random gRPC endpoint for current VU
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
const grpc_client = native.connect(grpc_endpoint, '',
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
const log = logging.new().withField("endpoint", grpc_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;

View file

@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
// Select random gRPC endpoint for current VU
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
const grpc_client = native.connect(grpc_endpoint, '',
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
const log = logging.new().withField("endpoint", grpc_endpoint);
const registry_enabled = !!__ENV.REGISTRY_FILE;

View file

@ -34,7 +34,10 @@ if (__ENV.GRPC_ENDPOINTS) {
const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(',');
const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
log = log.withField("endpoint", grpcEndpoint);
grpc_client = native.connect(grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0);
grpc_client = native.connect(grpcEndpoint, '',
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false, '');
}
// Connect to random S3 endpoint