Some checks failed
DCO action / DCO (pull_request) Successful in 49s
Vulncheck / Vulncheck (pull_request) Failing after 1m16s
Build / Build Components (pull_request) Successful in 1m27s
Tests and linters / Tests (pull_request) Successful in 2m55s
Tests and linters / Lint (pull_request) Successful in 3m0s
Tests and linters / Staticcheck (pull_request) Successful in 2m59s
Tests and linters / Tests with -race (pull_request) Successful in 3m32s
Tests and linters / gopls check (pull_request) Successful in 4m7s
Add `move` flag to confirm that objects must be moved from the storage engine to quarantine. Without this flag, only object addresses are printed, which allows reviewing the addresses that would be moved to quarantine before committing to the move. Change-Id: I551979d8bffaf45fe21d92f6edadfaadcb5d6e25 Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
263 lines
7.3 KiB
Go
263 lines
7.3 KiB
Go
package zombie
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"crypto/sha256"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-adm/internal/commonflags"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
|
apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
|
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
|
clientCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
|
|
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/spf13/cobra"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
func scan(cmd *cobra.Command, _ []string) {
|
|
configFile, _ := cmd.Flags().GetString(commonflags.ConfigFlag)
|
|
configDir, _ := cmd.Flags().GetString(commonflags.ConfigDirFlag)
|
|
appCfg := config.New(configFile, configDir, config.EnvPrefix)
|
|
batchSize, _ := cmd.Flags().GetUint32(flagBatchSize)
|
|
if batchSize == 0 {
|
|
commonCmd.ExitOnErr(cmd, "invalid batch size: %w", errors.New("batch size must be positive value"))
|
|
}
|
|
move, _ := cmd.Flags().GetBool(moveFlag)
|
|
|
|
storageEngine := newEngine(cmd, appCfg)
|
|
morphClient := createMorphClient(cmd, appCfg)
|
|
cnrCli := createContainerClient(cmd, morphClient)
|
|
nmCli := createNetmapClient(cmd, morphClient)
|
|
q := createQuarantine(cmd, storageEngine.DumpInfo())
|
|
pk := getPrivateKey(cmd, appCfg)
|
|
|
|
epoch, err := nmCli.Epoch()
|
|
commonCmd.ExitOnErr(cmd, "read epoch from morph: %w", err)
|
|
|
|
nm, err := nmCli.GetNetMapByEpoch(epoch)
|
|
commonCmd.ExitOnErr(cmd, "read netmap from morph: %w", err)
|
|
|
|
cmd.Printf("Epoch: %d\n", nm.Epoch())
|
|
cmd.Printf("Nodes in the netmap: %d\n", len(nm.Nodes()))
|
|
|
|
ps := &processStatus{
|
|
statusCount: make(map[status]uint64),
|
|
}
|
|
|
|
stopCh := make(chan struct{})
|
|
start := time.Now()
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
go func() {
|
|
defer wg.Done()
|
|
tick := time.NewTicker(time.Second)
|
|
defer tick.Stop()
|
|
for {
|
|
select {
|
|
case <-cmd.Context().Done():
|
|
return
|
|
case <-stopCh:
|
|
return
|
|
case <-tick.C:
|
|
fmt.Printf("Objects processed: %d; Time elapsed: %s\n", ps.total(), time.Since(start))
|
|
}
|
|
}
|
|
}()
|
|
go func() {
|
|
defer wg.Done()
|
|
err = scanStorageEngine(cmd, batchSize, storageEngine, ps, appCfg, cnrCli, nmCli, q, pk, move)
|
|
close(stopCh)
|
|
}()
|
|
wg.Wait()
|
|
commonCmd.ExitOnErr(cmd, "scan storage engine for zombie objects: %w", err)
|
|
|
|
cmd.Println()
|
|
cmd.Println("Status description:")
|
|
cmd.Println("undefined -- nothing is clear")
|
|
cmd.Println("found -- object is found in cluster")
|
|
cmd.Println("quarantine -- object is not found in cluster")
|
|
cmd.Println()
|
|
for status, count := range ps.statusCount {
|
|
cmd.Printf("Status: %s, Count: %d\n", status, count)
|
|
}
|
|
}
|
|
|
|
// status describes the outcome of checking a single object's availability
// in the cluster.
type status string

