[#114] datagen: Allow to generate streaming payloads

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2024-01-11 21:59:36 +03:00
parent 74121bb387
commit 4544ec616b
7 changed files with 110 additions and 15 deletions

View file

@ -38,7 +38,7 @@ func (d *Datagen) Exports() modules.Exports {
return modules.Exports{Default: d} return modules.Exports{Default: d}
} }
func (d *Datagen) Generator(size int, typ string) *Generator { func (d *Datagen) Generator(size int, typ string, streaming bool) *Generator {
g := NewGenerator(d.vu, size, strings.ToLower(typ)) g := NewGenerator(d.vu, size, strings.ToLower(typ), streaming)
return &g return &g
} }

View file

@ -3,6 +3,7 @@ package datagen
import ( import (
"bytes" "bytes"
"math/rand" "math/rand"
"sync/atomic"
"time" "time"
"github.com/go-loremipsum/loremipsum" "github.com/go-loremipsum/loremipsum"
@ -25,6 +26,9 @@ type (
buf []byte buf []byte
typ string typ string
offset int offset int
streaming bool
seed *atomic.Int64
} }
) )
@ -37,7 +41,7 @@ var payloadTypes = []string{
"", "",
} }
func NewGenerator(vu modules.VU, size int, typ string) Generator { func NewGenerator(vu modules.VU, size int, typ string, streaming bool) Generator {
if size <= 0 { if size <= 0 {
panic("size should be positive") panic("size should be positive")
} }
@ -52,17 +56,20 @@ func NewGenerator(vu modules.VU, size int, typ string) Generator {
if !found { if !found {
vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ) vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ)
} }
r := rand.New(rand.NewSource(time.Now().UnixNano()))
buf := make([]byte, size+TailSize)
g := Generator{ g := Generator{
vu: vu, vu: vu,
size: size, size: size,
rand: r,
buf: buf,
typ: typ, typ: typ,
} }
g.fillBuffer()
if streaming {
g.streaming = true
g.seed = new(atomic.Int64)
} else {
g.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
g.buf = make([]byte, size+TailSize)
g.fillBuffer()
}
return g return g
} }
@ -82,6 +89,10 @@ func (g *Generator) fillBuffer() {
} }
func (g *Generator) GenPayload(calcHash bool) Payload { func (g *Generator) GenPayload(calcHash bool) Payload {
if g.streaming {
return NewStreamPayload(g.size, g.seed.Add(1), g.typ)
}
data := g.nextSlice() data := g.nextSlice()
return NewFixedPayload(data) return NewFixedPayload(data)
} }

View file

