Skip to content
Snippets Groups Projects
Commit 3e410741 authored by Christian Pointner's avatar Christian Pointner
Browse files

fix job thread safety

parent f625f218
No related branches found
No related tags found
No related merge requests found
...@@ -93,7 +93,7 @@ func uiIndexHtml() (*asset, error) { ...@@ -93,7 +93,7 @@ func uiIndexHtml() (*asset, error) {
return nil, err return nil, err
} }
   
info := bindataFileInfo{name: "ui/index.html", size: 10339, mode: os.FileMode(436), modTime: time.Unix(1530831239, 0)} info := bindataFileInfo{name: "ui/index.html", size: 10339, mode: os.FileMode(436), modTime: time.Unix(1530835704, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -113,7 +113,7 @@ func uiCssBootstrapRebootMinCss() (*asset, error) { ...@@ -113,7 +113,7 @@ func uiCssBootstrapRebootMinCss() (*asset, error) {
return nil, err return nil, err
} }
   
info := bindataFileInfo{name: "ui/css/bootstrap-reboot.min.css", size: 3989, mode: os.FileMode(420), modTime: time.Unix(1525033372, 0)} info := bindataFileInfo{name: "ui/css/bootstrap-reboot.min.css", size: 3989, mode: os.FileMode(436), modTime: time.Unix(1530321195, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -133,7 +133,7 @@ func uiCssBootstrapRebootMinCssMap() (*asset, error) { ...@@ -133,7 +133,7 @@ func uiCssBootstrapRebootMinCssMap() (*asset, error) {
return nil, err return nil, err
} }
   
info := bindataFileInfo{name: "ui/css/bootstrap-reboot.min.css.map", size: 25857, mode: os.FileMode(436), modTime: time.Unix(1530624697, 0)} info := bindataFileInfo{name: "ui/css/bootstrap-reboot.min.css.map", size: 25857, mode: os.FileMode(420), modTime: time.Unix(1525065771, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -153,7 +153,7 @@ func uiCssBootstrapMinCss() (*asset, error) { ...@@ -153,7 +153,7 @@ func uiCssBootstrapMinCss() (*asset, error) {
return nil, err return nil, err
} }
   
info := bindataFileInfo{name: "ui/css/bootstrap.min.css", size: 140930, mode: os.FileMode(420), modTime: time.Unix(1525033370, 0)} info := bindataFileInfo{name: "ui/css/bootstrap.min.css", size: 140930, mode: os.FileMode(436), modTime: time.Unix(1530321195, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -173,7 +173,7 @@ func uiCssBootstrapMinCssMap() (*asset, error) { ...@@ -173,7 +173,7 @@ func uiCssBootstrapMinCssMap() (*asset, error) {
return nil, err return nil, err
} }
   
info := bindataFileInfo{name: "ui/css/bootstrap.min.css.map", size: 559636, mode: os.FileMode(436), modTime: time.Unix(1530624697, 0)} info := bindataFileInfo{name: "ui/css/bootstrap.min.css.map", size: 559636, mode: os.FileMode(420), modTime: time.Unix(1525065769, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -193,7 +193,7 @@ func uiCssMainCss() (*asset, error) { ...@@ -193,7 +193,7 @@ func uiCssMainCss() (*asset, error) {
return nil, err return nil, err
} }
   
info := bindataFileInfo{name: "ui/css/main.css", size: 232, mode: os.FileMode(436), modTime: time.Unix(1530314832, 0)} info := bindataFileInfo{name: "ui/css/main.css", size: 232, mode: os.FileMode(436), modTime: time.Unix(1530321195, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -213,7 +213,7 @@ func uiJsBootstrapBundleMinJs() (*asset, error) { ...@@ -213,7 +213,7 @@ func uiJsBootstrapBundleMinJs() (*asset, error) {
return nil, err return nil, err
} }
   
info := bindataFileInfo{name: "ui/js/bootstrap.bundle.min.js", size: 70682, mode: os.FileMode(420), modTime: time.Unix(1530305287, 0)} info := bindataFileInfo{name: "ui/js/bootstrap.bundle.min.js", size: 70682, mode: os.FileMode(436), modTime: time.Unix(1530321195, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -233,7 +233,7 @@ func uiJsBootstrapBundleMinJsMap() (*asset, error) { ...@@ -233,7 +233,7 @@ func uiJsBootstrapBundleMinJsMap() (*asset, error) {
return nil, err return nil, err
} }
   
info := bindataFileInfo{name: "ui/js/bootstrap.bundle.min.js.map", size: 292629, mode: os.FileMode(436), modTime: time.Unix(1530624697, 0)} info := bindataFileInfo{name: "ui/js/bootstrap.bundle.min.js.map", size: 292629, mode: os.FileMode(420), modTime: time.Unix(1525065777, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -253,7 +253,7 @@ func uiJsJqueryMinJs() (*asset, error) { ...@@ -253,7 +253,7 @@ func uiJsJqueryMinJs() (*asset, error) {
return nil, err return nil, err
} }
   
info := bindataFileInfo{name: "ui/js/jquery.min.js", size: 86927, mode: os.FileMode(436), modTime: time.Unix(1516469204, 0)} info := bindataFileInfo{name: "ui/js/jquery.min.js", size: 86927, mode: os.FileMode(436), modTime: time.Unix(1530321195, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
...@@ -273,7 +273,7 @@ func uiJsMainJs() (*asset, error) { ...@@ -273,7 +273,7 @@ func uiJsMainJs() (*asset, error) {
return nil, err return nil, err
} }
   
info := bindataFileInfo{name: "ui/js/main.js", size: 14941, mode: os.FileMode(436), modTime: time.Unix(1530831443, 0)} info := bindataFileInfo{name: "ui/js/main.js", size: 14941, mode: os.FileMode(436), modTime: time.Unix(1530835704, 0)}
a := &asset{bytes: bytes, info: info} a := &asset{bytes: bytes, info: info}
return a, nil return a, nil
} }
......
...@@ -72,8 +72,7 @@ func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID ...@@ -72,8 +72,7 @@ func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID
return nil, ErrFileNotNew return nil, ErrFileNotNew
} }
im.jobs.InsertOrGetJob(group, id, newJob(im, group, id, src, user, refID)) return im.jobs.InsertOrGetJob(group, id, newJob(im, group, id, src, user, refID))
return nil, ErrNotImplemented
} }
func (im *Importer) GetJob(group string, id uint64) (*Job, error) { func (im *Importer) GetJob(group string, id uint64) (*Job, error) {
...@@ -98,9 +97,10 @@ func (im *Importer) runWorker(idx int) { ...@@ -98,9 +97,10 @@ func (im *Importer) runWorker(idx int) {
} }
im.infoLog.Printf("importer: worker(%d) starting job(%s/%d) ...", idx, job.Group, job.ID) im.infoLog.Printf("importer: worker(%d) starting job(%s/%d) ...", idx, job.Group, job.ID)
err := ErrTimeout var err error
select { select {
case <-job.ctx.Done(): case <-job.ctx.Done():
err = job.ctx.Err()
case err = <-job.start(): case err = <-job.start():
} }
if err != nil { if err != nil {
......
...@@ -26,20 +26,76 @@ package importer ...@@ -26,20 +26,76 @@ package importer
import ( import (
"context" "context"
"errors"
"net/url" "net/url"
"sync/atomic"
"time" "time"
) )
type JobState uint32
const (
JobNew JobState = iota
JobInitializing
JobPending
JobRunning
JobDestroying
)
func (s JobState) String() string {
switch s {
case JobNew:
return "new"
case JobInitializing:
return "initializing"
case JobPending:
return "pending"
case JobRunning:
return "running"
case JobDestroying:
return "destroying"
}
return "unknown"
}
func (s *JobState) fromString(str string) error {
switch str {
case "new":
*s = JobNew
case "initializing":
*s = JobInitializing
case "pending":
*s = JobPending
case "running":
*s = JobRunning
case "destroying":
*s = JobDestroying
default:
return errors.New("invalid job state: '" + str + "'")
}
return nil
}
func (s JobState) MarshalText() (data []byte, err error) {
data = []byte(s.String())
return
}
func (s *JobState) UnmarshalText(data []byte) (err error) {
return s.fromString(string(data))
}
type Job struct { type Job struct {
im *Importer im *Importer
ctx context.Context ctx context.Context
Cancel context.CancelFunc `json:"-"` cancel context.CancelFunc
CreatedAt time.Time `json:"created"` State JobState `json:"state"`
ID uint64 `json:"id"` CreatedAt time.Time `json:"created"`
Group string `json:"group"` ID uint64 `json:"id"`
User string `json:"user"` Group string `json:"group"`
Source url.URL `json:"source"` // TODO: json marshaller should render this as a string... User string `json:"user"`
RefID string `json:"ref-id,omitempty"` Source url.URL `json:"source"` // TODO: json marshaller should render this as a string...
RefID string `json:"ref-id,omitempty"`
} }
type Jobs []Job type Jobs []Job
...@@ -50,6 +106,7 @@ func (job *Job) run() (err error) { ...@@ -50,6 +106,7 @@ func (job *Job) run() (err error) {
// do the actual import here // do the actual import here
// use job.ctx.Done() wherever we wait for something // use job.ctx.Done() wherever we wait for something
// return ErrTimeout in case job.ctx.Done() is closed // return ErrTimeout in case job.ctx.Done() is closed
job.im.dbgLog.Printf("running import for %s/%d from: %s", job.Group, job.ID, job.Source.String())
time.Sleep(60 * time.Second) time.Sleep(60 * time.Second)
// update store.file: set import state to ImportDone or ImportAborted // update store.file: set import state to ImportDone or ImportAborted
// send result to all done subscriptions // send result to all done subscriptions
...@@ -70,16 +127,27 @@ func (job *Job) start() <-chan error { ...@@ -70,16 +127,27 @@ func (job *Job) start() <-chan error {
// handle subscriptions for progress and done // handle subscriptions for progress and done
func (job *Job) Start(timeout time.Duration) { func (job *Job) Start(timeout time.Duration) {
// TODO: do nothing if import is already running!! if !atomic.CompareAndSwapUint32((*uint32)(&job.State), uint32(JobNew), uint32(JobInitializing)) {
return
}
// shall we use a global context here so we can cancel all jobs at once? i.e. for forced shutdown... // shall we use a global context here so we can cancel all jobs at once? i.e. for forced shutdown...
if timeout > 0 { if timeout > 0 {
job.ctx, job.Cancel = context.WithTimeout(context.Background(), timeout) job.ctx, job.cancel = context.WithTimeout(context.Background(), timeout)
} else { } else {
job.ctx, job.Cancel = context.WithCancel(context.Background()) job.ctx, job.cancel = context.WithCancel(context.Background())
} }
// TODO: update store.file: set import state to ImportPending
job.im.work <- job job.im.work <- job
// TODO: update store.file: set import state to ImportPending
atomic.StoreUint32((*uint32)(&job.State), uint32(JobPending))
}
func (job *Job) Cancel() {
if atomic.LoadUint32((*uint32)(&job.State)) < uint32(JobPending) {
return
}
job.cancel()
} }
func newJob(im *Importer, group string, id uint64, src url.URL, user, refID string) *Job { func newJob(im *Importer, group string, id uint64, src url.URL, user, refID string) *Job {
......
...@@ -28,6 +28,49 @@ import ( ...@@ -28,6 +28,49 @@ import (
"log" "log"
) )
type listJobsResp struct {
jobs Jobs
err error
}
type listJobsReq struct {
resp chan listJobsResp
group string
}
type insertOrGetJobResp struct {
job *Job
err error
}
type insertOrGetJobReq struct {
resp chan insertOrGetJobResp
group string
id uint64
job *Job
}
type getJobResp struct {
job *Job
err error
}
type getJobReq struct {
resp chan getJobResp
group string
id uint64
}
type deleteJobResp struct {
err error
}
type deleteJobReq struct {
resp chan deleteJobResp
group string
id uint64
}
type jobInventoryGroup struct { type jobInventoryGroup struct {
jobs map[uint64]*Job jobs map[uint64]*Job
// add list of subscription channels // add list of subscription channels
...@@ -40,64 +83,72 @@ func newJobInventoryGroup() *jobInventoryGroup { ...@@ -40,64 +83,72 @@ func newJobInventoryGroup() *jobInventoryGroup {
} }
type jobInventory struct { type jobInventory struct {
infoLog *log.Logger infoLog *log.Logger
errLog *log.Logger errLog *log.Logger
dbgLog *log.Logger dbgLog *log.Logger
groups map[string]*jobInventoryGroup listJobsC chan listJobsReq
insertOrGetJobC chan insertOrGetJobReq
getJobC chan getJobReq
deleteJobC chan deleteJobReq
groups map[string]*jobInventoryGroup
} }
func (i *jobInventory) listJobs(group string) (jobs Jobs, err error) { func (i *jobInventory) listJobs(group string) (resp listJobsResp) {
ig, exists := i.groups[group] ig, exists := i.groups[group]
if !exists { if !exists {
return return
} }
for _, j := range ig.jobs { for _, j := range ig.jobs {
jobs = append(jobs, *j) resp.jobs = append(resp.jobs, *j)
} }
return return
} }
func (i *jobInventory) insertOrGetJob(group string, id uint64, job *Job) (*Job, error) { func (i *jobInventory) insertOrGetJob(group string, id uint64, job *Job) (resp insertOrGetJobResp) {
ig, exists := i.groups[group] ig, exists := i.groups[group]
if !exists { if !exists {
ig = newJobInventoryGroup() ig = newJobInventoryGroup()
ig.jobs[id] = job ig.jobs[id] = job
i.groups[group] = ig i.groups[group] = ig
return job, nil resp.job = job
return
} }
igj, exists := ig.jobs[id] if resp.job, exists = ig.jobs[id]; !exists {
if !exists {
ig.jobs[id] = job ig.jobs[id] = job
return job, nil resp.job = job
return
} }
i.dbgLog.Printf("importer: job-inventory added job(%s/%d)", group, id) i.dbgLog.Printf("importer: job-inventory added job(%s/%d)", group, id)
return igj, nil return
} }
func (i *jobInventory) getJob(group string, id uint64) (*Job, error) { func (i *jobInventory) getJob(group string, id uint64) (resp getJobResp) {
ig, exists := i.groups[group] ig, exists := i.groups[group]
if !exists { if !exists {
return nil, ErrNotFound resp.err = ErrNotFound
return
} }
igj, exists := ig.jobs[id] if resp.job, exists = ig.jobs[id]; !exists {
if !exists { resp.err = ErrNotFound
return nil, ErrNotFound return
} }
return igj, nil return
} }
func (i *jobInventory) deleteJob(group string, id uint64) error { func (i *jobInventory) deleteJob(group string, id uint64) (resp deleteJobResp) {
ig, exists := i.groups[group] ig, exists := i.groups[group]
if !exists { if !exists {
return ErrNotFound resp.err = ErrNotFound
return
} }
if _, exists = ig.jobs[id]; !exists { if _, exists = ig.jobs[id]; !exists {
return ErrNotFound resp.err = ErrNotFound
return
} }
delete(ig.jobs, id) delete(ig.jobs, id)
...@@ -107,34 +158,70 @@ func (i *jobInventory) deleteJob(group string, id uint64) error { ...@@ -107,34 +158,70 @@ func (i *jobInventory) deleteJob(group string, id uint64) error {
delete(i.groups, group) delete(i.groups, group)
i.dbgLog.Printf("importer: job-inventory also removed now empty group %s", group) i.dbgLog.Printf("importer: job-inventory also removed now empty group %s", group)
} }
return nil return
}
func (i *jobInventory) dispatchRequests() {
i.dbgLog.Println("importer: job-inventory initialized")
for {
select {
case req := <-i.listJobsC:
req.resp <- i.listJobs(req.group)
case req := <-i.insertOrGetJobC:
req.resp <- i.insertOrGetJob(req.group, req.id, req.job)
case req := <-i.getJobC:
req.resp <- i.getJob(req.group, req.id)
case req := <-i.deleteJobC:
req.resp <- i.deleteJob(req.group, req.id)
}
}
} }
// ********************************************************* // *********************************************************
// Public Interface // Public Interface
// TODO: for now these methods are not safe to be used in multiple goroutines!!!!
// but they soon will be!
func (i *jobInventory) ListJobs(group string) (Jobs, error) { func (i *jobInventory) ListJobs(group string) (Jobs, error) {
return i.listJobs(group) respC := make(chan listJobsResp)
req := listJobsReq{respC, group}
i.listJobsC <- req
resp := <-respC
return resp.jobs, resp.err
} }
func (i *jobInventory) InsertOrGetJob(group string, id uint64, job *Job) (*Job, error) { func (i *jobInventory) InsertOrGetJob(group string, id uint64, job *Job) (*Job, error) {
return i.insertOrGetJob(group, id, job) respC := make(chan insertOrGetJobResp)
req := insertOrGetJobReq{respC, group, id, job}
i.insertOrGetJobC <- req
resp := <-respC
return resp.job, resp.err
} }
func (i *jobInventory) GetJob(group string, id uint64) (*Job, error) { func (i *jobInventory) GetJob(group string, id uint64) (*Job, error) {
return i.getJob(group, id) respC := make(chan getJobResp)
req := getJobReq{respC, group, id}
i.getJobC <- req
resp := <-respC
return resp.job, resp.err
} }
func (i *jobInventory) DeleteJob(group string, id uint64) error { func (i *jobInventory) DeleteJob(group string, id uint64) error {
return i.deleteJob(group, id) respC := make(chan deleteJobResp)
req := deleteJobReq{respC, group, id}
i.deleteJobC <- req
resp := <-respC
return resp.err
} }
// TODO: handle subscriptions to new and deleted jobs // TODO: handle subscriptions to new and deleted jobs
func newJobInventory(infoLog, errLog, dbgLog *log.Logger) *jobInventory { func newJobInventory(infoLog, errLog, dbgLog *log.Logger) *jobInventory {
i := &jobInventory{infoLog: infoLog, errLog: errLog, dbgLog: dbgLog} i := &jobInventory{infoLog: infoLog, errLog: errLog, dbgLog: dbgLog}
i.listJobsC = make(chan listJobsReq)
i.insertOrGetJobC = make(chan insertOrGetJobReq)
i.getJobC = make(chan getJobReq)
i.deleteJobC = make(chan deleteJobReq)
i.groups = make(map[string]*jobInventoryGroup) i.groups = make(map[string]*jobInventoryGroup)
go i.dispatchRequests()
return i return i
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment