From 4544ec616bfa31dbf337cfe2aaa3ee729c188346 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 11 Jan 2024 21:59:36 +0300 Subject: [PATCH] [#114] datagen: Allow to generate streaming payloads Signed-off-by: Evgenii Stratonikov --- internal/datagen/datagen.go | 4 +- internal/datagen/generator.go | 25 ++++++++--- internal/datagen/generator_test.go | 10 ++--- internal/datagen/payload.go | 71 ++++++++++++++++++++++++++++++ internal/datagen/payload_test.go | 12 +++++ scenarios/libs/datagen.js | 2 +- scenarios/run_scenarios.md | 1 + 7 files changed, 110 insertions(+), 15 deletions(-) diff --git a/internal/datagen/datagen.go b/internal/datagen/datagen.go index 7994875..2efeb2b 100644 --- a/internal/datagen/datagen.go +++ b/internal/datagen/datagen.go @@ -38,7 +38,7 @@ func (d *Datagen) Exports() modules.Exports { return modules.Exports{Default: d} } -func (d *Datagen) Generator(size int, typ string) *Generator { - g := NewGenerator(d.vu, size, strings.ToLower(typ)) +func (d *Datagen) Generator(size int, typ string, streaming bool) *Generator { + g := NewGenerator(d.vu, size, strings.ToLower(typ), streaming) return &g } diff --git a/internal/datagen/generator.go b/internal/datagen/generator.go index 8dbc093..0617ddc 100644 --- a/internal/datagen/generator.go +++ b/internal/datagen/generator.go @@ -3,6 +3,7 @@ package datagen import ( "bytes" "math/rand" + "sync/atomic" "time" "github.com/go-loremipsum/loremipsum" @@ -25,6 +26,9 @@ type ( buf []byte typ string offset int + + streaming bool + seed *atomic.Int64 } ) @@ -37,7 +41,7 @@ var payloadTypes = []string{ "", } -func NewGenerator(vu modules.VU, size int, typ string) Generator { +func NewGenerator(vu modules.VU, size int, typ string, streaming bool) Generator { if size <= 0 { panic("size should be positive") } @@ -52,17 +56,20 @@ func NewGenerator(vu modules.VU, size int, typ string) Generator { if !found { vu.InitEnv().Logger.Info("Unknown payload type '%s', random will be used.", typ) } - - r := 
rand.New(rand.NewSource(time.Now().UnixNano())) - buf := make([]byte, size+TailSize) g := Generator{ vu: vu, size: size, - rand: r, - buf: buf, typ: typ, } - g.fillBuffer() + + if streaming { + g.streaming = true + g.seed = new(atomic.Int64) + } else { + g.rand = rand.New(rand.NewSource(time.Now().UnixNano())) + g.buf = make([]byte, size+TailSize) + g.fillBuffer() + } return g } @@ -82,6 +89,10 @@ func (g *Generator) fillBuffer() { } func (g *Generator) GenPayload(calcHash bool) Payload { + if g.streaming { + return NewStreamPayload(g.size, g.seed.Add(1), g.typ) + } + data := g.nextSlice() return NewFixedPayload(data) } diff --git a/internal/datagen/generator_test.go b/internal/datagen/generator_test.go index a82dd25..cbc4c0f 100644 --- a/internal/datagen/generator_test.go +++ b/internal/datagen/generator_test.go @@ -16,25 +16,25 @@ func TestGenerator(t *testing.T) { t.Run("fails on negative size", func(t *testing.T) { require.Panics(t, func() { - _ = NewGenerator(vu, -1, "") + _ = NewGenerator(vu, -1, "", false) }) }) t.Run("fails on zero size", func(t *testing.T) { require.Panics(t, func() { - _ = NewGenerator(vu, 0, "") + _ = NewGenerator(vu, 0, "", false) }) }) t.Run("creates slice of specified size", func(t *testing.T) { size := 10 - g := NewGenerator(vu, size, "") + g := NewGenerator(vu, size, "", false) slice := g.nextSlice() require.Len(t, slice, size) }) t.Run("creates a different slice on each call", func(t *testing.T) { - g := NewGenerator(vu, 1000, "") + g := NewGenerator(vu, 1000, "", false) slice1 := g.nextSlice() slice2 := g.nextSlice() // Each slice should be unique (assuming that 1000 random bytes will never coincide @@ -43,7 +43,7 @@ func TestGenerator(t *testing.T) { }) t.Run("keeps generating slices after consuming entire tail", func(t *testing.T) { - g := NewGenerator(vu, 1000, "") + g := NewGenerator(vu, 1000, "", false) initialSlice := g.nextSlice() for i := 0; i < TailSize; i++ { g.nextSlice() diff --git a/internal/datagen/payload.go 
b/internal/datagen/payload.go index efd0c20..65b5425 100644 --- a/internal/datagen/payload.go +++ b/internal/datagen/payload.go @@ -1,10 +1,15 @@ package datagen import ( + "bufio" "bytes" "crypto/sha256" "encoding/hex" + "hash" "io" + "math/rand" + + "github.com/go-loremipsum/loremipsum" ) // Payload represents arbitrary data to be packed into S3 or native object. @@ -48,3 +53,69 @@ func (p *bytesPayload) Bytes() []byte { func NewFixedPayload(data []byte) Payload { return &bytesPayload{data: data} } + +type randomPayload struct { + r io.Reader + s hash.Hash + h string + size int +} + +func NewStreamPayload(size int, seed int64, typ string) Payload { + var rr io.Reader + switch typ { + case "text": + rr = &textReader{li: loremipsum.NewWithSeed(seed)} + default: + rr = rand.New(rand.NewSource(seed)) + } + + lr := io.LimitReader(rr, int64(size)) + // We need some buffering to write complete blocks in the TeeReader. + // Streaming payload read is expected to be used for big objects, thus 4k seems like a good choice. + br := bufio.NewReaderSize(lr, 4096) + s := sha256.New() + tr := io.TeeReader(br, s) + return &randomPayload{ + r: tr, + s: s, + size: size, + } +} + +func (p *randomPayload) Reader() io.Reader { + return p.r +} + +func (p *randomPayload) Size() int { + return p.size +} + +func (p *randomPayload) Hash() string { + if p.h == "" { + p.h = hex.EncodeToString(p.s.Sum(nil)) + // Prevent possible misuse. + p.r = nil + p.s = nil + } + return p.h +} + +func (p *randomPayload) Bytes() []byte { + data, err := io.ReadAll(p.r) + if err != nil { + // The underlying reader is either a `math/rand` source or a loremipsum-backed `textReader`. + // Neither of them returns errors, thus encountering an error is a fatal error. 
+ panic(err) + } + return data +} + +type textReader struct { + li *loremipsum.LoremIpsum +} + +func (r *textReader) Read(p []byte) (n int, err error) { + paragraph := r.li.Paragraph() + return copy(p, paragraph), nil +} diff --git a/internal/datagen/payload_test.go b/internal/datagen/payload_test.go index c85f3b0..b7351b3 100644 --- a/internal/datagen/payload_test.go +++ b/internal/datagen/payload_test.go @@ -26,3 +26,15 @@ func TestFixedPayload(t *testing.T) { h := sha256.Sum256(data) require.Equal(t, hex.EncodeToString(h[:]), p.Hash()) } + +func TestStreamingPayload(t *testing.T) { + const size = 123 + + p := NewStreamPayload(size, 0, "") + require.Equal(t, size, p.Size()) + + actual, err := io.ReadAll(p.Reader()) + require.NoError(t, err) + require.Equal(t, size, len(actual)) + require.Equal(t, sha256.Size*2, len(p.Hash())) +} diff --git a/scenarios/libs/datagen.js b/scenarios/libs/datagen.js index d6feba2..37bb460 100644 --- a/scenarios/libs/datagen.js +++ b/scenarios/libs/datagen.js @@ -2,7 +2,7 @@ import datagen from 'k6/x/frostfs/datagen'; export function newGenerator(condition) { if (condition) { - return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || ""); + return datagen.generator(1024 * parseInt(__ENV.WRITE_OBJ_SIZE), __ENV.PAYLOAD_TYPE || "", !!__ENV.STREAMING); } return undefined; } diff --git a/scenarios/run_scenarios.md b/scenarios/run_scenarios.md index 846e075..c94ad5a 100644 --- a/scenarios/run_scenarios.md +++ b/scenarios/run_scenarios.md @@ -19,6 +19,7 @@ Scenarios `grpc.js`, `local.js`, `http.js` and `s3.js` support the following opt * `SLEEP_READ` - time interval (in seconds) between reading VU iterations. * `SELECTION_SIZE` - size of batch to select for deletion (default: 1000). * `PAYLOAD_TYPE` - type of an object payload ("random" or "text", default: "random"). + * `STREAMING` - if set, the payload is generated on the fly and is not read into memory fully. 
Additionally, the profiling extension can be enabled to generate CPU and memory profiles which can be inspected with `go tool pprof file.prof`: ```shell