[#151] index page: add native protocol support

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
Nikita Zinkevich 2024-10-10 11:59:53 +03:00
parent 8fe8f2dcc2
commit b188457a15
12 changed files with 420 additions and 92 deletions

View file

@ -1,6 +1,7 @@
package handler
import (
"context"
"html/template"
"net/url"
"sort"
@ -10,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/docker/go-units"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
@ -25,19 +27,58 @@ const (
type (
BrowsePageData struct {
BucketName,
Prefix string
Objects []ResponseObject
BucketInfo *data.BucketInfo
Prefix string
Objects []ResponseObject
}
// ResponseObject is a single entry (file or directory) of a rendered
// index page listing.
ResponseObject struct {
// OID is the encoded object ID; it is empty for directory entries.
OID string
// Created is the creation timestamp kept as a decimal string.
Created string
// FileName is the name displayed for the entry.
FileName string
// FilePath is the path of the entry including its prefix.
FilePath string
// Size is the payload size kept as a decimal string.
Size string
// IsDir reports whether the entry is a directory rather than an object.
IsDir bool
}
)
// newListObjectsResponseS3 converts an attribute map into a ResponseObject.
// An entry that carries no object ID is reported as a directory.
func newListObjectsResponseS3(attrs map[string]string) ResponseObject {
	objectID := attrs[attrOID]

	var entry ResponseObject
	entry.OID = objectID
	entry.Created = attrs[attrCreated]
	entry.FileName = attrs[attrFileName]
	entry.Size = attrs[attrSize]
	entry.IsDir = objectID == ""

	return entry
}
// newListObjectsResponseNative converts the attributes of a natively stored
// object into a ResponseObject. The result is never a directory.
//
// FileName is the last element of the FilePath attribute, FilePath is the
// attribute value itself.
func newListObjectsResponseNative(attrs map[string]string) ResponseObject {
	filePath := attrs[object.AttributeFilePath]

	// The Timestamp attribute gets three zeros appended (presumably turning
	// seconds into the milliseconds that parseTimestamp consumes — confirm).
	// Do this only when the attribute is present: a bare "000" would parse as
	// the Unix epoch instead of being treated as "no timestamp".
	var created string
	if ts := attrs[object.AttributeTimestamp]; ts != "" {
		created = ts + "000"
	}

	return ResponseObject{
		OID:      attrs[attrOID],
		Created:  created,
		FileName: lastPathElement(filePath),
		FilePath: filePath,
		Size:     attrs[attrSize],
		IsDir:    false,
	}
}
// getNextDir returns the name of the first directory that follows prefix in
// filepath, or "" when the remainder of the path contains no "/" (that is,
// the path points at an entry directly under prefix).
//
// Only a leading prefix is stripped; an occurrence of prefix in the middle of
// filepath is left intact.
func getNextDir(filepath, prefix string) string {
	restPath := strings.TrimPrefix(filepath, prefix)
	index := strings.Index(restPath, "/")
	if index == -1 {
		return ""
	}
	return restPath[:index]
}
// lastPathElement returns the final "/"-separated element of path.
// A single trailing slash is kept as part of the element ("a/b/" -> "b/").
// An empty path yields an empty string.
func lastPathElement(path string) string {
	// Without this guard LastIndex returns -1, which equals len(path)-1 for
	// an empty string, and the slice expression below would panic.
	if path == "" {
		return ""
	}

	index := strings.LastIndex(path, "/")
	if index == len(path)-1 {
		// Path ends with a slash: look for the separator before it.
		index = strings.LastIndex(path[:index], "/")
	}
	return path[index+1:]
}
func parseTimestamp(tstamp string) (time.Time, error) {
millis, err := strconv.ParseInt(tstamp, 10, 64)
if err != nil {
@ -47,16 +88,6 @@ func parseTimestamp(tstamp string) (time.Time, error) {
return time.UnixMilli(millis), nil
}
// NewResponseObject builds a ResponseObject from a map of node attributes.
// The entry is marked as a directory when the map holds no object ID.
func NewResponseObject(nodes map[string]string) ResponseObject {
	id := nodes[attrOID]
	isDirectory := id == ""

	return ResponseObject{
		OID:      id,
		Created:  nodes[attrCreated],
		FileName: nodes[attrFileName],
		Size:     nodes[attrSize],
		IsDir:    isDirectory,
	}
}
func formatTimestamp(strdate string) string {
date, err := parseTimestamp(strdate)
if err != nil || date.IsZero() {
@ -94,12 +125,9 @@ func trimPrefix(encPrefix string) string {
return prefix[:slashIndex]
}
func urlencode(prefix, filename string) string {
func urlencode(path string) string {
var res strings.Builder
path := filename
if prefix != "" {
path = strings.Join([]string{prefix, filename}, "/")
}
prefixParts := strings.Split(path, "/")
for _, prefixPart := range prefixParts {
prefixPart = "/" + url.PathEscape(prefixPart)
@ -112,28 +140,32 @@ func urlencode(prefix, filename string) string {
return res.String()
}
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, bucketInfo *data.BucketInfo, prefix string) {
log := h.log.With(zap.String("bucket", bucketInfo.Name))
// browseParams bundles the inputs needed to render an index page.
type browseParams struct {
	// bucketInfo describes the bucket (container) whose content is listed.
	bucketInfo *data.BucketInfo
	// template is the text of the index page template to parse and execute.
	template string
	// listObjects returns the entries to display for the given prefix.
	listObjects func(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error)
	// prefix is the path prefix being browsed.
	prefix string
}
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
log := h.log.With(
zap.String("bucket", p.bucketInfo.Name),
zap.String("container", p.bucketInfo.CID.EncodeToString()),
zap.String("prefix", p.prefix),
)
ctx := utils.GetContextFromRequest(c)
nodes, err := h.listObjects(ctx, bucketInfo, prefix)
objects, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
respObjects := make([]ResponseObject, len(nodes))
for i, node := range nodes {
respObjects[i] = NewResponseObject(node)
}
sort.Slice(respObjects, func(i, j int) bool {
if respObjects[i].IsDir == respObjects[j].IsDir {
return respObjects[i].FileName < respObjects[j].FileName
sort.Slice(objects, func(i, j int) bool {
if objects[i].IsDir == objects[j].IsDir {
return objects[i].FileName < objects[j].FileName
}
return respObjects[i].IsDir
return objects[i].IsDir
})
indexTemplate := h.config.IndexPageTemplate()
tmpl, err := template.New("index").Funcs(template.FuncMap{
"formatTimestamp": formatTimestamp,
@ -141,15 +173,15 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, bucketInfo *data.BucketI
"trimPrefix": trimPrefix,
"urlencode": urlencode,
"parentDir": parentDir,
}).Parse(indexTemplate)
}).Parse(p.template)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
if err = tmpl.Execute(c, &BrowsePageData{
BucketName: bucketInfo.Name,
Prefix: prefix,
Objects: respObjects,
BucketInfo: p.bucketInfo,
Prefix: p.prefix,
Objects: objects,
}); err != nil {
logAndSendBucketError(c, log, err)
return

View file

@ -23,10 +23,9 @@ import (
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
test, _ := c.UserValue("oid").(string)
var id oid.ID
err := id.DecodeString(test)
if err != nil {
testCID, _ := c.UserValue("cid").(string)
var cntID cid.ID
if err := cntID.DecodeString(testCID); err != nil {
h.byObjectName(c, h.receiveFile)
} else {
h.byAddress(c, h.receiveFile)
@ -45,7 +44,7 @@ func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
h.byAttribute(c, h.receiveFile)
}
func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(key, val, op)
@ -54,7 +53,7 @@ func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op
PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx),
},
Container: *cnrID,
Container: cnrID,
Filters: filters,
}
@ -101,7 +100,7 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
return
}
resSearch, err := h.search(ctx, &bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)

View file

@ -6,7 +6,9 @@ import (
"fmt"
"io"
"net/url"
"strconv"
"strings"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
@ -31,7 +33,8 @@ type Config interface {
ZipCompression() bool
ClientCut() bool
IndexPageEnabled() bool
IndexPageTemplate() string
IndexPageS3Template() string
IndexPageNativeTemplate() string
BufferMaxSizeForPut() uint64
NamespaceHeader() string
}
@ -182,7 +185,7 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler {
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
var (
idCnr, _ = c.UserValue("cid").(string)
idObj, _ = c.UserValue("oid").(string)
idObj, _ = url.PathUnescape(c.UserValue("oid").(string))
log = h.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
)
@ -196,6 +199,18 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
objID := new(oid.ID)
if err = objID.DecodeString(idObj); err != nil {
if h.config.IndexPageEnabled() {
var addr oid.Address
addr.SetContainer(bktInfo.CID)
c.SetStatusCode(fasthttp.StatusNotFound)
h.browseObjects(c, browseParams{
bucketInfo: bktInfo,
prefix: idObj,
listObjects: h.getDirObjectsNative,
template: h.config.IndexPageNativeTemplate(),
})
return
}
log.Error(logs.WrongObjectID, zap.Error(err))
response.Error(c, "wrong object id", fasthttp.StatusBadRequest)
return
@ -205,6 +220,28 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
addr.SetContainer(bktInfo.CID)
addr.SetObject(*objID)
_, err = h.frostfs.GetObject(ctx, PrmObjectGet{
PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx),
},
Address: addr,
})
if err != nil {
if h.config.IndexPageEnabled() {
c.SetStatusCode(fasthttp.StatusNotFound)
h.browseObjects(c, browseParams{
bucketInfo: bktInfo,
prefix: "",
listObjects: h.getDirObjectsNative,
template: h.config.IndexPageNativeTemplate(),
})
return
}
logAndSendBucketError(c, log, err)
return
}
f(ctx, *h.newRequest(c, log), addr)
}
@ -237,7 +274,12 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
if isDir(unescapedKey) || isContainerRoot(unescapedKey) {
if code := checkErrorType(err); code == fasthttp.StatusNotFound || code == fasthttp.StatusOK {
c.SetStatusCode(code)
h.browseObjects(c, bktInfo, unescapedKey)
h.browseObjects(c, browseParams{
bucketInfo: bktInfo,
prefix: unescapedKey,
listObjects: h.getDirObjectsS3,
template: h.config.IndexPageS3Template(),
})
return
}
}
@ -294,7 +336,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
return
}
res, err := h.search(ctx, &bktInfo.CID, key, val, object.MatchStringEqual)
res, err := h.search(ctx, bktInfo.CID, key, val, object.MatchStringEqual)
if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
@ -391,24 +433,125 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
return bktInfo, err
}
func (h *Handler) listObjects(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]map[string]string, error) {
func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error) {
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
if err != nil {
return nil, err
}
var objects = make([]map[string]string, 0, len(nodes))
var objects = make([]ResponseObject, 0, len(nodes))
for _, node := range nodes {
meta := node.GetMeta()
if meta == nil {
continue
}
var obj = make(map[string]string, len(meta))
var attrs = make(map[string]string, len(meta))
for _, m := range meta {
obj[m.GetKey()] = string(m.GetValue())
attrs[m.GetKey()] = string(m.GetValue())
}
obj := newListObjectsResponseS3(attrs)
obj.FilePath = prefix + obj.FileName
objects = append(objects, obj)
}
return objects, nil
}
// getDirObjectsNative lists the entries of a natively stored container whose
// FilePath attribute starts with prefix. Objects are found with a
// common-prefix search and then resolved into ResponseObjects by
// headDirObjects.
func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]ResponseObject, error) {
	// The base path is the prefix cut after its last "/" (empty when the
	// prefix contains no slash at all).
	basePath := prefix[:strings.LastIndex(prefix, "/")+1]

	found, err := h.search(ctx, bucketInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
	if err != nil {
		return nil, err
	}
	defer found.Close()

	listing, err := h.headDirObjects(ctx, bucketInfo.CID, found, basePath)
	if err != nil {
		return nil, err
	}

	return listing, nil
}
// headDirObjects requests the header of every object produced by objectIDs
// (one goroutine per object) and turns the results into listing entries
// relative to basePath.
//
// An object whose FilePath has no further directory component after basePath
// becomes a file entry; otherwise a single directory entry is emitted for the
// first directory component, no matter how many objects live below it.
//
// A failed HeadObject call is logged and cancels the remaining requests; the
// entries collected so far are still returned with a nil error. Only an error
// from objectIDs.Iterate is returned to the caller.
func (h *Handler) headDirObjects(ctx context.Context, cID cid.ID, objectIDs ResObjectSearch, basePath string) ([]ResponseObject, error) {
	const initialSliceCapacity = 100

	var (
		log = h.log.With(
			zap.String("cid", cID.EncodeToString()),
			zap.String("prefix", basePath),
		)
		mu      sync.Mutex // guards objects and dirs
		wg      sync.WaitGroup
		errChan = make(chan error)
		objects = make([]ResponseObject, 0, initialSliceCapacity)
		dirs    = make(map[string]struct{})
		auth    = PrmAuth{
			BearerToken: bearerToken(ctx),
		}
	)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Errors are only logged; the goroutine ends when errChan is closed.
	go func() {
		for err := range errChan {
			if err != nil {
				log.Error(logs.FailedToHeadObject, zap.Error(err))
			}
		}
	}()

	err := objectIDs.Iterate(func(id oid.ID) bool {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Every goroutine needs its own address: sharing one value and
			// calling SetObject on it concurrently is a data race and may
			// head the wrong object.
			var addr oid.Address
			addr.SetContainer(cID)
			addr.SetObject(id)

			obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
				PrmAuth: auth,
				Address: addr,
			})
			if err != nil {
				errChan <- err
				cancel()
				return
			}

			attrs := loadAttributes(obj.Attributes())
			attrs[attrOID] = id.EncodeToString()
			attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10)

			dirname := getNextDir(attrs[object.AttributeFilePath], basePath)

			mu.Lock()
			defer mu.Unlock()

			if dirname == "" {
				objects = append(objects, newListObjectsResponseNative(attrs))
				return
			}

			// Check and mark the directory under the same lock so that it is
			// listed exactly once.
			if _, seen := dirs[dirname]; seen {
				return
			}
			dirs[dirname] = struct{}{}
			objects = append(objects, ResponseObject{
				FileName: dirname,
				FilePath: basePath + dirname,
				IsDir:    true,
			})
		}()
		return false
	})
	if err != nil {
		// Do not keep heading objects whose result will be discarded.
		cancel()
	}

	// Always wait for the workers and release the logging goroutine, also on
	// the error path, so that nothing outlives this call.
	wg.Wait()
	close(errChan)

	if err != nil {
		return nil, err
	}

	return objects, nil
}

View file

@ -57,10 +57,10 @@ func (c *configMock) IndexPageEnabled() bool {
return false
}
func (c *configMock) IndexPageTemplatePath() string {
func (c *configMock) IndexPageS3Template() string {
return ""
}
func (c *configMock) IndexPageTemplate() string {
func (c *configMock) IndexPageNativeTemplate() string {
return ""
}

View file

@ -12,6 +12,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
@ -59,6 +60,14 @@ func checkErrorType(err error) int {
}
}
// loadAttributes converts a list of object attributes into a key/value map.
// When a key occurs more than once, the value of the last occurrence wins.
func loadAttributes(attrs []object.Attribute) map[string]string {
	// The number of attributes is known up front, so size the map once.
	result := make(map[string]string, len(attrs))
	for i := range attrs {
		result[attrs[i].Key()] = attrs[i].Value()
	}
	return result
}
func isValidToken(s string) bool {
for _, c := range s {
if c <= ' ' || c > 127 {