Skip to content
Snippets Groups Projects
fetch.go 2.47 KiB
Newer Older
  • Learn to ignore specific revisions
  • //
    //  tank
    //
    //  Import and Playlist Daemon for autoradio project
    //
    //
    //  Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
    //
    //  This file is part of tank.
    //
    //  tank is free software: you can redistribute it and/or modify
    //  it under the terms of the GNU General Public License as published by
    //  the Free Software Foundation, either version 3 of the License, or
    //  any later version.
    //
    //  tank is distributed in the hope that it will be useful,
    //  but WITHOUT ANY WARRANTY; without even the implied warranty of
    //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    //  GNU General Public License for more details.
    //
    //  You should have received a copy of the GNU General Public License
    //  along with tank. If not, see <http://www.gnu.org/licenses/>.
    //
    
    package importer
    
    import (
    
    type devNull uint64
    
    func (d *devNull) Read(p []byte) (n int, err error) {
    	l := len(p)
    	if *d > devNull(l) {
    		*d = *d - devNull(l)
    		time.Sleep(20 * time.Millisecond)
    		return l, nil
    	}
    	l = int(*d)
    	*d = 0
    	return l, io.EOF
    }
    
    func (job *Job) prepareSource() {
    	// TODO: enable this as soon as attachment source is implemented
    	//	if job.Source.Scheme == "attachment" {
    	// the source will be attached using AttachSource()
    	//		return
    	//	}
    	// TODO: implement other sources
    
    	// simulate a 10 MB file...
    	l := uint64(10 * 1024 * 1024)
    	r := devNull(l)
    	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(&JobSource{l, &r}))
    	close(job.sourceAttached)
    }
    
    type progressWriter struct {
    	job     *Job
    	written uint64
    	w       io.Writer
    }
    
    func (pw *progressWriter) Write(p []byte) (n int, err error) {
    	n, err = pw.w.Write(p)
    	if n > 0 {
    		pw.written += uint64(n)
    	}
    	pw.job.Progress.set(StepFetching, float32(pw.written)/float32(pw.job.source.len))
    	return
    }
    
    func (job *Job) fetch() (err error) {
    
    	job.Progress.set(StepFetching, 0)
    
    
    	// wait until source is attached
    	select {
    	case <-job.ctx.Done():
    		return job.ctx.Err()
    	case <-job.sourceAttached:
    	}
    
    	// TODO: use an actual converter here
    	conv := ioutil.Discard
    
    	done := make(chan error)
    	go func() {
    		// TODO: wrap job.source using io.TeeReader to compute source file hash
    		_, err := io.Copy(&progressWriter{job, 0, conv}, job.source.r)
    		done <- err
    	}()
    
    	select {
    	case <-job.ctx.Done():
    		return job.ctx.Err()
    	case err = <-done:
    
    	// TODO: wait for converter to finish up and return the result
    
    
    	job.Progress.set(StepFetching, 1)