From 49cc23e03c8da451fe67f67509183441d3ee740d Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 3 Apr 2023 14:37:05 +0300 Subject: [PATCH] [#175] adm: pipeline container iteration Do not accumulate everything in memory. Also, CLI should be responsive. Signed-off-by: Evgenii Stratonikov --- .../internal/modules/morph/container.go | 134 ++++++++++-------- 1 file changed, 77 insertions(+), 57 deletions(-) diff --git a/cmd/frostfs-adm/internal/modules/morph/container.go b/cmd/frostfs-adm/internal/modules/morph/container.go index b5447fcf..687d7e84 100644 --- a/cmd/frostfs-adm/internal/modules/morph/container.go +++ b/cmd/frostfs-adm/internal/modules/morph/container.go @@ -41,28 +41,28 @@ func getContainerContractHash(cmd *cobra.Command, inv *invoker.Invoker, c Client return ch, nil } -func getContainersList(inv *invoker.Invoker, ch util.Uint160) ([][]byte, error) { +func iterateContainerList(inv *invoker.Invoker, ch util.Uint160, f func([]byte) error) error { sid, r, err := unwrap.SessionIterator(inv.Call(ch, "containersOf", "")) if err != nil { - return nil, fmt.Errorf("%w: %v", errInvalidContainerResponse, err) + return fmt.Errorf("%w: %v", errInvalidContainerResponse, err) } // Nothing bad, except live session on the server, do not report to the user. defer func() { _ = inv.TerminateSession(sid) }() - var lst [][]byte - items, err := inv.TraverseIterator(sid, &r, 0) for err == nil && len(items) != 0 { for j := range items { b, err := items[j].TryBytes() if err != nil { - return nil, fmt.Errorf("%w: %v", errInvalidContainerResponse, err) + return fmt.Errorf("%w: %v", errInvalidContainerResponse, err) + } + if err := f(b); err != nil { + return err } - lst = append(lst, b) } items, err = inv.TraverseIterator(sid, &r, 0) } - return lst, err + return err } func dumpContainers(cmd *cobra.Command, _ []string) error { @@ -83,56 +83,81 @@ func dumpContainers(cmd *cobra.Command, _ []string) error { return fmt.Errorf("unable to get contaract hash: %w", err) } - cids, err := getContainersList(inv, ch) - if err != nil { - return fmt.Errorf("%w: %v", errInvalidContainerResponse, err) - } - isOK, err := getCIDFilterFunc(cmd) if err != nil { return err } - var containers []*Container - bw := io.NewBufBinWriter() - for _, id := range cids { - if !isOK(id) { - continue - } - bw.Reset() - emit.AppCall(bw.BinWriter, ch, "get", callflag.All, id) - emit.AppCall(bw.BinWriter, ch, "eACL", callflag.All, id) - res, err := inv.Run(bw.Bytes()) - if err != nil { - return fmt.Errorf("can't get container info: %w", err) - } - if len(res.Stack) != 2 { - return fmt.Errorf("%w: expected 2 items on stack", errInvalidContainerResponse) - } - - cnt := new(Container) - err = cnt.FromStackItem(res.Stack[0]) - if err != nil { - return fmt.Errorf("%w: %v", errInvalidContainerResponse, err) - } - - ea := new(EACL) - err = ea.FromStackItem(res.Stack[1]) - if err != nil { - return fmt.Errorf("%w: %v", errInvalidContainerResponse, err) - } - if len(ea.Value) != 0 { - cnt.EACL = ea - } - - containers = append(containers, cnt) - } - - out, err := json.Marshal(containers) + f, err := os.OpenFile(filename, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0o660) if err != nil { return err } - return os.WriteFile(filename, out, 0o660) + defer f.Close() + + _, err = f.Write([]byte{'['}) + if err != nil { + return err + } + + written := 0 + enc := json.NewEncoder(f) + bw := io.NewBufBinWriter() + iterErr := iterateContainerList(inv, ch, func(id []byte) error { + if !isOK(id) { + return nil + } + + cnt, err := dumpSingleContainer(bw, ch, inv, id) + if err != nil { + return err + } + + // Writing directly to the file is ok, because json.Encoder does no internal buffering. + if written != 0 { + _, err = f.Write([]byte{','}) + if err != nil { + return err + } + } + + written++ + return enc.Encode(cnt) + }) + if iterErr != nil { + return iterErr + } + + _, err = f.Write([]byte{']'}) + return err +} + +func dumpSingleContainer(bw *io.BufBinWriter, ch util.Uint160, inv *invoker.Invoker, id []byte) (*Container, error) { + bw.Reset() + emit.AppCall(bw.BinWriter, ch, "get", callflag.All, id) + emit.AppCall(bw.BinWriter, ch, "eACL", callflag.All, id) + res, err := inv.Run(bw.Bytes()) + if err != nil { + return nil, fmt.Errorf("can't get container info: %w", err) + } + if len(res.Stack) != 2 { + return nil, fmt.Errorf("%w: expected 2 items on stack", errInvalidContainerResponse) + } + + cnt := new(Container) + err = cnt.FromStackItem(res.Stack[0]) + if err != nil { + return nil, fmt.Errorf("%w: %v", errInvalidContainerResponse, err) + } + + ea := new(EACL) + err = ea.FromStackItem(res.Stack[1]) + if err != nil { + return nil, fmt.Errorf("%w: %v", errInvalidContainerResponse, err) + } + if len(ea.Value) != 0 { + cnt.EACL = ea + } + return cnt, nil } func listContainers(cmd *cobra.Command, _ []string) error { @@ -148,20 +173,15 @@ func listContainers(cmd *cobra.Command, _ []string) error { return fmt.Errorf("unable to get contaract hash: %w", err) } - cids, err := getContainersList(inv, ch) - if err != nil { - return fmt.Errorf("%w: %v", errInvalidContainerResponse, err) - } - - for _, id := range cids { + return iterateContainerList(inv, ch, func(id []byte) error { var idCnr cid.ID err = idCnr.Decode(id) if err != nil { return fmt.Errorf("unable to decode container id: %w", err) } cmd.Println(idCnr) - } - return nil + return nil + }) } func restoreContainers(cmd *cobra.Command, _ []string) error {