diff --git a/importer/fetch.go b/importer/fetch.go index 89d1a95fdd5a876a6110108b745acc78ee8311d9..1f7ac3181d1b742b5d101ddbca54a5ff2d193cd8 100644 --- a/importer/fetch.go +++ b/importer/fetch.go @@ -39,13 +39,6 @@ type copyResult struct { func (job *Job) fetch() (interface{}, error) { job.Progress.set(StepFetching, 0) - // wait until source is attached - select { - case <-job.ctx.Done(): - return nil, job.ctx.Err() - case <-job.subC.sourceAttached: - } - // job.source is now initialized and points to a valid source // make sure a potentially connected source gets notified in any case defer job.source.Done(nil) diff --git a/importer/job.go b/importer/job.go index 74d05311dc29f4a445bd73e5172223ad3fca1516..1ea975947611698d510dacf499ee23f564008d3b 100644 --- a/importer/job.go +++ b/importer/job.go @@ -82,10 +82,11 @@ func (job *Job) run() error { job.im.dbgLog.Printf("running import for %s/%d from: %s", job.Group, job.ID, job.Source.String()) job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportRunning) - if err := job.prepareSource(); err != nil { + if err := job.initializeSource(); err != nil { job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted) return err } + // job.source is now initialized and points to a valid source loudness, err := job.fetch() if err != nil { job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted) diff --git a/importer/job_source.go b/importer/job_source.go index 344a74595164d513892b99944566f0b2798948a4..80e43971e1dd0f1d453001cdc3ef4d7432262463 100644 --- a/importer/job_source.go +++ b/importer/job_source.go @@ -30,9 +30,15 @@ import ( "time" ) -func (job *Job) prepareSource() error { +func (job *Job) initializeSource() error { if job.Source.Scheme == SourceSchemeAttachment { - // the source will be attached using AttachSource() + // the source will be attached using job.AttachSource() so all we need to do + // is to wait for it to happen + select { + case <-job.ctx.Done(): + return job.ctx.Err() + case <-job.subC.sourceAttached: + } return nil } @@ -41,9 +47,6 @@ func (job *Job) prepareSource() error { switch job.Source.Scheme { case SourceSchemeFlowJS: job.source = newJobSourceFlowJS(job.Source) - - // TODO: implement other sources - case SourceSchemeFake: job.source = newJobSourceFake(job.Source) default: