[#220] localstorage: Implement hrw.Hasher over Shard structure

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-11-30 17:58:44 +03:00 committed by Alex Vanin
parent 88c1584e6a
commit 2963473c08
2 changed files with 18 additions and 8 deletions

View file

@ -46,14 +46,14 @@ func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) {
// save the object into the "largest" possible shard // save the object into the "largest" possible shard
for _, sh := range sortedShards { for _, sh := range sortedShards {
_, err := sh.Put( _, err := sh.sh.Put(
shPrm.WithObject(prm.obj), shPrm.WithObject(prm.obj),
) )
if err != nil { if err != nil {
// TODO: smth wrong with shard, need to be processed // TODO: smth wrong with shard, need to be processed
e.log.Warn("could not save object in shard", e.log.Warn("could not save object in shard",
zap.Stringer("shard", sh.ID()), zap.Stringer("shard", sh.sh.ID()),
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
} else { } else {
@ -64,11 +64,11 @@ func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) {
return nil, errPutShard return nil, errPutShard
} }
func (e *StorageEngine) objectExists(obj *object.Object, shards []*shard.Shard) bool { func (e *StorageEngine) objectExists(obj *object.Object, shards []hashedShard) bool {
exists := false exists := false
for _, sh := range shards { for _, sh := range shards {
res, err := sh.Exists( res, err := sh.sh.Exists(
new(shard.ExistsPrm). new(shard.ExistsPrm).
WithAddress(obj.Address()), WithAddress(obj.Address()),
) )

View file

@ -12,6 +12,10 @@ import (
var errShardNotFound = errors.New("shard not found") var errShardNotFound = errors.New("shard not found")
type hashedShard struct {
sh *shard.Shard
}
// AddShard adds a new shard to the storage engine. // AddShard adds a new shard to the storage engine.
// //
// Returns any error encountered that did not allow adding a shard. // Returns any error encountered that did not allow adding a shard.
@ -50,15 +54,15 @@ func (e *StorageEngine) shardWeight(sh *shard.Shard) float64 {
return float64(weightValues.FreeSpace) return float64(weightValues.FreeSpace)
} }
func (e *StorageEngine) sortShardsByWeight(objAddr fmt.Stringer) []*shard.Shard { func (e *StorageEngine) sortShardsByWeight(objAddr fmt.Stringer) []hashedShard {
e.mtx.RLock() e.mtx.RLock()
defer e.mtx.RUnlock() defer e.mtx.RUnlock()
shards := make([]*shard.Shard, 0, len(e.shards)) shards := make([]hashedShard, 0, len(e.shards))
weights := make([]float64, 0, len(e.shards)) weights := make([]float64, 0, len(e.shards))
for _, sh := range e.shards { for _, sh := range e.shards {
shards = append(shards, sh) shards = append(shards, hashedShard{sh})
weights = append(weights, e.shardWeight(sh)) weights = append(weights, e.shardWeight(sh))
} }
@ -69,7 +73,7 @@ func (e *StorageEngine) sortShardsByWeight(objAddr fmt.Stringer) []*shard.Shard
func (e *StorageEngine) iterateOverSortedShards(addr *object.Address, handler func(*shard.Shard) (stop bool)) { func (e *StorageEngine) iterateOverSortedShards(addr *object.Address, handler func(*shard.Shard) (stop bool)) {
for _, sh := range e.sortShardsByWeight(addr) { for _, sh := range e.sortShardsByWeight(addr) {
if handler(sh) { if handler(sh.sh) {
break break
} }
} }
@ -90,3 +94,9 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m shard.Mode) error {
return errShardNotFound return errShardNotFound
} }
func (s hashedShard) Hash() uint64 {
return hrw.Hash(
[]byte(s.sh.ID().String()),
)
}