Commit 26bc97ab authored by Christian Pointner's avatar Christian Pointner
Browse files

added worker state and implemented importer health check

parent 2545966f
Pipeline #551 passed with stages
in 11 minutes and 23 seconds
......@@ -26,6 +26,7 @@ package importer
import (
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
......@@ -44,6 +45,7 @@ type Importer struct {
dbgLog *log.Logger
store *store.Store
wgWorker sync.WaitGroup
workers []*WorkerState
work chan *Job
jobs *jobInventory
httpC *http.Client
......@@ -84,9 +86,22 @@ func (im *Importer) GetJob(show string, id uint64) (*Job, error) {
return im.jobs.GetJob(show, id)
}
func (im *Importer) Healthz(ctx context.Context) error {
// TODO: implement this!
return nil
func (im *Importer) Healthz(ctx context.Context) (err error) {
if len(im.workers) != im.conf.Workers {
return fmt.Errorf("importer has not enough/too many worker: got %d, expected %d", len(im.workers), im.conf.Workers)
}
for _, ws := range im.workers {
switch ws.get() {
case WorkerIdle:
fallthrough
case WorkerBusy:
continue
default:
err = fmt.Errorf("at least one worker is not running")
break
}
}
return
}
// handle subscriptions to new jobs-of-show -> pass on to job inventory
......@@ -95,11 +110,13 @@ func (im *Importer) deleteJob(show string, id uint64) error {
return im.jobs.DeleteJob(show, id)
}
func (im *Importer) runWorker(idx int) {
func (im *Importer) runWorker(idx int, state *WorkerState) {
defer im.dbgLog.Printf("importer: worker(%d) has stopped", idx)
im.dbgLog.Printf("importer: worker(%d) is running", idx)
state.set(WorkerIdle)
for job := range im.work {
state.set(WorkerBusy)
im.infoLog.Printf("importer: worker(%d) starting job(%s/%d) ...", idx, job.Show, job.ID)
// job.run() will take care of cancelation and timeouts to make sure this function returns in time.
......@@ -114,6 +131,8 @@ func (im *Importer) runWorker(idx int) {
} else {
im.infoLog.Printf("importer: worker(%d) successfully completed job(%s/%d)", idx, job.Show, job.ID)
}
state.set(WorkerIdle)
}
}
......@@ -145,7 +164,9 @@ func NewImporter(conf Config, st *store.Store, infoLog, errLog, dbgLog *log.Logg
im.wgWorker.Add(1)
go func(idx int) {
defer im.wgWorker.Done()
im.runWorker(idx)
state := WorkerNew
im.workers = append(im.workers, &state)
im.runWorker(idx, &state)
}(i)
}
im.jobs = newJobInventory(infoLog, errLog, dbgLog)
......
......@@ -60,7 +60,45 @@ var (
ErrAlreadyCanceled = errors.New("job is already canceled")
)
//******* Source URL
//******* Worker: State
type WorkerState uint32
const (
WorkerNew WorkerState = iota
WorkerIdle
WorkerBusy
WorkerStopped
)
func (s *WorkerState) set(state WorkerState) {
atomic.StoreUint32((*uint32)(s), uint32(state))
}
func (s *WorkerState) get() (state WorkerState) {
return WorkerState(atomic.LoadUint32((*uint32)(s)))
}
func (s *WorkerState) String() string {
switch s.get() {
case WorkerNew:
return "new"
case WorkerIdle:
return "idle"
case WorkerBusy:
return "busy"
case WorkerStopped:
return "stopped"
}
return "unknown"
}
func (s *WorkerState) MarshalText() (data []byte, err error) {
data = []byte(s.String())
return
}
//******* Job: Source URL
type SourceURL url.URL
......@@ -78,7 +116,7 @@ func (s *SourceURL) UnmarshalText(data []byte) (err error) {
return
}
//******* Source
//******* Job: Source
type JobSourceResult struct {
Err error `json:"error,omitempty"`
......@@ -97,7 +135,7 @@ type JobSource interface {
Done(*JobSourceResult)
}
//******* State
//******* Job: State
type JobState uint32
......@@ -155,7 +193,7 @@ func (s *JobState) UnmarshalText(data []byte) (err error) {
return s.fromString(string(data))
}
//******* Progress
//******* Job: Progress
type JobProgressStep uint32
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment