package handler

import (
	"context"
	"fmt"
	"net/http"
	"net/url"
	"sort"
	"strconv"
	"strings"
	"testing"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"
	"go.uber.org/zap/zaptest/observer"
)

// TestParseContinuationToken checks that parseContinuationToken rejects an
// empty and an invalid "continuation-token" query value and returns a valid
// token unchanged.
func TestParseContinuationToken(t *testing.T) {
	const tokenKey = "continuation-token"

	t.Run("empty token", func(t *testing.T) {
		_, err := parseContinuationToken(map[string][]string{tokenKey: {""}})
		require.Error(t, err)
	})

	t.Run("invalid not empty token", func(t *testing.T) {
		_, err := parseContinuationToken(map[string][]string{tokenKey: {"asd"}})
		require.Error(t, err)
	})

	t.Run("valid token", func(t *testing.T) {
		validToken := "75BTT5Z9o79XuKdUeGqvQbqDnxu6qWcR5EhxW8BXFf8t"

		parsed, err := parseContinuationToken(map[string][]string{tokenKey: {validToken}})
		require.NoError(t, err)
		require.Equal(t, validToken, parsed)
	})
}

// TestListObjectNullVersions uploads an object before versioning is enabled
// and once more afterwards, then expects the version listing to hold two
// entries, the second of which carries data.UnversionedObjectVersionID.
func TestListObjectNullVersions(t *testing.T) {
	const (
		bucket = "bucket-versioning-enabled"
		key    = "object"
	)

	hc := prepareHandlerContext(t)
	createTestBucket(hc, bucket)

	// The first upload happens before versioning is switched on.
	putObjectContent(hc, bucket, key, "content")
	putBucketVersioning(t, hc, bucket, true)
	putObjectContent(hc, bucket, key, "content2")

	versions := listVersions(t, hc, bucket).Version
	require.Len(t, versions, 2)
	require.Equal(t, data.UnversionedObjectVersionID, versions[1].VersionID)
}

// TestListObjectsWithOldTreeNodes creates several objects with encryption
// params, rewrites their tree nodes through makeAllTreeObjectsOld and checks
// that the V1, V2 and versions listings still report the expected key, owner,
// creation time and size for every object.
func TestListObjectsWithOldTreeNodes(t *testing.T) {
	const objectsCount = 10

	hc := prepareHandlerContext(t)

	bucket, keyPrefix := "bucket-versioning-enabled", "object"
	bktInfo := createTestBucket(hc, bucket)

	encParams, err := encryption.NewParams([]byte("1234567890qwertyuiopasdfghjklzxc"))
	require.NoError(t, err)

	expected := make([]*data.ObjectInfo, objectsCount)
	for i := range expected {
		expected[i] = createTestObject(hc, bktInfo, keyPrefix+strconv.Itoa(i), *encParams)
	}
	// The check helpers compare element by element, so order the expectations by name.
	sort.Slice(expected, func(a, b int) bool {
		return expected[a].Name < expected[b].Name
	})

	makeAllTreeObjectsOld(hc, bktInfo)

	v1 := listObjectsV1(hc, bucket, "", "", "", -1)
	checkListOldNodes(hc, v1.Contents, expected)

	v2 := listObjectsV2(hc, bucket, "", "", "", "", -1)
	checkListOldNodes(hc, v2.Contents, expected)

	versions := listObjectsVersions(hc, bucket, "", "", "", "", -1)
	checkListVersionsOldNodes(hc, versions.Version, expected)
}

// TestListObjectsVersionsSkipLogTaggingNodesError lists the versions of an
// object that also has tagging set and asserts, through an observed logger,
// that no log entry containing logs.ParseTreeNode was recorded.
func TestListObjectsVersionsSkipLogTaggingNodesError(t *testing.T) {
	// Record every entry from debug level up so it can be inspected afterwards.
	core, recorded := observer.New(zap.DebugLevel)

	base, err := prepareHandlerContextBase(layer.DefaultCachesConfigs(zap.New(core)))
	require.NoError(t, err)
	hc := &handlerContext{handlerContextBase: base, t: t}

	bucket, key := "bucket-versioning-enabled", "versions/object"
	bktInfo := createTestBucket(hc, bucket)

	// Two uploads of the same key, followed by tagging of that key.
	for i := 0; i < 2; i++ {
		createTestObject(hc, bktInfo, key, encryption.Params{})
	}
	putObjectTagging(hc.t, hc, bucket, key, map[string]string{"tag1": "val1"})

	listObjectsVersions(hc, bucket, "", "", "", "", -1)

	parseErrors := recorded.Filter(func(e observer.LoggedEntry) bool {
		return strings.Contains(e.Message, logs.ParseTreeNode)
	})
	require.Empty(t, parseErrors)
}

// makeAllTreeObjectsOld re-submits every node of the "version" tree (except
// the node whose ID is 0) through MoveNode, keeping its parent but leaving out
// the "Created" and "Owner" meta keys.
// NOTE(review): presumably this emulates nodes written by an older gateway
// version — confirm.
func makeAllTreeObjectsOld(hc *handlerContext, bktInfo *data.BucketInfo) {
	const versionTree = "version"

	subTree, err := hc.treeMock.GetSubTree(hc.Context(), bktInfo, versionTree, []uint64{0}, 0, true)
	require.NoError(hc.t, err)

	for _, node := range subTree {
		nodeID := node.GetNodeID()[0]
		if nodeID == 0 {
			continue
		}

		pairs := node.GetMeta()
		strippedMeta := make(map[string]string, len(pairs))
		for _, kv := range pairs {
			switch key := kv.GetKey(); key {
			case "Created", "Owner":
				// Intentionally dropped from the rewritten node.
			default:
				strippedMeta[key] = string(kv.GetValue())
			}
		}

		require.NoError(hc.t, hc.treeMock.MoveNode(hc.Context(), bktInfo, versionTree, nodeID, node.GetParentID()[0], strippedMeta))
	}
}

// checkListOldNodes asserts that list matches objInfos position by position:
// same key, owner ID, last-modified time (creation time in UTC, RFC 3339) and
// the size reported by layer.GetObjectSize.
func checkListOldNodes(hc *handlerContext, list []Object, objInfos []*data.ObjectInfo) {
	require.Len(hc.t, list, len(objInfos))

	for i, listed := range list {
		want := objInfos[i]
		require.Equal(hc.t, want.Name, listed.Key)

		wantSize, err := layer.GetObjectSize(want)
		require.NoError(hc.t, err)

		require.Equal(hc.t, want.Owner.EncodeToString(), listed.Owner.ID)
		require.Equal(hc.t, want.Created.UTC().Format(time.RFC3339), listed.LastModified)
		require.Equal(hc.t, wantSize, listed.Size)
	}
}

// checkListVersionsOldNodes is the version-listing counterpart of the object
// check: every listed version must match the object info at the same index in
// key, owner ID, last-modified time (creation time in UTC, RFC 3339) and the
// size reported by layer.GetObjectSize.
func checkListVersionsOldNodes(hc *handlerContext, list []ObjectVersionResponse, objInfos []*data.ObjectInfo) {
	require.Len(hc.t, list, len(objInfos))

	for i, listed := range list {
		want := objInfos[i]
		require.Equal(hc.t, want.Name, listed.Key)

		wantSize, err := layer.GetObjectSize(want)
		require.NoError(hc.t, err)

		require.Equal(hc.t, want.Owner.EncodeToString(), listed.Owner.ID)
		require.Equal(hc.t, want.Created.UTC().Format(time.RFC3339), listed.LastModified)
		require.Equal(hc.t, wantSize, listed.Size)
	}
}

// TestListObjectsContextCanceled checks that the context stored in a cached
// list session gets canceled once a later listing request displaces that
// session from the session cache. The cache is configured with room for a
// single entry and an hour-long lifetime, so displacement is driven by the
// follow-up listing requests rather than by expiry.
func TestListObjectsContextCanceled(t *testing.T) {
	log := zaptest.NewLogger(t)
	layerCfg := layer.DefaultCachesConfigs(log)
	layerCfg.SessionList.Lifetime = time.Hour
	layerCfg.SessionList.Size = 1

	hcBase, err := prepareHandlerContextBase(layerCfg)
	require.NoError(t, err)
	hc := &handlerContext{
		handlerContextBase: hcBase,
		t:                  t,
	}

	bktName := "bucket-versioning-enabled"
	bktInfo := createTestBucket(hc, bktName)

	for i := 0; i < 4; i++ {
		putObject(hc, bktName, "object"+strconv.Itoa(i))
	}

	result := listObjectsV1(hc, bktName, "", "", "", 2)
	session := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result.NextMarker))
	// Fail with a readable message instead of panicking on session.Context below.
	require.NotNil(t, session, "list session for ListObjectsV1 must be cached")
	// invoke list again to trigger cache eviction
	// (use empty prefix to check that context canceled on replace)
	listObjectsV1(hc, bktName, "", "", "", 2)
	checkContextCanceled(session.Context, t)

	result2 := listObjectsV2(hc, bktName, "", "", "", "", 2)
	session2 := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result2.NextContinuationToken))
	require.NotNil(t, session2, "list session for ListObjectsV2 must be cached")
	// invoke list again to trigger cache eviction
	// (use non-empty prefix to check that context canceled on cache eviction)
	listObjectsV2(hc, bktName, "o", "", "", "", 2)
	checkContextCanceled(session2.Context, t)

	result3 := listObjectsVersions(hc, bktName, "", "", "", "", 2)
	session3 := hc.cache.GetListSession(hc.owner, cache.CreateListSessionCacheKey(bktInfo.CID, "", result3.NextVersionIDMarker))
	require.NotNil(t, session3, "list session for ListObjectVersions must be cached")
	// invoke list again to trigger cache eviction
	listObjectsVersions(hc, bktName, "o", "", "", "", 2)
	checkContextCanceled(session3.Context, t)
}

// checkContextCanceled waits up to 10 seconds for ctx to be done and then
// asserts that ctx.Err() is context.Canceled. If the wait times out, ctx.Err()
// is still nil and the assertion fails.
func checkContextCanceled(ctx context.Context, t *testing.T) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	select {
	case <-ctx.Done():
	case <-time.After(10 * time.Second):
	}
	require.ErrorIs(t, ctx.Err(), context.Canceled)
}

// TestListObjectsLatestVersions uploads two versions of two objects into a
// versioned bucket and checks that both ListObjectsV1 and ListObjectsV2 return
// exactly one entry per key, carrying the ETag of the most recent upload.
func TestListObjectsLatestVersions(t *testing.T) {
	hc := prepareHandlerContext(t)

	bktName := "bucket-versioning-enabled"
	createTestBucket(hc, bktName)
	putBucketVersioning(t, hc, bktName, true)

	objName1, objName2 := "object1", "object2"
	oldContent, newContent := "content1", "content2"

	putObjectContent(hc, bktName, objName1, oldContent)
	latestHdr1 := putObjectContent(hc, bktName, objName1, newContent)
	putObjectContent(hc, bktName, objName2, oldContent)
	latestHdr2 := putObjectContent(hc, bktName, objName2, newContent)

	// Shared expectations for both listing flavours.
	checkLatest := func(t *testing.T, contents []Object) {
		require.Len(t, contents, 2)
		require.Equal(t, objName1, contents[0].Key)
		require.Equal(t, latestHdr1.Get(api.ETag), contents[0].ETag)
		require.Equal(t, objName2, contents[1].Key)
		require.Equal(t, latestHdr2.Get(api.ETag), contents[1].ETag)
	}

	t.Run("listv1", func(t *testing.T) {
		checkLatest(t, listObjectsV1(hc, bktName, "", "", "", -1).Contents)
	})

	t.Run("listv2", func(t *testing.T) {
		checkLatest(t, listObjectsV2(hc, bktName, "", "", "", "", -1).Contents)
	})
}

// TestListObjectsVersionsPaging uploads n objects and walks the version
// listing page by page (n/3 keys per page), following the returned key and
// version-id markers. Every page must contain the next keys in sorted order
// and, once the listing is no longer truncated, all keys must have been seen.
func TestListObjectsVersionsPaging(t *testing.T) {
	hc := prepareHandlerContext(t)

	bktName := "bucket-versioning-enabled"
	createTestBucket(hc, bktName)

	n := 12

	var objects []string
	for i := 0; i < n; i++ {
		objects = append(objects, "objects"+strconv.Itoa(i))
		putObjectContent(hc, bktName, objects[i], "content")
	}
	sort.Strings(objects)

	result := &ListObjectsVersionsResponse{IsTruncated: true}
	for result.IsTruncated {
		result = listObjectsVersions(hc, bktName, "", "", result.NextKeyMarker, result.NextVersionIDMarker, n/3)

		// Guard the indexing and slicing below: a page longer than the remaining
		// expectations must fail the test instead of panicking.
		require.LessOrEqual(t, len(result.Version), len(objects))
		// A truncated page without entries would make no progress and loop forever.
		if result.IsTruncated {
			require.NotEmpty(t, result.Version)
		}

		for i, version := range result.Version {
			if objects[i] != version.Key {
				t.Errorf("expected: '%s', got: '%s'", objects[i], version.Key)
			}
		}
		objects = objects[len(result.Version):]
	}

	require.Empty(t, objects)
}

// TestListObjectsVersionsCorrectIsLatestFlag uploads three versions of "obj1"
// followed by six versions of "obj2" and pages through the version listing
// three entries at a time. Only the first entry of the first page of each
// object is expected to be flagged as latest.
func TestListObjectsVersionsCorrectIsLatestFlag(t *testing.T) {
	const (
		uploads  = 9
		pageSize = 3
	)

	hc := prepareHandlerContext(t)

	bktName := "bucket-versioning-enabled"
	createVersionedBucket(hc, bktName)

	firstObj, secondObj := "obj1", "obj2"

	// objects uploaded: ["obj1"-v1, "obj1"-v2, "obj1"-v3, "obj2"-v1, "obj2"-v2, "obj2"-v3, "obj2"-v4, "obj2"-v5, "obj2"-v6]
	headers := make([]http.Header, 0, uploads)
	for i := 0; i < uploads; i++ {
		name := secondObj
		if i < pageSize {
			name = firstObj
		}
		headers = append(headers, putObjectContent(hc, bktName, name, fmt.Sprintf("content/%d", i)))
	}

	pages := []struct {
		objName  string
		headers  []http.Header
		isLatest bool
	}{
		// expected objects: ["obj1"-v3, "obj1"-v2, "obj1"-v1]
		{firstObj, headers[:pageSize], true},
		// expected objects: ["obj2"-v6, "obj2"-v5, "obj2"-v4]
		{secondObj, headers[2*pageSize:], true},
		// expected objects: ["obj2"-v3, "obj2"-v2, "obj2"-v1]
		{secondObj, headers[pageSize : 2*pageSize], false},
	}

	// The first request uses empty markers; each next one continues from the previous page.
	var keyMarker, versionIDMarker string
	for _, page := range pages {
		versions := listObjectsVersions(hc, bktName, "", "", keyMarker, versionIDMarker, pageSize)
		checkListVersionsParts(t, versions, formReverseVersionResponse(page.objName, page.headers, page.isLatest))
		keyMarker, versionIDMarker = versions.NextKeyMarker, versions.NextVersionIDMarker
	}
}

// formReverseVersionResponse builds the expected version entries for objName
// from the given upload response headers, in reverse upload order. ETag and
// VersionID are taken from the headers; IsLatest is set only on the first
// resulting entry, and only when isLatest is true.
func formReverseVersionResponse(objName string, headers []http.Header, isLatest bool) []ObjectVersionResponse {
	total := len(headers)
	reversed := make([]ObjectVersionResponse, 0, total)

	// Walk the headers from the last upload back to the first.
	for src := total - 1; src >= 0; src-- {
		pos := total - 1 - src
		hdr := headers[src]
		reversed = append(reversed, ObjectVersionResponse{
			ETag:      hdr.Get(api.ETag),
			IsLatest:  isLatest && pos == 0,
			Key:       objName,
			VersionID: hdr.Get(api.AmzVersionID),
		})
	}

	return reversed
}

// checkListVersionsParts asserts that the listed versions match expected
// position by position in Key, ETag, VersionID and IsLatest.
func checkListVersionsParts(t *testing.T, versions *ListObjectsVersionsResponse, expected []ObjectVersionResponse) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	require.Len(t, versions.Version, len(expected))
	for i, res := range versions.Version {
		require.Equal(t, expected[i].Key, res.Key)
		require.Equal(t, expected[i].ETag, res.ETag)
		require.Equal(t, expected[i].VersionID, res.VersionID)
		require.Equal(t, expected[i].IsLatest, res.IsLatest)
	}
}

// TestS3CompatibilityBucketListV2BothContinuationTokenStartAfter sends a
// ListObjectsV2 request that carries both start-after and a continuation
// token. The second page is expected to continue from the token ("foo",
// "quxx") while still echoing the start-after value in the response.
func TestS3CompatibilityBucketListV2BothContinuationTokenStartAfter(t *testing.T) {
	tc := prepareHandlerContext(t)

	bktName := "bucket-for-listing"
	objects := []string{"bar", "baz", "foo", "quxx"}
	bktInfo, _ := createBucketAndObject(tc, bktName, objects[0])

	for _, objName := range objects[1:] {
		createTestObject(tc, bktInfo, objName, encryption.Params{})
	}

	listV2Response1 := listObjectsV2(tc, bktName, "", "", "bar", "", 1)
	nextContinuationToken := listV2Response1.NextContinuationToken
	// Assert the length first so a short page fails cleanly instead of panicking on the index.
	require.Len(t, listV2Response1.Contents, 1)
	require.Equal(t, "baz", listV2Response1.Contents[0].Key)

	listV2Response2 := listObjectsV2(tc, bktName, "", "", "bar", nextContinuationToken, -1)

	require.Equal(t, nextContinuationToken, listV2Response2.ContinuationToken)
	require.Equal(t, "bar", listV2Response2.StartAfter)
	require.False(t, listV2Response2.IsTruncated)

	require.Len(t, listV2Response2.Contents, 2)
	require.Equal(t, "foo", listV2Response2.Contents[0].Key)
	require.Equal(t, "quxx", listV2Response2.Contents[1].Key)
}

// TestS3BucketListV2EncodingBasic lists a bucket through ListObjectsV2 with
// delimiter "/" and encoding-type "url" and checks that the returned key and
// common prefixes are URL-encoded ("+" as %2B, space as %20) while the
// delimiter itself is returned as is.
func TestS3BucketListV2EncodingBasic(t *testing.T) {
	hc := prepareHandlerContext(t)

	bktName := "bucket-for-listing-v1-encoding"
	bktInfo := createTestBucket(hc, bktName)

	for _, key := range []string{"foo+1/bar", "foo/bar/xyzzy", "quux ab/thud", "asdf+b"} {
		createTestObject(hc, bktInfo, key, encryption.Params{})
	}

	params := url.Values{
		"delimiter":     {"/"},
		"encoding-type": {"url"},
	}

	w, r := prepareTestFullRequest(hc, bktName, "", params, nil)
	hc.Handler().ListObjectsV2Handler(w, r)
	assertStatus(hc.t, w, http.StatusOK)

	var resp ListObjectsV2Response
	parseTestResponse(hc.t, w, &resp)

	require.Equal(t, "/", resp.Delimiter)
	require.Len(t, resp.Contents, 1)
	require.Equal(t, "asdf%2Bb", resp.Contents[0].Key)

	wantPrefixes := []string{"foo%2B1/", "foo/", "quux%20ab/"}
	require.Len(t, resp.CommonPrefixes, len(wantPrefixes))
	for i, want := range wantPrefixes {
		require.Equal(t, want, resp.CommonPrefixes[i].Prefix)
	}
}

// TestS3BucketListDelimiterBasic lists a bucket through ListObjectsV1 with
// delimiter "/" and expects the only key without a delimiter ("asdf") in
// Contents and the "foo/" and "quux/" groups in CommonPrefixes.
func TestS3BucketListDelimiterBasic(t *testing.T) {
	tc := prepareHandlerContext(t)

	bktName := "bucket-for-listing"
	objects := []string{"foo/bar", "foo/bar/xyzzy", "quux/thud", "asdf"}
	bktInfo, _ := createBucketAndObject(tc, bktName, objects[0])

	for _, objName := range objects[1:] {
		createTestObject(tc, bktInfo, objName, encryption.Params{})
	}

	listV1Response := listObjectsV1(tc, bktName, "", "/", "", -1)
	require.Equal(t, "/", listV1Response.Delimiter)
	// Assert the length first so an empty listing fails cleanly instead of panicking on the index.
	require.Len(t, listV1Response.Contents, 1)
	require.Equal(t, "asdf", listV1Response.Contents[0].Key)
	require.Len(t, listV1Response.CommonPrefixes, 2)
	require.Equal(t, "foo/", listV1Response.CommonPrefixes[0].Prefix)
	require.Equal(t, "quux/", listV1Response.CommonPrefixes[1].Prefix)
}

// TestS3BucketListEmpty checks that listing versions of a freshly created
// bucket yields no versions, no delete markers and no common prefixes.
func TestS3BucketListEmpty(t *testing.T) {
	const bucket = "bucket-for-listing"

	hc := prepareHandlerContext(t)
	createTestBucket(hc, bucket)

	listing := listObjectsVersions(hc, bucket, "", "", "", "", -1)

	require.Empty(t, listing.Version)
	require.Empty(t, listing.DeleteMarker)
	require.Empty(t, listing.CommonPrefixes)
}

// TestS3BucketListV2PrefixAlt lists with prefix "ba" and no delimiter and
// expects only the keys starting with that prefix ("bar", "baz").
func TestS3BucketListV2PrefixAlt(t *testing.T) {
	hc := prepareHandlerContext(t)

	bucket, prefix := "bucket-for-listing", "ba"
	createTestBucket(hc, bucket)

	for _, key := range []string{"bar", "baz", "foo"} {
		putObject(hc, bucket, key)
	}

	listing := listObjectsV2(hc, bucket, prefix, "", "", "", -1)

	require.Equal(t, prefix, listing.Prefix)

	wantKeys := []string{"bar", "baz"}
	require.Len(t, listing.Contents, len(wantKeys))
	for i, want := range wantKeys {
		require.Equal(t, want, listing.Contents[i].Key)
	}
	require.Empty(t, listing.CommonPrefixes)
}

// TestS3BucketListV2PrefixNotExist lists with a prefix ("d") that no key
// starts with and expects an empty result that still echoes the prefix.
func TestS3BucketListV2PrefixNotExist(t *testing.T) {
	hc := prepareHandlerContext(t)

	bucket, prefix := "bucket-for-listing", "d"
	createTestBucket(hc, bucket)

	for _, key := range []string{"foo/bar", "foo/baz", "quux"} {
		putObject(hc, bucket, key)
	}

	listing := listObjectsV2(hc, bucket, prefix, "", "", "", -1)

	require.Equal(t, prefix, listing.Prefix)
	require.Empty(t, listing.Contents)
	require.Empty(t, listing.CommonPrefixes)
}

// TestS3BucketListV2PrefixUnreadable lists with a non-printable prefix (a
// single 0x0a byte) and expects an empty result that echoes the prefix
// unchanged.
func TestS3BucketListV2PrefixUnreadable(t *testing.T) {
	hc := prepareHandlerContext(t)

	bucket, prefix := "bucket-for-listing", "\x0a"
	createTestBucket(hc, bucket)

	for _, key := range []string{"foo/bar", "foo/baz", "quux"} {
		putObject(hc, bucket, key)
	}

	listing := listObjectsV2(hc, bucket, prefix, "", "", "", -1)

	require.Equal(t, prefix, listing.Prefix)
	require.Empty(t, listing.Contents)
	require.Empty(t, listing.CommonPrefixes)
}

// TestS3BucketListV2PrefixDelimiterAlt lists with prefix "ba" and the
// non-slash delimiter "a": "bar" is expected as a plain key while "bazar" is
// expected to be rolled up into the common prefix "baza".
func TestS3BucketListV2PrefixDelimiterAlt(t *testing.T) {
	hc := prepareHandlerContext(t)

	bucket := "bucket-for-listing"
	createTestBucket(hc, bucket)

	for _, key := range []string{"bar", "bazar", "cab", "foo"} {
		putObject(hc, bucket, key)
	}

	prefix, delimiter := "ba", "a"
	listing := listObjectsV2(hc, bucket, prefix, delimiter, "", "", -1)

	require.Equal(t, prefix, listing.Prefix)
	require.Equal(t, delimiter, listing.Delimiter)

	require.Len(t, listing.Contents, 1)
	require.Equal(t, "bar", listing.Contents[0].Key)

	require.Len(t, listing.CommonPrefixes, 1)
	require.Equal(t, "baza", listing.CommonPrefixes[0].Prefix)
}

// TestS3BucketListV2PrefixDelimiterDelimiterNotExist lists with prefix "b"
// and a delimiter ("z") that occurs in no key: all keys under the prefix are
// expected as plain contents and no common prefixes are expected.
func TestS3BucketListV2PrefixDelimiterDelimiterNotExist(t *testing.T) {
	hc := prepareHandlerContext(t)

	bucket := "bucket-for-listing"
	createTestBucket(hc, bucket)

	for _, key := range []string{"b/a/c", "b/a/g", "b/a/r", "g"} {
		putObject(hc, bucket, key)
	}

	listing := listObjectsV2(hc, bucket, "b", "z", "", "", -1)

	wantKeys := []string{"b/a/c", "b/a/g", "b/a/r"}
	require.Len(t, listing.Contents, len(wantKeys))
	for i, want := range wantKeys {
		require.Equal(t, want, listing.Contents[i].Key)
	}
	require.Empty(t, listing.CommonPrefixes)
}

// TestS3BucketListV2PrefixDelimiterPrefixDelimiterNotExist lists with a prefix
// ("y") and a delimiter ("z") that match no key and expects an empty result.
func TestS3BucketListV2PrefixDelimiterPrefixDelimiterNotExist(t *testing.T) {
	hc := prepareHandlerContext(t)

	bucket := "bucket-for-listing"
	createTestBucket(hc, bucket)

	for _, key := range []string{"b/a/c", "b/a/g", "b/a/r", "g"} {
		putObject(hc, bucket, key)
	}

	listing := listObjectsV2(hc, bucket, "y", "z", "", "", -1)

	require.Empty(t, listing.Contents)
	require.Empty(t, listing.CommonPrefixes)
}

// TestS3BucketListV2DelimiterPercentage lists with "%" as the delimiter and
// expects keys containing it to be grouped into the "b%" and "c%" common
// prefixes, leaving "foo" as the only plain key.
func TestS3BucketListV2DelimiterPercentage(t *testing.T) {
	tc := prepareHandlerContext(t)

	bucket, delimiter := "bucket-for-listing", "%"
	keys := []string{"b%ar", "b%az", "c%ab", "foo"}

	// The first key is created together with the bucket, the rest afterwards.
	bktInfo, _ := createBucketAndObject(tc, bucket, keys[0])
	for _, key := range keys[1:] {
		createTestObject(tc, bktInfo, key, encryption.Params{})
	}

	listing := listObjectsV2(tc, bucket, "", delimiter, "", "", -1)
	require.Equal(t, delimiter, listing.Delimiter)

	require.Len(t, listing.Contents, 1)
	require.Equal(t, "foo", listing.Contents[0].Key)

	wantPrefixes := []string{"b%", "c%"}
	require.Len(t, listing.CommonPrefixes, len(wantPrefixes))
	for i, want := range wantPrefixes {
		require.Equal(t, want, listing.CommonPrefixes[i].Prefix)
	}
}

// TestS3BucketListDelimiterPrefix pages through ListObjectsV1 results with
// delimiter "/" using page sizes 1 and 2, first without a prefix and then with
// prefix "boo/", checking the truncation flag, contents, common prefixes and
// next marker of every page.
func TestS3BucketListDelimiterPrefix(t *testing.T) {
	const delim = "/"

	hc := prepareHandlerContext(t)

	bktName := "bucket-for-listing"
	bktInfo := createTestBucket(hc, bktName)

	for _, key := range []string{"asdf", "boo/bar", "boo/baz/xyzzy", "cquux/thud", "cquux/bla"} {
		createTestObject(hc, bktInfo, key, encryption.Params{})
	}

	var none []string

	// No prefix, one key per page.
	prefix := ""
	next := validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"asdf"}, none, "asdf")
	next = validateListV1(t, hc, bktName, prefix, delim, next, 1, true, none, []string{"boo/"}, "boo/")
	validateListV1(t, hc, bktName, prefix, delim, next, 1, false, none, []string{"cquux/"}, "")

	// No prefix, two keys per page.
	next = validateListV1(t, hc, bktName, prefix, delim, "", 2, true, []string{"asdf"}, []string{"boo/"}, "boo/")
	validateListV1(t, hc, bktName, prefix, delim, next, 2, false, none, []string{"cquux/"}, "")

	// Prefix "boo/", one key per page.
	prefix = "boo/"
	next = validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"boo/bar"}, none, "boo/bar")
	validateListV1(t, hc, bktName, prefix, delim, next, 1, false, none, []string{"boo/baz/"}, "")

	// Prefix "boo/", everything in a single page.
	validateListV1(t, hc, bktName, prefix, delim, "", 2, false, []string{"boo/bar"}, []string{"boo/baz/"}, "")
}

// TestS3BucketListV2DelimiterPrefix pages through ListObjectsV2 results with
// delimiter "/" using page sizes 1 and 2, first without a prefix and then with
// prefix "boo/", checking the truncation flag, the presence of a next
// continuation token, contents and common prefixes of every page.
func TestS3BucketListV2DelimiterPrefix(t *testing.T) {
	const delim = "/"

	tc := prepareHandlerContext(t)

	bktName := "bucket-for-listingv2"
	keys := []string{"asdf", "boo/bar", "boo/baz/xyzzy", "cquux/thud", "cquux/bla"}

	// The first key is created together with the bucket, the rest afterwards.
	bktInfo, _ := createBucketAndObject(tc, bktName, keys[0])
	for _, key := range keys[1:] {
		createTestObject(tc, bktInfo, key, encryption.Params{})
	}

	var none []string

	// No prefix, one key per page.
	prefix := ""
	token := validateListV2(t, tc, bktName, prefix, delim, "", 1, true, false, []string{"asdf"}, none)
	token = validateListV2(t, tc, bktName, prefix, delim, token, 1, true, false, none, []string{"boo/"})
	validateListV2(t, tc, bktName, prefix, delim, token, 1, false, true, none, []string{"cquux/"})

	// No prefix, two keys per page.
	token = validateListV2(t, tc, bktName, prefix, delim, "", 2, true, false, []string{"asdf"}, []string{"boo/"})
	validateListV2(t, tc, bktName, prefix, delim, token, 2, false, true, none, []string{"cquux/"})

	// Prefix "boo/", one key per page.
	prefix = "boo/"
	token = validateListV2(t, tc, bktName, prefix, delim, "", 1, true, false, []string{"boo/bar"}, none)
	validateListV2(t, tc, bktName, prefix, delim, token, 1, false, true, none, []string{"boo/baz/"})

	// Prefix "boo/", everything in a single page.
	validateListV2(t, tc, bktName, prefix, delim, "", 2, false, true, []string{"boo/bar"}, []string{"boo/baz/"})
}

// TestS3BucketListDelimiterPrefixUnderscore repeats the ListObjectsV1
// delimiter/prefix paging scenario with keys that start with an underscore,
// checking the truncation flag, contents, common prefixes and next marker of
// every page.
func TestS3BucketListDelimiterPrefixUnderscore(t *testing.T) {
	const delim = "/"

	hc := prepareHandlerContext(t)

	bktName := "bucket-for-listing"
	bktInfo := createTestBucket(hc, bktName)

	for _, key := range []string{"_obj1_", "_under1/bar", "_under1/baz/xyzzy", "_under2/thud", "_under2/bla"} {
		createTestObject(hc, bktInfo, key, encryption.Params{})
	}

	var none []string

	// No prefix, one key per page.
	prefix := ""
	next := validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"_obj1_"}, none, "_obj1_")
	next = validateListV1(t, hc, bktName, prefix, delim, next, 1, true, none, []string{"_under1/"}, "_under1/")
	validateListV1(t, hc, bktName, prefix, delim, next, 1, false, none, []string{"_under2/"}, "")

	// No prefix, two keys per page.
	next = validateListV1(t, hc, bktName, prefix, delim, "", 2, true, []string{"_obj1_"}, []string{"_under1/"}, "_under1/")
	validateListV1(t, hc, bktName, prefix, delim, next, 2, false, none, []string{"_under2/"}, "")

	// Prefix "_under1/", one key per page.
	prefix = "_under1/"
	next = validateListV1(t, hc, bktName, prefix, delim, "", 1, true, []string{"_under1/bar"}, none, "_under1/bar")
	validateListV1(t, hc, bktName, prefix, delim, next, 1, false, none, []string{"_under1/baz/"}, "")

	// Prefix "_under1/", everything in a single page.
	validateListV1(t, hc, bktName, prefix, delim, "", 2, false, []string{"_under1/bar"}, []string{"_under1/baz/"}, "")
}

// TestS3BucketListDelimiterNotSkipSpecial creates a thousand keys under the
// "0/" prefix plus four top-level keys (two of them ending in "#" and "+") and
// lists with delimiter "/". The listing must contain the single common prefix
// "0/" and all four top-level keys in order.
func TestS3BucketListDelimiterNotSkipSpecial(t *testing.T) {
	hc := prepareHandlerContext(t)

	bktName := "bucket-for-listing"
	bktInfo := createTestBucket(hc, bktName)

	// "0/" itself followed by "0/1000" through "0/1998".
	nested := []string{"0/"}
	for num := 1000; num < 1999; num++ {
		nested = append(nested, "0/"+strconv.Itoa(num))
	}
	topLevel := []string{"1999", "1999#", "1999+", "2000"}

	// Create the nested keys first, then the top-level ones.
	for _, group := range [][]string{nested, topLevel} {
		for _, key := range group {
			createTestObject(hc, bktInfo, key, encryption.Params{})
		}
	}

	delimiter := "/"
	listing := listObjectsV1(hc, bktName, "", delimiter, "", -1)

	require.Equal(t, delimiter, listing.Delimiter)
	require.Equal(t, []CommonPrefix{{Prefix: "0/"}}, listing.CommonPrefixes)

	require.Len(t, listing.Contents, len(topLevel))
	for i, obj := range listing.Contents {
		require.Equal(t, topLevel[i], obj.Key)
	}
}

