frostfs-node/cmd/frostfs-cli/modules/object/nodes.go
Dmitrii Stepanov a45b548a6f [#1120] cli: Add explain to object nodes
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-05-08 15:23:58 +03:00

500 lines
16 KiB
Go

package object
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"strconv"
"sync"
"text/tabwriter"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)
// Names of the command-line flags specific to the "nodes" command.
const (
	// verifyPresenceAllFlag names the boolean flag that makes the command
	// check object presence on every netmap node rather than only on the
	// nodes that are required to store the object.
	verifyPresenceAllFlag = "verify-presence-all"
	// explainFlag names the boolean flag that enables the detailed
	// per-object placement report printed after the summary table.
	explainFlag = "explain"
)
// errNoAvailableEndpoint is returned by createClient when no client could be
// created for a node although no endpoint attempt reported an error (for
// example, when the node announces no addresses at all).
var errNoAvailableEndpoint = errors.New("failed to create client: no available endpoint")
// phyObject describes a single physically stored object whose placement is
// inspected: a regular object, a part of a complex object or an EC chunk.
type phyObject struct {
	// containerID is the container the object belongs to.
	containerID cid.ID
	// objectID is the identifier of the physical object itself.
	objectID oid.ID
	// storedOnAllContainerNodes is set by getPhyObjects for lock objects,
	// tombstones and objects that list children; for such objects every node
	// of the placement vectors is treated as required.
	storedOnAllContainerNodes bool
	// ecHeader is non-nil when the object is an EC chunk.
	ecHeader *ecHeader
}
// ecHeader holds the part of the EC information of a chunk that is needed
// to compute and to print its placement.
type ecHeader struct {
	// index is the index of the chunk within its EC parent object.
	index uint32
	// parent is the identifier of the EC parent object; it is used instead
	// of the chunk ID when the placement vectors are built.
	parent oid.ID
}
// boolError is the outcome of a presence check on a single node: either a
// boolean answer or the error that prevented getting one.
type boolError struct {
	// value reports whether the node stores at least one of the checked objects.
	value bool
	// err is the error of client creation or of the presence check, if any.
	err error
}
// objectPlacement collects placement details of one physical object for the
// report printed when the explain flag is set.
type objectPlacement struct {
	// requiredNodes are the nodes that should store the object according to
	// the placement policy.
	requiredNodes []netmapSDK.NodeInfo
	// confirmedNodes are the nodes on which the object was actually found.
	confirmedNodes []netmapSDK.NodeInfo
}
// objectNodesCmd is the "nodes" command. It prints, for every netmap node,
// whether the node should store the object and whether it actually does;
// the work is done by objectNodes.
var objectNodesCmd = &cobra.Command{
	Use:   "nodes",
	Short: "List of nodes where the object is stored",
	Long: `List of nodes where the object should be stored and where it is actually stored.
Lock objects must exist on all nodes of the container.
For complex and EC objects, a node is considered to store an object if the node stores at least one part of the complex object or one chunk of the EC object.
By default, the actual storage of the object is checked only on the nodes that should store the object. To check all nodes, use the flag --verify-presence-all.`,
	Run: objectNodes,
}
// initObjectNodesCmd registers the flags of the "nodes" command: the common
// flags, the required container and object IDs, and the optional
// verify-presence-all and explain switches.
func initObjectNodesCmd() {
	commonflags.Init(objectNodesCmd)

	flags := objectNodesCmd.Flags()

	// The flags must be marked as required on objectNodesCmd itself: marking
	// them on another command leaves this command's flags optional.
	// The returned error is ignored on purpose: per the cobra documentation it
	// is only reported for an unknown flag name, and both flags are
	// registered right before being marked.
	flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
	_ = objectNodesCmd.MarkFlagRequired(commonflags.CIDFlag)

	flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
	_ = objectNodesCmd.MarkFlagRequired(commonflags.OIDFlag)

	flags.Bool(verifyPresenceAllFlag, false, "Verify the actual presence of the object on all netmap nodes")
	flags.Bool(explainFlag, false, "Show detailed information about the object placement")
}
// objectNodes is the Run handler of the "nodes" command. It resolves the
// requested object into its physical objects, computes where they must be
// stored, checks where they are actually stored and prints the comparison.
func objectNodes(cmd *cobra.Command, _ []string) {
	var (
		containerID cid.ID
		objectID    oid.ID
	)
	readObjectAddress(cmd, &containerID, &objectID)

	privateKey := key.GetOrGenerate(cmd)
	rpcClient := internalclient.GetSDKClientByFlag(cmd, privateKey, commonflags.RPC)

	// What is stored physically for the requested object.
	phyObjects := getPhyObjects(cmd, containerID, objectID, rpcClient, privateKey)

	// Where it has to be stored.
	cnrPolicy, nm := getPlacementPolicyAndNetmap(cmd, containerID, rpcClient)
	required, perObject := getRequiredPlacement(cmd, phyObjects, cnrPolicy, nm)

	// Where it is stored in fact.
	actual := getActualPlacement(cmd, nm, required, privateKey, phyObjects, perObject)

	printPlacement(cmd, nm, required, actual, objectID, phyObjects, perObject)
}
// getPhyObjects resolves the requested object into the list of physical
// objects that represent it.
//
// A raw HEAD request is sent first. On success the object itself is the only
// physical object. If the request fails with a split info error, the parts
// of the complex object are returned; if it fails with an EC info error, the
// EC chunks are returned. Any other error terminates the command.
func getPhyObjects(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) []phyObject {
	var target oid.Address
	target.SetContainer(cnrID)
	target.SetObject(objID)

	var prmHead internalclient.HeadObjectPrm
	prmHead.SetClient(cli)
	prmHead.SetAddress(target)
	prmHead.SetRawFlag(true)

	Prepare(cmd, &prmHead)
	readSession(cmd, &prmHead, pk, cnrID, objID)

	res, err := internalclient.HeadObject(cmd.Context(), prmHead)
	if err != nil {
		var (
			splitErr *objectSDK.SplitInfoError
			ecErr    *objectSDK.ECInfoError
		)
		switch {
		case errors.As(err, &splitErr):
			return getComplexObjectParts(cmd, cnrID, objID, cli, prmHead, splitErr)
		case errors.As(err, &ecErr):
			return getECObjectChunks(cmd, cnrID, objID, ecErr)
		}

		commonCmd.ExitOnErr(cmd, "failed to get object info: %w", err)
		return nil
	}

	hdr := res.Header()

	// Locks, tombstones and objects that list children are expected on all
	// container nodes.
	var everywhere bool
	switch hdr.Type() {
	case objectSDK.TypeLock, objectSDK.TypeTombstone:
		everywhere = true
	default:
		everywhere = len(hdr.Children()) > 0
	}

	single := phyObject{
		containerID:               cnrID,
		objectID:                  objID,
		storedOnAllContainerNodes: everywhere,
	}
	if hdr.ECHeader() != nil {
		single.ecHeader = &ecHeader{
			index:  hdr.ECHeader().Index(),
			parent: hdr.ECHeader().Parent(),
		}
	}

	return []phyObject{single}
}
// getComplexObjectParts returns the physical objects of a complex (split)
// object: its member IDs are collected first and then expanded into EC
// chunks if the members turn out to be EC objects.
func getComplexObjectParts(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) []phyObject {
	return flattenComplexMembersIfECContainer(
		cmd,
		cnrID,
		getCompexObjectMembers(cmd, cnrID, objID, cli, prmHead, errSplitInfo),
		prmHead,
	)
}
// getCompexObjectMembers collects the IDs of the members of a complex
// object. Three strategies are tried in order, and the first one that
// succeeds wins: the linking object, the split ID, and finally restoring the
// chain in reverse.
func getCompexObjectMembers(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) []oid.ID {
	info := errSplitInfo.SplitInfo()

	viaLink, linkOK := tryGetSplitMembersByLinkingObject(cmd, info, prmHead, cnrID, false)
	if linkOK {
		return viaLink
	}

	viaSplitID, splitOK := tryGetSplitMembersBySplitID(cmd, info, cli, cnrID)
	if splitOK {
		return viaSplitID
	}

	return tryRestoreChainInReverse(cmd, info, prmHead, cli, cnrID, objID)
}
// flattenComplexMembersIfECContainer converts the members of a complex
// object into physical objects.
//
// Every member is requested with a raw HEAD. A member that answers with an
// EC info error is replaced by its EC chunks. The first member that answers
// without an error means the members are not EC objects: all members are
// then appended as physical objects and the scan stops. Any other error
// terminates the command.
func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, members []oid.ID, prmHead internalclient.HeadObjectPrm) []phyObject {
	flat := make([]phyObject, 0, len(members))

	var partAddr oid.Address
	partAddr.SetContainer(cnrID)

	prmHead.SetRawFlag(true) // to get an error instead of whole object

	for i := range members {
		partID := members[i]

		partAddr.SetObject(partID)
		prmHead.SetAddress(partAddr)

		_, err := internalclient.HeadObject(cmd.Context(), prmHead)
		if err == nil {
			// not EC object, so all members must be phy objects
			for _, id := range members {
				flat = append(flat, phyObject{
					containerID: cnrID,
					objectID:    id,
				})
			}
			return flat
		}

		var ecErr *objectSDK.ECInfoError
		if !errors.As(err, &ecErr) {
			commonCmd.ExitOnErr(cmd, "failed to read EC chunk of complex object: %w", err)
			continue
		}

		flat = append(flat, getECObjectChunks(cmd, cnrID, partID, ecErr)...)
	}

	return flat
}
// getECObjectChunks builds one physical object per chunk listed in the EC
// info carried by errECInfo. Each chunk keeps its index and refers to objID
// as its EC parent. A chunk ID that cannot be decoded terminates the command.
func getECObjectChunks(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, errECInfo *objectSDK.ECInfoError) []phyObject {
	chunks := errECInfo.ECInfo().Chunks
	out := make([]phyObject, 0, len(chunks))

	for i := range chunks {
		var chunkID oid.ID
		if err := chunkID.ReadFromV2(chunks[i].ID); err != nil {
			commonCmd.ExitOnErr(cmd, "failed to read EC chunk ID %w", err)
			return nil
		}

		out = append(out, phyObject{
			containerID: cnrID,
			objectID:    chunkID,
			ecHeader: &ecHeader{
				index:  chunks[i].Index,
				parent: objID,
			},
		})
	}

	return out
}
// getPlacementPolicyAndNetmap fetches the placement policy of the container
// and the network map snapshot concurrently. If either request fails, the
// command is terminated with an "rpc error".
func getPlacementPolicyAndNetmap(cmd *cobra.Command, cnrID cid.ID, cli *client.Client) (placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) {
	group, groupCtx := errgroup.WithContext(cmd.Context())

	// Each goroutine writes its own result variable, so no locking is
	// needed; Wait synchronizes the writes with the read below.
	group.Go(func() error {
		var err error
		placementPolicy, err = getPlacementPolicy(groupCtx, cnrID, cli)
		return err
	})
	group.Go(func() error {
		var err error
		netmap, err = getNetMap(groupCtx, cli)
		return err
	})

	commonCmd.ExitOnErr(cmd, "rpc error: %w", group.Wait())

	return placementPolicy, netmap
}
// getPlacementPolicy requests the container cnrID through cli and returns
// its placement policy. On a request error the zero policy and the error
// are returned.
func getPlacementPolicy(ctx context.Context, cnrID cid.ID, cli *client.Client) (netmapSDK.PlacementPolicy, error) {
	var prm internalclient.GetContainerPrm
	prm.Client = cli
	prm.ClientParams.ContainerID = &cnrID

	res, err := internalclient.GetContainer(ctx, prm)
	if err != nil {
		var none netmapSDK.PlacementPolicy
		return none, err
	}

	cnr := res.Container()
	return cnr.PlacementPolicy(), nil
}
// getNetMap requests the network map snapshot through cli and returns a
// pointer to a copy of it, or nil and the error if the request fails.
func getNetMap(ctx context.Context, cli *client.Client) (*netmapSDK.NetMap, error) {
	var prm internalclient.NetMapSnapshotPrm
	prm.SetClient(cli)

	res, err := internalclient.NetMapSnapshot(ctx, prm)
	if err != nil {
		return nil, err
	}

	snapshot := new(netmapSDK.NetMap)
	*snapshot = res.NetMap()

	return snapshot, nil
}
// getRequiredPlacement computes the nodes that must store the given
// objects, using the EC rules for EC placement policies and the replica
// rules otherwise. It returns the union of required nodes keyed by node
// hash and the per-object placement details.
func getRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) (map[uint64]netmapSDK.NodeInfo, map[oid.ID]objectPlacement) {
	build := getReplicaRequiredPlacement
	if policy.IsECPlacement(placementPolicy) {
		build = getECRequiredPlacement
	}

	return build(cmd, objects, placementPolicy, netmap)
}
// getReplicaRequiredPlacement computes the required nodes for a replica
// (non-EC) placement policy.
//
// For every object and every placement vector, the leading nodes of the
// vector are required, as many as the matching replica descriptor demands.
// For objects stored on all container nodes the whole vector is required.
// The first result is the union of required nodes keyed by node hash, the
// second one lists the required nodes per object. A placement build failure
// terminates the command.
func getReplicaRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) (map[uint64]netmapSDK.NodeInfo, map[oid.ID]objectPlacement) {
	required := make(map[uint64]netmapSDK.NodeInfo)
	perObject := make(map[oid.ID]objectPlacement)

	builder := placement.NewNetworkMapBuilder(netmap)

	for i := range objects {
		obj := objects[i]

		vectors, err := builder.BuildPlacement(obj.containerID, &obj.objectID, placementPolicy)
		commonCmd.ExitOnErr(cmd, "failed to get required placement for object: %w", err)

		for repIdx, vector := range vectors {
			replicas := placementPolicy.ReplicaDescriptor(repIdx).NumberOfObjects()

			for pos, node := range vector {
				// Only the first `replicas` nodes of the vector are
				// required, unless the object lives on every node.
				if !obj.storedOnAllContainerNodes && uint32(pos) == replicas {
					break
				}

				required[node.Hash()] = node

				entry := perObject[obj.objectID]
				entry.requiredNodes = append(entry.requiredNodes, node)
				perObject[obj.objectID] = entry
			}
		}
	}

	return required, perObject
}
// getECRequiredPlacement computes the required nodes for an EC placement
// policy by accumulating the placement of every object into two fresh maps:
// the union of required nodes keyed by node hash and the per-object details.
func getECRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) (map[uint64]netmapSDK.NodeInfo, map[oid.ID]objectPlacement) {
	var (
		required  = make(map[uint64]netmapSDK.NodeInfo)
		perObject = make(map[oid.ID]objectPlacement)
	)

	for i := range objects {
		getECRequiredPlacementInternal(cmd, objects[i], placementPolicy, netmap, required, perObject)
	}

	return required, perObject
}
// getECRequiredPlacementInternal adds the required nodes of a single object
// under an EC placement policy to nodes and objectNodes.
//
// The placement vectors are built for the EC parent when the object is a
// chunk and for the object itself otherwise. An object stored on all
// container nodes requires every node of every vector. An EC chunk requires
// one node per vector, selected by the chunk index modulo the vector length.
// Other objects add nothing. A placement build failure terminates the
// command.
func getECRequiredPlacementInternal(cmd *cobra.Command, object phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap, nodes map[uint64]netmapSDK.NodeInfo, objectNodes map[oid.ID]objectPlacement) {
	placementID := object.objectID
	if chunk := object.ecHeader; chunk != nil {
		placementID = chunk.parent
	}

	builder := placement.NewNetworkMapBuilder(netmap)
	vectors, err := builder.BuildPlacement(object.containerID, &placementID, placementPolicy)
	commonCmd.ExitOnErr(cmd, "failed to get required placement: %w", err)

	// markRequired records n both in the global set and in the list of the
	// object being processed.
	markRequired := func(n netmapSDK.NodeInfo) {
		nodes[n.Hash()] = n

		entry := objectNodes[object.objectID]
		entry.requiredNodes = append(entry.requiredNodes, n)
		objectNodes[object.objectID] = entry
	}

	for _, vector := range vectors {
		switch {
		case object.storedOnAllContainerNodes:
			for _, n := range vector {
				markRequired(n)
			}
		case object.ecHeader != nil:
			pos := int(object.ecHeader.index) % len(vector)
			markRequired(vector[pos])
		}
	}
}
// getActualPlacement checks on which nodes the given objects are actually
// stored.
//
// The checked nodes are all netmap nodes when the verify-presence-all flag
// is set and the nodes of requiredPlacement otherwise. For every checked
// node a client is created and every object is requested from it; all of
// this runs concurrently in one errgroup.
//
// The returned map is keyed by node hash. For a node, the first recorded
// error or positive answer is kept; a negative answer remains only while
// nothing else has been recorded. As a side effect, every node that
// confirmed an object is appended to that object's confirmedNodes in
// objectNodes.
func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacement map[uint64]netmapSDK.NodeInfo,
	pk *ecdsa.PrivateKey, objects []phyObject, objectNodes map[oid.ID]objectPlacement,
) map[uint64]boolError {
	result := make(map[uint64]boolError)
	// resultMtx guards both result and objectNodes, which are written from
	// the goroutines below.
	resultMtx := &sync.Mutex{}
	var candidates []netmapSDK.NodeInfo
	checkAllNodes, _ := cmd.Flags().GetBool(verifyPresenceAllFlag)
	if checkAllNodes {
		candidates = netmap.Nodes()
	} else {
		for _, n := range requiredPlacement {
			candidates = append(candidates, n)
		}
	}
	eg, egCtx := errgroup.WithContext(cmd.Context())
	for _, cand := range candidates {
		// Per-iteration copy for the closure (needed before Go 1.22).
		cand := cand
		eg.Go(func() error {
			cli, err := createClient(egCtx, cmd, cand, pk)
			if err != nil {
				// An unreachable node is reported in the result instead of
				// failing the whole group.
				resultMtx.Lock()
				defer resultMtx.Unlock()
				result[cand.Hash()] = boolError{err: err}
				return nil
			}
			for _, object := range objects {
				object := object
				// One more goroutine of the same group per (node, object) pair.
				eg.Go(func() error {
					var v boolError
					v.value, v.err = isObjectStoredOnNode(egCtx, cmd, object.containerID, object.objectID, cli, pk)
					resultMtx.Lock()
					defer resultMtx.Unlock()
					if v.err == nil && v.value {
						op := objectNodes[object.objectID]
						op.confirmedNodes = append(op.confirmedNodes, cand)
						objectNodes[object.objectID] = op
					}
					// Do not overwrite an error or a positive answer that
					// another object check already recorded for this node.
					if prev, exists := result[cand.Hash()]; exists && (prev.err != nil || prev.value) {
						return nil
					}
					result[cand.Hash()] = v
					return nil
				})
			}
			return nil
		})
	}
	// Every goroutine returns nil: per-node failures are carried by result.
	commonCmd.ExitOnErr(cmd, "failed to get actual placement: %w", eg.Wait())
	return result
}
// createClient creates a client for the candidate node.
//
// The network endpoints of the node are tried first, then its external
// addresses; the first address that parses and yields a client wins. If the
// last attempt failed, its error is returned. If no client was created and
// no error was reported, errNoAvailableEndpoint is returned.
func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.NodeInfo, pk *ecdsa.PrivateKey) (*client.Client, error) {
	var endpoints []string
	// NOTE(review): returning false presumably keeps the iteration going so
	// that every endpoint is collected; confirm against the SDK.
	candidate.IterateNetworkEndpoints(func(endpoint string) bool {
		endpoints = append(endpoints, endpoint)
		return false
	})
	endpoints = append(endpoints, candidate.ExternalAddresses()...)

	var (
		cli     *client.Client
		lastErr error
	)
	for i := range endpoints {
		var addr network.Address
		if lastErr = addr.FromString(endpoints[i]); lastErr != nil {
			continue
		}

		if cli, lastErr = internalclient.GetSDKClient(ctx, cmd, pk, addr); lastErr == nil {
			break
		}
	}

	switch {
	case lastErr != nil:
		return nil, lastErr
	case cli == nil:
		return nil, errNoAvailableEndpoint
	default:
		return cli, nil
	}
}
// isObjectStoredOnNode reports whether the node behind cli stores the object.
//
// A HEAD request with TTL 1 is sent. A successful response yields true.
// "Object not found" and "object already removed" statuses yield false
// without an error. Anything else is printed through cmd and returned as
// the error.
func isObjectStoredOnNode(ctx context.Context, cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) (bool, error) {
	var target oid.Address
	target.SetContainer(cnrID)
	target.SetObject(objID)

	var prm internalclient.HeadObjectPrm
	prm.SetClient(cli)
	prm.SetAddress(target)
	Prepare(cmd, &prm)
	prm.SetTTL(1)
	readSession(cmd, &prm, pk, cnrID, objID)

	res, err := internalclient.HeadObject(ctx, prm)

	var (
		notFound *apistatus.ObjectNotFound
		removed  *apistatus.ObjectAlreadyRemoved
	)
	switch {
	case err == nil && res != nil:
		return true, nil
	case errors.As(err, &notFound), errors.As(err, &removed):
		return false, nil
	}

	cmd.Printf("failed to get object %s from client\n", objID.EncodeToString())
	return false, err
}
// printPlacement writes the placement report to the command output.
//
// First a table with one row per netmap node is printed: the hex-encoded
// public key, whether the node is required to store the object, and the
// result of the presence check (empty if the node was not checked). When
// the explain flag is set, the physical objects are then listed with their
// EC details and their required and confirmed nodes. A failure to write the
// table terminates the command.
func printPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacement map[uint64]netmapSDK.NodeInfo,
	actualPlacement map[uint64]boolError, objID oid.ID, objects []phyObject, objectNodes map[oid.ID]objectPlacement,
) {
	const printErrFmt = "failed to print placement info: %w"

	out := cmd.OutOrStdout()

	table := tabwriter.NewWriter(out, 0, 0, 1, ' ', tabwriter.AlignRight|tabwriter.Debug)
	_, err := fmt.Fprintln(table, "Node ID\tShould contain object\tActually contains object\t")
	commonCmd.ExitOnErr(cmd, printErrFmt, err)

	for _, node := range netmap.Nodes() {
		hash := node.Hash()
		_, isRequired := requiredPlacement[hash]

		var actualCol string
		if status, checked := actualPlacement[hash]; checked {
			if status.err == nil {
				actualCol = strconv.FormatBool(status.value)
			} else {
				actualCol = fmt.Sprintf("error: %v", status.err)
			}
		}

		_, err := fmt.Fprintf(table, "%s\t%s\t%s\t\n",
			hex.EncodeToString(node.PublicKey()), strconv.FormatBool(isRequired), actualCol)
		commonCmd.ExitOnErr(cmd, printErrFmt, err)
	}
	commonCmd.ExitOnErr(cmd, printErrFmt, table.Flush())

	explain, _ := cmd.Flags().GetBool(explainFlag)
	if !explain {
		return
	}

	// listNodes prints a titled list of node public keys; empty lists are
	// omitted together with their title.
	listNodes := func(title string, list []netmapSDK.NodeInfo) {
		if len(list) == 0 {
			return
		}
		fmt.Fprint(out, title)
		for i := range list {
			fmt.Fprintf(out, "\t\t- %s\n", hex.EncodeToString(list[i].PublicKey()))
		}
	}

	fmt.Fprintf(out, "Object %s stores payload in %d data objects:\n", objID.EncodeToString(), len(objects))
	for i := range objects {
		obj := objects[i]

		fmt.Fprintf(out, "- %s\n", obj.objectID)
		if ec := obj.ecHeader; ec != nil {
			fmt.Fprintf(out, "\tEC index: %d\n", ec.index)
			fmt.Fprintf(out, "\tEC parent: %s\n", ec.parent.EncodeToString())
		}

		if details, known := objectNodes[obj.objectID]; known {
			listNodes("\tRequired nodes:\n", details.requiredNodes)
			listNodes("\tActual nodes:\n", details.confirmedNodes)
		}
	}
}