forked from TrueCloudLab/frostfs-http-gw
382 lines
9.2 KiB
Go
382 lines
9.2 KiB
Go
package handler
|
|
|
|
import (
|
|
"context"
|
|
"html/template"
|
|
"net/url"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/docker/go-units"
|
|
"github.com/valyala/fasthttp"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const (
|
|
dateFormat = "02-01-2006 15:04"
|
|
attrOID = "OID"
|
|
attrCreated = "Created"
|
|
attrFileName = "FileName"
|
|
attrFilePath = "FilePath"
|
|
attrSize = "Size"
|
|
attrDeleteMarker = "IsDeleteMarker"
|
|
)
|
|
|
|
type (
|
|
BrowsePageData struct {
|
|
HasErrors bool
|
|
Container string
|
|
Prefix string
|
|
Protocol string
|
|
Objects []ResponseObject
|
|
}
|
|
ResponseObject struct {
|
|
OID string
|
|
Created string
|
|
FileName string
|
|
FilePath string
|
|
Size string
|
|
IsDir bool
|
|
GetURL string
|
|
IsDeleteMarker bool
|
|
}
|
|
)
|
|
|
|
func newListObjectsResponseS3(attrs map[string]string) ResponseObject {
|
|
return ResponseObject{
|
|
Created: formatTimestamp(attrs[attrCreated]),
|
|
OID: attrs[attrOID],
|
|
FileName: attrs[attrFileName],
|
|
Size: attrs[attrSize],
|
|
IsDir: attrs[attrOID] == "",
|
|
IsDeleteMarker: attrs[attrDeleteMarker] == "true",
|
|
}
|
|
}
|
|
|
|
func newListObjectsResponseNative(attrs map[string]string) ResponseObject {
|
|
filename := lastPathElement(attrs[object.AttributeFilePath])
|
|
if filename == "" {
|
|
filename = attrs[attrFileName]
|
|
}
|
|
return ResponseObject{
|
|
OID: attrs[attrOID],
|
|
Created: formatTimestamp(attrs[object.AttributeTimestamp] + "000"),
|
|
FileName: filename,
|
|
FilePath: attrs[object.AttributeFilePath],
|
|
Size: attrs[attrSize],
|
|
IsDir: false,
|
|
}
|
|
}
|
|
|
|
func getNextDir(filepath, prefix string) string {
|
|
restPath := strings.Replace(filepath, prefix, "", 1)
|
|
index := strings.Index(restPath, "/")
|
|
if index == -1 {
|
|
return ""
|
|
}
|
|
return restPath[:index]
|
|
}
|
|
|
|
func lastPathElement(path string) string {
|
|
if path == "" {
|
|
return path
|
|
}
|
|
index := strings.LastIndex(path, "/")
|
|
if index == len(path)-1 {
|
|
index = strings.LastIndex(path[:index], "/")
|
|
}
|
|
return path[index+1:]
|
|
}
|
|
|
|
func parseTimestamp(tstamp string) (time.Time, error) {
|
|
millis, err := strconv.ParseInt(tstamp, 10, 64)
|
|
if err != nil {
|
|
return time.Time{}, err
|
|
}
|
|
|
|
return time.UnixMilli(millis), nil
|
|
}
|
|
|
|
func formatTimestamp(strdate string) string {
|
|
date, err := parseTimestamp(strdate)
|
|
if err != nil || date.IsZero() {
|
|
return ""
|
|
}
|
|
|
|
return date.Format(dateFormat)
|
|
}
|
|
|
|
func formatSize(strsize string) string {
|
|
size, err := strconv.ParseFloat(strsize, 64)
|
|
if err != nil {
|
|
return "0B"
|
|
}
|
|
return units.HumanSize(size)
|
|
}
|
|
|
|
func parentDir(prefix string) string {
|
|
index := strings.LastIndex(prefix, "/")
|
|
if index == -1 {
|
|
return prefix
|
|
}
|
|
return prefix[index:]
|
|
}
|
|
|
|
func trimPrefix(encPrefix string) string {
|
|
prefix, err := url.PathUnescape(encPrefix)
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
slashIndex := strings.LastIndex(prefix, "/")
|
|
if slashIndex == -1 {
|
|
return ""
|
|
}
|
|
return prefix[:slashIndex]
|
|
}
|
|
|
|
func urlencode(path string) string {
|
|
var res strings.Builder
|
|
|
|
prefixParts := strings.Split(path, "/")
|
|
for _, prefixPart := range prefixParts {
|
|
prefixPart = "/" + url.PathEscape(prefixPart)
|
|
if prefixPart == "/." || prefixPart == "/.." {
|
|
prefixPart = url.PathEscape(prefixPart)
|
|
}
|
|
res.WriteString(prefixPart)
|
|
}
|
|
|
|
return res.String()
|
|
}
|
|
|
|
type GetObjectsResponse struct {
|
|
objects []ResponseObject
|
|
hasErrors bool
|
|
}
|
|
|
|
func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) {
|
|
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result := &GetObjectsResponse{
|
|
objects: make([]ResponseObject, 0, len(nodes)),
|
|
}
|
|
for _, node := range nodes {
|
|
meta := node.Meta
|
|
if meta == nil {
|
|
continue
|
|
}
|
|
var attrs = make(map[string]string, len(meta))
|
|
for _, m := range meta {
|
|
attrs[m.GetKey()] = string(m.GetValue())
|
|
}
|
|
obj := newListObjectsResponseS3(attrs)
|
|
if obj.IsDeleteMarker {
|
|
continue
|
|
}
|
|
obj.FilePath = prefix + obj.FileName
|
|
obj.GetURL = "/get/" + bucketInfo.Name + urlencode(obj.FilePath)
|
|
result.objects = append(result.objects, obj)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) (*GetObjectsResponse, error) {
|
|
var basePath string
|
|
if ind := strings.LastIndex(prefix, "/"); ind != -1 {
|
|
basePath = prefix[:ind+1]
|
|
}
|
|
|
|
filters := object.NewSearchFilters()
|
|
filters.AddRootFilter()
|
|
if prefix != "" {
|
|
filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
|
}
|
|
|
|
prm := PrmObjectSearch{
|
|
PrmAuth: PrmAuth{
|
|
BearerToken: bearerToken(ctx),
|
|
},
|
|
Container: bucketInfo.CID,
|
|
Filters: filters,
|
|
}
|
|
objectIDs, err := h.frostfs.SearchObjects(ctx, prm)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer objectIDs.Close()
|
|
|
|
resp, err := h.headDirObjects(ctx, bucketInfo.CID, objectIDs, basePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log := utils.GetReqLogOrDefault(ctx, h.log)
|
|
dirs := make(map[string]struct{})
|
|
result := &GetObjectsResponse{
|
|
objects: make([]ResponseObject, 0, 100),
|
|
}
|
|
for objExt := range resp {
|
|
if objExt.Error != nil {
|
|
log.Error(logs.FailedToHeadObject, zap.Error(objExt.Error))
|
|
result.hasErrors = true
|
|
continue
|
|
}
|
|
if objExt.Object.IsDir {
|
|
if _, ok := dirs[objExt.Object.FileName]; ok {
|
|
continue
|
|
}
|
|
objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + urlencode(objExt.Object.FilePath)
|
|
dirs[objExt.Object.FileName] = struct{}{}
|
|
} else {
|
|
objExt.Object.GetURL = "/get/" + bucketInfo.CID.EncodeToString() + "/" + objExt.Object.OID
|
|
}
|
|
result.objects = append(result.objects, objExt.Object)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
type ResponseObjectExtended struct {
|
|
Object ResponseObject
|
|
Error error
|
|
}
|
|
|
|
func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs ResObjectSearch, basePath string) (<-chan ResponseObjectExtended, error) {
|
|
res := make(chan ResponseObjectExtended)
|
|
|
|
go func() {
|
|
defer close(res)
|
|
log := utils.GetReqLogOrDefault(ctx, h.log).With(
|
|
zap.String("cid", cnrID.EncodeToString()),
|
|
zap.String("path", basePath),
|
|
)
|
|
var wg sync.WaitGroup
|
|
err := objectIDs.Iterate(func(id oid.ID) bool {
|
|
wg.Add(1)
|
|
err := h.workerPool.Submit(func() {
|
|
defer wg.Done()
|
|
var obj ResponseObjectExtended
|
|
obj.Object, obj.Error = h.headDirObject(ctx, cnrID, id, basePath)
|
|
res <- obj
|
|
})
|
|
if err != nil {
|
|
wg.Done()
|
|
log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err))
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
})
|
|
if err != nil {
|
|
log.Error(logs.FailedToIterateOverResponse, zap.Error(err))
|
|
}
|
|
wg.Wait()
|
|
}()
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (h *Handler) headDirObject(ctx context.Context, cnrID cid.ID, objID oid.ID, basePath string) (ResponseObject, error) {
|
|
addr := newAddress(cnrID, objID)
|
|
obj, err := h.frostfs.HeadObject(ctx, PrmObjectHead{
|
|
PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)},
|
|
Address: addr,
|
|
})
|
|
if err != nil {
|
|
return ResponseObject{}, err
|
|
}
|
|
|
|
attrs := loadAttributes(obj.Attributes())
|
|
attrs[attrOID] = objID.EncodeToString()
|
|
if multipartSize, ok := attrs[attributeMultipartObjectSize]; ok {
|
|
attrs[attrSize] = multipartSize
|
|
} else {
|
|
attrs[attrSize] = strconv.FormatUint(obj.PayloadSize(), 10)
|
|
}
|
|
|
|
dirname := getNextDir(attrs[object.AttributeFilePath], basePath)
|
|
if dirname == "" {
|
|
return newListObjectsResponseNative(attrs), nil
|
|
}
|
|
|
|
return ResponseObject{
|
|
FileName: dirname,
|
|
FilePath: basePath + dirname,
|
|
IsDir: true,
|
|
}, nil
|
|
}
|
|
|
|
type browseParams struct {
|
|
bucketInfo *data.BucketInfo
|
|
prefix string
|
|
isNative bool
|
|
listObjects func(ctx context.Context, bucketName *data.BucketInfo, prefix string) (*GetObjectsResponse, error)
|
|
}
|
|
|
|
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
|
|
const S3Protocol = "s3"
|
|
const FrostfsProtocol = "frostfs"
|
|
|
|
ctx := utils.GetContextFromRequest(c)
|
|
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
|
log := reqLog.With(
|
|
zap.String("bucket", p.bucketInfo.Name),
|
|
zap.String("container", p.bucketInfo.CID.EncodeToString()),
|
|
zap.String("prefix", p.prefix),
|
|
)
|
|
resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
|
|
if err != nil {
|
|
logAndSendBucketError(c, log, err)
|
|
return
|
|
}
|
|
|
|
objects := resp.objects
|
|
sort.Slice(objects, func(i, j int) bool {
|
|
if objects[i].IsDir == objects[j].IsDir {
|
|
return objects[i].FileName < objects[j].FileName
|
|
}
|
|
return objects[i].IsDir
|
|
})
|
|
|
|
tmpl, err := template.New("index").Funcs(template.FuncMap{
|
|
"formatSize": formatSize,
|
|
"trimPrefix": trimPrefix,
|
|
"urlencode": urlencode,
|
|
"parentDir": parentDir,
|
|
}).Parse(h.config.IndexPageTemplate())
|
|
if err != nil {
|
|
logAndSendBucketError(c, log, err)
|
|
return
|
|
}
|
|
bucketName := p.bucketInfo.Name
|
|
protocol := S3Protocol
|
|
if p.isNative {
|
|
bucketName = p.bucketInfo.CID.EncodeToString()
|
|
protocol = FrostfsProtocol
|
|
}
|
|
if err = tmpl.Execute(c, &BrowsePageData{
|
|
Container: bucketName,
|
|
Prefix: p.prefix,
|
|
Objects: objects,
|
|
Protocol: protocol,
|
|
HasErrors: resp.hasErrors,
|
|
}); err != nil {
|
|
logAndSendBucketError(c, log, err)
|
|
return
|
|
}
|
|
}
|