engine: Drop shards weights #1005
8 changed files with 65 additions and 77 deletions
|
@ -221,7 +221,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
shards, weights, err := e.getActualShards(shardIDs, prm)
|
shards, err := e.getActualShards(shardIDs, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -243,7 +243,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
|
||||||
}
|
}
|
||||||
|
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate)
|
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate)
|
||||||
})
|
})
|
||||||
|
|
||||||
if prm.Async {
|
if prm.Async {
|
||||||
|
@ -261,7 +261,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
|
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) error {
|
) error {
|
||||||
var err error
|
var err error
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
|
||||||
|
@ -288,7 +288,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, shardID := range shardIDs {
|
for _, shardID := range shardIDs {
|
||||||
if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
|
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
||||||
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
|
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
|
||||||
return err
|
return err
|
||||||
|
@ -336,7 +336,7 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
|
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) error {
|
) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -345,13 +345,13 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if prm.Scope.WithObjects() {
|
if prm.Scope.WithObjects() {
|
||||||
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
|
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
|
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
|
||||||
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
|
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -360,7 +360,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
|
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) error {
|
) error {
|
||||||
var listPrm shard.ListWithCursorPrm
|
var listPrm shard.ListWithCursorPrm
|
||||||
listPrm.WithCount(defaultEvacuateBatchSize)
|
listPrm.WithCount(defaultEvacuateBatchSize)
|
||||||
|
@ -383,7 +383,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, weights, shardsToEvacuate); err != nil {
|
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -393,7 +393,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
|
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) error {
|
) error {
|
||||||
sh := shardsToEvacuate[shardID]
|
sh := shardsToEvacuate[shardID]
|
||||||
|
|
||||||
|
@ -414,7 +414,7 @@ func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
listPrm.NextPageToken = listRes.NextPageToken
|
listPrm.NextPageToken = listRes.NextPageToken
|
||||||
if err := e.evacuateTrees(ctx, sh, listRes.Items, prm, res, shards, weights, shardsToEvacuate); err != nil {
|
if err := e.evacuateTrees(ctx, sh, listRes.Items, prm, res, shards, shardsToEvacuate); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -422,8 +422,7 @@ func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID,
|
func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID,
|
||||||
prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64,
|
prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
shardsToEvacuate map[string]*shard.Shard,
|
|
||||||
) error {
|
) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -438,7 +437,7 @@ func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, tree
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
success, shardID, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, weights, shardsToEvacuate)
|
success, shardID, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, shardsToEvacuate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -477,9 +476,9 @@ func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.S
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID,
|
func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID,
|
||||||
prm EvacuateShardPrm, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
|
prm EvacuateShardPrm, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) (bool, string, error) {
|
) (bool, string, error) {
|
||||||
target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, weights, shardsToEvacuate)
|
target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, shardsToEvacuate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, "", err
|
return false, "", err
|
||||||
}
|
}
|
||||||
|
@ -547,9 +546,9 @@ func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shar
|
||||||
|
|
||||||
// findShardToEvacuateTree returns first shard according HRW or first shard with tree exists.
|
// findShardToEvacuateTree returns first shard according HRW or first shard with tree exists.
|
||||||
func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID,
|
func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID,
|
||||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
|
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) (pooledShard, bool, error) {
|
) (pooledShard, bool, error) {
|
||||||
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.StringHash(tree.CID.EncodeToString()))
|
hrw.SortHasherSliceByValue(shards, hrw.StringHash(tree.CID.EncodeToString()))
|
||||||
var result pooledShard
|
var result pooledShard
|
||||||
var found bool
|
var found bool
|
||||||
for _, target := range shards {
|
for _, target := range shards {
|
||||||
|
@ -583,31 +582,31 @@ func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilora
|
||||||
return result, found, nil
|
return result, found, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, []float64, error) {
|
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, error) {
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
defer e.mtx.RUnlock()
|
defer e.mtx.RUnlock()
|
||||||
|
|
||||||
for i := range shardIDs {
|
for i := range shardIDs {
|
||||||
sh, ok := e.shards[shardIDs[i]]
|
sh, ok := e.shards[shardIDs[i]]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, errShardNotFound
|
return nil, errShardNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if !sh.GetMode().ReadOnly() {
|
if !sh.GetMode().ReadOnly() {
|
||||||
return nil, nil, ErrMustBeReadOnly
|
return nil, ErrMustBeReadOnly
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.Scope.TreesOnly() && !sh.PiloramaEnabled() {
|
if prm.Scope.TreesOnly() && !sh.PiloramaEnabled() {
|
||||||
return nil, nil, fmt.Errorf("shard %s doesn't have pilorama enabled", sh.ID())
|
return nil, fmt.Errorf("shard %s doesn't have pilorama enabled", sh.ID())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil && prm.Scope.WithObjects() {
|
if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil && prm.Scope.WithObjects() {
|
||||||
return nil, nil, errMustHaveTwoShards
|
return nil, errMustHaveTwoShards
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(e.shards)-len(shardIDs) < 1 && prm.TreeHandler == nil && prm.Scope.WithTrees() {
|
if len(e.shards)-len(shardIDs) < 1 && prm.TreeHandler == nil && prm.Scope.WithTrees() {
|
||||||
return nil, nil, errMustHaveTwoShards
|
return nil, errMustHaveTwoShards
|
||||||
}
|
}
|
||||||
|
|
||||||
// We must have all shards, to have correct information about their
|
// We must have all shards, to have correct information about their
|
||||||
|
@ -620,17 +619,11 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
||||||
pool: e.shardPools[id],
|
pool: e.shardPools[id],
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
return shards, nil
|
||||||
weights := make([]float64, 0, len(shards))
|
|
||||||
for i := range shards {
|
|
||||||
weights = append(weights, e.shardWeight(shards[i].Shard))
|
|
||||||
}
|
|
||||||
|
|
||||||
return shards, weights, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
|
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) error {
|
) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -660,7 +653,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, weights, shardsToEvacuate, res)
|
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -687,9 +680,9 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
|
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
|
||||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,
|
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,
|
||||||
) (bool, error) {
|
) (bool, error) {
|
||||||
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.StringHash(addr.EncodeToString()))
|
hrw.SortHasherSliceByValue(shards, hrw.StringHash(addr.EncodeToString()))
|
||||||
for j := range shards {
|
for j := range shards {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -111,7 +111,7 @@ func (e *StorageEngine) removeObjects(ctx context.Context, ch <-chan oid.Address
|
||||||
|
|
||||||
for addr := range ch {
|
for addr := range ch {
|
||||||
h := hrw.StringHash(addr.EncodeToString())
|
h := hrw.StringHash(addr.EncodeToString())
|
||||||
shards := sortShardsByWeight(shards, h)
|
hrw.SortHasherSliceByValue(shards, h)
|
||||||
found := false
|
found := false
|
||||||
for i := range shards {
|
for i := range shards {
|
||||||
var existsPrm shard.ExistsPrm
|
var existsPrm shard.ExistsPrm
|
||||||
|
|
|
@ -41,7 +41,7 @@ func TestRebalance(t *testing.T) {
|
||||||
obj.SetPayload(make([]byte, errSmallSize))
|
obj.SetPayload(make([]byte, errSmallSize))
|
||||||
objects[i].object = obj
|
objects[i].object = obj
|
||||||
|
|
||||||
shards := te.ng.sortShardsByWeight(object.AddressOf(obj))
|
shards := te.ng.sortShards(object.AddressOf(obj))
|
||||||
objects[i].bestShard = *shards[0].Shard.ID()
|
objects[i].bestShard = *shards[0].Shard.ID()
|
||||||
objects[i].worstShard = *shards[1].Shard.ID()
|
objects[i].worstShard = *shards[1].Shard.ID()
|
||||||
}
|
}
|
||||||
|
|
|
@ -254,13 +254,7 @@ func generateShardID() (*shard.ID, error) {
|
||||||
return shard.NewIDFromBytes(bin), nil
|
return shard.NewIDFromBytes(bin), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) shardWeight(sh *shard.Shard) float64 {
|
func (e *StorageEngine) sortShards(objAddr interface{ EncodeToString() string }) []hashedShard {
|
||||||
weightValues := sh.WeightValues()
|
|
||||||
|
|
||||||
return float64(weightValues.FreeSpace)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() string }) []hashedShard {
|
|
||||||
e.mtx.RLock()
|
e.mtx.RLock()
|
||||||
defer e.mtx.RUnlock()
|
defer e.mtx.RUnlock()
|
||||||
|
|
||||||
|
@ -269,16 +263,7 @@ func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() s
|
||||||
for _, sh := range e.shards {
|
for _, sh := range e.shards {
|
||||||
shards = append(shards, hashedShard(sh))
|
shards = append(shards, hashedShard(sh))
|
||||||
}
|
}
|
||||||
return sortShardsByWeight(shards, h)
|
hrw.SortHasherSliceByValue(shards, h)
|
||||||
}
|
|
||||||
|
|
||||||
func sortShardsByWeight(shards []hashedShard, h uint64) []hashedShard {
|
|
||||||
weights := make([]float64, 0, len(shards))
|
|
||||||
for _, sh := range shards {
|
|
||||||
weights = append(weights, float64(sh.Shard.WeightValues().FreeSpace))
|
|
||||||
}
|
|
||||||
|
|
||||||
hrw.SortHasherSliceByWeightValue(shards, weights, h)
|
|
||||||
return shards
|
return shards
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -296,7 +281,7 @@ func (e *StorageEngine) unsortedShards() []hashedShard {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) iterateOverSortedShards(addr oid.Address, handler func(int, hashedShard) (stop bool)) {
|
func (e *StorageEngine) iterateOverSortedShards(addr oid.Address, handler func(int, hashedShard) (stop bool)) {
|
||||||
for i, sh := range e.sortShardsByWeight(addr) {
|
for i, sh := range e.sortShards(addr) {
|
||||||
if handler(i, sh) {
|
if handler(i, sh) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
|
"git.frostfs.info/TrueCloudLab/hrw"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -62,3 +63,27 @@ func TestDisableShards(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, 1, len(e.shards))
|
require.Equal(t, 1, len(e.shards))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSortShardsByWeight(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const numOfShards = 500
|
||||||
|
|
||||||
|
var shards1 []hashedShard
|
||||||
|
var weights1 []float64
|
||||||
|
var shards2 []hashedShard
|
||||||
|
for i := 0; i < numOfShards; i++ {
|
||||||
|
shards1 = append(shards1, hashedShard{
|
||||||
|
hash: uint64(i),
|
||||||
|
})
|
||||||
|
weights1 = append(weights1, 0)
|
||||||
|
shards2 = append(shards2, hashedShard{
|
||||||
|
hash: uint64(i),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
hrw.SortHasherSliceByWeightValue(shards1, weights1, 0)
|
||||||
|
hrw.SortHasherSliceByValue(shards2, 0)
|
||||||
|
|
||||||
|
require.Equal(t, shards1, shards2)
|
||||||
|
}
|
||||||
|
|
|
@ -125,7 +125,7 @@ func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var nodes []pilorama.Node
|
var nodes []pilorama.Node
|
||||||
for _, sh := range e.sortShardsByWeight(cid) {
|
for _, sh := range e.sortShards(cid) {
|
||||||
nodes, err = sh.TreeGetByPath(ctx, cid, treeID, attr, path, latest)
|
nodes, err = sh.TreeGetByPath(ctx, cid, treeID, attr, path, latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == shard.ErrPiloramaDisabled {
|
if err == shard.ErrPiloramaDisabled {
|
||||||
|
@ -158,7 +158,7 @@ func (e *StorageEngine) TreeGetMeta(ctx context.Context, cid cidSDK.ID, treeID s
|
||||||
var err error
|
var err error
|
||||||
var m pilorama.Meta
|
var m pilorama.Meta
|
||||||
var p uint64
|
var p uint64
|
||||||
for _, sh := range e.sortShardsByWeight(cid) {
|
for _, sh := range e.sortShards(cid) {
|
||||||
m, p, err = sh.TreeGetMeta(ctx, cid, treeID, nodeID)
|
m, p, err = sh.TreeGetMeta(ctx, cid, treeID, nodeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == shard.ErrPiloramaDisabled {
|
if err == shard.ErrPiloramaDisabled {
|
||||||
|
@ -190,7 +190,7 @@ func (e *StorageEngine) TreeGetChildren(ctx context.Context, cid cidSDK.ID, tree
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var nodes []pilorama.NodeInfo
|
var nodes []pilorama.NodeInfo
|
||||||
for _, sh := range e.sortShardsByWeight(cid) {
|
for _, sh := range e.sortShards(cid) {
|
||||||
nodes, err = sh.TreeGetChildren(ctx, cid, treeID, nodeID)
|
nodes, err = sh.TreeGetChildren(ctx, cid, treeID, nodeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == shard.ErrPiloramaDisabled {
|
if err == shard.ErrPiloramaDisabled {
|
||||||
|
@ -222,7 +222,7 @@ func (e *StorageEngine) TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var lm pilorama.Move
|
var lm pilorama.Move
|
||||||
for _, sh := range e.sortShardsByWeight(cid) {
|
for _, sh := range e.sortShards(cid) {
|
||||||
lm, err = sh.TreeGetOpLog(ctx, cid, treeID, height)
|
lm, err = sh.TreeGetOpLog(ctx, cid, treeID, height)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == shard.ErrPiloramaDisabled {
|
if err == shard.ErrPiloramaDisabled {
|
||||||
|
@ -252,7 +252,7 @@ func (e *StorageEngine) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID stri
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
for _, sh := range e.sortShardsByWeight(cid) {
|
for _, sh := range e.sortShards(cid) {
|
||||||
err = sh.TreeDrop(ctx, cid, treeID)
|
err = sh.TreeDrop(ctx, cid, treeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == shard.ErrPiloramaDisabled {
|
if err == shard.ErrPiloramaDisabled {
|
||||||
|
@ -375,7 +375,7 @@ func (e *StorageEngine) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, t
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
var height uint64
|
var height uint64
|
||||||
for _, sh := range e.sortShardsByWeight(cid) {
|
for _, sh := range e.sortShards(cid) {
|
||||||
height, err = sh.TreeLastSyncHeight(ctx, cid, treeID)
|
height, err = sh.TreeLastSyncHeight(ctx, cid, treeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == shard.ErrPiloramaDisabled {
|
if err == shard.ErrPiloramaDisabled {
|
||||||
|
@ -395,7 +395,7 @@ func (e *StorageEngine) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) getTreeShard(ctx context.Context, cid cidSDK.ID, treeID string) (int, []hashedShard, error) {
|
func (e *StorageEngine) getTreeShard(ctx context.Context, cid cidSDK.ID, treeID string) (int, []hashedShard, error) {
|
||||||
lst := e.sortShardsByWeight(cid)
|
lst := e.sortShards(cid)
|
||||||
for i, sh := range lst {
|
for i, sh := range lst {
|
||||||
exists, err := sh.TreeExists(ctx, cid, treeID)
|
exists, err := sh.TreeExists(ctx, cid, treeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -25,9 +25,6 @@ type Info struct {
|
||||||
// Information about the Write Cache.
|
// Information about the Write Cache.
|
||||||
WriteCacheInfo writecache.Info
|
WriteCacheInfo writecache.Info
|
||||||
|
|
||||||
// Weight parameters of the shard.
|
|
||||||
WeightValues WeightValues
|
|
||||||
|
|
||||||
// ErrorCount contains amount of errors occurred in shard operations.
|
// ErrorCount contains amount of errors occurred in shard operations.
|
||||||
ErrorCount uint32
|
ErrorCount uint32
|
||||||
|
|
||||||
|
|
|
@ -1,12 +0,0 @@
|
||||||
package shard
|
|
||||||
|
|
||||||
// WeightValues groups values of Shard weight parameters.
|
|
||||||
type WeightValues struct {
|
|
||||||
// Amount of free disk space. Measured in kilobytes.
|
|
||||||
FreeSpace uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
// WeightValues returns current weight values of the Shard.
|
|
||||||
func (s *Shard) WeightValues() WeightValues {
|
|
||||||
return s.info.WeightValues
|
|
||||||
}
|
|
Loading…
Reference in a new issue