Merge pull request #181 from masterSplinter01/feature/179-add-cache-to-lov1

Add cache to ListObjectsV1
This commit is contained in:
Kirillov Denis 2021-08-09 14:20:20 +03:00 committed by GitHub
commit b555a1b1d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 356 additions and 113 deletions

View file

@ -24,7 +24,7 @@ type (
layer struct {
pool pool.Pool
log *zap.Logger
cache ObjectsListV2Cache
cache ObjectsListCache
}
// Params stores basic API parameters.
@ -131,7 +131,7 @@ func NewLayer(log *zap.Logger, conns pool.Pool) Client {
return &layer{
pool: conns,
log: log,
cache: newListObjectsCache(),
cache: newListObjectsCache(defaultCacheLifetime),
}
}

View file

@ -13,7 +13,6 @@ import (
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
"go.uber.org/zap"
)
@ -56,10 +55,9 @@ type (
}
allObjectParams struct {
Bucket *BucketInfo
Delimiter string
Prefix string
StartAfter string
Bucket *BucketInfo
Delimiter string
Prefix string
}
)
@ -209,45 +207,34 @@ func (n *layer) objectDelete(ctx context.Context, address *object.Address) error
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
var (
err error
result ListObjectsInfoV1
bkt *BucketInfo
err error
result ListObjectsInfoV1
allObjects []*ObjectInfo
)
if p.MaxKeys == 0 {
return &result, nil
}
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil {
return nil, err
}
allObjects, err := n.listSortedAllObjects(ctx, allObjectParams{
Bucket: bkt,
Prefix: p.Prefix,
Delimiter: p.Delimiter,
StartAfter: p.Marker,
})
if err != nil {
return nil, err
if len(allObjects) == 0 {
return &result, nil
}
if p.Marker != "" {
allObjects = trimAfterObjectName(p.Marker, allObjects)
}
if len(allObjects) > p.MaxKeys {
result.IsTruncated = true
nextObject := allObjects[p.MaxKeys-1]
result.NextMarker = nextObject.Name
allObjects = allObjects[:p.MaxKeys]
result.NextMarker = allObjects[len(allObjects)-1].Name
}
for _, ov := range allObjects {
if ov.isDir {
result.Prefixes = append(result.Prefixes, ov.Name)
} else {
result.Objects = append(result.Objects, ov)
}
}
result.Prefixes, result.Objects = triageObjects(allObjects)
return &result, nil
}
@ -258,67 +245,40 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
err error
result ListObjectsInfoV2
allObjects []*ObjectInfo
bkt *BucketInfo
cacheKey string
box *accessbox.Box
)
if p.MaxKeys == 0 {
return &result, nil
}
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil {
return nil, err
}
if box, err = GetBoxData(ctx); err != nil {
return nil, err
if len(allObjects) == 0 {
return &result, nil
}
cacheKey = createKey(box.Gate.AccessKey, bkt.CID)
if p.ContinuationToken != "" {
allObjects = n.cache.Get(p.ContinuationToken, cacheKey)
allObjects = trimStartAfter(p.StartAfter, allObjects)
allObjects = trimAfterObjectID(p.ContinuationToken, allObjects)
}
if allObjects == nil {
allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{
Bucket: bkt,
Prefix: p.Prefix,
Delimiter: p.Delimiter,
StartAfter: p.StartAfter,
})
if err != nil {
return nil, err
}
if p.ContinuationToken != "" {
allObjects = trimAfterObjectID(p.ContinuationToken, allObjects)
}
if p.StartAfter != "" {
allObjects = trimAfterObjectName(p.StartAfter, allObjects)
}
if len(allObjects) > p.MaxKeys {
result.IsTruncated = true
restObjects := allObjects[p.MaxKeys:]
n.cache.Put(cacheKey, restObjects)
result.NextContinuationToken = restObjects[0].id.String()
allObjects = allObjects[:p.MaxKeys]
result.NextContinuationToken = allObjects[len(allObjects)-1].id.String()
}
for _, ov := range allObjects {
if ov.isDir {
result.Prefixes = append(result.Prefixes, ov.Name)
} else {
result.Objects = append(result.Objects, ov)
}
}
result.Prefixes, result.Objects = triageObjects(allObjects)
return &result, nil
}
func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) {
func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) {
var (
err error
ids []*object.ID
@ -346,9 +306,6 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]
if _, ok := uniqNames[oi.Name]; ok {
continue
}
if len(p.StartAfter) > 0 && oi.Name <= p.StartAfter {
continue
}
uniqNames[oi.Name] = oi.isDir
@ -363,22 +320,75 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]
return objects, nil
}
func trimStartAfter(startAfter string, objects []*ObjectInfo) []*ObjectInfo {
if objects != nil && len(startAfter) != 0 && objects[0].Name <= startAfter {
for i := range objects {
if objects[i].Name > startAfter {
return objects[i:]
}
}
func trimAfterObjectName(startAfter string, objects []*ObjectInfo) []*ObjectInfo {
if len(objects) != 0 && objects[len(objects)-1].Name <= startAfter {
return nil
}
return objects
}
func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo {
for i, obj := range objects {
if obj.ID().String() == id {
for i := range objects {
if objects[i].Name > startAfter {
return objects[i:]
}
}
return objects
return nil
}
func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo {
if len(objects) != 0 && objects[len(objects)-1].id.String() == id {
return []*ObjectInfo{}
}
for i, obj := range objects {
if obj.ID().String() == id {
return objects[i+1:]
}
}
return nil
}
func triageObjects(allObjects []*ObjectInfo) (prefixes []string, objects []*ObjectInfo) {
for _, ov := range allObjects {
if ov.isDir {
prefixes = append(prefixes, ov.Name)
} else {
objects = append(objects, ov)
}
}
return
}
func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*ObjectInfo, error) {
var (
err error
bkt *BucketInfo
cacheKey cacheOptions
allObjects []*ObjectInfo
)
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
return nil, err
}
if cacheKey, err = createKey(ctx, bkt.CID, p.Prefix, p.Delimiter); err != nil {
return nil, err
}
allObjects = n.cache.Get(cacheKey)
if allObjects == nil {
allObjects, err = n.listSortedObjectsFromNeoFS(ctx, allObjectParams{
Bucket: bkt,
Prefix: p.Prefix,
Delimiter: p.Delimiter,
})
if err != nil {
return nil, err
}
// putting to cache a copy of allObjects because allObjects can be modified further
n.cache.Put(cacheKey, append([]*ObjectInfo(nil), allObjects...))
}
return allObjects, nil
}

View file

@ -1,6 +1,7 @@
package layer
import (
"context"
"sync"
"time"
@ -8,72 +9,84 @@ import (
)
/*
This is an implementation of a cache for ListObjectsV2 which we return to users by ContinuationToken.
This is an implementation of a cache for ListObjectsV2/V1 which we can return to users when we receive a ListObjects
request.
The cache is a map which has a key: (access_key from AccessBox) + container_id and a value: list of objects with
creation time. After putting a record we start a timer (via time.AfterFunc) that removes the record after
defaultCacheLifetime value.
The cache is a map which has a key: cacheOptions struct and a value: list of objects. After putting a record we
start a timer (via time.AfterFunc) that removes the record after defaultCacheLifetime value.
ContinuationToken in our gateway is an objectID in NeoFS.
We don't keep ContinuationToken in this structure because we assume that users who received the token can reconnect
to other gateways and they should be able to get a list of objects.
When we receive the token from the user we just try to find the cache and then we return the list of objects which
starts from this token (i.e. objectID).
When we get a request from the user we just try to find the suitable and non-expired cache and then we return
the list of objects. Otherwise we send the request to NeoFS.
*/
// ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct.
// ObjectsListCache provides interface for cache of ListObjectsV2 in a layer struct.
type (
ObjectsListV2Cache interface {
Get(token string, key string) []*ObjectInfo
Put(key string, objects []*ObjectInfo)
ObjectsListCache interface {
Get(key cacheOptions) []*ObjectInfo
Put(key cacheOptions, objects []*ObjectInfo)
}
)
var (
defaultCacheLifetime = time.Second * 60
)
const defaultCacheLifetime = time.Second * 60
type (
listObjectsCache struct {
caches map[string]cache
mtx sync.RWMutex
cacheLifetime time.Duration
caches map[cacheOptions]cache
mtx sync.RWMutex
}
cache struct {
list []*ObjectInfo
}
cacheOptions struct {
key string
delimiter string
prefix string
}
)
func newListObjectsCache() *listObjectsCache {
func newListObjectsCache(lifetime time.Duration) *listObjectsCache {
return &listObjectsCache{
caches: make(map[string]cache),
caches: make(map[cacheOptions]cache),
cacheLifetime: lifetime,
}
}
func (l *listObjectsCache) Get(token, key string) []*ObjectInfo {
func (l *listObjectsCache) Get(key cacheOptions) []*ObjectInfo {
l.mtx.RLock()
defer l.mtx.RUnlock()
if val, ok := l.caches[key]; ok {
return trimAfterObjectID(token, val.list)
return val.list
}
return nil
}
func (l *listObjectsCache) Put(key string, objects []*ObjectInfo) {
func (l *listObjectsCache) Put(key cacheOptions, objects []*ObjectInfo) {
if len(objects) == 0 {
return
}
var c cache
l.mtx.Lock()
defer l.mtx.Unlock()
c.list = objects
l.caches[key] = c
time.AfterFunc(defaultCacheLifetime, func() {
time.AfterFunc(l.cacheLifetime, func() {
l.mtx.Lock()
delete(l.caches, key)
l.mtx.Unlock()
})
}
func createKey(accessKey string, cid *cid.ID) string {
return accessKey + cid.String()
func createKey(ctx context.Context, cid *cid.ID, prefix, delimiter string) (cacheOptions, error) {
box, err := GetBoxData(ctx)
if err != nil {
return cacheOptions{}, err
}
p := cacheOptions{
key: box.Gate.AccessKey + cid.String(),
delimiter: delimiter,
prefix: prefix,
}
return p, nil
}

View file

@ -0,0 +1,220 @@
package layer
import (
"crypto/rand"
"crypto/sha256"
"sort"
"testing"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/stretchr/testify/require"
)
const testingCacheLifetime = 5 * time.Second
func randID(t *testing.T) *object.ID {
id := object.NewID()
id.SetSHA256(randSHA256Checksum(t))
return id
}
func randSHA256Checksum(t *testing.T) (cs [sha256.Size]byte) {
_, err := rand.Read(cs[:])
require.NoError(t, err)
return
}
func TestTrimAfterObjectName(t *testing.T) {
var (
objects []*ObjectInfo
names = []string{"b", "c", "d"}
)
for _, name := range names {
objects = append(objects, &ObjectInfo{Name: name})
}
t.Run("startafter before all objects", func(t *testing.T) {
actual := trimAfterObjectName("a", objects)
require.Equal(t, objects, actual)
})
t.Run("startafter first object", func(t *testing.T) {
actual := trimAfterObjectName(names[0], objects)
require.Equal(t, objects[1:], actual)
})
t.Run("startafter second-to-last object", func(t *testing.T) {
actual := trimAfterObjectName(names[len(names)-2], objects)
require.Equal(t, objects[len(objects)-1:], actual)
})
t.Run("startafter last object", func(t *testing.T) {
actual := trimAfterObjectName(names[len(names)-1], objects)
require.Empty(t, actual)
})
t.Run("startafter after all objects", func(t *testing.T) {
actual := trimAfterObjectName("z", objects)
require.Nil(t, actual)
})
t.Run("empty objects", func(t *testing.T) {
actual := trimAfterObjectName(names[0], []*ObjectInfo{})
require.Nil(t, actual)
})
t.Run("nil objects", func(t *testing.T) {
actual := trimAfterObjectName(names[0], nil)
require.Nil(t, actual)
})
t.Run("empty startafter", func(t *testing.T) {
actual := trimAfterObjectName("", objects)
require.Equal(t, objects, actual)
})
}
func TestTrimAfterObjectID(t *testing.T) {
var (
objects []*ObjectInfo
ids []*object.ID
numberOfIDS = 3
)
for i := 0; i < numberOfIDS; i++ {
id := randID(t)
objects = append(objects, &ObjectInfo{id: id})
ids = append(ids, id)
}
t.Run("existing id", func(t *testing.T) {
actual := trimAfterObjectID(ids[0].String(), objects)
require.Equal(t, objects[1:], actual)
})
t.Run("second to last id", func(t *testing.T) {
actual := trimAfterObjectID(ids[len(ids)-2].String(), objects)
require.Equal(t, objects[len(objects)-1:], actual)
})
t.Run("non-existing id", func(t *testing.T) {
actual := trimAfterObjectID("z", objects)
require.Nil(t, actual)
})
t.Run("last id", func(t *testing.T) {
actual := trimAfterObjectID(ids[len(ids)-1].String(), objects)
require.Empty(t, actual)
})
t.Run("empty id", func(t *testing.T) {
actual := trimAfterObjectID("", objects)
require.Nil(t, actual)
})
}
func TestObjectsListCache(t *testing.T) {
var (
cacheSize = 10
objects []*ObjectInfo
userKey = "key"
)
for i := 0; i < cacheSize; i++ {
id := randID(t)
objects = append(objects, &ObjectInfo{id: id, Name: id.String()})
}
sort.Slice(objects, func(i, j int) bool {
return objects[i].Name < objects[j].Name
})
t.Run("lifetime", func(t *testing.T) {
var (
cache = newListObjectsCache(testingCacheLifetime)
cacheKey = cacheOptions{key: userKey}
)
cache.Put(cacheKey, objects)
condition := func() bool {
return cache.Get(cacheKey) == nil
}
require.Never(t, condition, cache.cacheLifetime, time.Second)
require.Eventually(t, condition, time.Second, 10*time.Millisecond)
})
t.Run("get cache with empty delimiter, empty prefix", func(t *testing.T) {
var (
cache = newListObjectsCache(testingCacheLifetime)
cacheKey = cacheOptions{key: userKey}
)
cache.Put(cacheKey, objects)
actual := cache.Get(cacheKey)
require.Equal(t, len(objects), len(actual))
for i := range objects {
require.Equal(t, objects[i], actual[i])
}
})
t.Run("get cache with delimiter and prefix", func(t *testing.T) {
cacheKey := cacheOptions{
key: userKey,
delimiter: "/",
prefix: "dir",
}
cache := newListObjectsCache(testingCacheLifetime)
cache.Put(cacheKey, objects)
actual := cache.Get(cacheKey)
require.Equal(t, len(objects), len(actual))
for i := range objects {
require.Equal(t, objects[i], actual[i])
}
})
t.Run("get cache with other delimiter and prefix", func(t *testing.T) {
var (
cacheKey = cacheOptions{
key: userKey,
delimiter: "/",
prefix: "dir",
}
newKey = cacheOptions{
key: "key",
delimiter: "*",
prefix: "obj",
}
)
cache := newListObjectsCache(testingCacheLifetime)
cache.Put(cacheKey, objects)
actual := cache.Get(newKey)
require.Nil(t, actual)
})
t.Run("get cache with non-existing key", func(t *testing.T) {
var (
cacheKey = cacheOptions{
key: userKey,
}
newKey = cacheOptions{
key: "asdf",
}
)
cache := newListObjectsCache(testingCacheLifetime)
cache.Put(cacheKey, objects)
actual := cache.Get(newKey)
require.Nil(t, actual)
})
}