[#1120] cli: Add EC support to object nodes
command
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
854200a874
commit
5c730de96e
1 changed files with 152 additions and 68 deletions
|
@ -14,6 +14,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
@ -32,11 +33,16 @@ const (
|
||||||
|
|
||||||
var errNoAvailableEndpoint = errors.New("failed to create client: no available endpoint")
|
var errNoAvailableEndpoint = errors.New("failed to create client: no available endpoint")
|
||||||
|
|
||||||
type objectNodesInfo struct {
|
type phyObject struct {
|
||||||
containerID cid.ID
|
containerID cid.ID
|
||||||
objectID oid.ID
|
objectID oid.ID
|
||||||
relatedObjectIDs []oid.ID
|
storedOnAllContainerNodes bool
|
||||||
isLockOrTombstone bool
|
ecHeader *ecHeader
|
||||||
|
}
|
||||||
|
|
||||||
|
type ecHeader struct {
|
||||||
|
index uint32
|
||||||
|
parent oid.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
type boolError struct {
|
type boolError struct {
|
||||||
|
@ -49,7 +55,7 @@ var objectNodesCmd = &cobra.Command{
|
||||||
Short: "List of nodes where the object is stored",
|
Short: "List of nodes where the object is stored",
|
||||||
Long: `List of nodes where the object should be stored and where it is actually stored.
|
Long: `List of nodes where the object should be stored and where it is actually stored.
|
||||||
Lock objects must exist on all nodes of the container.
|
Lock objects must exist on all nodes of the container.
|
||||||
For complex objects, a node is considered to store an object if the node stores at least one part of the complex object.
|
For complex and EC objects, a node is considered to store an object if the node stores at least one part of the complex object or one chunk of the EC object.
|
||||||
By default, the actual storage of the object is checked only on the nodes that should store the object. To check all nodes, use the flag --verify-presence-all.`,
|
By default, the actual storage of the object is checked only on the nodes that should store the object. To check all nodes, use the flag --verify-presence-all.`,
|
||||||
Run: objectNodes,
|
Run: objectNodes,
|
||||||
}
|
}
|
||||||
|
@ -76,18 +82,18 @@ func objectNodes(cmd *cobra.Command, _ []string) {
|
||||||
pk := key.GetOrGenerate(cmd)
|
pk := key.GetOrGenerate(cmd)
|
||||||
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
cli := internalclient.GetSDKClientByFlag(cmd, pk, commonflags.RPC)
|
||||||
|
|
||||||
objectInfo := getObjectInfo(cmd, cnrID, objID, cli, pk)
|
objects := getPhyObjects(cmd, cnrID, objID, cli, pk)
|
||||||
|
|
||||||
placementPolicy, netmap := getPlacementPolicyAndNetmap(cmd, cnrID, cli)
|
placementPolicy, netmap := getPlacementPolicyAndNetmap(cmd, cnrID, cli)
|
||||||
|
|
||||||
requiredPlacement := getRequiredPlacement(cmd, objectInfo, placementPolicy, netmap)
|
requiredPlacement := getRequiredPlacement(cmd, objects, placementPolicy, netmap)
|
||||||
|
|
||||||
actualPlacement := getActualPlacement(cmd, netmap, requiredPlacement, pk, objectInfo)
|
actualPlacement := getActualPlacement(cmd, netmap, requiredPlacement, pk, objects)
|
||||||
|
|
||||||
printPlacement(cmd, netmap, requiredPlacement, actualPlacement)
|
printPlacement(cmd, netmap, requiredPlacement, actualPlacement)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getObjectInfo(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) *objectNodesInfo {
|
func getPhyObjects(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, pk *ecdsa.PrivateKey) []phyObject {
|
||||||
var addrObj oid.Address
|
var addrObj oid.Address
|
||||||
addrObj.SetContainer(cnrID)
|
addrObj.SetContainer(cnrID)
|
||||||
addrObj.SetObject(objID)
|
addrObj.SetObject(objID)
|
||||||
|
@ -102,44 +108,101 @@ func getObjectInfo(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.C
|
||||||
|
|
||||||
res, err := internalclient.HeadObject(cmd.Context(), prmHead)
|
res, err := internalclient.HeadObject(cmd.Context(), prmHead)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return &objectNodesInfo{
|
obj := phyObject{
|
||||||
containerID: cnrID,
|
containerID: cnrID,
|
||||||
objectID: objID,
|
objectID: objID,
|
||||||
isLockOrTombstone: res.Header().Type() == objectSDK.TypeLock || res.Header().Type() == objectSDK.TypeTombstone,
|
storedOnAllContainerNodes: res.Header().Type() == objectSDK.TypeLock || res.Header().Type() == objectSDK.TypeTombstone,
|
||||||
}
|
}
|
||||||
|
if res.Header().ECHeader() != nil {
|
||||||
|
obj.ecHeader = &ecHeader{
|
||||||
|
index: res.Header().ECHeader().Index(),
|
||||||
|
parent: res.Header().ECHeader().Parent(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return []phyObject{obj}
|
||||||
}
|
}
|
||||||
|
|
||||||
var errSplitInfo *objectSDK.SplitInfoError
|
var errSplitInfo *objectSDK.SplitInfoError
|
||||||
|
if errors.As(err, &errSplitInfo) {
|
||||||
if !errors.As(err, &errSplitInfo) {
|
return getComplexObjectParts(cmd, cnrID, objID, cli, prmHead, errSplitInfo)
|
||||||
commonCmd.ExitOnErr(cmd, "failed to get object info: %w", err)
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ecInfoError *objectSDK.ECInfoError
|
||||||
|
if errors.As(err, &ecInfoError) {
|
||||||
|
return getECObjectChunks(cmd, cnrID, objID, ecInfoError)
|
||||||
|
}
|
||||||
|
commonCmd.ExitOnErr(cmd, "failed to get object info: %w", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getComplexObjectParts(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) []phyObject {
|
||||||
|
members := getCompexObjectMembers(cmd, cnrID, objID, cli, prmHead, errSplitInfo)
|
||||||
|
return flattenComplexMembersIfECContainer(cmd, cnrID, members, prmHead)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getCompexObjectMembers(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, cli *client.Client, prmHead internalclient.HeadObjectPrm, errSplitInfo *objectSDK.SplitInfoError) []oid.ID {
|
||||||
splitInfo := errSplitInfo.SplitInfo()
|
splitInfo := errSplitInfo.SplitInfo()
|
||||||
|
|
||||||
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnrID); ok {
|
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnrID); ok {
|
||||||
return &objectNodesInfo{
|
return members
|
||||||
containerID: cnrID,
|
|
||||||
objectID: objID,
|
|
||||||
relatedObjectIDs: members,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnrID); ok {
|
if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnrID); ok {
|
||||||
return &objectNodesInfo{
|
return members
|
||||||
containerID: cnrID,
|
|
||||||
objectID: objID,
|
|
||||||
relatedObjectIDs: members,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
members := tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnrID, objID)
|
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnrID, objID)
|
||||||
return &objectNodesInfo{
|
}
|
||||||
containerID: cnrID,
|
|
||||||
objectID: objID,
|
func flattenComplexMembersIfECContainer(cmd *cobra.Command, cnrID cid.ID, members []oid.ID, prmHead internalclient.HeadObjectPrm) []phyObject {
|
||||||
relatedObjectIDs: members,
|
result := make([]phyObject, 0, len(members))
|
||||||
|
var addrObj oid.Address
|
||||||
|
addrObj.SetContainer(cnrID)
|
||||||
|
prmHead.SetRawFlag(true) // to get an error instead of whole object
|
||||||
|
for _, partObjID := range members {
|
||||||
|
addrObj.SetObject(partObjID)
|
||||||
|
prmHead.SetAddress(addrObj)
|
||||||
|
|
||||||
|
_, err := internalclient.HeadObject(cmd.Context(), prmHead)
|
||||||
|
var ecInfoError *objectSDK.ECInfoError
|
||||||
|
if errors.As(err, &ecInfoError) {
|
||||||
|
chunks := getECObjectChunks(cmd, cnrID, partObjID, ecInfoError)
|
||||||
|
result = append(result, chunks...)
|
||||||
|
continue
|
||||||
|
} else if err == nil { // not EC object, so all members must be phy objects
|
||||||
|
for _, member := range members {
|
||||||
|
result = append(result, phyObject{
|
||||||
|
containerID: cnrID,
|
||||||
|
objectID: member,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
commonCmd.ExitOnErr(cmd, "failed to read EC chunk of complex object: %w", err)
|
||||||
}
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func getECObjectChunks(cmd *cobra.Command, cnrID cid.ID, objID oid.ID, errECInfo *objectSDK.ECInfoError) []phyObject {
|
||||||
|
ecInfo := errECInfo.ECInfo()
|
||||||
|
result := make([]phyObject, 0, len(ecInfo.Chunks))
|
||||||
|
for _, ch := range ecInfo.Chunks {
|
||||||
|
var chID oid.ID
|
||||||
|
err := chID.ReadFromV2(ch.ID)
|
||||||
|
if err != nil {
|
||||||
|
commonCmd.ExitOnErr(cmd, "failed to read EC chunk ID %w", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
result = append(result, phyObject{
|
||||||
|
containerID: cnrID,
|
||||||
|
objectID: chID,
|
||||||
|
ecHeader: &ecHeader{
|
||||||
|
index: ch.Index,
|
||||||
|
parent: objID,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPlacementPolicyAndNetmap(cmd *cobra.Command, cnrID cid.ID, cli *client.Client) (placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) {
|
func getPlacementPolicyAndNetmap(cmd *cobra.Command, cnrID cid.ID, cli *client.Client) (placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) {
|
||||||
|
@ -184,29 +247,28 @@ func getNetMap(ctx context.Context, cli *client.Client) (*netmapSDK.NetMap, erro
|
||||||
return &nm, nil
|
return &nm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRequiredPlacement(cmd *cobra.Command, objInfo *objectNodesInfo, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) map[uint64]netmapSDK.NodeInfo {
|
func getRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) map[uint64]netmapSDK.NodeInfo {
|
||||||
|
if policy.IsECPlacement(placementPolicy) {
|
||||||
|
return getECRequiredPlacement(cmd, objects, placementPolicy, netmap)
|
||||||
|
}
|
||||||
|
return getReplicaRequiredPlacement(cmd, objects, placementPolicy, netmap)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getReplicaRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) map[uint64]netmapSDK.NodeInfo {
|
||||||
nodes := make(map[uint64]netmapSDK.NodeInfo)
|
nodes := make(map[uint64]netmapSDK.NodeInfo)
|
||||||
placementBuilder := placement.NewNetworkMapBuilder(netmap)
|
placementBuilder := placement.NewNetworkMapBuilder(netmap)
|
||||||
placement, err := placementBuilder.BuildPlacement(objInfo.containerID, &objInfo.objectID, placementPolicy)
|
for _, object := range objects {
|
||||||
commonCmd.ExitOnErr(cmd, "failed to get required placement: %w", err)
|
placement, err := placementBuilder.BuildPlacement(object.containerID, &object.objectID, placementPolicy)
|
||||||
for repIdx, rep := range placement {
|
commonCmd.ExitOnErr(cmd, "failed to get required placement for object: %w", err)
|
||||||
numOfReplicas := placementPolicy.ReplicaDescriptor(repIdx).NumberOfObjects()
|
for repIdx, rep := range placement {
|
||||||
var nodeIdx uint32
|
numOfReplicas := placementPolicy.ReplicaDescriptor(repIdx).NumberOfObjects()
|
||||||
for _, n := range rep {
|
var nodeIdx uint32
|
||||||
if !objInfo.isLockOrTombstone && nodeIdx == numOfReplicas { // lock and tombstone objects should be on all container nodes
|
|
||||||
break
|
|
||||||
}
|
|
||||||
nodes[n.Hash()] = n
|
|
||||||
nodeIdx++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, relatedObjID := range objInfo.relatedObjectIDs {
|
|
||||||
placement, err = placementBuilder.BuildPlacement(objInfo.containerID, &relatedObjID, placementPolicy)
|
|
||||||
commonCmd.ExitOnErr(cmd, "failed to get required placement for related object: %w", err)
|
|
||||||
for _, rep := range placement {
|
|
||||||
for _, n := range rep {
|
for _, n := range rep {
|
||||||
|
if !object.storedOnAllContainerNodes && nodeIdx == numOfReplicas {
|
||||||
|
break
|
||||||
|
}
|
||||||
nodes[n.Hash()] = n
|
nodes[n.Hash()] = n
|
||||||
|
nodeIdx++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -214,8 +276,42 @@ func getRequiredPlacement(cmd *cobra.Command, objInfo *objectNodesInfo, placemen
|
||||||
return nodes
|
return nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getECRequiredPlacement(cmd *cobra.Command, objects []phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap) map[uint64]netmapSDK.NodeInfo {
|
||||||
|
nodes := make(map[uint64]netmapSDK.NodeInfo)
|
||||||
|
for _, object := range objects {
|
||||||
|
getECRequiredPlacementInternal(cmd, object, placementPolicy, netmap, nodes)
|
||||||
|
}
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
func getECRequiredPlacementInternal(cmd *cobra.Command, object phyObject, placementPolicy netmapSDK.PlacementPolicy, netmap *netmapSDK.NetMap, nodes map[uint64]netmapSDK.NodeInfo) {
|
||||||
|
placementObjectID := object.objectID
|
||||||
|
if object.ecHeader != nil {
|
||||||
|
placementObjectID = object.ecHeader.parent
|
||||||
|
}
|
||||||
|
placementBuilder := placement.NewNetworkMapBuilder(netmap)
|
||||||
|
placement, err := placementBuilder.BuildPlacement(object.containerID, &placementObjectID, placementPolicy)
|
||||||
|
commonCmd.ExitOnErr(cmd, "failed to get required placement: %w", err)
|
||||||
|
|
||||||
|
for _, vector := range placement {
|
||||||
|
if object.storedOnAllContainerNodes {
|
||||||
|
for _, node := range vector {
|
||||||
|
nodes[node.Hash()] = node
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if object.ecHeader != nil {
|
||||||
|
chunkIdx := int(object.ecHeader.index)
|
||||||
|
nodeIdx := chunkIdx % len(vector)
|
||||||
|
node := vector[nodeIdx]
|
||||||
|
nodes[node.Hash()] = node
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacement map[uint64]netmapSDK.NodeInfo,
|
func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPlacement map[uint64]netmapSDK.NodeInfo,
|
||||||
pk *ecdsa.PrivateKey, objInfo *objectNodesInfo,
|
pk *ecdsa.PrivateKey, objects []phyObject,
|
||||||
) map[uint64]boolError {
|
) map[uint64]boolError {
|
||||||
result := make(map[uint64]boolError)
|
result := make(map[uint64]boolError)
|
||||||
resultMtx := &sync.Mutex{}
|
resultMtx := &sync.Mutex{}
|
||||||
|
@ -243,23 +339,11 @@ func getActualPlacement(cmd *cobra.Command, netmap *netmapSDK.NetMap, requiredPl
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
eg.Go(func() error {
|
for _, object := range objects {
|
||||||
var v boolError
|
object := object
|
||||||
v.value, v.err = isObjectStoredOnNode(egCtx, cmd, objInfo.containerID, objInfo.objectID, cli, pk)
|
|
||||||
resultMtx.Lock()
|
|
||||||
defer resultMtx.Unlock()
|
|
||||||
if prev, exists := result[cand.Hash()]; exists && (prev.err != nil || prev.value) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
result[cand.Hash()] = v
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
for _, rObjID := range objInfo.relatedObjectIDs {
|
|
||||||
rObjID := rObjID
|
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
var v boolError
|
var v boolError
|
||||||
v.value, v.err = isObjectStoredOnNode(egCtx, cmd, objInfo.containerID, rObjID, cli, pk)
|
v.value, v.err = isObjectStoredOnNode(egCtx, cmd, object.containerID, object.objectID, cli, pk)
|
||||||
resultMtx.Lock()
|
resultMtx.Lock()
|
||||||
defer resultMtx.Unlock()
|
defer resultMtx.Unlock()
|
||||||
if prev, exists := result[cand.Hash()]; exists && (prev.err != nil || prev.value) {
|
if prev, exists := result[cand.Hash()]; exists && (prev.err != nil || prev.value) {
|
||||||
|
|
Loading…
Reference in a new issue