From 4e1ef17f18dbf81627ce875c651f89bc0be0a916 Mon Sep 17 00:00:00 2001
From: Christian Pointner <equinox@helsinki.at>
Date: Sat, 1 Sep 2018 23:55:32 +0200
Subject: [PATCH] initial version of normalizer

---
 ..._converter_utils.go => converter_utils.go} |  32 ++++
 ..._utils_test.go => converter_utils_test.go} |   0
 importer/fetch.go                             |  19 +--
 importer/fetch_converter.go                   |  68 ++++-----
 importer/job.go                               |  32 ++--
 importer/normalize.go                         |  73 ++++++---
 importer/normalize_converter.go               | 140 ++++++++++++++++++
 7 files changed, 273 insertions(+), 91 deletions(-)
 rename importer/{fetch_converter_utils.go => converter_utils.go} (76%)
 rename importer/{fetch_converter_utils_test.go => converter_utils_test.go} (100%)
 create mode 100644 importer/normalize_converter.go

diff --git a/importer/fetch_converter_utils.go b/importer/converter_utils.go
similarity index 76%
rename from importer/fetch_converter_utils.go
rename to importer/converter_utils.go
index f2e22e7..468f823 100644
--- a/importer/fetch_converter_utils.go
+++ b/importer/converter_utils.go
@@ -30,6 +30,38 @@ import (
 	"sync"
 )
 
+type ffmpegLoudnormParams struct {
+	InputI      string `json:"input_i"`
+	InputTP     string `json:"input_tp"`
+	InputLRA    string `json:"input_lra"`
+	InputThresh string `json:"input_thresh"`
+
+	OutputI      string `json:"output_i"`
+	OutputTP     string `json:"output_tp"`
+	OutputLRA    string `json:"output_lra"`
+	OutputThresh string `json:"output_thresh"`
+
+	NormalizationType string `json:"normalization_type"`
+	TargetOffset      string `json:"target_offset"`
+}
+
+type progressWriter struct {
+	job     *Job
+	step    JobProgressStep
+	total   uint64
+	written uint64
+	w       io.Writer
+}
+
+func (pw *progressWriter) Write(p []byte) (n int, err error) {
+	n, err = pw.w.Write(p)
+	if n > 0 {
+		pw.written += uint64(n)
+	}
+	pw.job.Progress.set(pw.step, float32(pw.written)/float32(pw.total))
+	return
+}
+
 type convLog struct {
 	log JobLog
 	m   *sync.Mutex
diff --git a/importer/fetch_converter_utils_test.go b/importer/converter_utils_test.go
similarity index 100%
rename from importer/fetch_converter_utils_test.go
rename to importer/converter_utils_test.go
diff --git a/importer/fetch.go b/importer/fetch.go
index 4523623..61d5aa4 100644
--- a/importer/fetch.go
+++ b/importer/fetch.go
@@ -67,21 +67,6 @@ func (job *Job) prepareSource() {
 	}
 }
 
