[#174] Add kludge additional search #180

Merged
alexvanin merged 1 commit from r.loginov/frostfs-http-gw:feature/174-add_kludge_additional_search into master 2024-12-16 10:43:34 +00:00
9 changed files with 256 additions and 35 deletions

View file

@ -95,21 +95,22 @@ type (
dialerSource *internalnet.DialerSource dialerSource *internalnet.DialerSource
workerPoolSize int workerPoolSize int
mu sync.RWMutex mu sync.RWMutex
defaultTimestamp bool defaultTimestamp bool
zipCompression bool zipCompression bool
clientCut bool clientCut bool
returnIndexPage bool returnIndexPage bool
indexPageTemplate string indexPageTemplate string
bufferMaxSizeForPut uint64 bufferMaxSizeForPut uint64
namespaceHeader string namespaceHeader string
defaultNamespaces []string defaultNamespaces []string
corsAllowOrigin string corsAllowOrigin string
corsAllowMethods []string corsAllowMethods []string
corsAllowHeaders []string corsAllowHeaders []string
corsExposeHeaders []string corsExposeHeaders []string
corsAllowCredentials bool corsAllowCredentials bool
corsMaxAge int corsMaxAge int
enableFilepathFallback bool
} }
CORS struct { CORS struct {
@ -189,6 +190,7 @@ func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
corsExposeHeaders := v.GetStringSlice(cfgCORSExposeHeaders) corsExposeHeaders := v.GetStringSlice(cfgCORSExposeHeaders)
corsAllowCredentials := v.GetBool(cfgCORSAllowCredentials) corsAllowCredentials := v.GetBool(cfgCORSAllowCredentials)
corsMaxAge := fetchCORSMaxAge(v) corsMaxAge := fetchCORSMaxAge(v)
enableFilepathFallback := v.GetBool(cfgFeaturesEnableFilepathFallback)
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -208,6 +210,7 @@ func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
s.corsExposeHeaders = corsExposeHeaders s.corsExposeHeaders = corsExposeHeaders
s.corsAllowCredentials = corsAllowCredentials s.corsAllowCredentials = corsAllowCredentials
s.corsMaxAge = corsMaxAge s.corsMaxAge = corsMaxAge
s.enableFilepathFallback = enableFilepathFallback
} }
func (s *loggerSettings) DroppedLogsInc() { func (s *loggerSettings) DroppedLogsInc() {
@ -305,6 +308,12 @@ func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool)
return ns + ".ns", false return ns + ".ns", false
} }
func (s *appSettings) EnableFilepathFallback() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.enableFilepathFallback
}
func (a *app) initResolver() { func (a *app) initResolver() {
var err error var err error
a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig()) a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())

View file

