fs: merge command startup into CommandReader

This commit is contained in:
Michael Eischer 2023-10-01 15:25:48 +02:00
parent 7d879705ad
commit 317144c1d6
2 changed files with 39 additions and 50 deletions

View file

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"os/exec"
"path" "path"
"path/filepath" "path/filepath"
"runtime" "runtime"
@ -601,9 +600,9 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
progressPrinter.V("read data from stdin") progressPrinter.V("read data from stdin")
} }
filename := path.Join("/", opts.StdinFilename) filename := path.Join("/", opts.StdinFilename)
var closer io.ReadCloser = os.Stdin var source io.ReadCloser = os.Stdin
if opts.StdinCommand { if opts.StdinCommand {
closer, err = prepareStdinCommand(ctx, args) source, err = fs.NewCommandReader(ctx, args, globalOptions.stderr)
if err != nil { if err != nil {
return err return err
} }
@ -612,7 +611,7 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
ModTime: timeStamp, ModTime: timeStamp,
Name: filename, Name: filename,
Mode: 0644, Mode: 0644,
ReadCloser: closer, ReadCloser: source,
} }
targets = []string{filename} targets = []string{filename}
} }
@ -701,32 +700,3 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
// Return error if any // Return error if any
return werr return werr
} }
func prepareStdinCommand(ctx context.Context, args []string) (io.ReadCloser, error) {
// Prepare command and stdout. These variables will be assigned to the
// io.ReadCloser that is used by the archiver to read data, so that the
// Close() function waits for the program to finish. See
// fs.ReadCloserCommand.
command := exec.CommandContext(ctx, args[0], args[1:]...)
stdout, err := command.StdoutPipe()
if err != nil {
return nil, errors.Wrap(err, "command.StdoutPipe")
}
// Use a Go routine to handle the stderr to avoid deadlocks
stderr, err := command.StderrPipe()
if err != nil {
return nil, errors.Wrap(err, "command.StderrPipe")
}
go func() {
sc := bufio.NewScanner(stderr)
for sc.Scan() {
_, _ = fmt.Fprintf(os.Stderr, "subprocess %v: %v\n", command.Args[0], sc.Text())
}
}()
if err := command.Start(); err != nil {
return nil, errors.Wrap(err, "command.Start")
}
return fs.NewCommandReader(command, stdout), nil
}

View file

@ -1,22 +1,24 @@
package fs package fs
import ( import (
"bufio"
"context"
"fmt"
"io" "io"
"os/exec" "os/exec"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
) )
// CommandReader wraps an exec.Cmd and its standard output to provide an // CommandReader wrap a command such that its standard output can be read using
// io.ReadCloser that waits for the command to terminate on Close(), reporting // a io.ReadCloser. Close() waits for the command to terminate, reporting
// any error in the command.Wait() function back to the Close() caller. // any error back to the caller.
type CommandReader struct { type CommandReader struct {
cmd *exec.Cmd cmd *exec.Cmd
stdout io.ReadCloser stdout io.ReadCloser
// We should call exec.Wait() once. waitHandled is taking care of storing // cmd.Wait() must only be called once. Prevent duplicate executions in
// whether we already called that function in Read() to avoid calling it // Read() and Close().
// again in Close().
waitHandled bool waitHandled bool
// alreadyClosedReadErr is the error that we should return if we try to // alreadyClosedReadErr is the error that we should return if we try to
@ -26,11 +28,34 @@ type CommandReader struct {
alreadyClosedReadErr error alreadyClosedReadErr error
} }
func NewCommandReader(cmd *exec.Cmd, stdout io.ReadCloser) *CommandReader { func NewCommandReader(ctx context.Context, args []string, logOutput io.Writer) (*CommandReader, error) {
return &CommandReader{ // Prepare command and stdout
cmd: cmd, command := exec.CommandContext(ctx, args[0], args[1:]...)
stdout: stdout, stdout, err := command.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("failed to setup stdout pipe: %w", err)
} }
// Use a Go routine to handle the stderr to avoid deadlocks
stderr, err := command.StderrPipe()
if err != nil {
return nil, fmt.Errorf("failed to setup stderr pipe: %w", err)
}
go func() {
sc := bufio.NewScanner(stderr)
for sc.Scan() {
_, _ = fmt.Fprintf(logOutput, "subprocess %v: %v\n", command.Args[0], sc.Text())
}
}()
if err := command.Start(); err != nil {
return nil, fmt.Errorf("failed to start command: %w", err)
}
return &CommandReader{
cmd: command,
stdout: stdout,
}, nil
} }
// Read populate the array with data from the process stdout. // Read populate the array with data from the process stdout.
@ -57,13 +82,7 @@ func (fp *CommandReader) Read(p []byte) (int, error) {
func (fp *CommandReader) wait() error { func (fp *CommandReader) wait() error {
err := fp.cmd.Wait() err := fp.cmd.Wait()
if err != nil { if err != nil {
// If we have information about the exit code, let's use it in the // Use a fatal error to abort the snapshot.
// error message. Otherwise, send the error message along.
// In any case, use a fatal error to abort the snapshot.
var err2 *exec.ExitError
if errors.As(err, &err2) {
return errors.Fatalf("command terminated with exit code %d", err2.ExitCode())
}
return errors.Fatal(err.Error()) return errors.Fatal(err.Error())
} }
return nil return nil