-type progressWriter struct {
-	job     *Job
-	written uint64
-	w       io.Writer
-}
-
-func (pw *progressWriter) Write(p []byte) (n int, err error) {
-	n, err = pw.w.Write(p)
-	if n > 0 {
-		pw.written += uint64(n)
-	}
-	pw.job.Progress.set(StepFetching, float32(pw.written)/float32(pw.job.source.len))
-	return
-}
-
 type copyResult struct {
 	err  error
 	hash string
@@ -100,7 +85,7 @@ func (job *Job) fetch() (interface{}, error) {
 	// make sure a potentially connected source gets notified in any case
 	defer close(job.source.done)
 
-	conv, err := job.newConverter()
+	conv, err := job.newFetchConverter()
 	if err != nil {
 		job.im.errLog.Printf("fetch(): creating fetch converter failed: %v", err)
 		return nil, err
@@ -115,7 +100,7 @@ func (job *Job) fetch() (interface{}, error) {
 			panic("creating hash function failed: " + err.Error())
 		}
 		src := io.TeeReader(job.source.r, hash)
-		written, err := io.Copy(&progressWriter{job, 0, conv}, src)
+		written, err := io.Copy(&progressWriter{job, StepFetching, job.source.len, 0, conv}, src)
 		if err != nil {
 			done <- copyResult{err, ""}
 			return
diff --git a/importer/fetch_converter.go b/importer/fetch_converter.go
index 9d31921..f993804 100644
--- a/importer/fetch_converter.go
+++ b/importer/fetch_converter.go
@@ -31,51 +31,51 @@ import (
 	"io"
 	"os"
 	"os/exec"
-	"strconv"
+	"path/filepath"
 	"strings"
 	"syscall"
 )
 
-type converter interface {
+type fetchConverter interface {
 	io.WriteCloser
-	Wait() (loudness interface{}, log JobLog, err error)
+	Wait() (interface{}, JobLog, error)
 }
 
-func (job *Job) newConverter() (converter, error) {
+func (job *Job) newFetchConverter() (fetchConverter, error) {
 	switch job.im.conf.Converter {
 	case ConvNull:
-		return newNullConverter(job)
+		return newNullFetchConverter(job)
 	case ConvFFmpeg:
-		return newFFmpegConverter(job)
+		return newFFmpegFetchConverter(job)
 	}
 	panic("invalid fetch converter")
 }
 
 //******* null
 
-type nullConverter struct {
+type nullFetchConverter struct {
 	job  *Job
 	file *os.File
 	log  JobLog
 }
 
-func newNullConverter(job *Job) (c *nullConverter, err error) {
-	c = &nullConverter{job: job}
-	filename := job.im.store.GetFilePath(job.Group, job.ID)
-	job.im.dbgLog.Printf("null-converter: opened file '%s'", filename)
+func newNullFetchConverter(job *Job) (c *nullFetchConverter, err error) {
+	c = &nullFetchConverter{job: job}
+	filename := filepath.Join(job.workDir, "source")
+	job.im.dbgLog.Printf("null-converter: opening file '%s'", filename)
 	if c.file, err = os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600); err != nil {
 		c.log.append("stderr", "ERROR opening file failed: "+err.Error())
-	} else {
-		c.log.append("stdout", "successfully opened file: "+filename)
+		return
 	}
+	c.log.append("stdout", "successfully opened file: "+filename)
 	return
 }
 
-func (c *nullConverter) Write(p []byte) (n int, err error) {
+func (c *nullFetchConverter) Write(p []byte) (n int, err error) {
 	return c.file.Write(p)
 }
 
-func (c *nullConverter) Close() (err error) {
+func (c *nullFetchConverter) Close() (err error) {
 	filename := c.file.Name()
 	if err = c.file.Close(); err != nil {
 		c.log.append("stderr", "ERROR closing file failed: "+err.Error())
@@ -85,28 +85,13 @@ func (c *nullConverter) Close() (err error) {
 	return
 }
 
-func (c *nullConverter) Wait() (loudness interface{}, log JobLog, err error) {
+func (c *nullFetchConverter) Wait() (loudness interface{}, log JobLog, err error) {
 	return nil, c.log, nil
 }
 
 //******* FFmpeg
 
-type ffmpegLoudnormParams struct {
-	InputI      string `json:"input_i"`
-	InputTP     string `json:"input_tp"`
-	InputLRA    string `json:"input_lra"`
-	InputThresh string `json:"input_thresh"`
-
-	OutputI      string `json:"output_i"`
-	OutputTP     string `json:"output_tp"`
-	OutputLRA    string `json:"output_lra"`
-	OutputThresh string `json:"output_thresh"`
-
-	NormalizationType string `json:"normalization_type"`
-	TargetOffset      string `json:"target_offset"`
-}
-
-type ffmpegConverter struct {
+type ffmpegFetchConverter struct {
 	job    *Job
 	log    *convLog
 	cmd    *exec.Cmd
@@ -115,20 +100,19 @@ type ffmpegConverter struct {
 	stderr *convLogger
 }
 
-func newFFmpegConverter(job *Job) (c *ffmpegConverter, err error) {
-	c = &ffmpegConverter{job: job}
+func newFFmpegFetchConverter(job *Job) (c *ffmpegFetchConverter, err error) {
+	c = &ffmpegFetchConverter{job: job}
 	c.log = newConvLog()
-	filename := job.im.store.GetFilePath(job.Group, job.ID)
+	filename := filepath.Join(job.workDir, "source")
 	job.im.dbgLog.Printf("ffmpeg-converter: starting ffmpeg for file '%s'", filename)
 
 	// c.cmd = exec.CommandContext(job.ctx, "ffmpeg", "-hide_banner", "-nostats", "-i", "-")
 	c.cmd = exec.CommandContext(job.ctx, "ffmpeg", "-nostats", "-y", "-i", "-")
 	c.cmd.Args = append(c.cmd.Args, "-map_metadata", "0", "-vn")
-	c.cmd.Args = append(c.cmd.Args, "-ar", strconv.FormatUint(uint64(job.im.store.Audio.SampleRate), 10))
-	c.cmd.Args = append(c.cmd.Args, "-f", job.im.store.Audio.Format.String(), filename)
+	c.cmd.Args = append(c.cmd.Args, "-ar", "192k", "-codec:a", "pcm_s24le")
+	c.cmd.Args = append(c.cmd.Args, "-f", "wav", filename)
 	// loudness normalization, see: http://k.ylo.ph/2016/04/04/loudnorm.html
 	c.cmd.Args = append(c.cmd.Args, "-map_metadata", "-1", "-vn")
-	c.cmd.Args = append(c.cmd.Args, "-ar", "192k")
 	c.cmd.Args = append(c.cmd.Args, "-filter:a", "loudnorm=print_format=json:dual_mono=true")
 	c.cmd.Args = append(c.cmd.Args, "-f", "null", "/dev/null")
 
@@ -156,15 +140,15 @@ func newFFmpegConverter(job *Job) (c *ffmpegConverter, err error) {
 	return
 }
 
-func (c *ffmpegConverter) Write(p []byte) (n int, err error) {
+func (c *ffmpegFetchConverter) Write(p []byte) (n int, err error) {
 	return c.stdin.Write(p)
 }
 
-func (c *ffmpegConverter) Close() (err error) {
+func (c *ffmpegFetchConverter) Close() (err error) {
 	return c.stdin.Close()
 }
 
-func (c *ffmpegConverter) fetchLoudnormParams() (*ffmpegLoudnormParams, error) {
+func (c *ffmpegFetchConverter) fetchLoudnormParams() (*ffmpegLoudnormParams, error) {
 	// the loudnorm filter posts its result onto stderr
 	r := NewJobLogReader(c.log.log, "stderr(ffmpeg)")
 
@@ -193,7 +177,7 @@ func (c *ffmpegConverter) fetchLoudnormParams() (*ffmpegLoudnormParams, error) {
 	return params, nil
 }
 
-func (c *ffmpegConverter) Wait() (loudness interface{}, log JobLog, err error) {
+func (c *ffmpegFetchConverter) Wait() (loudness interface{}, log JobLog, err error) {
 	c.stdout.Wait()
 	c.stderr.Wait()
 
diff --git a/importer/job.go b/importer/job.go
index dff10cf..f7123e0 100644
--- a/importer/job.go
+++ b/importer/job.go
@@ -27,7 +27,9 @@ package importer
 import (
 	"context"
 	"io"
+	"io/ioutil"
 	"net/url"
+	"os"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -50,6 +52,7 @@ type Job struct {
 	RefID     string      `json:"ref-id,omitempty"`
 	Progress  JobProgress `json:"progress"`
 	source    *JobSource
+	workDir   string
 	subC      struct {
 		sourceAttached chan struct{}
 		running        chan struct{}
@@ -59,7 +62,7 @@ type Job struct {
 
 type Jobs []*Job
 
-func (job *Job) run() (err error) {
+func (job *Job) run() error {
 	// this function must never-ever hang or we block a worker forever - watch your context kids!!
 
 	// when this function returns the job is either done or aborted...
@@ -81,20 +84,20 @@ func (job *Job) run() (err error) {
 	job.im.dbgLog.Printf("running import for %s/%d from: %s", job.Group, job.ID, job.Source.String())
 
 	job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportRunning, false)
-	var loudness interface{}
-	if loudness, err = job.fetch(); err != nil {
+	loudness, err := job.fetch()
+	if err != nil {
 		job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted, false)
 		// send result to all done subscriptions
-		return
+		return err
 	}
 	if err = job.normalize(loudness); err != nil {
 		job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted, false)
 		// send result to all done subscriptions
-		return
+		return err
 	}
 	job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportDone, true)
 	// send result to all done subscriptions
-	return
+	return err
 }
 
 // Start adds job to the work queue of the importer. If timeout is > 0 it will create a context
@@ -103,14 +106,19 @@ func (job *Job) run() (err error) {
 // context.Context requires you to cleanup resources with after all work is done by calling it's
 // CancelFunc. This is not easy to do from stateless interfaces. In this case use context.Background()
 // and set the timeout to a appropiate value. The job will then take care of freeing up the resources.
-func (job *Job) Start(ctx context.Context, timeout time.Duration) error {
+func (job *Job) Start(ctx context.Context, timeout time.Duration) (err error) {
 	if !atomic.CompareAndSwapUint32((*uint32)(&job.State), uint32(JobNew), uint32(JobInitializing)) {
 		if atomic.LoadUint32((*uint32)(&job.State)) == uint32(JobCanceled) {
 			// the job has already been canceled and thus should just be removed from the inventory
 			job.cleanup()
 			return ErrAlreadyCanceled
 		}
-		return nil
+		return
+	}
+
+	if job.workDir, err = ioutil.TempDir(job.im.conf.TempPath, "tank-job-"); err != nil {
+		job.cleanup()
+		return
 	}
 
 	// shall we use a global context here so we can cancel all jobs at once? i.e. for forced shutdown...
@@ -143,7 +151,7 @@ func (job *Job) Start(ctx context.Context, timeout time.Duration) error {
 	}
 
 	// the job was successfully enqueued now and awaits a worker to call job.run().
-	return nil
+	return
 }
 
 func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResult, error) {
@@ -185,6 +193,12 @@ func (job *Job) cleanup() {
 	// we are not really interested in the result of the deleteJob here,
 	// the only possible error would be ErrNotFound anyway... no need to wait for that.
 	go job.im.deleteJob(job.Group, job.ID)
+
+	if job.workDir != "" {
+		if err := os.RemoveAll(job.workDir); err != nil {
+			job.im.errLog.Printf("failed to remove jobs work directory: %v", err)
+		}
+	}
 }
 
 func newJob(im *Importer, group string, id uint64, src url.URL, user, refID string) *Job {
diff --git a/importer/normalize.go b/importer/normalize.go
index 489ad32..d8d404a 100644
--- a/importer/normalize.go
+++ b/importer/normalize.go
@@ -25,36 +25,63 @@
 package importer
 
 import (
-	"time"
-)
-
-const (
-	DEBUG_NORMALIZE_TIME = 150
+	"io"
+	"os"
+	"path/filepath"
 )
 
 func (job *Job) normalize(loudness interface{}) error {
 	job.Progress.set(StepNormalizing, 0)
 
-	job.im.dbgLog.Printf("normalization parameters(%T): %+v", loudness, loudness)
-
-	// TODO: actually normalize the file
-	t := time.NewTicker(100 * time.Millisecond)
-	defer t.Stop()
-	cnt := 0
-normalizeLoop:
-	for {
-		select {
-		case <-job.ctx.Done():
-			return job.ctx.Err()
-		case <-t.C:
-			cnt++
-			if cnt > DEBUG_NORMALIZE_TIME {
-				break normalizeLoop
-			}
-			job.Progress.set(StepNormalizing, float32(cnt)/DEBUG_NORMALIZE_TIME)
+	conv, err := job.newNormalizeConverter(loudness)
+	if err != nil {
+		job.im.errLog.Printf("normalize(): creating normalize converter failed: %v", err)
+		return err
+	}
+	// from here on conv.Close() and conv.Wait() has to be called in any case to
+	// reap potential child process zombies
+
+	done := make(chan error, 1)
+	go func() {
+		src, err := os.OpenFile(filepath.Join(job.workDir, "source"), os.O_RDONLY, 0400)
+		if err != nil {
+			done <- err
+			return
+		}
+		defer src.Close()
+		srcStat, err := src.Stat()
+		if err != nil {
+			done <- err
+			return
 		}
+
+		written, err := io.Copy(&progressWriter{job, StepNormalizing, uint64(srcStat.Size()), 0, conv}, src)
+		if err != nil {
+			done <- err
+			return
+		}
+		job.im.dbgLog.Printf("normalize(): done copying %d bytes from source", written)
+		close(done)
+	}()
+
+	select {
+	case <-job.ctx.Done():
+		conv.Close()
+		go conv.Wait() // do the zombie reaping in a separate goroutine since we are not interested in the result anyway
+		return job.ctx.Err()
+	case err = <-done:
 	}
 
+	conv.Close()
+	convLog, convErr := conv.Wait()
+	job.im.dbgLog.Printf("normalize(): converter returned: %v", convErr)
+	if err == nil {
+		err = convErr
+	}
+
+	for _, l := range convLog {
+		job.im.dbgLog.Println(l.Line)
+	}
 	job.Progress.set(StepNormalizing, 1)
-	return nil
+	return err
 }
diff --git a/importer/normalize_converter.go b/importer/normalize_converter.go
new file mode 100644
index 0000000..edc601b
--- /dev/null
+++ b/importer/normalize_converter.go
@@ -0,0 +1,140 @@
+//
+//  tank
+//
+//  Import and Playlist Daemon for autoradio project
+//
+//
+//  Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
+//
+//  This file is part of tank.
+//
+//  tank is free software: you can redistribute it and/or modify
+//  it under the terms of the GNU General Public License as published by
+//  the Free Software Foundation, either version 3 of the License, or
+//  any later version.
+//
+//  tank is distributed in the hope that it will be useful,
+//  but WITHOUT ANY WARRANTY; without even the implied warranty of
+//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+//  GNU General Public License for more details.
+//
+//  You should have received a copy of the GNU General Public License
+//  along with tank. If not, see <http://www.gnu.org/licenses/>.
+//
+
+package importer
+
+import (
+	"fmt"
+	"io"
+	"os/exec"
+	"strconv"
+	"syscall"
+)
+
+type normalizeConverter interface {
+	io.WriteCloser
+	Wait() (log JobLog, err error)
+}
+
+func (job *Job) newNormalizeConverter(loudness interface{}) (normalizeConverter, error) {
+	if loudness == nil {
+		// make default converter configurable?
+		return newFFmpegNormalizeConverter(job, nil)
+	}
+
+	switch loudness.(type) {
+	case *ffmpegLoudnormParams:
+		return newFFmpegNormalizeConverter(job, loudness.(*ffmpegLoudnormParams))
+	}
+
+	return nil, fmt.Errorf("unknown loudness parameter type: %T", loudness)
+}
+
+//******* FFmpeg
+
+type ffmpegNormalizeConverter struct {
+	job    *Job
+	log    *convLog
+	cmd    *exec.Cmd
+	stdin  io.WriteCloser
+	stdout *convLogger
+	stderr *convLogger
+}
+
+func newFFmpegNormalizeConverter(job *Job, params *ffmpegLoudnormParams) (c *ffmpegNormalizeConverter, err error) {
+	c = &ffmpegNormalizeConverter{job: job}
+	c.log = newConvLog()
+	filename := job.im.store.GetFilePath(job.Group, job.ID)
+	job.im.dbgLog.Printf("ffmpeg-converter: starting ffmpeg for file '%s'", filename)
+
+	// c.cmd = exec.CommandContext(job.ctx, "ffmpeg", "-hide_banner", "-nostats", "-i", "-")
+	c.cmd = exec.CommandContext(job.ctx, "ffmpeg", "-nostats", "-y", "-i", "-")
+	c.cmd.Args = append(c.cmd.Args, "-map_metadata", "0", "-vn")
+	if params != nil {
+		// loudness normalization, see: http://k.ylo.ph/2016/04/04/loudnorm.html
+		params_encoded := fmt.Sprintf("measured_I=%s:measured_LRA=%s:measured_TP=%s:measured_thresh=%s:offset=%s",
+			params.InputI, params.InputLRA, params.InputTP, params.InputThresh, params.TargetOffset)
+		c.cmd.Args = append(c.cmd.Args, "-filter:a", "loudnorm="+params_encoded+":print_format=summary:dual_mono=true")
+	}
+	c.cmd.Args = append(c.cmd.Args, "-ar", strconv.FormatUint(uint64(job.im.store.Audio.SampleRate), 10))
+	c.cmd.Args = append(c.cmd.Args, "-f", job.im.store.Audio.Format.String(), filename)
+
+	if c.stdin, err = c.cmd.StdinPipe(); err != nil {
+		c.log.append("stderr", "ERROR opening stdin pipe: "+err.Error())
+		return nil, err
+	}
+
+	var stdout, stderr io.Reader
+	if stdout, err = c.cmd.StdoutPipe(); err != nil {
+		c.log.append("stderr", "ERROR opening stdout pipe: "+err.Error())
+		return nil, err
+	}
+	c.stdout = newConvLogger(c.log, "stdout(ffmpeg)", stdout)
+	if stderr, err = c.cmd.StderrPipe(); err != nil {
+		c.log.append("stderr", "ERROR opening stderr pipe: "+err.Error())
+		return nil, err
+	}
+	c.stderr = newConvLogger(c.log, "stderr(ffmpeg)", stderr)
+
+	if err = c.cmd.Start(); err != nil {
+		c.log.append("stderr", "ERROR starting ffmpeg: "+err.Error())
+		return nil, err
+	}
+	return
+}
+
+func (c *ffmpegNormalizeConverter) Write(p []byte) (n int, err error) {
+	return c.stdin.Write(p)
+}
+
+func (c *ffmpegNormalizeConverter) Close() (err error) {
+	return c.stdin.Close()
+}
+
+func (c *ffmpegNormalizeConverter) Wait() (log JobLog, err error) {
+	c.stdout.Wait()
+	c.stderr.Wait()
+
+	if err = c.cmd.Wait(); err != nil {
+		if exiterr, ok := err.(*exec.ExitError); ok {
+			if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
+				exitcode := status.ExitStatus()
+				c.job.im.dbgLog.Printf("ffmpeg-converter: ffmpeg returned %d", exitcode)
+				c.log.append("stdout", fmt.Sprintf("ffmpeg returned %d", exitcode))
+			} else {
+				c.job.im.errLog.Println("ffmpeg-converter: getting exit code of ffmpeg failed:", err)
+				c.log.append("stderr", "ERROR getting exit code of ffmpeg: "+err.Error())
+			}
+		} else {
+			c.job.im.errLog.Println("ffmpeg-converter: getting exit code of ffmpeg failed:", err)
+			c.log.append("stderr", "ERROR getting exit code of ffmpeg: "+err.Error())
+		}
+
+		return c.log.log, err
+	}
+
+	c.job.im.dbgLog.Println("ffmpeg-converter: ffmpeg returned 0 (success)")
+	c.log.append("stdout", "ffmpeg returned 0 (success)")
+	return c.log.log, err
+}
-- 
GitLab