From 0d125b90aa9915c128da55d75408196625a65ce7 Mon Sep 17 00:00:00 2001 From: Christian Pointner <equinox@helsinki.at> Date: Sun, 9 Sep 2018 15:43:43 +0200 Subject: [PATCH] some cleanup --- importer/fetch.go | 42 ++++++++++++++++++++++------------------- importer/normalize.go | 44 ++++++++++++++++++++++--------------------- 2 files changed, 46 insertions(+), 40 deletions(-) diff --git a/importer/fetch.go b/importer/fetch.go index 1f7ac31..8e20b22 100644 --- a/importer/fetch.go +++ b/importer/fetch.go @@ -31,11 +31,30 @@ import ( "golang.org/x/crypto/blake2b" ) -type copyResult struct { +type copyFromSourceResult struct { err error hash string } +func (job *Job) copyFromSource(w io.Writer, done chan<- copyFromSourceResult) { + defer close(done) + + hash, err := blake2b.New256(nil) + if err != nil { + panic("creating hash function failed: " + err.Error()) + } + src := io.TeeReader(job.source, hash) + written, err := io.Copy(&progressWriter{job, StepFetching, job.source.Len(), 0, w}, src) + if err != nil { + done <- copyFromSourceResult{err, ""} + return + } + hashStr := "blake2b_256:" + base64.URLEncoding.EncodeToString(hash.Sum(nil)) + job.im.dbgLog.Printf("fetch(): done copying %d bytes from source (%s)", written, hashStr) + _, err = job.im.store.UpdateFileSourceHash(job.Group, job.ID, hashStr) + done <- copyFromSourceResult{err, hashStr} +} + func (job *Job) fetch() (interface{}, error) { job.Progress.set(StepFetching, 0) @@ -50,25 +69,10 @@ func (job *Job) fetch() (interface{}, error) { // from here on conv.Close() and conv.Wait() has to be called in any case to // reap potential child process zombies - done := make(chan copyResult) - go func() { - hash, err := blake2b.New256(nil) - if err != nil { - panic("creating hash function failed: " + err.Error()) - } - src := io.TeeReader(job.source, hash) - written, err := io.Copy(&progressWriter{job, StepFetching, job.source.Len(), 0, conv}, src) - if err != nil { - done <- copyResult{err, ""} - return - } - hashStr := "blake2b_256:" + base64.URLEncoding.EncodeToString(hash.Sum(nil)) - job.im.dbgLog.Printf("fetch(): done copying %d bytes from source (%s)", written, hashStr) - _, err = job.im.store.UpdateFileSourceHash(job.Group, job.ID, hashStr) - done <- copyResult{err, hashStr} - }() + done := make(chan copyFromSourceResult) + go job.copyFromSource(conv, done) - var res copyResult + var res copyFromSourceResult select { case <-job.ctx.Done(): conv.Close() diff --git a/importer/normalize.go b/importer/normalize.go index 5ebe320..9d09e89 100644 --- a/importer/normalize.go +++ b/importer/normalize.go @@ -30,6 +30,28 @@ import ( "path/filepath" ) +func (job *Job) copyToNormalizer(w io.Writer, done chan<- error) { + defer close(done) + + src, err := os.OpenFile(filepath.Join(job.workDir, "source"), os.O_RDONLY, 0400) + if err != nil { + done <- err + return + } + srcStat, err := src.Stat() + if err != nil { + done <- err + return + } + + written, err := io.Copy(&progressWriter{job, StepNormalizing, uint64(srcStat.Size()), 0, w}, src) + if err != nil { + done <- err + return + } + job.im.dbgLog.Printf("normalize(): done copying %d bytes from source", written) +} + func (job *Job) normalize(loudness interface{}) error { job.Progress.set(StepNormalizing, 0) @@ -42,27 +64,7 @@ func (job *Job) normalize(loudness interface{}) error { // reap potential child process zombies done := make(chan error) - go func() { - src, err := os.OpenFile(filepath.Join(job.workDir, "source"), os.O_RDONLY, 0400) - if err != nil { - done <- err - return - } - - srcStat, err := src.Stat() - if err != nil { - done <- err - return - } - - written, err := io.Copy(&progressWriter{job, StepNormalizing, uint64(srcStat.Size()), 0, conv}, src) - if err != nil { - done <- err - return - } - job.im.dbgLog.Printf("normalize(): done copying %d bytes from source", written) - close(done) - }() + go job.copyToNormalizer(conv, done) select { case <-job.ctx.Done(): -- GitLab