// TestMintVersioningListObjectVersionsVersionIDContinuation uploads ten
// versions of one object into a versioned bucket and lists them in two pages
// of five. The first page must be truncated and expose the version ID of its
// last entry as NextVersionIDMarker; the second page, requested with the
// markers from the first, must echo those markers and end the listing.
func TestMintVersioningListObjectVersionsVersionIDContinuation(t *testing.T) {
	hc := prepareHandlerContext(t)

	bktName, objName := "mint-bucket-for-listing-versions", "objName"
	createTestBucket(hc, bktName)
	putBucketVersioning(t, hc, bktName, true)

	length := 10
	objects := make([]string, length)
	for i := 0; i < length; i++ {
		objects[i] = objName
		putObject(hc, bktName, objName)
	}

	maxKeys := 5

	page1 := listObjectsVersions(hc, bktName, "", "", "", "", maxKeys)
	require.Len(t, page1.Version, maxKeys)
	checkVersionsNames(t, page1, objects[:maxKeys])
	require.Equal(t, page1.Version[maxKeys-1].VersionID, page1.NextVersionIDMarker)
	require.True(t, page1.IsTruncated)
	require.Empty(t, page1.KeyMarker)
	require.Empty(t, page1.VersionIDMarker)

	page2 := listObjectsVersions(hc, bktName, "", "", page1.NextKeyMarker, page1.NextVersionIDMarker, maxKeys)
	require.Len(t, page2.Version, maxKeys)
	// Check the names of the second page itself (the first page was verified above).
	checkVersionsNames(t, page2, objects[maxKeys:])
	require.Empty(t, page2.NextVersionIDMarker)
	require.False(t, page2.IsTruncated)
	require.Equal(t, page1.NextKeyMarker, page2.KeyMarker)
	require.Equal(t, page1.NextVersionIDMarker, page2.VersionIDMarker)
}

// TestListObjectVersionsEncoding checks the encoding-type=url handling of the
// version listing: prefix, delimiter, keys, markers and common prefixes must
// come back URL-encoded when the encoding is requested, and EncodingType must
// be empty when it is not.
func TestListObjectVersionsEncoding(t *testing.T) {
	hc := prepareHandlerContext(t)

	bktName := "bucket-for-listing-versions-encoding"
	bktInfo := createTestBucket(hc, bktName)
	putBucketVersioning(t, hc, bktName, true)

	for _, key := range []string{"foo()/bar", "foo()/bar/xyzzy", "auux ab/thud", "asdf+b"} {
		createTestObject(hc, bktInfo, key, encryption.Params{})
	}
	deleteObject(t, hc, bktName, "auux ab/thud", "")

	// Encoded listing with prefix "foo(" and delimiter ")".
	grouped := listObjectsVersionsURL(hc, bktName, "foo(", ")", "", "", -1)

	require.Len(t, grouped.CommonPrefixes, 1)
	require.Equal(t, "foo%28%29", grouped.CommonPrefixes[0].Prefix)
	require.Len(t, grouped.Version, 0)
	require.Len(t, grouped.DeleteMarker, 0)
	require.Equal(t, "foo%28", grouped.Prefix)
	require.Equal(t, "%29", grouped.Delimiter)
	require.Equal(t, "url", grouped.EncodingType)
	require.Equal(t, maxObjectList, grouped.MaxKeys)

	// Plain listing of a single entry: no encoding type is reported.
	plain := listObjectsVersions(hc, bktName, "", "", "", "", 1)
	require.Empty(t, plain.EncodingType)

	// Encoded continuation of the plain listing, three entries at most.
	encoded := listObjectsVersionsURL(hc, bktName, "", "", plain.NextKeyMarker, plain.NextVersionIDMarker, 3)

	require.Len(t, encoded.CommonPrefixes, 0)

	require.Len(t, encoded.Version, 2)
	require.Equal(t, "auux%20ab/thud", encoded.Version[0].Key)
	require.False(t, encoded.Version[0].IsLatest)
	require.Equal(t, "foo%28%29/bar", encoded.Version[1].Key)

	require.Len(t, encoded.DeleteMarker, 1)
	require.Equal(t, "auux%20ab/thud", encoded.DeleteMarker[0].Key)
	require.True(t, encoded.DeleteMarker[0].IsLatest)

	require.Equal(t, "asdf%2Bb", encoded.KeyMarker)
	require.Equal(t, "foo%28%29/bar", encoded.NextKeyMarker)
	require.Equal(t, "url", encoded.EncodingType)
	require.Equal(t, 3, encoded.MaxKeys)
}

// checkVersionsNames asserts that the key of every listed version equals the
// name at the same index in names. names may be longer than the listing but
// must not be shorter.
func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, names []string) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	// Fail cleanly instead of panicking on names[i] when the listing is too long.
	require.LessOrEqual(t, len(versions.Version), len(names))
	for i, v := range versions.Version {
		require.Equal(t, names[i], v.Key)
	}
}

// listObjectsV2 performs a ListObjectsV2 request without an encoding type and
// returns the parsed response. It delegates to listObjectsV2Ext.
func listObjectsV2(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken string, maxKeys int) *ListObjectsV2Response {
	const noEncoding = ""

	return listObjectsV2Ext(hc, bktName, prefix, delimiter, startAfter, continuationToken, noEncoding, maxKeys)
}

// listObjectsV2Ext performs a ListObjectsV2 request against bktName, asserts
// a 200 OK status and returns the parsed response. "fetch-owner" is always
// requested; start-after, continuation-token and encoding-type are sent only
// when the corresponding argument is non-empty.
func listObjectsV2Ext(hc *handlerContext, bktName, prefix, delimiter, startAfter, continuationToken, encodingType string, maxKeys int) *ListObjectsV2Response {
	params := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
	params.Add("fetch-owner", "true")

	optional := []struct{ name, value string }{
		{"start-after", startAfter},
		{"continuation-token", continuationToken},
		{"encoding-type", encodingType},
	}
	for _, p := range optional {
		if p.value != "" {
			params.Add(p.name, p.value)
		}
	}

	w, r := prepareTestFullRequest(hc, bktName, "", params, nil)
	hc.Handler().ListObjectsV2Handler(w, r)
	assertStatus(hc.t, w, http.StatusOK)

	var resp ListObjectsV2Response
	parseTestResponse(hc.t, w, &resp)
	return &resp
}

