From 9a6cf300389f918cf52e61b527e738c2c3536a7c Mon Sep 17 00:00:00 2001
From: Christian Pointner <equinox@helsinki.at>
Date: Sat, 8 Sep 2018 01:16:03 +0200
Subject: [PATCH] added some more job sources

---
 importer/fetch.go      | 17 -----------
 importer/importer.go   |  3 +-
 importer/job.go        | 15 +++++----
 importer/job_source.go | 69 ++++++++++++++++++++++++++++++++++++------
 importer/types.go      |  1 +
 5 files changed, 71 insertions(+), 34 deletions(-)

diff --git a/importer/fetch.go b/importer/fetch.go
index 09e8b57..89d1a95 100644
--- a/importer/fetch.go
+++ b/importer/fetch.go
@@ -27,27 +27,10 @@ package importer
 import (
 	"encoding/base64"
 	"io"
-	"sync/atomic"
 
 	"golang.org/x/crypto/blake2b"
 )
 
-func (job *Job) prepareSource() {
-	switch job.Source.Scheme {
-	case SourceSchemeAttachment:
-		// the source will be attached using AttachSource()
-		return
-
-		// TODO: implement other sources
-
-	default:
-		// simulate a 10 MB file...
-		atomic.StoreUint32(&job.sourceSet, 1)
-		job.source = newJobSourceNull(10 * 1024 * 1024)
-		close(job.subC.sourceAttached)
-	}
-}
-
 type copyResult struct {
 	err  error
 	hash string
diff --git a/importer/importer.go b/importer/importer.go
index 7991eac..084adfc 100644
--- a/importer/importer.go
+++ b/importer/importer.go
@@ -54,7 +54,8 @@ func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID
 	// TODO: update this list once we implemented other sources
 	switch src.Scheme {
 	case SourceSchemeAttachment:
-		//	case SourceSchemeFlowJS:
+	case SourceSchemeFlowJS:
+	case SourceSchemeFake:
 	default:
 		return nil, ErrSourceNotSupported
 	}
diff --git a/importer/job.go b/importer/job.go
index d18332c..74d0531 100644
--- a/importer/job.go
+++ b/importer/job.go
@@ -71,8 +71,6 @@ func (job *Job) run() error {
 	defer job.cleanup()
 	defer close(job.subC.done)
 
-	job.prepareSource()
-
 	if !atomic.CompareAndSwapUint32((*uint32)(&job.State), uint32(JobPending), uint32(JobRunning)) {
 		job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted)
 		// the job was canceled before job.Start() could initialize the context and because of this
@@ -82,21 +80,22 @@ func (job *Job) run() error {
 	close(job.subC.running)
 	job.StartedAt.set(time.Now())
 	job.im.dbgLog.Printf("running import for %s/%d from: %s", job.Group, job.ID, job.Source.String())
-
 	job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportRunning)
+
+	if err := job.prepareSource(); err != nil {
+		job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted)
+		return err
+	}
 	loudness, err := job.fetch()
 	if err != nil {
 		job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted)
-		// send result to all done subscriptions
 		return err
 	}
 	if err = job.normalize(loudness); err != nil {
 		job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted)
-		// send result to all done subscriptions
 		return err
 	}
 	job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportDone)
-	// send result to all done subscriptions
 	return err
 }
 
@@ -159,6 +158,10 @@ func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResul
 		return nil, ErrImportNotRunning
 	}
 
+	// only allow to attach external sources if the job's source was in fact an attachment URL
+	if job.Source.Scheme != SourceSchemeAttachment {
+		return nil, ErrSourceAlreadyAttached
+	}
 	if ok := atomic.CompareAndSwapUint32(&job.sourceSet, 0, 1); !ok {
 		return nil, ErrSourceAlreadyAttached
 	}
diff --git a/importer/job_source.go b/importer/job_source.go
index d2559d8..344a745 100644
--- a/importer/job_source.go
+++ b/importer/job_source.go
@@ -26,27 +26,53 @@ package importer
 
 import (
 	"io"
+	"sync/atomic"
 	"time"
 )
 
-//******* null
+func (job *Job) prepareSource() error {
+	if job.Source.Scheme == SourceSchemeAttachment {
+		// the source will be attached using AttachSource()
+		return nil
+	}
+
+	atomic.StoreUint32(&job.sourceSet, 1)
+
+	switch job.Source.Scheme {
+	case SourceSchemeFlowJS:
+		job.source = newJobSourceFlowJS(job.Source)
 
-type JobSourceNull uint64
+		// TODO: implement other sources
 
-func newJobSourceNull(len uint64) *JobSourceNull {
-	src := JobSourceNull(len)
+	case SourceSchemeFake:
+		job.source = newJobSourceFake(job.Source)
+	default:
+		return ErrSourceNotSupported
+	}
+
+	close(job.subC.sourceAttached)
+	return nil
+}
+
+//******* fake
+
+type JobSourceFake uint64
+
+func newJobSourceFake(srcURL SourceURL) *JobSourceFake {
+	// TODO: parse size from srcURL
+	src := JobSourceFake(10 * 1024 * 1024) // simulate a 10 MB file...
 	return &src
 }
 
-func (src *JobSourceNull) Len() uint64 {
+func (src *JobSourceFake) Len() uint64 {
 	return uint64(*src)
 }
 
-func (src *JobSourceNull) Read(p []byte) (n int, err error) {
+func (src *JobSourceFake) Read(p []byte) (n int, err error) {
 	l := len(p)
-	if *src > JobSourceNull(l) {
-		*src = *src - JobSourceNull(l)
-		time.Sleep(20 * time.Millisecond)
+	if *src > JobSourceFake(l) {
+		*src = *src - JobSourceFake(l)
+		time.Sleep(200 * time.Millisecond)
 		return l, nil
 	}
 	l = int(*src)
@@ -54,7 +80,7 @@ func (src *JobSourceNull) Read(p []byte) (n int, err error) {
 	return l, io.EOF
 }
 
-func (src *JobSourceNull) Done(result *JobSourceResult) {
+func (src *JobSourceFake) Done(result *JobSourceResult) {
 	return
 }
 
@@ -87,3 +113,26 @@ func (src *JobSourceAttachment) Done(result *JobSourceResult) {
 	}
 	src.done <- result
 }
+
+//******* flow.js
+
+type JobSourceFlowJS struct {
+	// TODO: implement this
+}
+
+func newJobSourceFlowJS(srcURL SourceURL) *JobSourceFlowJS {
+	src := &JobSourceFlowJS{}
+	return src
+}
+
+func (src *JobSourceFlowJS) Len() uint64 {
+	return 0
+}
+
+func (src *JobSourceFlowJS) Read(p []byte) (n int, err error) {
+	return 0, ErrNotImplemented
+}
+
+func (src *JobSourceFlowJS) Done(result *JobSourceResult) {
+	return
+}
diff --git a/importer/types.go b/importer/types.go
index ecb4dbc..13542c1 100644
--- a/importer/types.go
+++ b/importer/types.go
@@ -40,6 +40,7 @@ const (
 
 	SourceSchemeAttachment = "attachment"
 	SourceSchemeFlowJS     = "flowjs"
+	SourceSchemeFake       = "fake"
 )
 
 //******* Errors
-- 
GitLab