Skip to content
Snippets Groups Projects
Commit 50684dde authored by Christian Pointner's avatar Christian Pointner
Browse files

added notification channels for running and done

parent 7bd09d95
No related branches found
No related tags found
No related merge requests found
...@@ -58,7 +58,7 @@ func (job *Job) prepareSource() { ...@@ -58,7 +58,7 @@ func (job *Job) prepareSource() {
l := uint64(10 * 1024 * 1024) l := uint64(10 * 1024 * 1024)
r := devNull(l) r := devNull(l)
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(&JobSource{l, &r})) atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(&JobSource{l, &r}))
close(job.sourceAttached) close(job.subC.sourceAttached)
} }
type progressWriter struct { type progressWriter struct {
...@@ -83,7 +83,7 @@ func (job *Job) fetch() (err error) { ...@@ -83,7 +83,7 @@ func (job *Job) fetch() (err error) {
select { select {
case <-job.ctx.Done(): case <-job.ctx.Done():
return job.ctx.Err() return job.ctx.Err()
case <-job.sourceAttached: case <-job.subC.sourceAttached:
} }
// TODO: use an actual converter here // TODO: use an actual converter here
......
...@@ -37,20 +37,24 @@ import ( ...@@ -37,20 +37,24 @@ import (
// Job represents a import job for the file identified by Group and ID // Job represents a import job for the file identified by Group and ID
type Job struct { type Job struct {
im *Importer im *Importer
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
State JobState `json:"state"` State JobState `json:"state"`
CreatedAt Timestamp `json:"created"` CreatedAt Timestamp `json:"created"`
StartedAt Timestamp `json:"started,omitempty"` StartedAt Timestamp `json:"started,omitempty"`
ID uint64 `json:"id"` ID uint64 `json:"id"`
Group string `json:"group"` Group string `json:"group"`
User string `json:"user"` User string `json:"user"`
Source SourceURL `json:"source"` Source SourceURL `json:"source"`
RefID string `json:"ref-id,omitempty"` RefID string `json:"ref-id,omitempty"`
Progress JobProgress `json:"progress"` Progress JobProgress `json:"progress"`
source *JobSource source *JobSource
sourceAttached chan struct{} subC struct {
sourceAttached chan struct{}
running chan struct{}
done chan struct{}
} `json:"-"`
} }
type Jobs []*Job type Jobs []*Job
...@@ -62,6 +66,7 @@ func (job *Job) run() (err error) { ...@@ -62,6 +66,7 @@ func (job *Job) run() (err error) {
// either way job.cancel() needs to be called and the job needs to be removed from the inventory // either way job.cancel() needs to be called and the job needs to be removed from the inventory
defer job.cancel() defer job.cancel()
defer job.cleanup() defer job.cleanup()
defer close(job.subC.done)
job.prepareSource() job.prepareSource()
...@@ -71,7 +76,7 @@ func (job *Job) run() (err error) { ...@@ -71,7 +76,7 @@ func (job *Job) run() (err error) {
// job.Cancel() only set the state to JobCanceled... so we simulate a canceled context here // job.Cancel() only set the state to JobCanceled... so we simulate a canceled context here
return context.Canceled return context.Canceled
} }
close(job.subC.running)
job.StartedAt.set(time.Now()) job.StartedAt.set(time.Now())
job.im.dbgLog.Printf("running import for %s/%d from: %s", job.Group, job.ID, job.Source.String()) job.im.dbgLog.Printf("running import for %s/%d from: %s", job.Group, job.ID, job.Source.String())
...@@ -149,7 +154,7 @@ func (job *Job) AttachSource(length uint64, r io.Reader) error { ...@@ -149,7 +154,7 @@ func (job *Job) AttachSource(length uint64, r io.Reader) error {
if ok := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(nil), unsafe.Pointer(src)); !ok { if ok := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(nil), unsafe.Pointer(src)); !ok {
return ErrSourceAlreadyAttached return ErrSourceAlreadyAttached
} }
close(job.sourceAttached) close(job.subC.sourceAttached)
return nil return nil
} }
...@@ -164,7 +169,13 @@ func (job *Job) Cancel() { ...@@ -164,7 +169,13 @@ func (job *Job) Cancel() {
job.cancel() job.cancel()
} }
// TODO: handle subscriptions to done func (job *Job) Running() <-chan struct{} {
return job.subC.running
}
func (job *Job) Done() <-chan struct{} {
return job.subC.done
}
func (job *Job) cleanup() { func (job *Job) cleanup() {
atomic.StoreUint32((*uint32)(&job.State), uint32(JobDestroying)) atomic.StoreUint32((*uint32)(&job.State), uint32(JobDestroying))
...@@ -178,6 +189,8 @@ func newJob(im *Importer, group string, id uint64, src url.URL, user, refID stri ...@@ -178,6 +189,8 @@ func newJob(im *Importer, group string, id uint64, src url.URL, user, refID stri
job := &Job{im: im, Group: group, ID: id, Source: SourceURL(src), User: user, RefID: refID} job := &Job{im: im, Group: group, ID: id, Source: SourceURL(src), User: user, RefID: refID}
job.State = JobNew job.State = JobNew
job.CreatedAt.set(time.Now()) job.CreatedAt.set(time.Now())
job.sourceAttached = make(chan struct{}) job.subC.sourceAttached = make(chan struct{})
job.subC.running = make(chan struct{})
job.subC.done = make(chan struct{})
return job return job
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment