HRW usage optimize #75
8 changed files with 26 additions and 17 deletions
|
@ -53,6 +53,7 @@ Changelog for FrostFS Node
|
||||||
- `golang.org/x/term` to `v0.3.0`
|
- `golang.org/x/term` to `v0.3.0`
|
||||||
- `google.golang.org/grpc` to `v1.51.0`
|
- `google.golang.org/grpc` to `v1.51.0`
|
||||||
- `github.com/nats-io/nats.go` to `v1.22.1`
|
- `github.com/nats-io/nats.go` to `v1.22.1`
|
||||||
|
- `github.com/TrueCloudLab/hrw` to `v.1.1.1`
|
||||||
- Minimum go version to v1.18
|
- Minimum go version to v1.18
|
||||||
|
|
||||||
### Updating from v0.35.0
|
### Updating from v0.35.0
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -6,7 +6,7 @@ require (
|
||||||
github.com/TrueCloudLab/frostfs-api-go/v2 v2.0.0-20221212144048-1351b6656d68
|
github.com/TrueCloudLab/frostfs-api-go/v2 v2.0.0-20221212144048-1351b6656d68
|
||||||
github.com/TrueCloudLab/frostfs-contract v0.0.0-20221213081248-6c805c1b4e42
|
github.com/TrueCloudLab/frostfs-contract v0.0.0-20221213081248-6c805c1b4e42
|
||||||
github.com/TrueCloudLab/frostfs-sdk-go v0.0.0-20221214065929-4c779423f556
|
github.com/TrueCloudLab/frostfs-sdk-go v0.0.0-20221214065929-4c779423f556
|
||||||
github.com/TrueCloudLab/hrw v1.1.0
|
github.com/TrueCloudLab/hrw v1.1.1-0.20230227111858-79b208bebf52
|
||||||
github.com/TrueCloudLab/tzhash v1.7.0
|
github.com/TrueCloudLab/tzhash v1.7.0
|
||||||
github.com/cheggaaa/pb v1.0.29
|
github.com/cheggaaa/pb v1.0.29
|
||||||
github.com/chzyer/readline v1.5.1
|
github.com/chzyer/readline v1.5.1
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
|
@ -19,7 +19,7 @@ type StorageEngine struct {
|
||||||
|
|
||||||
mtx *sync.RWMutex
|
mtx *sync.RWMutex
|
||||||
|
|
||||||
shards map[string]shardWrapper
|
shards map[string]hashedShard
|
||||||
|
|
||||||
shardPools map[string]util.WorkerPool
|
shardPools map[string]util.WorkerPool
|
||||||
|
|
||||||
|
@ -223,7 +223,7 @@ func New(opts ...Option) *StorageEngine {
|
||||||
return &StorageEngine{
|
return &StorageEngine{
|
||||||
cfg: c,
|
cfg: c,
|
||||||
mtx: new(sync.RWMutex),
|
mtx: new(sync.RWMutex),
|
||||||
shards: make(map[string]shardWrapper),
|
shards: make(map[string]hashedShard),
|
||||||
shardPools: make(map[string]util.WorkerPool),
|
shardPools: make(map[string]util.WorkerPool),
|
||||||
closeCh: make(chan struct{}),
|
closeCh: make(chan struct{}),
|
||||||
setModeCh: make(chan setModeRequest),
|
setModeCh: make(chan setModeRequest),
|
||||||
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
oidtest "github.com/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "github.com/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
usertest "github.com/TrueCloudLab/frostfs-sdk-go/user/test"
|
usertest "github.com/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||||
"github.com/TrueCloudLab/frostfs-sdk-go/version"
|
"github.com/TrueCloudLab/frostfs-sdk-go/version"
|
||||||
|
"github.com/TrueCloudLab/hrw"
|
||||||
"github.com/TrueCloudLab/tzhash/tz"
|
"github.com/TrueCloudLab/tzhash/tz"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -86,9 +87,12 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
engine.shards[s.ID().String()] = shardWrapper{
|
engine.shards[s.ID().String()] = hashedShard{
|
||||||
errorCount: atomic.NewUint32(0),
|
shardWrapper: shardWrapper{
|
||||||
Shard: s,
|
errorCount: atomic.NewUint32(0),
|
||||||
|
Shard: s,
|
||||||
|
},
|
||||||
|
hash: hrw.Hash([]byte(s.ID().String())),
|
||||||
}
|
}
|
||||||
engine.shardPools[s.ID().String()] = pool
|
engine.shardPools[s.ID().String()] = pool
|
||||||
}
|
}
|
||||||
|
|
|
@ -151,7 +151,7 @@ mainLoop:
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
|
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
|
||||||
for j := range shards {
|
for j := range shards {
|
||||||
if _, ok := shardMap[shards[j].ID().String()]; ok {
|
if _, ok := shardMap[shards[j].ID().String()]; ok {
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -16,7 +16,10 @@ import (
|
||||||
|
|
||||||
var errShardNotFound = logicerr.New("shard not found")
|
var errShardNotFound = logicerr.New("shard not found")
|
||||||
|
|
||||||
type hashedShard shardWrapper
|
type hashedShard struct {
|
||||||
|
shardWrapper
|
||||||
|
hash uint64
|
||||||
|
}
|
||||||
|
|
||||||
type metricsWithID struct {
|
type metricsWithID struct {
|
||||||
id string
|
id string
|
||||||
|
@ -127,9 +130,12 @@ func (e *StorageEngine) addShard(sh *shard.Shard) error {
|
||||||
return fmt.Errorf("shard with id %s was already added", strID)
|
return fmt.Errorf("shard with id %s was already added", strID)
|
||||||
}
|
}
|
||||||
|
|
||||||
e.shards[strID] = shardWrapper{
|
e.shards[strID] = hashedShard{
|
||||||
errorCount: atomic.NewUint32(0),
|
shardWrapper: shardWrapper{
|
||||||
Shard: sh,
|
errorCount: atomic.NewUint32(0),
|
||||||
|
Shard: sh,
|
||||||
|
},
|
||||||
|
hash: hrw.Hash([]byte(strID)),
|
||||||
}
|
}
|
||||||
|
|
||||||
e.shardPools[strID] = pool
|
e.shardPools[strID] = pool
|
||||||
|
@ -144,7 +150,7 @@ func (e *StorageEngine) removeShards(ids ...string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ss := make([]shardWrapper, 0, len(ids))
|
ss := make([]hashedShard, 0, len(ids))
|
||||||
|
|
||||||
e.mtx.Lock()
|
e.mtx.Lock()
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
|
@ -210,7 +216,7 @@ func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() s
|
||||||
weights = append(weights, e.shardWeight(sh.Shard))
|
weights = append(weights, e.shardWeight(sh.Shard))
|
||||||
}
|
}
|
||||||
|
|
||||||
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.EncodeToString())))
|
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.EncodeToString())))
|
||||||
|
|
||||||
return shards
|
return shards
|
||||||
}
|
}
|
||||||
|
@ -276,7 +282,5 @@ func (e *StorageEngine) HandleNewEpoch(epoch uint64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s hashedShard) Hash() uint64 {
|
func (s hashedShard) Hash() uint64 {
|
||||||
return hrw.Hash(
|
return s.hash
|
||||||
[]byte(s.Shard.ID().String()),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ func (mb *managerBuilder) BuildManagers(epoch uint64, p apireputation.PeerID) ([
|
||||||
|
|
||||||
copy(nodes, nmNodes)
|
copy(nodes, nmNodes)
|
||||||
|
|
||||||
hrw.SortSliceByValue(nodes, epoch)
|
hrw.SortHasherSliceByValue(nodes, epoch)
|
||||||
|
|
||||||
for i := range nodes {
|
for i := range nodes {
|
||||||
if apireputation.ComparePeerKey(p, nodes[i].PublicKey()) {
|
if apireputation.ComparePeerKey(p, nodes[i].PublicKey()) {
|
||||||
|
|
Loading…
Reference in a new issue