[#236] cache: Refactor ListObjectsCache

Replaced map in ListObjectsCache by gcache.
Now ListObjectsCache keeps only objectIDs and
requests ObjectInfo from cache or NeoFS.
Refactored ListObjectsCache keys: removed delimiter and method fields.
Now ListObjectsCache keeps cache with all objects versions.

Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
Angira Kekteeva 2021-09-01 19:10:31 +03:00 committed by Alex Vanin
parent 1bc2e51cbc
commit 1ece42b23f
8 changed files with 171 additions and 149 deletions

View file

@ -1,11 +1,12 @@
package cache package cache
import ( import (
"sync" "fmt"
"time" "time"
"github.com/bluele/gcache"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-api-go/pkg/object"
) )
/* /*
@ -19,86 +20,75 @@ import (
the list of objects. Otherwise we send the request to NeoFS. the list of objects. Otherwise we send the request to NeoFS.
*/ */
// ObjectsListCache provides interface for cache of ListObjectsV2 in a layer struct.
type ( type (
// ObjectsListCache provides interface for cache of ListObjectsV2 in a layer struct.
ObjectsListCache interface { ObjectsListCache interface {
Get(key ObjectsListKey) []*api.ObjectInfo Get(key ObjectsListKey) []*object.ID
Put(key ObjectsListKey, objects []*api.ObjectInfo) Put(key ObjectsListKey, oids []*object.ID) error
} }
) )
// DefaultObjectsListCacheLifetime is a default lifetime of entries in cache of ListObjects.
const DefaultObjectsListCacheLifetime = time.Second * 60
const ( const (
// ListObjectsMethod is used to mark a cache entry for ListObjectsV1/V2. // DefaultObjectsListCacheLifetime is a default lifetime of entries in cache of ListObjects.
ListObjectsMethod = "listObjects" DefaultObjectsListCacheLifetime = time.Second * 60
// ListVersionsMethod is used to mark a cache entry for ListObjectVersions. // DefaultObjectsListCacheSize is a default size of cache of ListObjects.
ListVersionsMethod = "listVersions" DefaultObjectsListCacheSize = 1e5
) )
type ( type (
// ListObjectsCache contains cache for ListObjects and ListObjectVersions. // ListObjectsCache contains cache for ListObjects and ListObjectVersions.
ListObjectsCache struct { ListObjectsCache struct {
cacheLifetime time.Duration lifetime time.Duration
caches map[ObjectsListKey]objectsListEntry cache gcache.Cache
mtx sync.RWMutex
}
objectsListEntry struct {
list []*api.ObjectInfo
} }
// ObjectsListKey is a key to find a ObjectsListCache's entry. // ObjectsListKey is a key to find a ObjectsListCache's entry.
ObjectsListKey struct { ObjectsListKey struct {
Method string cid string
Key string prefix string
Delimiter string
Prefix string
} }
) )
// NewObjectsListCache is a constructor which creates an object of ListObjectsCache with given lifetime of entries. // NewObjectsListCache is a constructor which creates an object of ListObjectsCache with given lifetime of entries.
func NewObjectsListCache(lifetime time.Duration) *ListObjectsCache { func NewObjectsListCache(cacheSize int, lifetime time.Duration) *ListObjectsCache {
gc := gcache.New(cacheSize).LRU().Build()
return &ListObjectsCache{ return &ListObjectsCache{
caches: make(map[ObjectsListKey]objectsListEntry), cache: gc,
cacheLifetime: lifetime, lifetime: lifetime,
} }
} }
// Get return list of ObjectInfo. // Get return list of ObjectInfo.
func (l *ListObjectsCache) Get(key ObjectsListKey) []*api.ObjectInfo { func (l *ListObjectsCache) Get(key ObjectsListKey) []*object.ID {
l.mtx.RLock() entry, err := l.cache.Get(key)
defer l.mtx.RUnlock() if err != nil {
if val, ok := l.caches[key]; ok { return nil
return val.list
} }
return nil
result, ok := entry.([]*object.ID)
if !ok {
return nil
}
return result
} }
// Put put a list of objects to cache. // Put puts a list of objects to cache.
func (l *ListObjectsCache) Put(key ObjectsListKey, objects []*api.ObjectInfo) { func (l *ListObjectsCache) Put(key ObjectsListKey, oids []*object.ID) error {
if len(objects) == 0 { if len(oids) == 0 {
return return fmt.Errorf("list is empty, cid: %s, prefix: %s", key.cid, key.prefix)
} }
var c objectsListEntry
l.mtx.Lock() return l.cache.SetWithExpire(key, oids, l.lifetime)
defer l.mtx.Unlock()
c.list = objects
l.caches[key] = c
time.AfterFunc(l.cacheLifetime, func() {
l.mtx.Lock()
delete(l.caches, key)
l.mtx.Unlock()
})
} }
// CreateObjectsListCacheKey returns ObjectsListKey with given CID, method, prefix, and delimiter. // CreateObjectsListCacheKey returns ObjectsListKey with given CID, method, prefix, and delimiter.
func CreateObjectsListCacheKey(cid *cid.ID, method, prefix, delimiter string) (ObjectsListKey, error) { func CreateObjectsListCacheKey(cid *cid.ID, prefix string) ObjectsListKey {
p := ObjectsListKey{ p := ObjectsListKey{
Method: method, cid: cid.String(),
Key: cid.String(), prefix: prefix,
Delimiter: delimiter,
Prefix: prefix,
} }
return p, nil return p
} }

View file

@ -3,16 +3,15 @@ package cache
import ( import (
"crypto/rand" "crypto/rand"
"crypto/sha256" "crypto/sha256"
"sort"
"testing" "testing"
"time" "time"
"github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
const testingCacheLifetime = 5 * time.Second const testingCacheLifetime = 5 * time.Second
const testingCacheSize = 10
func randID(t *testing.T) *object.ID { func randID(t *testing.T) *object.ID {
id := object.NewID() id := object.NewID()
@ -31,83 +30,82 @@ func randSHA256Checksum(t *testing.T) (cs [sha256.Size]byte) {
func TestObjectsListCache(t *testing.T) { func TestObjectsListCache(t *testing.T) {
var ( var (
cacheSize = 10 cacheSize = 10
objects []*api.ObjectInfo ids []*object.ID
userKey = "key" userKey = "key"
) )
for i := 0; i < cacheSize; i++ { for i := 0; i < cacheSize; i++ {
id := randID(t) id := randID(t)
objects = append(objects, &api.ObjectInfo{ID: id, Name: id.String()}) ids = append(ids, id)
} }
sort.Slice(objects, func(i, j int) bool {
return objects[i].Name < objects[j].Name
})
t.Run("lifetime", func(t *testing.T) { t.Run("lifetime", func(t *testing.T) {
var ( var (
cache = NewObjectsListCache(testingCacheLifetime) cache = NewObjectsListCache(testingCacheSize, testingCacheLifetime)
cacheKey = ObjectsListKey{Key: userKey} cacheKey = ObjectsListKey{cid: userKey}
) )
cache.Put(cacheKey, objects) err := cache.Put(cacheKey, ids)
require.NoError(t, err)
condition := func() bool { condition := func() bool {
return cache.Get(cacheKey) == nil return cache.Get(cacheKey) == nil
} }
require.Never(t, condition, cache.cacheLifetime, time.Second) require.Never(t, condition, cache.lifetime, time.Second)
require.Eventually(t, condition, time.Second, 10*time.Millisecond) require.Eventually(t, condition, time.Second, 10*time.Millisecond)
}) })
t.Run("get cache with empty delimiter, empty prefix", func(t *testing.T) { t.Run("get cache with empty prefix", func(t *testing.T) {
var ( var (
cache = NewObjectsListCache(testingCacheLifetime) cache = NewObjectsListCache(testingCacheSize, testingCacheLifetime)
cacheKey = ObjectsListKey{Key: userKey} cacheKey = ObjectsListKey{cid: userKey}
) )
cache.Put(cacheKey, objects) err := cache.Put(cacheKey, ids)
require.NoError(t, err)
actual := cache.Get(cacheKey) actual := cache.Get(cacheKey)
require.Equal(t, len(objects), len(actual)) require.Equal(t, len(ids), len(actual))
for i := range objects { for i := range ids {
require.Equal(t, objects[i], actual[i]) require.Equal(t, ids[i], actual[i])
} }
}) })
t.Run("get cache with delimiter and prefix", func(t *testing.T) { t.Run("get cache with prefix", func(t *testing.T) {
cacheKey := ObjectsListKey{ cacheKey := ObjectsListKey{
Key: userKey, cid: userKey,
Delimiter: "/", prefix: "dir",
Prefix: "dir",
} }
cache := NewObjectsListCache(testingCacheLifetime) cache := NewObjectsListCache(testingCacheSize, testingCacheLifetime)
cache.Put(cacheKey, objects) err := cache.Put(cacheKey, ids)
require.NoError(t, err)
actual := cache.Get(cacheKey) actual := cache.Get(cacheKey)
require.Equal(t, len(objects), len(actual)) require.Equal(t, len(ids), len(actual))
for i := range objects { for i := range ids {
require.Equal(t, objects[i], actual[i]) require.Equal(t, ids[i], actual[i])
} }
}) })
t.Run("get cache with other delimiter and prefix", func(t *testing.T) { t.Run("get cache with other prefix", func(t *testing.T) {
var ( var (
cacheKey = ObjectsListKey{ cacheKey = ObjectsListKey{
Key: userKey, cid: userKey,
Delimiter: "/", prefix: "dir",
Prefix: "dir",
} }
newKey = ObjectsListKey{ newKey = ObjectsListKey{
Key: "key", cid: "key",
Delimiter: "*", prefix: "obj",
Prefix: "obj",
} }
) )
cache := NewObjectsListCache(testingCacheLifetime) cache := NewObjectsListCache(testingCacheSize, testingCacheLifetime)
cache.Put(cacheKey, objects) err := cache.Put(cacheKey, ids)
require.NoError(t, err)
actual := cache.Get(newKey) actual := cache.Get(newKey)
require.Nil(t, actual) require.Nil(t, actual)
@ -116,15 +114,16 @@ func TestObjectsListCache(t *testing.T) {
t.Run("get cache with non-existing key", func(t *testing.T) { t.Run("get cache with non-existing key", func(t *testing.T) {
var ( var (
cacheKey = ObjectsListKey{ cacheKey = ObjectsListKey{
Key: userKey, cid: userKey,
} }
newKey = ObjectsListKey{ newKey = ObjectsListKey{
Key: "asdf", cid: "asdf",
} }
) )
cache := NewObjectsListCache(testingCacheLifetime) cache := NewObjectsListCache(testingCacheSize, testingCacheLifetime)
cache.Put(cacheKey, objects) err := cache.Put(cacheKey, ids)
require.NoError(t, err)
actual := cache.Get(newKey) actual := cache.Get(newKey)
require.Nil(t, actual) require.Nil(t, actual)

View file

@ -41,6 +41,7 @@ type (
Lifetime time.Duration Lifetime time.Duration
Size int Size int
ListObjectsLifetime time.Duration ListObjectsLifetime time.Duration
ListObjectsSize int
} }
// Params stores basic API parameters. // Params stores basic API parameters.
@ -197,7 +198,7 @@ func NewLayer(log *zap.Logger, conns pool.Pool, config *CacheConfig) Client {
return &layer{ return &layer{
pool: conns, pool: conns,
log: log, log: log,
listsCache: cache.NewObjectsListCache(config.ListObjectsLifetime), listsCache: cache.NewObjectsListCache(config.ListObjectsSize, config.ListObjectsLifetime),
objCache: cache.New(config.Size, config.Lifetime), objCache: cache.New(config.Size, config.Lifetime),
//todo reconsider cache params //todo reconsider cache params
namesCache: cache.NewObjectsNameCache(1000, time.Minute), namesCache: cache.NewObjectsNameCache(1000, time.Minute),
@ -671,7 +672,7 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
return err return err
} }
objects, err := n.listSortedObjectsFromNeoFS(ctx, allObjectParams{Bucket: bucketInfo}) objects, err := n.listSortedObjects(ctx, allObjectParams{Bucket: bucketInfo})
if err != nil { if err != nil {
return err return err
} }

View file

@ -22,9 +22,10 @@ import (
type ( type (
findParams struct { findParams struct {
attr string attr string
val string val string
cid *cid.ID cid *cid.ID
prefix string
} }
getParams struct { getParams struct {
@ -82,6 +83,11 @@ func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID,
opts.AddFilter(p.attr, filename, object.MatchStringEqual) opts.AddFilter(p.attr, filename, object.MatchStringEqual)
} }
} }
if prefix, err := url.QueryUnescape(p.prefix); err != nil {
return nil, err
} else if prefix != "" {
opts.AddFilter(object.AttributeFileName, prefix, object.MatchCommonPrefix)
}
return n.pool.SearchObject(ctx, new(client.SearchObjectParams).WithContainerID(p.cid).WithSearchFilters(opts), n.BearerOpt(ctx)) return n.pool.SearchObject(ctx, new(client.SearchObjectParams).WithContainerID(p.cid).WithSearchFilters(opts), n.BearerOpt(ctx))
} }
@ -457,7 +463,7 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
return &result, nil return &result, nil
} }
func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParams) ([]*api.ObjectInfo, error) { func (n *layer) listSortedObjects(ctx context.Context, p allObjectParams) ([]*api.ObjectInfo, error) {
versions, err := n.getAllObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter) versions, err := n.getAllObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter)
if err != nil { if err != nil {
return nil, err return nil, err
@ -479,19 +485,34 @@ func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParam
} }
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *api.BucketInfo, prefix, delimiter string) (map[string]*objectVersions, error) { func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *api.BucketInfo, prefix, delimiter string) (map[string]*objectVersions, error) {
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID}) var err error
if err != nil {
return nil, err cacheKey := cache.CreateObjectsListCacheKey(bkt.CID, prefix)
ids := n.listsCache.Get(cacheKey)
if ids == nil {
ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, prefix: prefix})
if err != nil {
return nil, err
}
if err := n.listsCache.Put(cacheKey, ids); err != nil {
n.log.Error("couldn't cache list of objects", zap.Error(err))
}
} }
versions := make(map[string]*objectVersions, len(ids)/2) versions := make(map[string]*objectVersions, len(ids)/2)
for _, id := range ids {
meta, err := n.objectHead(ctx, bkt.CID, id) for i := 0; i < len(ids); i++ {
if err != nil { if ids[i] == nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
continue continue
} }
if oi := objectInfoFromMeta(bkt, meta, prefix, delimiter); oi != nil { obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt.CID, ids[i])
if obj == nil {
// mark ids[i] as an address of invalid object
ids[i] = nil
continue
}
if oi := objectInfoFromMeta(bkt, obj, prefix, delimiter); oi != nil {
if isSystem(oi) { if isSystem(oi) {
continue continue
} }
@ -573,7 +594,6 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) (
var ( var (
err error err error
bkt *api.BucketInfo bkt *api.BucketInfo
cacheKey cache.ObjectsListKey
allObjects []*api.ObjectInfo allObjects []*api.ObjectInfo
) )
@ -581,26 +601,15 @@ func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) (
return nil, err return nil, err
} }
if cacheKey, err = cache.CreateObjectsListCacheKey(bkt.CID, cache.ListObjectsMethod, p.Prefix, p.Delimiter); err != nil { allObjects, err = n.listSortedObjects(ctx, allObjectParams{
Bucket: bkt,
Prefix: p.Prefix,
Delimiter: p.Delimiter,
})
if err != nil {
return nil, err return nil, err
} }
allObjects = n.listsCache.Get(cacheKey)
if allObjects == nil {
allObjects, err = n.listSortedObjectsFromNeoFS(ctx, allObjectParams{
Bucket: bkt,
Prefix: p.Prefix,
Delimiter: p.Delimiter,
})
if err != nil {
return nil, err
}
// putting to cache a copy of allObjects because allObjects can be modified further
n.listsCache.Put(cacheKey, append([]*api.ObjectInfo(nil), allObjects...))
}
return allObjects, nil return allObjects, nil
} }
@ -613,3 +622,22 @@ func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *api.BucketInfo
return settings.VersioningEnabled return settings.VersioningEnabled
} }
func (n *layer) objectFromObjectsCacheOrNeoFS(ctx context.Context, cid *cid.ID, oid *object.ID) *object.Object {
var (
err error
meta = n.objCache.Get(newAddress(cid, oid))
)
if meta == nil {
meta, err = n.objectHead(ctx, cid, oid)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
return nil
}
if err = n.objCache.Put(*meta); err != nil {
n.log.Error("couldn't cache an object", zap.Error(err))
}
}
return meta
}

View file

@ -8,7 +8,6 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/errors"
) )
@ -146,39 +145,29 @@ func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*Bu
} }
func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) { func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) {
var versions map[string]*objectVersions var (
res := &ListObjectVersionsInfo{} versions map[string]*objectVersions
allObjects = make([]*api.ObjectInfo, 0, p.MaxKeys)
res = &ListObjectVersionsInfo{}
)
bkt, err := n.GetBucketInfo(ctx, p.Bucket) bkt, err := n.GetBucketInfo(ctx, p.Bucket)
if err != nil { if err != nil {
return nil, err return nil, err
} }
cacheKey, err := cache.CreateObjectsListCacheKey(bkt.CID, cache.ListVersionsMethod, p.Prefix, p.Delimiter) if versions, err = n.getAllObjectsVersions(ctx, bkt, p.Prefix, p.Delimiter); err != nil {
if err != nil {
return nil, err return nil, err
} }
allObjects := n.listsCache.Get(cacheKey) sortedNames := make([]string, 0, len(versions))
if allObjects == nil { for k := range versions {
versions, err = n.getAllObjectsVersions(ctx, bkt, p.Prefix, p.Delimiter) sortedNames = append(sortedNames, k)
if err != nil { }
return nil, err sort.Strings(sortedNames)
}
sortedNames := make([]string, 0, len(versions)) for _, name := range sortedNames {
for k := range versions { allObjects = append(allObjects, versions[name].getFiltered()...)
sortedNames = append(sortedNames, k)
}
sort.Strings(sortedNames)
allObjects = make([]*api.ObjectInfo, 0, p.MaxKeys)
for _, name := range sortedNames {
allObjects = append(allObjects, versions[name].getFiltered()...)
}
// putting to cache a copy of allObjects because allObjects can be modified further
n.listsCache.Put(cacheKey, append([]*api.ObjectInfo(nil), allObjects...))
} }
for i, obj := range allObjects { for i, obj := range allObjects {

View file

@ -331,7 +331,8 @@ func prepareContext(t *testing.T) *testContext {
layer: NewLayer(l, tp, &CacheConfig{ layer: NewLayer(l, tp, &CacheConfig{
Size: cache.DefaultObjectsCacheSize, Size: cache.DefaultObjectsCacheSize,
Lifetime: cache.DefaultObjectsCacheLifetime, Lifetime: cache.DefaultObjectsCacheLifetime,
ListObjectsLifetime: cache.DefaultObjectsListCacheLifetime}, ListObjectsLifetime: cache.DefaultObjectsListCacheLifetime,
ListObjectsSize: cache.DefaultObjectsListCacheSize},
), ),
bkt: bktName, bkt: bktName,
bktID: bktID, bktID: bktID,

View file

@ -217,6 +217,7 @@ func (a *App) Server(ctx context.Context) {
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CacheConfig { func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CacheConfig {
cacheCfg := layer.CacheConfig{ cacheCfg := layer.CacheConfig{
ListObjectsLifetime: cache.DefaultObjectsListCacheLifetime, ListObjectsLifetime: cache.DefaultObjectsListCacheLifetime,
ListObjectsSize: cache.DefaultObjectsListCacheSize,
Size: cache.DefaultObjectsCacheSize, Size: cache.DefaultObjectsCacheSize,
Lifetime: cache.DefaultObjectsCacheLifetime, Lifetime: cache.DefaultObjectsCacheLifetime,
} }
@ -253,6 +254,18 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CacheConfig {
cacheCfg.ListObjectsLifetime = lifetime cacheCfg.ListObjectsLifetime = lifetime
} }
} }
if v.IsSet(cfgListObjectsCacheSize) {
size := v.GetInt(cfgListObjectsCacheSize)
if size <= 0 {
l.Error("invalid cache size, using default value",
zap.Int("value in config", size),
zap.Int("default", cacheCfg.ListObjectsSize))
} else {
cacheCfg.Size = size
}
}
return &cacheCfg return &cacheCfg
} }

View file

@ -58,6 +58,7 @@ const ( // Settings.
cfgObjectsCacheLifetime = "cache.lifetime" cfgObjectsCacheLifetime = "cache.lifetime"
cfgCacheSize = "cache.size" cfgCacheSize = "cache.size"
cfgListObjectsCacheLifetime = "cache.list_objects_lifetime" cfgListObjectsCacheLifetime = "cache.list_objects_lifetime"
cfgListObjectsCacheSize = "cache.list_objects_size"
// Policy. // Policy.
cfgDefaultPolicy = "default_policy" cfgDefaultPolicy = "default_policy"