[#102] container: Make PutContainerSize stable for simultaneous invocations in one block

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-06-30 19:08:59 +03:00 committed by Alex Vanin
parent 2402768eae
commit 711962924f

View file

@ -54,8 +54,9 @@ const (
containerIDSize = 32 // SHA256 size containerIDSize = 32 // SHA256 size
estimateKeyPrefix = "cnr" estimateKeyPrefix = "cnr"
cleanupDelta = 3 estimatePostfixSize = 10
cleanupDelta = 3
) )
var ( var (
@ -364,21 +365,12 @@ func PutContainerSize(epoch int, cid []byte, usedSize int, pubKey interop.Public
panic("container: only storage nodes can save size estimations") panic("container: only storage nodes can save size estimations")
} }
key := estimationKey(epoch, cid) key := estimationKey(epoch, cid, pubKey)
s := getContainerSizeEstimation(ctx, key, cid)
// do not add estimation twice s := estimation{
for i := range s.estimations {
est := s.estimations[i]
if common.BytesEqual(est.from, pubKey) {
panic("invalid estimation")
}
}
s.estimations = append(s.estimations, estimation{
from: pubKey, from: pubKey,
size: usedSize, size: usedSize,
}) }
storage.Put(ctx, key, std.Serialize(s)) storage.Put(ctx, key, std.Serialize(s))
@ -387,7 +379,13 @@ func PutContainerSize(epoch int, cid []byte, usedSize int, pubKey interop.Public
func GetContainerSize(id []byte) containerSizes { func GetContainerSize(id []byte) containerSizes {
ctx := storage.GetReadOnlyContext() ctx := storage.GetReadOnlyContext()
return getContainerSizeEstimation(ctx, id, nil)
// this `id` expected to be from `ListContainerSizes`
// therefore it is not contains postfix, we ignore it in the cut.
ln := len(id)
cid := id[ln-containerIDSize : ln]
return getContainerSizeEstimation(ctx, id, cid)
} }
func ListContainerSizes(epoch int) [][]byte { func ListContainerSizes(epoch int) [][]byte {
@ -400,11 +398,21 @@ func ListContainerSizes(epoch int) [][]byte {
it := storage.Find(ctx, key, storage.KeysOnly) it := storage.Find(ctx, key, storage.KeysOnly)
var result [][]byte uniq := map[string]struct{}{}
for iterator.Next(it) { for iterator.Next(it) {
key := iterator.Value(it).([]byte) // it MUST BE `storage.KeysOnly` storageKey := iterator.Value(it).([]byte)
result = append(result, key)
ln := len(storageKey)
storageKey = storageKey[:ln-estimatePostfixSize]
uniq[string(storageKey)] = struct{}{}
}
var result [][]byte
for k := range uniq {
result = append(result, []byte(k))
} }
return result return result
@ -574,24 +582,31 @@ func ownerFromBinaryContainer(container []byte) []byte {
return container[offset : offset+25] // offset + size of owner return container[offset : offset+25] // offset + size of owner
} }
func estimationKey(epoch int, cid []byte) []byte { func estimationKey(epoch int, cid []byte, key interop.PublicKey) []byte {
var buf interface{} = epoch var buf interface{} = epoch
hash := crypto.Ripemd160(key)
result := []byte(estimateKeyPrefix) result := []byte(estimateKeyPrefix)
result = append(result, buf.([]byte)...) result = append(result, buf.([]byte)...)
result = append(result, cid...)
return append(result, cid...) return append(result, hash[:estimatePostfixSize]...)
} }
func getContainerSizeEstimation(ctx storage.Context, key, cid []byte) containerSizes { func getContainerSizeEstimation(ctx storage.Context, key, cid []byte) containerSizes {
data := storage.Get(ctx, key) var estimations []estimation
if data != nil {
return std.Deserialize(data.([]byte)).(containerSizes) it := storage.Find(ctx, key, storage.ValuesOnly)
for iterator.Next(it) {
rawEstimation := iterator.Value(it).([]byte)
est := std.Deserialize(rawEstimation).(estimation)
estimations = append(estimations, est)
} }
return containerSizes{ return containerSizes{
cid: cid, cid: cid,
estimations: []estimation{}, estimations: estimations,
} }
} }
@ -618,8 +633,8 @@ func keysToDelete(ctx storage.Context, epoch int) [][]byte {
it := storage.Find(ctx, []byte(estimateKeyPrefix), storage.KeysOnly) it := storage.Find(ctx, []byte(estimateKeyPrefix), storage.KeysOnly)
for iterator.Next(it) { for iterator.Next(it) {
k := iterator.Value(it).([]byte) // it MUST BE `storage.KeysOnly` k := iterator.Value(it).([]byte)
nbytes := k[len(estimateKeyPrefix) : len(k)-32] nbytes := k[len(estimateKeyPrefix) : len(k)-containerIDSize-estimatePostfixSize]
var n interface{} = nbytes var n interface{} = nbytes