frostfs-cli: Add EC support to object nodes
#1120
3 changed files with 347 additions and 126 deletions
|
@ -1,19 +1,22 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"slices"
|
||||
"sync"
|
||||
"text/tabwriter"
|
||||
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
|
@ -32,16 +35,40 @@ const (
|
|||
|
||||
var errNoAvailableEndpoint = errors.New("failed to create client: no available endpoint")
|
||||
|
||||
type objectNodesInfo struct {
|
||||
containerID cid.ID
|
||||
objectID oid.ID
|
||||
relatedObjectIDs []oid.ID
|
||||
isLockOrTombstone bool
|
||||
type phyObject struct {
|
||||
containerID cid.ID
|
||||
objectID oid.ID
|
||||
storedOnAllContainerNodes bool
|
||||
ecHeader *ecHeader
|
||||
}
|
||||
|
||||
type boolError struct {
|
||||
value bool
|
||||
err error
|
||||
type ecHeader struct {
|
||||
index uint32
|
||||
parent oid.ID
|
||||
}
|
||||
|
||||
type objectPlacement struct {
|
||||
requiredNodes []netmapSDK.NodeInfo
|
||||
confirmedNodes []netmapSDK.NodeInfo
|
||||
}
|
||||
|
||||
type objectNodesResult struct {
|
||||
errors []error
|
||||
placements map[oid.ID]objectPlacement
|
||||
}
|
||||
|
||||
type ObjNodesDataObject struct {
|
||||
ObjectID string `json:"object_id"`
|
||||
RequiredNodes []string `json:"required_nodes,omitempty"`
|
||||
ConfirmedNodes []string `json:"confirmed_nodes,omitempty"`
|
||||
ECParentObjectID *string `json:"ec_parent_object_id,omitempty"`
|
||||
ECIndex *uint32 `json:"ec_index,omitempty"`
|
||||
}
|
||||
|
||||
type objNodesResultJSON struct {
|
||||
ObjectID string `json:"object_id"`
|
||||
DataObjects []ObjNodesDataObject `json:"data_objects,omitempty"`
|
||||
Errors []string `json:"errors,omitempty"`
|
||||
}
|
||||
|
||||
var objectNodesCmd = &cobra.Command{
|
||||
|
@ -49,7 +76,7 @@ var objectNodesCmd = &cobra.Command{
|
|||
Short: "List of nodes where the object is stored",
|
||||
Long: `List of nodes where the object should be stored and where it is actually stored.
|
||||
Lock objects must exist on all nodes of the container.
|
||||
For complex objects, a node is considered to store an object if the node stores at least one part of the complex object.
|
||||
For complex and EC objects, a node is considered to store an object if the node stores at least one part of the complex object or one chunk of the EC object.
|
||||
By default, the actual storage of the object is checked only on the nodes that should store the object. To check all nodes, use the flag --verify-presence-all.`,
|
||||
Run: objectNodes,
|
||||
}
|
||||
|
@ -65,7 +92,8 @@ func initObjectNodesCmd() {
|
|||
flags.String(commonflags.OIDFlag, "", commonflags.OIDFlagUsage)
|
||||
_ = objectGetCmd.MarkFlagRequired(commonflags.OIDFlag)
|
||||
|
||||
flags.Bool("verify-presence-all", false, "Verify the actual presence of the object on all netmap nodes")
|
||||
flags.Bool(verifyPresenceAllFlag, false, "Verify the actual presence of the object on all netmap nodes.")
|
||||
flags.Bool(commonflags.JSON, false, "Print information about the object placement as json.")
|
||||
}
|
||||
|
||||
func objectNodes(cmd *cobra.Command, _ []string) {
|
||||
|
@ -76,18 +104,18 @@ func objectNodes(cmd *cobra.Command, _ []string) {
|
|||
pk := key.GetOrGenerate(cmd)
|
||||
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||
|
||||
objectInfo := getObjectInfo(cmd, cnrID, objID, cli, pk)
|
||||
objects := getPhyObjects(cmd, cnrID, objID, cli, pk)
|
||||
|
||||
placementPolicy, netmap := getPlacementPolicyAndNetmap(cmd, cnrID, cli)
|
||||
|
||||
requiredPlacement := getRequiredPlacement(cmd, objectInfo, placementPolicy, netmap)
|
||||
result := getRequiredPlacement(cmd, objects, placementPolicy, netmap)
|
||||
|
||||
actualPlacement := getActualPlacement(cmd, netmap, requiredPlacement, pk, objectInfo)
|
||||
getActualPlacement(cmd, netmap, pk, objects, result)
|
||||
|
||||
printPlacement(cmd, netmap, requiredPlacement, actualPlacement)
|
||||
printPlacement(cmd, objID, objects, result)
|
||||
}
|
||||
|
||||
func getObjectInfo(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) *objectNodesInfo {
|
||||
func getPhyObjects(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) []phyObject {
|
||||
var addrObj oid.Address
|
||||
addrObj.SetContainer(cnrID)
|
||||
addrObj.SetObject(objID)
|
||||
|
@ -102,44 +130,103 @@ func getObjectInfo(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.C
|
|||
|
||||
res, err := internalclient.HeadObject(cmd.Context(), prmHead)
|
||||
if err == nil {
|
||||
return &objectNodesInfo{
|
||||
containerID: cnrID,
|
||||
objectID: objID,
|
||||
isLockOrTombstone: res.Header().Type() == objectSDK.TypeLock || res.Header().Type() == objectSDK.TypeTombstone,
|
||||
obj := phyObject{
|
||||
containerID: cnrID,
|
||||
objectID: objID,
|
||||
storedOnAllContainerNodes: res.Header().Type() == objectSDK.TypeLock ||
|
||||
res.Header().Type() == objectSDK.TypeTombstone ||
|
||||
len(res.Header().Children()) > 0,
|
||||
}
|
||||
if res.Header().ECHeader() != nil {
|
||||
obj.ecHeader = &ecHeader{
|
||||
index: res.Header().ECHeader().Index(),
|
||||
parent: res.Header().ECHeader().Parent(),
|
||||
}
|
||||
}
|
||||
return []phyObject{obj}
|
||||
}
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
|
||||
if !errors.As(err, &errSplitInfo) {
|
||||
commonCmd.ExitOnErr(cmd, "failed to get object info: %w", err)
|
||||
return nil
|
||||
if errors.As(err, &errSplitInfo) {
|
||||
return getComplexObjectParts(cmd, cnrID, objID, cli, prmHead, errSplitInfo)
|
||||
}
|
||||
|
||||
var ecInfoError *objectSDK.ECInfoError
|
||||
if errors.As(err, &ecInfoError) {
|
||||
return getECObjectChunks(cmd, cnrID, objID, ecInfoError)
|
||||
}
|
||||
commonCmd.ExitOnErr(cmd, "failed to get object info: %w", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
func getComplexObjectParts(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) []phyObject {
|
||||
members := getCompexObjectMembers(cmd, cnrID, objID, cli, prmHead, errSplitInfo)
|
||||
return flattenComplexMembersIfECContainer(cmd, cnrID, members, prmHead)
|
||||
}
|
||||
|
||||
func getCompexObjectMembers(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) []oid.ID {
|
||||
splitInfo := errSplitInfo.SplitInfo()
|
||||
|
||||
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnrID); ok {
|
||||
return &objectNodesInfo{
|
||||
containerID: cnrID,
|
||||
objectID: objID,
|
||||
relatedObjectIDs: members,
|
||||
}
|
||||
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnrID, false); ok {
|
||||
return members
|
||||
}
|
||||
|
||||
if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnrID); ok {
|
||||
return &objectNodesInfo{
|
||||
containerID: cnrID,
|
||||
objectID: objID,
|
||||
relatedObjectIDs: members,
|
||||
}
|
||||
return members
|
||||
}
|
||||
|
||||
members := tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnrID, objID)
|
||||
return &objectNodesInfo{
|
||||
containerID: cnrID,
|
||||
objectID: objID,
|
||||
relatedObjectIDs: members,
|
||||
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnrID, objID)
|
||||
}
|
||||
|
||||
func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, members []oid.ID, prmHead internalclient.HeadObjectPrm) []phyObject {
|
||||
result := make([]phyObject, 0, len(members))
|
||||
var addrObj oid.Address
|
||||
addrObj.SetContainer(cnrID)
|
||||
prmHead.SetRawFlag(true) // to get an error instead of whole object
|
||||
for _, partObjID := range members {
|
||||
addrObj.SetObject(partObjID)
|
||||
prmHead.SetAddress(addrObj)
|
||||
|
||||
_, err := internalclient.HeadObject(cmd.Context(), prmHead)
|
||||
var ecInfoError *objectSDK.ECInfoError
|
||||
if errors.As(err, &ecInfoError) {
|
||||
chunks := getECObjectChunks(cmd, cnrID, partObjID, ecInfoError)
|
||||
result = append(result, chunks...)
|
||||
continue
|
||||
} else if err == nil { // not EC object, so all members must be phy objects
|
||||
for _, member := range members {
|
||||
result = append(result, phyObject{
|
||||
containerID: cnrID,
|
||||
objectID: member,
|
||||
})
|
||||
}
|
||||
break
|
||||
}
|
||||
commonCmd.ExitOnErr(cmd, "failed to read EC chunk of complex object: %w", err)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func getECObjectChunks(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, errECInfo *objectSDK.ECInfoError) []phyObject {
|
||||
ecInfo := errECInfo.ECInfo()
|
||||
result := make([]phyObject, 0, len(ecInfo.Chunks))
|
||||
for _, ch := range ecInfo.Chunks {
|
||||
var chID oid.ID
|
||||
err := chID.ReadFromV2(ch.ID)
|
||||
if err != nil {
|
||||
commonCmd.ExitOnErr(cmd, "failed to read EC chunk ID %w", err)
|
||||
return nil
|
||||
}
|
||||
result = append(result, phyObject{
|
||||
containerID: cnrID,
|
||||
objectID: chID,
|
||||
ecHeader: &ecHeader{
|
||||
index: ch.Index,
|
||||
parent: objID,
|
||||
},
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func getPlacementPolicyAndNetmap(cmd *cobra.Command, cnrID cid.ID, cli *client.Client) (placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) {
|
||||
|
@ -184,52 +271,87 @@ func getNetMap(ctx context.Context, cli *client.Client) (*netmapSDK.NetMap, erro
|
|||
return &nm, nil
|
||||
}
|
||||
|
||||
func getRequiredPlacement(cmd *cobra.Command, objInfo *objectNodesInfo, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) map[uint64]netmapSDK.NodeInfo {
|
||||
nodes := make(map[uint64]netmapSDK.NodeInfo)
|
||||
placementBuilder := placement.NewNetworkMapBuilder(netmap)
|
||||
placement, err := placementBuilder.BuildPlacement(objInfo.containerID, &objInfo.objectID, placementPolicy)
|
||||
commonCmd.ExitOnErr(cmd, "failed to get required placement: %w", err)
|
||||
for repIdx, rep := range placement {
|
||||
numOfReplicas := placementPolicy.ReplicaDescriptor(repIdx).NumberOfObjects()
|
||||
var nodeIdx uint32
|
||||
for _, n := range rep {
|
||||
if !objInfo.isLockOrTombstone && nodeIdx == numOfReplicas { // lock and tombstone objects should be on all container nodes
|
||||
break
|
||||
}
|
||||
nodes[n.Hash()] = n
|
||||
nodeIdx++
|
||||
}
|
||||
func getRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) *objectNodesResult {
|
||||
if policy.IsECPlacement(placementPolicy) {
|
||||
return getECRequiredPlacement(cmd, objects, placementPolicy, netmap)
|
||||
}
|
||||
|
||||
for _, relatedObjID := range objInfo.relatedObjectIDs {
|
||||
placement, err = placementBuilder.BuildPlacement(objInfo.containerID, &relatedObjID, placementPolicy)
|
||||
commonCmd.ExitOnErr(cmd, "failed to get required placement for related object: %w", err)
|
||||
for _, rep := range placement {
|
||||
for _, n := range rep {
|
||||
nodes[n.Hash()] = n
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nodes
|
||||
return getReplicaRequiredPlacement(cmd, objects, placementPolicy, netmap)
|
||||
}
|
||||
|
||||
func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacement map[uint64]netmapSDK.NodeInfo,
|
||||
pk *ecdsa.PrivateKey, objInfo *objectNodesInfo,
|
||||
) map[uint64]boolError {
|
||||
result := make(map[uint64]boolError)
|
||||
resultMtx := &sync.Mutex{}
|
||||
func getReplicaRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) *objectNodesResult {
|
||||
result := &objectNodesResult{
|
||||
placements: make(map[oid.ID]objectPlacement),
|
||||
}
|
||||
placementBuilder := placement.NewNetworkMapBuilder(netmap)
|
||||
for _, object := range objects {
|
||||
placement, err := placementBuilder.BuildPlacement(object.containerID, &object.objectID, placementPolicy)
|
||||
commonCmd.ExitOnErr(cmd, "failed to get required placement for object: %w", err)
|
||||
for repIdx, rep := range placement {
|
||||
numOfReplicas := placementPolicy.ReplicaDescriptor(repIdx).NumberOfObjects()
|
||||
var nodeIdx uint32
|
||||
for _, n := range rep {
|
||||
if !object.storedOnAllContainerNodes && nodeIdx == numOfReplicas {
|
||||
break
|
||||
}
|
||||
|
||||
var candidates []netmapSDK.NodeInfo
|
||||
checkAllNodes, _ := cmd.Flags().GetBool(verifyPresenceAllFlag)
|
||||
if checkAllNodes {
|
||||
candidates = netmap.Nodes()
|
||||
} else {
|
||||
for _, n := range requiredPlacement {
|
||||
candidates = append(candidates, n)
|
||||
op := result.placements[object.objectID]
|
||||
op.requiredNodes = append(op.requiredNodes, n)
|
||||
result.placements[object.objectID] = op
|
||||
|
||||
nodeIdx++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func getECRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) *objectNodesResult {
|
||||
result := &objectNodesResult{
|
||||
placements: make(map[oid.ID]objectPlacement),
|
||||
}
|
||||
for _, object := range objects {
|
||||
getECRequiredPlacementInternal(cmd, object, placementPolicy, netmap, result)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func getECRequiredPlacementInternal(cmd *cobra.Command, object phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap, result *objectNodesResult) {
|
||||
placementObjectID := object.objectID
|
||||
if object.ecHeader != nil {
|
||||
placementObjectID = object.ecHeader.parent
|
||||
}
|
||||
placementBuilder := placement.NewNetworkMapBuilder(netmap)
|
||||
placement, err := placementBuilder.BuildPlacement(object.containerID, &placementObjectID, placementPolicy)
|
||||
commonCmd.ExitOnErr(cmd, "failed to get required placement: %w", err)
|
||||
|
||||
for _, vector := range placement {
|
||||
if object.storedOnAllContainerNodes {
|
||||
for _, node := range vector {
|
||||
op := result.placements[object.objectID]
|
||||
op.requiredNodes = append(op.requiredNodes, node)
|
||||
result.placements[object.objectID] = op
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if object.ecHeader != nil {
|
||||
chunkIdx := int(object.ecHeader.index)
|
||||
nodeIdx := chunkIdx % len(vector)
|
||||
node := vector[nodeIdx]
|
||||
|
||||
op := result.placements[object.objectID]
|
||||
op.requiredNodes = append(op.requiredNodes, node)
|
||||
result.placements[object.objectID] = op
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, pk *ecdsa.PrivateKey, objects []phyObject, result *objectNodesResult) {
|
||||
resultMtx := &sync.Mutex{}
|
||||
|
||||
candidates := getNodesToCheckObjectExistance(cmd, netmap, result)
|
||||
|
||||
eg, egCtx := errgroup.WithContext(cmd.Context())
|
||||
for _, cand := range candidates {
|
||||
cand := cand
|
||||
|
@ -239,33 +361,24 @@ func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPl
|
|||
if err != nil {
|
||||
resultMtx.Lock()
|
||||
defer resultMtx.Unlock()
|
||||
result[cand.Hash()] = boolError{err: err}
|
||||
result.errors = append(result.errors, fmt.Errorf("failed to connect to node %s: %w", hex.EncodeToString(cand.PublicKey()), err))
|
||||
return nil
|
||||
}
|
||||
|
||||
eg.Go(func() error {
|
||||
var v boolError
|
||||
v.value, v.err = isObjectStoredOnNode(egCtx, cmd, objInfo.containerID, objInfo.objectID, cli, pk)
|
||||
resultMtx.Lock()
|
||||
defer resultMtx.Unlock()
|
||||
if prev, exists := result[cand.Hash()]; exists && (prev.err != nil || prev.value) {
|
||||
return nil
|
||||
}
|
||||
result[cand.Hash()] = v
|
||||
return nil
|
||||
})
|
||||
|
||||
for _, rObjID := range objInfo.relatedObjectIDs {
|
||||
rObjID := rObjID
|
||||
for _, object := range objects {
|
||||
object := object
|
||||
eg.Go(func() error {
|
||||
var v boolError
|
||||
v.value, v.err = isObjectStoredOnNode(egCtx, cmd, objInfo.containerID, rObjID, cli, pk)
|
||||
stored, err := isObjectStoredOnNode(egCtx, cmd, object.containerID, object.objectID, cli, pk)
|
||||
resultMtx.Lock()
|
||||
defer resultMtx.Unlock()
|
||||
if prev, exists := result[cand.Hash()]; exists && (prev.err != nil || prev.value) {
|
||||
return nil
|
||||
if err == nil && stored {
|
||||
op := result.placements[object.objectID]
|
||||
op.confirmedNodes = append(op.confirmedNodes, cand)
|
||||
result.placements[object.objectID] = op
|
||||
}
|
||||
if err != nil {
|
||||
result.errors = append(result.errors, fmt.Errorf("failed to check object %s existence on node %s: %w", object.objectID.EncodeToString(), hex.EncodeToString(cand.PublicKey()), err))
|
||||
}
|
||||
result[cand.Hash()] = v
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -274,7 +387,24 @@ func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPl
|
|||
}
|
||||
|
||||
commonCmd.ExitOnErr(cmd, "failed to get actual placement: %w", eg.Wait())
|
||||
return result
|
||||
}
|
||||
|
||||
func getNodesToCheckObjectExistance(cmd *cobra.Command, netmap *netmapSDK.NetMap, result *objectNodesResult) []netmapSDK.NodeInfo {
|
||||
checkAllNodes, _ := cmd.Flags().GetBool(verifyPresenceAllFlag)
|
||||
if checkAllNodes {
|
||||
return netmap.Nodes()
|
||||
}
|
||||
var nodes []netmapSDK.NodeInfo
|
||||
visited := make(map[uint64]struct{})
|
||||
for _, p := range result.placements {
|
||||
for _, node := range p.requiredNodes {
|
||||
if _, ok := visited[node.Hash()]; !ok {
|
||||
nodes = append(nodes, node)
|
||||
visited[node.Hash()] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func createClient(ctx context.Context, cmd *cobra.Command, candidate netmapSDK.NodeInfo, pk *ecdsa.PrivateKey) (*client.Client, error) {
|
||||
|
@ -328,27 +458,113 @@ func isObjectStoredOnNode(ctx context.Context, cmd *cobra.Command, cnrID cid.ID,
|
|||
if errors.As(err, ¬Found) || errors.As(err, &removed) {
|
||||
return false, nil
|
||||
}
|
||||
cmd.Printf("failed to get object %s from client\n", objID.EncodeToString())
|
||||
return false, err
|
||||
}
|
||||
|
||||
func printPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacement map[uint64]netmapSDK.NodeInfo, actualPlacement map[uint64]boolError) {
|
||||
w := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 1, ' ', tabwriter.AlignRight|tabwriter.Debug)
|
||||
defer func() {
|
||||
commonCmd.ExitOnErr(cmd, "failed to print placement info: %w", w.Flush())
|
||||
}()
|
||||
fmt.Fprintln(w, "Node ID\tShould contain object\tActually contains object\t")
|
||||
for _, n := range netmap.Nodes() {
|
||||
nodeID := hex.EncodeToString(n.PublicKey())
|
||||
_, required := requiredPlacement[n.Hash()]
|
||||
actual, actualExists := actualPlacement[n.Hash()]
|
||||
actualStr := ""
|
||||
if actualExists {
|
||||
if actual.err != nil {
|
||||
actualStr = fmt.Sprintf("error: %v", actual.err)
|
||||
} else {
|
||||
actualStr = strconv.FormatBool(actual.value)
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(w, "%s\t%s\t%s\t\n", nodeID, strconv.FormatBool(required), actualStr)
|
||||
func printPlacement(cmd *cobra.Command, objID oid.ID, objects []phyObject, result *objectNodesResult) {
|
||||
normilizeObjectNodesResult(objects, result)
|
||||
if json, _ := cmd.Flags().GetBool(commonflags.JSON); json {
|
||||
printObjectNodesAsJSON(cmd, objID, objects, result)
|
||||
} else {
|
||||
printObjectNodesAsText(cmd, objID, objects, result)
|
||||
}
|
||||
}
|
||||
|
||||
func normilizeObjectNodesResult(objects []phyObject, result *objectNodesResult) {
|
||||
slices.SortFunc(objects, func(lhs, rhs phyObject) int {
|
||||
if lhs.ecHeader == nil && rhs.ecHeader == nil {
|
||||
return bytes.Compare(lhs.objectID[:], rhs.objectID[:])
|
||||
}
|
||||
if lhs.ecHeader == nil {
|
||||
return -1
|
||||
}
|
||||
if rhs.ecHeader == nil {
|
||||
return 1
|
||||
}
|
||||
if lhs.ecHeader.parent == rhs.ecHeader.parent {
|
||||
return cmp.Compare(lhs.ecHeader.index, rhs.ecHeader.index)
|
||||
}
|
||||
return bytes.Compare(lhs.ecHeader.parent[:], rhs.ecHeader.parent[:])
|
||||
})
|
||||
for _, obj := range objects {
|
||||
op := result.placements[obj.objectID]
|
||||
slices.SortFunc(op.confirmedNodes, func(lhs, rhs netmapSDK.NodeInfo) int {
|
||||
return bytes.Compare(lhs.PublicKey(), rhs.PublicKey())
|
||||
})
|
||||
slices.SortFunc(op.requiredNodes, func(lhs, rhs netmapSDK.NodeInfo) int {
|
||||
return bytes.Compare(lhs.PublicKey(), rhs.PublicKey())
|
||||
})
|
||||
result.placements[obj.objectID] = op
|
||||
}
|
||||
}
|
||||
|
||||
func printObjectNodesAsText(cmd *cobra.Command, objID oid.ID, objects []phyObject, result *objectNodesResult) {
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "Object %s stores payload in %d data objects:\n", objID.EncodeToString(), len(objects))
|
||||
|
||||
for _, object := range objects {
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "- %s\n", object.objectID)
|
||||
if object.ecHeader != nil {
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "\tEC index: %d\n", object.ecHeader.index)
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "\tEC parent: %s\n", object.ecHeader.parent.EncodeToString())
|
||||
}
|
||||
op, ok := result.placements[object.objectID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if len(op.requiredNodes) > 0 {
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "\tRequired nodes:\n")
|
||||
for _, node := range op.requiredNodes {
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "\t\t- %s\n", hex.EncodeToString(node.PublicKey()))
|
||||
}
|
||||
}
|
||||
if len(op.confirmedNodes) > 0 {
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "\tConfirmed nodes:\n")
|
||||
for _, node := range op.confirmedNodes {
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "\t\t- %s\n", hex.EncodeToString(node.PublicKey()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(result.errors) == 0 {
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "Errors:\n")
|
||||
for _, err := range result.errors {
|
||||
fmt.Fprintf(cmd.OutOrStdout(), "\t%s\n", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func printObjectNodesAsJSON(cmd *cobra.Command, objID oid.ID, objects []phyObject, result *objectNodesResult) {
|
||||
jsonResult := &objNodesResultJSON{
|
||||
ObjectID: objID.EncodeToString(),
|
||||
}
|
||||
|
||||
for _, object := range objects {
|
||||
do := ObjNodesDataObject{
|
||||
ObjectID: object.objectID.EncodeToString(),
|
||||
}
|
||||
if object.ecHeader != nil {
|
||||
do.ECIndex = &object.ecHeader.index
|
||||
ecParent := object.ecHeader.parent.EncodeToString()
|
||||
do.ECParentObjectID = &ecParent
|
||||
}
|
||||
op, ok := result.placements[object.objectID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for _, rn := range op.requiredNodes {
|
||||
do.RequiredNodes = append(do.RequiredNodes, hex.EncodeToString(rn.PublicKey()))
|
||||
}
|
||||
for _, cn := range op.confirmedNodes {
|
||||
do.ConfirmedNodes = append(do.ConfirmedNodes, hex.EncodeToString(cn.PublicKey()))
|
||||
}
|
||||
jsonResult.DataObjects = append(jsonResult.DataObjects, do)
|
||||
}
|
||||
for _, err := range result.errors {
|
||||
jsonResult.Errors = append(jsonResult.Errors, err.Error())
|
||||
}
|
||||
b, err := json.Marshal(jsonResult)
|
||||
commonCmd.ExitOnErr(cmd, "failed to marshal json: %w", err)
|
||||
cmd.Println(string(b))
|
||||
}
|
||||
|
|
|
@ -370,7 +370,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
|
||||
splitInfo := errSplit.SplitInfo()
|
||||
|
||||
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr); ok {
|
||||
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr, true); ok {
|
||||
return members
|
||||
}
|
||||
|
||||
|
@ -381,7 +381,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
|
|||
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
|
||||
}
|
||||
|
||||
func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID) ([]oid.ID, bool) {
|
||||
func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID, withLinking bool) ([]oid.ID, bool) {
|
||||
// collect split chain by the descending ease of operations (ease is evaluated heuristically).
|
||||
// If any approach fails, we don't try the next since we assume that it will fail too.
|
||||
|
||||
|
@ -402,8 +402,10 @@ func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *objectSDK.
|
|||
|
||||
common.PrintVerbose(cmd, "Received split members from the linking object: %v", children)
|
||||
|
||||
// include linking object
|
||||
return append(children, idLinking), true
|
||||
if withLinking {
|
||||
return append(children, idLinking), true
|
||||
}
|
||||
return children, true
|
||||
}
|
||||
|
||||
// linking object is not required for
|
||||
|
|
|
@ -147,7 +147,7 @@ func (e *ecWriter) relayToContainerNode(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error {
|
||||
t, err := placement.NewTraverser(e.placementOpts...)
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(obj.ECHeader().Parent()))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -162,6 +162,7 @@ func (e *ecWriter) writeECPart(ctx context.Context, obj *objectSDK.Object) error
|
|||
eg.Go(func() error {
|
||||
return e.writePart(egCtx, obj, int(obj.ECHeader().Index()), nodes)
|
||||
})
|
||||
t.SubmitSuccess()
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
return errIncompletePut{
|
||||
|
@ -181,7 +182,8 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t, err := placement.NewTraverser(e.placementOpts...)
|
||||
objID, _ := obj.ID()
|
||||
t, err := placement.NewTraverser(append(e.placementOpts, placement.ForObject(objID))...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -198,6 +200,7 @@ func (e *ecWriter) writeRawObject(ctx context.Context, obj *objectSDK.Object) er
|
|||
eg.Go(func() error {
|
||||
return e.writePart(egCtx, parts[idx], idx, nodes)
|
||||
})
|
||||
t.SubmitSuccess()
|
||||
}
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
|
|
Loading…
Reference in a new issue