Allow to split objects on client #75
8 changed files with 57 additions and 30 deletions
|
@ -47,10 +47,11 @@ Create native client with `connect` method. Arguments:
|
||||||
- hex encoded private key (empty value produces random key)
|
- hex encoded private key (empty value produces random key)
|
||||||
- dial timeout in seconds (0 for the default value)
|
- dial timeout in seconds (0 for the default value)
|
||||||
- stream timeout in seconds (0 for the default value)
|
- stream timeout in seconds (0 for the default value)
|
||||||
|
- generate object header on the client side (for big object - split locally too)
|
||||||
|
|
||||||
```js
|
```js
|
||||||
import native from 'k6/x/frostfs/native';
|
import native from 'k6/x/frostfs/native';
|
||||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
|
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
|
||||||
```
|
```
|
||||||
|
|
||||||
### Methods
|
### Methods
|
||||||
|
|
|
@ -3,7 +3,7 @@ import { fail } from "k6";
|
||||||
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
|
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
|
||||||
|
|
||||||
const payload = open('../go.sum', 'b');
|
const payload = open('../go.sum', 'b');
|
||||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0)
|
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false)
|
||||||
|
|
||||||
export const options = {
|
export const options = {
|
||||||
stages: [
|
stages: [
|
||||||
|
|
|
@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
|
||||||
|
|
||||||
const payload = open('../go.sum', 'b');
|
const payload = open('../go.sum', 'b');
|
||||||
const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
|
const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
|
||||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
|
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
|
||||||
const frostfs_obj = frostfs_cli.onsite(container, payload)
|
const frostfs_obj = frostfs_cli.onsite(container, payload)
|
||||||
|
|
||||||
export const options = {
|
export const options = {
|
||||||
|
|
|
@ -35,6 +35,7 @@ type (
|
||||||
key ecdsa.PrivateKey
|
key ecdsa.PrivateKey
|
||||||
tok session.Object
|
tok session.Object
|
||||||
cli *client.Client
|
cli *client.Client
|
||||||
|
prepareLocally bool
|
||||||
}
|
}
|
||||||
|
|
||||||
PutResponse struct {
|
PutResponse struct {
|
||||||
|
@ -68,9 +69,9 @@ type (
|
||||||
vu modules.VU
|
vu modules.VU
|
||||||
key ecdsa.PrivateKey
|
key ecdsa.PrivateKey
|
||||||
cli *client.Client
|
cli *client.Client
|
||||||
|
|
||||||
hdr object.Object
|
hdr object.Object
|
||||||
payload []byte
|
payload []byte
|
||||||
|
prepareLocally bool
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -103,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
|
||||||
o.SetOwnerID(&owner)
|
o.SetOwnerID(&owner)
|
||||||
o.SetAttributes(attrs...)
|
o.SetAttributes(attrs...)
|
||||||
|
|
||||||
resp, err := put(c.vu, c.cli, &tok, &o, payload.Bytes(), chunkSize)
|
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
@ -370,9 +371,9 @@ func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedOb
|
||||||
vu: c.vu,
|
vu: c.vu,
|
||||||
key: c.key,
|
key: c.key,
|
||||||
cli: c.cli,
|
cli: c.cli,
|
||||||
|
|
||||||
hdr: *obj,
|
hdr: *obj,
|
||||||
payload: data,
|
payload: data,
|
||||||
|
prepareLocally: c.prepareLocally,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -398,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = put(p.vu, p.cli, nil, &obj, p.payload, 0)
|
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
@ -406,7 +407,13 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
||||||
return PutResponse{Success: true, ObjectID: id.String()}
|
return PutResponse{Success: true, ObjectID: id.String()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func put(vu modules.VU, cli *client.Client, tok *session.Object,
|
type epochSource uint64
|
||||||
|
|
||||||
|
func (s epochSource) CurrentEpoch() uint64 {
|
||||||
|
return uint64(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
|
||||||
hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) {
|
hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) {
|
||||||
bufSize := defaultBufferSize
|
bufSize := defaultBufferSize
|
||||||
if chunkSize > 0 {
|
if chunkSize > 0 {
|
||||||
|
@ -426,6 +433,15 @@ func put(vu modules.VU, cli *client.Client, tok *session.Object,
|
||||||
if chunkSize > 0 {
|
if chunkSize > 0 {
|
||||||
prm.SetGRPCPayloadChunkLen(chunkSize)
|
prm.SetGRPCPayloadChunkLen(chunkSize)
|
||||||
}
|
}
|
||||||
|
if prepareLocally {
|
||||||
|
res, err := cli.NetworkInfo(vu.Context(), client.PrmNetworkInfo{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
prm.WithObjectMaxSize(res.Info().MaxObjectSize())
|
||||||
|
prm.WithEpochSource(epochSource(res.Info().CurrentEpoch()))
|
||||||
|
prm.WithoutHomomorphicHash(true)
|
||||||
|
}
|
||||||
|
|
||||||
objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)
|
objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports {
|
||||||
return modules.Exports{Default: n}
|
return modules.Exports{Default: n}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) {
|
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) {
|
||||||
var (
|
var (
|
||||||
cli client.Client
|
cli client.Client
|
||||||
pk *keys.PrivateKey
|
pk *keys.PrivateKey
|
||||||
|
@ -137,5 +137,6 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
|
||||||
key: pk.PrivateKey,
|
key: pk.PrivateKey,
|
||||||
tok: tok,
|
tok: tok,
|
||||||
cli: &cli,
|
cli: &cli,
|
||||||
|
prepareLocally: prepareLocally,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
||||||
// Select random gRPC endpoint for current VU
|
// Select random gRPC endpoint for current VU
|
||||||
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
||||||
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
|
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
|
||||||
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
|
const grpc_client = native.connect(grpc_endpoint, '',
|
||||||
|
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
|
||||||
|
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
|
||||||
|
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
|
||||||
const log = logging.new().withField("endpoint", grpc_endpoint);
|
const log = logging.new().withField("endpoint", grpc_endpoint);
|
||||||
|
|
||||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||||
|
|
|
@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
||||||
// Select random gRPC endpoint for current VU
|
// Select random gRPC endpoint for current VU
|
||||||
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
||||||
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
|
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
|
||||||
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
|
const grpc_client = native.connect(grpc_endpoint, '',
|
||||||
|
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
|
||||||
|
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
|
||||||
|
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
|
||||||
const log = logging.new().withField("endpoint", grpc_endpoint);
|
const log = logging.new().withField("endpoint", grpc_endpoint);
|
||||||
|
|
||||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||||
|
|
|
@ -34,7 +34,10 @@ if (__ENV.GRPC_ENDPOINTS) {
|
||||||
const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
||||||
const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
|
const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
|
||||||
log = log.withField("endpoint", grpcEndpoint);
|
log = log.withField("endpoint", grpcEndpoint);
|
||||||
grpc_client = native.connect(grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0);
|
grpc_client = native.connect(grpcEndpoint, '',
|
||||||
|
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
|
||||||
|
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
|
||||||
|
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false, '');
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect to random S3 endpoint
|
// Connect to random S3 endpoint
|
||||||
|
|
Loading…
Reference in a new issue