frostfs-cli & frostfs-ir refactorings #165

Merged
fyrchik merged 5 commits from dstepanov-yadro/frostfs-node:refactoring/OBJECT-3610_cli_ir into master 2023-03-27 07:43:58 +00:00
5 changed files with 243 additions and 138 deletions

View file

@ -42,27 +42,15 @@ func initObjectGetCmd() {
flags.Bool(binaryFlag, false, "Serialize whole object structure into given file(id + signature + header + payload).")
}
// nolint: funlen
func getObject(cmd *cobra.Command, _ []string) {
var cnr cid.ID
var obj oid.ID
objAddr := readObjectAddress(cmd, &cnr, &obj)
var out io.Writer
filename := cmd.Flag(fileFlag).Value.String()
if filename == "" {
out = os.Stdout
} else {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("can't open file '%s': %w", filename, err))
}
defer f.Close()
out = f
}
out, closer := createOutWriter(cmd, filename)
defer closer()
pk := key.GetOrGenerate(cmd)
@ -114,6 +102,10 @@ func getObject(cmd *cobra.Command, _ []string) {
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
}
processResult(cmd, res, binary, payloadBuffer, out, filename)
}
func processResult(cmd *cobra.Command, res *internalclient.GetObjectRes, binary bool, payloadBuffer *bytes.Buffer, out io.Writer, filename string) {
if binary {
objToStore := res.Header()
// TODO(@acid-ant): #1932 Use streams to marshal/unmarshal payload
@ -130,11 +122,29 @@ func getObject(cmd *cobra.Command, _ []string) {
// Print header only if file is not streamed to stdout.
if filename != "" {
err = printHeader(cmd, res.Header())
err := printHeader(cmd, res.Header())
commonCmd.ExitOnErr(cmd, "", err)
}
}
func createOutWriter(cmd *cobra.Command, filename string) (out io.Writer, closer func()) {
if filename == "" {
out = os.Stdout
closer = func() {}
} else {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("can't open file '%s': %w", filename, err))
}
out = f
closer = func() {
f.Close()
}
}
return
}
func strictOutput(cmd *cobra.Command) bool {
toJSON, _ := cmd.Flags().GetBool(commonflags.JSON)
toProto, _ := cmd.Flags().GetBool("proto")

View file

@ -58,7 +58,6 @@ func initObjectPutCmd() {
flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
}
// nolint: funlen
func putObject(cmd *cobra.Command, _ []string) {
binary, _ := cmd.Flags().GetBool(binaryFlag)
cidVal, _ := cmd.Flags().GetString(commonflags.CIDFlag)
@ -80,42 +79,13 @@ func putObject(cmd *cobra.Command, _ []string) {
obj := object.New()
if binary {
buf, err := os.ReadFile(filename)
commonCmd.ExitOnErr(cmd, "unable to read given file: %w", err)
objTemp := object.New()
// TODO(@acid-ant): #1932 Use streams to marshal/unmarshal payload
commonCmd.ExitOnErr(cmd, "can't unmarshal object from given file: %w", objTemp.Unmarshal(buf))
payloadReader = bytes.NewReader(objTemp.Payload())
cnr, _ = objTemp.ContainerID()
ownerID = *objTemp.OwnerID()
payloadReader, cnr, ownerID = readFilePayload(filename, cmd)
} else {
readCID(cmd, &cnr)
user.IDFromKey(&ownerID, pk.PublicKey)
}
attrs, err := parseObjectAttrs(cmd)
commonCmd.ExitOnErr(cmd, "can't parse object attributes: %w", err)
expiresOn, _ := cmd.Flags().GetUint64(commonflags.ExpireAt)
if expiresOn > 0 {
var expAttrFound bool
expAttrValue := strconv.FormatUint(expiresOn, 10)
for i := range attrs {
if attrs[i].Key() == objectV2.SysAttributeExpEpoch {
attrs[i].SetValue(expAttrValue)
expAttrFound = true
break
}
}
if !expAttrFound {
index := len(attrs)
attrs = append(attrs, object.Attribute{})
attrs[index].SetKey(objectV2.SysAttributeExpEpoch)
attrs[index].SetValue(expAttrValue)
}
}
attrs := getAllObjectAttributes(cmd)
obj.SetContainerID(cnr)
obj.SetOwnerID(&ownerID)
@ -140,23 +110,9 @@ func putObject(cmd *cobra.Command, _ []string) {
prm.SetPayloadReader(payloadReader)
} else {
if binary {
p = pb.New(len(obj.Payload()))
p.Output = cmd.OutOrStdout()
prm.SetPayloadReader(p.NewProxyReader(payloadReader))
prm.SetHeaderCallback(func(o *object.Object) { p.Start() })
p = setBinaryPayloadReader(cmd, obj, &prm, payloadReader)
} else {
fi, err := f.Stat()
if err != nil {
cmd.PrintErrf("Failed to get file size, progress bar is disabled: %v\n", err)
prm.SetPayloadReader(f)
} else {
p = pb.New64(fi.Size())
p.Output = cmd.OutOrStdout()
prm.SetPayloadReader(p.NewProxyReader(f))
prm.SetHeaderCallback(func(o *object.Object) {
p.Start()
})
}
p = setFilePayloadReader(cmd, f, &prm)
}
}
@ -170,6 +126,67 @@ func putObject(cmd *cobra.Command, _ []string) {
cmd.Printf(" OID: %s\n CID: %s\n", res.ID(), cnr)
}
func readFilePayload(filename string, cmd *cobra.Command) (io.Reader, cid.ID, user.ID) {
buf, err := os.ReadFile(filename)
commonCmd.ExitOnErr(cmd, "unable to read given file: %w", err)
objTemp := object.New()
// TODO(@acid-ant): #1932 Use streams to marshal/unmarshal payload
commonCmd.ExitOnErr(cmd, "can't unmarshal object from given file: %w", objTemp.Unmarshal(buf))
payloadReader := bytes.NewReader(objTemp.Payload())
cnr, _ := objTemp.ContainerID()
ownerID := *objTemp.OwnerID()
return payloadReader, cnr, ownerID
}
func setFilePayloadReader(cmd *cobra.Command, f *os.File, prm *internalclient.PutObjectPrm) *pb.ProgressBar {
fi, err := f.Stat()
if err != nil {
cmd.PrintErrf("Failed to get file size, progress bar is disabled: %v\n", err)
prm.SetPayloadReader(f)
return nil
}
p := pb.New64(fi.Size())
p.Output = cmd.OutOrStdout()
prm.SetPayloadReader(p.NewProxyReader(f))
prm.SetHeaderCallback(func(o *object.Object) { p.Start() })
return p
}
func setBinaryPayloadReader(cmd *cobra.Command, obj *object.Object, prm *internalclient.PutObjectPrm, payloadReader io.Reader) *pb.ProgressBar {
p := pb.New(len(obj.Payload()))
p.Output = cmd.OutOrStdout()
prm.SetPayloadReader(p.NewProxyReader(payloadReader))
prm.SetHeaderCallback(func(o *object.Object) { p.Start() })
return p
}
func getAllObjectAttributes(cmd *cobra.Command) []object.Attribute {
attrs, err := parseObjectAttrs(cmd)
commonCmd.ExitOnErr(cmd, "can't parse object attributes: %w", err)
expiresOn, _ := cmd.Flags().GetUint64(commonflags.ExpireAt)
if expiresOn > 0 {
var expAttrFound bool
expAttrValue := strconv.FormatUint(expiresOn, 10)
for i := range attrs {
if attrs[i].Key() == objectV2.SysAttributeExpEpoch {
attrs[i].SetValue(expAttrValue)
expAttrFound = true
break
}
}
if !expAttrFound {
index := len(attrs)
attrs = append(attrs, object.Attribute{})
attrs[index].SetKey(objectV2.SysAttributeExpEpoch)
attrs[index].SetValue(expAttrValue)
}
}
return attrs
}
func parseObjectAttrs(cmd *cobra.Command) ([]object.Attribute, error) {
var rawAttrs []string

View file

@ -339,8 +339,6 @@ func initFlagSession(cmd *cobra.Command, verb string) {
// container.
//
// The object itself is not included in the result.
//
// nolint: funlen
func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID, obj oid.ID) []oid.ID {
common.PrintVerbose(cmd, "Fetching raw object header...")
@ -372,13 +370,28 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
splitInfo := errSplit.SplitInfo()
if members, ok := tryGetSplitMembersByLinkingObject(cmd, splitInfo, prmHead, cnr); ok {
return members
}
if members, ok := tryGetSplitMembersBySplitID(cmd, splitInfo, cli, cnr); ok {
return members
}
return tryRestoreChainInReverse(cmd, splitInfo, prmHead, cli, cnr, obj)
}
func tryGetSplitMembersByLinkingObject(cmd *cobra.Command, splitInfo *object.SplitInfo, prmHead internal.HeadObjectPrm, cnr cid.ID) ([]oid.ID, bool) {
// collect split chain by the descending ease of operations (ease is evaluated heuristically).
// If any approach fails, we don't try the next since we assume that it will fail too.
if idLinking, ok := splitInfo.Link(); ok {
common.PrintVerbose(cmd, "Collecting split members using linking object %s...", idLinking)
var addrObj oid.Address
addrObj.SetContainer(cnr)
addrObj.SetObject(idLinking)
prmHead.SetAddress(addrObj)
prmHead.SetRawFlag(false)
// client is already set
@ -390,14 +403,17 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
common.PrintVerbose(cmd, "Received split members from the linking object: %v", children)
// include linking object
return append(children, idLinking)
return append(children, idLinking), true
}
// linking object is not required for
// object collecting
common.PrintVerbose(cmd, "failed to get linking object's header: %w", err)
}
return nil, false
}
func tryGetSplitMembersBySplitID(cmd *cobra.Command, splitInfo *object.SplitInfo, cli *client.Client, cnr cid.ID) ([]oid.ID, bool) {
if idSplit := splitInfo.SplitID(); idSplit != nil {
common.PrintVerbose(cmd, "Collecting split members by split ID...")
@ -412,12 +428,18 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
res, err := internal.SearchObjects(prm)
commonCmd.ExitOnErr(cmd, "failed to search objects by split ID: %w", err)
members := res.IDList()
parts := res.IDList()
common.PrintVerbose(cmd, "Found objects by split ID: %v", res.IDList())
return members
return parts, true
}
return nil, false
}
func tryRestoreChainInReverse(cmd *cobra.Command, splitInfo *object.SplitInfo, prmHead internal.HeadObjectPrm, cli *client.Client, cnr cid.ID, obj oid.ID) []oid.ID {
var addrObj oid.Address
addrObj.SetContainer(cnr)
idMember, ok := splitInfo.LastPart()
if !ok {
@ -427,6 +449,8 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
common.PrintVerbose(cmd, "Traverse the object split chain in reverse...", idMember)
var res *internal.HeadObjectRes
var err error
chain := []oid.ID{idMember}
chainSet := map[oid.ID]struct{}{idMember: {}}
@ -437,6 +461,7 @@ func collectObjectRelatives(cmd *cobra.Command, cli *client.Client, cnr cid.ID,
common.PrintVerbose(cmd, "Reading previous element of the split chain member %s...", idMember)
addrObj.SetObject(idMember)
prmHead.SetAddress(addrObj)
res, err = internal.HeadObject(prmHead)
commonCmd.ExitOnErr(cmd, "failed to read split chain member's header: %w", err)

View file

@ -162,8 +162,6 @@ func eaclFiltersToString(fs []eacl.Filter) string {
// ParseEACLRules parses eACL table.
// Uses ParseEACLRule.
//
//nolint:godot
func ParseEACLRules(table *eacl.Table, rules []string) error {
if len(rules) == 0 {
return errors.New("no extended ACL rules has been provided")

View file

@ -41,33 +41,122 @@ func newConfig(path, directory string) (*viper.Viper, error) {
return v, err
}
// nolint: funlen
func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("logger.level", "info")
cfg.SetDefault("pprof.address", "localhost:6060")
cfg.SetDefault("pprof.shutdown_timeout", "30s")
setPprofDefaults(cfg)
cfg.SetDefault("prometheus.address", "localhost:9090")
cfg.SetDefault("prometheus.shutdown_timeout", "30s")
setPrometheusDefaults(cfg)
cfg.SetDefault("without_mainnet", false)
cfg.SetDefault("node.persistent_state.path", ".frostfs-ir-state")
cfg.SetDefault("morph.endpoint.client", []string{})
cfg.SetDefault("morph.dial_timeout", 15*time.Second)
cfg.SetDefault("morph.validators", []string{})
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
setMorphDefaults(cfg)
cfg.SetDefault("mainnet.endpoint.client", []string{})
cfg.SetDefault("mainnet.dial_timeout", 15*time.Second)
cfg.SetDefault("mainnet.switch_interval", 2*time.Minute)
setMainNetDefaults(cfg)
cfg.SetDefault("wallet.path", "") // inner ring node NEP-6 wallet
cfg.SetDefault("wallet.address", "") // account address
cfg.SetDefault("wallet.password", "") // password
setWalletDefaults(cfg)
setContractsDefaults(cfg)
setTimersDefaults(cfg)
setWorkersDefaults(cfg)
setNetmapCleanerDefaults(cfg)
setEmitDefaults(cfg)
setAuditDefaults(cfg)
setSettlementDefaults(cfg)
cfg.SetDefault("indexer.cache_timeout", 15*time.Second)
cfg.SetDefault("locode.db.path", "")
setFeeDefaults(cfg)
setControlDefaults(cfg)
cfg.SetDefault("governance.disable", false)
}
func setControlDefaults(cfg *viper.Viper) {
cfg.SetDefault("control.authorized_keys", []string{})
cfg.SetDefault("control.grpc.endpoint", "")
}
func setFeeDefaults(cfg *viper.Viper) {
// extra fee values for working mode without notary contract
cfg.SetDefault("fee.main_chain", 5000_0000) // 0.5 Fixed8
cfg.SetDefault("fee.side_chain", 2_0000_0000) // 2.0 Fixed8
cfg.SetDefault("fee.named_container_register", 25_0000_0000) // 25.0 Fixed8
}
func setSettlementDefaults(cfg *viper.Viper) {
cfg.SetDefault("settlement.basic_income_rate", 0)
cfg.SetDefault("settlement.audit_fee", 0)
}
func setAuditDefaults(cfg *viper.Viper) {
cfg.SetDefault("audit.task.exec_pool_size", 10)
cfg.SetDefault("audit.task.queue_capacity", 100)
cfg.SetDefault("audit.timeout.get", "5s")
cfg.SetDefault("audit.timeout.head", "5s")
cfg.SetDefault("audit.timeout.rangehash", "5s")
cfg.SetDefault("audit.timeout.search", "10s")
cfg.SetDefault("audit.pdp.max_sleep_interval", "5s")
cfg.SetDefault("audit.pdp.pairs_pool_size", "10")
cfg.SetDefault("audit.por.pool_size", "10")
}
func setEmitDefaults(cfg *viper.Viper) {
cfg.SetDefault("emit.storage.amount", 0)
cfg.SetDefault("emit.mint.cache_size", 1000)
cfg.SetDefault("emit.mint.threshold", 1)
cfg.SetDefault("emit.mint.value", 20000000) // 0.2 Fixed8
cfg.SetDefault("emit.gas.balance_threshold", 0)
cfg.SetDefault("emit.extra_wallets", nil)
}
func setPprofDefaults(cfg *viper.Viper) {
cfg.SetDefault("pprof.address", "localhost:6060")
cfg.SetDefault("pprof.shutdown_timeout", "30s")
}
func setPrometheusDefaults(cfg *viper.Viper) {
cfg.SetDefault("prometheus.address", "localhost:9090")
cfg.SetDefault("prometheus.shutdown_timeout", "30s")
}
func setNetmapCleanerDefaults(cfg *viper.Viper) {
cfg.SetDefault("netmap_cleaner.enabled", true)
cfg.SetDefault("netmap_cleaner.threshold", 3)
}
func setWorkersDefaults(cfg *viper.Viper) {
cfg.SetDefault("workers.netmap", "10")
cfg.SetDefault("workers.balance", "10")
cfg.SetDefault("workers.frostfs", "10")
cfg.SetDefault("workers.container", "10")
cfg.SetDefault("workers.alphabet", "10")
cfg.SetDefault("workers.reputation", "10")
cfg.SetDefault("workers.subnet", "10")
}
func setTimersDefaults(cfg *viper.Viper) {
cfg.SetDefault("timers.emit", "0")
cfg.SetDefault("timers.stop_estimation.mul", 1)
cfg.SetDefault("timers.stop_estimation.div", 4)
cfg.SetDefault("timers.collect_basic_income.mul", 1)
cfg.SetDefault("timers.collect_basic_income.div", 2)
cfg.SetDefault("timers.distribute_basic_income.mul", 3)
cfg.SetDefault("timers.distribute_basic_income.div", 4)
}
func setContractsDefaults(cfg *viper.Viper) {
cfg.SetDefault("contracts.netmap", "")
cfg.SetDefault("contracts.frostfs", "")
cfg.SetDefault("contracts.balance", "")
@ -78,57 +167,23 @@ func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("contracts.reputation", "")
cfg.SetDefault("contracts.subnet", "")
cfg.SetDefault("contracts.proxy", "")
cfg.SetDefault("timers.emit", "0")
cfg.SetDefault("timers.stop_estimation.mul", 1)
cfg.SetDefault("timers.stop_estimation.div", 4)
cfg.SetDefault("timers.collect_basic_income.mul", 1)
cfg.SetDefault("timers.collect_basic_income.div", 2)
cfg.SetDefault("timers.distribute_basic_income.mul", 3)
cfg.SetDefault("timers.distribute_basic_income.div", 4)
cfg.SetDefault("workers.netmap", "10")
cfg.SetDefault("workers.balance", "10")
cfg.SetDefault("workers.frostfs", "10")
cfg.SetDefault("workers.container", "10")
cfg.SetDefault("workers.alphabet", "10")
cfg.SetDefault("workers.reputation", "10")
cfg.SetDefault("workers.subnet", "10")
cfg.SetDefault("netmap_cleaner.enabled", true)
cfg.SetDefault("netmap_cleaner.threshold", 3)
cfg.SetDefault("emit.storage.amount", 0)
cfg.SetDefault("emit.mint.cache_size", 1000)
cfg.SetDefault("emit.mint.threshold", 1)
cfg.SetDefault("emit.mint.value", 20000000) // 0.2 Fixed8
cfg.SetDefault("emit.gas.balance_threshold", 0)
cfg.SetDefault("emit.extra_wallets", nil)
cfg.SetDefault("audit.task.exec_pool_size", 10)
cfg.SetDefault("audit.task.queue_capacity", 100)
cfg.SetDefault("audit.timeout.get", "5s")
cfg.SetDefault("audit.timeout.head", "5s")
cfg.SetDefault("audit.timeout.rangehash", "5s")
cfg.SetDefault("audit.timeout.search", "10s")
cfg.SetDefault("audit.pdp.max_sleep_interval", "5s")
cfg.SetDefault("audit.pdp.pairs_pool_size", "10")
cfg.SetDefault("audit.por.pool_size", "10")
cfg.SetDefault("settlement.basic_income_rate", 0)
cfg.SetDefault("settlement.audit_fee", 0)
cfg.SetDefault("indexer.cache_timeout", 15*time.Second)
cfg.SetDefault("locode.db.path", "")
// extra fee values for working mode without notary contract
cfg.SetDefault("fee.main_chain", 5000_0000) // 0.5 Fixed8
cfg.SetDefault("fee.side_chain", 2_0000_0000) // 2.0 Fixed8
cfg.SetDefault("fee.named_container_register", 25_0000_0000) // 25.0 Fixed8
cfg.SetDefault("control.authorized_keys", []string{})
cfg.SetDefault("control.grpc.endpoint", "")
cfg.SetDefault("governance.disable", false)
}
func setWalletDefaults(cfg *viper.Viper) {
cfg.SetDefault("wallet.path", "") // inner ring node NEP-6 wallet
cfg.SetDefault("wallet.address", "") // account address
cfg.SetDefault("wallet.password", "") // password
}
func setMainNetDefaults(cfg *viper.Viper) {
cfg.SetDefault("mainnet.endpoint.client", []string{})
cfg.SetDefault("mainnet.dial_timeout", 15*time.Second)
cfg.SetDefault("mainnet.switch_interval", 2*time.Minute)
}
func setMorphDefaults(cfg *viper.Viper) {
cfg.SetDefault("morph.endpoint.client", []string{})
cfg.SetDefault("morph.dial_timeout", 15*time.Second)
cfg.SetDefault("morph.validators", []string{})
cfg.SetDefault("morph.switch_interval", 2*time.Minute)
}