[#616] Reduce number of requests during listing

Check if object is directory before request to NeoFS

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-08-13 13:03:49 +03:00 committed by Kirillov Denis
parent 107d8a9033
commit 3824151699
5 changed files with 135 additions and 98 deletions

View file

@ -209,7 +209,7 @@ func TestDeleteObjectFromListCache(t *testing.T) {
bktName, objName := "bucket-for-removal", "object-to-delete"
bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
versions := listObjectsV1(t, tc, bktName)
versions := listObjectsV1(t, tc, bktName, "", "", "", -1)
require.Len(t, versions.Contents, 1)
checkFound(t, tc, bktName, objName, objInfo.VersionID())
@ -217,7 +217,7 @@ func TestDeleteObjectFromListCache(t *testing.T) {
checkNotFound(t, tc, bktName, objName, objInfo.VersionID())
// check cache is clean after object removal
versions = listObjectsV1(t, tc, bktName)
versions = listObjectsV1(t, tc, bktName, "", "", "", -1)
require.Len(t, versions.Contents, 0)
require.False(t, existInMockedNeoFS(tc, bktInfo, objInfo))
@ -316,15 +316,6 @@ func listVersions(t *testing.T, tc *handlerContext, bktName string) *ListObjects
return res
}
func listObjectsV1(t *testing.T, tc *handlerContext, bktName string) *ListObjectsV1Response {
w, r := prepareTestRequest(t, bktName, "", nil)
tc.Handler().ListObjectsV1Handler(w, r)
assertStatus(t, w, http.StatusOK)
res := &ListObjectsV1Response{}
parseTestResponse(t, w, res)
return res
}
func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
body := bytes.NewReader([]byte("content"))
w, r := prepareTestPayloadRequest(bktName, objName, body)

View file

@ -87,11 +87,11 @@ func TestS3CompatibilityBucketListV2BothContinuationTokenStartAfter(t *testing.T
createTestObject(tc.Context(), t, tc, bktInfo, objName)
}
listV2Response1 := listObjectsV2(t, tc, bktName, "bar", "", 1)
listV2Response1 := listObjectsV2(t, tc, bktName, "", "", "bar", "", 1)
nextContinuationToken := listV2Response1.NextContinuationToken
require.Equal(t, "baz", listV2Response1.Contents[0].Key)
listV2Response2 := listObjectsV2(t, tc, bktName, "bar", nextContinuationToken, -1)
listV2Response2 := listObjectsV2(t, tc, bktName, "", "", "bar", nextContinuationToken, -1)
require.Equal(t, nextContinuationToken, listV2Response2.ContinuationToken)
require.Equal(t, "bar", listV2Response2.StartAfter)
@ -101,17 +101,33 @@ func TestS3CompatibilityBucketListV2BothContinuationTokenStartAfter(t *testing.T
require.Equal(t, "quxx", listV2Response2.Contents[1].Key)
}
func listObjectsV2(t *testing.T, tc *handlerContext, bktName, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
query := make(url.Values)
func TestS3BucketListDelimiterBasic(t *testing.T) {
tc := prepareHandlerContext(t)
bktName := "bucket-for-listing"
objects := []string{"foo/bar", "foo/bar/xyzzy", "quux/thud", "asdf"}
bktInfo, _ := createBucketAndObject(t, tc, bktName, objects[0])
for _, objName := range objects[1:] {
createTestObject(tc.Context(), t, tc, bktInfo, objName)
}
listV1Response := listObjectsV1(t, tc, bktName, "", "/", "", -1)
require.Equal(t, "/", listV1Response.Delimiter)
require.Equal(t, "asdf", listV1Response.Contents[0].Key)
require.Len(t, listV1Response.CommonPrefixes, 2)
require.Equal(t, "foo/", listV1Response.CommonPrefixes[0].Prefix)
require.Equal(t, "quux/", listV1Response.CommonPrefixes[1].Prefix)
}
func listObjectsV2(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
if len(startAfter) != 0 {
query.Add("start-after", startAfter)
}
if len(continuationToken) != 0 {
query.Add("continuation-token", continuationToken)
}
if maxKeys != -1 {
query.Add("max-keys", strconv.Itoa(maxKeys))
}
w, r := prepareTestFullRequest(t, bktName, "", query, nil)
tc.Handler().ListObjectsV2Handler(w, r)
@ -120,3 +136,33 @@ func listObjectsV2(t *testing.T, tc *handlerContext, bktName, startAfter, contin
parseTestResponse(t, w, res)
return res
}
func prepareCommonListObjectsQuery(prefix, delimiter string, maxKeys int) url.Values {
query := make(url.Values)
if len(delimiter) != 0 {
query.Add("delimiter", delimiter)
}
if len(prefix) != 0 {
query.Add("prefix", prefix)
}
if maxKeys != -1 {
query.Add("max-keys", strconv.Itoa(maxKeys))
}
return query
}
func listObjectsV1(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, marker string, maxKeys int) *ListObjectsV1Response {
query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
if len(marker) != 0 {
query.Add("marker", marker)
}
w, r := prepareTestFullRequest(t, bktName, "", query, nil)
tc.Handler().ListObjectsV1Handler(w, r)
assertStatus(t, w, http.StatusOK)
res := &ListObjectsV1Response{}
parseTestResponse(t, w, res)
return res
}

View file

@ -682,19 +682,19 @@ func IsSystemHeader(key string) bool {
}
func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
filepath := node.FilePath
if len(p.Delimiter) > 0 {
tail := strings.TrimPrefix(filepath, p.Prefix)
index := strings.Index(tail, p.Delimiter)
if index >= 0 {
filepath = p.Prefix + tail[:index+1]
}
}
if _, ok := existed[filepath]; ok {
if node.IsDeleteMarker() {
return true
}
if filepath <= p.Marker {
filePath := node.FilePath
if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 {
filePath = dirName
}
if _, ok := existed[filePath]; ok {
return true
}
if filePath <= p.Marker {
return true
}
@ -707,7 +707,7 @@ func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]st
}
}
existed[filepath] = struct{}{}
existed[filePath] = struct{}{}
return false
}
@ -736,6 +736,10 @@ func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []st
}
func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) (oi *data.ObjectInfo) {
if oiDir := tryDirectory(bktInfo, node, prefix, delimiter); oiDir != nil {
return oiDir
}
extObjInfo := n.objCache.GetObject(newAddress(bktInfo.CID, node.OID))
if extObjInfo != nil {
return extObjInfo.ObjectInfo
@ -755,6 +759,38 @@ func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *
return processObjectInfoName(oi, prefix, delimiter)
}
func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo {
dirName := tryDirectoryName(node, prefix, delimiter)
if len(dirName) == 0 {
return nil
}
return &data.ObjectInfo{
CID: bktInfo.CID,
IsDir: true,
IsDeleteMarker: node.IsDeleteMarker(),
Bucket: bktInfo.Name,
Name: dirName,
}
}
// tryDirectoryName forms directory name by prefix and delimiter.
// If node isn't a directory empty string is returned.
// This function doesn't check if node has a prefix. It must do a caller.
func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string {
if len(delimiter) == 0 {
return ""
}
tail := strings.TrimPrefix(node.FilePath, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
return prefix + tail[:index+1]
}
return ""
}
func (n *layer) transformNeofsError(ctx context.Context, err error) error {
if err == nil {
return nil

View file

@ -121,30 +121,6 @@ func addEncryptionHeaders(meta map[string]string, enc encryption.Params) error {
return nil
}
// processObjectInfoName fixes name in objectInfo structure based on prefix and
// delimiter from user request. If name does not contain prefix, nil value is
// returned. If name should be modified, then function returns copy of objectInfo
// structure.
func processObjectInfoName(oi *data.ObjectInfo, prefix, delimiter string) *data.ObjectInfo {
if !strings.HasPrefix(oi.Name, prefix) {
return nil
}
if len(delimiter) == 0 {
return oi
}
copiedObjInfo := *oi
tail := strings.TrimPrefix(copiedObjInfo.Name, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
copiedObjInfo.IsDir = true
copiedObjInfo.Size = 0
copiedObjInfo.Headers = nil
copiedObjInfo.ContentType = ""
copiedObjInfo.Name = prefix + tail[:index+1]
}
return &copiedObjInfo
}
func filenameFromObject(o *object.Object) string {
for _, attr := range o.Attributes() {
if attr.Key() == object.AttributeFileName {

View file

@ -3,14 +3,12 @@ package layer
import (
"encoding/hex"
"net/http"
"strconv"
"testing"
"time"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/stretchr/testify/require"
@ -23,30 +21,6 @@ var (
defaultTestContentType = http.DetectContentType(defaultTestPayload)
)
func newTestObject(id oid.ID, bkt *data.BucketInfo, name string) *object.Object {
filename := object.NewAttribute()
filename.SetKey(object.AttributeFileName)
filename.SetValue(name)
created := object.NewAttribute()
created.SetKey(object.AttributeTimestamp)
created.SetValue(strconv.FormatInt(defaultTestCreated.Unix(), 10))
contentType := object.NewAttribute()
contentType.SetKey(object.AttributeContentType)
contentType.SetValue(defaultTestContentType)
obj := object.New()
obj.SetID(id)
obj.SetOwnerID(&bkt.Owner)
obj.SetContainerID(bkt.CID)
obj.SetPayload(defaultTestPayload)
obj.SetAttributes(*filename, *created, *contentType)
obj.SetPayloadSize(uint64(defaultTestPayloadLength))
return obj
}
func newTestInfo(obj oid.ID, bkt *data.BucketInfo, name string, isDir bool) *data.ObjectInfo {
var hashSum checksum.Checksum
info := &data.ObjectInfo{
@ -72,7 +46,16 @@ func newTestInfo(obj oid.ID, bkt *data.BucketInfo, name string, isDir bool) *dat
return info
}
func Test_objectInfoName(t *testing.T) {
func newTestNodeVersion(id oid.ID, name string) *data.NodeVersion {
return &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: id,
FilePath: name,
},
}
}
func TestTryDirectory(t *testing.T) {
var uid user.ID
var id oid.ID
var containerID cid.ID
@ -88,77 +71,82 @@ func Test_objectInfoName(t *testing.T) {
name string
prefix string
result *data.ObjectInfo
object *object.Object
node *data.NodeVersion
delimiter string
}{
{
name: "small.jpg",
result: newTestInfo(id, bkt, "small.jpg", false),
object: newTestObject(id, bkt, "small.jpg"),
result: nil,
node: newTestNodeVersion(id, "small.jpg"),
},
{
name: "small.jpg not matched prefix",
prefix: "big",
result: nil,
object: newTestObject(id, bkt, "small.jpg"),
node: newTestNodeVersion(id, "small.jpg"),
},
{
name: "small.jpg delimiter",
delimiter: "/",
result: newTestInfo(id, bkt, "small.jpg", false),
object: newTestObject(id, bkt, "small.jpg"),
result: nil,
node: newTestNodeVersion(id, "small.jpg"),
},
{
name: "test/small.jpg",
result: newTestInfo(id, bkt, "test/small.jpg", false),
object: newTestObject(id, bkt, "test/small.jpg"),
result: nil,
node: newTestNodeVersion(id, "test/small.jpg"),
},
{
name: "test/small.jpg with prefix and delimiter",
prefix: "test/",
delimiter: "/",
result: newTestInfo(id, bkt, "test/small.jpg", false),
object: newTestObject(id, bkt, "test/small.jpg"),
result: nil,
node: newTestNodeVersion(id, "test/small.jpg"),
},
{
name: "a/b/small.jpg",
prefix: "a",
result: newTestInfo(id, bkt, "a/b/small.jpg", false),
object: newTestObject(id, bkt, "a/b/small.jpg"),
result: nil,
node: newTestNodeVersion(id, "a/b/small.jpg"),
},
{
name: "a/b/small.jpg",
prefix: "a/",
delimiter: "/",
result: newTestInfo(id, bkt, "a/b/", true),
object: newTestObject(id, bkt, "a/b/small.jpg"),
node: newTestNodeVersion(id, "a/b/small.jpg"),
},
{
name: "a/b/c/small.jpg",
prefix: "a/",
delimiter: "/",
result: newTestInfo(id, bkt, "a/b/", true),
object: newTestObject(id, bkt, "a/b/c/small.jpg"),
node: newTestNodeVersion(id, "a/b/c/small.jpg"),
},
{
name: "a/b/c/small.jpg",
prefix: "a/b/c/s",
delimiter: "/",
result: newTestInfo(id, bkt, "a/b/c/small.jpg", false),
object: newTestObject(id, bkt, "a/b/c/small.jpg"),
result: nil,
node: newTestNodeVersion(id, "a/b/c/small.jpg"),
},
{
name: "a/b/c/big.jpg",
prefix: "a/b/",
delimiter: "/",
result: newTestInfo(id, bkt, "a/b/c/", true),
object: newTestObject(id, bkt, "a/b/c/big.jpg"),
node: newTestNodeVersion(id, "a/b/c/big.jpg"),
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
info := processObjectInfoName(objectInfoFromMeta(bkt, tc.object), tc.prefix, tc.delimiter)
info := tryDirectory(bkt, tc.node, tc.prefix, tc.delimiter)
if tc.result != nil {
tc.result.Created = time.Time{}
tc.result.Owner = user.ID{}
}
require.Equal(t, tc.result, info)
})
}