[#70] Support bucket/container caching
Mainly it was added because we need to know if TZ hashing is disabled or not for container Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
8bc246f8f9
commit
9a5a2239bd
17 changed files with 283 additions and 48 deletions
|
@ -3,14 +3,20 @@ package handler
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -34,6 +40,7 @@ type Handler struct {
|
|||
config Config
|
||||
containerResolver *resolver.ContainerResolver
|
||||
tree *tree.Tree
|
||||
cache *cache.BucketCache
|
||||
}
|
||||
|
||||
func New(params *utils.AppParams, config Config, tree *tree.Tree) *Handler {
|
||||
|
@ -44,6 +51,7 @@ func New(params *utils.AppParams, config Config, tree *tree.Tree) *Handler {
|
|||
config: config,
|
||||
containerResolver: params.Resolver,
|
||||
tree: tree,
|
||||
cache: params.Cache,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,10 +77,9 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
|
||||
cnrID, err := h.getContainerID(ctx, idCnr)
|
||||
bktInfo, err := h.getBucketInfo(ctx, idCnr, log)
|
||||
if err != nil {
|
||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -84,15 +91,15 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(*cnrID)
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(*objID)
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addr)
|
||||
}
|
||||
|
||||
// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
var (
|
||||
bucketname = req.UserValue("cid").(string)
|
||||
key = req.UserValue("oid").(string)
|
||||
|
@ -101,14 +108,13 @@ func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context,
|
|||
|
||||
ctx := utils.GetContextFromRequest(req)
|
||||
|
||||
cnrID, err := h.getContainerID(ctx, bucketname)
|
||||
bktInfo, err := h.getBucketInfo(ctx, bucketname, log)
|
||||
if err != nil {
|
||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
||||
logAndSendBucketError(req, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
foundOid, err := h.tree.GetLatestVersion(ctx, cnrID, key)
|
||||
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, key)
|
||||
if err != nil {
|
||||
log.Error(logs.ObjectWasntFound, zap.Error(err))
|
||||
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
|
||||
|
@ -121,7 +127,7 @@ func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context,
|
|||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(*cnrID)
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(foundOid.OID)
|
||||
|
||||
f(ctx, *h.newRequest(req, log), addr)
|
||||
|
@ -175,3 +181,65 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
|
|||
|
||||
f(ctx, *h.newRequest(c, log), addrObj)
|
||||
}
|
||||
|
||||
// resolveContainer decode container id, if it's not a valid container id
|
||||
// then trey to resolve name using provided resolver.
|
||||
func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*cid.ID, error) {
|
||||
cnrID := new(cid.ID)
|
||||
err := cnrID.DecodeString(containerID)
|
||||
if err != nil {
|
||||
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
|
||||
if err != nil && strings.Contains(err.Error(), "not found") {
|
||||
err = fmt.Errorf("%w: %s", &apistatus.ContainerNotFound{}, err.Error())
|
||||
|
||||
}
|
||||
}
|
||||
return cnrID, err
|
||||
}
|
||||
|
||||
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
|
||||
if bktInfo := h.cache.Get(containerName); bktInfo != nil {
|
||||
return bktInfo, nil
|
||||
}
|
||||
|
||||
cnrID, err := h.resolveContainer(ctx, containerName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bktInfo, err := h.readContainer(ctx, *cnrID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = h.cache.Put(bktInfo); err != nil {
|
||||
log.Warn(logs.CouldntPutBucketIntoCache,
|
||||
zap.String("bucket name", bktInfo.Name),
|
||||
zap.Stringer("bucket cid", bktInfo.CID),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
return bktInfo, nil
|
||||
}
|
||||
|
||||
func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) {
|
||||
prm := pool.PrmContainerGet{ContainerID: cnrID}
|
||||
res, err := h.pool.GetContainer(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get frostfs container '%s': %w", cnrID.String(), err)
|
||||
}
|
||||
|
||||
bktInfo := &data.BucketInfo{
|
||||
CID: cnrID,
|
||||
Name: cnrID.EncodeToString(),
|
||||
}
|
||||
|
||||
if domain := container.ReadDomain(res); domain.Name() != "" {
|
||||
bktInfo.Name = domain.Name()
|
||||
bktInfo.Zone = domain.Zone()
|
||||
}
|
||||
|
||||
bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(res)
|
||||
|
||||
return bktInfo, err
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue