feature/165-speed_up_listing #294

Merged
alexvanin merged 22 commits from dkirillov/frostfs-s3-gw:feature/165-speed_up_listing into master 2024-09-04 19:51:13 +00:00
2 changed files with 61 additions and 26 deletions
Showing only changes of commit a74d498df2 - Show all commits

View file

@ -15,7 +15,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
@ -74,37 +73,68 @@ func TestListObjectsWithOldTreeNodes(t *testing.T) {
srcEnc, err := encryption.NewParams([]byte("1234567890qwertyuiopasdfghjklzxc"))
require.NoError(t, err)
objInfo := createTestObject(hc, bktInfo, objName, *srcEnc)
_ = objInfo
prm := &tree.GetNodesParams{
BktInfo: bktInfo,
TreeID: "version",
Path: []string{objName},
LatestOnly: true,
AllAttrs: true,
n := 10
objInfos := make([]*data.ObjectInfo, n)
for i := 0; i < n; i++ {
objInfos[i] = createTestObject(hc, bktInfo, objName+strconv.Itoa(i), *srcEnc)
}
nodes, err := hc.treeMock.GetNodes(hc.Context(), prm)
require.NoError(t, err)
require.Len(t, nodes, 1)
node := nodes[0]
meta := make(map[string]string, len(node.GetMeta()))
for _, m := range node.GetMeta() {
if m.GetKey() != "Created" && m.GetKey() != "Owner" {
meta[m.GetKey()] = string(m.GetValue())
sort.Slice(objInfos, func(i, j int) bool { return objInfos[i].Name < objInfos[j].Name })
makeAllTreeObjectsOld(hc, bktInfo)
listV1 := listObjectsV1(hc, bktName, "", "", "", -1)
checkListOldNodes(hc, listV1.Contents, objInfos)
listV2 := listObjectsV2(hc, bktName, "", "", "", "", -1)
checkListOldNodes(hc, listV2.Contents, objInfos)
listVers := listObjectsVersions(hc, bktName, "", "", "", "", -1)
checkListVersionsOldNodes(hc, listVers.Version, objInfos)
}
func makeAllTreeObjectsOld(hc *handlerContext, bktInfo *data.BucketInfo) {
nodes, err := hc.treeMock.GetSubTree(hc.Context(), bktInfo, "version", 0, 0)
require.NoError(hc.t, err)
for _, node := range nodes {
if node.GetNodeID() == 0 {
continue
}
meta := make(map[string]string, len(node.GetMeta()))
for _, m := range node.GetMeta() {
if m.GetKey() != "Created" && m.GetKey() != "Owner" {
meta[m.GetKey()] = string(m.GetValue())
}
}
err = hc.treeMock.MoveNode(hc.Context(), bktInfo, "version", node.GetNodeID(), node.GetParentID(), meta)
require.NoError(hc.t, err)
}
}
err = hc.treeMock.MoveNode(hc.Context(), bktInfo, "version", node.GetNodeID(), node.GetParentID(), meta)
require.NoError(t, err)
func checkListOldNodes(hc *handlerContext, list []Object, objInfos []*data.ObjectInfo) {
require.Len(hc.t, list, len(objInfos))
for i := range list {
require.Equal(hc.t, objInfos[i].Name, list[i].Key)
realSize, err := layer.GetObjectSize(objInfos[i])
require.NoError(hc.t, err)
require.Equal(hc.t, objInfos[i].Owner.EncodeToString(), list[i].Owner.ID)
require.Equal(hc.t, objInfos[i].Created.UTC().Format(time.RFC3339), list[i].LastModified)
require.Equal(hc.t, realSize, list[i].Size)
}
}
list := listObjectsV1(hc, bktName, "", "", "", -1)
require.Len(t, list.Contents, 1)
realSize, err := layer.GetObjectSize(objInfo)
require.NoError(t, err)
require.Equal(t, objInfo.Owner.EncodeToString(), list.Contents[0].Owner.ID)
require.Equal(t, objInfo.Created.UTC().Format(time.RFC3339), list.Contents[0].LastModified)
require.Equal(t, realSize, list.Contents[0].Size)
func checkListVersionsOldNodes(hc *handlerContext, list []ObjectVersionResponse, objInfos []*data.ObjectInfo) {
require.Len(hc.t, list, len(objInfos))
for i := range list {
require.Equal(hc.t, objInfos[i].Name, list[i].Key)
realSize, err := layer.GetObjectSize(objInfos[i])
require.NoError(hc.t, err)
require.Equal(hc.t, objInfos[i].Owner.EncodeToString(), list[i].Owner.ID)
require.Equal(hc.t, objInfos[i].Created.UTC().Format(time.RFC3339), list[i].LastModified)
require.Equal(hc.t, realSize, list[i].Size)
}
}
func TestListObjectsContextCanceled(t *testing.T) {
@ -657,6 +687,7 @@ func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, c
func listObjectsV2Ext(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken, encodingType string, maxKeys int) *ListObjectsV2Response {
query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
query.Add("fetch-owner", "true")
if len(startAfter) != 0 {
query.Add("start-after", startAfter)
}

View file

@ -214,6 +214,8 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p commonLatestVers
return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
}
sort.Slice(objects, func(i, j int) bool { return objects[i].NodeVersion.FilePath < objects[j].NodeVersion.FilePath })
if len(objects) > p.MaxKeys {
next = objects[p.MaxKeys]
n.putListLatestVersionsSession(ctx, p, session, objects)
@ -241,6 +243,8 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p commonVersionsListi
allObjects := handleGeneratedVersions(objOutCh, p, session)
sort.SliceStable(allObjects, func(i, j int) bool { return allObjects[i].NodeVersion.FilePath < allObjects[j].NodeVersion.FilePath })
if err = <-errorCh; err != nil {
return nil, false, fmt.Errorf("failed to get next object from stream: %w", err)
}