feat: handle context cancelation during docker exec (#1170)
* feat: handle context cancelation during docker exec To allow interrupting docker exec (which could be long running) we process the log output in a go routine and handle context cancelation as well as command result. In case of context cancelation a CTRL+C is written into the docker container. This should be enough to terminate the running command. To make sure we do not get stuck during cleanup, we do set the cleanup contexts with a timeout of 5 minutes Co-authored-by: Björn Brauer <bjoern.brauer@new-work.se> Co-authored-by: Philipp Hinrichsen <philipp.hinrichsen@new-work.se> * feat: handle SIGTERM signal and abort run * test: on context cancel, abort running command This test makes sure that whenever the act Context was canceled, the currently running docker exec is sent a 0x03 (ctrl+c). Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de> * test: make sure the exec funcction handles command exit code This test makes sure that the exec function does handle docker command error results Co-authored-by: Björn Brauer <bjoern.brauer@new-work.se> Co-authored-by: Philipp Hinrichsen <philipp.hinrichsen@new-work.se> Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>
This commit is contained in:
parent
943a0e6eea
commit
4ef50eeae7
5 changed files with 190 additions and 22 deletions
3
main.go
3
main.go
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/nektos/act/cmd"
|
||||
)
|
||||
|
@ -16,7 +17,7 @@ func main() {
|
|||
|
||||
// trap Ctrl+C and call cancel on the context
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, os.Interrupt)
|
||||
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
|
||||
defer func() {
|
||||
signal.Stop(c)
|
||||
cancel()
|
||||
|
|
|
@ -89,7 +89,7 @@ func NewContainer(input *NewContainerInput) Container {
|
|||
|
||||
// supportsContainerImagePlatform returns true if the underlying Docker server
|
||||
// API version is 1.41 and beyond
|
||||
func supportsContainerImagePlatform(ctx context.Context, cli *client.Client) bool {
|
||||
func supportsContainerImagePlatform(ctx context.Context, cli client.APIClient) bool {
|
||||
logger := common.Logger(ctx)
|
||||
ver, err := cli.ServerVersion(ctx)
|
||||
if err != nil {
|
||||
|
@ -210,12 +210,12 @@ func (cr *containerReference) ReplaceLogWriter(stdout io.Writer, stderr io.Write
|
|||
}
|
||||
|
||||
type containerReference struct {
|
||||
cli *client.Client
|
||||
cli client.APIClient
|
||||
id string
|
||||
input *NewContainerInput
|
||||
}
|
||||
|
||||
func GetDockerClient(ctx context.Context) (cli *client.Client, err error) {
|
||||
func GetDockerClient(ctx context.Context) (cli client.APIClient, err error) {
|
||||
// TODO: this should maybe need to be a global option, not hidden in here?
|
||||
// though i'm not sure how that works out when there's another Executor :D
|
||||
// I really would like something that works on OSX native for eg
|
||||
|
@ -244,7 +244,7 @@ func GetDockerClient(ctx context.Context) (cli *client.Client, err error) {
|
|||
}
|
||||
|
||||
func GetHostInfo(ctx context.Context) (info types.Info, err error) {
|
||||
var cli *client.Client
|
||||
var cli client.APIClient
|
||||
cli, err = GetDockerClient(ctx)
|
||||
if err != nil {
|
||||
return info, err
|
||||
|
@ -558,23 +558,9 @@ func (cr *containerReference) exec(cmd []string, env map[string]string, user, wo
|
|||
}
|
||||
defer resp.Close()
|
||||
|
||||
var outWriter io.Writer
|
||||
outWriter = cr.input.Stdout
|
||||
if outWriter == nil {
|
||||
outWriter = os.Stdout
|
||||
}
|
||||
errWriter := cr.input.Stderr
|
||||
if errWriter == nil {
|
||||
errWriter = os.Stderr
|
||||
}
|
||||
|
||||
if !isTerminal || os.Getenv("NORAW") != "" {
|
||||
_, err = stdcopy.StdCopy(outWriter, errWriter, resp.Reader)
|
||||
} else {
|
||||
_, err = io.Copy(outWriter, resp.Reader)
|
||||
}
|
||||
err = cr.waitForCommand(ctx, isTerminal, resp, idResp, user, workdir)
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
inspectResp, err := cr.cli.ContainerExecInspect(ctx, idResp.ID)
|
||||
|
@ -590,6 +576,51 @@ func (cr *containerReference) exec(cmd []string, env map[string]string, user, wo
|
|||
}
|
||||
}
|
||||
|
||||
func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal bool, resp types.HijackedResponse, idResp types.IDResponse, user string, workdir string) error {
|
||||
logger := common.Logger(ctx)
|
||||
|
||||
cmdResponse := make(chan error)
|
||||
|
||||
go func() {
|
||||
var outWriter io.Writer
|
||||
outWriter = cr.input.Stdout
|
||||
if outWriter == nil {
|
||||
outWriter = os.Stdout
|
||||
}
|
||||
errWriter := cr.input.Stderr
|
||||
if errWriter == nil {
|
||||
errWriter = os.Stderr
|
||||
}
|
||||
|
||||
var err error
|
||||
if !isTerminal || os.Getenv("NORAW") != "" {
|
||||
_, err = stdcopy.StdCopy(outWriter, errWriter, resp.Reader)
|
||||
} else {
|
||||
_, err = io.Copy(outWriter, resp.Reader)
|
||||
}
|
||||
cmdResponse <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// send ctrl + c
|
||||
_, err := resp.Conn.Write([]byte{3})
|
||||
if err != nil {
|
||||
logger.Warnf("Failed to send CTRL+C: %+s", err)
|
||||
}
|
||||
|
||||
// we return the context canceled error to prevent other steps
|
||||
// from executing
|
||||
return ctx.Err()
|
||||
case err := <-cmdResponse:
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (cr *containerReference) copyDir(dstPath string, srcPath string, useGitIgnore bool) common.Executor {
|
||||
return func(ctx context.Context) error {
|
||||
logger := common.Logger(ctx)
|
||||
|
|
|
@ -1,10 +1,18 @@
|
|||
package container
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
func TestDocker(t *testing.T) {
|
||||
|
@ -45,3 +53,113 @@ func TestDocker(t *testing.T) {
|
|||
"CONFLICT_VAR": "I_EXIST_IN_MULTIPLE_PLACES",
|
||||
}, env)
|
||||
}
|
||||
|
||||
type mockDockerClient struct {
|
||||
client.APIClient
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockDockerClient) ContainerExecCreate(ctx context.Context, id string, opts types.ExecConfig) (types.IDResponse, error) {
|
||||
args := m.Called(ctx, id, opts)
|
||||
return args.Get(0).(types.IDResponse), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *mockDockerClient) ContainerExecAttach(ctx context.Context, id string, opts types.ExecStartCheck) (types.HijackedResponse, error) {
|
||||
args := m.Called(ctx, id, opts)
|
||||
return args.Get(0).(types.HijackedResponse), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *mockDockerClient) ContainerExecInspect(ctx context.Context, execID string) (types.ContainerExecInspect, error) {
|
||||
args := m.Called(ctx, execID)
|
||||
return args.Get(0).(types.ContainerExecInspect), args.Error(1)
|
||||
}
|
||||
|
||||
type endlessReader struct {
|
||||
io.Reader
|
||||
}
|
||||
|
||||
func (r endlessReader) Read(p []byte) (n int, err error) {
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
type mockConn struct {
|
||||
net.Conn
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockConn) Write(b []byte) (n int, err error) {
|
||||
args := m.Called(b)
|
||||
return args.Int(0), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *mockConn) Close() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestDockerExecAbort(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
conn := &mockConn{}
|
||||
conn.On("Write", mock.AnythingOfType("[]uint8")).Return(1, nil)
|
||||
|
||||
client := &mockDockerClient{}
|
||||
client.On("ContainerExecCreate", ctx, "123", mock.AnythingOfType("types.ExecConfig")).Return(types.IDResponse{ID: "id"}, nil)
|
||||
client.On("ContainerExecAttach", ctx, "id", mock.AnythingOfType("types.ExecStartCheck")).Return(types.HijackedResponse{
|
||||
Conn: conn,
|
||||
Reader: bufio.NewReader(endlessReader{}),
|
||||
}, nil)
|
||||
|
||||
cr := &containerReference{
|
||||
id: "123",
|
||||
cli: client,
|
||||
input: &NewContainerInput{
|
||||
Image: "image",
|
||||
},
|
||||
}
|
||||
|
||||
channel := make(chan error)
|
||||
|
||||
go func() {
|
||||
channel <- cr.exec([]string{""}, map[string]string{}, "user", "workdir")(ctx)
|
||||
}()
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
cancel()
|
||||
|
||||
err := <-channel
|
||||
assert.ErrorIs(t, err, context.Canceled)
|
||||
|
||||
conn.AssertExpectations(t)
|
||||
client.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestDockerExecFailure(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
conn := &mockConn{}
|
||||
|
||||
client := &mockDockerClient{}
|
||||
client.On("ContainerExecCreate", ctx, "123", mock.AnythingOfType("types.ExecConfig")).Return(types.IDResponse{ID: "id"}, nil)
|
||||
client.On("ContainerExecAttach", ctx, "id", mock.AnythingOfType("types.ExecStartCheck")).Return(types.HijackedResponse{
|
||||
Conn: conn,
|
||||
Reader: bufio.NewReader(strings.NewReader("output")),
|
||||
}, nil)
|
||||
client.On("ContainerExecInspect", ctx, "id").Return(types.ContainerExecInspect{
|
||||
ExitCode: 1,
|
||||
}, nil)
|
||||
|
||||
cr := &containerReference{
|
||||
id: "123",
|
||||
cli: client,
|
||||
input: &NewContainerInput{
|
||||
Image: "image",
|
||||
},
|
||||
}
|
||||
|
||||
err := cr.exec([]string{""}, map[string]string{}, "user", "workdir")(ctx)
|
||||
assert.Error(t, err, "exit with `FAILURE`: 1")
|
||||
|
||||
conn.AssertExpectations(t)
|
||||
client.AssertExpectations(t)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package runner
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/nektos/act/pkg/common"
|
||||
"github.com/nektos/act/pkg/model"
|
||||
|
@ -100,7 +101,16 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
|
|||
pipeline = append(pipeline, steps...)
|
||||
|
||||
return common.NewPipelineExecutor(pipeline...).
|
||||
Finally(postExecutor).
|
||||
Finally(func(ctx context.Context) error {
|
||||
var cancel context.CancelFunc
|
||||
if ctx.Err() == context.Canceled {
|
||||
// in case of an aborted run, we still should execute the
|
||||
// post steps to allow cleanup.
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
}
|
||||
return postExecutor(ctx)
|
||||
}).
|
||||
Finally(info.interpolateOutputs()).
|
||||
Finally(info.closeContainer())
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"regexp"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
|
@ -169,7 +170,14 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
|
|||
}
|
||||
|
||||
if runner.config.AutoRemove && isLastRunningContainer(s, r) {
|
||||
var cancel context.CancelFunc
|
||||
if ctx.Err() == context.Canceled {
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
log.Infof("Cleaning up container for job %s", rc.JobName)
|
||||
|
||||
if err := rc.stopJobContainer()(ctx); err != nil {
|
||||
log.Errorf("Error while cleaning container: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue