From 8baee297e5aab287c950345f1e3f8f8dc4c4f68c Mon Sep 17 00:00:00 2001 From: Christian Pointner <equinox@helsinki.at> Date: Mon, 10 Sep 2018 01:09:30 +0200 Subject: [PATCH] added function to retrieved attached uploader --- api/v1/api_uploads.go | 2 +- importer/job.go | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/api/v1/api_uploads.go b/api/v1/api_uploads.go index 4e86894..6f042ef 100644 --- a/api/v1/api_uploads.go +++ b/api/v1/api_uploads.go @@ -51,7 +51,7 @@ func (api *API) UploadFileSimple() http.Handler { return } - done, err := job.AttachSource(uint64(r.ContentLength), r.Body) + done, err := job.AttachUploader(uint64(r.ContentLength), r.Body) if err != nil { sendError(w, err) return diff --git a/importer/job.go b/importer/job.go index 9bd3c0c..1ac6d4c 100644 --- a/importer/job.go +++ b/importer/job.go @@ -154,7 +154,7 @@ func (job *Job) Start(ctx context.Context, timeout time.Duration) (err error) { return } -func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResult, error) { +func (job *Job) AttachUploader(len uint64, r io.Reader) (<-chan *JobSourceResult, error) { if state := atomic.LoadUint32((*uint32)(&job.State)); state != uint32(JobRunning) { return nil, ErrImportNotRunning } @@ -166,12 +166,26 @@ func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResul if ok := atomic.CompareAndSwapUint32(&job.sourceSet, 0, 1); !ok { return nil, ErrSourceAlreadyAttached } - src := newJobSourceUpload(length, r) + src := newJobSourceUpload(len, r) job.source = src close(job.subC.sourceAttached) return src.done, nil } +func (job *Job) GetAttachedUploader() io.Reader { + select { + case <-job.subC.sourceAttached: + default: + return nil + } + + src, ok := job.source.(*JobSourceUpload) + if !ok { + return nil + } + return src.r +} + func (job *Job) Cancel() { oldState := atomic.SwapUint32((*uint32)(&job.State), uint32(JobCanceled)) // this next line is why we need to make sure JobCanceled is smaller than all the other states. -- GitLab