[#166] Fix getting s3 object with the FrostFS OID name
Prioritize getting s3 object with the key, which equals to valid FrostFS OID, rather than getting non-existent object with OID via native protocol for GET and HEAD requests Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
e81f01c2ab
commit
98520b1692
5 changed files with 100 additions and 118 deletions
|
@ -8,6 +8,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
|
@ -23,21 +24,44 @@ import (
|
||||||
|
|
||||||
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
|
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
|
||||||
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||||
oidURLParam := c.UserValue("oid").(string)
|
cidParam := c.UserValue("cid").(string)
|
||||||
downloadQueryParam := c.QueryArgs().GetBool("download")
|
oidParam := c.UserValue("oid").(string)
|
||||||
|
downloadParam := c.QueryArgs().GetBool("download")
|
||||||
|
|
||||||
switch {
|
ctx := utils.GetContextFromRequest(c)
|
||||||
case isObjectID(oidURLParam):
|
log := utils.GetReqLogOrDefault(ctx, h.log)
|
||||||
h.byNativeAddress(c, h.receiveFile)
|
|
||||||
case !isContainerRoot(oidURLParam) && (downloadQueryParam || !isDir(oidURLParam)):
|
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
||||||
h.byS3Path(c, h.receiveFile)
|
if err != nil {
|
||||||
default:
|
logAndSendBucketError(c, log, err)
|
||||||
h.browseIndex(c)
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s3checkErr := h.tree.CheckSettingsNodeExist(ctx, bktInfo)
|
||||||
|
if s3checkErr != nil && !strings.Contains(s3checkErr.Error(), "tree not found") {
|
||||||
|
logAndSendBucketError(c, log, s3checkErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var objID oid.ID
|
||||||
|
if s3checkErr == nil && shouldDownload(oidParam, downloadParam) {
|
||||||
|
// Receive file via S3
|
||||||
|
h.byS3Path(c, bktInfo.CID, h.receiveFile)
|
||||||
|
} else if err = objID.DecodeString(oidParam); err == nil {
|
||||||
|
// Receive file via native protocol
|
||||||
|
addr := newAddress(bktInfo.CID, objID)
|
||||||
|
h.receiveFile(ctx, h.newRequest(c, log), addr)
|
||||||
|
} else {
|
||||||
|
h.browseIndex(c, s3checkErr != nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
func shouldDownload(oidParam string, downloadParam bool) bool {
|
||||||
return &request{
|
return !isDir(oidParam) || downloadParam
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
|
||||||
|
return request{
|
||||||
RequestCtx: ctx,
|
RequestCtx: ctx,
|
||||||
log: log,
|
log: log,
|
||||||
}
|
}
|
||||||
|
|
|
@ -190,79 +190,45 @@ func New(params *AppParams, config Config, tree *tree.Tree, workerPool *ants.Poo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
// resolveContainer decode container id, if it's not a valid container id
|
||||||
// prepares request and object address to it.
|
// then trey to resolve name using provided resolver.
|
||||||
func (h *Handler) byNativeAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*cid.ID, error) {
|
||||||
idCnr, _ := c.UserValue("cid").(string)
|
cnrID := new(cid.ID)
|
||||||
idObj, _ := url.PathUnescape(c.UserValue("oid").(string))
|
err := cnrID.DecodeString(containerID)
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(c)
|
|
||||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
|
||||||
log := reqLog.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
|
||||||
|
|
||||||
bktInfo, err := h.getBucketInfo(ctx, idCnr, log)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
|
||||||
return
|
if err != nil && strings.Contains(err.Error(), "not found") {
|
||||||
|
err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return cnrID, err
|
||||||
objID := new(oid.ID)
|
|
||||||
if err = objID.DecodeString(idObj); err != nil {
|
|
||||||
log.Error(logs.WrongObjectID, zap.Error(err))
|
|
||||||
response.Error(c, "wrong object id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
addr := newAddress(bktInfo.CID, *objID)
|
|
||||||
|
|
||||||
f(ctx, *h.newRequest(c, log), addr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
func (h *Handler) byS3Path(c *fasthttp.RequestCtx, cnrID cid.ID, handler func(context.Context, request, oid.Address)) {
|
||||||
// resolves object address from S3-like path <bucket name>/<object key>.
|
oidParam := c.UserValue("oid").(string)
|
||||||
func (h *Handler) byS3Path(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
|
||||||
bucketname := c.UserValue("cid").(string)
|
|
||||||
key := c.UserValue("oid").(string)
|
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(c)
|
ctx := utils.GetContextFromRequest(c)
|
||||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
log := utils.GetReqLogOrDefault(ctx, h.log).With(
|
||||||
log := reqLog.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
zap.String("cid", cnrID.EncodeToString()),
|
||||||
|
zap.String("oid", oidParam),
|
||||||
|
)
|
||||||
|
|
||||||
unescapedKey, err := url.QueryUnescape(key)
|
foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, oidParam)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
logAndSendBucketError(c, log, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if foundOID.DeleteMarker {
|
||||||
bktInfo, err := h.getBucketInfo(ctx, bucketname, log)
|
|
||||||
if err != nil {
|
|
||||||
logAndSendBucketError(c, log, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, unescapedKey)
|
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, tree.ErrNodeAccessDenied) {
|
|
||||||
response.Error(c, "Access Denied", fasthttp.StatusForbidden)
|
|
||||||
} else {
|
|
||||||
response.Error(c, "object wasn't found", fasthttp.StatusNotFound)
|
|
||||||
log.Error(logs.GetLatestObjectVersion, zap.Error(err))
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if foundOid.DeleteMarker {
|
|
||||||
log.Error(logs.ObjectWasDeleted)
|
log.Error(logs.ObjectWasDeleted)
|
||||||
response.Error(c, "object deleted", fasthttp.StatusNotFound)
|
response.Error(c, "object deleted", fasthttp.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
addr := newAddress(bktInfo.CID, foundOid.OID)
|
|
||||||
|
|
||||||
f(ctx, *h.newRequest(c, log), addr)
|
addr := newAddress(cnrID, foundOID.OID)
|
||||||
|
handler(ctx, h.newRequest(c, log), addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// byAttribute is a wrapper similar to byNativeAddress.
|
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Context, request, oid.Address)) {
|
||||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
cidParam, _ := c.UserValue("cid").(string)
|
||||||
scid, _ := c.UserValue("cid").(string)
|
|
||||||
key, _ := c.UserValue("attr_key").(string)
|
key, _ := c.UserValue("attr_key").(string)
|
||||||
val, _ := c.UserValue("attr_val").(string)
|
val, _ := c.UserValue("attr_val").(string)
|
||||||
|
|
||||||
|
@ -271,21 +237,21 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
|
||||||
|
|
||||||
key, err := url.QueryUnescape(key)
|
key, err := url.QueryUnescape(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_key", key), zap.Error(err))
|
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_key", key), zap.Error(err))
|
||||||
response.Error(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
val, err = url.QueryUnescape(val)
|
val, err = url.QueryUnescape(val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_val", val), zap.Error(err))
|
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_val", val), zap.Error(err))
|
||||||
response.Error(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log = log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
log = log.With(zap.String("cid", cidParam), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||||
|
|
||||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
logAndSendBucketError(c, log, err)
|
||||||
return
|
return
|
||||||
|
@ -315,25 +281,11 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var addrObj oid.Address
|
var addr oid.Address
|
||||||
addrObj.SetContainer(bktInfo.CID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
addrObj.SetObject(buf[0])
|
addr.SetObject(buf[0])
|
||||||
|
|
||||||
f(ctx, *h.newRequest(c, log), addrObj)
|
handler(ctx, h.newRequest(c, log), addr)
|
||||||
}
|
|
||||||
|
|
||||||
// resolveContainer decode container id, if it's not a valid container id
|
|
||||||
// then trey to resolve name using provided resolver.
|
|
||||||
func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*cid.ID, error) {
|
|
||||||
cnrID := new(cid.ID)
|
|
||||||
err := cnrID.DecodeString(containerID)
|
|
||||||
if err != nil {
|
|
||||||
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
|
|
||||||
if err != nil && strings.Contains(err.Error(), "not found") {
|
|
||||||
err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return cnrID, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
|
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
|
||||||
|
@ -388,7 +340,7 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
|
||||||
return bktInfo, err
|
return bktInfo, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) browseIndex(c *fasthttp.RequestCtx) {
|
func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) {
|
||||||
if !h.config.IndexPageEnabled() {
|
if !h.config.IndexPageEnabled() {
|
||||||
c.SetStatusCode(fasthttp.StatusNotFound)
|
c.SetStatusCode(fasthttp.StatusNotFound)
|
||||||
return
|
return
|
||||||
|
@ -414,18 +366,9 @@ func (h *Handler) browseIndex(c *fasthttp.RequestCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
listFunc := h.getDirObjectsS3
|
listFunc := h.getDirObjectsS3
|
||||||
isNativeList := false
|
if isNativeList {
|
||||||
|
// tree probe failed, trying to use native
|
||||||
err = h.tree.CheckSettingsNodeExist(ctx, bktInfo)
|
listFunc = h.getDirObjectsNative
|
||||||
if err != nil {
|
|
||||||
if errors.Is(err, tree.ErrNodeNotFound) {
|
|
||||||
// tree probe failed, try to use native
|
|
||||||
listFunc = h.getDirObjectsNative
|
|
||||||
isNativeList = true
|
|
||||||
} else {
|
|
||||||
logAndSendBucketError(c, log, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
h.browseObjects(c, browseParams{
|
h.browseObjects(c, browseParams{
|
||||||
|
|
|
@ -128,7 +128,7 @@ func prepareHandlerContext() (*handlerContext, error) {
|
||||||
treeMock := &treeClientMock{}
|
treeMock := &treeClientMock{}
|
||||||
cfgMock := &configMock{}
|
cfgMock := &configMock{}
|
||||||
|
|
||||||
workerPool, err := ants.NewPool(1000)
|
workerPool, err := ants.NewPool(1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
|
@ -102,14 +103,37 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
|
||||||
|
|
||||||
// HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format.
|
// HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format.
|
||||||
func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||||
test, _ := c.UserValue("oid").(string)
|
cidParam, _ := c.UserValue("cid").(string)
|
||||||
var id oid.ID
|
oidParam, _ := c.UserValue("oid").(string)
|
||||||
|
|
||||||
err := id.DecodeString(test)
|
ctx := utils.GetContextFromRequest(c)
|
||||||
|
log := utils.GetReqLogOrDefault(ctx, h.log).With(
|
||||||
|
zap.String("cid", cidParam),
|
||||||
|
zap.String("oid", oidParam),
|
||||||
|
)
|
||||||
|
|
||||||
|
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.byS3Path(c, h.headObject)
|
logAndSendBucketError(c, log, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
checkErr := h.tree.CheckSettingsNodeExist(ctx, bktInfo)
|
||||||
|
if checkErr != nil && !strings.Contains(checkErr.Error(), "tree not found") {
|
||||||
|
logAndSendBucketError(c, log, checkErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var objID oid.ID
|
||||||
|
if checkErr == nil {
|
||||||
|
// Head object via s3 protocol
|
||||||
|
h.byS3Path(c, bktInfo.CID, h.headObject)
|
||||||
|
} else if err = objID.DecodeString(oidParam); err == nil {
|
||||||
|
// Head object via native protocol
|
||||||
|
addr := newAddress(bktInfo.CID, objID)
|
||||||
|
h.headObject(ctx, h.newRequest(c, log), addr)
|
||||||
} else {
|
} else {
|
||||||
h.byNativeAddress(c, h.headObject)
|
logAndSendBucketError(c, log, checkErr)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,16 +42,7 @@ func bearerToken(ctx context.Context) *bearer.Token {
|
||||||
}
|
}
|
||||||
|
|
||||||
func isDir(name string) bool {
|
func isDir(name string) bool {
|
||||||
return strings.HasSuffix(name, "/")
|
return name == "" || strings.HasSuffix(name, "/")
|
||||||
}
|
|
||||||
|
|
||||||
func isObjectID(s string) bool {
|
|
||||||
var objID oid.ID
|
|
||||||
return objID.DecodeString(s) == nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func isContainerRoot(key string) bool {
|
|
||||||
return key == ""
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadAttributes(attrs []object.Attribute) map[string]string {
|
func loadAttributes(attrs []object.Attribute) map[string]string {
|
||||||
|
|
Loading…
Add table
Reference in a new issue