forked from TrueCloudLab/frostfs-node
[#1632] node/container: Update LIST cache on successful writes
In previous implementation storage node responded with the outdated container list after successful creation/removal up until cache invalidation due to TTL. In order to decrease the probability of outdated responses node should update its cache on event receipts. Implement `ttlContainerLister.update` method which actualizes cached list of the owner's containers. Make node to call `update` method on `PutSuccess`/`DeleteSuccess` notifications from the `Container` contract. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
d15a7d8d3d
commit
5ce8315cd8
2 changed files with 110 additions and 21 deletions
|
@ -245,6 +245,14 @@ func (s *lruNetmapSource) Epoch() (uint64, error) {
|
||||||
// that implements container lister.
|
// that implements container lister.
|
||||||
type ttlContainerLister ttlNetCache
|
type ttlContainerLister ttlNetCache
|
||||||
|
|
||||||
|
// value type for ttlNetCache used by ttlContainerLister.
|
||||||
|
type cacheItemContainerList struct {
|
||||||
|
// protects list from concurrent add/remove ops
|
||||||
|
mtx sync.RWMutex
|
||||||
|
// actual list of containers owner by the particular user
|
||||||
|
list []cid.ID
|
||||||
|
}
|
||||||
|
|
||||||
func newCachedContainerLister(c *cntClient.Client) *ttlContainerLister {
|
func newCachedContainerLister(c *cntClient.Client) *ttlContainerLister {
|
||||||
const (
|
const (
|
||||||
containerListerCacheSize = 100
|
containerListerCacheSize = 100
|
||||||
|
@ -266,7 +274,14 @@ func newCachedContainerLister(c *cntClient.Client) *ttlContainerLister {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.List(id)
|
list, err := c.List(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &cacheItemContainerList{
|
||||||
|
list: list,
|
||||||
|
}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
return (*ttlContainerLister)(lruCnrListerCache)
|
return (*ttlContainerLister)(lruCnrListerCache)
|
||||||
|
@ -287,12 +302,68 @@ func (s *ttlContainerLister) List(id *user.ID) ([]cid.ID, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return val.([]cid.ID), nil
|
// panic on typecast below is OK since developer must be careful,
|
||||||
|
// runtime can do nothing with wrong type occurrence
|
||||||
|
item := val.(*cacheItemContainerList)
|
||||||
|
|
||||||
|
item.mtx.RLock()
|
||||||
|
res := make([]cid.ID, len(item.list))
|
||||||
|
copy(res, item.list)
|
||||||
|
item.mtx.RUnlock()
|
||||||
|
|
||||||
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// InvalidateContainerList removes cached list of container IDs.
|
// updates cached list of owner's containers: cnr is added if flag is true, otherwise it's removed.
|
||||||
func (s *ttlContainerLister) InvalidateContainerList(id user.ID) {
|
// Concurrent calls can lead to some races:
|
||||||
(*ttlNetCache)(s).remove(id.EncodeToString())
|
// - two parallel additions to missing owner's cache can lead to only one container to be cached
|
||||||
|
// - async cache value eviction can lead to idle addition
|
||||||
|
//
|
||||||
|
// All described race cases aren't critical since cache values expire anyway, we just try
|
||||||
|
// to increase cache actuality w/o huge overhead on synchronization.
|
||||||
|
func (s *ttlContainerLister) update(owner user.ID, cnr cid.ID, add bool) {
|
||||||
|
strOwner := owner.EncodeToString()
|
||||||
|
|
||||||
|
val, ok := (*ttlNetCache)(s).cache.Get(strOwner)
|
||||||
|
if !ok {
|
||||||
|
if add {
|
||||||
|
// first cached owner's container
|
||||||
|
(*ttlNetCache)(s).set(strOwner, &cacheItemContainerList{
|
||||||
|
list: []cid.ID{cnr},
|
||||||
|
}, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// no-op on removal when no owner's containers are cached
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// panic on typecast below is OK since developer must be careful,
|
||||||
|
// runtime can do nothing with wrong type occurrence
|
||||||
|
item := val.(*valueWithTime).v.(*cacheItemContainerList)
|
||||||
|
|
||||||
|
item.mtx.Lock()
|
||||||
|
{
|
||||||
|
found := false
|
||||||
|
|
||||||
|
for i := range item.list {
|
||||||
|
if found = item.list[i].Equals(cnr); found {
|
||||||
|
if !add {
|
||||||
|
item.list = append(item.list[:i], item.list[i+1:]...)
|
||||||
|
// if list became empty we don't remove the value from the cache
|
||||||
|
// since empty list is a correct value, and we don't want to insta
|
||||||
|
// re-request it from the Sidechain
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if add && !found {
|
||||||
|
item.list = append(item.list, cnr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
item.mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
type cachedIRFetcher ttlNetCache
|
type cachedIRFetcher ttlNetCache
|
||||||
|
|
|
@ -63,12 +63,6 @@ func initContainerService(c *cfg) {
|
||||||
neoClient: wrap,
|
neoClient: wrap,
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribeToContainerCreation(c, func(e event.Event) {
|
|
||||||
c.log.Debug("container creation event's receipt",
|
|
||||||
zap.Stringer("id", e.(containerEvent.PutSuccess).ID),
|
|
||||||
)
|
|
||||||
})
|
|
||||||
|
|
||||||
if c.cfgMorph.disableCache {
|
if c.cfgMorph.disableCache {
|
||||||
c.cfgObject.eaclSource = eACLFetcher
|
c.cfgObject.eaclSource = eACLFetcher
|
||||||
cnrRdr.eacl = eACLFetcher
|
cnrRdr.eacl = eACLFetcher
|
||||||
|
@ -81,9 +75,42 @@ func initContainerService(c *cfg) {
|
||||||
cachedEACLStorage := newCachedEACLStorage(eACLFetcher)
|
cachedEACLStorage := newCachedEACLStorage(eACLFetcher)
|
||||||
cachedContainerLister := newCachedContainerLister(wrap)
|
cachedContainerLister := newCachedContainerLister(wrap)
|
||||||
|
|
||||||
|
subscribeToContainerCreation(c, func(e event.Event) {
|
||||||
|
ev := e.(containerEvent.PutSuccess)
|
||||||
|
|
||||||
|
// read owner of the created container in order to update the reading cache.
|
||||||
|
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
|
||||||
|
// but don't forget about the profit of reading the new container and caching it:
|
||||||
|
// creation success are most commonly tracked by polling GET op.
|
||||||
|
cnr, err := cachedContainerStorage.Get(ev.ID)
|
||||||
|
if err == nil {
|
||||||
|
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, true)
|
||||||
|
} else {
|
||||||
|
// unlike removal, we expect successful receive of the container
|
||||||
|
// after successful creation, so logging can be useful
|
||||||
|
c.log.Error("read newly created container after the notification",
|
||||||
|
zap.Stringer("id", ev.ID),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.log.Debug("container creation event's receipt",
|
||||||
|
zap.Stringer("id", ev.ID),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
subscribeToContainerRemoval(c, func(e event.Event) {
|
subscribeToContainerRemoval(c, func(e event.Event) {
|
||||||
ev := e.(containerEvent.DeleteSuccess)
|
ev := e.(containerEvent.DeleteSuccess)
|
||||||
|
|
||||||
|
// read owner of the removed container in order to update the listing cache.
|
||||||
|
// It's strange to read already removed container, but we can successfully hit
|
||||||
|
// the cache.
|
||||||
|
// TODO: use owner directly from the event after neofs-contract#256 will become resolved
|
||||||
|
cnr, err := cachedContainerStorage.Get(ev.ID)
|
||||||
|
if err == nil {
|
||||||
|
cachedContainerLister.update(cnr.Value.Owner(), ev.ID, false)
|
||||||
|
}
|
||||||
|
|
||||||
cachedContainerStorage.handleRemoval(ev.ID)
|
cachedContainerStorage.handleRemoval(ev.ID)
|
||||||
|
|
||||||
c.log.Debug("container removal event's receipt",
|
c.log.Debug("container removal event's receipt",
|
||||||
|
@ -622,16 +649,7 @@ type morphContainerWriter struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {
|
func (m morphContainerWriter) Put(cnr containerCore.Container) (*cid.ID, error) {
|
||||||
containerID, err := cntClient.Put(m.neoClient, cnr)
|
return cntClient.Put(m.neoClient, cnr)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if m.cacheEnabled {
|
|
||||||
m.lists.InvalidateContainerList(cnr.Value.Owner())
|
|
||||||
}
|
|
||||||
|
|
||||||
return containerID, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error {
|
func (m morphContainerWriter) Delete(witness containerCore.RemovalWitness) error {
|
||||||
|
|
Loading…
Reference in a new issue