diff --git a/importer/fetch_converter_utils.go b/importer/converter_utils.go similarity index 76% rename from importer/fetch_converter_utils.go rename to importer/converter_utils.go index f2e22e7f4fbe7d0c2334ba60b35cccb631c612da..468f823fcfc0a1e834ad3071e8142220e2464e7a 100644 --- a/importer/fetch_converter_utils.go +++ b/importer/converter_utils.go @@ -30,6 +30,38 @@ import ( "sync" ) +type ffmpegLoudnormParams struct { + InputI string `json:"input_i"` + InputTP string `json:"input_tp"` + InputLRA string `json:"input_lra"` + InputThresh string `json:"input_thresh"` + + OutputI string `json:"output_i"` + OutputTP string `json:"output_tp"` + OutputLRA string `json:"output_lra"` + OutputThresh string `json:"output_thresh"` + + NormalizationType string `json:"normalization_type"` + TargetOffset string `json:"target_offset"` +} + +type progressWriter struct { + job *Job + step JobProgressStep + total uint64 + written uint64 + w io.Writer +} + +func (pw *progressWriter) Write(p []byte) (n int, err error) { + n, err = pw.w.Write(p) + if n > 0 { + pw.written += uint64(n) + } + pw.job.Progress.set(pw.step, float32(pw.written)/float32(pw.total)) + return +} + type convLog struct { log JobLog m *sync.Mutex diff --git a/importer/fetch_converter_utils_test.go b/importer/converter_utils_test.go similarity index 100% rename from importer/fetch_converter_utils_test.go rename to importer/converter_utils_test.go diff --git a/importer/fetch.go b/importer/fetch.go index 45236231407383745f65f1d15c8ceee91769b02a..61d5aa422ddf32cce118e390d8a37fc5947b22fe 100644 --- a/importer/fetch.go +++ b/importer/fetch.go @@ -67,21 +67,6 @@ func (job *Job) prepareSource() { } } -type progressWriter struct { - job *Job - written uint64 - w io.Writer -} - -func (pw *progressWriter) Write(p []byte) (n int, err error) { - n, err = pw.w.Write(p) - if n > 0 { - pw.written += uint64(n) - } - pw.job.Progress.set(StepFetching, float32(pw.written)/float32(pw.job.source.len)) - return -} - type copyResult struct { err error hash string @@ -100,7 +85,7 @@ func (job *Job) fetch() (interface{}, error) { // make sure a potentially connected source gets notified in any case defer close(job.source.done) - conv, err := job.newConverter() + conv, err := job.newFetchConverter() if err != nil { job.im.errLog.Printf("fetch(): creating fetch converter failed: %v", err) return nil, err @@ -115,7 +100,7 @@ func (job *Job) fetch() (interface{}, error) { panic("creating hash function failed: " + err.Error()) } src := io.TeeReader(job.source.r, hash) - written, err := io.Copy(&progressWriter{job, 0, conv}, src) + written, err := io.Copy(&progressWriter{job, StepFetching, job.source.len, 0, conv}, src) if err != nil { done <- copyResult{err, ""} return diff --git a/importer/fetch_converter.go b/importer/fetch_converter.go index 9d3192193b1e1e45dd5f2140c95043f0dd01e126..f99380456ae71c555f25aa2b707500caf38b69a0 100644 --- a/importer/fetch_converter.go +++ b/importer/fetch_converter.go @@ -31,51 +31,51 @@ import ( "io" "os" "os/exec" - "strconv" + "path/filepath" "strings" "syscall" ) -type converter interface { +type fetchConverter interface { io.WriteCloser - Wait() (loudness interface{}, log JobLog, err error) + Wait() (interface{}, JobLog, error) } -func (job *Job) newConverter() (converter, error) { +func (job *Job) newFetchConverter() (fetchConverter, error) { switch job.im.conf.Converter { case ConvNull: - return newNullConverter(job) + return newNullFetchConverter(job) case ConvFFmpeg: - return newFFmpegConverter(job) + return newFFmpegFetchConverter(job) } panic("invalid fetch converter") } //******* null -type nullConverter struct { +type nullFetchConverter struct { job *Job file *os.File log JobLog } -func newNullConverter(job *Job) (c *nullConverter, err error) { - c = &nullConverter{job: job} - filename := job.im.store.GetFilePath(job.Group, job.ID) - job.im.dbgLog.Printf("null-converter: opened file '%s'", filename) +func newNullFetchConverter(job *Job) (c *nullFetchConverter, err error) { + c = &nullFetchConverter{job: job} + filename := filepath.Join(job.workDir, "source") + job.im.dbgLog.Printf("null-converter: opening file '%s'", filename) if c.file, err = os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600); err != nil { c.log.append("stderr", "ERROR opening file failed: "+err.Error()) - } else { - c.log.append("stdout", "successfully opened file: "+filename) + return } + c.log.append("stdout", "successfully opened file: "+filename) return } -func (c *nullConverter) Write(p []byte) (n int, err error) { +func (c *nullFetchConverter) Write(p []byte) (n int, err error) { return c.file.Write(p) } -func (c *nullConverter) Close() (err error) { +func (c *nullFetchConverter) Close() (err error) { filename := c.file.Name() if err = c.file.Close(); err != nil { c.log.append("stderr", "ERROR closing file failed: "+err.Error()) @@ -85,28 +85,13 @@ func (c *nullConverter) Close() (err error) { return } -func (c *nullConverter) Wait() (loudness interface{}, log JobLog, err error) { +func (c *nullFetchConverter) Wait() (loudness interface{}, log JobLog, err error) { return nil, c.log, nil } //******* FFmpeg -type ffmpegLoudnormParams struct { - InputI string `json:"input_i"` - InputTP string `json:"input_tp"` - InputLRA string `json:"input_lra"` - InputThresh string `json:"input_thresh"` - - OutputI string `json:"output_i"` - OutputTP string `json:"output_tp"` - OutputLRA string `json:"output_lra"` - OutputThresh string `json:"output_thresh"` - - NormalizationType string `json:"normalization_type"` - TargetOffset string `json:"target_offset"` -} - -type ffmpegConverter struct { +type ffmpegFetchConverter struct { job *Job log *convLog cmd *exec.Cmd @@ -115,20 +100,19 @@ type ffmpegConverter struct { stderr *convLogger } -func newFFmpegConverter(job *Job) (c *ffmpegConverter, err error) { - c = &ffmpegConverter{job: job} +func newFFmpegFetchConverter(job *Job) (c *ffmpegFetchConverter, err error) { + c = &ffmpegFetchConverter{job: job} c.log = newConvLog() - filename := job.im.store.GetFilePath(job.Group, job.ID) + filename := filepath.Join(job.workDir, "source") job.im.dbgLog.Printf("ffmpeg-converter: starting ffmpeg for file '%s'", filename) // c.cmd = exec.CommandContext(job.ctx, "ffmpeg", "-hide_banner", "-nostats", "-i", "-") c.cmd = exec.CommandContext(job.ctx, "ffmpeg", "-nostats", "-y", "-i", "-") c.cmd.Args = append(c.cmd.Args, "-map_metadata", "0", "-vn") - c.cmd.Args = append(c.cmd.Args, "-ar", strconv.FormatUint(uint64(job.im.store.Audio.SampleRate), 10)) - c.cmd.Args = append(c.cmd.Args, "-f", job.im.store.Audio.Format.String(), filename) + c.cmd.Args = append(c.cmd.Args, "-ar", "192k", "-codec:a", "pcm_s24le") + c.cmd.Args = append(c.cmd.Args, "-f", "wav", filename) // loudness normalization, see: http://k.ylo.ph/2016/04/04/loudnorm.html c.cmd.Args = append(c.cmd.Args, "-map_metadata", "-1", "-vn") - c.cmd.Args = append(c.cmd.Args, "-ar", "192k") c.cmd.Args = append(c.cmd.Args, "-filter:a", "loudnorm=print_format=json:dual_mono=true") c.cmd.Args = append(c.cmd.Args, "-f", "null", "/dev/null") @@ -156,15 +140,15 @@ func newFFmpegConverter(job *Job) (c *ffmpegConverter, err error) { return } -func (c *ffmpegConverter) Write(p []byte) (n int, err error) { +func (c *ffmpegFetchConverter) Write(p []byte) (n int, err error) { return c.stdin.Write(p) } -func (c *ffmpegConverter) Close() (err error) { +func (c *ffmpegFetchConverter) Close() (err error) { return c.stdin.Close() } -func (c *ffmpegConverter) fetchLoudnormParams() (*ffmpegLoudnormParams, error) { +func (c *ffmpegFetchConverter) fetchLoudnormParams() (*ffmpegLoudnormParams, error) { // the loudnorm filter posts its result onto stderr r := NewJobLogReader(c.log.log, "stderr(ffmpeg)") @@ -193,7 +177,7 @@ func (c *ffmpegConverter) fetchLoudnormParams() (*ffmpegLoudnormParams, error) { return params, nil } -func (c *ffmpegConverter) Wait() (loudness interface{}, log JobLog, err error) { +func (c *ffmpegFetchConverter) Wait() (loudness interface{}, log JobLog, err error) { c.stdout.Wait() c.stderr.Wait() diff --git a/importer/job.go b/importer/job.go index dff10cfa248c035fb28e28d758e0ae6199265df4..f7123e08285d595360d38543142fdf92d98fee48 100644 --- a/importer/job.go +++ b/importer/job.go @@ -27,7 +27,9 @@ package importer import ( "context" "io" + "io/ioutil" "net/url" + "os" "sync/atomic" "time" "unsafe" @@ -50,6 +52,7 @@ type Job struct { RefID string `json:"ref-id,omitempty"` Progress JobProgress `json:"progress"` source *JobSource + workDir string subC struct { sourceAttached chan struct{} running chan struct{} @@ -59,7 +62,7 @@ type Job struct { type Jobs []*Job -func (job *Job) run() (err error) { +func (job *Job) run() error { // this function must never-ever hang or we block a worker forever - watch your context kids!! // when this function returns the job is either done or aborted... @@ -81,20 +84,20 @@ func (job *Job) run() (err error) { job.im.dbgLog.Printf("running import for %s/%d from: %s", job.Group, job.ID, job.Source.String()) job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportRunning, false) - var loudness interface{} - if loudness, err = job.fetch(); err != nil { + loudness, err := job.fetch() + if err != nil { job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted, false) // send result to all done subscriptions - return + return err } if err = job.normalize(loudness); err != nil { job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted, false) // send result to all done subscriptions - return + return err } job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportDone, true) // send result to all done subscriptions - return + return err } // Start adds job to the work queue of the importer. If timeout is > 0 it will create a context @@ -103,14 +106,19 @@ func (job *Job) run() (err error) { // context.Context requires you to cleanup resources with after all work is done by calling it's // CancelFunc. This is not easy to do from stateless interfaces. In this case use context.Background() // and set the timeout to a appropiate value. The job will then take care of freeing up the resources. -func (job *Job) Start(ctx context.Context, timeout time.Duration) error { +func (job *Job) Start(ctx context.Context, timeout time.Duration) (err error) { if !atomic.CompareAndSwapUint32((*uint32)(&job.State), uint32(JobNew), uint32(JobInitializing)) { if atomic.LoadUint32((*uint32)(&job.State)) == uint32(JobCanceled) { // the job has already been canceled and thus should just be removed from the inventory job.cleanup() return ErrAlreadyCanceled } - return nil + return + } + + if job.workDir, err = ioutil.TempDir(job.im.conf.TempPath, "tank-job-"); err != nil { + job.cleanup() + return } // shall we use a global context here so we can cancel all jobs at once? i.e. for forced shutdown... @@ -143,7 +151,7 @@ func (job *Job) Start(ctx context.Context, timeout time.Duration) error { } // the job was successfully enqueued now and awaits a worker to call job.run(). - return nil + return } func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResult, error) { @@ -185,6 +193,12 @@ func (job *Job) cleanup() { // we are not really interested in the result of the deleteJob here, // the only possible error would be ErrNotFound anyway... no need to wait for that. go job.im.deleteJob(job.Group, job.ID) + + if job.workDir != "" { + if err := os.RemoveAll(job.workDir); err != nil { + job.im.errLog.Printf("failed to remove jobs work directory: %v", err) + } + } } func newJob(im *Importer, group string, id uint64, src url.URL, user, refID string) *Job { diff --git a/importer/normalize.go b/importer/normalize.go index 489ad329a3e8513e6656182ee124e5c35f8e124e..d8d404add255d18622470ce9cca63fef4afa71b7 100644 --- a/importer/normalize.go +++ b/importer/normalize.go @@ -25,36 +25,63 @@ package importer import ( - "time" -) - -const ( - DEBUG_NORMALIZE_TIME = 150 + "io" + "os" + "path/filepath" ) func (job *Job) normalize(loudness interface{}) error { job.Progress.set(StepNormalizing, 0) - job.im.dbgLog.Printf("normalization parameters(%T): %+v", loudness, loudness) - - // TODO: actually normalize the file - t := time.NewTicker(100 * time.Millisecond) - defer t.Stop() - cnt := 0 -normalizeLoop: - for { - select { - case <-job.ctx.Done(): - return job.ctx.Err() - case <-t.C: - cnt++ - if cnt > DEBUG_NORMALIZE_TIME { - break normalizeLoop - } - job.Progress.set(StepNormalizing, float32(cnt)/DEBUG_NORMALIZE_TIME) + conv, err := job.newNormalizeConverter(loudness) + if err != nil { + job.im.errLog.Printf("normalize(): creating normalize converter failed: %v", err) + return err + } + // from here on conv.Close() and conv.Wait() has to be called in any case to + // reap potential child process zombies + + done := make(chan error) + go func() { + src, err := os.OpenFile(filepath.Join(job.workDir, "source"), os.O_RDONLY, 0400) + if err != nil { + done <- err + return + } + + srcStat, err := src.Stat() + if err != nil || srcStat.Size() < 0 { + done <- err + return } + + written, err := io.Copy(&progressWriter{job, StepNormalizing, 0, uint64(srcStat.Size()), conv}, src) + if err != nil { + done <- err + return + } + job.im.dbgLog.Printf("normalize(): done copying %d bytes from source", written) + close(done) + }() + + select { + case <-job.ctx.Done(): + conv.Close() + go conv.Wait() // do the zombie reaping in seperate go routine since we are not interested in the result anyway + return job.ctx.Err() + case err = <-done: } + conv.Close() + convLog, convErr := conv.Wait() + job.im.dbgLog.Printf("normalize(): converter returned: %v", err) + if err == nil { + err = convErr + } + + for _, l := range convLog { + job.im.dbgLog.Println(l.Line) + } job.Progress.set(StepNormalizing, 1) - return nil + return err } diff --git a/importer/normalize_converter.go b/importer/normalize_converter.go new file mode 100644 index 0000000000000000000000000000000000000000..edc601b736d258af3f7991a1455f23bee88d5aa9 --- /dev/null +++ b/importer/normalize_converter.go @@ -0,0 +1,140 @@ +// +// tank +// +// Import and Playlist Daemon for autoradio project +// +// +// Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at> +// +// This file is part of tank. +// +// tank is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// tank is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with tank. If not, see <http://www.gnu.org/licenses/>. +// + +package importer + +import ( + "fmt" + "io" + "os/exec" + "strconv" + "syscall" +) + +type normalizeConverter interface { + io.WriteCloser + Wait() (log JobLog, err error) +} + +func (job *Job) newNormalizeConverter(loudness interface{}) (normalizeConverter, error) { + if loudness == nil { + // make default converter configurable? + return newFFmpegNormalizeConverter(job, nil) + } + + switch loudness.(type) { + case *ffmpegLoudnormParams: + return newFFmpegNormalizeConverter(job, loudness.(*ffmpegLoudnormParams)) + } + + return nil, fmt.Errorf("unknown loudness parameter type: %T", loudness) +} + +//******* FFmpeg + +type ffmpegNormalizeConverter struct { + job *Job + log *convLog + cmd *exec.Cmd + stdin io.WriteCloser + stdout *convLogger + stderr *convLogger +} + +func newFFmpegNormalizeConverter(job *Job, params *ffmpegLoudnormParams) (c *ffmpegNormalizeConverter, err error) { + c = &ffmpegNormalizeConverter{job: job} + c.log = newConvLog() + filename := job.im.store.GetFilePath(job.Group, job.ID) + job.im.dbgLog.Printf("ffmpeg-converter: starting ffmpeg for file '%s'", filename) + + // c.cmd = exec.CommandContext(job.ctx, "ffmpeg", "-hide_banner", "-nostats", "-i", "-") + c.cmd = exec.CommandContext(job.ctx, "ffmpeg", "-nostats", "-y", "-i", "-") + c.cmd.Args = append(c.cmd.Args, "-map_metadata", "0", "-vn") + if params != nil { + // loudness normalization, see: http://k.ylo.ph/2016/04/04/loudnorm.html + params_encoded := fmt.Sprintf("measured_I=%s:measured_LRA=%s:measured_TP=%s:measured_thresh=%s:offset=%s", + params.InputI, params.InputLRA, params.InputTP, params.InputThresh, params.TargetOffset) + c.cmd.Args = append(c.cmd.Args, "-filter:a", "loudnorm="+params_encoded+":print_format=summary:dual_mono=true") + } + c.cmd.Args = append(c.cmd.Args, "-ar", strconv.FormatUint(uint64(job.im.store.Audio.SampleRate), 10)) + c.cmd.Args = append(c.cmd.Args, "-f", job.im.store.Audio.Format.String(), filename) + + if c.stdin, err = c.cmd.StdinPipe(); err != nil { + c.log.append("stderr", "ERROR opening stdin pipe: "+err.Error()) + return nil, err + } + + var stdout, stderr io.Reader + if stdout, err = c.cmd.StdoutPipe(); err != nil { + c.log.append("stderr", "ERROR opening stdout pipe: "+err.Error()) + return nil, err + } + c.stdout = newConvLogger(c.log, "stdout(ffmpeg)", stdout) + if stderr, err = c.cmd.StderrPipe(); err != nil { + c.log.append("stderr", "ERROR opening stderr pipe: "+err.Error()) + return nil, err + } + c.stderr = newConvLogger(c.log, "stderr(ffmpeg)", stderr) + + if err = c.cmd.Start(); err != nil { + c.log.append("stderr", "ERROR starting ffmpeg: "+err.Error()) + return nil, err + } + return +} + +func (c *ffmpegNormalizeConverter) Write(p []byte) (n int, err error) { + return c.stdin.Write(p) +} + +func (c *ffmpegNormalizeConverter) Close() (err error) { + return c.stdin.Close() +} + +func (c *ffmpegNormalizeConverter) Wait() (log JobLog, err error) { + c.stdout.Wait() + c.stderr.Wait() + + if err = c.cmd.Wait(); err != nil { + if exiterr, ok := err.(*exec.ExitError); ok { + if status, ok := exiterr.Sys().(syscall.WaitStatus); ok { + exitcode := status.ExitStatus() + c.job.im.dbgLog.Printf("ffmpeg-converter: ffmpeg returned %d", exitcode) + c.log.append("stdout", fmt.Sprintf("ffmpeg returned %d", exitcode)) + } else { + c.job.im.errLog.Println("ffmpeg-converter: getting exit code of ffmpeg failed:", err) + c.log.append("stderr", "ERROR getting exit code of ffmpeg: "+err.Error()) + } + } else { + c.job.im.errLog.Println("ffmpeg-converter: getting exit code of ffmpeg failed:", err) + c.log.append("stderr", "ERROR getting exit code of ffmpeg: "+err.Error()) + } + + return c.log.log, err + } + + c.job.im.dbgLog.Println("ffmpeg-converter: ffmpeg returned 0 (success)") + c.log.append("stdout", "ffmpeg returned 0 (success)") + return c.log.log, err +}