frostfs-cli: Add EC support to object nodes #1120

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:feat/ec_object_nodes into master 2024-09-04 19:51:08 +00:00
Showing only changes of commit 368218f0cc - Show all commits

View file

@ -1,14 +1,16 @@
package object
import (
"bytes"
"cmp"
"context"
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strconv"
"slices"
"sync"
"text/tabwriter"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
@ -29,7 +31,6 @@ import (
const (
verifyPresenceAllFlag = "verify-presence-all"
explainFlag = "explain"
)
var errNoAvailableEndpoint = errors.New("failed to create client: no available endpoint")
@ -46,16 +47,30 @@ type ecHeader struct {
parent oid.ID
}
type boolError struct {
value bool
err error
}
type objectPlacement struct {
requiredNodes []netmapSDK.NodeInfo
confirmedNodes []netmapSDK.NodeInfo
}
type objectNodesResult struct {
errors []error
placements map[oid.ID]objectPlacement
}
type ObjNodesDataObject struct {
ObjectID string `json:"object_id"`
RequiredNodes []string `json:"required_nodes,omitempty"`
ConfirmedNodes []string `json:"confirmed_nodes,omitempty"`
ECParentObjectID *string `json:"ec_parent_object_id,omitempty"`
ECIndex *uint32 `json:"ec_index,omitempty"`
}
type objNodesResultJSON struct {
ObjectID string `json:"object_id"`
DataObjects []ObjNodesDataObject `json:"data_objects,omitempty"`
Errors []string `json:"errors,omitempty"`
}
var objectNodesCmd = &cobra.Command{
Use: "nodes",
Short: "List of nodes where the object is stored",
@ -77,8 +92,8 @@ func initObjectNodesCmd() {
flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
_ = objectGetCmd.MarkFlagRequired(commonflags.OIDFlag)
flags.Bool(verifyPresenceAllFlag, false, "Verify the actual presence of the object on all netmap nodes")
flags.Bool(explainFlag, false, "Show detailed information about the object placement")
flags.Bool(verifyPresenceAllFlag, false, "Verify the actual presence of the object on all netmap nodes.")
flags.Bool(commonflags.JSON, false, "Print information about the object placement as json.")
}
func objectNodes(cmd *cobra.Command, _ []string) {
@ -93,11 +108,11 @@ func objectNodes(cmd *cobra.Command, _ []string) {
placementPolicy, netmap := getPlacementPolicyAndNetmap(cmd, cnrID, cli)
requiredNodes, objectsPlacement := getRequiredPlacement(cmd, objects, placementPolicy, netmap)
result := getRequiredPlacement(cmd, objects, placementPolicy, netmap)
actualPlacement := getActualPlacement(cmd, netmap, requiredNodes, pk, objects, objectsPlacement)
getActualPlacement(cmd, netmap, pk, objects, result)
printPlacement(cmd, netmap, requiredNodes, actualPlacement, objID, objects, objectsPlacement)
printPlacement(cmd, objID, objects, result)
}
func getPhyObjects(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) []phyObject {
@ -256,16 +271,17 @@ func getNetMap(ctx context.Context, cli *client.Client) (*netmapSDK.NetMap, erro
return &nm, nil
}
func getRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) (map[uint64]netmapSDK.NodeInfo, map[oid.ID]objectPlacement) {
func getRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) *objectNodesResult {
if policy.IsECPlacement(placementPolicy) {
return getECRequiredPlacement(cmd, objects, placementPolicy, netmap)
}
return getReplicaRequiredPlacement(cmd, objects, placementPolicy, netmap)
}
func getReplicaRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) (map[uint64]netmapSDK.NodeInfo, map[oid.ID]objectPlacement) {
nodes := make(map[uint64]netmapSDK.NodeInfo)
objectsNodes := make(map[oid.ID]objectPlacement)
func getReplicaRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) *objectNodesResult {
result := &objectNodesResult{
placements: make(map[oid.ID]objectPlacement),
}
placementBuilder := placement.NewNetworkMapBuilder(netmap)
for _, object := range objects {
placement, err := placementBuilder.BuildPlacement(object.containerID, &object.objectID, placementPolicy)
@ -277,30 +293,30 @@ func getReplicaRequiredPlacement(cmd *cobra.Command, objects []phyObject, placem
if !object.storedOnAllContainerNodes && nodeIdx == numOfReplicas {
break
}
nodes[n.Hash()] = n
op := objectsNodes[object.objectID]
op := result.placements[object.objectID]
op.requiredNodes = append(op.requiredNodes, n)
objectsNodes[object.objectID] = op
result.placements[object.objectID] = op
nodeIdx++
}
}
}
return nodes, objectsNodes
return result
}
func getECRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) (map[uint64]netmapSDK.NodeInfo, map[oid.ID]objectPlacement) {
nodes := make(map[uint64]netmapSDK.NodeInfo)
objectsNodes := make(map[oid.ID]objectPlacement)
for _, object := range objects {
getECRequiredPlacementInternal(cmd, object, placementPolicy, netmap, nodes, objectsNodes)
func getECRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) *objectNodesResult {
result := &objectNodesResult{
placements: make(map[oid.ID]objectPlacement),
}
return nodes, objectsNodes
for _, object := range objects {
getECRequiredPlacementInternal(cmd, object, placementPolicy, netmap, result)
}
return result
}
func getECRequiredPlacementInternal(cmd *cobra.Command, object phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap, nodes map[uint64]netmapSDK.NodeInfo, objectNodes map[oid.ID]objectPlacement) {
func getECRequiredPlacementInternal(cmd *cobra.Command, object phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap, result *objectNodesResult) {
placementObjectID := object.objectID
if object.ecHeader != nil {
placementObjectID = object.ecHeader.parent
@ -312,11 +328,9 @@ func getECRequiredPlacementInternal(cmd *cobra.Command, object phyObject, placem
for _, vector := range placement {
if object.storedOnAllContainerNodes {
for _, node := range vector {
nodes[node.Hash()] = node
op := objectNodes[object.objectID]
op := result.placements[object.objectID]
op.requiredNodes = append(op.requiredNodes, node)
objectNodes[object.objectID] = op
result.placements[object.objectID] = op
}
continue
}
@ -325,30 +339,18 @@ func getECRequiredPlacementInternal(cmd *cobra.Command, object phyObject, placem
chunkIdx := int(object.ecHeader.index)
nodeIdx := chunkIdx % len(vector)
node := vector[nodeIdx]
nodes[node.Hash()] = node
op := objectNodes[object.objectID]
op := result.placements[object.objectID]
op.requiredNodes = append(op.requiredNodes, node)
objectNodes[object.objectID] = op
result.placements[object.objectID] = op
}
}
}
func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacement map[uint64]netmapSDK.NodeInfo,
pk *ecdsa.PrivateKey, objects []phyObject, objectNodes map[oid.ID]objectPlacement,
) map[uint64]boolError {
result := make(map[uint64]boolError)
func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, pk *ecdsa.PrivateKey, objects []phyObject, result *objectNodesResult) {
resultMtx := &sync.Mutex{}
var candidates []netmapSDK.NodeInfo
checkAllNodes, _ := cmd.Flags().GetBool(verifyPresenceAllFlag)
if checkAllNodes {
candidates = netmap.Nodes()
} else {
for _, n := range requiredPlacement {
candidates = append(candidates, n)
}
}
candidates := getNodesToCheckObjectExistance(cmd, netmap, result)
eg, egCtx := errgroup.WithContext(cmd.Context())
for _, cand := range candidates {
@ -359,26 +361,24 @@ func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPl
if err != nil {
resultMtx.Lock()
defer resultMtx.Unlock()
result[cand.Hash()] = boolError{err: err}
result.errors = append(result.errors, fmt.Errorf("failed to connect to node %s: %w", hex.EncodeToString(cand.PublicKey()), err))
return nil
}
for _, object := range objects {
object := object
eg.Go(func() error {
var v boolError
v.value, v.err = isObjectStoredOnNode(egCtx, cmd, object.containerID, object.objectID, cli, pk)
stored, err := isObjectStoredOnNode(egCtx, cmd, object.containerID, object.objectID, cli, pk)
resultMtx.Lock()
defer resultMtx.Unlock()
if v.err == nil && v.value {
op := objectNodes[object.objectID]
if err == nil && stored {
op := result.placements[object.objectID]
op.confirmedNodes = append(op.confirmedNodes, cand)
objectNodes[object.objectID] = op
result.placements[object.objectID] = op
}
if prev, exists := result[cand.Hash()]; exists && (prev.err != nil || prev.value) {
return nil
if err != nil {
result.errors = append(result.errors, fmt.Errorf("failed to check object %s existence on node %s: %w", object.objectID.EncodeToString(), hex.EncodeToString(cand.PublicKey()), err))
}
result[cand.Hash()] = v
return nil
})
}
@ -387,7 +387,24 @@ func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPl
}
commonCmd.ExitOnErr(cmd, "failed to get actual placement: %w", eg.Wait())
return result
}
func getNodesToCheckObjectExistance(cmd *cobra.Command, netmap *netmapSDK.NetMap, result *objectNodesResult) []netmapSDK.NodeInfo {
checkAllNodes, _ := cmd.Flags().GetBool(verifyPresenceAllFlag)
if checkAllNodes {
return netmap.Nodes()
}
var nodes []netmapSDK.NodeInfo
visited := make(map[uint64]struct{})
for _, p := range result.placements {
for _, node := range p.requiredNodes {
if _, ok := visited[node.Hash()]; !ok {
nodes = append(nodes, node)
visited[node.Hash()] = struct{}{}
}
}
}
return nodes
}
func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.NodeInfo, pk *ecdsa.PrivateKey) (*client.Client, error) {
@ -445,33 +462,44 @@ func isObjectStoredOnNode(ctx context.Context, cmd *cobra.Command, cnrID cid.ID,
return false, err
}
func printPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacement map[uint64]netmapSDK.NodeInfo,
actualPlacement map[uint64]boolError, objID oid.ID, objects []phyObject, objectNodes map[oid.ID]objectPlacement,
) {
w := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 1, ' ', tabwriter.AlignRight|tabwriter.Debug)
_, err := fmt.Fprintln(w, "Node ID\tShould contain object\tActually contains object\t")
commonCmd.ExitOnErr(cmd, "failed to print placement info: %w", err)
for _, n := range netmap.Nodes() {
nodeID := hex.EncodeToString(n.PublicKey())
_, required := requiredPlacement[n.Hash()]
actual, actualExists := actualPlacement[n.Hash()]
actualStr := ""
if actualExists {
if actual.err != nil {
actualStr = fmt.Sprintf("error: %v", actual.err)
} else {
actualStr = strconv.FormatBool(actual.value)
}
func printPlacement(cmd *cobra.Command, objID oid.ID, objects []phyObject, result *objectNodesResult) {
normilizeObjectNodesResult(objects, result)
if json, _ := cmd.Flags().GetBool(commonflags.JSON); json {
printObjectNodesAsJSON(cmd, objID, objects, result)
} else {
printObjectNodesAsText(cmd, objID, objects, result)
}
}
func normilizeObjectNodesResult(objects []phyObject, result *objectNodesResult) {
slices.SortFunc(objects, func(lhs, rhs phyObject) int {
if lhs.ecHeader == nil && rhs.ecHeader == nil {
return bytes.Compare(lhs.objectID[:], rhs.objectID[:])
}
_, err := fmt.Fprintf(w, "%s\t%s\t%s\t\n", nodeID, strconv.FormatBool(required), actualStr)
commonCmd.ExitOnErr(cmd, "failed to print placement info: %w", err)
}
commonCmd.ExitOnErr(cmd, "failed to print placement info: %w", w.Flush())
if explain, _ := cmd.Flags().GetBool(explainFlag); !explain {
return
if lhs.ecHeader == nil {
return -1
}
if rhs.ecHeader == nil {
return 1
}
if lhs.ecHeader.parent == rhs.ecHeader.parent {
return cmp.Compare(lhs.ecHeader.index, rhs.ecHeader.index)
}
return bytes.Compare(lhs.ecHeader.parent[:], rhs.ecHeader.parent[:])
})
for _, obj := range objects {
op := result.placements[obj.objectID]
slices.SortFunc(op.confirmedNodes, func(lhs, rhs netmapSDK.NodeInfo) int {
return bytes.Compare(lhs.PublicKey(), rhs.PublicKey())
})
slices.SortFunc(op.requiredNodes, func(lhs, rhs netmapSDK.NodeInfo) int {
return bytes.Compare(lhs.PublicKey(), rhs.PublicKey())
})
result.placements[obj.objectID] = op
}
}
func printObjectNodesAsText(cmd *cobra.Command, objID oid.ID, objects []phyObject, result *objectNodesResult) {
fmt.Fprintf(cmd.OutOrStdout(), "Object %s stores payload in %d data objects:\n", objID.EncodeToString(), len(objects))
for _, object := range objects {
@ -480,7 +508,7 @@ func printPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacem
fmt.Fprintf(cmd.OutOrStdout(), "\tEC index: %d\n", object.ecHeader.index)
fmt.Fprintf(cmd.OutOrStdout(), "\tEC parent: %s\n", object.ecHeader.parent.EncodeToString())
}
op, ok := objectNodes[object.objectID]
op, ok := result.placements[object.objectID]
if !ok {
continue
}
@ -491,10 +519,52 @@ func printPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacem
}
}
if len(op.confirmedNodes) > 0 {
fmt.Fprintf(cmd.OutOrStdout(), "\tActual nodes:\n")
fmt.Fprintf(cmd.OutOrStdout(), "\tConfirmed nodes:\n")
for _, node := range op.confirmedNodes {
fmt.Fprintf(cmd.OutOrStdout(), "\t\t- %s\n", hex.EncodeToString(node.PublicKey()))
}
}
}
if len(result.errors) == 0 {
return
}
fmt.Fprintf(cmd.OutOrStdout(), "Errors:\n")
for _, err := range result.errors {
fmt.Fprintf(cmd.OutOrStdout(), "\t%s\n", err.Error())
}
}
func printObjectNodesAsJSON(cmd *cobra.Command, objID oid.ID, objects []phyObject, result *objectNodesResult) {
jsonResult := &objNodesResultJSON{
ObjectID: objID.EncodeToString(),
}
for _, object := range objects {
do := ObjNodesDataObject{
ObjectID: object.objectID.EncodeToString(),
}
if object.ecHeader != nil {
do.ECIndex = &object.ecHeader.index
ecParent := object.ecHeader.parent.EncodeToString()
do.ECParentObjectID = &ecParent
}
op, ok := result.placements[object.objectID]
if !ok {
continue
}
for _, rn := range op.requiredNodes {
do.RequiredNodes = append(do.RequiredNodes, hex.EncodeToString(rn.PublicKey()))
}
for _, cn := range op.confirmedNodes {
do.ConfirmedNodes = append(do.ConfirmedNodes, hex.EncodeToString(cn.PublicKey()))
}
jsonResult.DataObjects = append(jsonResult.DataObjects, do)
}
for _, err := range result.errors {
jsonResult.Errors = append(jsonResult.Errors, err.Error())
}
b, err := json.Marshal(jsonResult)
commonCmd.ExitOnErr(cmd, "failed to marshal json: %w", err)
cmd.Println(string(b))
}