const (
	// statusUndefined means the check could not be completed (lookup error).
	statusUndefined status = "undefined"
	// statusFound means the object was found on at least one cluster node.
	statusFound status = "found"
	// statusQuarantine means the object was not found anywhere in the cluster.
	statusQuarantine status = "quarantine"
)
|
|
|
|
// checkAddr determines whether the object addr is stored anywhere in the
// cluster. It resolves the container's placement nodes from the current
// netmap and issues a local-only HEAD request to each candidate node.
//
// Returns statusFound if any node reports the object (or if the container's
// policy is a single replica on a single node — see the final check),
// statusQuarantine if no node has it, and statusUndefined with a non-nil
// error when a morph lookup or node-info parse fails.
func checkAddr(ctx context.Context, cnrCli *cntClient.Client, nmCli *netmap.Client, cc *cache.ClientCache, addr oid.Address) (status, error) {
	// Encode the container ID to its raw (SHA-256 sized) form for morph calls.
	rawCID := make([]byte, sha256.Size)
	cid := addr.Container()
	cid.Encode(rawCID)

	cnr, err := cnrCli.Get(rawCID)
	if err != nil {
		return statusUndefined, fmt.Errorf("read container %s from morph: %w", cid, err)
	}
	nm, err := nmCli.NetMap()
	if err != nil {
		return statusUndefined, fmt.Errorf("read netmap from morph: %w", err)
	}

	nodes, err := nm.ContainerNodes(cnr.Value.PlacementPolicy(), rawCID)
	if err != nil {
		// Not enough nodes, check all netmap nodes.
		nodes = append([][]netmap.NodeInfo{}, nm.Nodes())
	}

	objID := addr.Object()
	cnrID := addr.Container()
	// Local-only HEAD: ask each node whether it itself holds the object,
	// without triggering a cluster-wide search on the remote side.
	prm := clientSDK.PrmObjectHead{
		ObjectID: &objID,
		ContainerID: &cnrID,
		Local: true,
	}

	var ni clientCore.NodeInfo
	for i := range nodes {
		for j := range nodes[i] {
			if err := clientCore.NodeInfoFromRawNetmapElement(&ni, netmapCore.Node(nodes[i][j])); err != nil {
				return statusUndefined, fmt.Errorf("parse node info: %w", err)
			}
			c, err := cc.Get(ni)
			if err != nil {
				// Best effort: an unreachable node is skipped, not fatal.
				continue
			}
			res, err := c.ObjectHead(ctx, prm)
			if err != nil {
				continue
			}
			if err := apistatus.ErrFromStatus(res.Status()); err != nil {
				// Node answered but reported a failure status (e.g. object
				// not found there); try the next candidate.
				continue
			}
			return statusFound, nil
		}
	}

	// Single replica on a single node: this node is the only expected holder,
	// so treat the local copy as found rather than quarantining the sole copy.
	if cnr.Value.PlacementPolicy().NumberOfReplicas() == 1 && cnr.Value.PlacementPolicy().ReplicaNumberByIndex(0) == 1 {
		return statusFound, nil
	}
	return statusQuarantine, nil
}
|
|
|
|
// scanStorageEngine iterates over all objects in the storage engine in
// batches of batchSize, checks each object's cluster-wide availability via
// checkAddr, and records the result in ps. Objects with quarantine status
// are printed (move == false) or moved to quarantine (move == true).
// Returns nil once the engine reports end of listing, or the first error
// encountered while listing or processing a batch.
func scanStorageEngine(cmd *cobra.Command, batchSize uint32, storageEngine *engine.StorageEngine, ps *processStatus,
	appCfg *config.Config, cnrCli *cntClient.Client, nmCli *netmap.Client, q *quarantine, pk *ecdsa.PrivateKey, move bool,
) error {
	// Cache of per-node SDK clients reused across all checkAddr HEAD calls.
	cc := cache.NewSDKClientCache(cache.ClientCacheOpts{
		DialTimeout: apiclientconfig.DialTimeout(appCfg),
		StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
		ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
		Key: pk,
		AllowExternal: apiclientconfig.AllowExternal(appCfg),
	})
	ctx := cmd.Context()

	var cursor *engine.Cursor
	for {
		// React to cancellation between batches without blocking.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		var prm engine.ListWithCursorPrm
		prm.WithCursor(cursor)
		prm.WithCount(batchSize)

		res, err := storageEngine.ListWithCursor(ctx, prm)
		if err != nil {
			if errors.Is(err, engine.ErrEndOfListing) {
				// All objects have been listed: normal termination.
				return nil
			}
			return fmt.Errorf("list with cursor: %w", err)
		}

		cursor = res.Cursor()
		addrList := res.AddressList()
		// Check the batch concurrently, with at most batchSize in flight;
		// the derived context cancels remaining checks on first error.
		eg, egCtx := errgroup.WithContext(ctx)
		eg.SetLimit(int(batchSize))

		for i := range addrList {
			addr := addrList[i] // per-iteration copy for the closure below
			eg.Go(func() error {
				result, err := checkAddr(egCtx, cnrCli, nmCli, cc, addr.Address)
				if err != nil {
					return fmt.Errorf("check object %s status: %w", addr.Address, err)
				}
				ps.add(result)

				// Dry-run mode: only report the address.
				// NOTE(review): cmd.Println is invoked from multiple
				// goroutines here; output lines may interleave — confirm
				// the command's writer tolerates concurrent writes.
				if !move && result == statusQuarantine {
					cmd.Println(addr)
					return nil
				}

				if result == statusQuarantine {
					return moveToQuarantine(egCtx, storageEngine, q, addr.Address)
				}
				return nil
			})
		}
		if err := eg.Wait(); err != nil {
			return fmt.Errorf("process objects batch: %w", err)
		}
	}
}
|
|
|
|
func moveToQuarantine(ctx context.Context, storageEngine *engine.StorageEngine, q *quarantine, addr oid.Address) error {
|
|
var getPrm engine.GetPrm
|
|
getPrm.WithAddress(addr)
|
|
res, err := storageEngine.Get(ctx, getPrm)
|
|
if err != nil {
|
|
return fmt.Errorf("get object %s from storage engine: %w", addr, err)
|
|
}
|
|
|
|
if err := q.Put(ctx, res.Object()); err != nil {
|
|
return fmt.Errorf("put object %s to quarantine: %w", addr, err)
|
|
}
|
|
|
|
var delPrm engine.DeletePrm
|
|
delPrm.WithForceRemoval()
|
|
delPrm.WithAddress(addr)
|
|
|
|
_, err = storageEngine.Delete(ctx, delPrm)
|
|
if err != nil {
|
|
return fmt.Errorf("delete object %s from storage engine: %w", addr, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processStatus accumulates per-status counters for scanned objects.
// All methods are safe for concurrent use.
type processStatus struct {
	guard sync.RWMutex // protects statusCount and count
	statusCount map[status]uint64 // objects seen, keyed by check result
	count uint64 // total number of objects processed
}
|
|
|
|
func (s *processStatus) add(st status) {
|
|
s.guard.Lock()
|
|
defer s.guard.Unlock()
|
|
s.statusCount[st]++
|
|
s.count++
|
|
}
|
|
|
|
func (s *processStatus) total() uint64 {
|
|
s.guard.RLock()
|
|
defer s.guard.RUnlock()
|
|
return s.count
|
|
}
|