forked from TrueCloudLab/frostfs-s3-gw
parent
d8ab1b4799
commit
12a2060dd0
7 changed files with 224 additions and 300 deletions
|
@ -57,12 +57,19 @@ type (
|
|||
}
|
||||
|
||||
allObjectParams struct {
|
||||
Bucket *data.BucketInfo
|
||||
Delimiter string
|
||||
Prefix string
|
||||
Bucket *data.BucketInfo
|
||||
Delimiter string
|
||||
Prefix string
|
||||
MaxKeys int
|
||||
Marker string
|
||||
ContinuationToken string
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
continuationToken = "<continuation-token>"
|
||||
)
|
||||
|
||||
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cnr)
|
||||
|
@ -141,7 +148,10 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
|||
own := n.Owner(ctx)
|
||||
|
||||
versioningEnabled := n.isVersioningEnabled(ctx, p.BktInfo)
|
||||
newVersion := &data.NodeVersion{IsUnversioned: !versioningEnabled}
|
||||
newVersion := &data.NodeVersion{
|
||||
BaseNodeVersion: data.BaseNodeVersion{FilePath: p.Object},
|
||||
IsUnversioned: !versioningEnabled,
|
||||
}
|
||||
|
||||
r := p.Reader
|
||||
if r != nil {
|
||||
|
@ -178,7 +188,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
|||
}
|
||||
|
||||
newVersion.OID = *id
|
||||
if err = n.treeService.AddVersion(ctx, &p.BktInfo.CID, p.Object, newVersion); err != nil {
|
||||
if err = n.treeService.AddVersion(ctx, &p.BktInfo.CID, newVersion); err != nil {
|
||||
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
|
||||
}
|
||||
|
||||
|
@ -345,124 +355,103 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn
|
|||
|
||||
// ListObjectsV1 returns objects in a bucket for requests of Version 1.
|
||||
func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) {
|
||||
var (
|
||||
err error
|
||||
result ListObjectsInfoV1
|
||||
allObjects []*data.ObjectInfo
|
||||
)
|
||||
var result ListObjectsInfoV1
|
||||
|
||||
if p.MaxKeys == 0 {
|
||||
return &result, nil
|
||||
prm := allObjectParams{
|
||||
Bucket: p.BktInfo,
|
||||
Delimiter: p.Delimiter,
|
||||
Prefix: p.Prefix,
|
||||
MaxKeys: p.MaxKeys,
|
||||
Marker: p.Marker,
|
||||
}
|
||||
|
||||
if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil {
|
||||
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(allObjects) == 0 {
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
if p.Marker != "" {
|
||||
allObjects = trimAfterObjectName(p.Marker, allObjects)
|
||||
}
|
||||
|
||||
if len(allObjects) > p.MaxKeys {
|
||||
if next != nil {
|
||||
result.IsTruncated = true
|
||||
allObjects = allObjects[:p.MaxKeys]
|
||||
result.NextMarker = allObjects[len(allObjects)-1].Name
|
||||
result.NextMarker = objects[len(objects)-1].Name
|
||||
}
|
||||
|
||||
result.Prefixes, result.Objects = triageObjects(allObjects)
|
||||
result.Prefixes, result.Objects = triageObjects(objects)
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// ListObjectsV2 returns objects in a bucket for requests of Version 2.
|
||||
func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) {
|
||||
var (
|
||||
err error
|
||||
result ListObjectsInfoV2
|
||||
allObjects []*data.ObjectInfo
|
||||
)
|
||||
var result ListObjectsInfoV2
|
||||
|
||||
if p.MaxKeys == 0 {
|
||||
return &result, nil
|
||||
prm := allObjectParams{
|
||||
Bucket: p.BktInfo,
|
||||
Delimiter: p.Delimiter,
|
||||
Prefix: p.Prefix,
|
||||
MaxKeys: p.MaxKeys,
|
||||
Marker: p.StartAfter,
|
||||
ContinuationToken: p.ContinuationToken,
|
||||
}
|
||||
|
||||
if allObjects, err = n.listAllObjects(ctx, p.ListObjectsParamsCommon); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(allObjects) == 0 {
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
if p.ContinuationToken != "" {
|
||||
allObjects = trimAfterObjectID(p.ContinuationToken, allObjects)
|
||||
}
|
||||
|
||||
if p.StartAfter != "" {
|
||||
allObjects = trimAfterObjectName(p.StartAfter, allObjects)
|
||||
}
|
||||
|
||||
if len(allObjects) > p.MaxKeys {
|
||||
result.IsTruncated = true
|
||||
allObjects = allObjects[:p.MaxKeys]
|
||||
result.NextContinuationToken = allObjects[len(allObjects)-1].ID.EncodeToString()
|
||||
}
|
||||
|
||||
result.Prefixes, result.Objects = triageObjects(allObjects)
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (n *layer) listSortedObjects(ctx context.Context, p allObjectParams) ([]*data.ObjectInfo, error) {
|
||||
objects, err := n.getLatestObjectsVersions(ctx, p.Bucket, p.Prefix, p.Delimiter)
|
||||
objects, next, err := n.getLatestObjectsVersions(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sort.Slice(objects, func(i, j int) bool {
|
||||
return objects[i].Name < objects[j].Name
|
||||
})
|
||||
if next != nil {
|
||||
result.IsTruncated = true
|
||||
result.NextContinuationToken = next.ID.EncodeToString()
|
||||
}
|
||||
|
||||
return objects, nil
|
||||
result.Prefixes, result.Objects = triageObjects(objects)
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (n *layer) getLatestObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) ([]*data.ObjectInfo, error) {
|
||||
func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) ([]*data.ObjectInfo, *data.ObjectInfo, error) {
|
||||
if p.MaxKeys == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
var err error
|
||||
|
||||
cacheKey := cache.CreateObjectsListCacheKey(&bkt.CID, prefix, true)
|
||||
ids := n.listsCache.Get(cacheKey)
|
||||
cacheKey := cache.CreateObjectsListCacheKey(&p.Bucket.CID, p.Prefix, true)
|
||||
nodeVersions := n.listsCache.GetVersions(cacheKey)
|
||||
|
||||
if ids == nil {
|
||||
ids, err = n.treeService.GetLatestVersionsByPrefix(ctx, &bkt.CID, prefix)
|
||||
if nodeVersions == nil {
|
||||
nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, &p.Bucket.CID, p.Prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
if err := n.listsCache.Put(cacheKey, ids); err != nil {
|
||||
if err = n.listsCache.PutVersions(cacheKey, nodeVersions); err != nil {
|
||||
n.log.Error("couldn't cache list of objects", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
objectsMap := make(map[string]*data.ObjectInfo, len(ids)) // to squash the same directories
|
||||
for i := 0; i < len(ids); i++ {
|
||||
obj := n.objectFromObjectsCacheOrNeoFS(ctx, bkt, ids[i])
|
||||
if obj == nil {
|
||||
sort.Slice(nodeVersions, func(i, j int) bool {
|
||||
return nodeVersions[i].FilePath < nodeVersions[j].FilePath
|
||||
})
|
||||
|
||||
existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories
|
||||
objects := make([]*data.ObjectInfo, 0, p.MaxKeys)
|
||||
|
||||
for _, node := range nodeVersions {
|
||||
if shouldSkip(node, p, existed) {
|
||||
continue
|
||||
}
|
||||
if oi := objectInfoFromMeta(bkt, obj, prefix, delimiter); oi != nil {
|
||||
objectsMap[oi.Name] = oi
|
||||
|
||||
if len(objects) == p.MaxKeys {
|
||||
return objects, &data.ObjectInfo{ID: node.OID, Name: node.FilePath}, nil
|
||||
}
|
||||
|
||||
if obj := n.objectFromObjectsCacheOrNeoFS(ctx, p.Bucket, node.OID); obj != nil {
|
||||
if oi := objectInfoFromMeta(p.Bucket, obj, p.Prefix, p.Delimiter); oi != nil {
|
||||
objects = append(objects, oi)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
objects := make([]*data.ObjectInfo, 0, len(objectsMap))
|
||||
for _, obj := range objectsMap {
|
||||
objects = append(objects, obj)
|
||||
}
|
||||
|
||||
return objects, nil
|
||||
return objects, nil, nil
|
||||
}
|
||||
|
||||
func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) {
|
||||
|
@ -488,7 +477,7 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo,
|
|||
|
||||
if nodeVersion.DeleteMarker != nil { // delete marker does not match any object in NeoFS
|
||||
oi.ID = nodeVersion.OID
|
||||
oi.Name = nodeVersion.DeleteMarker.FilePath
|
||||
oi.Name = nodeVersion.FilePath
|
||||
oi.Owner = nodeVersion.DeleteMarker.Owner
|
||||
oi.Created = nodeVersion.DeleteMarker.Created
|
||||
oi.IsDeleteMarker = true
|
||||
|
@ -524,30 +513,34 @@ func IsSystemHeader(key string) bool {
|
|||
return strings.HasPrefix(key, "S3-")
|
||||
}
|
||||
|
||||
func trimAfterObjectName(startAfter string, objects []*data.ObjectInfo) []*data.ObjectInfo {
|
||||
if len(objects) != 0 && objects[len(objects)-1].Name <= startAfter {
|
||||
return nil
|
||||
func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool {
|
||||
filepath := node.FilePath
|
||||
if len(p.Delimiter) > 0 {
|
||||
tail := strings.TrimPrefix(filepath, p.Prefix)
|
||||
index := strings.Index(tail, p.Delimiter)
|
||||
if index >= 0 {
|
||||
filepath = p.Prefix + tail[:index+1]
|
||||
}
|
||||
}
|
||||
for i := range objects {
|
||||
if objects[i].Name > startAfter {
|
||||
return objects[i:]
|
||||
if _, ok := existed[filepath]; ok {
|
||||
return true
|
||||
}
|
||||
|
||||
if filepath <= p.Marker {
|
||||
return true
|
||||
}
|
||||
|
||||
if p.ContinuationToken != "" {
|
||||
if _, ok := existed[continuationToken]; !ok {
|
||||
if p.ContinuationToken != node.OID.EncodeToString() {
|
||||
return true
|
||||
}
|
||||
existed[continuationToken] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func trimAfterObjectID(id string, objects []*data.ObjectInfo) []*data.ObjectInfo {
|
||||
if len(objects) != 0 && objects[len(objects)-1].ID.EncodeToString() == id {
|
||||
return []*data.ObjectInfo{}
|
||||
}
|
||||
for i, obj := range objects {
|
||||
if obj.ID.EncodeToString() == id {
|
||||
return objects[i+1:]
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
existed[filepath] = struct{}{}
|
||||
return false
|
||||
}
|
||||
|
||||
func triageObjects(allObjects []*data.ObjectInfo) (prefixes []string, objects []*data.ObjectInfo) {
|
||||
|
@ -574,24 +567,6 @@ func triageExtendedObjects(allObjects []*data.ExtendedObjectInfo) (prefixes []st
|
|||
return
|
||||
}
|
||||
|
||||
func (n *layer) listAllObjects(ctx context.Context, p ListObjectsParamsCommon) ([]*data.ObjectInfo, error) {
|
||||
var (
|
||||
err error
|
||||
allObjects []*data.ObjectInfo
|
||||
)
|
||||
|
||||
allObjects, err = n.listSortedObjects(ctx, allObjectParams{
|
||||
Bucket: p.BktInfo,
|
||||
Prefix: p.Prefix,
|
||||
Delimiter: p.Delimiter,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return allObjects, nil
|
||||
}
|
||||
|
||||
func (n *layer) isVersioningEnabled(ctx context.Context, bktInfo *data.BucketInfo) bool {
|
||||
settings, err := n.GetBucketSettings(ctx, bktInfo)
|
||||
if err != nil {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue