Newer
Older
//
// tank
//
// Import and Playlist Daemon for autoradio project
//
//
// Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
//
// This file is part of tank.
//
// tank is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// tank is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with tank. If not, see <http://www.gnu.org/licenses/>.
//
package importer
import (
"io"
"io/ioutil"
"sync/atomic"
"unsafe"
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// devNull is a placeholder source that pretends to hold a fixed number of
// bytes. Its value is the number of bytes that have not been handed out yet.
type devNull uint64

// Read satisfies io.Reader without ever writing to p. While more than len(p)
// bytes remain, it consumes len(p) of them, sleeps for 20 milliseconds and
// reports a full read. Otherwise it reports whatever is left (possibly zero
// bytes), resets the counter to zero and returns io.EOF.
func (d *devNull) Read(p []byte) (n int, err error) {
	remaining := uint64(*d)
	requested := uint64(len(p))

	// Final (or empty) chunk: drain the counter and signal the end.
	if remaining <= requested {
		*d = 0
		return int(remaining), io.EOF
	}

	*d = devNull(remaining - requested)
	// Throttle every non-final read so the simulated transfer takes a while.
	time.Sleep(20 * time.Millisecond)
	return len(p), nil
}
// prepareSource installs a simulated source on the job and then closes
// job.sourceAttached. The source pointer is published with an atomic store.
func (job *Job) prepareSource() {
	// TODO: enable this as soon as attachment source is implemented
	// if job.Source.Scheme == "attachment" {
	//   // the source will be attached using AttachSource()
	//   return
	// }

	// TODO: implement other sources

	// simulate a 10 MB file...
	var size uint64 = 10 << 20
	reader := devNull(size)
	src := &JobSource{len: size, r: &reader}

	// job.source is written through an unsafe.Pointer view of the field so
	// that the store is atomic.
	slot := (*unsafe.Pointer)(unsafe.Pointer(&job.source))
	atomic.StorePointer(slot, unsafe.Pointer(src))

	close(job.sourceAttached)
}
// progressWriter forwards writes to w and, after every write, updates the
// job's StepFetching progress with the fraction of the source length that has
// been written so far.
type progressWriter struct {
	job     *Job      // job whose progress is updated and whose source length is read
	written uint64    // total number of bytes accepted by w so far
	w       io.Writer // destination of the data
}

// Write passes p on to the wrapped writer, adds the number of bytes it
// accepted to the running total and publishes written/source.len as the
// current progress. The wrapped writer's result is returned unchanged.
func (pw *progressWriter) Write(p []byte) (n int, err error) {
	if n, err = pw.w.Write(p); n > 0 {
		pw.written += uint64(n)
	}

	// Progress is reported even when the underlying write failed or wrote
	// nothing.
	total := float32(pw.job.source.len)
	pw.job.Progress.set(StepFetching, float32(pw.written)/total)
	return n, err
}
func (job *Job) fetch() (err error) {
job.Progress.set(StepFetching, 0)
// wait until source is attached
select {
case <-job.ctx.Done():
return job.ctx.Err()
case <-job.sourceAttached:
}
// TODO: use an actual converter here
conv := ioutil.Discard
done := make(chan error)
go func() {
// TODO: wrap job.source using io.TeeReader to compute source file hash
_, err := io.Copy(&progressWriter{job, 0, conv}, job.source.r)
done <- err
}()
select {
case <-job.ctx.Done():
return job.ctx.Err()
case err = <-done:
// TODO: wait for converter to finish up and return the result
job.Progress.set(StepFetching, 1)
return err