@ -16,25 +16,25 @@ func TestGenerator(t *testing.T) {
t.Run("fails on negative size", func(t *testing.T) { t.Run("fails on negative size", func(t *testing.T) {
require.Panics(t, func() { require.Panics(t, func() {
_ = NewGenerator(vu, -1, "") _ = NewGenerator(vu, -1, "", false)
}) })
}) })
t.Run("fails on zero size", func(t *testing.T) { t.Run("fails on zero size", func(t *testing.T) {
require.Panics(t, func() { require.Panics(t, func() {
_ = NewGenerator(vu, 0, "") _ = NewGenerator(vu, 0, "", false)
}) })
}) })
t.Run("creates slice of specified size", func(t *testing.T) { t.Run("creates slice of specified size", func(t *testing.T) {
size := 10 size := 10
g := NewGenerator(vu, size, "") g := NewGenerator(vu, size, "", false)
slice := g.nextSlice() slice := g.nextSlice()
require.Len(t, slice, size) require.Len(t, slice, size)
}) })
t.Run("creates a different slice on each call", func(t *testing.T) { t.Run("creates a different slice on each call", func(t *testing.T) {
g := NewGenerator(vu, 1000, "") g := NewGenerator(vu, 1000, "", false)
slice1 := g.nextSlice() slice1 := g.nextSlice()
slice2 := g.nextSlice() slice2 := g.nextSlice()
// Each slice should be unique (assuming that 1000 random bytes will never coincide // Each slice should be unique (assuming that 1000 random bytes will never coincide
@ -43,7 +43,7 @@ func TestGenerator(t *testing.T) {
}) })
t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) { t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) {
g := NewGenerator(vu, 1000, "") g := NewGenerator(vu, 1000, "", false)
initialSlice := g.nextSlice() initialSlice := g.nextSlice()
for i := 0; i < TailSize; i++ { for i := 0; i < TailSize; i++ {
g.nextSlice() g.nextSlice()

View file

@ -1,10 +1,15 @@
package datagen package datagen
import ( import (
"bufio"
"bytes" "bytes"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"hash"
"io" "io"
"math/rand"
"github.com/go-loremipsum/loremipsum"
) )
// Payload represents arbitrary data to be packed into S3 or native object. // Payload represents arbitrary data to be packed into S3 or native object.
@ -48,3 +53,69 @@ func (p *bytesPayload) Bytes() []byte {
func NewFixedPayload(data []byte) Payload { func NewFixedPayload(data []byte) Payload {
return &bytesPayload{data: data} return &bytesPayload{data: data}
} }
type randomPayload struct {
r io.Reader
s hash.Hash
h string
size int
}
func NewStreamPayload(size int, seed int64, typ string) Payload {
var rr io.Reader
switch typ {
case "text":
rr = &textReader{li: loremipsum.NewWithSeed(seed)}
default:
rr = rand.New(rand.NewSource(seed))
}
lr := io.LimitReader(rr, int64(size))
// We need some buffering to write complete blocks in the TeeReader.
// Streaming payload read is expected to be used for big objects, thus 4k seems like a good choice.
br := bufio.NewReaderSize(lr, 4096)
s := sha256.New()
tr := io.TeeReader(br, s)
return &randomPayload{
r: tr,
s: s,
size: size,
}
}
func (p *randomPayload) Reader() io.Reader {
return p.r
}
func (p *randomPayload) Size() int {
return p.size
}
func (p *randomPayload) Hash() string {
if p.h == "" {
p.h = hex.EncodeToString(p.s.Sum(nil))
// Prevent possible misuse.
p.r = nil
p.s = nil
}
return p.h
}
func (p *randomPayload) Bytes() []byte {
data, err := io.ReadAll(p.r)
if err != nil {
// We use only 2 readers, either `bytes.Reader` or `rand.Reader`.
// None of them returns errors, thus encountering an error is a fatal error.
panic(err)
}
return data
}
type textReader struct {
li *loremipsum.LoremIpsum
}
func (r *textReader) Read(p []byte) (n int, err error) {
paragraph := r.li.Paragraph()
return copy(p, paragraph), nil
}

View file

@ -26,3 +26,15 @@ func TestFixedPayload(t *testing.T) {
h := sha256.Sum256(data) h := sha256.Sum256(data)
require.Equal(t, hex.EncodeToString(h[:]), p.Hash()) require.Equal(t, hex.EncodeToString(h[:]), p.Hash())
} }
func TestStreamingPayload(t *testing.T) {
const size = 123
p := NewStreamPayload(size, 0, "")
require.Equal(t, size, p.Size())
actual, err := io.ReadAll(p.Reader())
require.NoError(t, err)
require.Equal(t, size, len(actual))
require.Equal(t, sha256.Size*2, len(p.Hash()))
}

View file

@ -2,7 +2,7 @@ import datagen from 'k6/x/frostfs/datagen';
export function newGenerator(condition) { export function newGenerator(condition) {
if (condition) { if (condition) {
return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || ""); return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "", !!__ENV.STREAMING);
} }
return undefined; return undefined;
} }

View file

@ -19,6 +19,7 @@ Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following opt
* `SLEEP_READ` - time interval (in seconds) between reading VU iterations. * `SLEEP_READ` - time interval (in seconds) between reading VU iterations.
* `SELECTION_SIZE` - size of batch to select for deletion (default: 1000). * `SELECTION_SIZE` - size of batch to select for deletion (default: 1000).
* `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random"). * `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random").
* `STREAMING` - if set, the payload is generated on the fly and is not read into memory fully.
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`: Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`:
```shell ```shell