[#165] listing: Use NodeVersion instead of ObjectInfo
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
3e20f736a6
commit
4ad84b9b94
16 changed files with 308 additions and 509 deletions
2
api/cache/listsession.go
vendored
2
api/cache/listsession.go
vendored
|
@ -55,7 +55,7 @@ func NewListSessionCache(config *Config) *ListSessionCache {
|
|||
zap.String("expected", fmt.Sprintf("%T", session)))
|
||||
}
|
||||
|
||||
//session.Cancel()
|
||||
// todo session.Cancel()
|
||||
}).Build()
|
||||
return &ListSessionCache{cache: gc, logger: config.Logger}
|
||||
}
|
||||
|
|
|
@ -37,8 +37,6 @@ type (
|
|||
ObjectInfo struct {
|
||||
ID oid.ID
|
||||
CID cid.ID
|
||||
IsDir bool
|
||||
IsDeleteMarker bool
|
||||
|
||||
Bucket string
|
||||
Name string
|
||||
|
|
|
@ -10,7 +10,7 @@ type VersionsStream interface {
|
|||
|
||||
// todo consider thread safe
|
||||
type ListSession struct {
|
||||
Next *ObjectInfo
|
||||
Next *NodeVersion
|
||||
Stream VersionsStream
|
||||
NamesMap map[string]struct{}
|
||||
Context context.Context
|
||||
|
|
|
@ -16,20 +16,22 @@ const (
|
|||
// NodeVersion represent node from tree service.
|
||||
type NodeVersion struct {
|
||||
BaseNodeVersion
|
||||
DeleteMarker *DeleteMarkerInfo
|
||||
IsUnversioned bool
|
||||
IsCombined bool
|
||||
}
|
||||
|
||||
func (v NodeVersion) IsDeleteMarker() bool {
|
||||
return v.DeleteMarker != nil
|
||||
// ExtendedNodeVersion contains additional node info to be able to sort versions by timestamp.
|
||||
type ExtendedNodeVersion struct {
|
||||
NodeVersion *NodeVersion
|
||||
IsLatest bool
|
||||
}
|
||||
|
||||
// DeleteMarkerInfo is used to save object info if node in the tree service is delete marker.
|
||||
// We need this information because the "delete marker" object is no longer stored in FrostFS.
|
||||
type DeleteMarkerInfo struct {
|
||||
Created time.Time
|
||||
Owner user.ID
|
||||
func (e ExtendedNodeVersion) Version() string {
|
||||
if e.NodeVersion.IsUnversioned {
|
||||
return UnversionedObjectVersionID
|
||||
}
|
||||
|
||||
return e.NodeVersion.OID.EncodeToString()
|
||||
}
|
||||
|
||||
// ExtendedObjectInfo contains additional node info to be able to sort versions by timestamp.
|
||||
|
@ -54,10 +56,30 @@ type BaseNodeVersion struct {
|
|||
ParenID uint64
|
||||
OID oid.ID
|
||||
Timestamp uint64
|
||||
Size uint64
|
||||
Size uint64 // todo discuss if it is possible don't support correct obj size for ola objects
|
||||
ETag string
|
||||
MD5 string
|
||||
FilePath string
|
||||
Created *time.Time
|
||||
Owner *user.ID
|
||||
IsDeleteMarker bool
|
||||
}
|
||||
|
||||
func (v *BaseNodeVersion) GetETag(md5Enabled bool) string {
|
||||
if md5Enabled && len(v.MD5) > 0 {
|
||||
return v.MD5
|
||||
}
|
||||
return v.ETag
|
||||
}
|
||||
|
||||
// IsFilledExtra returns true is node was created by version of gate v0.29.x and later
|
||||
func (v BaseNodeVersion) IsFilledExtra() bool {
|
||||
return v.Created != nil && v.Owner != nil
|
||||
}
|
||||
|
||||
func (v *BaseNodeVersion) FillExtra(objInfo *ObjectInfo) {
|
||||
v.Owner = &objInfo.Owner
|
||||
v.Created = &objInfo.Created
|
||||
}
|
||||
|
||||
type ObjectTaggingInfo struct {
|
||||
|
|
|
@ -137,7 +137,7 @@ func writeAttributesHeaders(h http.Header, info *data.ExtendedObjectInfo, isBuck
|
|||
h.Set(api.AmzVersionID, info.Version())
|
||||
}
|
||||
|
||||
if info.NodeVersion.IsDeleteMarker() {
|
||||
if info.NodeVersion.IsDeleteMarker {
|
||||
h.Set(api.AmzDeleteMarker, strconv.FormatBool(true))
|
||||
}
|
||||
|
||||
|
@ -191,7 +191,7 @@ func encodeToObjectAttributesResponse(info *data.ObjectInfo, p *GetObjectAttribu
|
|||
case storageClass:
|
||||
resp.StorageClass = api.DefaultStorageClass
|
||||
case objectSize:
|
||||
resp.ObjectSize = info.Size
|
||||
resp.ObjectSize = info.Size // todo probably we need to use GetObjectSize
|
||||
case checksum:
|
||||
checksumBytes, err := hex.DecodeString(info.HashSum)
|
||||
if err != nil {
|
||||
|
|
|
@ -243,6 +243,7 @@ func TestDeleteMarkerVersioned(t *testing.T) {
|
|||
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||
require.True(t, isDeleteMarker)
|
||||
versions := listVersions(t, tc, bktName)
|
||||
require.Len(t, versions.DeleteMarker, 1)
|
||||
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
|
||||
|
||||
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||
|
|
|
@ -185,24 +185,24 @@ func fillPrefixes(src []string, encode string) []CommonPrefix {
|
|||
return dst
|
||||
}
|
||||
|
||||
func fillContentsWithOwner(src []*data.ObjectInfo, encode string, md5Enabled bool) []Object {
|
||||
func fillContentsWithOwner(src []*data.NodeVersion, encode string, md5Enabled bool) []Object {
|
||||
return fillContents(src, encode, true, md5Enabled)
|
||||
}
|
||||
|
||||
func fillContents(src []*data.ObjectInfo, encode string, fetchOwner, md5Enabled bool) []Object {
|
||||
func fillContents(src []*data.NodeVersion, encode string, fetchOwner, md5Enabled bool) []Object {
|
||||
var dst []Object
|
||||
for _, obj := range src {
|
||||
res := Object{
|
||||
Key: s3PathEncode(obj.Name, encode),
|
||||
Key: s3PathEncode(obj.FilePath, encode),
|
||||
Size: obj.Size,
|
||||
LastModified: obj.Created.UTC().Format(time.RFC3339),
|
||||
ETag: data.Quote(obj.ETag(md5Enabled)),
|
||||
ETag: data.Quote(obj.GetETag(md5Enabled)),
|
||||
StorageClass: api.DefaultStorageClass,
|
||||
}
|
||||
|
||||
if size, err := layer.GetObjectSize(obj); err == nil {
|
||||
res.Size = size
|
||||
}
|
||||
//if size, err := layer.GetObjectSize(obj); err == nil {
|
||||
// res.Size = size
|
||||
//}
|
||||
|
||||
if fetchOwner {
|
||||
res.Owner = &Owner{
|
||||
|
@ -284,15 +284,15 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
|
|||
for _, ver := range info.Version {
|
||||
res.Version = append(res.Version, ObjectVersionResponse{
|
||||
IsLatest: ver.IsLatest,
|
||||
Key: ver.ObjectInfo.Name,
|
||||
LastModified: ver.ObjectInfo.Created.UTC().Format(time.RFC3339),
|
||||
Key: ver.NodeVersion.FilePath,
|
||||
LastModified: ver.NodeVersion.Created.UTC().Format(time.RFC3339),
|
||||
Owner: Owner{
|
||||
ID: ver.ObjectInfo.Owner.String(),
|
||||
DisplayName: ver.ObjectInfo.Owner.String(),
|
||||
ID: ver.NodeVersion.Owner.String(),
|
||||
DisplayName: ver.NodeVersion.Owner.String(),
|
||||
},
|
||||
Size: ver.ObjectInfo.Size,
|
||||
Size: ver.NodeVersion.Size,
|
||||
VersionID: ver.Version(),
|
||||
ETag: data.Quote(ver.ObjectInfo.ETag(md5Enabled)),
|
||||
ETag: data.Quote(ver.NodeVersion.GetETag(md5Enabled)),
|
||||
StorageClass: api.DefaultStorageClass,
|
||||
})
|
||||
}
|
||||
|
@ -300,11 +300,11 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
|
|||
for _, del := range info.DeleteMarker {
|
||||
res.DeleteMarker = append(res.DeleteMarker, DeleteMarkerEntry{
|
||||
IsLatest: del.IsLatest,
|
||||
Key: del.ObjectInfo.Name,
|
||||
LastModified: del.ObjectInfo.Created.UTC().Format(time.RFC3339),
|
||||
Key: del.NodeVersion.FilePath,
|
||||
LastModified: del.NodeVersion.Created.UTC().Format(time.RFC3339),
|
||||
Owner: Owner{
|
||||
ID: del.ObjectInfo.Owner.String(),
|
||||
DisplayName: del.ObjectInfo.Owner.String(),
|
||||
ID: del.NodeVersion.Owner.String(),
|
||||
DisplayName: del.NodeVersion.Owner.String(),
|
||||
},
|
||||
VersionID: del.Version(),
|
||||
})
|
||||
|
|
|
@ -651,7 +651,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
|
||||
var nullVersionToDelete *data.NodeVersion
|
||||
if lastVersion.IsUnversioned {
|
||||
if !lastVersion.IsDeleteMarker() {
|
||||
if !lastVersion.IsDeleteMarker {
|
||||
nullVersionToDelete = lastVersion
|
||||
}
|
||||
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||
|
@ -667,7 +667,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
}
|
||||
}
|
||||
|
||||
if lastVersion.IsDeleteMarker() {
|
||||
if lastVersion.IsDeleteMarker {
|
||||
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
|
||||
return obj
|
||||
}
|
||||
|
@ -679,15 +679,14 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
}
|
||||
|
||||
obj.DeleteMarkVersion = randOID.EncodeToString()
|
||||
|
||||
now := TimeNow(ctx)
|
||||
newVersion := &data.NodeVersion{
|
||||
BaseNodeVersion: data.BaseNodeVersion{
|
||||
OID: randOID,
|
||||
FilePath: obj.Name,
|
||||
},
|
||||
DeleteMarker: &data.DeleteMarkerInfo{
|
||||
Created: TimeNow(ctx),
|
||||
Owner: n.gateOwner,
|
||||
Created: &now,
|
||||
Owner: &n.gateOwner,
|
||||
IsDeleteMarker: true,
|
||||
},
|
||||
IsUnversioned: settings.VersioningSuspended(),
|
||||
}
|
||||
|
@ -764,7 +763,7 @@ func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, ob
|
|||
}
|
||||
|
||||
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
|
||||
if nodeVersion.IsDeleteMarker() {
|
||||
if nodeVersion.IsDeleteMarker {
|
||||
return obj.VersionID, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ type (
|
|||
// ListObjectsInfo contains common fields of data for ListObjectsV1 and ListObjectsV2.
|
||||
ListObjectsInfo struct {
|
||||
Prefixes []string
|
||||
Objects []*data.ObjectInfo
|
||||
Objects []*data.NodeVersion
|
||||
IsTruncated bool
|
||||
}
|
||||
|
||||
|
@ -68,8 +68,8 @@ type (
|
|||
KeyMarker string
|
||||
NextKeyMarker string
|
||||
NextVersionIDMarker string
|
||||
Version []*data.ExtendedObjectInfo
|
||||
DeleteMarker []*data.ExtendedObjectInfo
|
||||
Version []*data.ExtendedNodeVersion
|
||||
DeleteMarker []*data.ExtendedNodeVersion
|
||||
VersionIDMarker string
|
||||
}
|
||||
|
||||
|
@ -102,10 +102,10 @@ func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*Lis
|
|||
|
||||
if next != nil {
|
||||
result.IsTruncated = true
|
||||
result.NextMarker = objects[len(objects)-1].Name
|
||||
result.NextMarker = objects[len(objects)-1].FilePath
|
||||
}
|
||||
|
||||
result.Prefixes, result.Objects = triageObjects(objects)
|
||||
result.Prefixes, result.Objects = triageObjects(objects, p.Prefix, p.Delimiter)
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
@ -123,17 +123,17 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
|
|||
ContinuationToken: p.ContinuationToken,
|
||||
}
|
||||
|
||||
objects, next, err := n.getLatestObjectsVersionsV2(ctx, prm)
|
||||
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if next != nil {
|
||||
result.IsTruncated = true
|
||||
result.NextContinuationToken = next.ID.EncodeToString()
|
||||
result.NextContinuationToken = next.OID.EncodeToString()
|
||||
}
|
||||
|
||||
result.Prefixes, result.Objects = triageObjects(objects)
|
||||
result.Prefixes, result.Objects = triageObjects(objects, p.Prefix, p.Delimiter)
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
|
|||
}
|
||||
sort.Strings(sortedNames)
|
||||
|
||||
allObjects := make([]*data.ExtendedObjectInfo, 0, p.MaxKeys)
|
||||
allObjects := make([]*data.ExtendedNodeVersion, 0, p.MaxKeys)
|
||||
|
||||
for _, name := range sortedNames {
|
||||
sortedVersions := versions[name]
|
||||
|
@ -173,12 +173,12 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
|
|||
VersionIDMarker: p.VersionIDMarker,
|
||||
}
|
||||
|
||||
res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects)
|
||||
res.CommonPrefixes, allObjects = triageExtendedObjects(allObjects, p.Prefix, p.Delimiter)
|
||||
|
||||
if len(allObjects) > p.MaxKeys {
|
||||
res.IsTruncated = true
|
||||
res.NextKeyMarker = allObjects[p.MaxKeys-1].ObjectInfo.Name
|
||||
res.NextVersionIDMarker = allObjects[p.MaxKeys-1].ObjectInfo.VersionID()
|
||||
res.NextKeyMarker = allObjects[p.MaxKeys-1].NodeVersion.FilePath
|
||||
res.NextVersionIDMarker = allObjects[p.MaxKeys-1].NodeVersion.OID.EncodeToString()
|
||||
|
||||
allObjects = allObjects[:p.MaxKeys]
|
||||
}
|
||||
|
@ -187,71 +187,7 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) {
|
||||
if p.MaxKeys == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
owner := n.BearerOwner(ctx)
|
||||
cacheKey := cache.CreateObjectsListCacheKey(p.Bucket.CID, p.Prefix, true)
|
||||
nodeVersions := n.cache.GetList(owner, cacheKey)
|
||||
|
||||
if nodeVersions == nil {
|
||||
nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, p.Bucket, p.Prefix)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
n.cache.PutList(owner, cacheKey, nodeVersions)
|
||||
}
|
||||
|
||||
if len(nodeVersions) == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
sort.Slice(nodeVersions, func(i, j int) bool {
|
||||
return nodeVersions[i].FilePath < nodeVersions[j].FilePath
|
||||
})
|
||||
|
||||
poolCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions))
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
|
||||
}
|
||||
|
||||
objects = make([]*data.ObjectInfo, 0, p.MaxKeys)
|
||||
|
||||
for obj := range objOutCh {
|
||||
objects = append(objects, obj)
|
||||
}
|
||||
|
||||
//for node := range nodesGenerator(poolCtx, p, nodeVersions) {
|
||||
// objects = append(objects, &data.ObjectInfo{
|
||||
// ID: node.OID,
|
||||
// IsDir: false,
|
||||
// IsDeleteMarker: node.IsDeleteMarker(),
|
||||
// Name: node.FilePath,
|
||||
// Size: node.Size,
|
||||
// Created: time.Time{},
|
||||
// HashSum: node.ETag,
|
||||
// Owner: user.ID{},
|
||||
// Headers: nil,
|
||||
// })
|
||||
//}
|
||||
|
||||
sort.Slice(objects, func(i, j int) bool {
|
||||
return objects[i].Name < objects[j].Name
|
||||
})
|
||||
|
||||
if len(objects) > p.MaxKeys {
|
||||
next = objects[p.MaxKeys]
|
||||
objects = objects[:p.MaxKeys]
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParams) (objects []*data.ObjectInfo, next *data.ObjectInfo, err error) {
|
||||
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) (objects []*data.NodeVersion, next *data.NodeVersion, err error) {
|
||||
if p.MaxKeys == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
@ -286,7 +222,7 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam
|
|||
return nil, nil, fmt.Errorf("failed to init worker pool: %w", err)
|
||||
}
|
||||
|
||||
objects = make([]*data.ObjectInfo, 0, p.MaxKeys+1)
|
||||
objects = make([]*data.NodeVersion, 0, p.MaxKeys+1)
|
||||
if session.Next != nil {
|
||||
objects = append(objects, session.Next)
|
||||
}
|
||||
|
@ -295,26 +231,13 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam
|
|||
objects = append(objects, obj)
|
||||
}
|
||||
|
||||
//for node := range generator {
|
||||
// objects = append(objects, &data.ObjectInfo{
|
||||
// ID: node.OID,
|
||||
// IsDir: false,
|
||||
// IsDeleteMarker: node.IsDeleteMarker(),
|
||||
// Name: node.FilePath,
|
||||
// Size: node.Size,
|
||||
// Created: time.Time{},
|
||||
// HashSum: node.ETag,
|
||||
// Owner: user.ID{},
|
||||
// Headers: nil,
|
||||
// })
|
||||
//}
|
||||
|
||||
if err = <-errorCh; err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to get next object from stream: %w", err)
|
||||
}
|
||||
|
||||
// probably isn't necessary
|
||||
sort.Slice(objects, func(i, j int) bool {
|
||||
return objects[i].Name < objects[j].Name
|
||||
return objects[i].FilePath < objects[j].FilePath
|
||||
})
|
||||
|
||||
if len(objects) > p.MaxKeys {
|
||||
|
@ -324,19 +247,19 @@ func (n *layer) getLatestObjectsVersionsV2(ctx context.Context, p allObjectParam
|
|||
|
||||
if next != nil {
|
||||
session.Next = next
|
||||
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.VersionID()), session)
|
||||
n.cache.PutListSession(owner, cache.CreateListSessionCacheKey(p.Bucket.CID, p.Prefix, next.OID.EncodeToString()), session)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (map[string][]*data.ExtendedObjectInfo, error) {
|
||||
func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersionsParams) (map[string][]*data.ExtendedNodeVersion, error) {
|
||||
nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, p.Prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions))
|
||||
versions := make(map[string][]*data.ExtendedNodeVersion, len(nodeVersions))
|
||||
|
||||
sort.Slice(nodeVersions, func(i, j int) bool {
|
||||
return nodeVersions[i].FilePath < nodeVersions[j].FilePath
|
||||
|
@ -360,26 +283,48 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, p *ListObjectVersions
|
|||
}
|
||||
|
||||
for eoi := range objOutCh {
|
||||
objVersions, ok := versions[eoi.ObjectInfo.Name]
|
||||
objVersions, ok := versions[eoi.NodeVersion.FilePath]
|
||||
if !ok {
|
||||
objVersions = []*data.ExtendedObjectInfo{eoi}
|
||||
} else if !eoi.ObjectInfo.IsDir {
|
||||
objVersions = []*data.ExtendedNodeVersion{eoi}
|
||||
} else if dirName := tryDirectoryName(eoi.NodeVersion, p.Prefix, p.Delimiter); dirName == "" {
|
||||
objVersions = append(objVersions, eoi)
|
||||
} else {
|
||||
versions[dirName] = objVersions
|
||||
}
|
||||
versions[eoi.ObjectInfo.Name] = objVersions
|
||||
}
|
||||
|
||||
return versions, nil
|
||||
}
|
||||
|
||||
func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
|
||||
nodeCh := make(chan *data.NodeVersion)
|
||||
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
|
||||
func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
|
||||
nodeCh := make(chan *data.NodeVersion, 1000)
|
||||
errCh := make(chan error, 1)
|
||||
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
|
||||
existed := stream.NamesMap
|
||||
|
||||
if stream.Next != nil {
|
||||
existed[continuationToken] = struct{}{}
|
||||
}
|
||||
|
||||
limit := p.MaxKeys
|
||||
if stream.Next == nil {
|
||||
limit++
|
||||
}
|
||||
|
||||
go func() {
|
||||
var generated int
|
||||
var err error
|
||||
|
||||
LOOP:
|
||||
for _, node := range nodeVersions {
|
||||
for err == nil {
|
||||
node, err := stream.Stream.Next(ctx)
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) {
|
||||
errCh <- fmt.Errorf("stream next: %w", err)
|
||||
}
|
||||
break LOOP
|
||||
}
|
||||
|
||||
if shouldSkip(node, p, existed) {
|
||||
continue
|
||||
}
|
||||
|
@ -389,17 +334,18 @@ func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data
|
|||
break LOOP
|
||||
case nodeCh <- node:
|
||||
generated++
|
||||
if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
|
||||
|
||||
if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
|
||||
break LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
close(nodeCh)
|
||||
close(errCh)
|
||||
}()
|
||||
|
||||
return nodeCh
|
||||
return nodeCh, errCh
|
||||
}
|
||||
|
||||
func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion {
|
||||
nodeCh := make(chan *data.NodeVersion)
|
||||
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
|
||||
|
@ -428,65 +374,13 @@ func nodesGeneratorVersions(ctx context.Context, p allObjectParams, nodeVersions
|
|||
return nodeCh
|
||||
}
|
||||
|
||||
func nodesGeneratorStream(ctx context.Context, p allObjectParams, stream *data.ListSession) (<-chan *data.NodeVersion, <-chan error) {
|
||||
nodeCh := make(chan *data.NodeVersion, 1000)
|
||||
errCh := make(chan error, 1)
|
||||
//existed := make(map[string]struct{}, p.MaxKeys) // to squash the same directories
|
||||
existed := stream.NamesMap
|
||||
|
||||
if stream.Next != nil {
|
||||
existed[continuationToken] = struct{}{}
|
||||
}
|
||||
|
||||
limit := p.MaxKeys
|
||||
if stream.Next == nil {
|
||||
limit++
|
||||
}
|
||||
|
||||
go func() {
|
||||
var generated int
|
||||
var err error
|
||||
|
||||
LOOP:
|
||||
for err == nil {
|
||||
node, err := stream.Stream.Next(ctx)
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) {
|
||||
fmt.Println(ctx.Err())
|
||||
errCh <- fmt.Errorf("stream next: %w", err)
|
||||
}
|
||||
break LOOP
|
||||
}
|
||||
|
||||
if shouldSkip(node, p, existed) {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break LOOP
|
||||
case nodeCh <- node:
|
||||
generated++
|
||||
|
||||
if generated == limit { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken
|
||||
break LOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
close(nodeCh)
|
||||
close(errCh)
|
||||
}()
|
||||
|
||||
return nodeCh, errCh
|
||||
}
|
||||
|
||||
func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) {
|
||||
func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.NodeVersion, error) {
|
||||
reqLog := n.reqLogger(ctx)
|
||||
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
|
||||
}
|
||||
objCh := make(chan *data.ObjectInfo)
|
||||
objCh := make(chan *data.NodeVersion, size)
|
||||
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
|
@ -499,91 +393,32 @@ func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams,
|
|||
default:
|
||||
}
|
||||
|
||||
// We have to make a copy of pointer to data.NodeVersion
|
||||
// to get correct value in submitted task function.
|
||||
func(node *data.NodeVersion) {
|
||||
wg.Add(1)
|
||||
err = pool.Submit(func() {
|
||||
defer wg.Done()
|
||||
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
|
||||
if oi == nil {
|
||||
// try to get object again
|
||||
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
|
||||
// do not process object which are definitely missing in object service
|
||||
return
|
||||
}
|
||||
}
|
||||
if node.IsFilledExtra() || tryDirectoryName(node, p.Prefix, p.Delimiter) != "" { // todo think to not compute twice
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case objCh <- oi:
|
||||
case objCh <- node:
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
reqLog.Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
|
||||
}
|
||||
}(node)
|
||||
}
|
||||
wg.Wait()
|
||||
close(objCh)
|
||||
pool.Release()
|
||||
}()
|
||||
|
||||
return objCh, nil
|
||||
}
|
||||
|
||||
func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedObjectInfo, error) {
|
||||
reqLog := n.reqLogger(ctx)
|
||||
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
|
||||
}
|
||||
objCh := make(chan *data.ExtendedObjectInfo)
|
||||
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
LOOP:
|
||||
for node := range input {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break LOOP
|
||||
default:
|
||||
}
|
||||
|
||||
// We have to make a copy of pointer to data.NodeVersion
|
||||
// to get correct value in submitted task function.
|
||||
func(node *data.NodeVersion) {
|
||||
wg.Add(1)
|
||||
err = pool.Submit(func() {
|
||||
defer wg.Done()
|
||||
|
||||
oi := &data.ObjectInfo{}
|
||||
if node.IsDeleteMarker() { // delete marker does not match any object in FrostFS
|
||||
oi.ID = node.OID
|
||||
oi.Name = node.FilePath
|
||||
oi.Owner = node.DeleteMarker.Owner
|
||||
oi.Created = node.DeleteMarker.Created
|
||||
oi.IsDeleteMarker = true
|
||||
} else {
|
||||
oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
|
||||
// We have to make a copy of pointer to data.NodeVersion
|
||||
// to get correct value in submitted task function.
|
||||
func(node *data.NodeVersion) {
|
||||
wg.Add(1)
|
||||
err = pool.Submit(func() {
|
||||
defer wg.Done()
|
||||
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node)
|
||||
if oi == nil {
|
||||
// try to get object again
|
||||
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
|
||||
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node); oi == nil {
|
||||
// do not process object which are definitely missing in object service
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
eoi := &data.ExtendedObjectInfo{
|
||||
ObjectInfo: oi,
|
||||
NodeVersion: node,
|
||||
}
|
||||
node.FillExtra(oi)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case objCh <- eoi:
|
||||
case objCh <- node:
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -592,6 +427,7 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjec
|
|||
}
|
||||
}(node)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
close(objCh)
|
||||
pool.Release()
|
||||
|
@ -600,13 +436,13 @@ func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjec
|
|||
return objCh, nil
|
||||
}
|
||||
|
||||
func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) {
|
||||
func (n *layer) initWorkerPoolVersions(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ExtendedNodeVersion, error) {
|
||||
reqLog := n.reqLogger(ctx)
|
||||
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
|
||||
}
|
||||
objCh := make(chan *data.ObjectInfo)
|
||||
objCh := make(chan *data.ExtendedNodeVersion)
|
||||
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
|
@ -619,23 +455,33 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectP
|
|||
default:
|
||||
}
|
||||
|
||||
if node.IsFilledExtra() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case objCh <- &data.ExtendedNodeVersion{NodeVersion: node}:
|
||||
}
|
||||
} else {
|
||||
// We have to make a copy of pointer to data.NodeVersion
|
||||
// to get correct value in submitted task function.
|
||||
func(node *data.NodeVersion) {
|
||||
wg.Add(1)
|
||||
err = pool.Submit(func() {
|
||||
defer wg.Done()
|
||||
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter)
|
||||
|
||||
oi := n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node)
|
||||
if oi == nil {
|
||||
// try to get object again
|
||||
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node, p.Prefix, p.Delimiter); oi == nil {
|
||||
if oi = n.objectInfoFromObjectsCacheOrFrostFS(ctx, p.Bucket, node); oi == nil {
|
||||
// do not process object which are definitely missing in object service
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
node.FillExtra(oi)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case objCh <- oi:
|
||||
case objCh <- &data.ExtendedNodeVersion{NodeVersion: node}:
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -644,6 +490,7 @@ func (n *layer) initWorkerPoolStream(ctx context.Context, size int, p allObjectP
|
|||
}
|
||||
}(node)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
close(objCh)
|
||||
pool.Release()
|
||||
|
@ -672,7 +519,7 @@ func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, pr
|
|||
}
|
||||
|
||||
func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
|
||||
if node.IsDeleteMarker() {
|
||||
if node.IsDeleteMarker {
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -727,10 +574,11 @@ func shouldSkipVersions(node *data.NodeVersion, p allObjectParams, existed map[s
|
|||
return false
|
||||
}
|
||||
|
||||
func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) {
|
||||
func triageObjects(allObjects []*data.NodeVersion, prefix, delimiter string) (prefixes []string, objects []*data.NodeVersion) {
|
||||
objects = make([]*data.NodeVersion, 0, len(allObjects))
|
||||
for _, ov := range allObjects {
|
||||
if ov.IsDir {
|
||||
prefixes = append(prefixes, ov.Name)
|
||||
if dirName := tryDirectoryName(ov, prefix, delimiter); dirName != "" {
|
||||
prefixes = append(prefixes, dirName)
|
||||
} else {
|
||||
objects = append(objects, ov)
|
||||
}
|
||||
|
@ -739,10 +587,10 @@ func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []
|
|||
return
|
||||
}
|
||||
|
||||
func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []string, objects []*data.ExtendedObjectInfo) {
|
||||
func triageExtendedObjects(allObjects []*data.ExtendedNodeVersion, prefix, delimiter string) (prefixes []string, objects []*data.ExtendedNodeVersion) {
|
||||
for _, ov := range allObjects {
|
||||
if ov.ObjectInfo.IsDir {
|
||||
prefixes = append(prefixes, ov.ObjectInfo.Name)
|
||||
if dirName := tryDirectoryName(ov.NodeVersion, prefix, delimiter); dirName != "" {
|
||||
prefixes = append(prefixes, dirName)
|
||||
} else {
|
||||
objects = append(objects, ov)
|
||||
}
|
||||
|
@ -751,11 +599,7 @@ func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []st
|
|||
return
|
||||
}
|
||||
|
||||
func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) (oi *data.ObjectInfo) {
|
||||
if oiDir := tryDirectory(bktInfo, node, prefix, delimiter); oiDir != nil {
|
||||
return oiDir
|
||||
}
|
||||
|
||||
func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo *data.BucketInfo, node *data.NodeVersion) (oi *data.ObjectInfo) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
if extInfo := n.cache.GetObject(owner, newAddress(bktInfo.CID, node.OID)); extInfo != nil {
|
||||
return extInfo.ObjectInfo
|
||||
|
@ -774,22 +618,6 @@ func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo
|
|||
return oi
|
||||
}
|
||||
|
||||
func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo {
|
||||
dirName := tryDirectoryName(node, prefix, delimiter)
|
||||
if len(dirName) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &data.ObjectInfo{
|
||||
ID: node.OID, // to use it as continuation token
|
||||
CID: bktInfo.CID,
|
||||
IsDir: true,
|
||||
IsDeleteMarker: node.IsDeleteMarker(),
|
||||
Bucket: bktInfo.Name,
|
||||
Name: dirName,
|
||||
}
|
||||
}
|
||||
|
||||
// tryDirectoryName forms directory name by prefix and delimiter.
|
||||
// If node isn't a directory empty string is returned.
|
||||
// This function doesn't check if node has a prefix. It must do a caller.
|
||||
|
@ -807,26 +635,26 @@ func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string {
|
|||
return ""
|
||||
}
|
||||
|
||||
func filterVersionsByMarker(objects []*data.ExtendedObjectInfo, p *ListObjectVersionsParams) ([]*data.ExtendedObjectInfo, error) {
|
||||
func filterVersionsByMarker(objects []*data.ExtendedNodeVersion, p *ListObjectVersionsParams) ([]*data.ExtendedNodeVersion, error) {
|
||||
if p.KeyMarker == "" {
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
for i, obj := range objects {
|
||||
if obj.ObjectInfo.Name == p.KeyMarker {
|
||||
if obj.NodeVersion.FilePath == p.KeyMarker {
|
||||
for j := i; j < len(objects); j++ {
|
||||
if objects[j].ObjectInfo.Name != obj.ObjectInfo.Name {
|
||||
if objects[j].NodeVersion.FilePath != obj.NodeVersion.FilePath {
|
||||
if p.VersionIDMarker == "" {
|
||||
return objects[j:], nil
|
||||
}
|
||||
break
|
||||
}
|
||||
if objects[j].ObjectInfo.VersionID() == p.VersionIDMarker {
|
||||
if objects[j].NodeVersion.OID.EncodeToString() == p.VersionIDMarker {
|
||||
return objects[j+1:], nil
|
||||
}
|
||||
}
|
||||
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
|
||||
} else if obj.ObjectInfo.Name > p.KeyMarker {
|
||||
} else if obj.NodeVersion.FilePath > p.KeyMarker {
|
||||
if p.VersionIDMarker != "" {
|
||||
return nil, s3errors.GetAPIError(s3errors.ErrInvalidVersion)
|
||||
}
|
||||
|
@ -836,19 +664,19 @@ func filterVersionsByMarker(objects []*data.ExtendedObjectInfo, p *ListObjectVer
|
|||
|
||||
// don't use nil as empty slice to be consistent with `return objects[j+1:], nil` above
|
||||
// that can be empty
|
||||
return []*data.ExtendedObjectInfo{}, nil
|
||||
return []*data.ExtendedNodeVersion{}, nil
|
||||
}
|
||||
|
||||
func triageVersions(objVersions []*data.ExtendedObjectInfo) ([]*data.ExtendedObjectInfo, []*data.ExtendedObjectInfo) {
|
||||
func triageVersions(objVersions []*data.ExtendedNodeVersion) ([]*data.ExtendedNodeVersion, []*data.ExtendedNodeVersion) {
|
||||
if len(objVersions) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var resVersion []*data.ExtendedObjectInfo
|
||||
var resDelMarkVersions []*data.ExtendedObjectInfo
|
||||
var resVersion []*data.ExtendedNodeVersion
|
||||
var resDelMarkVersions []*data.ExtendedNodeVersion
|
||||
|
||||
for _, version := range objVersions {
|
||||
if version.NodeVersion.IsDeleteMarker() {
|
||||
if version.NodeVersion.IsDeleteMarker {
|
||||
resDelMarkVersions = append(resDelMarkVersions, version)
|
||||
} else {
|
||||
resVersion = append(resVersion, version)
|
||||
|
|
|
@ -296,13 +296,15 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
|||
}
|
||||
|
||||
n.reqLogger(ctx).Debug(logs.PutObject, zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
||||
|
||||
now := TimeNow(ctx)
|
||||
newVersion := &data.NodeVersion{
|
||||
BaseNodeVersion: data.BaseNodeVersion{
|
||||
OID: id,
|
||||
ETag: hex.EncodeToString(hash),
|
||||
FilePath: p.Object,
|
||||
Size: size,
|
||||
Size: p.Size,
|
||||
Created: &now,
|
||||
Owner: &n.gateOwner,
|
||||
},
|
||||
IsUnversioned: !bktSettings.VersioningEnabled(),
|
||||
IsCombined: p.Header[MultipartObjectSize] != "",
|
||||
|
@ -375,7 +377,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if node.IsDeleteMarker() {
|
||||
if node.IsDeleteMarker {
|
||||
return nil, DeleteMarkerError{ErrorCode: apiErrors.ErrNoSuchKey}
|
||||
}
|
||||
|
||||
|
@ -432,7 +434,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
|
|||
return extObjInfo, nil
|
||||
}
|
||||
|
||||
if foundVersion.IsDeleteMarker() {
|
||||
if foundVersion.IsDeleteMarker {
|
||||
return nil, DeleteMarkerError{ErrorCode: apiErrors.ErrMethodNotAllowed}
|
||||
}
|
||||
|
||||
|
|
|
@ -174,13 +174,13 @@ func (n *layer) getNodeVersion(ctx context.Context, objVersion *ObjectVersion) (
|
|||
}
|
||||
}
|
||||
|
||||
if err == nil && version.IsDeleteMarker() && !objVersion.NoErrorOnDeleteMarker {
|
||||
if err == nil && version.IsDeleteMarker && !objVersion.NoErrorOnDeleteMarker {
|
||||
return nil, fmt.Errorf("%w: found version is delete marker", s3errors.GetAPIError(s3errors.ErrNoSuchKey))
|
||||
} else if errors.Is(err, ErrNodeNotFound) {
|
||||
return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
|
||||
}
|
||||
|
||||
if err == nil && version != nil && !version.IsDeleteMarker() {
|
||||
if err == nil && version != nil && !version.IsDeleteMarker {
|
||||
n.reqLogger(ctx).Debug(logs.GetTreeNode,
|
||||
zap.Stringer("cid", objVersion.BktInfo.CID), zap.Stringer("oid", version.OID))
|
||||
}
|
||||
|
|
|
@ -50,7 +50,6 @@ func objectInfoFromMeta(bkt *data.BucketInfo, meta *object.Object) *data.ObjectI
|
|||
return &data.ObjectInfo{
|
||||
ID: objID,
|
||||
CID: bkt.CID,
|
||||
IsDir: false,
|
||||
|
||||
Bucket: bkt.Name,
|
||||
Name: filepathFromObject(meta),
|
||||
|
|
|
@ -1,51 +1,13 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultTestCreated = time.Now()
|
||||
defaultTestPayload = []byte("test object payload")
|
||||
defaultTestPayloadLength = uint64(len(defaultTestPayload))
|
||||
defaultTestContentType = http.DetectContentType(defaultTestPayload)
|
||||
)
|
||||
|
||||
func newTestInfo(obj oid.ID, bkt *data.BucketInfo, name string, isDir bool) *data.ObjectInfo {
|
||||
var hashSum checksum.Checksum
|
||||
info := &data.ObjectInfo{
|
||||
ID: obj,
|
||||
Name: name,
|
||||
Bucket: bkt.Name,
|
||||
CID: bkt.CID,
|
||||
Size: defaultTestPayloadLength,
|
||||
ContentType: defaultTestContentType,
|
||||
Created: time.Unix(defaultTestCreated.Unix(), 0),
|
||||
Owner: bkt.Owner,
|
||||
Headers: make(map[string]string),
|
||||
HashSum: hex.EncodeToString(hashSum.Value()),
|
||||
}
|
||||
|
||||
if isDir {
|
||||
info.IsDir = true
|
||||
info.Size = 0
|
||||
info.ContentType = ""
|
||||
info.Headers = nil
|
||||
}
|
||||
|
||||
return info
|
||||
}
|
||||
|
||||
func newTestNodeVersion(id oid.ID, name string) *data.NodeVersion {
|
||||
return &data.NodeVersion{
|
||||
BaseNodeVersion: data.BaseNodeVersion{
|
||||
|
@ -56,98 +18,84 @@ func newTestNodeVersion(id oid.ID, name string) *data.NodeVersion {
|
|||
}
|
||||
|
||||
func TestTryDirectory(t *testing.T) {
|
||||
var uid user.ID
|
||||
var id oid.ID
|
||||
var containerID cid.ID
|
||||
|
||||
bkt := &data.BucketInfo{
|
||||
Name: "test-container",
|
||||
CID: containerID,
|
||||
Owner: uid,
|
||||
Created: time.Now(),
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
prefix string
|
||||
result *data.ObjectInfo
|
||||
result string
|
||||
node *data.NodeVersion
|
||||
delimiter string
|
||||
}{
|
||||
{
|
||||
name: "small.jpg",
|
||||
result: nil,
|
||||
result: "",
|
||||
node: newTestNodeVersion(id, "small.jpg"),
|
||||
},
|
||||
{
|
||||
name: "small.jpg not matched prefix",
|
||||
prefix: "big",
|
||||
result: nil,
|
||||
result: "",
|
||||
node: newTestNodeVersion(id, "small.jpg"),
|
||||
},
|
||||
{
|
||||
name: "small.jpg delimiter",
|
||||
delimiter: "/",
|
||||
result: nil,
|
||||
result: "",
|
||||
node: newTestNodeVersion(id, "small.jpg"),
|
||||
},
|
||||
{
|
||||
name: "test/small.jpg",
|
||||
result: nil,
|
||||
result: "",
|
||||
node: newTestNodeVersion(id, "test/small.jpg"),
|
||||
},
|
||||
{
|
||||
name: "test/small.jpg with prefix and delimiter",
|
||||
prefix: "test/",
|
||||
delimiter: "/",
|
||||
result: nil,
|
||||
result: "",
|
||||
node: newTestNodeVersion(id, "test/small.jpg"),
|
||||
},
|
||||
{
|
||||
name: "a/b/small.jpg",
|
||||
prefix: "a",
|
||||
result: nil,
|
||||
result: "",
|
||||
node: newTestNodeVersion(id, "a/b/small.jpg"),
|
||||
},
|
||||
{
|
||||
name: "a/b/small.jpg",
|
||||
prefix: "a/",
|
||||
delimiter: "/",
|
||||
result: newTestInfo(id, bkt, "a/b/", true),
|
||||
result: "a/b/",
|
||||
node: newTestNodeVersion(id, "a/b/small.jpg"),
|
||||
},
|
||||
{
|
||||
name: "a/b/c/small.jpg",
|
||||
prefix: "a/",
|
||||
delimiter: "/",
|
||||
result: newTestInfo(id, bkt, "a/b/", true),
|
||||
result: "a/b/",
|
||||
node: newTestNodeVersion(id, "a/b/c/small.jpg"),
|
||||
},
|
||||
{
|
||||
name: "a/b/c/small.jpg",
|
||||
prefix: "a/b/c/s",
|
||||
delimiter: "/",
|
||||
result: nil,
|
||||
result: "",
|
||||
node: newTestNodeVersion(id, "a/b/c/small.jpg"),
|
||||
},
|
||||
{
|
||||
name: "a/b/c/big.jpg",
|
||||
prefix: "a/b/",
|
||||
delimiter: "/",
|
||||
result: newTestInfo(id, bkt, "a/b/c/", true),
|
||||
result: "a/b/c/",
|
||||
node: newTestNodeVersion(id, "a/b/c/big.jpg"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
info := tryDirectory(bkt, tc.node, tc.prefix, tc.delimiter)
|
||||
if tc.result != nil {
|
||||
tc.result.Created = time.Time{}
|
||||
tc.result.Owner = user.ID{}
|
||||
}
|
||||
|
||||
require.Equal(t, tc.result, info)
|
||||
dirName := tryDirectoryName(tc.node, tc.prefix, tc.delimiter)
|
||||
require.Equal(t, tc.result, dirName)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ func (tc *testContext) deleteObject(objectName, versionID string, settings *data
|
|||
}
|
||||
}
|
||||
|
||||
func (tc *testContext) listObjectsV1() []*data.ObjectInfo {
|
||||
func (tc *testContext) listObjectsV1() []*data.NodeVersion {
|
||||
res, err := tc.layer.ListObjectsV1(tc.ctx, &ListObjectsParamsV1{
|
||||
ListObjectsParamsCommon: ListObjectsParamsCommon{
|
||||
BktInfo: tc.bktInfo,
|
||||
|
@ -83,7 +83,7 @@ func (tc *testContext) listObjectsV1() []*data.ObjectInfo {
|
|||
return res.Objects
|
||||
}
|
||||
|
||||
func (tc *testContext) listObjectsV2() []*data.ObjectInfo {
|
||||
func (tc *testContext) listObjectsV2() []*data.NodeVersion {
|
||||
res, err := tc.layer.ListObjectsV2(tc.ctx, &ListObjectsParamsV2{
|
||||
ListObjectsParamsCommon: ListObjectsParamsCommon{
|
||||
BktInfo: tc.bktInfo,
|
||||
|
@ -291,7 +291,7 @@ func TestVersioningDeleteSpecificObjectVersion(t *testing.T) {
|
|||
require.Len(t, versions.DeleteMarker, 1)
|
||||
for _, ver := range versions.DeleteMarker {
|
||||
if ver.IsLatest {
|
||||
tc.deleteObject(tc.obj, ver.ObjectInfo.VersionID(), settings)
|
||||
tc.deleteObject(tc.obj, ver.NodeVersion.OID.EncodeToString(), settings)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -323,112 +323,112 @@ func TestFilterVersionsByMarker(t *testing.T) {
|
|||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
objects []*data.ExtendedObjectInfo
|
||||
objects []*data.ExtendedNodeVersion
|
||||
params *ListObjectVersionsParams
|
||||
expected []*data.ExtendedObjectInfo
|
||||
expected []*data.ExtendedNodeVersion
|
||||
error bool
|
||||
}{
|
||||
{
|
||||
name: "missed key marker",
|
||||
objects: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
|
||||
objects: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
|
||||
},
|
||||
params: &ListObjectVersionsParams{KeyMarker: "", VersionIDMarker: "dummy"},
|
||||
expected: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
|
||||
expected: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "last version id",
|
||||
objects: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
|
||||
objects: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
|
||||
},
|
||||
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[1].EncodeToString()},
|
||||
expected: []*data.ExtendedObjectInfo{},
|
||||
expected: []*data.ExtendedNodeVersion{},
|
||||
},
|
||||
{
|
||||
name: "same name, different versions",
|
||||
objects: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
|
||||
objects: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
|
||||
},
|
||||
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[0].EncodeToString()},
|
||||
expected: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
|
||||
expected: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "different name, different versions",
|
||||
objects: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
|
||||
objects: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
|
||||
},
|
||||
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: testOIDs[0].EncodeToString()},
|
||||
expected: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
|
||||
expected: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "not matched name alphabetically less",
|
||||
objects: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
|
||||
objects: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
|
||||
},
|
||||
params: &ListObjectVersionsParams{KeyMarker: "obj", VersionIDMarker: ""},
|
||||
expected: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
|
||||
expected: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "not matched name alphabetically less with dummy version id",
|
||||
objects: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
objects: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
},
|
||||
params: &ListObjectVersionsParams{KeyMarker: "obj", VersionIDMarker: "dummy"},
|
||||
error: true,
|
||||
},
|
||||
{
|
||||
name: "not matched name alphabetically greater",
|
||||
objects: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[1]}},
|
||||
objects: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[1]}}},
|
||||
},
|
||||
params: &ListObjectVersionsParams{KeyMarker: "obj2", VersionIDMarker: testOIDs[2].EncodeToString()},
|
||||
expected: []*data.ExtendedObjectInfo{},
|
||||
expected: []*data.ExtendedNodeVersion{},
|
||||
},
|
||||
{
|
||||
name: "not found version id",
|
||||
objects: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
|
||||
objects: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[2]}}},
|
||||
},
|
||||
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: "dummy"},
|
||||
error: true,
|
||||
},
|
||||
{
|
||||
name: "not found version id, obj last",
|
||||
objects: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
|
||||
objects: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
|
||||
},
|
||||
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: "dummy"},
|
||||
error: true,
|
||||
},
|
||||
{
|
||||
name: "not found version id, obj last",
|
||||
objects: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[0]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj0", ID: testOIDs[1]}},
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
|
||||
objects: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[0]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj0", OID: testOIDs[1]}}},
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[2]}}},
|
||||
},
|
||||
params: &ListObjectVersionsParams{KeyMarker: "obj0", VersionIDMarker: ""},
|
||||
expected: []*data.ExtendedObjectInfo{
|
||||
{ObjectInfo: &data.ObjectInfo{Name: "obj1", ID: testOIDs[2]}},
|
||||
expected: []*data.ExtendedNodeVersion{
|
||||
{NodeVersion: &data.NodeVersion{BaseNodeVersion: data.BaseNodeVersion{FilePath: "obj1", OID: testOIDs[2]}}},
|
||||
},
|
||||
},
|
||||
} {
|
||||
|
|
|
@ -205,29 +205,26 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
|
|||
MD5: md5,
|
||||
Size: treeNode.Size,
|
||||
FilePath: filePath,
|
||||
IsDeleteMarker: isDeleteMarker,
|
||||
},
|
||||
IsUnversioned: isUnversioned,
|
||||
IsCombined: isCombined,
|
||||
}
|
||||
|
||||
if isDeleteMarker {
|
||||
var created time.Time
|
||||
if createdStr, ok := treeNode.Get(createdKV); ok {
|
||||
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err == nil {
|
||||
created = time.UnixMilli(utcMilli)
|
||||
created := time.UnixMilli(utcMilli)
|
||||
version.Created = &created
|
||||
}
|
||||
}
|
||||
|
||||
var owner user.ID
|
||||
if ownerStr, ok := treeNode.Get(ownerKV); ok {
|
||||
_ = owner.DecodeString(ownerStr)
|
||||
var owner user.ID
|
||||
if err := owner.DecodeString(ownerStr); err == nil {
|
||||
version.Owner = &owner
|
||||
}
|
||||
}
|
||||
|
||||
version.DeleteMarker = &data.DeleteMarkerInfo{
|
||||
Created: created,
|
||||
Owner: owner,
|
||||
}
|
||||
}
|
||||
return version
|
||||
}
|
||||
|
||||
|
@ -958,7 +955,7 @@ func (c *Tree) getSubTreeVersions(ctx context.Context, bktInfo *data.BucketInfo,
|
|||
|
||||
result := make([]*data.NodeVersion, 0, len(versions)) // consider use len(subTree)
|
||||
for _, version := range versions {
|
||||
if latestOnly && version[0].IsDeleteMarker() {
|
||||
if latestOnly && version[0].IsDeleteMarker {
|
||||
continue
|
||||
}
|
||||
result = append(result, version...)
|
||||
|
@ -1325,6 +1322,8 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
|
|||
meta := map[string]string{
|
||||
oidKV: version.OID.EncodeToString(),
|
||||
FileNameKey: path[len(path)-1],
|
||||
ownerKV: version.Owner.EncodeToString(),
|
||||
createdKV: strconv.FormatInt(version.Created.UTC().UnixMilli(), 10),
|
||||
}
|
||||
|
||||
if version.Size > 0 {
|
||||
|
@ -1337,10 +1336,8 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
|
|||
meta[md5KV] = version.MD5
|
||||
}
|
||||
|
||||
if version.IsDeleteMarker() {
|
||||
if version.IsDeleteMarker {
|
||||
meta[isDeleteMarkerKV] = "true"
|
||||
meta[ownerKV] = version.DeleteMarker.Owner.EncodeToString()
|
||||
meta[createdKV] = strconv.FormatInt(version.DeleteMarker.Created.UTC().UnixMilli(), 10)
|
||||
}
|
||||
|
||||
if version.IsCombined {
|
||||
|
|
|
@ -3,10 +3,12 @@ package tree
|
|||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
@ -141,12 +143,15 @@ func TestTreeServiceAddVersion(t *testing.T) {
|
|||
CID: cidtest.ID(),
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
version := &data.NodeVersion{
|
||||
BaseNodeVersion: data.BaseNodeVersion{
|
||||
OID: oidtest.ID(),
|
||||
Size: 10,
|
||||
ETag: "etag",
|
||||
FilePath: "path/to/version",
|
||||
Owner: usertest.ID(),
|
||||
Created: &now,
|
||||
},
|
||||
IsUnversioned: true,
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue