[#186] Refactor objectSearch, fix objectPut

Fixed panic in objectPut() if input reader is nil
Made objectSearch searching by any attribute

Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
Angira Kekteeva 2021-11-13 23:35:50 +03:00 committed by Alex Vanin
parent befe084900
commit 284a560ea6
3 changed files with 42 additions and 20 deletions

View file

@ -587,7 +587,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *Ver
obj.DeleteMarkVersion = objInfo.Version() obj.DeleteMarkVersion = objInfo.Version()
} }
} else { } else {
ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, val: obj.Name}) ids, err = n.objectSearchByName(ctx, bkt.CID, obj.Name)
if err != nil { if err != nil {
obj.Error = err obj.Error = err
return obj return obj

View file

@ -22,12 +22,16 @@ import (
type ( type (
findParams struct { findParams struct {
attr string filters []filter
val string
cid *cid.ID cid *cid.ID
prefix string prefix string
} }
filter struct {
attr string
val string
}
getParams struct { getParams struct {
io.Writer io.Writer
*object.Range *object.Range
@ -68,21 +72,29 @@ type (
} }
) )
func (n *layer) objectSearchByName(ctx context.Context, cid *cid.ID, filename string) ([]*object.ID, error) {
f := &findParams{
filters: []filter{{attr: object.AttributeFileName, val: filename}},
cid: cid,
prefix: "",
}
return n.objectSearch(ctx, f)
}
// objectSearch returns all available objects by search params. // objectSearch returns all available objects by search params.
func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID, error) { func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID, error) {
var opts object.SearchFilters var opts object.SearchFilters
opts.AddRootFilter() opts.AddRootFilter()
if filename, err := url.QueryUnescape(p.val); err != nil { for _, filter := range p.filters {
if val, err := url.QueryUnescape(filter.val); err != nil {
return nil, err return nil, err
} else if filename != "" { } else if val != "" {
if p.attr == "" { opts.AddFilter(filter.attr, val, object.MatchStringEqual)
opts.AddFilter(object.AttributeFileName, filename, object.MatchStringEqual)
} else {
opts.AddFilter(p.attr, filename, object.MatchStringEqual)
} }
} }
if prefix, err := url.QueryUnescape(p.prefix); err != nil { if prefix, err := url.QueryUnescape(p.prefix); err != nil {
return nil, err return nil, err
} else if prefix != "" { } else if prefix != "" {
@ -142,6 +154,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled) idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled)
r := p.Reader r := p.Reader
if r != nil {
if len(p.Header[api.ContentType]) == 0 { if len(p.Header[api.ContentType]) == 0 {
d := newDetector(r) d := newDetector(r)
if contentType, err := d.Detect(); err == nil { if contentType, err := d.Detect(); err == nil {
@ -149,6 +162,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
} }
r = d.MultiReader() r = d.MultiReader()
} }
}
rawObject := formRawObject(p, bkt.CID, own, obj) rawObject := formRawObject(p, bkt.CID, own, obj)
ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(r) ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(r)
@ -307,7 +321,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
} }
func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectName string) (*objectVersions, error) { func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectName string) (*objectVersions, error) {
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, val: objectName}) ids, err := n.objectSearchByName(ctx, bkt.CID, objectName)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -44,7 +44,11 @@ func (n *layer) headSystemObject(ctx context.Context, bkt *data.BucketInfo, objN
} }
func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error { func (n *layer) deleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error {
ids, err := n.objectSearch(ctx, &findParams{cid: bktInfo.CID, attr: objectSystemAttributeName, val: name}) f := &findParams{
filters: []filter{{attr: objectSystemAttributeName, val: name}},
cid: bktInfo.CID,
}
ids, err := n.objectSearch(ctx, f)
if err != nil { if err != nil {
return err return err
} }
@ -166,7 +170,11 @@ func (n *layer) getCORS(ctx context.Context, bkt *data.BucketInfo, sysName strin
} }
func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) { func (n *layer) headSystemVersions(ctx context.Context, bkt *data.BucketInfo, sysName string) (*objectVersions, error) {
ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, attr: objectSystemAttributeName, val: sysName}) f := &findParams{
filters: []filter{{attr: objectSystemAttributeName, val: sysName}},
cid: bkt.CID,
}
ids, err := n.objectSearch(ctx, f)
if err != nil { if err != nil {
return nil, err return nil, err
} }