[#122] Update listObjectVerions

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2021-08-16 12:31:50 +03:00
parent 9c058a70fd
commit f6c51cc9ee
2 changed files with 57 additions and 39 deletions

View file

@ -137,6 +137,7 @@ type (
}
objectVersions struct {
name string
objects []*ObjectInfo
addList []string
delList []string
@ -177,6 +178,10 @@ type (
}
)
func newObjectVersions(name string) *objectVersions {
return &objectVersions{name: name}
}
func (v *objectVersions) appendVersion(oi *ObjectInfo) {
addVers := append(splitVersions(oi.Headers[versionsAddAttr]), oi.Version())
delVers := splitVersions(oi.Headers[versionsDelAttr])
@ -214,7 +219,7 @@ func (v *objectVersions) getLast() *ObjectInfo {
if delMarkHeader == "" {
return v.objects[i]
}
if delMarkHeader == "*" {
if delMarkHeader == delMarkFullObject {
return nil
}
}
@ -223,6 +228,29 @@ func (v *objectVersions) getLast() *ObjectInfo {
return nil
}
func (v *objectVersions) getFiltered() []*ObjectVersionInfo {
if len(v.objects) == 0 {
return nil
}
v.sort()
existedVersions := getExistedVersions(v)
res := make([]*ObjectVersionInfo, 0, len(v.objects))
for _, version := range v.objects {
delMark := version.Headers[versionsDeleteMarkAttr]
if contains(existedVersions, version.Version()) && (delMark == delMarkFullObject || delMark == "") {
res = append(res, &ObjectVersionInfo{Object: version})
}
}
if len(res) > 0 {
res[len(res)-1].IsLatest = true
}
return res
}
func (v *objectVersions) getAddHeader() string {
return strings.Join(v.addList, ",")
}
@ -237,6 +265,10 @@ const (
objectSystemAttributeName = "S3-System-name"
attrVersionsIgnore = "S3-Versions-ignore"
attrSettingsVersioningEnabled = "S3-Settings-Versioning-enabled"
versionsDelAttr = "S3-Versions-del"
versionsAddAttr = "S3-Versions-add"
versionsDeleteMarkAttr = "S3-Versions-delete-mark"
delMarkFullObject = "*"
)
func (t *VersionedObject) String() string {
@ -490,7 +522,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *BucketInfo, obj *Versione
p.Header[versionsDelAttr] = obj.VersionID
} else {
p.Header[versionsDeleteMarkAttr] = "*"
p.Header[versionsDeleteMarkAttr] = delMarkFullObject
}
if _, err = n.objectPut(ctx, bkt, p); err != nil {
return &errors.DeleteError{Err: err, Object: obj.String()}
@ -606,26 +638,29 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
objVersions.appendVersion(oi)
versions[oi.Name] = objVersions
} else {
objVersion := &objectVersions{}
objVersion := newObjectVersions(oi.Name)
objVersion.appendVersion(oi)
versions[oi.Name] = objVersion
}
}
}
for _, v := range versions {
existed, deleted := triageVersions(v)
res.Version = append(res.Version, existed...)
res.DeleteMarker = append(res.DeleteMarker, deleted...)
sortedNames := make([]string, 0, len(versions))
for k := range versions {
sortedNames = append(sortedNames, k)
}
sort.Strings(sortedNames)
objects := make([]*ObjectVersionInfo, 0, p.MaxKeys)
for _, name := range sortedNames {
objects = append(objects, versions[name].getFiltered()...)
if len(objects) > p.MaxKeys {
objects = objects[:p.MaxKeys]
break
}
}
sort.Slice(res.Version, func(i, j int) bool {
return res.Version[i].Object.Name < res.Version[j].Object.Name
})
sort.Slice(res.DeleteMarker, func(i, j int) bool {
return res.DeleteMarker[i].Object.Name < res.DeleteMarker[j].Object.Name
})
res.Version, res.DeleteMarker = triageVersions(objects)
return &res, nil
}
@ -635,30 +670,19 @@ func sortVersions(versions []*ObjectInfo) {
})
}
func triageVersions(objectVersions *objectVersions) ([]*ObjectVersionInfo, []*ObjectVersionInfo) {
if objectVersions == nil || len(objectVersions.objects) == 0 {
func triageVersions(objVersions []*ObjectVersionInfo) ([]*ObjectVersionInfo, []*ObjectVersionInfo) {
if len(objVersions) == 0 {
return nil, nil
}
sortVersions(objectVersions.objects)
var resVersion []*ObjectVersionInfo
var resDelMarkVersions []*ObjectVersionInfo
isLatest := true
for i := len(objectVersions.objects) - 1; i >= 0; i-- {
version := objectVersions.objects[i]
if contains(objectVersions.delList, version.Version()) {
continue
}
ovi := &ObjectVersionInfo{Object: version, IsLatest: isLatest}
isLatest = false
if len(version.Headers[versionsDeleteMarkAttr]) == 0 {
resVersion = append(resVersion, ovi)
for _, version := range objVersions {
if version.Object.Headers[versionsDeleteMarkAttr] == delMarkFullObject {
resDelMarkVersions = append(resDelMarkVersions, version)
} else {
resDelMarkVersions = append(resDelMarkVersions, ovi)
resVersion = append(resVersion, version)
}
}

View file

@ -64,12 +64,6 @@ type (
}
)
const (
versionsDelAttr = "S3-Versions-del"
versionsAddAttr = "S3-Versions-add"
versionsDeleteMarkAttr = "S3-Versions-delete-mark"
)
// objectSearch returns all available objects by search params.
func (n *layer) objectSearch(ctx context.Context, p *findParams) ([]*object.ID, error) {
var opts object.SearchFilters
@ -272,7 +266,7 @@ func (n *layer) headVersions(ctx context.Context, bkt *BucketInfo, objectName st
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
versions := &objectVersions{}
versions := newObjectVersions(objectName)
for _, id := range ids {
meta, err := n.objectHead(ctx, bkt.CID, id)
if err != nil {
@ -418,7 +412,7 @@ func (n *layer) listSortedObjectsFromNeoFS(ctx context.Context, p allObjectParam
objVersions.appendVersion(oi)
versions[oi.Name] = objVersions
} else {
objVersion := &objectVersions{}
objVersion := newObjectVersions(oi.Name)
objVersion.appendVersion(oi)
versions[oi.Name] = objVersion
}