Invalidate list cache on container events #802
4 changed files with 19 additions and 57 deletions
@@ -308,51 +308,9 @@ func (s ttlContainerLister) List(id *user.ID) ([]cid.ID, error) {
 	return res, nil
 }
 
-// updates cached list of owner's containers: cnr is added if flag is true, otherwise it's removed.
-// Concurrent calls can lead to some races:
-//   - two parallel additions to missing owner's cache can lead to only one container to be cached
-//   - async cache value eviction can lead to idle addition
-//
-// All described race cases aren't critical since cache values expire anyway, we just try
-// to increase cache actuality w/o huge overhead on synchronization.
-func (s *ttlContainerLister) update(owner user.ID, cnr cid.ID, add bool) {
+func (s *ttlContainerLister) invalidate(owner user.ID) {
 	strOwner := owner.EncodeToString()
-
-	val, ok := s.inner.cache.Peek(strOwner)
-	if !ok {
-		// we could cache the single cnr but in this case we will disperse
-		// with the Sidechain a lot
-		return
-	}
-
-	if s.inner.ttl <= time.Since(val.t) {
-		return
-	}
-
-	item := val.v
-
-	item.mtx.Lock()
-	{
-		found := false
-
-		for i := range item.list {
-			if found = item.list[i].Equals(cnr); found {
-				if !add {
-					item.list = append(item.list[:i], item.list[i+1:]...)
-					// if list became empty we don't remove the value from the cache
-					// since empty list is a correct value, and we don't want to insta
-					// re-request it from the Sidechain
-				}
-
-				break
-			}
-		}
-
-		if add && !found {
-			item.list = append(item.list, cnr)
-		}
-	}
-	item.mtx.Unlock()
+	s.inner.remove(strOwner)
 }
 
 type cachedIRFetcher struct {
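Roughly forty lines of mutex-guarded list surgery collapse into a single cache eviction: instead of patching the cached slice (with the documented races), invalidate drops the owner's entry and lets the next List re-read the Sidechain. For context, a minimal sketch of the kind of TTL'd LRU cache implied by the calls in this hunk (Peek, ttl, remove, the value-with-timestamp wrapper); the names and layout here are assumptions, not the node's actual type:

    package cache

    import (
    	"time"

    	lru "github.com/hashicorp/golang-lru/v2"
    )

    // valueWithTime pairs a cached value with its fetch time, matching the
    // val.t / val.v accesses in the deleted code.
    type valueWithTime[V any] struct {
    	v V
    	t time.Time
    }

    // ttlNetCache sketches the cache behind s.inner: an LRU of timestamped
    // values plus a TTL consulted on reads.
    type ttlNetCache[K comparable, V any] struct {
    	ttl   time.Duration
    	cache *lru.Cache[K, *valueWithTime[V]]
    }

    // remove drops the entry entirely; the next read misses and re-fetches
    // from the Sidechain, so no in-place list edits (and no races) remain.
    func (c *ttlNetCache[K, V]) remove(key K) {
    	c.cache.Remove(key)
    }

The trade-off: one extra Sidechain round trip on the owner's next listing, in exchange for deleting all of the synchronization-sensitive code.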
@@ -74,7 +74,7 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
 		// creation success are most commonly tracked by polling GET op.
 		cnr, err := cnrSrc.Get(ev.ID)
 		if err == nil {
-			cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
+			cachedContainerLister.invalidate(cnr.Value.Owner())
 			cachedContainerStorage.containerCache.set(ev.ID, cnr, nil)
 		} else {
 			// unlike removal, we expect successful receive of the container
@@ -93,16 +93,12 @@ func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc c
 	subscribeToContainerRemoval(c, func(e event.Event) {
 		ev := e.(containerEvent.DeleteSuccess)
 
-		// read owner of the removed container in order to update the listing cache.
-		// It's strange to read already removed container, but we can successfully hit
-		// the cache.
-		// TODO: use owner directly from the event after neofs-contract#256 will become resolved
-		cnr, err := cachedContainerStorage.Get(ev.ID)
+		cachedContainerStorage.handleRemoval(ev.ID)
+		info, err := cachedContainerStorage.DeletionInfo(ev.ID)
 		if err == nil {
-			cachedContainerLister.update(cnr.Value.Owner(), ev.ID, false)
+			cachedContainerLister.invalidate(info.Owner)
 		}
 
-		cachedContainerStorage.handleRemoval(ev.ID)
 		c.log.Debug(logs.FrostFSNodeContainerRemovalEventsReceipt,
 			zap.Stringer("id", ev.ID),
 		)
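Creation and removal handlers now share one shape: derive the owner, then drop that owner's cached listing. Note the reordering on removal: the removal is recorded in the container cache before the deletion info is looked up, and the owner now comes from the chain's deletion record rather than a Get of the already-removed container, which retires the old TODO referencing neofs-contract#256. A self-contained toy model of the invalidate-on-event pattern (types are illustrative, not frostfs-node's):

    package main

    import "fmt"

    // listingCache models the owner -> container-list cache.
    type listingCache struct {
    	byOwner map[string][]string
    }

    // invalidate drops the owner's entry; a later List misses and
    // re-reads the authoritative source, as in the handlers above.
    func (c *listingCache) invalidate(owner string) {
    	delete(c.byOwner, owner)
    }

    func main() {
    	c := &listingCache{byOwner: map[string][]string{
    		"owner1": {"cnr-a", "cnr-b"},
    	}}

    	// Container created or removed for owner1: no in-place edit of
    	// the slice, just drop the whole entry.
    	c.invalidate("owner1")

    	_, cached := c.byOwner["owner1"]
    	fmt.Println("still cached:", cached) // false -> will be re-fetched
    }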
@@ -6,6 +6,7 @@ import (
 	frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
 	policyengine "git.frostfs.info/TrueCloudLab/policy-engine"
 )
 
@@ -24,10 +25,10 @@ type Container struct {
 // DelInfo contains info about removed container.
 type DelInfo struct {
 	// Container owner.
-	Owner []byte
+	Owner user.ID
 
 	// Epoch indicates when the container was removed.
-	Epoch int
+	Epoch uint64
 }
 
 // Source is an interface that wraps
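With Owner as a user.ID and Epoch as a uint64, consumers of DelInfo get validated, chain-native types instead of raw bytes and a lossy int cast. A hypothetical consumer in the same package as DelInfo, assuming only the DeletionInfo shape visible in this PR (the helper and the anonymous interface are mine):

    package container

    import (
    	"fmt"

    	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    )

    // logRemoval is an illustrative helper, not part of the PR; the
    // anonymous interface stands in for any deletion-info source.
    func logRemoval(src interface {
    	DeletionInfo(cid.ID) (*DelInfo, error)
    }, cnr cid.ID) error {
    	info, err := src.DeletionInfo(cnr)
    	if err != nil {
    		return err
    	}

    	// Owner prints via the SDK encoder; Epoch needs no int cast.
    	fmt.Printf("container %s removed by %s at epoch %d\n",
    		cnr, info.Owner.EncodeToString(), info.Epoch)
    	return nil
    }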
@@ -10,6 +10,8 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
+	"github.com/mr-tron/base58"
 )
 
 func (x *containerSource) DeletionInfo(cnr cid.ID) (*containercore.DelInfo, error) {
@@ -51,18 +53,23 @@ func (c *Client) DeletionInfo(cid []byte) (*containercore.DelInfo, error) {
 		return nil, fmt.Errorf("unexpected container stack item count (%s): %d", deletionInfoMethod, len(arr))
 	}
 
-	owner, err := client.BytesFromStackItem(arr[0])
+	rawOwner, err := client.BytesFromStackItem(arr[0])
 	if err != nil {
 		return nil, fmt.Errorf("could not get byte array of container (%s): %w", deletionInfoMethod, err)
 	}
 
-	epoch, err := client.IntFromStackItem(arr[1])
+	var owner user.ID
+	if err := owner.DecodeString(base58.Encode(rawOwner)); err != nil {
+		return nil, fmt.Errorf("could not decode container owner id (%s): %w", deletionInfoMethod, err)
+	}
+
+	epoch, err := client.BigIntFromStackItem(arr[1])
 	if err != nil {
 		return nil, fmt.Errorf("could not get byte array of container signature (%s): %w", deletionInfoMethod, err)
 	}
 
 	return &containercore.DelInfo{
 		Owner: owner,
-		Epoch: int(epoch),
+		Epoch: epoch.Uint64(),
 	}, nil
 }
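The encode-then-decode round trip is the subtle part of this hunk: the contract returns the owner as raw address bytes, while user.ID (at least as used here) exposes a base58 string decoder, so the bytes are re-encoded only to be parsed and validated. A standalone sketch of that conversion; the helper name is mine, the PR inlines the logic:

    package main

    import (
    	"fmt"

    	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
    	"github.com/mr-tron/base58"
    )

    // ownerFromRawBytes mirrors the conversion in DeletionInfo:
    // raw address bytes -> base58 string -> validated user.ID.
    func ownerFromRawBytes(raw []byte) (user.ID, error) {
    	var owner user.ID
    	if err := owner.DecodeString(base58.Encode(raw)); err != nil {
    		return user.ID{}, fmt.Errorf("decode owner: %w", err)
    	}
    	return owner, nil
    }

    func main() {
    	// In the PR, raw comes from client.BytesFromStackItem(arr[0]);
    	// an arbitrary slice like this one simply fails validation.
    	if _, err := ownerFromRawBytes([]byte{0x35, 0x01, 0x02}); err != nil {
    		fmt.Println("invalid owner bytes:", err)
    	}
    }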