diff --git a/importer/fetch.go b/importer/fetch.go index 1f7ac3181d1b742b5d101ddbca54a5ff2d193cd8..8e20b227a6ce9c534aee4121fa61a5c968ca2c19 100644 --- a/importer/fetch.go +++ b/importer/fetch.go @@ -31,11 +31,30 @@ import ( "golang.org/x/crypto/blake2b" ) -type copyResult struct { +type copyFromSourceResult struct { err error hash string } +func (job *Job) copyFromSource(w io.Writer, done chan<- copyFromSourceResult) { + defer close(done) + + hash, err := blake2b.New256(nil) + if err != nil { + panic("creating hash function failed: " + err.Error()) + } + src := io.TeeReader(job.source, hash) + written, err := io.Copy(&progressWriter{job, StepFetching, job.source.Len(), 0, w}, src) + if err != nil { + done <- copyFromSourceResult{err, ""} + return + } + hashStr := "blake2b_256:" + base64.URLEncoding.EncodeToString(hash.Sum(nil)) + job.im.dbgLog.Printf("fetch(): done copying %d bytes from source (%s)", written, hashStr) + _, err = job.im.store.UpdateFileSourceHash(job.Group, job.ID, hashStr) + done <- copyFromSourceResult{err, hashStr} +} + func (job *Job) fetch() (interface{}, error) { job.Progress.set(StepFetching, 0) @@ -50,25 +69,10 @@ func (job *Job) fetch() (interface{}, error) { // from here on conv.Close() and conv.Wait() has to be called in any case to // reap potential child process zombies - done := make(chan copyResult) - go func() { - hash, err := blake2b.New256(nil) - if err != nil { - panic("creating hash function failed: " + err.Error()) - } - src := io.TeeReader(job.source, hash) - written, err := io.Copy(&progressWriter{job, StepFetching, job.source.Len(), 0, conv}, src) - if err != nil { - done <- copyResult{err, ""} - return - } - hashStr := "blake2b_256:" + base64.URLEncoding.EncodeToString(hash.Sum(nil)) - job.im.dbgLog.Printf("fetch(): done copying %d bytes from source (%s)", written, hashStr) - _, err = job.im.store.UpdateFileSourceHash(job.Group, job.ID, hashStr) - done <- copyResult{err, hashStr} - }() + done := make(chan copyFromSourceResult) + go job.copyFromSource(conv, done) - var res copyResult + var res copyFromSourceResult select { case <-job.ctx.Done(): conv.Close() diff --git a/importer/normalize.go b/importer/normalize.go index 5ebe320a8c4409c103c792ed74f7ff057f3b53de..9d09e899f5dbd40f30de93b0a30725ab4d3a9b9f 100644 --- a/importer/normalize.go +++ b/importer/normalize.go @@ -30,6 +30,28 @@ import ( "path/filepath" ) +func (job *Job) copyToNormalizer(w io.Writer, done chan<- error) { + defer close(done) + + src, err := os.OpenFile(filepath.Join(job.workDir, "source"), os.O_RDONLY, 0400) + if err != nil { + done <- err + return + } + srcStat, err := src.Stat() + if err != nil { + done <- err + return + } + + written, err := io.Copy(&progressWriter{job, StepNormalizing, uint64(srcStat.Size()), 0, w}, src) + if err != nil { + done <- err + return + } + job.im.dbgLog.Printf("normalize(): done copying %d bytes from source", written) +} + func (job *Job) normalize(loudness interface{}) error { job.Progress.set(StepNormalizing, 0) @@ -42,27 +64,7 @@ func (job *Job) normalize(loudness interface{}) error { // reap potential child process zombies done := make(chan error) - go func() { - src, err := os.OpenFile(filepath.Join(job.workDir, "source"), os.O_RDONLY, 0400) - if err != nil { - done <- err - return - } - - srcStat, err := src.Stat() - if err != nil { - done <- err - return - } - - written, err := io.Copy(&progressWriter{job, StepNormalizing, uint64(srcStat.Size()), 0, conv}, src) - if err != nil { - done <- err - return - } - job.im.dbgLog.Printf("normalize(): done copying %d bytes from source", written) - close(done) - }() + go job.copyToNormalizer(conv, done) select { case <-job.ctx.Done():