diff --git a/pkg/common/executor.go b/pkg/common/executor.go index 697a317..9707bec 100644 --- a/pkg/common/executor.go +++ b/pkg/common/executor.go @@ -2,6 +2,7 @@ package common import ( "context" + "errors" "fmt" log "github.com/sirupsen/logrus" @@ -130,6 +131,31 @@ func NewParallelExecutor(parallel int, executors ...Executor) Executor { } } +func NewFieldExecutor(name string, value interface{}, exec Executor) Executor { + return func(ctx context.Context) error { + return exec(WithLogger(ctx, Logger(ctx).WithField(name, value))) + } +} + +// Then runs another executor if this executor succeeds +func (e Executor) ThenError(then func(ctx context.Context, err error) error) Executor { + return func(ctx context.Context) error { + err := e(ctx) + if err != nil { + switch err.(type) { + case Warning: + Logger(ctx).Warning(err.Error()) + default: + return then(ctx, err) + } + } + if ctx.Err() != nil { + return ctx.Err() + } + return then(ctx, err) + } +} + // Then runs another executor if this executor succeeds func (e Executor) Then(then Executor) Executor { return func(ctx context.Context) error { @@ -149,6 +175,25 @@ func (e Executor) Then(then Executor) Executor { } } +// Then runs another executor if this executor succeeds +func (e Executor) OnError(then Executor) Executor { + return func(ctx context.Context) error { + err := e(ctx) + if err != nil { + switch err.(type) { + case Warning: + Logger(ctx).Warning(err.Error()) + default: + return errors.Join(err, then(ctx)) + } + } + if ctx.Err() != nil { + return ctx.Err() + } + return nil + } +} + // If only runs this executor if conditional is true func (e Executor) If(conditional Conditional) Executor { return func(ctx context.Context) error { diff --git a/pkg/runner/job_executor.go b/pkg/runner/job_executor.go index 30cd75c..e3ae2be 100644 --- a/pkg/runner/job_executor.go +++ b/pkg/runner/job_executor.go @@ -94,7 +94,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo } } - postExecutor = postExecutor.Finally(func(ctx context.Context) error { + var stopContainerExecutor common.Executor = func(ctx context.Context) error { jobError := common.JobError(ctx) var err error if rc.Config.AutoRemove || jobError == nil { @@ -108,30 +108,48 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo logger.Errorf("Error while stop job container: %v", err) } } + return err + } + + var setJobResultExecutor common.Executor = func(ctx context.Context) error { + jobError := common.JobError(ctx) setJobResult(ctx, info, rc, jobError == nil) setJobOutputs(ctx, rc) + return nil + } - return err - }) + var setJobError = func(ctx context.Context, err error) error { + common.SetJobError(ctx, err) + return nil + } pipeline := make([]common.Executor, 0) - pipeline = append(pipeline, rc.InitializeNodeTool()) pipeline = append(pipeline, preSteps...) pipeline = append(pipeline, steps...) - return common.NewPipelineExecutor(info.startContainer(), common.NewPipelineExecutor(pipeline...). - Finally(func(ctx context.Context) error { //nolint:contextcheck - var cancel context.CancelFunc - if ctx.Err() == context.Canceled { - // in case of an aborted run, we still should execute the - // post steps to allow cleanup. - ctx, cancel = context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), 5*time.Minute) - defer cancel() - } - return postExecutor(ctx) - }). - Finally(info.interpolateOutputs()). - Finally(info.closeContainer())) + return common.NewPipelineExecutor( + common.NewFieldExecutor("step", "Set up job", common.NewFieldExecutor("stepid", []string{"--setup-job"}, + common.NewPipelineExecutor(common.NewInfoExecutor("\u2B50 Run Set up job"), info.startContainer(), rc.InitializeNodeTool()). + Then(common.NewFieldExecutor("stepResult", model.StepStatusSuccess, common.NewInfoExecutor(" \u2705 Success - Set up job"))). + OnError(common.NewFieldExecutor("stepResult", model.StepStatusFailure, common.NewInfoExecutor(" \u274C Failure - Set up job")).ThenError(setJobError)))), + common.NewPipelineExecutor(pipeline...). + Finally(func(ctx context.Context) error { //nolint:contextcheck + var cancel context.CancelFunc + if ctx.Err() == context.Canceled { + // in case of an aborted run, we still should execute the + // post steps to allow cleanup. + ctx, cancel = context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), 5*time.Minute) + defer cancel() + } + return postExecutor(ctx) + }). + Finally(common.NewFieldExecutor("step", "Complete job", common.NewFieldExecutor("stepid", []string{"--complete-job"}, + common.NewInfoExecutor("\u2B50 Run Complete job"). + Finally(stopContainerExecutor). + Finally( + info.interpolateOutputs().Finally(info.closeContainer()).Then(common.NewFieldExecutor("stepResult", model.StepStatusSuccess, common.NewInfoExecutor(" \u2705 Success - Complete job"))). + OnError(common.NewFieldExecutor("stepResult", model.StepStatusFailure, common.NewInfoExecutor(" \u274C Failure - Complete job"))), + )))).Finally(setJobResultExecutor)) } func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) {