forked from TrueCloudLab/xk6-frostfs
[#145] native: Allow to specify max_obj_size
For locally prepared objects it is possible now to specify cut size. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
f90a645594
commit
6d1e7eb49e
8 changed files with 35 additions and 14 deletions
|
@ -48,10 +48,11 @@ Create native client with `connect` method. Arguments:
|
||||||
- dial timeout in seconds (0 for the default value)
|
- dial timeout in seconds (0 for the default value)
|
||||||
- stream timeout in seconds (0 for the default value)
|
- stream timeout in seconds (0 for the default value)
|
||||||
- generate object header on the client side (for big object - split locally too)
|
- generate object header on the client side (for big object - split locally too)
|
||||||
|
- max size for generated object header on the client side (for big object - the size that the object is splitted into)
|
||||||
|
|
||||||
```js
|
```js
|
||||||
import native from 'k6/x/frostfs/native';
|
import native from 'k6/x/frostfs/native';
|
||||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
|
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false, 0)
|
||||||
```
|
```
|
||||||
|
|
||||||
### Methods
|
### Methods
|
||||||
|
|
|
@ -3,7 +3,7 @@ import { fail } from "k6";
|
||||||
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
|
import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
|
||||||
|
|
||||||
const payload = open('../go.sum', 'b');
|
const payload = open('../go.sum', 'b');
|
||||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false)
|
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb", 0, 0, false, 0)
|
||||||
|
|
||||||
export const options = {
|
export const options = {
|
||||||
stages: [
|
stages: [
|
||||||
|
|
|
@ -3,7 +3,7 @@ import { uuidv4 } from '../scenarios/libs/k6-utils-1.4.0.js';
|
||||||
|
|
||||||
const payload = open('../go.sum', 'b');
|
const payload = open('../go.sum', 'b');
|
||||||
const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
|
const container = "AjSxSNNXbJUDPqqKYm1VbFVDGCakbpUNH8aGjPmGAH3B"
|
||||||
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false)
|
const frostfs_cli = native.connect("s01.frostfs.devenv:8080", "", 0, 0, false, 0)
|
||||||
const frostfs_obj = frostfs_cli.onsite(container, payload)
|
const frostfs_obj = frostfs_cli.onsite(container, payload)
|
||||||
|
|
||||||
export const options = {
|
export const options = {
|
||||||
|
|
|
@ -35,6 +35,7 @@ type (
|
||||||
tok session.Object
|
tok session.Object
|
||||||
cli *client.Client
|
cli *client.Client
|
||||||
prepareLocally bool
|
prepareLocally bool
|
||||||
|
maxObjSize uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
PutResponse struct {
|
PutResponse struct {
|
||||||
|
@ -71,6 +72,7 @@ type (
|
||||||
hdr object.Object
|
hdr object.Object
|
||||||
payload []byte
|
payload []byte
|
||||||
prepareLocally bool
|
prepareLocally bool
|
||||||
|
maxObjSize uint64
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -103,7 +105,7 @@ func (c *Client) Put(containerID string, headers map[string]string, payload data
|
||||||
o.SetOwnerID(owner)
|
o.SetOwnerID(owner)
|
||||||
o.SetAttributes(attrs...)
|
o.SetAttributes(attrs...)
|
||||||
|
|
||||||
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize)
|
resp, err := put(c.vu, c.cli, c.prepareLocally, &tok, &o, payload, chunkSize, c.maxObjSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
@ -373,6 +375,7 @@ func (c *Client) Onsite(containerID string, payload datagen.Payload) PreparedObj
|
||||||
hdr: *obj,
|
hdr: *obj,
|
||||||
payload: data,
|
payload: data,
|
||||||
prepareLocally: c.prepareLocally,
|
prepareLocally: c.prepareLocally,
|
||||||
|
maxObjSize: c.maxObjSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -398,7 +401,7 @@ func (p PreparedObject) Put(headers map[string]string) PutResponse {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0)
|
_, err = put(p.vu, p.cli, p.prepareLocally, nil, &obj, datagen.NewFixedPayload(p.payload), 0, p.maxObjSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return PutResponse{Success: false, Error: err.Error()}
|
return PutResponse{Success: false, Error: err.Error()}
|
||||||
}
|
}
|
||||||
|
@ -413,7 +416,7 @@ func (s epochSource) CurrentEpoch() uint64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
|
func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Object,
|
||||||
hdr *object.Object, payload datagen.Payload, chunkSize int,
|
hdr *object.Object, payload datagen.Payload, chunkSize int, maxObjSize uint64,
|
||||||
) (*client.ResObjectPut, error) {
|
) (*client.ResObjectPut, error) {
|
||||||
bufSize := defaultBufferSize
|
bufSize := defaultBufferSize
|
||||||
if chunkSize > 0 {
|
if chunkSize > 0 {
|
||||||
|
@ -441,6 +444,9 @@ func put(vu modules.VU, cli *client.Client, prepareLocally bool, tok *session.Ob
|
||||||
prm.MaxSize = res.Info().MaxObjectSize()
|
prm.MaxSize = res.Info().MaxObjectSize()
|
||||||
prm.EpochSource = epochSource(res.Info().CurrentEpoch())
|
prm.EpochSource = epochSource(res.Info().CurrentEpoch())
|
||||||
prm.WithoutHomomorphHash = true
|
prm.WithoutHomomorphHash = true
|
||||||
|
if maxObjSize > 0 {
|
||||||
|
prm.MaxSize = maxObjSize
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)
|
objectWriter, err := cli.ObjectPutInit(vu.Context(), prm)
|
||||||
|
|
|
@ -52,13 +52,17 @@ func (n *Native) Exports() modules.Exports {
|
||||||
return modules.Exports{Default: n}
|
return modules.Exports{Default: n}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool) (*Client, error) {
|
func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTimeout int, prepareLocally bool, maxObjSize int) (*Client, error) {
|
||||||
var (
|
var (
|
||||||
cli client.Client
|
cli client.Client
|
||||||
pk *keys.PrivateKey
|
pk *keys.PrivateKey
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if maxObjSize < 0 {
|
||||||
|
return nil, fmt.Errorf("max object size value must be positive")
|
||||||
|
}
|
||||||
|
|
||||||
pk, err = keys.NewPrivateKey()
|
pk, err = keys.NewPrivateKey()
|
||||||
if len(hexPrivateKey) != 0 {
|
if len(hexPrivateKey) != 0 {
|
||||||
pk, err = keys.NewPrivateKeyFromHex(hexPrivateKey)
|
pk, err = keys.NewPrivateKeyFromHex(hexPrivateKey)
|
||||||
|
@ -114,6 +118,16 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
|
||||||
tok.SetAuthKey(&key)
|
tok.SetAuthKey(&key)
|
||||||
tok.SetExp(exp)
|
tok.SetExp(exp)
|
||||||
|
|
||||||
|
if prepareLocally && maxObjSize > 0 {
|
||||||
|
res, err := cli.NetworkInfo(n.vu.Context(), client.PrmNetworkInfo{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if uint64(maxObjSize) > res.Info().MaxObjectSize() {
|
||||||
|
return nil, fmt.Errorf("max object size must be not greater than %d bytes", res.Info().MaxObjectSize())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// register metrics
|
// register metrics
|
||||||
|
|
||||||
objPutSuccess, _ = stats.Registry.NewMetric("frostfs_obj_put_success", metrics.Counter)
|
objPutSuccess, _ = stats.Registry.NewMetric("frostfs_obj_put_success", metrics.Counter)
|
||||||
|
@ -140,5 +154,6 @@ func (n *Native) Connect(endpoint, hexPrivateKey string, dialTimeout, streamTime
|
||||||
tok: tok,
|
tok: tok,
|
||||||
cli: &cli,
|
cli: &cli,
|
||||||
prepareLocally: prepareLocally,
|
prepareLocally: prepareLocally,
|
||||||
|
maxObjSize: uint64(maxObjSize),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,8 +31,8 @@ const grpc_endpoint =
|
||||||
const grpc_client = native.connect(
|
const grpc_client = native.connect(
|
||||||
grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
|
grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
|
||||||
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
|
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
|
||||||
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true'
|
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' : false,
|
||||||
: false);
|
1024 * parseInt(__ENV.MAX_OBJECT_SIZE || '0'));
|
||||||
const log = logging.new().withField('endpoint', grpc_endpoint);
|
const log = logging.new().withField('endpoint', grpc_endpoint);
|
||||||
|
|
||||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||||
|
|
|
@ -30,8 +30,8 @@ const grpc_endpoint =
|
||||||
const grpc_client = native.connect(
|
const grpc_client = native.connect(
|
||||||
grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
|
grpc_endpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 5,
|
||||||
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
|
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 60,
|
||||||
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' :
|
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' : false,
|
||||||
false);
|
1024 * parseInt(__ENV.MAX_OBJECT_SIZE || '0'));
|
||||||
const log = logging.new().withField('endpoint', grpc_endpoint);
|
const log = logging.new().withField('endpoint', grpc_endpoint);
|
||||||
|
|
||||||
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
const registry_enabled = !!__ENV.REGISTRY_FILE;
|
||||||
|
|
|
@ -44,9 +44,8 @@ if (__ENV.GRPC_ENDPOINTS) {
|
||||||
grpc_client = native.connect(
|
grpc_client = native.connect(
|
||||||
grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
|
grpcEndpoint, '', __ENV.DIAL_TIMEOUT ? parseInt(__ENV.DIAL_TIMEOUT) : 0,
|
||||||
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
|
__ENV.STREAM_TIMEOUT ? parseInt(__ENV.STREAM_TIMEOUT) : 0,
|
||||||
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' :
|
__ENV.PREPARE_LOCALLY ? __ENV.PREPARE_LOCALLY.toLowerCase() === 'true' : false,
|
||||||
false,
|
1024 * parseInt(__ENV.MAX_OBJECT_SIZE || '0'));
|
||||||
'');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect to random S3 endpoint
|
// Connect to random S3 endpoint
|
||||||
|
|
Loading…
Reference in a new issue