[#653] Support removal old combined objects
Some checks failed
/ DCO (pull_request) Successful in 41s
/ Vulncheck (pull_request) Successful in 1m6s
/ Lint (pull_request) Failing after 1m0s
/ Tests (pull_request) Failing after 1m28s
/ Builds (pull_request) Successful in 1m11s
/ OCI image (pull_request) Successful in 2m1s
Some checks failed
/ DCO (pull_request) Successful in 41s
/ Vulncheck (pull_request) Successful in 1m6s
/ Lint (pull_request) Failing after 1m0s
/ Tests (pull_request) Failing after 1m28s
/ Builds (pull_request) Successful in 1m11s
/ OCI image (pull_request) Successful in 2m1s
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
4a430257a4
commit
396c89f7bc
8 changed files with 88 additions and 28 deletions
|
@ -20,6 +20,12 @@ type NodeVersion struct {
|
|||
IsCombined bool
|
||||
}
|
||||
|
||||
// OIDInfo represent OID to delete.
|
||||
type OIDInfo struct {
|
||||
ID oid.ID
|
||||
IsCombined bool
|
||||
}
|
||||
|
||||
// ExtendedNodeVersion contains additional node info to be able to sort versions by timestamp.
|
||||
type ExtendedNodeVersion struct {
|
||||
NodeVersion *NodeVersion
|
||||
|
|
|
@ -493,6 +493,33 @@ func TestRemovalOnReplace(t *testing.T) {
|
|||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2)
|
||||
}
|
||||
|
||||
func TestRemovalOnReplaceMultipart(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
|
||||
bktName, objName := "bucket", "object"
|
||||
bktInfo := createTestBucket(hc, bktName)
|
||||
|
||||
multipartUpload(hc, bktName, objName, nil, 10, 10)
|
||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2)
|
||||
|
||||
multipartUpload(hc, bktName, objName, nil, 10, 10)
|
||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 4)
|
||||
|
||||
hc.layerFeatures.SetRemoveOnReplace(true)
|
||||
|
||||
multipartUpload(hc, bktName, objName, nil, 10, 10)
|
||||
time.Sleep(time.Second)
|
||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 4)
|
||||
|
||||
putObject(hc, bktName, objName)
|
||||
time.Sleep(time.Second)
|
||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 3)
|
||||
|
||||
multipartUpload(hc, bktName, objName, nil, 10, 10)
|
||||
time.Sleep(time.Second)
|
||||
require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 4)
|
||||
}
|
||||
|
||||
func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
|
||||
bktInfo := createTestBucket(tc, bktName)
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ type (
|
|||
removalParams struct {
|
||||
Auth frostfs.PrmAuth
|
||||
BktInfo *data.BucketInfo
|
||||
OIDs []oid.ID
|
||||
OIDsInfo []data.OIDInfo
|
||||
RequestID string
|
||||
TraceID string
|
||||
}
|
||||
|
@ -733,8 +733,17 @@ func (n *Layer) removalRoutine(ctx context.Context) {
|
|||
}
|
||||
|
||||
reqCtx, cancel := context.WithTimeout(ctx, n.features.RemoveOnReplaceTimeout())
|
||||
for _, objID := range prm.OIDs {
|
||||
if err := n.objectDeleteBase(reqCtx, prm.BktInfo, objID, prm.Auth); err != nil {
|
||||
for _, oidInfo := range prm.OIDsInfo {
|
||||
if oidInfo.IsCombined {
|
||||
networkInfo, err := n.GetNetworkInfo(ctx)
|
||||
if err == nil {
|
||||
err = n.removeCombinedObject(reqCtx, prm.BktInfo, oidInfo.ID, networkInfo, prm.Auth)
|
||||
}
|
||||
if err != nil {
|
||||
n.log.Warn(logs.FailedToRemoveOldUnversionedCombinedObject, zap.String("request_id", prm.RequestID),
|
||||
zap.String("trace_id", prm.TraceID), zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
}
|
||||
} else if err := n.objectDeleteBase(reqCtx, prm.BktInfo, oidInfo.ID, prm.Auth); err != nil {
|
||||
n.log.Warn(logs.FailedToRemoveOldUnversionedObject, zap.String("request_id", prm.RequestID),
|
||||
zap.String("trace_id", prm.TraceID), zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
}
|
||||
|
@ -744,7 +753,7 @@ func (n *Layer) removalRoutine(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs []oid.ID) {
|
||||
func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDsInfo []data.OIDInfo) {
|
||||
if !n.features.RemoveOnReplace() {
|
||||
return
|
||||
}
|
||||
|
@ -753,7 +762,7 @@ func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs []
|
|||
prm := removalParams{
|
||||
Auth: frostfs.PrmAuth{},
|
||||
BktInfo: bktInfo,
|
||||
OIDs: OIDs,
|
||||
OIDsInfo: OIDsInfo,
|
||||
RequestID: reqInfo.RequestID,
|
||||
TraceID: reqInfo.TraceID,
|
||||
}
|
||||
|
@ -763,9 +772,9 @@ func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs []
|
|||
select {
|
||||
case n.removalChan <- prm:
|
||||
default:
|
||||
oidsStr := make([]string, len(OIDs))
|
||||
for i, d := range OIDs {
|
||||
oidsStr[i] = d.EncodeToString()
|
||||
oidsStr := make([]string, len(OIDsInfo))
|
||||
for i, d := range OIDsInfo {
|
||||
oidsStr[i] = d.ID.EncodeToString()
|
||||
}
|
||||
|
||||
n.reqLogger(ctx).Debug(logs.FailedToQueueOldUnversionedObjectToDelete,
|
||||
|
@ -876,16 +885,16 @@ func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, node
|
|||
}
|
||||
|
||||
if nodeVersion.IsCombined {
|
||||
return "", n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo)
|
||||
return "", n.removeCombinedObject(ctx, bkt, nodeVersion.OID, networkInfo, frostfs.PrmAuth{})
|
||||
}
|
||||
|
||||
return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
|
||||
}
|
||||
|
||||
func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, networkInfo netmap.NetworkInfo) error {
|
||||
combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID)
|
||||
func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, objID oid.ID, networkInfo netmap.NetworkInfo, prmAuth frostfs.PrmAuth) error {
|
||||
combinedObj, err := n.objectGetBase(ctx, bkt, objID, prmAuth)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err)
|
||||
return fmt.Errorf("get combined object '%s': %w", objID.EncodeToString(), err)
|
||||
}
|
||||
|
||||
var parts []*data.PartInfo
|
||||
|
@ -893,7 +902,7 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
|
|||
return fmt.Errorf("unmarshal combined object parts: %w", err)
|
||||
}
|
||||
|
||||
tokens := prepareTokensParameter(ctx, bkt.Owner)
|
||||
tokens := prepareTokensParameterBase(ctx, bkt.Owner, prmAuth)
|
||||
members := make([]oid.ID, 0)
|
||||
// First gateway tries to delete all object parts.
|
||||
// In case of errors, abort multipart removal.
|
||||
|
@ -905,18 +914,18 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
|
|||
members = append(members, oids...)
|
||||
}
|
||||
|
||||
if err = n.putTombstones(ctx, bkt, networkInfo, members); err != nil {
|
||||
if err = n.putTombstonesBase(ctx, bkt, networkInfo, members, prmAuth); err != nil {
|
||||
return fmt.Errorf("put tombstones with parts: %w", err)
|
||||
}
|
||||
|
||||
// If all parts were removed successfully, remove multipart linking object.
|
||||
// Do not delete this object first, because gateway won't be able to find parts.
|
||||
members, err = n.getMembers(ctx, bkt.CID, nodeVersion.OID, tokens)
|
||||
members, err = n.getMembers(ctx, bkt.CID, objID, tokens)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return n.putTombstones(ctx, bkt, networkInfo, members)
|
||||
return n.putTombstonesBase(ctx, bkt, networkInfo, members, prmAuth)
|
||||
}
|
||||
|
||||
// DeleteObjects from the storage.
|
||||
|
|
|
@ -24,6 +24,10 @@ import (
|
|||
)
|
||||
|
||||
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) error {
|
||||
return n.putTombstonesBase(ctx, bkt, networkInfo, members, frostfs.PrmAuth{})
|
||||
}
|
||||
|
||||
func (n *Layer) putTombstonesBase(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID, prmAuth frostfs.PrmAuth) error {
|
||||
if len(members) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -42,7 +46,7 @@ func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, network
|
|||
if end > len(members) {
|
||||
end = len(members)
|
||||
}
|
||||
n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, errCh)
|
||||
n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, prmAuth, errCh)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
@ -55,7 +59,7 @@ func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, network
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, errCh chan<- error) {
|
||||
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, prmAuth frostfs.PrmAuth, errCh chan<- error) {
|
||||
tomb := object.NewTombstone()
|
||||
tomb.SetExpirationEpoch(expEpoch)
|
||||
tomb.SetMembers(members)
|
||||
|
@ -64,7 +68,7 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me
|
|||
err := n.workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
|
||||
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
|
||||
if err := n.putTombstoneObject(ctx, tomb, bkt, prmAuth); err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||
errCh <- fmt.Errorf("put tombstone object: %w", err)
|
||||
}
|
||||
|
@ -76,7 +80,7 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me
|
|||
}
|
||||
}
|
||||
|
||||
func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error {
|
||||
func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo, prmAuth frostfs.PrmAuth) error {
|
||||
payload, err := tomb.Marshal()
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal tombstone: %w", err)
|
||||
|
@ -91,6 +95,7 @@ func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone,
|
|||
WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled,
|
||||
BufferMaxSize: n.features.BufferMaxSizeForPut(),
|
||||
Type: object.TypeTombstone,
|
||||
PrmAuth: prmAuth,
|
||||
}
|
||||
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
||||
|
||||
|
@ -113,8 +118,17 @@ func (n *Layer) getMembers(ctx context.Context, cnrID cid.ID, objID oid.ID, toke
|
|||
}
|
||||
|
||||
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
|
||||
return prepareTokensParameterBase(ctx, bktOwner, frostfs.PrmAuth{})
|
||||
}
|
||||
|
||||
func prepareTokensParameterBase(ctx context.Context, bktOwner user.ID, prmAuth frostfs.PrmAuth) relations.Tokens {
|
||||
tokens := relations.Tokens{}
|
||||
|
||||
if prmAuth.BearerToken != nil {
|
||||
tokens.Bearer = prmAuth.BearerToken
|
||||
return tokens
|
||||
}
|
||||
|
||||
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
|
||||
if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
|
||||
tokens.Bearer = bd.Gate.BearerToken
|
||||
|
|
|
@ -49,7 +49,7 @@ type Service interface {
|
|||
// AddVersion creates new version in tree.
|
||||
// Returns new node id and object ids of old versions (OIDS) that must be deleted.
|
||||
// OIDs can be returned even if error is not nil.
|
||||
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error)
|
||||
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []data.OIDInfo, error)
|
||||
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error
|
||||
|
||||
PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error
|
||||
|
|
|
@ -240,7 +240,7 @@ func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.Bucket
|
|||
return nil, tree.ErrNodeNotFound
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error) {
|
||||
func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []data.OIDInfo, error) {
|
||||
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
|
||||
if !ok {
|
||||
t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{
|
||||
|
@ -266,7 +266,7 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
|
|||
|
||||
result := versions
|
||||
|
||||
var oldUnversionedIDs []oid.ID
|
||||
var oldUnversionedIDs []data.OIDInfo
|
||||
|
||||
if newVersion.IsUnversioned {
|
||||
result = make([]*data.NodeVersion, 0, len(versions))
|
||||
|
@ -274,7 +274,7 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
|
|||
if !node.IsUnversioned {
|
||||
result = append(result, node)
|
||||
} else {
|
||||
oldUnversionedIDs = append(oldUnversionedIDs, node.OID)
|
||||
oldUnversionedIDs = append(oldUnversionedIDs, data.OIDInfo{ID: node.OID, IsCombined: node.IsCombined})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -181,6 +181,7 @@ const (
|
|||
CouldNotFetchObjectMeta = "could not fetch object meta"
|
||||
FailedToDeleteObject = "failed to delete object"
|
||||
FailedToRemoveOldUnversionedObject = "failed to remove old unversioned object"
|
||||
FailedToRemoveOldUnversionedCombinedObject = "failed to remove old unversioned combined object"
|
||||
CouldntDeleteLifecycleObject = "couldn't delete lifecycle configuration object"
|
||||
CouldntGetCORSObjectVersions = "couldn't get cors object versions"
|
||||
)
|
||||
|
|
|
@ -1331,7 +1331,7 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
|
|||
return nodes, nil
|
||||
}
|
||||
|
||||
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, []oid.ID, error) {
|
||||
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, []data.OIDInfo, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddVersion")
|
||||
defer span.End()
|
||||
|
||||
|
@ -1768,7 +1768,7 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket
|
|||
return getObjectTagging(nodes[isTagKV]), lockInfo, nil
|
||||
}
|
||||
|
||||
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, []oid.ID, error) {
|
||||
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, []data.OIDInfo, error) {
|
||||
path := pathFromName(version.FilePath)
|
||||
meta := map[string]string{
|
||||
oidKV: version.OID.EncodeToString(),
|
||||
|
@ -1806,9 +1806,12 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
|
|||
return 0, nil, err
|
||||
}
|
||||
|
||||
oldOIDs := make([]oid.ID, len(nodes))
|
||||
oldOIDs := make([]data.OIDInfo, len(nodes))
|
||||
for i, oldNode := range nodes {
|
||||
oldOIDs[i] = oldNode.OID
|
||||
oldOIDs[i] = data.OIDInfo{
|
||||
ID: oldNode.OID,
|
||||
IsCombined: oldNode.IsCombined,
|
||||
}
|
||||
}
|
||||
|
||||
return node.ID, oldOIDs, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, nodes)
|
||||
|
|
Loading…
Add table
Reference in a new issue