2020-02-05 00:38:41 +00:00
package model
import (
2021-04-02 14:01:45 +00:00
"fmt"
2020-08-31 21:50:01 +00:00
"io"
2020-02-05 00:38:41 +00:00
"io/ioutil"
"math"
"os"
"path/filepath"
2021-04-02 14:01:45 +00:00
"regexp"
2020-02-05 00:38:41 +00:00
"sort"
log "github.com/sirupsen/logrus"
)
// WorkflowPlanner contains methods for creating plans
type WorkflowPlanner interface {
PlanEvent ( eventName string ) * Plan
PlanJob ( jobName string ) * Plan
GetEvents ( ) [ ] string
}
// Plan contains a list of stages to run in series
type Plan struct {
Stages [ ] * Stage
}
// Stage contains a list of runs to execute in parallel
type Stage struct {
Runs [ ] * Run
}
// Run represents a job from a workflow that needs to be run
type Run struct {
Workflow * Workflow
JobID string
}
2020-02-07 06:17:58 +00:00
func ( r * Run ) String ( ) string {
jobName := r . Job ( ) . Name
if jobName == "" {
jobName = r . JobID
}
2020-02-27 07:29:43 +00:00
return jobName
2020-02-07 06:17:58 +00:00
}
// Job returns the job for this Run
func ( r * Run ) Job ( ) * Job {
return r . Workflow . GetJob ( r . JobID )
}
2021-05-03 14:57:24 +00:00
type WorkflowFiles struct {
workflowFileInfo os . FileInfo
dirPath string
}
// NewWorkflowPlanner will load a specific workflow, all workflows from a directory or all workflows from a directory and its subdirectories
2021-05-06 13:30:12 +00:00
// nolint: gocyclo
2021-05-03 14:57:24 +00:00
func NewWorkflowPlanner ( path string , noWorkflowRecurse bool ) ( WorkflowPlanner , error ) {
path , err := filepath . Abs ( path )
if err != nil {
return nil , err
}
2020-05-27 03:29:50 +00:00
fi , err := os . Stat ( path )
if err != nil {
return nil , err
}
2021-05-03 14:57:24 +00:00
var workflows [ ] WorkflowFiles
2020-05-27 03:29:50 +00:00
if fi . IsDir ( ) {
log . Debugf ( "Loading workflows from '%s'" , path )
2021-05-03 14:57:24 +00:00
if noWorkflowRecurse {
files , err := ioutil . ReadDir ( path )
if err != nil {
return nil , err
}
for _ , v := range files {
workflows = append ( workflows , WorkflowFiles {
dirPath : path ,
workflowFileInfo : v ,
} )
}
} else {
log . Debug ( "Loading workflows recursively" )
if err := filepath . Walk ( path ,
func ( p string , f os . FileInfo , err error ) error {
if err != nil {
return err
}
if ! f . IsDir ( ) {
log . Debugf ( "Found workflow '%s' in '%s'" , f . Name ( ) , p )
workflows = append ( workflows , WorkflowFiles {
dirPath : filepath . Dir ( p ) ,
workflowFileInfo : f ,
} )
}
return nil
} ) ; err != nil {
return nil , err
}
}
2020-05-27 03:29:50 +00:00
} else {
log . Debugf ( "Loading workflow '%s'" , path )
2021-05-03 14:57:24 +00:00
dirname := filepath . Dir ( path )
workflows = append ( workflows , WorkflowFiles {
dirPath : dirname ,
workflowFileInfo : fi ,
} )
2020-05-27 03:29:50 +00:00
}
2020-02-05 00:38:41 +00:00
if err != nil {
return nil , err
}
wp := new ( workflowPlanner )
2021-05-03 14:57:24 +00:00
for _ , wf := range workflows {
ext := filepath . Ext ( wf . workflowFileInfo . Name ( ) )
2020-02-05 00:38:41 +00:00
if ext == ".yml" || ext == ".yaml" {
2021-05-03 14:57:24 +00:00
f , err := os . Open ( filepath . Join ( wf . dirPath , wf . workflowFileInfo . Name ( ) ) )
2020-02-05 00:38:41 +00:00
if err != nil {
return nil , err
}
2020-02-17 06:04:13 +00:00
log . Debugf ( "Reading workflow '%s'" , f . Name ( ) )
2020-02-05 00:38:41 +00:00
workflow , err := ReadWorkflow ( f )
if err != nil {
2022-06-10 21:16:42 +00:00
_ = f . Close ( )
2020-08-31 21:50:01 +00:00
if err == io . EOF {
2022-06-10 21:16:42 +00:00
return nil , fmt . Errorf ( "unable to read workflow '%s': file is empty: %w" , wf . workflowFileInfo . Name ( ) , err )
2020-08-31 21:50:01 +00:00
}
2022-09-06 20:41:43 +00:00
return nil , fmt . Errorf ( "workflow is not valid. '%s': %w" , wf . workflowFileInfo . Name ( ) , err )
2020-02-05 00:38:41 +00:00
}
2021-05-05 20:04:03 +00:00
_ , err = f . Seek ( 0 , 0 )
if err != nil {
2022-06-10 21:16:42 +00:00
_ = f . Close ( )
return nil , fmt . Errorf ( "error occurring when resetting io pointer in '%s': %w" , wf . workflowFileInfo . Name ( ) , err )
2021-05-05 20:04:03 +00:00
}
2021-05-03 14:57:24 +00:00
2021-08-30 15:38:03 +00:00
workflow . File = wf . workflowFileInfo . Name ( )
2020-02-07 06:17:58 +00:00
if workflow . Name == "" {
2021-05-03 14:57:24 +00:00
workflow . Name = wf . workflowFileInfo . Name ( )
2020-02-07 06:17:58 +00:00
}
2021-05-03 14:57:24 +00:00
2021-04-02 14:01:45 +00:00
jobNameRegex := regexp . MustCompile ( ` ^([[:alpha:]_][[:alnum:]_\-]*)$ ` )
for k := range workflow . Jobs {
if ok := jobNameRegex . MatchString ( k ) ; ! ok {
2022-09-06 20:41:43 +00:00
_ = f . Close ( )
2021-05-03 14:57:24 +00:00
return nil , fmt . Errorf ( "workflow is not valid. '%s': Job name '%s' is invalid. Names must start with a letter or '_' and contain only alphanumeric characters, '-', or '_'" , workflow . Name , k )
2021-04-02 14:01:45 +00:00
}
}
2021-05-03 14:57:24 +00:00
2020-02-05 00:38:41 +00:00
wp . workflows = append ( wp . workflows , workflow )
2022-06-10 21:16:42 +00:00
_ = f . Close ( )
2020-02-05 00:38:41 +00:00
}
}
return wp , nil
}
type workflowPlanner struct {
workflows [ ] * Workflow
}
// PlanEvent builds a new list of runs to execute in parallel for an event name
func ( wp * workflowPlanner ) PlanEvent ( eventName string ) * Plan {
plan := new ( Plan )
2020-10-09 05:28:01 +00:00
if len ( wp . workflows ) == 0 {
log . Debugf ( "no events found for workflow: %s" , eventName )
}
2020-02-05 00:38:41 +00:00
for _ , w := range wp . workflows {
2020-02-11 00:35:00 +00:00
for _ , e := range w . On ( ) {
if e == eventName {
plan . mergeStages ( createStages ( w , w . GetJobIDs ( ) ... ) )
}
2020-02-05 00:38:41 +00:00
}
}
return plan
}
// PlanJob builds a new run to execute in parallel for a job name
func ( wp * workflowPlanner ) PlanJob ( jobName string ) * Plan {
plan := new ( Plan )
2020-10-09 05:28:01 +00:00
if len ( wp . workflows ) == 0 {
log . Debugf ( "no jobs found for workflow: %s" , jobName )
}
2020-02-05 00:38:41 +00:00
for _ , w := range wp . workflows {
plan . mergeStages ( createStages ( w , jobName ) )
}
return plan
}
// GetEvents gets all the events in the workflows file
func ( wp * workflowPlanner ) GetEvents ( ) [ ] string {
events := make ( [ ] string , 0 )
for _ , w := range wp . workflows {
found := false
for _ , e := range events {
2020-02-11 00:35:00 +00:00
for _ , we := range w . On ( ) {
if e == we {
found = true
break
}
}
if found {
2020-02-05 00:38:41 +00:00
break
}
}
if ! found {
2020-02-11 00:35:00 +00:00
events = append ( events , w . On ( ) ... )
2020-02-05 00:38:41 +00:00
}
}
// sort the list based on depth of dependencies
sort . Slice ( events , func ( i , j int ) bool {
return events [ i ] < events [ j ]
} )
return events
}
2020-02-17 18:30:52 +00:00
// MaxRunNameLen determines the max name length of all jobs
func ( p * Plan ) MaxRunNameLen ( ) int {
maxRunNameLen := 0
for _ , stage := range p . Stages {
for _ , run := range stage . Runs {
runNameLen := len ( run . String ( ) )
if runNameLen > maxRunNameLen {
maxRunNameLen = runNameLen
}
}
}
return maxRunNameLen
}
2020-02-05 00:38:41 +00:00
// GetJobIDs will get all the job names in the stage
func ( s * Stage ) GetJobIDs ( ) [ ] string {
names := make ( [ ] string , 0 )
for _ , r := range s . Runs {
names = append ( names , r . JobID )
}
return names
}
// Merge stages with existing stages in plan
func ( p * Plan ) mergeStages ( stages [ ] * Stage ) {
newStages := make ( [ ] * Stage , int ( math . Max ( float64 ( len ( p . Stages ) ) , float64 ( len ( stages ) ) ) ) )
for i := 0 ; i < len ( newStages ) ; i ++ {
newStages [ i ] = new ( Stage )
if i >= len ( p . Stages ) {
2020-02-11 00:53:14 +00:00
newStages [ i ] . Runs = append ( newStages [ i ] . Runs , stages [ i ] . Runs ... )
2020-02-05 00:38:41 +00:00
} else if i >= len ( stages ) {
2020-02-11 00:53:14 +00:00
newStages [ i ] . Runs = append ( newStages [ i ] . Runs , p . Stages [ i ] . Runs ... )
2020-02-05 00:38:41 +00:00
} else {
2020-02-11 00:53:14 +00:00
newStages [ i ] . Runs = append ( newStages [ i ] . Runs , p . Stages [ i ] . Runs ... )
newStages [ i ] . Runs = append ( newStages [ i ] . Runs , stages [ i ] . Runs ... )
2020-02-05 00:38:41 +00:00
}
}
p . Stages = newStages
}
func createStages ( w * Workflow , jobIDs ... string ) [ ] * Stage {
// first, build a list of all the necessary jobs to run, and their dependencies
jobDependencies := make ( map [ string ] [ ] string )
for len ( jobIDs ) > 0 {
newJobIDs := make ( [ ] string , 0 )
for _ , jID := range jobIDs {
// make sure we haven't visited this job yet
if _ , ok := jobDependencies [ jID ] ; ! ok {
if job := w . GetJob ( jID ) ; job != nil {
2020-02-11 00:35:00 +00:00
jobDependencies [ jID ] = job . Needs ( )
newJobIDs = append ( newJobIDs , job . Needs ( ) ... )
2020-02-05 00:38:41 +00:00
}
}
}
jobIDs = newJobIDs
}
// next, build an execution graph
stages := make ( [ ] * Stage , 0 )
for len ( jobDependencies ) > 0 {
stage := new ( Stage )
for jID , jDeps := range jobDependencies {
// make sure all deps are in the graph already
if listInStages ( jDeps , stages ... ) {
stage . Runs = append ( stage . Runs , & Run {
Workflow : w ,
JobID : jID ,
} )
delete ( jobDependencies , jID )
}
}
if len ( stage . Runs ) == 0 {
log . Fatalf ( "Unable to build dependency graph!" )
}
stages = append ( stages , stage )
}
return stages
}
// return true iff all strings in srcList exist in at least one of the stages
func listInStages ( srcList [ ] string , stages ... * Stage ) bool {
for _ , src := range srcList {
found := false
for _ , stage := range stages {
for _ , search := range stage . GetJobIDs ( ) {
if src == search {
found = true
}
}
}
if ! found {
return false
}
}
return true
}