Skip to content
Snippets Groups Projects
Commit 68ea7cac authored by Christian Pointner's avatar Christian Pointner
Browse files

improved signaling for attached sources

parent 220a9146
No related branches found
No related tags found
No related merge requests found
......@@ -74,13 +74,15 @@ func (api *API) UploadFile() http.Handler {
return
}
if err = job.AttachSource(uint64(r.ContentLength), r.Body); err != nil {
done, err := job.AttachSource(uint64(r.ContentLength), r.Body)
if err != nil {
sendError(w, err)
return
}
if err = <-done; err != nil {
sendError(w, err)
return
}
<-job.Done()
// TODO: get result from job
sendWebResponse(w, http.StatusOK, nil)
})
}
......
......@@ -59,7 +59,9 @@ func (job *Job) prepareSource() {
// simulate a 10 MB file...
l := uint64(10 * 1024 * 1024)
r := devNull(l)
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(&JobSource{l, &r}))
src := &JobSource{len: l, r: &r}
src.done = make(chan error, 1)
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(src))
close(job.subC.sourceAttached)
}
}
......@@ -89,6 +91,8 @@ func (job *Job) fetch() error {
case <-job.subC.sourceAttached:
}
// job.source is now initialized and points to a valid source
// make sure a potentially connected source gets notified in any case
defer close(job.source.done)
conv, err := job.newConverter()
if err != nil {
......@@ -119,5 +123,6 @@ func (job *Job) fetch() error {
}
job.Progress.set(StepFetching, 1)
job.source.done <- err
return err
}
......@@ -145,17 +145,18 @@ func (job *Job) Start(ctx context.Context, timeout time.Duration) error {
return nil
}
func (job *Job) AttachSource(length uint64, r io.Reader) error {
func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan error, error) {
if state := atomic.LoadUint32((*uint32)(&job.State)); state != uint32(JobRunning) {
return ErrImportNotRunning
return nil, ErrImportNotRunning
}
src := &JobSource{length, r}
src := &JobSource{len: length, r: r}
src.done = make(chan error, 1)
if ok := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(nil), unsafe.Pointer(src)); !ok {
return ErrSourceAlreadyAttached
return nil, ErrSourceAlreadyAttached
}
close(job.subC.sourceAttached)
return nil
return src.done, nil
}
func (job *Job) Cancel() {
......
......@@ -75,8 +75,9 @@ func (s *SourceURL) UnmarshalText(data []byte) (err error) {
//******* Source
type JobSource struct {
len uint64
r io.Reader
len uint64
r io.Reader
done chan error
}
//******* State
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment