//
//  tank
//
//  Import and Playlist Daemon for autoradio project
//
//
//  Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
//
//  This file is part of tank.
//
//  tank is free software: you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation, either version 3 of the License, or
//  any later version.
//
//  tank is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with tank. If not, see <http://www.gnu.org/licenses/>.
//

package importer

import (
	"encoding/base64"
	"io"
	"sync/atomic"
	"time"
	"unsafe"

	"golang.org/x/crypto/blake2b"
)

type devNull uint64

func (d *devNull) Read(p []byte) (n int, err error) {
	l := len(p)
	if *d > devNull(l) {
		*d = *d - devNull(l)
		time.Sleep(20 * time.Millisecond)
		return l, nil
	}
	l = int(*d)
	*d = 0
	return l, io.EOF
}

// prepareSource installs job.source depending on the source scheme.
// For attachment sources nothing happens here — the source is attached
// later via AttachSource(). For every other scheme (not implemented yet)
// a simulated 10 MB dummy source is installed so the pipeline can run.
func (job *Job) prepareSource() {
	switch job.Source.Scheme {
	case SourceSchemeAttachment:
		// the source will be attached using AttachSource()
		return

		// TODO: implement other sources

	default:
		// simulate a 10 MB file...
		l := uint64(10 * 1024 * 1024)
		r := devNull(l)
		src := &JobSource{len: l, r: &r}
		// buffered (capacity 1) so the result can be posted without
		// blocking even if no producer is waiting on it
		src.done = make(chan *JobSourceResult, 1)
		// publish the source atomically so concurrent readers see either
		// nil or a fully initialized JobSource
		// NOTE(review): assumes job.source is a pointer-typed field and
		// that all readers load it via the same atomic mechanics — confirm
		// against the Job definition
		atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(src))
		// signal waiters (e.g. fetch()) that the source is now available
		close(job.subC.sourceAttached)
	}
}

// progressWriter wraps an io.Writer and reports the number of bytes
// written so far as fetching progress on the associated job.
type progressWriter struct {
	job     *Job      // job whose progress gets updated
	written uint64    // total bytes successfully written so far
	w       io.Writer // underlying destination
}

// Write forwards p to the underlying writer, accumulates the byte count
// and updates the job's fetching progress as the fraction of the total
// source length written so far.
func (pw *progressWriter) Write(p []byte) (n int, err error) {
	n, err = pw.w.Write(p)
	if n > 0 {
		pw.written += uint64(n)
	}
	// guard against a zero-length source: dividing by zero would feed
	// NaN/+Inf values into the progress tracker
	if total := pw.job.source.len; total > 0 {
		pw.job.Progress.set(StepFetching, float32(pw.written)/float32(total))
	}
	return
}

// copyResult carries the outcome of the background copy goroutine in
// fetch(): the first error encountered (if any) and the hash string of
// the copied source data (empty when the copy failed).
type copyResult struct {
	err  error
	hash string
}

// fetch waits for the job's source to be attached, then streams the
// source data into a newly created converter while hashing it with
// BLAKE2b-256. The hash is stored via the file store, and the overall
// result is delivered to the source through job.source.done. It returns
// the first error encountered: context cancellation, converter creation
// or conversion failure, copy failure, or store update failure.
func (job *Job) fetch() error {
	job.Progress.set(StepFetching, 0)

	// wait until source is attached
	select {
	case <-job.ctx.Done():
		return job.ctx.Err()
	case <-job.subC.sourceAttached:
	}
	// job.source is now initialized and points to a valid source
	// make sure a potentially connected source gets notified in any case
	defer close(job.source.done)

	conv, err := job.newConverter()
	if err != nil {
		job.im.errLog.Printf("fetch(): creating fetch converter failed: %v", err)
		return err
	}

	// buffered (capacity 1) so the copy goroutine can always deliver its
	// result and exit, even when we bail out early on context cancellation
	// below — with an unbuffered channel the goroutine would block on the
	// send forever and leak
	done := make(chan copyResult, 1)
	go func() {
		hash, err := blake2b.New256(nil)
		if err != nil {
			// blake2b.New256 only fails for invalid keys; nil is valid
			panic("creating hash function failed: " + err.Error())
		}
		// tee the source through the hash while copying into the converter
		src := io.TeeReader(job.source.r, hash)
		written, err := io.Copy(&progressWriter{job, 0, conv}, src)
		if err != nil {
			done <- copyResult{err, ""}
			return
		}
		hashStr := "blake2b_256:" + base64.URLEncoding.EncodeToString(hash.Sum(nil))
		job.im.dbgLog.Printf("fetch(): done copying %d bytes from source (%s)", written, hashStr)
		_, err = job.im.store.UpdateFileSourceHash(job.Group, job.ID, hashStr)
		done <- copyResult{err, hashStr}
	}()

	var res copyResult
	select {
	case <-job.ctx.Done():
		// cancelled: closing the converter aborts the running copy; the
		// goroutine then posts its result into the buffered channel and exits
		conv.Close()
		err = job.ctx.Err()
		job.source.done <- &JobSourceResult{Err: err}
		return err
	case res = <-done:
	}

	conv.Close()
	corr, convLog, err := conv.Wait()
	job.im.dbgLog.Printf("fetch(): converter returned: %f db, %v", corr, err)
	// a copy/store error takes precedence over a converter error
	if res.err != nil {
		err = res.err
	}

	job.Progress.set(StepFetching, 1)
	job.source.done <- &JobSourceResult{err, res.hash, convLog}
	return err
}