From 0b885c5e5fe33e2781755a8a02fa4bddfa997306 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Sun, 14 Aug 2022 10:34:19 +0800 Subject: [PATCH] chore(poller): Add poller package Signed-off-by: Bo-Yi Wu --- cmd/config.go | 7 +++++++ cmd/damon.go | 24 +++++++++++++++++++++- go.mod | 2 +- poller/poller.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++ poller/thread.go | 24 ++++++++++++++++++++++ 5 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 poller/poller.go create mode 100644 poller/thread.go diff --git a/cmd/config.go b/cmd/config.go index e7964ae..6ea263b 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -29,6 +29,13 @@ type ( Environ map[string]string `envconfig:"GITEA_RUNNER_ENVIRON"` EnvFile string `envconfig:"GITEA_RUNNER_ENV_FILE"` } + + Platform struct { + OS string `envconfig:"GITEA_PLATFORM_OS" default:"linux"` + Arch string `envconfig:"GITEA_PLATFORM_ARCH" default:"amd64"` + Kernel string `envconfig:"GITEA_PLATFORM_KERNEL"` + Variant string `envconfig:"GITEA_PLATFORM_VARIANT"` + } } ) diff --git a/cmd/damon.go b/cmd/damon.go index 29485de..f3b43ee 100644 --- a/cmd/damon.go +++ b/cmd/damon.go @@ -6,6 +6,8 @@ import ( "gitea.com/gitea/act_runner/client" "gitea.com/gitea/act_runner/engine" + "gitea.com/gitea/act_runner/poller" + "golang.org/x/sync/errgroup" "github.com/joho/godotenv" log "github.com/sirupsen/logrus" @@ -196,7 +198,27 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args } } - return nil + var g errgroup.Group + + poller := poller.New(cli) + + g.Go(func() error { + log.WithField("capacity", cfg.Runner.Capacity). + WithField("endpoint", cfg.Client.Address). + WithField("os", cfg.Platform.OS). + WithField("arch", cfg.Platform.Arch). + Infoln("polling the remote server") + + poller.Poll(ctx, cfg.Runner.Capacity) + return nil + }) + + err = g.Wait() + if err != nil { + log.WithError(err). + Errorln("shutting down the server") + } + return err // var conn *websocket.Conn // var err error // ticker := time.NewTicker(time.Second) diff --git a/go.mod b/go.mod index 19a9e17..15ae640 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.5.0 golang.org/x/net v0.0.0-20220403103023-749bd193bc2b + golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 ) require ( @@ -61,7 +62,6 @@ require ( github.com/xanzy/ssh-agent v0.3.1 // indirect go.opencensus.io v0.23.0 // indirect golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29 // indirect - golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 // indirect golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/poller/poller.go b/poller/poller.go new file mode 100644 index 0000000..bab63e4 --- /dev/null +++ b/poller/poller.go @@ -0,0 +1,52 @@ +package poller + +import ( + "context" + + "gitea.com/gitea/act_runner/client" + log "github.com/sirupsen/logrus" +) + +func New(cli client.Client) *Poller { + return &Poller{ + Client: cli, + routineGroup: newRoutineGroup(), + } +} + +type Poller struct { + Client client.Client + + routineGroup *routineGroup +} + +func (p *Poller) Poll(ctx context.Context, n int) { + for i := 0; i < n; i++ { + func(i int) { + p.routineGroup.Run(func() { + for { + select { + case <-ctx.Done(): + log.Infof("stopped the runner: %d", i+1) + return + default: + if ctx.Err() != nil { + log.Infof("stopping the runner: %d", i+1) + return + } + if err := p.poll(ctx, i+1); err != nil { + log.WithError(err).Error("poll error") + } + } + } + }) + }(i) + } + p.routineGroup.Wait() +} + +func (p *Poller) poll(ctx context.Context, thread int) error { + log.WithField("thread", thread).Info("poller: request stage from remote server") + + return nil +} diff --git a/poller/thread.go b/poller/thread.go new file mode 100644 index 0000000..fe29a4a --- /dev/null +++ b/poller/thread.go @@ -0,0 +1,24 @@ +package poller + +import "sync" + +type routineGroup struct { + waitGroup sync.WaitGroup +} + +func newRoutineGroup() *routineGroup { + return new(routineGroup) +} + +func (g *routineGroup) Run(fn func()) { + g.waitGroup.Add(1) + + go func() { + defer g.waitGroup.Done() + fn() + }() +} + +func (g *routineGroup) Wait() { + g.waitGroup.Wait() +}