@ -164,6 +164,9 @@ const (
cfgMultinetFallbackDelay = "multinet.fallback_delay" cfgMultinetFallbackDelay = "multinet.fallback_delay"
cfgMultinetSubnets = "multinet.subnets" cfgMultinetSubnets = "multinet.subnets"
// Feature.
cfgFeaturesEnableFilepathFallback = "features.enable_filepath_fallback"
// Command line args. // Command line args.
cmdHelp = "help" cmdHelp = "help"
cmdVersion = "version" cmdVersion = "version"

View file

@ -158,4 +158,7 @@ HTTP_GW_WORKER_POOL_SIZE=1000
# Enable index page support # Enable index page support
HTTP_GW_INDEX_PAGE_ENABLED=false HTTP_GW_INDEX_PAGE_ENABLED=false
# Index page template path # Index page template path
HTTP_GW_INDEX_PAGE_TEMPLATE_PATH=internal/handler/templates/index.gotmpl HTTP_GW_INDEX_PAGE_TEMPLATE_PATH=internal/handler/templates/index.gotmpl
# Enable using fallback path to search for a object by attribute
HTTP_GW_FEATURES_ENABLE_FILEPATH_FALLBACK=false

View file

@ -172,3 +172,7 @@ multinet:
source_ips: source_ips:
- 1.2.3.4 - 1.2.3.4
- 1.2.3.5 - 1.2.3.5
features:
# Enable using fallback path to search for a object by attribute
enable_filepath_fallback: false

View file

@ -59,7 +59,7 @@ $ cat http.log
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) | | `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
| `index_page` | [Index page configuration](#index_page-section) | | `index_page` | [Index page configuration](#index_page-section) |
| `multinet` | [Multinet configuration](#multinet-section) | | `multinet` | [Multinet configuration](#multinet-section) |
| `features` | [Features configuration](#features-section) |
# General section # General section
@ -457,3 +457,16 @@ multinet:
|--------------|------------|---------------|---------------|----------------------------------------------------------------------| |--------------|------------|---------------|---------------|----------------------------------------------------------------------|
| `mask` | `string` | yes | | Destination subnet. | | `mask` | `string` | yes | | Destination subnet. |
| `source_ips` | `[]string` | yes | | Array of source IP addresses to use when dialing destination subnet. | | `source_ips` | `[]string` | yes | | Array of source IP addresses to use when dialing destination subnet. |
# `features` section
Contains parameters for enabling features.
```yaml
features:
enable_filepath_fallback: true
```
| Parameter | Type | SIGHUP reload | Default value | Description |
| ----------------------------------- | ------ | ------------- | ------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `features.enable_filepath_fallback` | `bool` | yes | `false` | Enable using fallback path to search for a object by attribute. If the value of the `FilePath` attribute in the request contains no `/` symbols or single leading `/` symbol and the object was not found, then an attempt is made to search for the object by the attribute `FileName`. |

View file

@ -26,6 +26,7 @@ const (
attrOID = "OID" attrOID = "OID"
attrCreated = "Created" attrCreated = "Created"
attrFileName = "FileName" attrFileName = "FileName"
attrFilePath = "FilePath"
attrSize = "Size" attrSize = "Size"
) )

View file

@ -35,6 +35,7 @@ type Config interface {
IndexPageTemplate() string IndexPageTemplate() string
BufferMaxSizeForPut() uint64 BufferMaxSizeForPut() uint64
NamespaceHeader() string NamespaceHeader() string
EnableFilepathFallback() bool
} }
// PrmContainer groups parameters of FrostFS.Container operation. // PrmContainer groups parameters of FrostFS.Container operation.
@ -291,35 +292,58 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
return return
} }
res, err := h.search(ctx, bktInfo.CID, key, val, object.MatchStringEqual) objID, err := h.findObjectByAttribute(ctx, log, bktInfo.CID, key, val)
if err != nil { if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) if errors.Is(err, io.EOF) {
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) response.Error(c, err.Error(), fasthttp.StatusNotFound)
return
}
response.Error(c, err.Error(), fasthttp.StatusBadRequest)
return return
} }
var addrObj oid.Address
addrObj.SetContainer(bktInfo.CID)
addrObj.SetObject(objID)
f(ctx, *h.newRequest(c, log), addrObj)
}
func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual)
if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
return oid.ID{}, fmt.Errorf("could not search for objects: %w", err)
}
defer res.Close() defer res.Close()
buf := make([]oid.ID, 1) buf := make([]oid.ID, 1)
n, err := res.Read(buf) n, err := res.Read(buf)
if n == 0 { if n == 0 {
if errors.Is(err, io.EOF) { switch {
case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal):
log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName)
return h.findObjectByAttribute(ctx, log, cnrID, attrFileName, attrVal)
case errors.Is(err, io.EOF):
log.Error(logs.ObjectNotFound, zap.Error(err)) log.Error(logs.ObjectNotFound, zap.Error(err))
response.Error(c, "object not found", fasthttp.StatusNotFound) return oid.ID{}, fmt.Errorf("object not found: %w", err)
return default:
log.Error(logs.ReadObjectListFailed, zap.Error(err))
return oid.ID{}, fmt.Errorf("read object list failed: %w", err)
} }
log.Error(logs.ReadObjectListFailed, zap.Error(err))
response.Error(c, "read object list failed: "+err.Error(), fasthttp.StatusBadRequest)
return
} }
var addrObj oid.Address return buf[0], nil
addrObj.SetContainer(bktInfo.CID) }
addrObj.SetObject(buf[0])
f(ctx, *h.newRequest(c, log), addrObj) func (h *Handler) needSearchByFileName(key, val string) bool {
if key != attrFilePath || !h.config.EnableFilepathFallback() {
return false
}
return strings.HasPrefix(val, "/") && strings.Count(val, "/") == 1 || !strings.Contains(val, "/")
} }
// resolveContainer decode container id, if it's not a valid container id // resolveContainer decode container id, if it's not a valid container id

View file

@ -44,6 +44,7 @@ func (t *treeClientMock) GetSubTree(context.Context, *data.BucketInfo, string, [
} }
type configMock struct { type configMock struct {
additionalSearch bool
} }
func (c *configMock) DefaultTimestamp() bool { func (c *configMock) DefaultTimestamp() bool {
@ -78,6 +79,10 @@ func (c *configMock) NamespaceHeader() string {
return "" return ""
} }
func (c *configMock) EnableFilepathFallback() bool {
return c.additionalSearch
}
type handlerContext struct { type handlerContext struct {
key *keys.PrivateKey key *keys.PrivateKey
owner user.ID owner user.ID
@ -199,10 +204,8 @@ func TestBasic(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
obj := hc.frostfs.objects[putRes.ContainerID+"/"+putRes.ObjectID] obj := hc.frostfs.objects[putRes.ContainerID+"/"+putRes.ObjectID]
attr := object.NewAttribute() attr := prepareObjectAttributes(object.AttributeFilePath, objFileName)
attr.SetKey(object.AttributeFilePath) obj.SetAttributes(append(obj.Attributes(), attr)...)
attr.SetValue(objFileName)
obj.SetAttributes(append(obj.Attributes(), *attr)...)
t.Run("get", func(t *testing.T) { t.Run("get", func(t *testing.T) {
r = prepareGetRequest(ctx, cnrID.EncodeToString(), putRes.ObjectID) r = prepareGetRequest(ctx, cnrID.EncodeToString(), putRes.ObjectID)
@ -251,6 +254,159 @@ func TestBasic(t *testing.T) {
}) })
} }
func TestFindObjectByAttribute(t *testing.T) {
hc, err := prepareHandlerContext()
require.NoError(t, err)
hc.cfg.additionalSearch = true
bktName := "bucket"
cnrID, cnr, err := hc.prepareContainer(bktName, acl.PublicRWExtended)
require.NoError(t, err)
hc.frostfs.SetContainer(cnrID, cnr)
ctx := context.Background()
ctx = middleware.SetNamespace(ctx, "")
content := "hello"
r, err := prepareUploadRequest(ctx, cnrID.EncodeToString(), content)
require.NoError(t, err)
hc.Handler().Upload(r)
require.Equal(t, r.Response.StatusCode(), http.StatusOK)
var putRes putResponse
err = json.Unmarshal(r.Response.Body(), &putRes)
require.NoError(t, err)
testAttrVal1 := "test-attr-val1"
testAttrVal2 := "test-attr-val2"
testAttrVal3 := "test-attr-val3"
for _, tc := range []struct {
name string
firstAttr object.Attribute
secondAttr object.Attribute
reqAttrKey string
reqAttrValue string
err string
additionalSearch bool
}{
{
name: "success search by FileName",
firstAttr: prepareObjectAttributes(attrFilePath, testAttrVal1),
secondAttr: prepareObjectAttributes(attrFileName, testAttrVal2),
reqAttrKey: attrFileName,
reqAttrValue: testAttrVal2,
additionalSearch: false,
},
{
name: "failed search by FileName",
firstAttr: prepareObjectAttributes(attrFilePath, testAttrVal1),
secondAttr: prepareObjectAttributes(attrFileName, testAttrVal2),
reqAttrKey: attrFileName,
reqAttrValue: testAttrVal3,
err: "not found",
additionalSearch: false,
},
{
name: "success search by FilePath (with additional search)",
firstAttr: prepareObjectAttributes(attrFilePath, testAttrVal1),
secondAttr: prepareObjectAttributes(attrFileName, testAttrVal2),
reqAttrKey: attrFilePath,
reqAttrValue: testAttrVal2,
additionalSearch: true,
},
{
name: "failed by FilePath (with additional search)",
firstAttr: prepareObjectAttributes(attrFilePath, testAttrVal1),
secondAttr: prepareObjectAttributes(attrFileName, testAttrVal2),
reqAttrKey: attrFilePath,
reqAttrValue: testAttrVal3,
err: "not found",
additionalSearch: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
obj := hc.frostfs.objects[putRes.ContainerID+"/"+putRes.ObjectID]
obj.SetAttributes(tc.firstAttr, tc.secondAttr)
hc.cfg.additionalSearch = tc.additionalSearch
objID, err := hc.Handler().findObjectByAttribute(ctx, hc.Handler().log, cnrID, tc.reqAttrKey, tc.reqAttrValue)
if tc.err != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.err)
return
}
require.NoError(t, err)
require.Equal(t, putRes.ObjectID, objID.EncodeToString())
})
}
}
func TestNeedSearchByFileName(t *testing.T) {
hc, err := prepareHandlerContext()
require.NoError(t, err)
for _, tc := range []struct {
name string
attrKey string
attrVal string
additionalSearch bool
expected bool
}{
{
name: "need search - not contains slash",
attrKey: attrFilePath,
attrVal: "cat.png",
additionalSearch: true,
expected: true,
},
{
name: "need search - single lead slash",
attrKey: attrFilePath,
attrVal: "/cat.png",
additionalSearch: true,
expected: true,
},
{
name: "don't need search - single slash but not lead",
attrKey: attrFilePath,
attrVal: "cats/cat.png",
additionalSearch: true,
expected: false,
},
{
name: "don't need search - more one slash",
attrKey: attrFilePath,
attrVal: "/cats/cat.png",
additionalSearch: true,
expected: false,
},
{
name: "don't need search - incorrect attribute key",
attrKey: attrFileName,
attrVal: "cat.png",
additionalSearch: true,
expected: false,
},
{
name: "don't need search - additional search disabled",
attrKey: attrFilePath,
attrVal: "cat.png",
additionalSearch: false,
expected: false,
},
} {
t.Run(tc.name, func(t *testing.T) {
hc.cfg.additionalSearch = tc.additionalSearch
res := hc.h.needSearchByFileName(tc.attrKey, tc.attrVal)
require.Equal(t, tc.expected, res)
})
}
}
func prepareUploadRequest(ctx context.Context, bucket, content string) (*fasthttp.RequestCtx, error) { func prepareUploadRequest(ctx context.Context, bucket, content string) (*fasthttp.RequestCtx, error) {
r := new(fasthttp.RequestCtx) r := new(fasthttp.RequestCtx)
utils.SetContextToRequest(ctx, r) utils.SetContextToRequest(ctx, r)
@ -283,6 +439,13 @@ func prepareGetZipped(ctx context.Context, bucket, prefix string) *fasthttp.Requ
return r return r
} }
func prepareObjectAttributes(attrKey, attrValue string) object.Attribute {
attr := object.NewAttribute()
attr.SetKey(attrKey)
attr.SetValue(attrValue)
return *attr
}
const ( const (
keyAttr = "User-Attribute" keyAttr = "User-Attribute"
valAttr = "user value" valAttr = "user value"

View file

@ -87,4 +87,5 @@ const (
MultinetDialFail = "multinet dial failed" MultinetDialFail = "multinet dial failed"
FailedToLoadMultinetConfig = "failed to load multinet config" FailedToLoadMultinetConfig = "failed to load multinet config"
MultinetConfigWontBeUpdated = "multinet config won't be updated" MultinetConfigWontBeUpdated = "multinet config won't be updated"
ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName"
) )