diff --git a/importer/fetch.go b/importer/fetch.go index 756d1f9ddc92115e908fefbbec08d6ade1740be4..45236231407383745f65f1d15c8ceee91769b02a 100644 --- a/importer/fetch.go +++ b/importer/fetch.go @@ -87,13 +87,13 @@ type copyResult struct { hash string } -func (job *Job) fetch() error { +func (job *Job) fetch() (interface{}, error) { job.Progress.set(StepFetching, 0) // wait until source is attached select { case <-job.ctx.Done(): - return job.ctx.Err() + return nil, job.ctx.Err() case <-job.subC.sourceAttached: } // job.source is now initialized and points to a valid source @@ -103,7 +103,7 @@ func (job *Job) fetch() error { conv, err := job.newConverter() if err != nil { job.im.errLog.Printf("fetch(): creating fetch converter failed: %v", err) - return err + return nil, err } // from here on conv.Close() and conv.Wait() has to be called in any case to // reap potential child process zombies @@ -133,18 +133,18 @@ func (job *Job) fetch() error { go conv.Wait() // do the zombie reaping in seperate go routine since we are not interested in the result anyway err = job.ctx.Err() job.source.done <- &JobSourceResult{Err: err} - return err + return nil, err case res = <-done: } conv.Close() - corr, convLog, err := conv.Wait() - job.im.dbgLog.Printf("fetch(): converter returned: %f db, %v", corr, err) + loudness, convLog, err := conv.Wait() + job.im.dbgLog.Printf("fetch(): converter returned: %T, %v", loudness, err) if res.err != nil { err = res.err } job.Progress.set(StepFetching, 1) job.source.done <- &JobSourceResult{err, res.hash, convLog} - return err + return loudness, err } diff --git a/importer/fetch_converter.go b/importer/fetch_converter.go index ee10719c27b447d3fc5e51578134f6d133c079df..9d3192193b1e1e45dd5f2140c95043f0dd01e126 100644 --- a/importer/fetch_converter.go +++ b/importer/fetch_converter.go @@ -25,17 +25,20 @@ package importer import ( + "encoding/json" + "errors" "fmt" "io" "os" "os/exec" "strconv" + "strings" "syscall" ) type converter interface { io.WriteCloser - Wait() (loudnessCorr float64, log JobLog, err error) + Wait() (loudness interface{}, log JobLog, err error) } func (job *Job) newConverter() (converter, error) { @@ -82,12 +85,27 @@ func (c *nullConverter) Close() (err error) { return } -func (c *nullConverter) Wait() (loudnessCorr float64, log JobLog, err error) { - return 0, c.log, nil +func (c *nullConverter) Wait() (loudness interface{}, log JobLog, err error) { + return nil, c.log, nil } //******* FFmpeg +type ffmpegLoudnormParams struct { + InputI string `json:"input_i"` + InputTP string `json:"input_tp"` + InputLRA string `json:"input_lra"` + InputThresh string `json:"input_thresh"` + + OutputI string `json:"output_i"` + OutputTP string `json:"output_tp"` + OutputLRA string `json:"output_lra"` + OutputThresh string `json:"output_thresh"` + + NormalizationType string `json:"normalization_type"` + TargetOffset string `json:"target_offset"` +} + type ffmpegConverter struct { job *Job log *convLog @@ -146,12 +164,40 @@ func (c *ffmpegConverter) Close() (err error) { return c.stdin.Close() } -func (c *ffmpegConverter) Wait() (loudnessCorr float64, log JobLog, err error) { +func (c *ffmpegConverter) fetchLoudnormParams() (*ffmpegLoudnormParams, error) { + // the loudnorm filter posts its result onto stderr + r := NewJobLogReader(c.log.log, "stderr(ffmpeg)") + + // we are looking for a line starting with "[Parsed_loudnorm_0 " so 32 bytes is enough + var buf [32]byte + for { + p := buf[:] + n, err := r.Read(p) // this will always return a single, possibly truncated, line + if err != nil { + if err == io.EOF { + err = errors.New("couldn't find any loudness parameters in ffmpeg output") + } + return nil, err + } + if strings.HasPrefix(string(p[:n]), "[Parsed_loudnorm_0 ") { + break + } + } + + params := &ffmpegLoudnormParams{} + jd := json.NewDecoder(r) + jd.DisallowUnknownFields() + if err := jd.Decode(¶ms); err != nil { + return nil, err + } + return params, nil +} + +func (c *ffmpegConverter) Wait() (loudness interface{}, log JobLog, err error) { c.stdout.Wait() c.stderr.Wait() - err = c.cmd.Wait() - if err != nil { + if err = c.cmd.Wait(); err != nil { if exiterr, ok := err.(*exec.ExitError); ok { if status, ok := exiterr.Sys().(syscall.WaitStatus); ok { exitcode := status.ExitStatus() @@ -165,6 +211,15 @@ func (c *ffmpegConverter) Wait() (loudnessCorr float64, log JobLog, err error) { c.job.im.errLog.Println("ffmpeg-converter: getting exit code of ffmpeg failed:", err) c.log.append("stderr", "ERROR getting exit code of ffmpeg: "+err.Error()) } + + return nil, c.log.log, err + } + + c.job.im.dbgLog.Println("ffmpeg-converter: ffmpeg returned 0 (success)") + c.log.append("stdout", "ffmpeg returned 0 (success)") + + if loudness, err = c.fetchLoudnormParams(); err != nil { + c.log.append("stderr", "ERROR fetching parameters from ffmpeg loudnorm filter: "+err.Error()) } - return 0, c.log.log, err + return loudness, c.log.log, err } diff --git a/importer/fetch_converter_utils.go b/importer/fetch_converter_utils.go index 642c339c21c314ece5370f8bbb453938b85cc884..f2e22e7f4fbe7d0c2334ba60b35cccb631c612da 100644 --- a/importer/fetch_converter_utils.go +++ b/importer/fetch_converter_utils.go @@ -75,7 +75,6 @@ func (l *convLogger) run() { // TODO: print this on application error log?? return } - } func (l *convLogger) Wait() { @@ -86,6 +85,9 @@ func (l *convLogger) Wait() { // - it will only return one line on each invocation even if len(p) would allow us to // return more than one line // - lines longer than len(p) will be truncated +// +// also keep in mind that despite convLog is thread-safe - this reader is not! +// concurrent reads are allowed but the log must not be changed while anyone is reading from it! type JobLogReader struct { log JobLog pos int diff --git a/importer/job.go b/importer/job.go index 9dbca4e5b8acd9eeb409695c76dc2a89b366a4fa..dff10cfa248c035fb28e28d758e0ae6199265df4 100644 --- a/importer/job.go +++ b/importer/job.go @@ -81,12 +81,13 @@ func (job *Job) run() (err error) { job.im.dbgLog.Printf("running import for %s/%d from: %s", job.Group, job.ID, job.Source.String()) job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportRunning, false) - if err = job.fetch(); err != nil { + var loudness interface{} + if loudness, err = job.fetch(); err != nil { job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted, false) // send result to all done subscriptions return } - if err = job.normalize(); err != nil { + if err = job.normalize(loudness); err != nil { job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted, false) // send result to all done subscriptions return diff --git a/importer/normalize.go b/importer/normalize.go index 1aac09c72644adc2fa90e210a557bb787cd69d80..489ad329a3e8513e6656182ee124e5c35f8e124e 100644 --- a/importer/normalize.go +++ b/importer/normalize.go @@ -32,9 +32,11 @@ const ( DEBUG_NORMALIZE_TIME = 150 ) -func (job *Job) normalize() error { +func (job *Job) normalize(loudness interface{}) error { job.Progress.set(StepNormalizing, 0) + job.im.dbgLog.Printf("normalization parameters(%T): %+v", loudness, loudness) + // TODO: actually normalize the file t := time.NewTicker(100 * time.Millisecond) defer t.Stop()