diff --git a/api/v1/api_uploads.go b/api/v1/api_uploads.go
index b04d1c1405c52a9fddec2b3197dd25fa0c3a2094..56572570a97ddf82cbe1297bc1e9a4c14b25abe7 100644
--- a/api/v1/api_uploads.go
+++ b/api/v1/api_uploads.go
@@ -26,10 +26,13 @@ package v1
 
 import (
 	"errors"
+	"fmt"
 	"io"
 	"net/http"
 	"os"
 	"path/filepath"
+	"strconv"
+	"sync/atomic"
 
 	"github.com/gorilla/mux"
 	"gitlab.servus.at/autoradio/tank/importer"
@@ -71,17 +74,44 @@ func (api *API) UploadFileSimple() http.Handler {
 
 //******* flow.js
 
+type FlowJSFileChunkState uint32
+
+const (
+	FlowJSFileChunkPending FlowJSFileChunkState = iota
+	FlowJSFileChunkWriting
+	FlowJSFileChunkCompleted
+)
+
 type FlowJSFileChunk struct {
+	state     FlowJSFileChunkState
 	file      *os.File
 	completed chan struct{}
 }
 
+func (ch *FlowJSFileChunk) GetState() FlowJSFileChunkState {
+	return FlowJSFileChunkState(atomic.LoadUint32((*uint32)(&ch.state)))
+}
+
 func (ch *FlowJSFileChunk) Read(p []byte) (int, error) {
 	return ch.file.Read(p)
 }
 
-func (ch *FlowJSFileChunk) Write(p []byte) (int, error) {
-	return ch.file.Write(p)
+func (ch *FlowJSFileChunk) ReadFrom(r io.Reader) (int64, error) {
+	if !atomic.CompareAndSwapUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending), uint32(FlowJSFileChunkWriting)) {
+		return 0, errors.New("chunk is already uploading or done")
+	}
+
+	n, err := io.Copy(ch.file, r)
+	if err != nil {
+		if err := ch.file.Truncate(0); err != nil {
+			return n, err
+		}
+		if _, err := ch.file.Seek(0, io.SeekStart); err != nil {
+			return n, err
+		}
+		atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending))
+	}
+	return n, err
 }
 
 func (ch *FlowJSFileChunk) Complete() (err error) {
@@ -101,6 +131,7 @@ func (ch *FlowJSFileChunk) Cleanup() (err error) {
 }
 
 type FlowJSFile struct {
+	id         string
 	size       uint64
 	chunks     []FlowJSFileChunk
 	chunksPath string
@@ -108,14 +139,17 @@
 	job        *importer.Job
 }
 
-func newFlowJSFile(size uint64, numChunks uint, job *importer.Job) (*FlowJSFile, error) {
-	f := &FlowJSFile{size: size, job: job}
+func newFlowJSFile(id string, numChunks uint64, size uint64, job *importer.Job) (*FlowJSFile, error) {
+	f := &FlowJSFile{id: id, size: size, job: job}
 	f.chunksPath = filepath.Join(job.WorkDir, "chunks")
 	f.chunks = make([]FlowJSFileChunk, numChunks)
 	for i := range f.chunks {
 		f.chunks[i].completed = make(chan struct{})
 	}
 	err := os.Mkdir(f.chunksPath, 0700)
+	if os.IsExist(err) {
+		err = nil
+	}
 	return f, err
 }
 
@@ -165,9 +199,94 @@ func (api *API) UploadFileFlowJS() http.Handler {
 	})
 }
 
+func getFlowJSParameterFromQuery(r *http.Request) (id string, chunk, totalChunks, totalSize uint64, err error) {
+	q := r.URL.Query()
+	id = q.Get("flowIdentifier")
+	if chunk, err = strconv.ParseUint(q.Get("flowChunkNumber"), 10, 64); err != nil {
+		err = errors.New("invalid query parameter 'flowChunkNumber': " + err.Error())
+		return
+	}
+	if chunk < 1 {
+		err = errors.New("invalid chunk number: 0")
+		return
+	}
+	if totalChunks, err = strconv.ParseUint(q.Get("flowTotalChunks"), 10, 64); err != nil {
+		err = errors.New("invalid query parameter 'flowTotalChunks': " + err.Error())
+		return
+	}
+	if totalSize, err = strconv.ParseUint(q.Get("flowTotalSize"), 10, 64); err != nil {
+		err = errors.New("invalid query parameter 'flowTotalSize': " + err.Error())
+		return
+	}
+	return
+}
+
 func (api *API) TestFileFlowJS() http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		// TODO: implement this
-		sendWebResponse(w, http.StatusNotImplemented, ErrorResponse{"flow.js testing is not yet implemented", nil})
+		vars := mux.Vars(r)
+		id, err := idFromString(vars["file-id"])
+		if err != nil {
+			sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
+			return
+		}
+		flowId, chunk, totalChunks, totalSize, err := getFlowJSParameterFromQuery(r)
+		if err != nil {
+			sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: err.Error()})
+			return
+		}
+
+		job, err := api.importer.GetJob(vars["group-id"], id)
+		if err != nil {
+			sendError(w, err)
+			return
+		}
+		src, err := job.GetAttachedUploader()
+		if err != nil && err != importer.ErrSourceNotYetAttached {
+			sendError(w, err)
+			return
+		}
+
+		var file *FlowJSFile
+		if err == importer.ErrSourceNotYetAttached {
+			if file, err = newFlowJSFile(flowId, totalChunks, totalSize, job); err != nil {
+				sendError(w, err)
+				return
+			}
+
+			_, err = job.AttachUploader(file.size, file)
+			switch err {
+			case nil:
+			case importer.ErrSourceAlreadyAttached:
+				// there has been a race and the other thread won!
+				file = nil
+				if src, err = job.GetAttachedUploader(); err != nil {
+					sendError(w, err)
+					return
+				}
+			default:
+				sendError(w, err)
+				return
+			}
+		}
+		if file == nil {
+			var ok bool
+			if file, ok = src.(*FlowJSFile); !ok {
+				sendWebResponse(w, http.StatusConflict, ErrorResponse{Error: "this is not a flow.js upload"})
+				return
+			}
+		}
+
+		if chunk > uint64(len(file.chunks)) {
+			sendWebResponse(w, http.StatusConflict, ErrorResponse{Error: fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks))})
+			return
+		}
+		switch file.chunks[chunk-1].GetState() {
+		case FlowJSFileChunkPending:
+			sendWebResponse(w, http.StatusNoContent, nil)
+		case FlowJSFileChunkWriting:
+			fallthrough
+		case FlowJSFileChunkCompleted:
+			sendWebResponse(w, http.StatusOK, nil)
+		}
 	})
 }
diff --git a/api/v1/utils.go b/api/v1/utils.go
index f88904f8da66de085fa3d289a8e652dad7934ba0..367067b7c248eeb37c00a9fd8d26ec2448fd4507 100644
--- a/api/v1/utils.go
+++ b/api/v1/utils.go
@@ -65,6 +65,10 @@ func sendError(w http.ResponseWriter, err error) {
 		code = http.StatusConflict
 	case importer.ErrSourceAlreadyAttached:
 		code = http.StatusConflict
+	case importer.ErrSourceNotYetAttached:
+		code = http.StatusConflict
+	case importer.ErrSourceNotAUpload:
+		code = http.StatusConflict
 	case importer.ErrFileNotNew:
 		code = http.StatusConflict
 	case importer.ErrTooManyJobs:
diff --git a/importer/job.go b/importer/job.go
index 15ca40ff9c0773ac98ab790c9d6aec5939896551..788fbf2393374df2ae50f21862ae8197dbfa6550 100644
--- a/importer/job.go
+++ b/importer/job.go
@@ -161,7 +161,7 @@ func (job *Job) AttachUploader(len uint64, r io.Reader) (<-chan *JobSourceResult
 
 	// only allow to attach external sources if the job's source was in fact an upload URL
 	if job.Source.Scheme != SourceSchemeUpload {
-		return nil, ErrSourceAlreadyAttached
+		return nil, ErrSourceNotAUpload
 	}
 	if ok := atomic.CompareAndSwapUint32(&job.sourceSet, 0, 1); !ok {
 		return nil, ErrSourceAlreadyAttached
@@ -172,18 +172,18 @@
 	return src.done, nil
 }
 
-func (job *Job) GetAttachedUploader() io.Reader {
-	select {
-	case <-job.subC.sourceAttached:
-	default:
-		return nil
+func (job *Job) GetAttachedUploader() (io.Reader, error) {
+	if atomic.LoadUint32(&job.sourceSet) != 1 {
+		return nil, ErrSourceNotYetAttached
 	}
+	// make sure that AttachUploader() has finished
+	<-job.subC.sourceAttached
 
 	src, ok := job.source.(*JobSourceUpload)
 	if !ok {
-		return nil
+		return nil, ErrSourceNotAUpload
 	}
-	return src.r
+	return src.r, nil
 }
 
 func (job *Job) Cancel() {
diff --git a/importer/types.go b/importer/types.go
index acd4e30fdf236d82ae80cbe0b274e87de5cf4e88..ac941b691070e6f727a25df7c5388adfbbecd8f9 100644
--- a/importer/types.go
+++ b/importer/types.go
@@ -50,6 +50,8 @@ var (
 	ErrSourceNotSupported    = errors.New("source uri format is not supported")
 	ErrImportNotRunning      = errors.New("import not running")
 	ErrSourceAlreadyAttached = errors.New("source is already attached")
+	ErrSourceNotYetAttached  = errors.New("source is not yet attached")
+	ErrSourceNotAUpload      = errors.New("source is not an upload")
 	ErrFileNotNew            = errors.New("importing already running or done")
 	ErrTooManyJobs           = errors.New("too many pending jobs")
 	ErrAlreadyCanceled       = errors.New("job is already canceled")
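
The 200/204 split in TestFileFlowJS follows the usual flow.js convention for chunk tests: a 200 response lets the client skip re-uploading that chunk, while 204 means the chunk is still pending and has to be sent. Below is a minimal client-side sketch of that probe. It is not part of the diff: the package name, the helper name chunkAlreadyUploaded and the testURL parameter (the already-routed group-id/file-id test endpoint) are hypothetical; only the query parameter names and the response codes are taken from the handler above.

// Client-side sketch (not part of the diff): probe the flow.js test endpoint
// for one chunk. Names and the endpoint URL are assumptions; only the query
// parameters and the 200/204 semantics come from TestFileFlowJS.
package flowjsclient

import (
	"fmt"
	"net/http"
	"net/url"
	"strconv"
)

// chunkAlreadyUploaded reports whether the server already has the given chunk.
// TestFileFlowJS answers 200 OK for a chunk that is writing or completed and
// 204 No Content for a chunk that is still pending.
func chunkAlreadyUploaded(testURL, flowID string, chunk, totalChunks, totalSize uint64) (bool, error) {
	q := url.Values{}
	q.Set("flowIdentifier", flowID)
	q.Set("flowChunkNumber", strconv.FormatUint(chunk, 10))
	q.Set("flowTotalChunks", strconv.FormatUint(totalChunks, 10))
	q.Set("flowTotalSize", strconv.FormatUint(totalSize, 10))

	resp, err := http.Get(testURL + "?" + q.Encode())
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		return true, nil // chunk is writing or completed: skip re-upload
	case http.StatusNoContent:
		return false, nil // chunk is still pending: upload it
	default:
		return false, fmt.Errorf("unexpected response: %s", resp.Status)
	}
}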