// validateListV1 performs a ListObjectsV1 request and asserts the truncation
// flag, the next marker, the listed keys (checkObjects) and the common
// prefixes (checkPrefixes) of the response. It returns the response's
// NextMarker so that calls can be chained page by page.
func validateListV1(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, marker string, maxKeys int,
	isTruncated bool, checkObjects, checkPrefixes []string, nextMarker string) string {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	response := listObjectsV1(tc, bktName, prefix, delimiter, marker, maxKeys)

	require.Equal(t, isTruncated, response.IsTruncated)
	require.Equal(t, nextMarker, response.NextMarker)

	require.Len(t, response.Contents, len(checkObjects))
	for i, key := range checkObjects {
		require.Equal(t, key, response.Contents[i].Key)
	}

	require.Len(t, response.CommonPrefixes, len(checkPrefixes))
	for i, commonPrefix := range checkPrefixes {
		require.Equal(t, commonPrefix, response.CommonPrefixes[i].Prefix)
	}

	return response.NextMarker
}

// validateListV2 performs a ListObjectsV2 request (without start-after) and
// asserts the truncation flag, the listed keys (checkObjects) and the common
// prefixes (checkPrefixes) of the response. last states whether the response
// is expected to carry no next continuation token. It returns the response's
// NextContinuationToken so that calls can be chained page by page.
func validateListV2(t *testing.T, tc *handlerContext, bktName, prefix, delimiter, continuationToken string, maxKeys int,
	isTruncated, last bool, checkObjects, checkPrefixes []string) string {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	response := listObjectsV2(tc, bktName, prefix, delimiter, "", continuationToken, maxKeys)

	require.Equal(t, isTruncated, response.IsTruncated)
	require.Equal(t, last, len(response.NextContinuationToken) == 0)

	require.Len(t, response.Contents, len(checkObjects))
	for i, key := range checkObjects {
		require.Equal(t, key, response.Contents[i].Key)
	}

	require.Len(t, response.CommonPrefixes, len(checkPrefixes))
	for i, commonPrefix := range checkPrefixes {
		require.Equal(t, commonPrefix, response.CommonPrefixes[i].Prefix)
	}

	return response.NextContinuationToken
}

// prepareCommonListObjectsQuery builds the query parameters shared by all
// listing requests. "delimiter" and "prefix" are set only when non-empty;
// "max-keys" is set for every value except -1, which means "do not send it".
func prepareCommonListObjectsQuery(prefix, delimiter string, maxKeys int) url.Values {
	params := url.Values{}

	if delimiter != "" {
		params.Set("delimiter", delimiter)
	}
	if prefix != "" {
		params.Set("prefix", prefix)
	}
	if maxKeys == -1 {
		return params
	}

	params.Set("max-keys", strconv.Itoa(maxKeys))
	return params
}

// listObjectsV1 performs a ListObjectsV1 request against bktName, asserts a
// 200 OK status and returns the parsed response. The marker is sent only when
// it is non-empty.
func listObjectsV1(hc *handlerContext, bktName, prefix, delimiter, marker string, maxKeys int) *ListObjectsV1Response {
	params := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
	if marker != "" {
		params.Add("marker", marker)
	}

	w, r := prepareTestFullRequest(hc, bktName, "", params, nil)
	hc.Handler().ListObjectsV1Handler(w, r)
	assertStatus(hc.t, w, http.StatusOK)

	var resp ListObjectsV1Response
	parseTestResponse(hc.t, w, &resp)
	return &resp
}

// listObjectsVersions performs a version-listing request without URL encoding
// and returns the parsed response. It delegates to listObjectsVersionsBase.
func listObjectsVersions(hc *handlerContext, bktName, prefix, delimiter, keyMarker, versionIDMarker string, maxKeys int) *ListObjectsVersionsResponse {
	const urlEncode = false

	return listObjectsVersionsBase(hc, bktName, prefix, delimiter, keyMarker, versionIDMarker, maxKeys, urlEncode)
}

// listObjectsVersionsURL performs a version-listing request with
// encoding-type=url and returns the parsed response. It delegates to
// listObjectsVersionsBase.
func listObjectsVersionsURL(hc *handlerContext, bktName, prefix, delimiter, keyMarker, versionIDMarker string, maxKeys int) *ListObjectsVersionsResponse {
	const urlEncode = true

	return listObjectsVersionsBase(hc, bktName, prefix, delimiter, keyMarker, versionIDMarker, maxKeys, urlEncode)
}

// listObjectsVersionsBase performs a version-listing request against bktName,
// asserts a 200 OK status and returns the parsed response. key-marker and
// version-id-marker are sent only when non-empty; encoding-type=url is sent
// when encode is true.
func listObjectsVersionsBase(hc *handlerContext, bktName, prefix, delimiter, keyMarker, versionIDMarker string, maxKeys int, encode bool) *ListObjectsVersionsResponse {
	params := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)

	markers := []struct{ name, value string }{
		{"key-marker", keyMarker},
		{"version-id-marker", versionIDMarker},
	}
	for _, m := range markers {
		if m.value != "" {
			params.Add(m.name, m.value)
		}
	}
	if encode {
		params.Add("encoding-type", "url")
	}

	w, r := prepareTestFullRequest(hc, bktName, "", params, nil)
	hc.Handler().ListBucketObjectVersionsHandler(w, r)
	assertStatus(hc.t, w, http.StatusOK)

	var resp ListObjectsVersionsResponse
	parseTestResponse(hc.t, w, &resp)
	return &resp
}