[#611] Split name processing from object info creation procedure

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2022-07-22 19:54:11 +03:00 committed by Alex Vanin
parent 97f81d3270
commit e0136feb73
3 changed files with 51 additions and 71 deletions

View file

@ -267,7 +267,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
if err != nil { if err != nil {
return nil, err return nil, err
} }
objInfo := objInfoFromMeta(bkt, meta) objInfo := objectInfoFromMeta(bkt, meta)
if err = n.objCache.PutObject(objInfo); err != nil { if err = n.objCache.PutObject(objInfo); err != nil {
n.log.Warn("couldn't put object info to cache", n.log.Warn("couldn't put object info to cache",
zap.Stringer("object id", node.OID), zap.Stringer("object id", node.OID),
@ -330,7 +330,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
return nil, err return nil, err
} }
objInfo := objInfoFromMeta(bkt, meta) objInfo := objectInfoFromMeta(bkt, meta)
if err = n.objCache.PutObject(objInfo); err != nil { if err = n.objCache.PutObject(objInfo); err != nil {
n.log.Warn("couldn't put obj to object cache", n.log.Warn("couldn't put obj to object cache",
zap.String("bucket name", objInfo.Bucket), zap.String("bucket name", objInfo.Bucket),
@ -670,41 +670,23 @@ func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []st
return return
} }
func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *data.BucketInfo, obj oid.ID, prefix, delimiter string) *data.ObjectInfo { func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo *data.BucketInfo, obj oid.ID, prefix, delimiter string) (oi *data.ObjectInfo) {
if objInfo := n.objCache.GetObject(newAddress(bktInfo.CID, obj)); objInfo != nil { oi = n.objCache.GetObject(newAddress(bktInfo.CID, obj))
// that's the simplest solution
// consider doing something else if oi == nil {
if !strings.HasPrefix(objInfo.Name, prefix) { meta, err := n.objectHead(ctx, bktInfo, obj)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
return nil return nil
} }
if len(delimiter) == 0 {
return objInfo oi = objectInfoFromMeta(bktInfo, meta)
if err = n.objCache.PutObject(oi); err != nil {
n.log.Warn("couldn't cache an object", zap.Error(err))
} }
copiedObjInfo := *objInfo
tail := strings.TrimPrefix(copiedObjInfo.Name, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
copiedObjInfo.IsDir = true
copiedObjInfo.Size = 0
copiedObjInfo.Headers = nil
copiedObjInfo.ContentType = ""
copiedObjInfo.Name = prefix + tail[:index+1]
}
return &copiedObjInfo
} }
meta, err := n.objectHead(ctx, bktInfo, obj) return processObjectInfoName(oi, prefix, delimiter)
if err != nil {
n.log.Warn("could not fetch object meta", zap.Error(err))
return nil
}
objInfo := objectInfoFromMeta(bktInfo, meta, prefix, delimiter)
if err = n.objCache.PutObject(objInfo); err != nil {
n.log.Warn("couldn't cache an object", zap.Error(err))
}
return objInfo
} }
func (n *layer) transformNeofsError(ctx context.Context, err error) error { func (n *layer) transformNeofsError(ctx context.Context, err error) error {

View file

@ -68,49 +68,23 @@ func userHeaders(attrs []object.Attribute) map[string]string {
return result return result
} }
func objInfoFromMeta(bkt *data.BucketInfo, meta *object.Object) *data.ObjectInfo { func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object) *data.ObjectInfo {
return objectInfoFromMeta(bkt, meta, "", "")
}
func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object, prefix, delimiter string) *data.ObjectInfo {
var ( var (
isDir bool
size int64
mimeType string mimeType string
creation time.Time creation time.Time
filename = filenameFromObject(meta)
) )
if !strings.HasPrefix(filename, prefix) { headers := userHeaders(meta.Attributes())
return nil delete(headers, object.AttributeFileName)
} if contentType, ok := headers[object.AttributeContentType]; ok {
userHeaders := userHeaders(meta.Attributes())
delete(userHeaders, object.AttributeFileName)
if contentType, ok := userHeaders[object.AttributeContentType]; ok {
mimeType = contentType mimeType = contentType
delete(userHeaders, object.AttributeContentType) delete(headers, object.AttributeContentType)
} }
if val, ok := userHeaders[object.AttributeTimestamp]; !ok { if val, ok := headers[object.AttributeTimestamp]; !ok {
// ignore empty value // ignore empty value
} else if dt, err := strconv.ParseInt(val, 10, 64); err == nil { } else if dt, err := strconv.ParseInt(val, 10, 64); err == nil {
creation = time.Unix(dt, 0) creation = time.Unix(dt, 0)
delete(userHeaders, object.AttributeTimestamp) delete(headers, object.AttributeTimestamp)
}
if len(delimiter) > 0 {
tail := strings.TrimPrefix(filename, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
isDir = true
mimeType = ""
filename = prefix + tail[:index+1]
userHeaders = nil
} else {
size = int64(meta.PayloadSize())
}
} else {
size = int64(meta.PayloadSize())
} }
objID, _ := meta.ID() objID, _ := meta.ID()
@ -118,19 +92,43 @@ func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object, prefix, delim
return &data.ObjectInfo{ return &data.ObjectInfo{
ID: objID, ID: objID,
CID: bkt.CID, CID: bkt.CID,
IsDir: isDir, IsDir: false,
Bucket: bkt.Name, Bucket: bkt.Name,
Name: filename, Name: filenameFromObject(meta),
Created: creation, Created: creation,
ContentType: mimeType, ContentType: mimeType,
Headers: userHeaders, Headers: headers,
Owner: *meta.OwnerID(), Owner: *meta.OwnerID(),
Size: size, Size: int64(meta.PayloadSize()),
HashSum: hex.EncodeToString(payloadChecksum.Value()), HashSum: hex.EncodeToString(payloadChecksum.Value()),
} }
} }
// processObjectInfoName fixes name in objectInfo structure based on prefix and
// delimiter from user request. If name does not contain prefix, nil value is
// returned. If name should be modified, then function returns copy of objectInfo
// structure.
func processObjectInfoName(oi *data.ObjectInfo, prefix, delimiter string) *data.ObjectInfo {
if !strings.HasPrefix(oi.Name, prefix) {
return nil
}
if len(delimiter) == 0 {
return oi
}
copiedObjInfo := *oi
tail := strings.TrimPrefix(copiedObjInfo.Name, prefix)
index := strings.Index(tail, delimiter)
if index >= 0 {
copiedObjInfo.IsDir = true
copiedObjInfo.Size = 0
copiedObjInfo.Headers = nil
copiedObjInfo.ContentType = ""
copiedObjInfo.Name = prefix + tail[:index+1]
}
return &copiedObjInfo
}
func filenameFromObject(o *object.Object) string { func filenameFromObject(o *object.Object) string {
for _, attr := range o.Attributes() { for _, attr := range o.Attributes() {
if attr.Key() == object.AttributeFileName { if attr.Key() == object.AttributeFileName {

View file

@ -72,7 +72,7 @@ func newTestInfo(obj oid.ID, bkt *data.BucketInfo, name string, isDir bool) *dat
return info return info
} }
func Test_objectInfoFromMeta(t *testing.T) { func Test_objectInfoName(t *testing.T) {
var uid user.ID var uid user.ID
var id oid.ID var id oid.ID
var containerID cid.ID var containerID cid.ID
@ -158,7 +158,7 @@ func Test_objectInfoFromMeta(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
info := objectInfoFromMeta(bkt, tc.object, tc.prefix, tc.delimiter) info := processObjectInfoName(objectInfoFromMeta(bkt, tc.object), tc.prefix, tc.delimiter)
require.Equal(t, tc.result, info) require.Equal(t, tc.result, info)
}) })
} }