[#426] cli: Add object nodes command
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
ab489265b3
commit
72fedff7ad
2 changed files with 361 additions and 1 deletions
358
cmd/frostfs-cli/modules/object/nodes.go
Normal file
358
cmd/frostfs-cli/modules/object/nodes.go
Normal file
|
@ -0,0 +1,358 @@
|
||||||
|
package object
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"text/tabwriter"
|
||||||
|
|
||||||
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
verifyPresenceAllFlag = "verify-presence-all"
|
||||||
|
)
|
||||||
|
|
||||||
|
type objectNodesInfo struct {
|
||||||
|
containerID cid.ID
|
||||||
|
objectID oid.ID
|
||||||
|
relatedObjectIDs []oid.ID
|
||||||
|
isLock bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type boolError struct {
|
||||||
|
value bool
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
var objectNodesCmd = &cobra.Command{
|
||||||
|
Use: "nodes",
|
||||||
|
Short: "List of nodes where the object is stored",
|
||||||
|
Long: `List of nodes where the object should be stored and where it is actually stored.
|
||||||
|
Lock objects must exist on all nodes of the container.
|
||||||
|
For complex objects, a node is considered to store an object if the node stores at least one part of the complex object.
|
||||||
|
By default, the actual storage of the object is checked only on the nodes that should store the object. To check all nodes, use the flag --verify-presence-all.`,
|
||||||
|
Run: objectNodes,
|
||||||
|
}
|
||||||
|
|
||||||
|
func initObjectNodesCmd() {
|
||||||
|
commonflags.Init(objectNodesCmd)
|
||||||
|
|
||||||
|
flags := objectNodesCmd.Flags()
|
||||||
|
|
||||||
|
flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
||||||
|
_ = objectGetCmd.MarkFlagRequired(commonflags.CIDFlag)
|
||||||
|
|
||||||
|
flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
|
||||||
|
_ = objectGetCmd.MarkFlagRequired(commonflags.OIDFlag)
|
||||||
|
|
||||||
|
flags.Bool("verify-presence-all", false, "Verify the actual presence of the object on all netmap nodes")
|
||||||
|
}
|
||||||
|
|
||||||
|
func objectNodes(cmd *cobra.Command, _ []string) {
|
||||||
|
var cnrID cid.ID
|
||||||
|
var objID oid.ID
|
||||||
|
readObjectAddress(cmd, &cnrID, &objID)
|
||||||
|
|
||||||
|
pk := key.GetOrGenerate(cmd)
|
||||||
|
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||||
|
|
||||||
|
objectInfo := getObjectInfo(cmd, cnrID, objID, cli, pk)
|
||||||
|
|
||||||
|
placementPolicy, netmap := getPlacementPolicyAndNetmap(cmd, cnrID, cli)
|
||||||
|
|
||||||
|
requiredPlacement := getRequiredPlacement(cmd, objectInfo, placementPolicy, netmap)
|
||||||
|
|
||||||
|
actualPlacement := getActualPlacement(cmd, netmap, requiredPlacement, pk, objectInfo)
|
||||||
|
|
||||||
|
printPlacement(cmd, netmap, requiredPlacement, actualPlacement)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getObjectInfo(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) *objectNodesInfo {
|
||||||
|
var addrObj oid.Address
|
||||||
|
addrObj.SetContainer(cnrID)
|
||||||
|
addrObj.SetObject(objID)
|
||||||
|
|
||||||
|
var prmHead internalclient.HeadObjectPrm
|
||||||
|
prmHead.SetClient(cli)
|
||||||
|
prmHead.SetAddress(addrObj)
|
||||||
|
prmHead.SetRawFlag(true)
|
||||||
|
|
||||||
|
Prepare(cmd, &prmHead)
|
||||||
|
readSession(cmd, &prmHead, pk, cnrID, objID)
|
||||||
|
|
||||||
|
res, err := internalclient.HeadObject(cmd.Context(), prmHead)
|
||||||
|
if err == nil {
|
||||||
|
return &objectNodesInfo{
|
||||||
|
containerID: cnrID,
|
||||||
|
objectID: objID,
|
||||||
|
isLock: res.Header().Type() == objectSDK.TypeLock,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var errSplitInfo *objectSDK.SplitInfoError
|
||||||
|
|
||||||
|
if !errors.As(err, &errSplitInfo) {
|
||||||
|
commonCmd.ExitOnErr(cmd, "failed to get object info: %w", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
splitInfo := errSplitInfo.SplitInfo()
|
||||||
|
|
||||||
|
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnrID); ok {
|
||||||
|
return &objectNodesInfo{
|
||||||
|
containerID: cnrID,
|
||||||
|
objectID: objID,
|
||||||
|
relatedObjectIDs: members,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnrID); ok {
|
||||||
|
return &objectNodesInfo{
|
||||||
|
containerID: cnrID,
|
||||||
|
objectID: objID,
|
||||||
|
relatedObjectIDs: members,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
members := tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnrID, objID)
|
||||||
|
return &objectNodesInfo{
|
||||||
|
containerID: cnrID,
|
||||||
|
objectID: objID,
|
||||||
|
relatedObjectIDs: members,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPlacementPolicyAndNetmap(cmd *cobra.Command, cnrID cid.ID, cli *client.Client) (placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) {
|
||||||
|
eg, egCtx := errgroup.WithContext(cmd.Context())
|
||||||
|
eg.Go(func() (e error) {
|
||||||
|
placementPolicy, e = getPlacementPolicy(egCtx, cnrID, cli)
|
||||||
|
return
|
||||||
|
})
|
||||||
|
eg.Go(func() (e error) {
|
||||||
|
netmap, e = getNetMap(egCtx, cli)
|
||||||
|
return
|
||||||
|
})
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", eg.Wait())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPlacementPolicy(ctx context.Context, cnrID cid.ID, cli *client.Client) (netmapSDK.PlacementPolicy, error) {
|
||||||
|
var prm internalclient.GetContainerPrm
|
||||||
|
prm.SetClient(cli)
|
||||||
|
prm.SetContainer(cnrID)
|
||||||
|
|
||||||
|
res, err := internalclient.GetContainer(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return netmapSDK.PlacementPolicy{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Container().PlacementPolicy(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getNetMap(ctx context.Context, cli *client.Client) (*netmapSDK.NetMap, error) {
|
||||||
|
var prm internalclient.NetMapSnapshotPrm
|
||||||
|
prm.SetClient(cli)
|
||||||
|
|
||||||
|
res, err := internalclient.NetMapSnapshot(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
nm := res.NetMap()
|
||||||
|
return &nm, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getRequiredPlacement(cmd *cobra.Command, objInfo *objectNodesInfo, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) map[uint64]netmapSDK.NodeInfo {
|
||||||
|
nodes := make(map[uint64]netmapSDK.NodeInfo)
|
||||||
|
placementBuilder := placement.NewNetworkMapBuilder(netmap)
|
||||||
|
placement, err := placementBuilder.BuildPlacement(objInfo.containerID, &objInfo.objectID, placementPolicy)
|
||||||
|
commonCmd.ExitOnErr(cmd, "failed to get required placement: %w", err)
|
||||||
|
for repIdx, rep := range placement {
|
||||||
|
numOfReplicas := placementPolicy.ReplicaNumberByIndex(repIdx)
|
||||||
|
var nodeIdx uint32
|
||||||
|
for _, n := range rep {
|
||||||
|
if !objInfo.isLock && nodeIdx == numOfReplicas { //lock object should be on all container nodes
|
||||||
|
break
|
||||||
|
}
|
||||||
|
nodes[n.Hash()] = n
|
||||||
|
nodeIdx++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, relatedObjID := range objInfo.relatedObjectIDs {
|
||||||
|
placement, err = placementBuilder.BuildPlacement(objInfo.containerID, &relatedObjID, placementPolicy)
|
||||||
|
commonCmd.ExitOnErr(cmd, "failed to get required placement for related object: %w", err)
|
||||||
|
for _, rep := range placement {
|
||||||
|
for _, n := range rep {
|
||||||
|
nodes[n.Hash()] = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacement map[uint64]netmapSDK.NodeInfo,
|
||||||
|
pk *ecdsa.PrivateKey, objInfo *objectNodesInfo) map[uint64]boolError {
|
||||||
|
result := make(map[uint64]boolError)
|
||||||
|
resultMtx := &sync.Mutex{}
|
||||||
|
|
||||||
|
var candidates []netmapSDK.NodeInfo
|
||||||
|
checkAllNodes, _ := cmd.Flags().GetBool(verifyPresenceAllFlag)
|
||||||
|
if checkAllNodes {
|
||||||
|
candidates = netmap.Nodes()
|
||||||
|
} else {
|
||||||
|
for _, n := range requiredPlacement {
|
||||||
|
candidates = append(candidates, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
eg, egCtx := errgroup.WithContext(cmd.Context())
|
||||||
|
for _, cand := range candidates {
|
||||||
|
cand := cand
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
var cli *client.Client
|
||||||
|
cli, err = createClient(egCtx, cmd, cand, pk)
|
||||||
|
if err != nil {
|
||||||
|
resultMtx.Lock()
|
||||||
|
defer resultMtx.Unlock()
|
||||||
|
result[cand.Hash()] = boolError{err: err}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
var v boolError
|
||||||
|
v.value, v.err = isObjectStoredOnNode(egCtx, cmd, objInfo.containerID, objInfo.objectID, cli, pk)
|
||||||
|
resultMtx.Lock()
|
||||||
|
defer resultMtx.Unlock()
|
||||||
|
if prev, exists := result[cand.Hash()]; exists && (prev.err != nil || prev.value) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
result[cand.Hash()] = v
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, rObjID := range objInfo.relatedObjectIDs {
|
||||||
|
rObjID := rObjID
|
||||||
|
eg.Go(func() error {
|
||||||
|
var v boolError
|
||||||
|
v.value, v.err = isObjectStoredOnNode(egCtx, cmd, objInfo.containerID, rObjID, cli, pk)
|
||||||
|
resultMtx.Lock()
|
||||||
|
defer resultMtx.Unlock()
|
||||||
|
if prev, exists := result[cand.Hash()]; exists && (prev.err != nil || prev.value) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
result[cand.Hash()] = v
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
egErr := eg.Wait()
|
||||||
|
if err != nil || egErr != nil {
|
||||||
|
if err == nil {
|
||||||
|
err = egErr
|
||||||
|
}
|
||||||
|
commonCmd.ExitOnErr(cmd, "failed to get actual placement: %w", err)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.NodeInfo, pk *ecdsa.PrivateKey) (*client.Client, error) {
|
||||||
|
var cli *client.Client
|
||||||
|
var addresses []string
|
||||||
|
candidate.IterateNetworkEndpoints(func(s string) bool {
|
||||||
|
addresses = append(addresses, s)
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
var lastErr error
|
||||||
|
for _, address := range addresses {
|
||||||
|
var networkAddr network.Address
|
||||||
|
lastErr = networkAddr.FromString(address)
|
||||||
|
if lastErr != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cli, lastErr = internalclient.GetSDKClient(ctx, cmd, pk, networkAddr)
|
||||||
|
if lastErr == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lastErr != nil {
|
||||||
|
return nil, lastErr
|
||||||
|
}
|
||||||
|
if cli == nil {
|
||||||
|
return nil, fmt.Errorf("failed to create client: no available endpoint")
|
||||||
|
}
|
||||||
|
return cli, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isObjectStoredOnNode(ctx context.Context, cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) (bool, error) {
|
||||||
|
var addrObj oid.Address
|
||||||
|
addrObj.SetContainer(cnrID)
|
||||||
|
addrObj.SetObject(objID)
|
||||||
|
|
||||||
|
var prmHead internalclient.HeadObjectPrm
|
||||||
|
prmHead.SetClient(cli)
|
||||||
|
prmHead.SetAddress(addrObj)
|
||||||
|
|
||||||
|
Prepare(cmd, &prmHead)
|
||||||
|
prmHead.SetTTL(1)
|
||||||
|
readSession(cmd, &prmHead, pk, cnrID, objID)
|
||||||
|
|
||||||
|
res, err := internalclient.HeadObject(ctx, prmHead)
|
||||||
|
if err == nil && res != nil {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
var notFound *apistatus.ObjectNotFound
|
||||||
|
var removed *apistatus.ObjectAlreadyRemoved
|
||||||
|
if errors.As(err, ¬Found) || errors.As(err, &removed) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func printPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacement map[uint64]netmapSDK.NodeInfo, actualPlacement map[uint64]boolError) {
|
||||||
|
w := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 1, ' ', tabwriter.AlignRight|tabwriter.Debug)
|
||||||
|
defer func() {
|
||||||
|
commonCmd.ExitOnErr(cmd, "failed to print placement info: %w", w.Flush())
|
||||||
|
}()
|
||||||
|
fmt.Fprintln(w, "Netmap node\tShould contain object\tActually contains object\t")
|
||||||
|
for _, n := range netmap.Nodes() {
|
||||||
|
var address string
|
||||||
|
n.IterateNetworkEndpoints(func(s string) bool {
|
||||||
|
address = s
|
||||||
|
return s != ""
|
||||||
|
})
|
||||||
|
_, required := requiredPlacement[n.Hash()]
|
||||||
|
actual, actualExists := actualPlacement[n.Hash()]
|
||||||
|
actualStr := ""
|
||||||
|
if actualExists {
|
||||||
|
if actual.err != nil {
|
||||||
|
actualStr = fmt.Sprintf("error: %v", actual.err)
|
||||||
|
} else {
|
||||||
|
actualStr = strconv.FormatBool(actual.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fmt.Fprintf(w, "%s\t%s\t%s\t\n", address, strconv.FormatBool(required), actualStr)
|
||||||
|
}
|
||||||
|
}
|
|
@ -30,7 +30,8 @@ func init() {
|
||||||
objectHeadCmd,
|
objectHeadCmd,
|
||||||
objectHashCmd,
|
objectHashCmd,
|
||||||
objectRangeCmd,
|
objectRangeCmd,
|
||||||
objectLockCmd}
|
objectLockCmd,
|
||||||
|
objectNodesCmd}
|
||||||
|
|
||||||
Cmd.AddCommand(objectChildCommands...)
|
Cmd.AddCommand(objectChildCommands...)
|
||||||
|
|
||||||
|
@ -47,4 +48,5 @@ func init() {
|
||||||
initObjectHashCmd()
|
initObjectHashCmd()
|
||||||
initObjectRangeCmd()
|
initObjectRangeCmd()
|
||||||
initCommandObjectLock()
|
initCommandObjectLock()
|
||||||
|
initObjectNodesCmd()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue