Skip to content
Snippets Groups Projects
Commit 71396913 authored by Christian Pointner's avatar Christian Pointner
Browse files

added handling for job subscriptions

parent 71aee899
No related branches found
No related tags found
No related merge requests found
......@@ -30,8 +30,8 @@ import (
)
type jobInventoryGroup struct {
jobs map[uint64]*Job
// add list of subscription channels
jobs map[uint64]*Job
changed chan struct{}
}
func newJobInventoryGroup() *jobInventoryGroup {
......@@ -63,6 +63,26 @@ func (i *jobInventory) ListJobs(group string) (jobs Jobs) {
return
}
func (i *jobInventory) SubscribeJobs(group string) (jobs Jobs, changed <-chan struct{}) {
i.mu.RLock()
defer i.mu.RUnlock()
ig, exists := i.groups[group]
if !exists {
ig = newJobInventoryGroup()
ig.changed = make(chan struct{})
return jobs, ig.changed
}
for _, j := range ig.jobs {
jobs = append(jobs, j)
}
if ig.changed == nil {
ig.changed = make(chan struct{})
}
return jobs, ig.changed
}
func (i *jobInventory) InsertOrGetJob(group string, id uint64, jobIn *Job) (job *Job) {
i.mu.Lock()
defer i.mu.Unlock()
......@@ -70,15 +90,17 @@ func (i *jobInventory) InsertOrGetJob(group string, id uint64, jobIn *Job) (job
ig, exists := i.groups[group]
if !exists {
ig = newJobInventoryGroup()
ig.jobs[id] = jobIn
i.groups[group] = ig
i.dbgLog.Printf("importer: job-inventory added job(%s/%d)", group, id)
return jobIn
}
if job, exists = ig.jobs[id]; !exists {
ig.jobs[id] = jobIn
i.dbgLog.Printf("importer: job-inventory added job(%s/%d)", group, id)
if ig.changed != nil {
close(ig.changed)
ig.changed = nil
}
return jobIn
}
......@@ -115,15 +137,17 @@ func (i *jobInventory) DeleteJob(group string, id uint64) (err error) {
delete(ig.jobs, id)
i.dbgLog.Printf("importer: job-inventory removed job(%s/%d)", group, id)
if len(ig.jobs) == 0 {
// TODO: only do this if there are no subscriptions
delete(i.groups, group)
i.dbgLog.Printf("importer: job-inventory also removed now empty group %s", group)
}
if ig.changed != nil {
close(ig.changed)
ig.changed = nil
}
return
}
// TODO: handle subscriptions to new and deleted jobs
func newJobInventory(infoLog, errLog, dbgLog *log.Logger) *jobInventory {
i := &jobInventory{infoLog: infoLog, errLog: errLog, dbgLog: dbgLog}
i.groups = make(map[string]*jobInventoryGroup)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment