diff --git a/cmd/daemon.go b/cmd/daemon.go index 7c78806..d2dad1a 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -59,6 +59,7 @@ func runDaemon(ctx context.Context, envFile string) func(cmd *cobra.Command, arg poller := poller.New( cli, runner.Run, + cfg.Runner.Capacity, ) g.Go(func() error { @@ -81,7 +82,7 @@ func runDaemon(ctx context.Context, envFile string) func(cmd *cobra.Command, arg Errorln("failed to update runner") } - return poller.Poll(ctx, cfg.Runner.Capacity) + return poller.Poll(ctx) }) g.Go(func() error { diff --git a/poller/poller.go b/poller/poller.go index 9e0d52d..8b3408e 100644 --- a/poller/poller.go +++ b/poller/poller.go @@ -3,6 +3,7 @@ package poller import ( "context" "errors" + "sync" "time" "gitea.com/gitea/act_runner/client" @@ -14,12 +15,13 @@ import ( var ErrDataLock = errors.New("Data Lock Error") -func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error) *Poller { +func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error, workerNum int) *Poller { return &Poller{ Client: cli, Dispatch: dispatch, routineGroup: newRoutineGroup(), metric: &metric{}, + workerNum: workerNum, } } @@ -28,48 +30,84 @@ type Poller struct { Filter *client.Filter Dispatch func(context.Context, *runnerv1.Task) error + sync.Mutex routineGroup *routineGroup metric *metric + ready chan struct{} + workerNum int } func (p *Poller) Wait() { p.routineGroup.Wait() } -func (p *Poller) Poll(ctx context.Context, n int) error { - for i := 0; i < n; i++ { - func(i int) { - p.routineGroup.Run(func() { - for { - select { - case <-ctx.Done(): - log.Infof("stopped the runner: %d", i+1) - return - default: - if ctx.Err() != nil { - log.Infof("stopping the runner: %d", i+1) - return - } - if err := p.poll(ctx, i+1); err != nil { - log.WithField("thread", i+1). - WithError(err).Error("poll error") - select { - case <-ctx.Done(): - return - case <-time.After(5 * time.Second): - } - } - } - } - }) - }(i) +func (p *Poller) schedule() { + p.Lock() + defer p.Unlock() + if int(p.metric.BusyWorkers()) >= p.workerNum { + return + } + + select { + case p.ready <- struct{}{}: + default: } - p.routineGroup.Wait() - return nil } -func (p *Poller) poll(ctx context.Context, thread int) error { - l := log.WithField("thread", thread) +func (p *Poller) Poll(ctx context.Context) error { + l := log.WithField("func", "Poll") + + for { + // check worker number + p.schedule() + + select { + // wait worker ready + case <-p.ready: + case <-ctx.Done(): + return nil + } + LOOP: + for { + select { + case <-ctx.Done(): + break LOOP + default: + task, err := p.pollTask(ctx) + if task == nil || err != nil { + if err != nil { + l.Errorf("can't find the task: %v", err.Error()) + } + time.Sleep(5 * time.Second) + break + } + + // update runner status + // running: idle -> active + if val := p.metric.IncBusyWorker(); val == 1 { + if _, err := p.Client.UpdateRunner( + ctx, + connect.NewRequest(&runnerv1.UpdateRunnerRequest{ + Status: runnerv1.RunnerStatus_RUNNER_STATUS_ACTIVE, + }), + ); err != nil { + return err + } + l.Info("update runner status to active") + } + p.routineGroup.Run(func() { + if err := p.dispatchTask(ctx, task); err != nil { + l.Errorf("execute task: %v", err.Error()) + } + }) + break LOOP + } + } + } +} + +func (p *Poller) pollTask(ctx context.Context) (*runnerv1.Task, error) { + l := log.WithField("func", "pollTask") l.Info("poller: request stage from remote server") reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second) @@ -80,60 +118,54 @@ func (p *Poller) poll(ctx context.Context, thread int) error { resp, err := p.Client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{})) if err == context.Canceled || err == context.DeadlineExceeded { l.WithError(err).Trace("poller: no stage returned") - return nil + return nil, nil } if err != nil && err == ErrDataLock { l.WithError(err).Info("task accepted by another runner") - return nil + return nil, nil } if err != nil { l.WithError(err).Error("cannot accept task") - return err + return nil, err } // exit if a nil or empty stage is returned from the system // and allow the runner to retry. if resp.Msg.Task == nil || resp.Msg.Task.Id == 0 { - return nil + return nil, nil } + return resp.Msg.Task, nil +} + +func (p *Poller) dispatchTask(ctx context.Context, task *runnerv1.Task) error { + l := log.WithField("func", "dispatchTask") + defer func() { + val := p.metric.DecBusyWorker() + e := recover() + if e != nil { + l.Errorf("panic error: %v", e) + } + p.schedule() + + if val != 0 { + return + } + if _, err := p.Client.UpdateRunner( + ctx, + connect.NewRequest(&runnerv1.UpdateRunnerRequest{ + Status: runnerv1.RunnerStatus_RUNNER_STATUS_IDLE, + }), + ); err != nil { + l.Errorln("update status error:", err.Error()) + } + l.Info("update runner status to idle") + }() + runCtx, cancel := context.WithTimeout(ctx, time.Hour) defer cancel() - // update runner status - // running: idle -> active - // stopped: active -> idle - if val := p.metric.IncBusyWorker(); val == 1 { - if _, err := p.Client.UpdateRunner( - ctx, - connect.NewRequest(&runnerv1.UpdateRunnerRequest{ - Status: runnerv1.RunnerStatus_RUNNER_STATUS_ACTIVE, - }), - ); err != nil { - return err - } - l.Info("update runner status to active") - } - - defer func() { - if val := p.metric.DecBusyWorker(); val != 0 { - return - } - - defer func() { - if _, err := p.Client.UpdateRunner( - ctx, - connect.NewRequest(&runnerv1.UpdateRunnerRequest{ - Status: runnerv1.RunnerStatus_RUNNER_STATUS_IDLE, - }), - ); err != nil { - log.Errorln("update status error:", err.Error()) - } - l.Info("update runner status to idle") - }() - }() - - return p.Dispatch(runCtx, resp.Msg.Task) + return p.Dispatch(runCtx, task) }