forked from TrueCloudLab/frostfs-s3-gw
125 lines
4.2 KiB
Go
125 lines
4.2 KiB
Go
package layer
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"sync"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) error {
|
|
if len(members) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
tombstoneMembersSize := n.features.TombstoneMembersSize()
|
|
tombstoneLifetime := n.features.TombstoneLifetime()
|
|
tombstonesCount := len(members) / tombstoneMembersSize
|
|
if len(members)%tombstoneMembersSize != 0 {
|
|
tombstonesCount++
|
|
}
|
|
errCh := make(chan error, tombstonesCount)
|
|
|
|
for i := 0; i < tombstonesCount; i++ {
|
|
end := tombstoneMembersSize * (i + 1)
|
|
if end > len(members) {
|
|
end = len(members)
|
|
}
|
|
n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, errCh)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errCh)
|
|
|
|
if err := <-errCh; err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, errCh chan<- error) {
|
|
tomb := object.NewTombstone()
|
|
tomb.SetExpirationEpoch(expEpoch)
|
|
tomb.SetMembers(members)
|
|
|
|
wg.Add(1)
|
|
err := n.workerPool.Submit(func() {
|
|
defer wg.Done()
|
|
|
|
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
|
|
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
|
|
errCh <- fmt.Errorf("put tombstone object: %w", err)
|
|
}
|
|
})
|
|
if err != nil {
|
|
wg.Done()
|
|
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
|
|
errCh <- fmt.Errorf("submit task to pool: %w", err)
|
|
}
|
|
}
|
|
|
|
func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error {
|
|
payload, err := tomb.Marshal()
|
|
if err != nil {
|
|
return fmt.Errorf("marshal tombstone: %w", err)
|
|
}
|
|
|
|
prm := frostfs.PrmObjectCreate{
|
|
Container: bktInfo.CID,
|
|
Attributes: [][2]string{{objectV2.SysAttributeExpEpoch, strconv.FormatUint(tomb.ExpirationEpoch(), 10)}},
|
|
Payload: bytes.NewReader(payload),
|
|
CreationTime: TimeNow(ctx),
|
|
ClientCut: n.features.ClientCut(),
|
|
WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled,
|
|
BufferMaxSize: n.features.BufferMaxSizeForPut(),
|
|
Type: object.TypeTombstone,
|
|
}
|
|
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
|
|
|
_, err = n.frostFS.CreateObject(ctx, prm)
|
|
return err
|
|
}
|
|
|
|
func (n *Layer) getMembers(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
|
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), cnrID, objID, tokens)
|
|
if err != nil {
|
|
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
|
|
return nil, fmt.Errorf("failed to list all object relations '%s': %w", objID.EncodeToString(), err)
|
|
}
|
|
|
|
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", cnrID.EncodeToString()),
|
|
zap.String("oid", objID.EncodeToString()), zap.Error(err))
|
|
return nil, nil
|
|
}
|
|
return append(oids, objID), nil
|
|
}
|
|
|
|
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
|
|
tokens := relations.Tokens{}
|
|
|
|
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
|
|
if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
|
|
tokens.Bearer = bd.Gate.BearerToken
|
|
}
|
|
}
|
|
|
|
return tokens
|
|
}
|