forked from TrueCloudLab/xk6-frostfs
Compare commits
4 commits
3bc1229062
...
20c86084af
Author | SHA1 | Date | |
---|---|---|---|
20c86084af | |||
335c45c578 | |||
e7d4dd404a | |||
0a9aeab47c |
2 changed files with 31 additions and 15 deletions
|
@ -9,6 +9,8 @@ import (
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const nextObjectTimeout = 10 * time.Second
|
||||||
|
|
||||||
type ObjFilter struct {
|
type ObjFilter struct {
|
||||||
Status string
|
Status string
|
||||||
Age int
|
Age int
|
||||||
|
@ -57,7 +59,16 @@ func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind,
|
||||||
// - underlying registry context is done, nil objects will be returned on the
|
// - underlying registry context is done, nil objects will be returned on the
|
||||||
// currently blocked and every further NextObject calls.
|
// currently blocked and every further NextObject calls.
|
||||||
func (o *ObjSelector) NextObject() *ObjectInfo {
|
func (o *ObjSelector) NextObject() *ObjectInfo {
|
||||||
return <-o.objChan
|
if o.kind == SelectorOneshot {
|
||||||
|
return <-o.objChan
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(nextObjectTimeout):
|
||||||
|
return nil
|
||||||
|
case obj := <-o.objChan:
|
||||||
|
return obj
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Count returns total number of objects that match filter of the selector.
|
// Count returns total number of objects that match filter of the selector.
|
||||||
|
|
|
@ -71,11 +71,23 @@ if (write_vu_count > 0) {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const read_vu_count = parseInt(__ENV.READERS || '0');
|
||||||
|
if (read_vu_count > 0) {
|
||||||
|
scenarios.read = {
|
||||||
|
executor : 'constant-vus',
|
||||||
|
vus : read_vu_count,
|
||||||
|
duration : `${duration}s`,
|
||||||
|
exec : 'obj_read',
|
||||||
|
gracefulStop : '5s',
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
|
const delete_age = __ENV.DELETE_AGE ? parseInt(__ENV.DELETE_AGE) : undefined;
|
||||||
let obj_to_delete_selector = undefined;
|
let obj_to_delete_selector = undefined;
|
||||||
let obj_to_delete_exit_on_null = undefined;
|
let obj_to_delete_exit_on_null = undefined;
|
||||||
if (registry_enabled && delete_age) {
|
|
||||||
obj_to_delete_exit_on_null = write_vu_count == 0;
|
if (registry_enabled ) {
|
||||||
|
obj_to_delete_exit_on_null = (write_vu_count == 0) && (read_vu_count == 0)
|
||||||
|
|
||||||
let constructor = obj_to_delete_exit_on_null ? registry.getOneshotSelector
|
let constructor = obj_to_delete_exit_on_null ? registry.getOneshotSelector
|
||||||
: registry.getSelector;
|
: registry.getSelector;
|
||||||
|
@ -88,16 +100,7 @@ if (registry_enabled && delete_age) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
const read_vu_count = parseInt(__ENV.READERS || '0');
|
|
||||||
if (read_vu_count > 0) {
|
|
||||||
scenarios.read = {
|
|
||||||
executor : 'constant-vus',
|
|
||||||
vus : read_vu_count,
|
|
||||||
duration : `${duration}s`,
|
|
||||||
exec : 'obj_read',
|
|
||||||
gracefulStop : '5s',
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
|
const delete_vu_count = parseInt(__ENV.DELETERS || '0');
|
||||||
if (delete_vu_count > 0) {
|
if (delete_vu_count > 0) {
|
||||||
|
@ -162,11 +165,12 @@ export function obj_write() {
|
||||||
const payload = generator.genPayload();
|
const payload = generator.genPayload();
|
||||||
const resp = s3_client.put(bucket, key, payload);
|
const resp = s3_client.put(bucket, key, payload);
|
||||||
if (!resp.success) {
|
if (!resp.success) {
|
||||||
log.withFields({bucket : bucket, key : key}).error(resp.error);
|
log.withFields({bucket : bucket, key : key, op: "write"}).error(resp.error);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (obj_registry) {
|
if (obj_registry) {
|
||||||
|
log.withFields({bucket : bucket, key : key, op: "write"}).info("add to registry");
|
||||||
obj_registry.addObject('', '', bucket, key, payload.hash());
|
obj_registry.addObject('', '', bucket, key, payload.hash());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -186,6 +190,7 @@ export function obj_read() {
|
||||||
log.withFields({bucket : obj.s3_bucket, key : obj.s3_key, status: obj.status, op: `READ`})
|
log.withFields({bucket : obj.s3_bucket, key : obj.s3_key, status: obj.status, op: `READ`})
|
||||||
.error(resp.error);
|
.error(resp.error);
|
||||||
} else {
|
} else {
|
||||||
|
log.withFields({bucket : obj.s3_bucket, key : obj.s3_key, op: "read"}).info("set read status");
|
||||||
obj_registry.setObjectStatus(obj.id, obj.status, 'read');
|
obj_registry.setObjectStatus(obj.id, obj.status, 'read');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,6 +230,6 @@ export function delete_object(obj) {
|
||||||
.error(resp.error);
|
.error(resp.error);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
log.withFields({bucket : obj.s3_bucket, key : obj.s3_key, op: "delete"}).info("delete from registry");
|
||||||
obj_registry.deleteObject(obj.id);
|
obj_registry.deleteObject(obj.id);
|
||||||
}
|
}
|
Loading…
Reference in a new issue