Skip to content
Snippets Groups Projects
Commit 51a638c1 authored by Christian Pointner's avatar Christian Pointner
Browse files

using a mutex in job-inventory is actually much easier to understand

parent d0aec73a
No related branches found
No related tags found
No related merge requests found
......@@ -46,7 +46,7 @@ type Importer struct {
}
func (im *Importer) ListJobs(group string) (Jobs, error) {
return im.jobs.ListJobs(group)
return im.jobs.ListJobs(group), nil // for now error is always nil but this might change later
}
func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID string) (*Job, error) {
......@@ -63,7 +63,7 @@ func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID
return nil, ErrFileNotNew
}
return im.jobs.InsertOrGetJob(group, id, newJob(im, group, id, src, user, refID))
return im.jobs.InsertOrGetJob(group, id, newJob(im, group, id, src, user, refID)), nil
}
func (im *Importer) GetJob(group string, id uint64) (*Job, error) {
......
......@@ -26,51 +26,9 @@ package importer
import (
"log"
"sync"
)
type listJobsResp struct {
jobs Jobs
err error
}
type listJobsReq struct {
resp chan listJobsResp
group string
}
type insertOrGetJobResp struct {
job *Job
err error
}
type insertOrGetJobReq struct {
resp chan insertOrGetJobResp
group string
id uint64
job *Job
}
type getJobResp struct {
job *Job
err error
}
type getJobReq struct {
resp chan getJobResp
group string
id uint64
}
type deleteJobResp struct {
err error
}
type deleteJobReq struct {
resp chan deleteJobResp
group string
id uint64
}
type jobInventoryGroup struct {
jobs map[uint64]*Job
// add list of subscription channels
......@@ -83,72 +41,74 @@ func newJobInventoryGroup() *jobInventoryGroup {
}
type jobInventory struct {
infoLog *log.Logger
errLog *log.Logger
dbgLog *log.Logger
listJobsC chan listJobsReq
insertOrGetJobC chan insertOrGetJobReq
getJobC chan getJobReq
deleteJobC chan deleteJobReq
groups map[string]*jobInventoryGroup
infoLog *log.Logger
errLog *log.Logger
dbgLog *log.Logger
groups map[string]*jobInventoryGroup
mu sync.RWMutex
}
func (i *jobInventory) listJobs(group string) (resp listJobsResp) {
func (i *jobInventory) ListJobs(group string) (jobs Jobs) {
i.mu.RLock()
defer i.mu.RUnlock()
ig, exists := i.groups[group]
if !exists {
return
}
for _, j := range ig.jobs {
resp.jobs = append(resp.jobs, j)
jobs = append(jobs, j)
}
return
}
func (i *jobInventory) insertOrGetJob(group string, id uint64, job *Job) (resp insertOrGetJobResp) {
func (i *jobInventory) InsertOrGetJob(group string, id uint64, jobIn *Job) (job *Job) {
i.mu.Lock()
defer i.mu.Unlock()
ig, exists := i.groups[group]
if !exists {
ig = newJobInventoryGroup()
ig.jobs[id] = job
ig.jobs[id] = jobIn
i.groups[group] = ig
resp.job = job
return
return jobIn
}
if resp.job, exists = ig.jobs[id]; !exists {
ig.jobs[id] = job
resp.job = job
return
if job, exists = ig.jobs[id]; !exists {
ig.jobs[id] = jobIn
return jobIn
}
i.dbgLog.Printf("importer: job-inventory added job(%s/%d)", group, id)
return
}
func (i *jobInventory) getJob(group string, id uint64) (resp getJobResp) {
func (i *jobInventory) GetJob(group string, id uint64) (job *Job, err error) {
i.mu.RLock()
defer i.mu.RUnlock()
ig, exists := i.groups[group]
if !exists {
resp.err = ErrNotFound
return
return nil, ErrNotFound
}
if resp.job, exists = ig.jobs[id]; !exists {
resp.err = ErrNotFound
return
if job, exists = ig.jobs[id]; !exists {
return nil, ErrNotFound
}
return
}
func (i *jobInventory) deleteJob(group string, id uint64) (resp deleteJobResp) {
func (i *jobInventory) DeleteJob(group string, id uint64) (err error) {
i.mu.Lock()
defer i.mu.Unlock()
ig, exists := i.groups[group]
if !exists {
resp.err = ErrNotFound
return
return ErrNotFound
}
if _, exists = ig.jobs[id]; !exists {
resp.err = ErrNotFound
return
return ErrNotFound
}
delete(ig.jobs, id)
......@@ -161,67 +121,10 @@ func (i *jobInventory) deleteJob(group string, id uint64) (resp deleteJobResp) {
return
}
func (i *jobInventory) dispatchRequests() {
i.dbgLog.Println("importer: job-inventory initialized")
for {
select {
case req := <-i.listJobsC:
req.resp <- i.listJobs(req.group)
case req := <-i.insertOrGetJobC:
req.resp <- i.insertOrGetJob(req.group, req.id, req.job)
case req := <-i.getJobC:
req.resp <- i.getJob(req.group, req.id)
case req := <-i.deleteJobC:
req.resp <- i.deleteJob(req.group, req.id)
}
}
}
// *********************************************************
// Public Interface
func (i *jobInventory) ListJobs(group string) (Jobs, error) {
respC := make(chan listJobsResp)
req := listJobsReq{respC, group}
i.listJobsC <- req
resp := <-respC
return resp.jobs, resp.err
}
func (i *jobInventory) InsertOrGetJob(group string, id uint64, job *Job) (*Job, error) {
respC := make(chan insertOrGetJobResp)
req := insertOrGetJobReq{respC, group, id, job}
i.insertOrGetJobC <- req
resp := <-respC
return resp.job, resp.err
}
func (i *jobInventory) GetJob(group string, id uint64) (*Job, error) {
respC := make(chan getJobResp)
req := getJobReq{respC, group, id}
i.getJobC <- req
resp := <-respC
return resp.job, resp.err
}
func (i *jobInventory) DeleteJob(group string, id uint64) error {
respC := make(chan deleteJobResp)
req := deleteJobReq{respC, group, id}
i.deleteJobC <- req
resp := <-respC
return resp.err
}
// TODO: handle subscriptions to new and deleted jobs
func newJobInventory(infoLog, errLog, dbgLog *log.Logger) *jobInventory {
i := &jobInventory{infoLog: infoLog, errLog: errLog, dbgLog: dbgLog}
i.listJobsC = make(chan listJobsReq)
i.insertOrGetJobC = make(chan insertOrGetJobReq)
i.getJobC = make(chan getJobReq)
i.deleteJobC = make(chan deleteJobReq)
i.groups = make(map[string]*jobInventoryGroup)
go i.dispatchRequests()
return i
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment