forked from TrueCloudLab/frostfs-node
[#175] adm: pipeline container iteration
Do not accumulate everything in memory. Also, CLI should be responsive. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
e85e5382e4
commit
49cc23e03c
1 changed files with 77 additions and 57 deletions
|
@ -41,28 +41,28 @@ func getContainerContractHash(cmd *cobra.Command, inv *invoker.Invoker, c Client
|
||||||
return ch, nil
|
return ch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getContainersList(inv *invoker.Invoker, ch util.Uint160) ([][]byte, error) {
|
func iterateContainerList(inv *invoker.Invoker, ch util.Uint160, f func([]byte) error) error {
|
||||||
sid, r, err := unwrap.SessionIterator(inv.Call(ch, "containersOf", ""))
|
sid, r, err := unwrap.SessionIterator(inv.Call(ch, "containersOf", ""))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("%w: %v", errInvalidContainerResponse, err)
|
return fmt.Errorf("%w: %v", errInvalidContainerResponse, err)
|
||||||
}
|
}
|
||||||
// Nothing bad, except live session on the server, do not report to the user.
|
// Nothing bad, except live session on the server, do not report to the user.
|
||||||
defer func() { _ = inv.TerminateSession(sid) }()
|
defer func() { _ = inv.TerminateSession(sid) }()
|
||||||
|
|
||||||
var lst [][]byte
|
|
||||||
|
|
||||||
items, err := inv.TraverseIterator(sid, &r, 0)
|
items, err := inv.TraverseIterator(sid, &r, 0)
|
||||||
for err == nil && len(items) != 0 {
|
for err == nil && len(items) != 0 {
|
||||||
for j := range items {
|
for j := range items {
|
||||||
b, err := items[j].TryBytes()
|
b, err := items[j].TryBytes()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("%w: %v", errInvalidContainerResponse, err)
|
return fmt.Errorf("%w: %v", errInvalidContainerResponse, err)
|
||||||
|
}
|
||||||
|
if err := f(b); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
lst = append(lst, b)
|
|
||||||
}
|
}
|
||||||
items, err = inv.TraverseIterator(sid, &r, 0)
|
items, err = inv.TraverseIterator(sid, &r, 0)
|
||||||
}
|
}
|
||||||
return lst, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func dumpContainers(cmd *cobra.Command, _ []string) error {
|
func dumpContainers(cmd *cobra.Command, _ []string) error {
|
||||||
|
@ -83,56 +83,81 @@ func dumpContainers(cmd *cobra.Command, _ []string) error {
|
||||||
return fmt.Errorf("unable to get contaract hash: %w", err)
|
return fmt.Errorf("unable to get contaract hash: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cids, err := getContainersList(inv, ch)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("%w: %v", errInvalidContainerResponse, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
isOK, err := getCIDFilterFunc(cmd)
|
isOK, err := getCIDFilterFunc(cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var containers []*Container
|
f, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0o660)
|
||||||
bw := io.NewBufBinWriter()
|
if err != nil {
|
||||||
for _, id := range cids {
|
return err
|
||||||
if !isOK(id) {
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
_, err = f.Write([]byte{'['})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
written := 0
|
||||||
|
enc := json.NewEncoder(f)
|
||||||
|
bw := io.NewBufBinWriter()
|
||||||
|
iterErr := iterateContainerList(inv, ch, func(id []byte) error {
|
||||||
|
if !isOK(id) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cnt, err := dumpSingleContainer(bw, ch, inv, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Writing directly to the file is ok, because json.Encoder does no internal buffering.
|
||||||
|
if written != 0 {
|
||||||
|
_, err = f.Write([]byte{','})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
written++
|
||||||
|
return enc.Encode(cnt)
|
||||||
|
})
|
||||||
|
if iterErr != nil {
|
||||||
|
return iterErr
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = f.Write([]byte{']'})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func dumpSingleContainer(bw *io.BufBinWriter, ch util.Uint160, inv *invoker.Invoker, id []byte) (*Container, error) {
|
||||||
bw.Reset()
|
bw.Reset()
|
||||||
emit.AppCall(bw.BinWriter, ch, "get", callflag.All, id)
|
emit.AppCall(bw.BinWriter, ch, "get", callflag.All, id)
|
||||||
emit.AppCall(bw.BinWriter, ch, "eACL", callflag.All, id)
|
emit.AppCall(bw.BinWriter, ch, "eACL", callflag.All, id)
|
||||||
res, err := inv.Run(bw.Bytes())
|
res, err := inv.Run(bw.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't get container info: %w", err)
|
return nil, fmt.Errorf("can't get container info: %w", err)
|
||||||
}
|
}
|
||||||
if len(res.Stack) != 2 {
|
if len(res.Stack) != 2 {
|
||||||
return fmt.Errorf("%w: expected 2 items on stack", errInvalidContainerResponse)
|
return nil, fmt.Errorf("%w: expected 2 items on stack", errInvalidContainerResponse)
|
||||||
}
|
}
|
||||||
|
|
||||||
cnt := new(Container)
|
cnt := new(Container)
|
||||||
err = cnt.FromStackItem(res.Stack[0])
|
err = cnt.FromStackItem(res.Stack[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%w: %v", errInvalidContainerResponse, err)
|
return nil, fmt.Errorf("%w: %v", errInvalidContainerResponse, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ea := new(EACL)
|
ea := new(EACL)
|
||||||
err = ea.FromStackItem(res.Stack[1])
|
err = ea.FromStackItem(res.Stack[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%w: %v", errInvalidContainerResponse, err)
|
return nil, fmt.Errorf("%w: %v", errInvalidContainerResponse, err)
|
||||||
}
|
}
|
||||||
if len(ea.Value) != 0 {
|
if len(ea.Value) != 0 {
|
||||||
cnt.EACL = ea
|
cnt.EACL = ea
|
||||||
}
|
}
|
||||||
|
return cnt, nil
|
||||||
containers = append(containers, cnt)
|
|
||||||
}
|
|
||||||
|
|
||||||
out, err := json.Marshal(containers)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return os.WriteFile(filename, out, 0o660)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func listContainers(cmd *cobra.Command, _ []string) error {
|
func listContainers(cmd *cobra.Command, _ []string) error {
|
||||||
|
@ -148,20 +173,15 @@ func listContainers(cmd *cobra.Command, _ []string) error {
|
||||||
return fmt.Errorf("unable to get contaract hash: %w", err)
|
return fmt.Errorf("unable to get contaract hash: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cids, err := getContainersList(inv, ch)
|
return iterateContainerList(inv, ch, func(id []byte) error {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("%w: %v", errInvalidContainerResponse, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, id := range cids {
|
|
||||||
var idCnr cid.ID
|
var idCnr cid.ID
|
||||||
err = idCnr.Decode(id)
|
err = idCnr.Decode(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to decode container id: %w", err)
|
return fmt.Errorf("unable to decode container id: %w", err)
|
||||||
}
|
}
|
||||||
cmd.Println(idCnr)
|
cmd.Println(idCnr)
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func restoreContainers(cmd *cobra.Command, _ []string) error {
|
func restoreContainers(cmd *cobra.Command, _ []string) error {
|
||||||
|
|
Loading…
Reference in a new issue