Skip to content
Snippets Groups Projects
fetch.go 2.64 KiB
Newer Older
  • Learn to ignore specific revisions
  • //
    //  tank
    //
    //  Import and Playlist Daemon for autoradio project
    //
    //
    //  Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
    //
    //  This file is part of tank.
    //
    //  tank is free software: you can redistribute it and/or modify
    //  it under the terms of the GNU General Public License as published by
    //  the Free Software Foundation, either version 3 of the License, or
    //  any later version.
    //
    //  tank is distributed in the hope that it will be useful,
    //  but WITHOUT ANY WARRANTY; without even the implied warranty of
    //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    //  GNU General Public License for more details.
    //
    //  You should have received a copy of the GNU General Public License
    //  along with tank. If not, see <http://www.gnu.org/licenses/>.
    //
    
    package importer
    
    import (
    
    	"crypto/sha256"
    	"encoding/hex"
    
    // devNull is a countdown of bytes that doubles as a fake io.Reader: its value
    // is the number of bytes it will still report as read before signalling EOF.
    type devNull uint64

    // Read reports up to len(p) bytes as read and decrements the remaining count
    // accordingly. The contents of p are never written to.
    //
    // While more than len(p) bytes remain, the call sleeps for 20ms and returns
    // (len(p), nil) - presumably to imitate a slow source. Once the remaining
    // count fits into p, the count is reset to zero and the call returns the
    // remaining count together with io.EOF.
    func (d *devNull) Read(p []byte) (n int, err error) {
    	remaining := uint64(*d)
    	capacity := uint64(len(p))

    	// Final (possibly empty) chunk: everything that is left fits into p.
    	if remaining <= capacity {
    		*d = 0
    		return int(remaining), io.EOF
    	}

    	*d = devNull(remaining - capacity)
    	time.Sleep(20 * time.Millisecond)
    	return len(p), nil
    }
    
    // prepareSource sets up the job's source according to job.Source.Scheme.
    //
    // For SourceSchemeAttachment nothing is done here, since the source will be
    // attached using AttachSource(). For every other scheme a simulated 10 MB
    // source is stored in job.source and job.subC.sourceAttached is closed.
    func (job *Job) prepareSource() {
    	if job.Source.Scheme == SourceSchemeAttachment {
    		// the source will be attached using AttachSource()
    		return
    	}

    	// TODO: implement other sources

    	// simulate a 10 MB file...
    	size := uint64(10 * 1024 * 1024)
    	reader := devNull(size)
    	simulated := &JobSource{size, &reader}

    	// job.source is published with an atomic pointer store, then the
    	// sourceAttached channel is closed to signal that it is available.
    	slot := (*unsafe.Pointer)(unsafe.Pointer(&job.source))
    	atomic.StorePointer(slot, unsafe.Pointer(simulated))
    	close(job.subC.sourceAttached)
    }
    
    // progressWriter is an io.Writer wrapper that forwards everything to w and
    // reports the fetch progress of job after each write.
    type progressWriter struct {
    	job     *Job      // job whose Progress is updated
    	written uint64    // total number of bytes accepted by w so far
    	w       io.Writer // destination all writes are forwarded to
    }

    // Write forwards p to the wrapped writer, adds the number of bytes it accepted
    // to the running total and then sets the StepFetching progress of the job to
    // written / job.source.len. The result of the wrapped Write is returned as is;
    // the progress is updated even when that write reported an error.
    func (pw *progressWriter) Write(p []byte) (n int, err error) {
    	if n, err = pw.w.Write(p); n > 0 {
    		pw.written += uint64(n)
    	}

    	total := float32(pw.job.source.len)
    	soFar := float32(pw.written)
    	pw.job.Progress.set(StepFetching, soFar/total)
    	return n, err
    }
    
    func (job *Job) fetch() (err error) {
    
    	job.Progress.set(StepFetching, 0)
    
    
    	// wait until source is attached
    	select {
    	case <-job.ctx.Done():
    		return job.ctx.Err()
    
    	case <-job.subC.sourceAttached:
    
    	// job.source is now initialized and points to a valid source
    
    
    	// TODO: use an actual converter here
    	conv := ioutil.Discard
    
    	done := make(chan error)
    	go func() {
    
    		hash := sha256.New()
    		src := io.TeeReader(job.source.r, hash)
    		written, err := io.Copy(&progressWriter{job, 0, conv}, src)
    		job.im.dbgLog.Printf("fetch(): done copying %d bytes from source (SHA256: %s)", written, hex.EncodeToString(hash.Sum(nil)))
    
    		done <- err
    	}()
    
    	select {
    	case <-job.ctx.Done():
    		return job.ctx.Err()
    	case err = <-done:
    
    	// TODO: wait for converter to finish up and return the result
    
    
    	job.Progress.set(StepFetching, 1)