forked from TrueCloudLab/frostfs-node
277 lines
8 KiB
Go
277 lines
8 KiB
Go
package object
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/commonflags"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"github.com/cheggaaa/pb"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
// Names of the command-line flags that are specific to the "object put"
// command.

// noProgressFlag is the name of the boolean flag that turns the progress bar off.
const noProgressFlag = "no-progress"

// notificationFlag is the name of the flag carrying the object notification
// settings in the *epoch*:*topic* form.
const notificationFlag = "notify"

// copiesNumberFlag is the name of the flag carrying the number of object
// copies to store within the RPC call.
const copiesNumberFlag = "copies-number"
|
|
|
|
var putExpiredOn uint64
|
|
|
|
var objectPutCmd = &cobra.Command{
|
|
Use: "put",
|
|
Short: "Put object to FrostFS",
|
|
Long: "Put object to FrostFS",
|
|
Run: putObject,
|
|
}
|
|
|
|
func initObjectPutCmd() {
|
|
commonflags.Init(objectPutCmd)
|
|
initFlagSession(objectPutCmd, "PUT")
|
|
|
|
flags := objectPutCmd.Flags()
|
|
|
|
flags.String(fileFlag, "", "File with object payload")
|
|
_ = objectPutCmd.MarkFlagFilename(fileFlag)
|
|
_ = objectPutCmd.MarkFlagRequired(fileFlag)
|
|
|
|
flags.String(commonflags.CIDFlag, "", commonflags.CIDFlagUsage)
|
|
|
|
flags.String("attributes", "", "User attributes in form of Key1=Value1,Key2=Value2")
|
|
flags.Bool("disable-filename", false, "Do not set well-known filename attribute")
|
|
flags.Bool("disable-timestamp", false, "Do not set well-known timestamp attribute")
|
|
flags.Uint64VarP(&putExpiredOn, commonflags.ExpireAt, "e", 0, "The last active epoch in the life of the object")
|
|
flags.Bool(noProgressFlag, false, "Do not show progress bar")
|
|
|
|
flags.String(notificationFlag, "", "Object notification in the form of *epoch*:*topic*; '-' topic means using default")
|
|
flags.Bool(binaryFlag, false, "Deserialize object structure from given file.")
|
|
|
|
flags.String(copiesNumberFlag, "", "Number of copies of the object to store within the RPC call")
|
|
}
|
|
|
|
func putObject(cmd *cobra.Command, _ []string) {
|
|
binary, _ := cmd.Flags().GetBool(binaryFlag)
|
|
cidVal, _ := cmd.Flags().GetString(commonflags.CIDFlag)
|
|
|
|
if !binary && cidVal == "" {
|
|
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("required flag \"%s\" not set", commonflags.CIDFlag))
|
|
}
|
|
pk := key.GetOrGenerate(cmd)
|
|
|
|
var ownerID user.ID
|
|
var cnr cid.ID
|
|
|
|
filename, _ := cmd.Flags().GetString(fileFlag)
|
|
f, err := os.OpenFile(filename, os.O_RDONLY, os.ModePerm)
|
|
if err != nil {
|
|
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("can't open file '%s': %w", filename, err))
|
|
}
|
|
var payloadReader io.Reader = f
|
|
obj := object.New()
|
|
|
|
if binary {
|
|
payloadReader, cnr, ownerID = readFilePayload(filename, cmd)
|
|
} else {
|
|
readCID(cmd, &cnr)
|
|
user.IDFromKey(&ownerID, pk.PublicKey)
|
|
}
|
|
|
|
attrs := getAllObjectAttributes(cmd)
|
|
|
|
obj.SetContainerID(cnr)
|
|
obj.SetOwnerID(&ownerID)
|
|
obj.SetAttributes(attrs...)
|
|
|
|
notificationInfo, err := parseObjectNotifications(cmd)
|
|
commonCmd.ExitOnErr(cmd, "can't parse object notification information: %w", err)
|
|
|
|
if notificationInfo != nil {
|
|
obj.SetNotification(*notificationInfo)
|
|
}
|
|
|
|
var prm internalclient.PutObjectPrm
|
|
ReadOrOpenSession(cmd, &prm, pk, cnr, nil)
|
|
Prepare(cmd, &prm)
|
|
prm.SetHeader(obj)
|
|
|
|
var p *pb.ProgressBar
|
|
|
|
noProgress, _ := cmd.Flags().GetBool(noProgressFlag)
|
|
if noProgress {
|
|
prm.SetPayloadReader(payloadReader)
|
|
} else {
|
|
if binary {
|
|
p = setBinaryPayloadReader(cmd, obj, &prm, payloadReader)
|
|
} else {
|
|
p = setFilePayloadReader(cmd, f, &prm)
|
|
}
|
|
}
|
|
|
|
copyNum, err := cmd.Flags().GetString(copiesNumberFlag)
|
|
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
|
|
if len(copyNum) > 0 {
|
|
var cn []uint32
|
|
for _, num := range strings.Split(copyNum, ",") {
|
|
val, err := strconv.ParseUint(num, 10, 32)
|
|
commonCmd.ExitOnErr(cmd, "can't parse object copies numbers information: %w", err)
|
|
cn = append(cn, uint32(val))
|
|
}
|
|
prm.SetCopiesNumberByVectors(cn)
|
|
}
|
|
|
|
res, err := internalclient.PutObject(cmd.Context(), prm)
|
|
if p != nil {
|
|
p.Finish()
|
|
}
|
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
|
|
|
cmd.Printf("[%s] Object successfully stored\n", filename)
|
|
cmd.Printf(" OID: %s\n CID: %s\n", res.ID(), cnr)
|
|
}
|
|
|
|
func readFilePayload(filename string, cmd *cobra.Command) (io.Reader, cid.ID, user.ID) {
|
|
buf, err := os.ReadFile(filename)
|
|
commonCmd.ExitOnErr(cmd, "unable to read given file: %w", err)
|
|
objTemp := object.New()
|
|
// TODO(@acid-ant): #1932 Use streams to marshal/unmarshal payload
|
|
commonCmd.ExitOnErr(cmd, "can't unmarshal object from given file: %w", objTemp.Unmarshal(buf))
|
|
payloadReader := bytes.NewReader(objTemp.Payload())
|
|
cnr, _ := objTemp.ContainerID()
|
|
ownerID := *objTemp.OwnerID()
|
|
return payloadReader, cnr, ownerID
|
|
}
|
|
|
|
func setFilePayloadReader(cmd *cobra.Command, f *os.File, prm *internalclient.PutObjectPrm) *pb.ProgressBar {
|
|
fi, err := f.Stat()
|
|
if err != nil {
|
|
cmd.PrintErrf("Failed to get file size, progress bar is disabled: %v\n", err)
|
|
prm.SetPayloadReader(f)
|
|
return nil
|
|
}
|
|
p := pb.New64(fi.Size())
|
|
p.Output = cmd.OutOrStdout()
|
|
prm.SetPayloadReader(p.NewProxyReader(f))
|
|
prm.SetHeaderCallback(func(o *object.Object) { p.Start() })
|
|
return p
|
|
}
|
|
|
|
func setBinaryPayloadReader(cmd *cobra.Command, obj *object.Object, prm *internalclient.PutObjectPrm, payloadReader io.Reader) *pb.ProgressBar {
|
|
p := pb.New(len(obj.Payload()))
|
|
p.Output = cmd.OutOrStdout()
|
|
prm.SetPayloadReader(p.NewProxyReader(payloadReader))
|
|
prm.SetHeaderCallback(func(o *object.Object) { p.Start() })
|
|
return p
|
|
}
|
|
|
|
func getAllObjectAttributes(cmd *cobra.Command) []object.Attribute {
|
|
attrs, err := parseObjectAttrs(cmd)
|
|
commonCmd.ExitOnErr(cmd, "can't parse object attributes: %w", err)
|
|
|
|
expiresOn, _ := cmd.Flags().GetUint64(commonflags.ExpireAt)
|
|
if expiresOn > 0 {
|
|
var expAttrFound bool
|
|
expAttrValue := strconv.FormatUint(expiresOn, 10)
|
|
|
|
for i := range attrs {
|
|
if attrs[i].Key() == objectV2.SysAttributeExpEpoch {
|
|
attrs[i].SetValue(expAttrValue)
|
|
expAttrFound = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !expAttrFound {
|
|
index := len(attrs)
|
|
attrs = append(attrs, object.Attribute{})
|
|
attrs[index].SetKey(objectV2.SysAttributeExpEpoch)
|
|
attrs[index].SetValue(expAttrValue)
|
|
}
|
|
}
|
|
return attrs
|
|
}
|
|
|
|
func parseObjectAttrs(cmd *cobra.Command) ([]object.Attribute, error) {
|
|
var rawAttrs []string
|
|
|
|
raw := cmd.Flag("attributes").Value.String()
|
|
if len(raw) != 0 {
|
|
rawAttrs = strings.Split(raw, ",")
|
|
}
|
|
|
|
attrs := make([]object.Attribute, len(rawAttrs), len(rawAttrs)+2) // name + timestamp attributes
|
|
for i := range rawAttrs {
|
|
k, v, found := strings.Cut(rawAttrs[i], "=")
|
|
if !found {
|
|
return nil, fmt.Errorf("invalid attribute format: %s", rawAttrs[i])
|
|
}
|
|
attrs[i].SetKey(k)
|
|
attrs[i].SetValue(v)
|
|
}
|
|
|
|
disableFilename, _ := cmd.Flags().GetBool("disable-filename")
|
|
if !disableFilename {
|
|
filename := filepath.Base(cmd.Flag(fileFlag).Value.String())
|
|
index := len(attrs)
|
|
attrs = append(attrs, object.Attribute{})
|
|
attrs[index].SetKey(object.AttributeFileName)
|
|
attrs[index].SetValue(filename)
|
|
}
|
|
|
|
disableTime, _ := cmd.Flags().GetBool("disable-timestamp")
|
|
if !disableTime {
|
|
index := len(attrs)
|
|
attrs = append(attrs, object.Attribute{})
|
|
attrs[index].SetKey(object.AttributeTimestamp)
|
|
attrs[index].SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
|
}
|
|
|
|
return attrs, nil
|
|
}
|
|
|
|
func parseObjectNotifications(cmd *cobra.Command) (*object.NotificationInfo, error) {
|
|
const (
|
|
separator = ":"
|
|
useDefaultTopic = "-"
|
|
)
|
|
|
|
raw := cmd.Flag(notificationFlag).Value.String()
|
|
if raw == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
before, after, found := strings.Cut(raw, separator)
|
|
if !found {
|
|
return nil, fmt.Errorf("notification must be in the form of: *epoch*%s*topic*, got %s", separator, raw)
|
|
}
|
|
|
|
ni := new(object.NotificationInfo)
|
|
|
|
epoch, err := strconv.ParseUint(before, 10, 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not parse notification epoch %s: %w", before, err)
|
|
}
|
|
|
|
ni.SetEpoch(epoch)
|
|
|
|
if after == "" {
|
|
return nil, fmt.Errorf("incorrect empty topic: use %s to force using default topic", useDefaultTopic)
|
|
}
|
|
|
|
if after != useDefaultTopic {
|
|
ni.SetTopic(after)
|
|
}
|
|
|
|
return ni, nil
|
|
}
|