Allow to split objects on client #75
8 changed files with 57 additions and 30 deletions
|
@ -47,10 +47,11 @@ Create native client with `connect` method. Arguments:
|
|||
- hex encoded private key (empty value produces random key)
|
||||
- dial timeout in seconds (0 for the default value)
|
||||
- stream timeout in seconds (0 for the default value)
|
||||
- generate object header on the client side (for big object - split locally too)
|
||||
|
||||
```js
|
||||
import native from 'k6/x/frostfs/native';
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
|
||||
```
|
||||
|
||||
### Methods
|
||||
|
|
|
@ -3,7 +3,7 @@ import { fail } from "k6";
|
|||
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
|
||||
|
||||
const payload = open('../go.sum', 'b');
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0)
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false)
|
||||
|
||||
export const options = {
|
||||
stages: [
|
||||
|
|
|
@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
|
|||
|
||||
const payload = open('../go.sum', 'b');
|
||||
const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0)
|
||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
|
||||
const frostfs_obj = frostfs_cli.onsite(container, payload)
|
||||
|
||||
export const options = {
|
||||
|
|
|
@ -35,6 +35,7 @@ type (
|
|||
key ecdsa.PrivateKey
|
||||
tok session.Object
|
||||
cli *client.Client
|
||||
prepareLocally bool
|
||||
}
|
||||
|
||||
PutResponse struct {
|
||||
|
@ -68,9 +69,9 @@ type (
|
|||
vu modules.VU
|
||||
key ecdsa.PrivateKey
|
||||
cli *client.Client
|
||||
|
||||
hdr object.Object
|
||||
payload []byte
|
||||
prepareLocally bool
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -103,7 +104,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload goja
|
|||
o.SetOwnerID(&owner)
|
||||
o.SetAttributes(attrs...)
|
||||
|
||||
resp, err := put(c.vu, c.cli, &tok, &o, payload.Bytes(), chunkSize)
|
||||
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload.Bytes(), chunkSize)
|
||||
if err != nil {
|
||||
return PutResponse{Success: false, Error: err.Error()}
|
||||
}
|
||||
|
@ -370,9 +371,9 @@ func (c *Client) Onsite(containerID string, payload goja.ArrayBuffer) PreparedOb
|
|||
vu: c.vu,
|
||||
key: c.key,
|
||||
cli: c.cli,
|
||||
|
||||
hdr: *obj,
|
||||
payload: data,
|
||||
prepareLocally: c.prepareLocally,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -398,7 +399,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
|||
return PutResponse{Success: false, Error: err.Error()}
|
||||
}
|
||||
|
||||
_, err = put(p.vu, p.cli, nil, &obj, p.payload, 0)
|
||||
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, p.payload, 0)
|
||||
if err != nil {
|
||||
return PutResponse{Success: false, Error: err.Error()}
|
||||
}
|
||||
|
@ -406,7 +407,13 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
|||
return PutResponse{Success: true, ObjectID: id.String()}
|
||||
}
|
||||
|
||||
func put(vu modules.VU, cli *client.Client, tok *session.Object,
|
||||
type epochSource uint64
|
||||
|
||||
func (s epochSource) CurrentEpoch() uint64 {
|
||||
return uint64(s)
|
||||
}
|
||||
|
||||
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
|
||||
hdr *object.Object, payload []byte, chunkSize int) (*client.ResObjectPut, error) {
|
||||
bufSize := defaultBufferSize
|
||||
if chunkSize > 0 {
|
||||
|
@ -426,6 +433,15 @@ func put(vu modules.VU, cli *client.Client, tok *session.Object,
|
|||
if chunkSize > 0 {
|
||||
prm.SetGRPCPayloadChunkLen(chunkSize)
|
||||
}
|
||||
if prepareLocally {
|
||||
res, err := cli.NetworkInfo(vu.Context(), client.PrmNetworkInfo{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
prm.WithObjectMaxSize(res.Info().MaxObjectSize())
|
||||
prm.WithEpochSource(epochSource(res.Info().CurrentEpoch()))
|
||||
prm.WithoutHomomorphicHash(true)
|
||||
}
|
||||
|
||||
objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)
|
||||
if err != nil {
|
||||
|
|
|
@ -51,7 +51,7 @@ func (n *Native) Exports() modules.Exports {
|
|||
return modules.Exports{Default: n}
|
||||
}
|
||||
|
||||
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int) (*Client, error) {
|
||||
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) {
|
||||
var (
|
||||
cli client.Client
|
||||
pk *keys.PrivateKey
|
||||
|
@ -137,5 +137,6 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
|
|||
key: pk.PrivateKey,
|
||||
tok: tok,
|
||||
cli: &cli,
|
||||
prepareLocally: prepareLocally,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
|||
// Select random gRPC endpoint for current VU
|
||||
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
||||
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
|
||||
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
|
||||
const grpc_client = native.connect(grpc_endpoint, '',
|
||||
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
|
||||
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
|
||||
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
|
||||
const log = logging.new().withField("endpoint", grpc_endpoint);
|
||||
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
|
|
|
@ -24,7 +24,10 @@ const summary_json = __ENV.SUMMARY_JSON || "/tmp/summary.json";
|
|||
// Select random gRPC endpoint for current VU
|
||||
const grpc_endpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
||||
const grpc_endpoint = grpc_endpoints[Math.floor(Math.random() * grpc_endpoints.length)];
|
||||
const grpc_client = native.connect(grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60);
|
||||
const grpc_client = native.connect(grpc_endpoint, '',
|
||||
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
|
||||
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
|
||||
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false);
|
||||
const log = logging.new().withField("endpoint", grpc_endpoint);
|
||||
|
||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||
|
|
|
@ -34,7 +34,10 @@ if (__ENV.GRPC_ENDPOINTS) {
|
|||
const grpcEndpoints = __ENV.GRPC_ENDPOINTS.split(',');
|
||||
const grpcEndpoint = grpcEndpoints[Math.floor(Math.random() * grpcEndpoints.length)];
|
||||
log = log.withField("endpoint", grpcEndpoint);
|
||||
grpc_client = native.connect(grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0, __ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0);
|
||||
grpc_client = native.connect(grpcEndpoint, '',
|
||||
__ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
|
||||
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
|
||||
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === "true" : false, '');
|
||||
}
|
||||
|
||||
// Connect to random S3 endpoint
|
||||
|
|
Loading…
Reference in a new issue