chore(poller): add metric to track the worker number

Add metric to track multiple task.
This commit is contained in:
Bo-Yi Wu 2022-11-11 15:00:38 +08:00 committed by Jason Song
parent d1114da299
commit abdb547b1b
3 changed files with 68 additions and 32 deletions

33
poller/metric.go Normal file
View File

@ -0,0 +1,33 @@
package poller
import "sync/atomic"
// Metric interface
type Metric interface {
IncBusyWorker() uint64
DecBusyWorker() uint64
BusyWorkers() uint64
}
var _ Metric = (*metric)(nil)
type metric struct {
busyWorkers uint64
}
// NewMetric for default metric structure
func NewMetric() Metric {
return &metric{}
}
func (m *metric) IncBusyWorker() uint64 {
return atomic.AddUint64(&m.busyWorkers, 1)
}
func (m *metric) DecBusyWorker() uint64 {
return atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}
func (m *metric) BusyWorkers() uint64 {
return atomic.LoadUint64(&m.busyWorkers)
}

View File

@ -24,6 +24,7 @@ func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error
Client: cli, Client: cli,
Dispatch: dispatch, Dispatch: dispatch,
routineGroup: newRoutineGroup(), routineGroup: newRoutineGroup(),
metric: &metric{},
} }
} }
@ -33,6 +34,7 @@ type Poller struct {
Dispatch func(context.Context, *runnerv1.Task) error Dispatch func(context.Context, *runnerv1.Task) error
routineGroup *routineGroup routineGroup *routineGroup
metric *metric
errorRetryCounter int errorRetryCounter int
} }
@ -111,5 +113,38 @@ func (p *Poller) poll(ctx context.Context, thread int) error {
runCtx, cancel := context.WithTimeout(ctx, time.Hour) runCtx, cancel := context.WithTimeout(ctx, time.Hour)
defer cancel() defer cancel()
// update runner status
// running: idle -> active
// stopped: active -> idle
if val := p.metric.IncBusyWorker(); val == 1 {
if _, err := p.Client.UpdateRunner(
ctx,
connect.NewRequest(&runnerv1.UpdateRunnerRequest{
Status: runnerv1.RunnerStatus_RUNNER_STATUS_ACTIVE,
}),
); err != nil {
return err
}
l.Info("update runner status to active")
}
defer func() {
if val := p.metric.DecBusyWorker(); val != 0 {
return
}
defer func() {
if _, err := p.Client.UpdateRunner(
ctx,
connect.NewRequest(&runnerv1.UpdateRunnerRequest{
Status: runnerv1.RunnerStatus_RUNNER_STATUS_IDLE,
}),
); err != nil {
log.Errorln("update status error:", err.Error())
}
l.Info("update runner status to idle")
}()
}()
return p.Dispatch(runCtx, resp.Msg.Task) return p.Dispatch(runCtx, resp.Msg.Task)
} }

View File

@ -5,9 +5,6 @@ import (
"gitea.com/gitea/act_runner/client" "gitea.com/gitea/act_runner/client"
runnerv1 "gitea.com/gitea/proto-go/runner/v1" runnerv1 "gitea.com/gitea/proto-go/runner/v1"
"github.com/bufbuild/connect-go"
log "github.com/sirupsen/logrus"
) )
// Runner runs the pipeline. // Runner runs the pipeline.
@ -20,34 +17,5 @@ type Runner struct {
// Run runs the pipeline stage. // Run runs the pipeline stage.
func (s *Runner) Run(ctx context.Context, task *runnerv1.Task) error { func (s *Runner) Run(ctx context.Context, task *runnerv1.Task) error {
l := log.
WithField("task.id", task.Id)
l.Info("start running pipeline")
// update runner status
// running: idle -> active
// stopped: active -> idle
if _, err := s.Client.UpdateRunner(
ctx,
connect.NewRequest(&runnerv1.UpdateRunnerRequest{
Status: runnerv1.RunnerStatus_RUNNER_STATUS_ACTIVE,
}),
); err != nil {
return err
}
l.Info("update runner status to active")
defer func() {
if _, err := s.Client.UpdateRunner(
ctx,
connect.NewRequest(&runnerv1.UpdateRunnerRequest{
Status: runnerv1.RunnerStatus_RUNNER_STATUS_IDLE,
}),
); err != nil {
log.Errorln("update status error:", err.Error())
}
l.Info("update runner status to idle")
}()
return NewTask(s.ForgeInstance, task.Id, s.Client, s.Environ).Run(ctx, task) return NewTask(s.ForgeInstance, task.Id, s.Client, s.Environ).Run(ctx, task)
} }