From 58a8ce4e479bbebf0fc6a96221c36e25a1489184 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 27 May 2019 10:45:29 +0300 Subject: [PATCH] Added weighted HRW sorting (#5) * Added weighted HRW sorting This commit proposes renaming of old `SortByWeight` functions to `Sort` and implementation of `SortByWeight` function with explicit weights in arguments. `SortByWeight` function calculates normalized hashes of nodes and normalized input weights. Then multiplies these values to obtain node's actual weight for later sorting. - renamed `SortByWeight` function to `Sort` - added `SortByWeight`, `SortSliceByWeightValue` and `SortSliceBeWeightIndex` functions - moved code with reflection processing into `prepareRule` function - added tests and benchmarks for new weighted functions - added benchmark results into README * Fixed comments --- README.md | 29 +++-- hrw.go | 184 +++++++++++++++++++++++--------- hrw_test.go | 297 +++++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 438 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index abb4b92..bec9cbd 100644 --- a/README.md +++ b/README.md @@ -14,15 +14,26 @@ ## Benchmark: ``` -BenchmarkSortByWeight_fnv_10-8 3000000 435 ns/op 224 B/op 3 allocs/op -BenchmarkSortByWeight_fnv_100-8 200000 7238 ns/op 1856 B/op 3 allocs/op -BenchmarkSortByWeight_fnv_1000-8 10000 163158 ns/op 16448 B/op 3 allocs/op -BenchmarkSortByIndex_fnv_10-8 2000000 642 ns/op 384 B/op 7 allocs/op -BenchmarkSortByIndex_fnv_100-8 200000 8045 ns/op 2928 B/op 7 allocs/op -BenchmarkSortByIndex_fnv_1000-8 10000 227527 ns/op 25728 B/op 7 allocs/op -BenchmarkSortByValue_fnv_10-8 1000000 1244 ns/op 544 B/op 17 allocs/op -BenchmarkSortByValue_fnv_100-8 100000 12397 ns/op 4528 B/op 107 allocs/op -BenchmarkSortByValue_fnv_1000-8 10000 154278 ns/op 41728 B/op 1007 allocs/op +BenchmarkSort_fnv_10-8 5000000 354 ns/op 224 B/op 3 allocs/op +BenchmarkSort_fnv_100-8 300000 5103 ns/op 1856 B/op 3 allocs/op +BenchmarkSort_fnv_1000-8 10000 115874 ns/op 16448 B/op 3 allocs/op +BenchmarkSortByIndex_fnv_10-8 3000000 562 ns/op 384 B/op 7 allocs/op +BenchmarkSortByIndex_fnv_100-8 200000 5819 ns/op 2928 B/op 7 allocs/op +BenchmarkSortByIndex_fnv_1000-8 10000 125859 ns/op 25728 B/op 7 allocs/op +BenchmarkSortByValue_fnv_10-8 2000000 1056 ns/op 544 B/op 17 allocs/op +BenchmarkSortByValue_fnv_100-8 200000 9593 ns/op 4528 B/op 107 allocs/op +BenchmarkSortByValue_fnv_1000-8 10000 109272 ns/op 41728 B/op 1007 allocs/op + +BenchmarkSortByWeight_fnv_10-8 3000000 500 ns/op 320 B/op 4 allocs/op +BenchmarkSortByWeight_fnv_100-8 200000 8257 ns/op 2768 B/op 4 allocs/op +BenchmarkSortByWeight_fnv_1000-8 10000 197938 ns/op 24656 B/op 4 allocs/op +BenchmarkSortByWeightIndex_fnv_10-8 2000000 760 ns/op 480 B/op 8 allocs/op +BenchmarkSortByWeightIndex_fnv_100-8 200000 9191 ns/op 3840 B/op 8 allocs/op +BenchmarkSortByWeightIndex_fnv_1000-8 10000 208204 ns/op 33936 B/op 8 allocs/op +BenchmarkSortByWeightValue_fnv_10-8 1000000 1095 ns/op 640 B/op 18 allocs/op +BenchmarkSortByWeightValue_fnv_100-8 200000 12291 ns/op 5440 B/op 108 allocs/op +BenchmarkSortByWeightValue_fnv_1000-8 10000 145125 ns/op 49936 B/op 1008 allocs/op + ``` ## Example diff --git a/hrw.go b/hrw.go index ceb985b..1d3c4c1 100644 --- a/hrw.go +++ b/hrw.go @@ -21,6 +21,11 @@ type ( sorted []uint64 weight []uint64 } + + weighted struct { + h hashed + normal []float64 // normalized input weights + } ) func weight(x uint64, y uint64) uint64 { @@ -36,16 +41,29 @@ func weight(x uint64, y uint64) uint64 { } func (h hashed) Len() int { return h.length } -func (h hashed) Less(i, j int) bool { return h.weight[h.sorted[i]] < h.weight[h.sorted[j]] } -func (h hashed) Swap(i, j int) { h.sorted[i], h.sorted[j] = h.sorted[j], h.sorted[i] } +func (h hashed) Less(i, j int) bool { return h.weight[i] < h.weight[j] } +func (h hashed) Swap(i, j int) { + h.sorted[i], h.sorted[j] = h.sorted[j], h.sorted[i] + h.weight[i], h.weight[j] = h.weight[j], h.weight[i] +} + +func (w weighted) Len() int { return w.h.length } +func (w weighted) Less(i, j int) bool { + // `maxUint64 - weight` makes least weight most valuable + // it is necessary for operation with normalized values + wi := float64(^uint64(0)-w.h.weight[i]) * w.normal[i] + wj := float64(^uint64(0)-w.h.weight[j]) * w.normal[j] + return wi > wj // higher weight must be placed lower to be first +} +func (w weighted) Swap(i, j int) { w.normal[i], w.normal[j] = w.normal[j], w.normal[i]; w.h.Swap(i, j) } // Hash uses murmur3 hash to return uint64 func Hash(key []byte) uint64 { return murmur3.Sum64(key) } -// SortByWeight receive nodes and hash, and sort it by weight -func SortByWeight(nodes []uint64, hash uint64) []uint64 { +// Sort receive nodes and hash, and sort it by weight +func Sort(nodes []uint64, hash uint64) []uint64 { var ( l = len(nodes) h = hashed{ @@ -64,22 +82,129 @@ func SortByWeight(nodes []uint64, hash uint64) []uint64 { return h.sorted } +// SortByWeight receive nodes and hash, and sort it by weight +func SortByWeight(nodes []uint64, weights []uint64, hash uint64) []uint64 { + var ( + maxWeight uint64 + + l = len(nodes) + w = weighted{ + h: hashed{ + length: l, + sorted: make([]uint64, 0, l), + weight: make([]uint64, 0, l), + }, + normal: make([]float64, 0, l), + } + ) + + // finding max weight to perform normalization + for i := range weights { + if maxWeight < weights[i] { + maxWeight = weights[i] + } + } + + // if all nodes have 0-weights or weights are incorrect then sort uniformly + if maxWeight == 0 || l != len(nodes) { + return Sort(nodes, hash) + } + + fMaxWeight := float64(maxWeight) + for i, node := range nodes { + w.h.sorted = append(w.h.sorted, uint64(i)) + w.h.weight = append(w.h.weight, weight(node, hash)) + w.normal = append(w.normal, float64(weights[i])/fMaxWeight) + } + sort.Sort(w) + return w.h.sorted +} + // SortSliceByValue received []T and hash to sort by value-weight func SortSliceByValue(slice interface{}, hash uint64) { + rule := prepareRule(slice) + if rule != nil { + swap := reflect.Swapper(slice) + rule = Sort(rule, hash) + sortByRuleInverse(swap, uint64(len(rule)), rule) + } +} + +// SortSliceByWeightValue received []T, weights and hash to sort by value-weight +func SortSliceByWeightValue(slice interface{}, weight []uint64, hash uint64) { + rule := prepareRule(slice) + if rule != nil { + swap := reflect.Swapper(slice) + rule = SortByWeight(rule, weight, hash) + sortByRuleInverse(swap, uint64(len(rule)), rule) + } +} + +// SortSliceByIndex received []T and hash to sort by index-weight +func SortSliceByIndex(slice interface{}, hash uint64) { + length := uint64(reflect.ValueOf(slice).Len()) + swap := reflect.Swapper(slice) + rule := make([]uint64, 0, length) + for i := uint64(0); i < length; i++ { + rule = append(rule, i) + } + rule = Sort(rule, hash) + sortByRuleInverse(swap, length, rule) +} + +// SortSliceByWeightIndex received []T, weights and hash to sort by index-weight +func SortSliceByWeightIndex(slice interface{}, weight []uint64, hash uint64) { + length := uint64(reflect.ValueOf(slice).Len()) + swap := reflect.Swapper(slice) + rule := make([]uint64, 0, length) + for i := uint64(0); i < length; i++ { + rule = append(rule, i) + } + rule = SortByWeight(rule, weight, hash) + sortByRuleInverse(swap, length, rule) +} + +func sortByRuleDirect(swap swapper, length uint64, rule []uint64) { + done := make([]bool, length) + for i := uint64(0); i < length; i++ { + if done[i] { + continue + } + for j := rule[i]; !done[rule[j]]; j = rule[j] { + swap(int(i), int(j)) + done[j] = true + } + } +} + +func sortByRuleInverse(swap swapper, length uint64, rule []uint64) { + done := make([]bool, length) + for i := uint64(0); i < length; i++ { + if done[i] { + continue + } + + for j := i; !done[rule[j]]; j = rule[j] { + swap(int(j), int(rule[j])) + done[j] = true + } + } +} + +func prepareRule(slice interface{}) []uint64 { t := reflect.TypeOf(slice) if t.Kind() != reflect.Slice { - return + return nil } var ( val = reflect.ValueOf(slice) - swap = reflect.Swapper(slice) length = val.Len() rule = make([]uint64, 0, length) ) if length == 0 { - return + return nil } switch slice := slice.(type) { @@ -148,7 +273,7 @@ func SortSliceByValue(slice interface{}, hash uint64) { default: if _, ok := val.Index(0).Interface().(Hasher); !ok { - return + return nil } for i := 0; i < length; i++ { @@ -156,46 +281,5 @@ func SortSliceByValue(slice interface{}, hash uint64) { rule = append(rule, h.Hash()) } } - - rule = SortByWeight(rule, hash) - sortByRuleInverse(swap, uint64(length), rule) -} - -// SortSliceByIndex received []T and hash to sort by index-weight -func SortSliceByIndex(slice interface{}, hash uint64) { - length := uint64(reflect.ValueOf(slice).Len()) - swap := reflect.Swapper(slice) - rule := make([]uint64, 0, length) - for i := uint64(0); i < length; i++ { - rule = append(rule, i) - } - rule = SortByWeight(rule, hash) - sortByRuleInverse(swap, length, rule) -} - -func sortByRuleDirect(swap swapper, length uint64, rule []uint64) { - done := make([]bool, length) - for i := uint64(0); i < length; i++ { - if done[i] { - continue - } - for j := rule[i]; !done[rule[j]]; j = rule[j] { - swap(int(i), int(j)) - done[j] = true - } - } -} - -func sortByRuleInverse(swap swapper, length uint64, rule []uint64) { - done := make([]bool, length) - for i := uint64(0); i < length; i++ { - if done[i] { - continue - } - - for j := i; !done[rule[j]]; j = rule[j] { - swap(int(j), int(rule[j])) - done[j] = true - } - } + return rule } diff --git a/hrw_test.go b/hrw_test.go index 5fd86b8..3703a55 100644 --- a/hrw_test.go +++ b/hrw_test.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "math" + "math/rand" "reflect" "strconv" "testing" @@ -72,6 +73,17 @@ func TestSortSliceByIndex(t *testing.T) { } } +func TestSortSliceByWeightIndex(t *testing.T) { + actual := []string{"a", "b", "c", "d", "e", "f"} + weights := []uint64{10, 10, 10, 2, 2, 2} + expect := []string{"a", "c", "b", "e", "f", "d"} + hash := Hash(testKey) + SortSliceByWeightIndex(actual, weights, hash) + if !reflect.DeepEqual(actual, expect) { + t.Errorf("Was %#v, but expected %#v", actual, expect) + } +} + func TestSortSliceByValue(t *testing.T) { actual := []string{"a", "b", "c", "d", "e", "f"} expect := []string{"d", "f", "c", "b", "a", "e"} @@ -219,17 +231,17 @@ func TestSortSliceByValueIntSlice(t *testing.T) { } } -func TestSortByWeight(t *testing.T) { +func TestSort(t *testing.T) { nodes := []uint64{1, 2, 3, 4, 5} hash := Hash(testKey) - actual := SortByWeight(nodes, hash) + actual := Sort(nodes, hash) expected := []uint64{3, 1, 4, 2, 0} if !reflect.DeepEqual(actual, expected) { t.Errorf("Was %#v, but expected %#v", actual, expected) } } -func TestUniformDistribution(t *testing.T) { +func TestDistribution(t *testing.T) { const ( size = 10 keys = 100000 @@ -240,7 +252,7 @@ func TestUniformDistribution(t *testing.T) { // https://www.medcalc.org/manual/chi-square-table.php p=0.1 var chiTable = map[int]float64{9: 14.68, 99: 117.407} - t.Run("sortByWeight", func(t *testing.T) { + t.Run("sort", func(t *testing.T) { var ( i uint64 nodes [size]uint64 @@ -255,7 +267,7 @@ func TestUniformDistribution(t *testing.T) { for i = 0; i < keys; i++ { binary.BigEndian.PutUint64(key, i+size) hash := Hash(key) - counts[SortByWeight(nodes[:], hash)[0]]++ + counts[Sort(nodes[:], hash)[0]]++ } var chi2 float64 @@ -439,6 +451,170 @@ func TestUniformDistribution(t *testing.T) { } }) + t.Run("sortByWeightValue", func(t *testing.T) { + var ( + i uint64 + a, b, result [size]int + w [size]uint64 + key = make([]byte, 16) + ) + + for i = 0; i < size; i++ { + a[i] = int(i) + w[i] = size - i + } + for i = 0; i < keys; i++ { + copy(b[:], a[:]) + binary.BigEndian.PutUint64(key, i+size) + hash := Hash(key) + SortSliceByWeightValue(b[:], w[:], hash) + result[b[0]]++ + } + for i := 0; i < size-1; i++ { + if bool(w[i] > w[i+1]) != bool(result[i] > result[i+1]) { + t.Fatalf("result array %v must be corresponded to weights %v", result, w) + } + } + }) + + t.Run("sortByWeightValueShuffledW", func(t *testing.T) { + var ( + i uint64 + a, b, result [size]int + w [size]uint64 + key = make([]byte, 16) + ) + + for i = 0; i < size; i++ { + a[i] = int(i) + w[i] = size - i + } + + rand.Shuffle(size, func(i, j int) { + w[i], w[j] = w[j], w[i] + }) + for i = 0; i < keys; i++ { + copy(b[:], a[:]) + binary.BigEndian.PutUint64(key, i+size) + hash := Hash(key) + SortSliceByWeightValue(b[:], w[:], hash) + result[b[0]]++ + } + for i := 0; i < size-1; i++ { + if bool(w[i] > w[i+1]) != bool(result[i] > result[i+1]) { + t.Fatalf("result array %v must be corresponded to weights %v", result, w) + } + } + }) + + t.Run("sortByWeightValueEmptyW", func(t *testing.T) { + var ( + i uint64 + a, b [size]int + w [size]uint64 + counts = make(map[int]int, size) + key = make([]byte, 16) + ) + + for i = 0; i < size; i++ { + a[i] = int(i) + } + + for i = 0; i < keys; i++ { + copy(b[:], a[:]) + binary.BigEndian.PutUint64(key, i+size) + hash := Hash(key) + SortSliceByWeightValue(b[:], w[:], hash) + counts[b[0]]++ + } + + var chi2 float64 + mean := float64(keys) / float64(size) + delta := mean * percent + for node, count := range counts { + d := mean - float64(count) + chi2 += math.Pow(float64(count)-mean, 2) / mean + if d > delta || (0-d) > delta { + t.Errorf( + "Node %d received %d keys, expected %.0f (+/- %.2f)", + node, count, mean, delta, + ) + } + } + if chi2 > chiTable[size-1] { + t.Errorf( + "Chi2 condition for .9 is not met (expected %.2f <= %.2f)", + chi2, chiTable[size-1]) + } + }) + + t.Run("sortByWeightValueUniformW", func(t *testing.T) { + var ( + i uint64 + a, b [size]int + w [size]uint64 + counts = make(map[int]int, size) + key = make([]byte, 16) + ) + + for i = 0; i < size; i++ { + a[i] = int(i) + w[i] = 10 + } + + for i = 0; i < keys; i++ { + copy(b[:], a[:]) + binary.BigEndian.PutUint64(key, i+size) + hash := Hash(key) + SortSliceByWeightValue(b[:], w[:], hash) + counts[b[0]]++ + } + + var chi2 float64 + mean := float64(keys) / float64(size) + delta := mean * percent + for node, count := range counts { + d := mean - float64(count) + chi2 += math.Pow(float64(count)-mean, 2) / mean + if d > delta || (0-d) > delta { + t.Errorf( + "Node %d received %d keys, expected %.0f (+/- %.2f)", + node, count, mean, delta, + ) + } + } + if chi2 > chiTable[size-1] { + t.Errorf( + "Chi2 condition for .9 is not met (expected %.2f <= %.2f)", + chi2, chiTable[size-1]) + } + }) + + t.Run("sortByWeightValueAbsoluteW", func(t *testing.T) { + var ( + i uint64 + a, b [size]int + w [size]uint64 + key = make([]byte, 16) + ) + + for i = 0; i < size; i++ { + a[i] = int(i) + } + w[size-1] = 10 + + for i = 0; i < keys; i++ { + copy(b[:], a[:]) + binary.BigEndian.PutUint64(key, i+size) + hash := Hash(key) + SortSliceByWeightValue(b[:], w[:], hash) + if b[0] != a[size-1] { + t.Fatalf("expected last value of %v to be the first with highest weight", a) + } + } + + }) + t.Run("hash collision", func(t *testing.T) { var ( i uint64 @@ -460,19 +636,19 @@ func TestUniformDistribution(t *testing.T) { }) } -func BenchmarkSortByWeight_fnv_10(b *testing.B) { +func BenchmarkSort_fnv_10(b *testing.B) { hash := Hash(testKey) - _ = benchmarkSortByWeight(b, 10, hash) + _ = benchmarkSort(b, 10, hash) } -func BenchmarkSortByWeight_fnv_100(b *testing.B) { +func BenchmarkSort_fnv_100(b *testing.B) { hash := Hash(testKey) - _ = benchmarkSortByWeight(b, 100, hash) + _ = benchmarkSort(b, 100, hash) } -func BenchmarkSortByWeight_fnv_1000(b *testing.B) { +func BenchmarkSort_fnv_1000(b *testing.B) { hash := Hash(testKey) - _ = benchmarkSortByWeight(b, 1000, hash) + _ = benchmarkSort(b, 1000, hash) } func BenchmarkSortByIndex_fnv_10(b *testing.B) { @@ -505,7 +681,52 @@ func BenchmarkSortByValue_fnv_1000(b *testing.B) { benchmarkSortByValue(b, 1000, hash) } -func benchmarkSortByWeight(b *testing.B, n int, hash uint64) uint64 { +func BenchmarkSortByWeight_fnv_10(b *testing.B) { + hash := Hash(testKey) + _ = benchmarkSortByWeight(b, 10, hash) +} + +func BenchmarkSortByWeight_fnv_100(b *testing.B) { + hash := Hash(testKey) + _ = benchmarkSortByWeight(b, 100, hash) +} + +func BenchmarkSortByWeight_fnv_1000(b *testing.B) { + hash := Hash(testKey) + _ = benchmarkSortByWeight(b, 1000, hash) +} + +func BenchmarkSortByWeightIndex_fnv_10(b *testing.B) { + hash := Hash(testKey) + benchmarkSortByWeightIndex(b, 10, hash) +} + +func BenchmarkSortByWeightIndex_fnv_100(b *testing.B) { + hash := Hash(testKey) + benchmarkSortByWeightIndex(b, 100, hash) +} + +func BenchmarkSortByWeightIndex_fnv_1000(b *testing.B) { + hash := Hash(testKey) + benchmarkSortByWeightIndex(b, 1000, hash) +} + +func BenchmarkSortByWeightValue_fnv_10(b *testing.B) { + hash := Hash(testKey) + benchmarkSortByWeightValue(b, 10, hash) +} + +func BenchmarkSortByWeightValue_fnv_100(b *testing.B) { + hash := Hash(testKey) + benchmarkSortByWeightValue(b, 100, hash) +} + +func BenchmarkSortByWeightValue_fnv_1000(b *testing.B) { + hash := Hash(testKey) + benchmarkSortByWeightValue(b, 1000, hash) +} + +func benchmarkSort(b *testing.B, n int, hash uint64) uint64 { servers := make([]uint64, n) for i := uint64(0); i < uint64(len(servers)); i++ { servers[i] = i @@ -516,7 +737,7 @@ func benchmarkSortByWeight(b *testing.B, n int, hash uint64) uint64 { var x uint64 for i := 0; i < b.N; i++ { - x += SortByWeight(servers, hash)[0] + x += Sort(servers, hash)[0] } return x } @@ -548,3 +769,53 @@ func benchmarkSortByValue(b *testing.B, n int, hash uint64) { SortSliceByValue(servers, hash) } } + +func benchmarkSortByWeight(b *testing.B, n int, hash uint64) uint64 { + servers := make([]uint64, n) + weights := make([]uint64, n) + for i := uint64(0); i < uint64(len(servers)); i++ { + weights[i] = uint64(n) - i + servers[i] = i + } + + b.ResetTimer() + b.ReportAllocs() + + var x uint64 + for i := 0; i < b.N; i++ { + x += SortByWeight(servers, weights, hash)[0] + } + return x +} + +func benchmarkSortByWeightIndex(b *testing.B, n int, hash uint64) { + servers := make([]uint64, n) + weights := make([]uint64, n) + for i := uint64(0); i < uint64(len(servers)); i++ { + weights[i] = uint64(n) - i + servers[i] = i + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + SortSliceByWeightIndex(servers, weights, hash) + } +} + +func benchmarkSortByWeightValue(b *testing.B, n int, hash uint64) { + servers := make([]string, n) + weights := make([]uint64, n) + for i := uint64(0); i < uint64(len(servers)); i++ { + weights[i] = uint64(n) - i + servers[i] = "localhost:" + strconv.FormatUint(60000-i, 10) + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + SortSliceByWeightValue(servers, weights, hash) + } +}