diff --git a/api/v1/api_uploads.go b/api/v1/api_uploads.go index 4e86894d0304fd44d0316d6ff7af9ad7faf6adc6..6f042ef78061ad13cf9fd993ec631c3c13ce0179 100644 --- a/api/v1/api_uploads.go +++ b/api/v1/api_uploads.go @@ -51,7 +51,7 @@ func (api *API) UploadFileSimple() http.Handler { return } - done, err := job.AttachSource(uint64(r.ContentLength), r.Body) + done, err := job.AttachUploader(uint64(r.ContentLength), r.Body) if err != nil { sendError(w, err) return diff --git a/importer/job.go b/importer/job.go index 9bd3c0cbef10e45d588f4800f3c923e6a8c3ece2..1ac6d4c48cfeb936039e57b67912d42d49beefba 100644 --- a/importer/job.go +++ b/importer/job.go @@ -154,7 +154,7 @@ func (job *Job) Start(ctx context.Context, timeout time.Duration) (err error) { return } -func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResult, error) { +func (job *Job) AttachUploader(len uint64, r io.Reader) (<-chan *JobSourceResult, error) { if state := atomic.LoadUint32((*uint32)(&job.State)); state != uint32(JobRunning) { return nil, ErrImportNotRunning } @@ -166,12 +166,26 @@ func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResul if ok := atomic.CompareAndSwapUint32(&job.sourceSet, 0, 1); !ok { return nil, ErrSourceAlreadyAttached } - src := newJobSourceUpload(length, r) + src := newJobSourceUpload(len, r) job.source = src close(job.subC.sourceAttached) return src.done, nil } +func (job *Job) GetAttachedUploader() io.Reader { + select { + case <-job.subC.sourceAttached: + default: + return nil + } + + src, ok := job.source.(*JobSourceUpload) + if !ok { + return nil + } + return src.r +} + func (job *Job) Cancel() { oldState := atomic.SwapUint32((*uint32)(&job.State), uint32(JobCanceled)) // this next line is why we need to make sure JobCanceled is smaller than all the other states.