From feb45d0633a35a3cb6bcdd76325ac4ede0378a16 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 10 Aug 2021 11:19:09 +0300 Subject: [PATCH] [#122] Add replacing objects Signed-off-by: Denis Kirillov --- api/layer/layer.go | 15 ++++--- api/layer/object.go | 104 ++++++++++++++++++++++++++++++++++++++------ 2 files changed, 100 insertions(+), 19 deletions(-) diff --git a/api/layer/layer.go b/api/layer/layer.go index 2ae3cc58..e7d76af9 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -320,11 +320,11 @@ func (n *layer) GetObjectInfo(ctx context.Context, bucketName, filename string) return nil, err } - return n.getObjectInfo(ctx, bkt, filename) + return n.headLastVersion(ctx, bkt, filename) } -func (n *layer) getObjectInfo(ctx context.Context, bkt *BucketInfo, objectName string) (*ObjectInfo, error) { - oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, val: objectName}) +func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *BucketInfo) (*ObjectInfo, error) { + oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, val: bktVersionSettingsObject}) if err != nil { n.log.Error("could not find object id", zap.Error(err)) return nil, err @@ -527,12 +527,11 @@ func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) return nil, err } - objectInfo, err := n.getObjectInfo(ctx, bucketInfo, bktVersionSettingsObject) + objectInfo, err := n.getSettingsObjectInfo(ctx, bucketInfo) if err != nil { n.log.Warn("couldn't get bucket version settings object, new one will be created", zap.String("bucket_name", bucketInfo.Name), zap.Stringer("cid", bucketInfo.CID), - zap.String("object_name", bktVersionSettingsObject), zap.Error(err)) } @@ -588,5 +587,9 @@ func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) } func (n *layer) GetBucketVersioning(ctx context.Context, bucketName string) (*ObjectInfo, error) { - return n.GetObjectInfo(ctx, bucketName, bktVersionSettingsObject) + bktInfo, err := n.GetBucketInfo(ctx, bucketName) + if err != nil { + return nil, err + } + return n.getSettingsObjectInfo(ctx, bktInfo) } diff --git a/api/layer/object.go b/api/layer/object.go index 77d310d1..7ec3d1cf 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -3,6 +3,7 @@ package layer import ( "context" "errors" + "fmt" "io" "net/url" "sort" @@ -119,24 +120,37 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, own = n.Owner(ctx) ) + if p.Object == bktVersionSettingsObject { + return nil, fmt.Errorf("trying put bucket settings object") + } + if obj, err = url.QueryUnescape(p.Object); err != nil { return nil, err - } else if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { + } + if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { return nil, err - } else if err = n.checkObject(ctx, bkt.CID, p.Object); err != nil { - var errExist *apiErrors.ObjectAlreadyExists - if ok := errors.As(err, &errExist); ok { - errExist.Bucket = p.Bucket - errExist.Object = p.Object - return nil, errExist - } + } - if !apiErrors.IsS3Error(err, apiErrors.ErrNoSuchKey) { - return nil, err - } + lastVersionInfo, err := n.headLastVersion(ctx, bkt, p.Object) + if err != nil && !apiErrors.IsS3Error(err, apiErrors.ErrNoSuchKey) { + return nil, err } attributes := make([]*object.Attribute, 0, len(p.Header)+1) + var idsToDeleteArr []*object.ID + if lastVersionInfo != nil { + versionsDeletedStr := lastVersionInfo.Headers["S3-Versions-del"] + if len(versionsDeletedStr) != 0 { + versionsDeletedStr += "," + } + versionsDeletedStr += lastVersionInfo.ID().String() + deletedVersions := object.NewAttribute() + deletedVersions.SetKey("S3-Versions-del") + deletedVersions.SetValue(versionsDeletedStr) + + attributes = append(attributes, deletedVersions) + idsToDeleteArr = append(idsToDeleteArr, lastVersionInfo.ID()) + } unix := strconv.FormatInt(time.Now().UTC().Unix(), 10) @@ -187,7 +201,7 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, n.log.Error("couldn't cache an object", zap.Error(err)) } - return &ObjectInfo{ + objInfo := &ObjectInfo{ id: oid, Owner: own, @@ -198,7 +212,71 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, Headers: p.Header, ContentType: r.contentType, HashSum: meta.PayloadChecksum().String(), - }, nil + } + + for _, id := range idsToDeleteArr { + addr := object.NewAddress() + addr.SetObjectID(id) + addr.SetContainerID(bkt.CID) + + if err = n.objectDelete(ctx, addr); err != nil { + n.log.Warn("couldn't delete object", + zap.Stringer("version id", id), + zap.Error(err)) + } + } + + return objInfo, nil +} + +func (n *layer) headLastVersion(ctx context.Context, bkt *BucketInfo, objectName string) (*ObjectInfo, error) { + ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, val: objectName}) + if err != nil { + return nil, err + } + + if len(ids) == 0 { + return nil, api.GetAPIError(api.ErrNoSuchKey) + } + + infos := make([]*object.Object, 0, len(ids)) + for _, id := range ids { + addr := object.NewAddress() + addr.SetContainerID(bkt.CID) + addr.SetObjectID(id) + meta, err := n.objectHead(ctx, addr) + if err != nil { + n.log.Warn("couldn't head object", + zap.Stringer("object id", id), + zap.Stringer("bucket id", bkt.CID), + zap.Error(err)) + continue + } + infos = append(infos, meta) + } + + sort.Slice(infos, func(i, j int) bool { + return infos[i].CreationEpoch() < infos[j].CreationEpoch() || (infos[i].CreationEpoch() == infos[j].CreationEpoch() && infos[i].ID().String() < infos[j].ID().String()) + }) + + return objectInfoFromMeta(bkt, infos[len(infos)-1], "", ""), nil + //versionsToDeleteStr, ok := lastVersionInfo.Headers["S3-Versions-add"] + //versionsDeletedStr := lastVersionInfo.Headers["S3-Versions-del"] + //idsToDeleteArr := []*object.ID{lastVersionInfo.ID()} + //if ok { + // // for versioning mode only + // idsToDelete := strings.Split(versionsToDeleteStr, ",") + // for _, idStr := range idsToDelete { + // oid := object.NewID() + // if err = oid.Parse(idStr); err != nil { + // n.log.Warn("couldn't parse object id versions list", + // zap.String("versions id", versionsToDeleteStr), + // zap.Error(err)) + // break + // } + // idsToDeleteArr = append(idsToDeleteArr, oid) + // } + //} } // objectDelete puts tombstone object into neofs.