From 8470e5864f63c42447b44a33658ddb5b6079b4fe Mon Sep 17 00:00:00 2001
From: Christian Pointner <equinox@helsinki.at>
Date: Sun, 16 Sep 2018 03:31:06 +0200
Subject: [PATCH] implemented flow.js test handler

---
 api/v1/api_uploads.go | 131 ++++++++++++++++++++++++++++++++++++++++--
 api/v1/utils.go       |   4 ++
 importer/job.go       |  16 +++---
 importer/types.go     |   2 +
 4 files changed, 139 insertions(+), 14 deletions(-)

diff --git a/api/v1/api_uploads.go b/api/v1/api_uploads.go
index b04d1c1..5657257 100644
--- a/api/v1/api_uploads.go
+++ b/api/v1/api_uploads.go
@@ -26,10 +26,13 @@ package v1
 
 import (
 	"errors"
+	"fmt"
 	"io"
 	"net/http"
 	"os"
 	"path/filepath"
+	"strconv"
+	"sync/atomic"
 
 	"github.com/gorilla/mux"
 	"gitlab.servus.at/autoradio/tank/importer"
@@ -71,17 +74,44 @@ func (api *API) UploadFileSimple() http.Handler {
 
 //******* flow.js
 
+type FlowJSFileChunkState uint32
+
+const (
+	FlowJSFileChunkPending FlowJSFileChunkState = iota
+	FlowJSFileChunkWriting
+	FlowJSFileChunkCompleted
+)
+
 type FlowJSFileChunk struct {
+	state     FlowJSFileChunkState
 	file      *os.File
 	completed chan struct{}
 }
 
+func (ch *FlowJSFileChunk) GetState() FlowJSFileChunkState {
+	return FlowJSFileChunkState(atomic.LoadUint32((*uint32)(&ch.state)))
+}
+
 func (ch *FlowJSFileChunk) Read(p []byte) (int, error) {
 	return ch.file.Read(p)
 }
 
-func (ch *FlowJSFileChunk) Write(p []byte) (int, error) {
-	return ch.file.Write(p)
+func (ch *FlowJSFileChunk) ReadFrom(r io.Reader) (int64, error) {
+	if !atomic.CompareAndSwapUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending), uint32(FlowJSFileChunkWriting)) {
+		return 0, errors.New("chunk is already uploading or done")
+	}
+
+	n, err := io.Copy(ch.file, r)
+	if err != nil {
+		if err := ch.file.Truncate(0); err != nil {
+			return n, err
+		}
+		if _, err := ch.file.Seek(0, io.SeekStart); err != nil {
+			return n, err
+		}
+		atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending))
+	}
+	return n, err
 }
 
 func (ch *FlowJSFileChunk) Complete() (err error) {
@@ -101,6 +131,7 @@ func (ch *FlowJSFileChunk) Cleanup() (err error) {
 }
 
 type FlowJSFile struct {
+	id         string
 	size       uint64
 	chunks     []FlowJSFileChunk
 	chunksPath string
@@ -108,14 +139,17 @@ type FlowJSFile struct {
 	job        *importer.Job
 }
 
-func newFlowJSFile(size uint64, numChunks uint, job *importer.Job) (*FlowJSFile, error) {
-	f := &FlowJSFile{size: size, job: job}
+func newFlowJSFile(id string, numChunks uint64, size uint64, job *importer.Job) (*FlowJSFile, error) {
+	f := &FlowJSFile{id: id, size: size, job: job}
 	f.chunksPath = filepath.Join(job.WorkDir, "chunks")
 	f.chunks = make([]FlowJSFileChunk, numChunks)
 	for i := range f.chunks {
 		f.chunks[i].completed = make(chan struct{})
 	}
 	err := os.Mkdir(f.chunksPath, 0700)
+	if os.IsExist(err) {
+		err = nil
+	}
 	return f, err
 }
 
@@ -165,9 +199,94 @@ func (api *API) UploadFileFlowJS() http.Handler {
 	})
 }
 
+func getFlowJSParameterFromQuery(r *http.Request) (id string, chunk, totalChunks, totalSize uint64, err error) {
+	q := r.URL.Query()
+	id = q.Get("flowIdentifier")
+	if chunk, err = strconv.ParseUint(q.Get("flowChunkNumber"), 10, 64); err != nil {
+		err = errors.New("invalid query parameter 'flowChunkNumber': " + err.Error())
+		return
+	}
+	if chunk < 1 {
+		err = errors.New("invalid chunk number: 0")
+		return
+	}
+	if totalChunks, err = strconv.ParseUint(q.Get("flowTotalChunks"), 10, 64); err != nil {
+		err = errors.New("invalid query parameter 'flowTotalChunks': " + err.Error())
+		return
+	}
+	if totalSize, err = strconv.ParseUint(q.Get("flowTotalSize"), 10, 64); err != nil {
+		err = errors.New("invalid query parameter 'flowTotalSize': " + err.Error())
+		return
+	}
+	return
+}
+
 func (api *API) TestFileFlowJS() http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		// TODO: implement this
-		sendWebResponse(w, http.StatusNotImplemented, ErrorResponse{"flow.js testing is not yet implemented", nil})
+		vars := mux.Vars(r)
+		id, err := idFromString(vars["file-id"])
+		if err != nil {
+			sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
+			return
+		}
+		flowId, chunk, totalChunks, totalSize, err := getFlowJSParameterFromQuery(r)
+		if err != nil {
+			sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: err.Error()})
+			return
+		}
+
+		job, err := api.importer.GetJob(vars["group-id"], id)
+		if err != nil {
+			sendError(w, err)
+			return
+		}
+		src, err := job.GetAttachedUploader()
+		if err != nil && err != importer.ErrSourceNotYetAttached {
+			sendError(w, err)
+			return
+		}
+
+		var file *FlowJSFile
+		if err == importer.ErrSourceNotYetAttached {
+			if file, err = newFlowJSFile(flowId, totalChunks, totalSize, job); err != nil {
+				sendError(w, err)
+				return
+			}
+
+			_, err = job.AttachUploader(file.size, file)
+			switch err {
+			case nil:
+			case importer.ErrSourceAlreadyAttached:
+				// there has been a race and the other thread won!
+				file = nil
+				if src, err = job.GetAttachedUploader(); err != nil {
+					sendError(w, err)
+					return
+				}
+			default:
+				sendError(w, err)
+				return
+			}
+		}
+		if file == nil {
+			var ok bool
+			if file, ok = src.(*FlowJSFile); !ok {
+				sendWebResponse(w, http.StatusConflict, ErrorResponse{Error: "this is not a flow.js upload"})
+				return
+			}
+		}
+
+		if chunk > uint64(len(file.chunks)) {
+			sendWebResponse(w, http.StatusConflict, ErrorResponse{Error: fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks))})
+			return
+		}
+		switch file.chunks[chunk-1].GetState() {
+		case FlowJSFileChunkPending:
+			sendWebResponse(w, http.StatusNoContent, nil)
+		case FlowJSFileChunkWriting:
+			fallthrough
+		case FlowJSFileChunkCompleted:
+			sendWebResponse(w, http.StatusOK, nil)
+		}
 	})
 }
diff --git a/api/v1/utils.go b/api/v1/utils.go
index f88904f..367067b 100644
--- a/api/v1/utils.go
+++ b/api/v1/utils.go
@@ -65,6 +65,10 @@ func sendError(w http.ResponseWriter, err error) {
 			code = http.StatusConflict
 		case importer.ErrSourceAlreadyAttached:
 			code = http.StatusConflict
+		case importer.ErrSourceNotYetAttached:
+			code = http.StatusConflict
+		case importer.ErrSourceNotAUpload:
+			code = http.StatusConflict
 		case importer.ErrFileNotNew:
 			code = http.StatusConflict
 		case importer.ErrTooManyJobs:
diff --git a/importer/job.go b/importer/job.go
index 15ca40f..788fbf2 100644
--- a/importer/job.go
+++ b/importer/job.go
@@ -161,7 +161,7 @@ func (job *Job) AttachUploader(len uint64, r io.Reader) (<-chan *JobSourceResult
 
 	// only allow to attach external sources if the job's source was in fact an upload URL
 	if job.Source.Scheme != SourceSchemeUpload {
-		return nil, ErrSourceAlreadyAttached
+		return nil, ErrSourceNotAUpload
 	}
 	if ok := atomic.CompareAndSwapUint32(&job.sourceSet, 0, 1); !ok {
 		return nil, ErrSourceAlreadyAttached
@@ -172,18 +172,18 @@ func (job *Job) AttachUploader(len uint64, r io.Reader) (<-chan *JobSourceResult
 	return src.done, nil
 }
 
-func (job *Job) GetAttachedUploader() io.Reader {
-	select {
-	case <-job.subC.sourceAttached:
-	default:
-		return nil
+func (job *Job) GetAttachedUploader() (io.Reader, error) {
+	if atomic.LoadUint32(&job.sourceSet) != 1 {
+		return nil, ErrSourceNotYetAttached
 	}
+	// make sure that AttachUploader() has finished
+	<-job.subC.sourceAttached
 
 	src, ok := job.source.(*JobSourceUpload)
 	if !ok {
-		return nil
+		return nil, ErrSourceNotAUpload
 	}
-	return src.r
+	return src.r, nil
 }
 
 func (job *Job) Cancel() {
diff --git a/importer/types.go b/importer/types.go
index acd4e30..ac941b6 100644
--- a/importer/types.go
+++ b/importer/types.go
@@ -50,6 +50,8 @@ var (
 	ErrSourceNotSupported    = errors.New("source uri format is not supported")
 	ErrImportNotRunning      = errors.New("import not running")
 	ErrSourceAlreadyAttached = errors.New("source is already attached")
+	ErrSourceNotYetAttached  = errors.New("source is not yet attached")
+	ErrSourceNotAUpload      = errors.New("source is not a upload")
 	ErrFileNotNew            = errors.New("importing already running or done")
 	ErrTooManyJobs           = errors.New("too many pending jobs")
 	ErrAlreadyCanceled       = errors.New("job is already canceled")
-- 
GitLab