forked from TrueCloudLab/frostfs-s3-gw
Merge pull request #175 from masterSplinter01/112-list-objects-cache
ListObjectsV2 cache
This commit is contained in:
commit
a0fb14d91e
5 changed files with 163 additions and 33 deletions
|
@ -1,7 +1,6 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -13,7 +12,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/policy"
|
"github.com/nspcc-dev/neofs-node/pkg/policy"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -107,7 +105,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p.BoxData, err = getBoxData(r.Context())
|
p.BoxData, err = layer.GetBoxData(r.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.registerAndSendError(w, r, err, "could not get boxData")
|
h.registerAndSendError(w, r, err, "could not get boxData")
|
||||||
return
|
return
|
||||||
|
@ -172,17 +170,3 @@ func parseBasicACL(basicACL string) (uint32, error) {
|
||||||
return uint32(value), nil
|
return uint32(value), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getBoxData(ctx context.Context) (*accessbox.Box, error) {
|
|
||||||
var boxData *accessbox.Box
|
|
||||||
data, ok := ctx.Value(api.BoxData).(*accessbox.Box)
|
|
||||||
if !ok || data == nil {
|
|
||||||
return nil, fmt.Errorf("couldn't get box data from context")
|
|
||||||
}
|
|
||||||
|
|
||||||
boxData = data
|
|
||||||
if boxData.Gate == nil {
|
|
||||||
boxData.Gate = &accessbox.GateData{}
|
|
||||||
}
|
|
||||||
return boxData, nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ type (
|
||||||
layer struct {
|
layer struct {
|
||||||
pool pool.Pool
|
pool pool.Pool
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
cache ObjectsListV2Cache
|
||||||
}
|
}
|
||||||
|
|
||||||
// Params stores basic API parameters.
|
// Params stores basic API parameters.
|
||||||
|
@ -130,6 +131,7 @@ func NewLayer(log *zap.Logger, conns pool.Pool) Client {
|
||||||
return &layer{
|
return &layer{
|
||||||
pool: conns,
|
pool: conns,
|
||||||
log: log,
|
log: log,
|
||||||
|
cache: newListObjectsCache(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||||
|
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -55,7 +56,7 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
allObjectParams struct {
|
allObjectParams struct {
|
||||||
Bucket string
|
Bucket *BucketInfo
|
||||||
Delimiter string
|
Delimiter string
|
||||||
Prefix string
|
Prefix string
|
||||||
StartAfter string
|
StartAfter string
|
||||||
|
@ -210,14 +211,19 @@ func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
result ListObjectsInfoV1
|
result ListObjectsInfoV1
|
||||||
|
bkt *BucketInfo
|
||||||
)
|
)
|
||||||
|
|
||||||
if p.MaxKeys == 0 {
|
if p.MaxKeys == 0 {
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
allObjects, err := n.listSortedAllObjects(ctx, allObjectParams{
|
allObjects, err := n.listSortedAllObjects(ctx, allObjectParams{
|
||||||
Bucket: p.Bucket,
|
Bucket: bkt,
|
||||||
Prefix: p.Prefix,
|
Prefix: p.Prefix,
|
||||||
Delimiter: p.Delimiter,
|
Delimiter: p.Delimiter,
|
||||||
StartAfter: p.Marker,
|
StartAfter: p.Marker,
|
||||||
|
@ -252,17 +258,33 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
|
||||||
err error
|
err error
|
||||||
result ListObjectsInfoV2
|
result ListObjectsInfoV2
|
||||||
allObjects []*ObjectInfo
|
allObjects []*ObjectInfo
|
||||||
|
bkt *BucketInfo
|
||||||
|
cacheKey string
|
||||||
|
box *accessbox.Box
|
||||||
)
|
)
|
||||||
|
|
||||||
if p.MaxKeys == 0 {
|
if p.MaxKeys == 0 {
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if box, err = GetBoxData(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheKey = createKey(box.Gate.AccessKey, bkt.CID)
|
||||||
|
|
||||||
if p.ContinuationToken != "" {
|
if p.ContinuationToken != "" {
|
||||||
// find cache with continuation token
|
allObjects = n.cache.Get(p.ContinuationToken, cacheKey)
|
||||||
} else {
|
allObjects = trimStartAfter(p.StartAfter, allObjects)
|
||||||
|
}
|
||||||
|
|
||||||
|
if allObjects == nil {
|
||||||
allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{
|
allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{
|
||||||
Bucket: p.Bucket,
|
Bucket: bkt,
|
||||||
Prefix: p.Prefix,
|
Prefix: p.Prefix,
|
||||||
Delimiter: p.Delimiter,
|
Delimiter: p.Delimiter,
|
||||||
StartAfter: p.StartAfter,
|
StartAfter: p.StartAfter,
|
||||||
|
@ -270,13 +292,20 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if p.ContinuationToken != "" {
|
||||||
|
allObjects = trimAfterObjectID(p.ContinuationToken, allObjects)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(allObjects) > p.MaxKeys {
|
if len(allObjects) > p.MaxKeys {
|
||||||
result.IsTruncated = true
|
result.IsTruncated = true
|
||||||
|
|
||||||
|
restObjects := allObjects[p.MaxKeys:]
|
||||||
|
n.cache.Put(cacheKey, restObjects)
|
||||||
|
result.NextContinuationToken = restObjects[0].id.String()
|
||||||
|
|
||||||
allObjects = allObjects[:p.MaxKeys]
|
allObjects = allObjects[:p.MaxKeys]
|
||||||
// add creating of cache here
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ov := range allObjects {
|
for _, ov := range allObjects {
|
||||||
|
@ -292,14 +321,11 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
|
||||||
func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) {
|
func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]*ObjectInfo, error) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
bkt *BucketInfo
|
|
||||||
ids []*object.ID
|
ids []*object.ID
|
||||||
uniqNames = make(map[string]bool)
|
uniqNames = make(map[string]bool)
|
||||||
)
|
)
|
||||||
|
|
||||||
if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil {
|
if ids, err = n.objectSearch(ctx, &findParams{cid: p.Bucket.CID}); err != nil {
|
||||||
return nil, err
|
|
||||||
} else if ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID}); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,14 +334,14 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
addr := object.NewAddress()
|
addr := object.NewAddress()
|
||||||
addr.SetObjectID(id)
|
addr.SetObjectID(id)
|
||||||
addr.SetContainerID(bkt.CID)
|
addr.SetContainerID(p.Bucket.CID)
|
||||||
|
|
||||||
meta, err := n.objectHead(ctx, addr)
|
meta, err := n.objectHead(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.log.Warn("could not fetch object meta", zap.Error(err))
|
n.log.Warn("could not fetch object meta", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if oi := objectInfoFromMeta(bkt, meta, p.Prefix, p.Delimiter); oi != nil {
|
if oi := objectInfoFromMeta(p.Bucket, meta, p.Prefix, p.Delimiter); oi != nil {
|
||||||
// use only unique dir names
|
// use only unique dir names
|
||||||
if _, ok := uniqNames[oi.Name]; ok {
|
if _, ok := uniqNames[oi.Name]; ok {
|
||||||
continue
|
continue
|
||||||
|
@ -336,3 +362,23 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]
|
||||||
|
|
||||||
return objects, nil
|
return objects, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func trimStartAfter(startAfter string, objects []*ObjectInfo) []*ObjectInfo {
|
||||||
|
if objects != nil && len(startAfter) != 0 && objects[0].Name <= startAfter {
|
||||||
|
for i := range objects {
|
||||||
|
if objects[i].Name > startAfter {
|
||||||
|
return objects[i:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return objects
|
||||||
|
}
|
||||||
|
|
||||||
|
func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo {
|
||||||
|
for i, obj := range objects {
|
||||||
|
if obj.ID().String() == id {
|
||||||
|
return objects[i:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return objects
|
||||||
|
}
|
||||||
|
|
79
api/layer/object_cache.go
Normal file
79
api/layer/object_cache.go
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
package layer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
/*
|
||||||
|
This is an implementation of a cache for ListObjectsV2 which we return to users by ContinuationToken.
|
||||||
|
|
||||||
|
The cache is a map which has a key: (access_key from AccessBox) + container_id and a value: list of objects with
|
||||||
|
creation time. After putting a record we start a timer (via time.AfterFunc) that removes the record after
|
||||||
|
defaultCacheLifetime value.
|
||||||
|
|
||||||
|
ContinuationToken in our gateway is an objectID in NeoFS.
|
||||||
|
|
||||||
|
We don't keep ContinuationToken in this structure because we assume that users who received the token can reconnect
|
||||||
|
to other gateways and they should be able to get a list of objects.
|
||||||
|
When we receive the token from the user we just try to find the cache and then we return the list of objects which
|
||||||
|
starts from this token (i.e. objectID).
|
||||||
|
*/
|
||||||
|
|
||||||
|
// ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct.
|
||||||
|
type (
|
||||||
|
ObjectsListV2Cache interface {
|
||||||
|
Get(token string, key string) []*ObjectInfo
|
||||||
|
Put(key string, objects []*ObjectInfo)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
defaultCacheLifetime = time.Second * 60
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
listObjectsCache struct {
|
||||||
|
caches map[string]cache
|
||||||
|
mtx sync.RWMutex
|
||||||
|
}
|
||||||
|
cache struct {
|
||||||
|
list []*ObjectInfo
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func newListObjectsCache() *listObjectsCache {
|
||||||
|
return &listObjectsCache{
|
||||||
|
caches: make(map[string]cache),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *listObjectsCache) Get(token, key string) []*ObjectInfo {
|
||||||
|
l.mtx.RLock()
|
||||||
|
defer l.mtx.RUnlock()
|
||||||
|
if val, ok := l.caches[key]; ok {
|
||||||
|
return trimAfterObjectID(token, val.list)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *listObjectsCache) Put(key string, objects []*ObjectInfo) {
|
||||||
|
var c cache
|
||||||
|
|
||||||
|
l.mtx.Lock()
|
||||||
|
defer l.mtx.Unlock()
|
||||||
|
c.list = objects
|
||||||
|
l.caches[key] = c
|
||||||
|
time.AfterFunc(defaultCacheLifetime, func() {
|
||||||
|
l.mtx.Lock()
|
||||||
|
delete(l.caches, key)
|
||||||
|
l.mtx.Unlock()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func createKey(accessKey string, cid *cid.ID) string {
|
||||||
|
return accessKey + cid.String()
|
||||||
|
}
|
|
@ -1,6 +1,8 @@
|
||||||
package layer
|
package layer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -8,6 +10,8 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||||
|
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||||
|
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
@ -172,3 +176,18 @@ func (o *ObjectInfo) ID() *object.ID { return o.id }
|
||||||
|
|
||||||
// IsDir allows to check if object is a directory.
|
// IsDir allows to check if object is a directory.
|
||||||
func (o *ObjectInfo) IsDir() bool { return o.isDir }
|
func (o *ObjectInfo) IsDir() bool { return o.isDir }
|
||||||
|
|
||||||
|
// GetBoxData extracts accessbox.Box from context.
|
||||||
|
func GetBoxData(ctx context.Context) (*accessbox.Box, error) {
|
||||||
|
var boxData *accessbox.Box
|
||||||
|
data, ok := ctx.Value(api.BoxData).(*accessbox.Box)
|
||||||
|
if !ok || data == nil {
|
||||||
|
return nil, fmt.Errorf("couldn't get box data from context")
|
||||||
|
}
|
||||||
|
|
||||||
|
boxData = data
|
||||||
|
if boxData.Gate == nil {
|
||||||
|
boxData.Gate = &accessbox.GateData{}
|
||||||
|
}
|
||||||
|
return boxData, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue