Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-05-18 10:48:30 +03:00 committed by Alex Vanin
parent 977f176713
commit 37b1baed41
7 changed files with 95 additions and 262 deletions

View file

@ -268,59 +268,6 @@ func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj
return nil
}
func updateCRDT2PSetHeaders(header map[string]string, versions *objectVersions, versioningEnabled bool) []oid.ID {
if !versioningEnabled {
header[versionsUnversionedAttr] = "true"
}
var idsToDeleteArr []oid.ID
if versions.isEmpty() {
return idsToDeleteArr
}
if !versions.isAddListEmpty() {
header[versionsAddAttr] = versions.getAddHeader()
}
if versioningEnabled {
versionsDeletedStr := versions.getDelHeader()
// header[versionsDelAttr] can be not empty when deleting specific version
if delAttr := header[versionsDelAttr]; len(delAttr) != 0 {
if len(versionsDeletedStr) != 0 {
header[versionsDelAttr] = versionsDeletedStr + "," + delAttr
} else {
header[versionsDelAttr] = delAttr
}
} else if len(versionsDeletedStr) != 0 {
header[versionsDelAttr] = versionsDeletedStr
}
} else {
versionsDeletedStr := versions.getDelHeader()
var additionalDel string
for i, del := range versions.unversioned() {
if i != 0 {
additionalDel += ","
}
additionalDel += del.Version()
idsToDeleteArr = append(idsToDeleteArr, del.ID)
}
if len(additionalDel) != 0 {
if len(versionsDeletedStr) != 0 {
versionsDeletedStr += ","
}
versionsDeletedStr += additionalDel
}
if len(versionsDeletedStr) != 0 {
header[versionsDelAttr] = versionsDeletedStr
}
}
return idsToDeleteArr
}
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ObjectInfo, error) {
if addr := n.namesCache.Get(bkt.Name + "/" + objectName); addr != nil {
if headInfo := n.objCache.Get(*addr); headInfo != nil {
@ -330,6 +277,9 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke
node, err := n.treeService.GetLatestVersion(ctx, &bkt.CID, objectName)
if err != nil {
if errors.Is(err, ErrNodeNotFound) {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey)
}
return nil, err
}
@ -410,16 +360,28 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb
return objInfo, nil
}
var id oid.ID
if err := id.DecodeString(p.VersionID); err != nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrInvalidVersion)
versions, err := n.treeService.GetVersions(ctx, &bkt.CID, p.Object)
if err != nil {
return nil, fmt.Errorf("couldn't get versions: %w", err)
}
if headInfo := n.objCache.Get(newAddress(bkt.CID, id)); headInfo != nil {
var foundVersion *NodeVersion
for _, version := range versions {
if version.OID.EncodeToString() == p.VersionID {
foundVersion = version
break
}
}
if foundVersion == nil {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion)
}
if headInfo := n.objCache.Get(newAddress(bkt.CID, foundVersion.OID)); headInfo != nil {
return objInfoFromMeta(bkt, headInfo), nil
}
meta, err := n.objectHead(ctx, bkt, id)
meta, err := n.objectHead(ctx, bkt, foundVersion.OID)
if err != nil {
if client.IsErrObjectNotFound(err) {
return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchVersion)