226 lines
7.3 KiB
Go
226 lines
7.3 KiB
Go
package container
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/common"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
|
containerApi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
var (
|
|
containerPolicy string
|
|
containerAttributes []string
|
|
containerAwait bool
|
|
containerName string
|
|
containerNnsName string
|
|
containerNnsZone string
|
|
containerNoTimestamp bool
|
|
force bool
|
|
)
|
|
|
|
var createContainerCmd = &cobra.Command{
|
|
Use: "create",
|
|
Short: "Create new container",
|
|
Long: `Create new container and register it in the FrostFS.
|
|
It will be stored in sidechain when inner ring will accepts it.`,
|
|
Run: func(cmd *cobra.Command, _ []string) {
|
|
placementPolicy, err := parseContainerPolicy(cmd, containerPolicy)
|
|
commonCmd.ExitOnErr(cmd, "", err)
|
|
|
|
key := key.Get(cmd)
|
|
cli := internalclient.GetSDKClientByFlag(cmd, key, commonflags.RPC)
|
|
|
|
if !force {
|
|
var prm internalclient.NetMapSnapshotPrm
|
|
prm.SetClient(cli)
|
|
|
|
resmap, err := internalclient.NetMapSnapshot(cmd.Context(), prm)
|
|
commonCmd.ExitOnErr(cmd, "unable to get netmap snapshot to validate container placement, "+
|
|
"use --force option to skip this check: %w", err)
|
|
|
|
nodesByRep, err := resmap.NetMap().ContainerNodes(*placementPolicy, nil)
|
|
commonCmd.ExitOnErr(cmd, "could not build container nodes based on given placement policy, "+
|
|
"use --force option to skip this check: %w", err)
|
|
|
|
for i, nodes := range nodesByRep {
|
|
if repNum := placementPolicy.ReplicaDescriptor(i).NumberOfObjects(); repNum > 0 {
|
|
if repNum > uint32(len(nodes)) {
|
|
commonCmd.ExitOnErr(cmd, "", fmt.Errorf(
|
|
"the number of nodes '%d' in selector is not enough for the number of replicas '%d', "+
|
|
"use --force option to skip this check",
|
|
len(nodes),
|
|
repNum,
|
|
))
|
|
}
|
|
} else if ecParts := placementPolicy.ReplicaDescriptor(i).TotalECPartCount(); ecParts > 0 {
|
|
if ecParts > uint32(len(nodes)) {
|
|
commonCmd.ExitOnErr(cmd, "", fmt.Errorf(
|
|
"the number of nodes '%d' in selector is not enough for EC placement '%d.%d', "+
|
|
"use --force option to skip this check",
|
|
len(nodes),
|
|
placementPolicy.ReplicaDescriptor(i).GetECDataCount(),
|
|
placementPolicy.ReplicaDescriptor(i).GetECParityCount(),
|
|
))
|
|
}
|
|
} else {
|
|
commonCmd.ExitOnErr(cmd, "%w", errors.New("no replication policy is set"))
|
|
}
|
|
}
|
|
}
|
|
|
|
var cnr container.Container
|
|
cnr.Init()
|
|
|
|
err = parseAttributes(&cnr, containerAttributes)
|
|
commonCmd.ExitOnErr(cmd, "", err)
|
|
|
|
tok := getSession(cmd)
|
|
|
|
if tok != nil {
|
|
issuer := tok.Issuer()
|
|
cnr.SetOwner(issuer)
|
|
} else {
|
|
var idOwner user.ID
|
|
user.IDFromKey(&idOwner, key.PublicKey)
|
|
|
|
cnr.SetOwner(idOwner)
|
|
}
|
|
|
|
cnr.SetPlacementPolicy(*placementPolicy)
|
|
|
|
var syncContainerPrm internalclient.SyncContainerPrm
|
|
syncContainerPrm.SetClient(cli)
|
|
syncContainerPrm.SetContainer(&cnr)
|
|
|
|
_, err = internalclient.SyncContainerSettings(cmd.Context(), syncContainerPrm)
|
|
commonCmd.ExitOnErr(cmd, "syncing container's settings rpc error: %w", err)
|
|
|
|
putPrm := internalclient.PutContainerPrm{
|
|
Client: cli,
|
|
ClientParams: client.PrmContainerPut{
|
|
Container: &cnr,
|
|
Session: tok,
|
|
},
|
|
}
|
|
|
|
res, err := internalclient.PutContainer(cmd.Context(), putPrm)
|
|
commonCmd.ExitOnErr(cmd, "put container rpc error: %w", err)
|
|
|
|
id := res.ID()
|
|
|
|
cmd.Println("CID:", id)
|
|
|
|
if containerAwait {
|
|
cmd.Println("awaiting...")
|
|
|
|
getPrm := internalclient.GetContainerPrm{
|
|
Client: cli,
|
|
ClientParams: client.PrmContainerGet{
|
|
ContainerID: &id,
|
|
},
|
|
}
|
|
|
|
for range awaitTimeout {
|
|
time.Sleep(1 * time.Second)
|
|
|
|
_, err := internalclient.GetContainer(cmd.Context(), getPrm)
|
|
if err == nil {
|
|
cmd.Println("container has been persisted on sidechain")
|
|
return
|
|
}
|
|
}
|
|
|
|
commonCmd.ExitOnErr(cmd, "", errCreateTimeout)
|
|
}
|
|
},
|
|
}
|
|
|
|
func initContainerCreateCmd() {
|
|
flags := createContainerCmd.Flags()
|
|
|
|
// Init common flags
|
|
flags.StringP(commonflags.RPC, commonflags.RPCShorthand, commonflags.RPCDefault, commonflags.RPCUsage)
|
|
flags.Bool(commonflags.TracingFlag, false, commonflags.TracingFlagUsage)
|
|
flags.DurationP(commonflags.Timeout, commonflags.TimeoutShorthand, commonflags.TimeoutDefault, commonflags.TimeoutUsage)
|
|
flags.StringP(commonflags.WalletPath, commonflags.WalletPathShorthand, commonflags.WalletPathDefault, commonflags.WalletPathUsage)
|
|
flags.StringP(commonflags.Account, commonflags.AccountShorthand, commonflags.AccountDefault, commonflags.AccountUsage)
|
|
flags.StringVarP(&containerPolicy, "policy", "p", "", "QL-encoded or JSON-encoded placement policy or path to file with it")
|
|
flags.StringSliceVarP(&containerAttributes, "attributes", "a", nil, "Comma separated pairs of container attributes in form of Key1=Value1,Key2=Value2")
|
|
flags.BoolVar(&containerAwait, "await", false, "Block execution until container is persisted")
|
|
flags.StringVar(&containerName, "name", "", "Container name attribute")
|
|
flags.StringVar(&containerNnsName, "nns-name", "", "Container nns name attribute")
|
|
flags.StringVar(&containerNnsZone, "nns-zone", "", "Container nns zone attribute")
|
|
flags.BoolVar(&containerNoTimestamp, "disable-timestamp", false, "Disable timestamp container attribute")
|
|
flags.BoolVarP(&force, commonflags.ForceFlag, commonflags.ForceFlagShorthand, false,
|
|
"Skip placement validity check")
|
|
}
|
|
|
|
func parseContainerPolicy(cmd *cobra.Command, policyString string) (*netmap.PlacementPolicy, error) {
|
|
_, err := os.Stat(policyString) // check if `policyString` is a path to file with placement policy
|
|
if err == nil {
|
|
common.PrintVerbose(cmd, "Reading placement policy from file: %s", policyString)
|
|
|
|
data, err := os.ReadFile(policyString)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't read file with placement policy: %w", err)
|
|
}
|
|
|
|
policyString = string(data)
|
|
}
|
|
|
|
var result netmap.PlacementPolicy
|
|
|
|
err = result.DecodeString(policyString)
|
|
if err == nil {
|
|
common.PrintVerbose(cmd, "Parsed QL encoded policy")
|
|
return &result, nil
|
|
}
|
|
|
|
if err := result.UnmarshalJSON([]byte(policyString)); err == nil {
|
|
common.PrintVerbose(cmd, "Parsed JSON encoded policy")
|
|
return &result, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("can't parse placement policy: %w", err)
|
|
}
|
|
|
|
func parseAttributes(dst *container.Container, attributes []string) error {
|
|
for i := range attributes {
|
|
k, v, found := strings.Cut(attributes[i], attributeDelimiter)
|
|
if !found {
|
|
return errors.New("invalid container attribute")
|
|
}
|
|
|
|
dst.SetAttribute(k, v)
|
|
}
|
|
|
|
if !containerNoTimestamp {
|
|
container.SetCreationTime(dst, time.Now())
|
|
}
|
|
|
|
if containerName != "" {
|
|
container.SetName(dst, containerName)
|
|
}
|
|
|
|
if containerNnsName != "" {
|
|
dst.SetAttribute(containerApi.SysAttributeName, containerNnsName)
|
|
}
|
|
if containerNnsZone != "" {
|
|
dst.SetAttribute(containerApi.SysAttributeZone, containerNnsZone)
|
|
}
|
|
|
|
return nil
|
|
}
|