diff --git a/importer/fetch_converter.go b/importer/fetch_converter.go index 86d81586dca5bb89fc9ba2adc1b7936a7d087ec6..ee10719c27b447d3fc5e51578134f6d133c079df 100644 --- a/importer/fetch_converter.go +++ b/importer/fetch_converter.go @@ -35,7 +35,7 @@ import ( type converter interface { io.WriteCloser - Wait() (loudnessCorr float64, log []string, err error) + Wait() (loudnessCorr float64, log JobLog, err error) } func (job *Job) newConverter() (converter, error) { @@ -53,7 +53,7 @@ func (job *Job) newConverter() (converter, error) { type nullConverter struct { job *Job file *os.File - log []string + log JobLog } func newNullConverter(job *Job) (c *nullConverter, err error) { @@ -61,9 +61,9 @@ func newNullConverter(job *Job) (c *nullConverter, err error) { filename := job.im.store.GetFilePath(job.Group, job.ID) job.im.dbgLog.Printf("null-converter: opened file '%s'", filename) if c.file, err = os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600); err != nil { - c.log = append(c.log, "ERROR opening file failed: "+err.Error()) + c.log.append("stderr", "ERROR opening file failed: "+err.Error()) } else { - c.log = append(c.log, "successfully opened file: "+filename) + c.log.append("stdout", "successfully opened file: "+filename) } return } @@ -75,14 +75,14 @@ func (c *nullConverter) Write(p []byte) (n int, err error) { func (c *nullConverter) Close() (err error) { filename := c.file.Name() if err = c.file.Close(); err != nil { - c.log = append(c.log, "ERROR closing file failed: "+err.Error()) + c.log.append("stderr", "ERROR closing file failed: "+err.Error()) } else { - c.log = append(c.log, "successfully closed file: "+filename) + c.log.append("stdout", "successfully closed file: "+filename) } return } -func (c *nullConverter) Wait() (loudnessCorr float64, log []string, err error) { +func (c *nullConverter) Wait() (loudnessCorr float64, log JobLog, err error) { return 0, c.log, nil } @@ -115,24 +115,24 @@ func newFFmpegConverter(job *Job) (c *ffmpegConverter, err error) { c.cmd.Args = append(c.cmd.Args, "-f", "null", "/dev/null") if c.stdin, err = c.cmd.StdinPipe(); err != nil { - c.log.Append("ERROR opening stdin pipe: " + err.Error()) + c.log.append("stderr", "ERROR opening stdin pipe: "+err.Error()) return nil, err } var stdout, stderr io.Reader if stdout, err = c.cmd.StdoutPipe(); err != nil { - c.log.Append("ERROR opening stdout pipe: " + err.Error()) + c.log.append("stderr", "ERROR opening stdout pipe: "+err.Error()) return nil, err } - c.stdout = newConvLogger(c.log, "ffmpeg(stderr)> ", stdout) + c.stdout = newConvLogger(c.log, "stdout(ffmpeg)", stdout) if stderr, err = c.cmd.StderrPipe(); err != nil { - c.log.Append("ERROR opening stderr pipe: " + err.Error()) + c.log.append("stderr", "ERROR opening stderr pipe: "+err.Error()) return nil, err } - c.stderr = newConvLogger(c.log, "ffmpeg(stderr)> ", stderr) + c.stderr = newConvLogger(c.log, "stderr(ffmpeg)", stderr) if err = c.cmd.Start(); err != nil { - c.log.Append("ERROR starting ffmpeg: " + err.Error()) + c.log.append("stderr", "ERROR starting ffmpeg: "+err.Error()) return nil, err } return @@ -146,7 +146,7 @@ func (c *ffmpegConverter) Close() (err error) { return c.stdin.Close() } -func (c *ffmpegConverter) Wait() (loudnessCorr float64, log []string, err error) { +func (c *ffmpegConverter) Wait() (loudnessCorr float64, log JobLog, err error) { c.stdout.Wait() c.stderr.Wait() err = c.cmd.Wait() @@ -156,14 +156,14 @@ func (c *ffmpegConverter) Wait() (loudnessCorr float64, log []string, err error) if status, ok := exiterr.Sys().(syscall.WaitStatus); ok { exitcode := status.ExitStatus() c.job.im.dbgLog.Printf("ffmpeg-converter: ffmpeg returned %d", exitcode) - c.log.Append(fmt.Sprintf("ffmpeg returned %d", exitcode)) + c.log.append("stdout", fmt.Sprintf("ffmpeg returned %d", exitcode)) } else { - c.log.Append("ERROR getting exit code of ffmpeg: " + err.Error()) c.job.im.errLog.Println("ffmpeg-converter: getting exit code of ffmpeg failed:", err) + c.log.append("stderr", "ERROR getting exit code of ffmpeg: "+err.Error()) } } else { - c.log.Append("ERROR getting exit code of ffmpeg: " + err.Error()) c.job.im.errLog.Println("ffmpeg-converter: getting exit code of ffmpeg failed:", err) + c.log.append("stderr", "ERROR getting exit code of ffmpeg: "+err.Error()) } } return 0, c.log.log, err diff --git a/importer/fetch_converter_utils.go b/importer/fetch_converter_utils.go index 17c777781f4db452d9f02656f73cb17c56037bff..b63fca012be7cc00194ab29d1231f0cc91c8ea84 100644 --- a/importer/fetch_converter_utils.go +++ b/importer/fetch_converter_utils.go @@ -31,7 +31,7 @@ import ( ) type convLog struct { - log []string + log JobLog m *sync.Mutex } @@ -41,22 +41,22 @@ func newConvLog() *convLog { return l } -func (l *convLog) Append(line string) { +func (l *convLog) append(stream, line string) { l.m.Lock() defer l.m.Unlock() - l.log = append(l.log, line) + l.log.append(stream, line) } type convLogger struct { log *convLog - prefix string + stream string s *bufio.Scanner done chan struct{} } -func newConvLogger(log *convLog, prefix string, pipe io.Reader) *convLogger { - l := &convLogger{log: log, prefix: prefix} +func newConvLogger(log *convLog, stream string, pipe io.Reader) *convLogger { + l := &convLogger{log: log, stream: stream} l.s = bufio.NewScanner(pipe) l.done = make(chan struct{}) go l.run() @@ -67,11 +67,11 @@ func (l *convLogger) run() { defer close(l.done) for l.s.Scan() { - l.log.Append(l.prefix + l.s.Text()) + l.log.append(l.stream, l.s.Text()) // TODO: add this to import log of store as well? } if err := l.s.Err(); err != nil { - l.log.Append(l.prefix + l.s.Text()) + l.log.append(l.stream, l.s.Text()) // TODO: print this on application error log?? return } diff --git a/importer/types.go b/importer/types.go index b8c674db25867b5eb6d605581465260d85ccaac5..081781426124d522b748cbfbe6dd2288ebc67908 100644 --- a/importer/types.go +++ b/importer/types.go @@ -54,6 +54,20 @@ var ( ErrAlreadyCanceled = errors.New("job is already canceled") ) +//******* Log + +type JobLogLine struct { + Stream string `json:"stream"` + Timestamp time.Time `json:"timestamp"` + Line string `json:"line"` +} + +type JobLog []JobLogLine + +func (l *JobLog) append(stream, line string) { + *l = append(*l, JobLogLine{stream, time.Now(), line}) +} + //******* Source URL type SourceURL url.URL @@ -75,9 +89,9 @@ func (s *SourceURL) UnmarshalText(data []byte) (err error) { //******* Source type JobSourceResult struct { - Err error `json:"error,omitempty"` - Hash string `json:"hash,omitempty"` - Log []string `json:"log"` + Err error `json:"error,omitempty"` + Hash string `json:"hash,omitempty"` + Log JobLog `json:"log"` } func (r *JobSourceResult) Error() string {