Skip to content
Snippets Groups Projects
Commit 795b6cdb authored by Christian Pointner's avatar Christian Pointner
Browse files

added flow js upload handler, WIP

parent a454613b
No related branches found
No related tags found
No related merge requests found
......@@ -25,13 +25,17 @@
package v1
import (
//"encoding/base64"
"errors"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"github.com/gorilla/mux"
......@@ -84,6 +88,7 @@ const (
type FlowJSFileChunk struct {
state FlowJSFileChunkState
filename string
file *os.File
completed chan struct{}
}
......@@ -96,28 +101,33 @@ func (ch *FlowJSFileChunk) Read(p []byte) (int, error) {
return ch.file.Read(p)
}
func (ch *FlowJSFileChunk) ReadFrom(r io.Reader) (int64, error) {
func (ch *FlowJSFileChunk) ReadFrom(r io.Reader) (n int64, err error) {
if !atomic.CompareAndSwapUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending), uint32(FlowJSFileChunkWriting)) {
return 0, errors.New("chunk is already uploading or done")
return 0, ErrFlowJSChunkAlreadUploading
}
n, err := io.Copy(ch.file, r)
if err != nil {
if err := ch.file.Truncate(0); err != nil {
return n, err
}
if _, err := ch.file.Seek(0, io.SeekStart); err != nil {
return n, err
}
atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending))
if ch.file, err = os.OpenFile(ch.filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600); err != nil {
return
}
return n, err
return io.Copy(ch.file, r)
}
func (ch *FlowJSFileChunk) Reset() (err error) {
if err = ch.file.Truncate(0); err != nil {
return
}
if _, err = ch.file.Seek(0, io.SeekStart); err != nil {
return
}
atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending))
return
}
func (ch *FlowJSFileChunk) Complete() (err error) {
if _, err = ch.file.Seek(0, io.SeekStart); err != nil {
return
}
atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkCompleted))
close(ch.completed)
return
}
......@@ -146,6 +156,7 @@ func newFlowJSFile(id string, numChunks uint64, size uint64, job *importer.Job)
f.chunksPath = filepath.Join(job.WorkDir, "chunks")
f.chunks = make([]FlowJSFileChunk, numChunks)
for i := range f.chunks {
f.chunks[i].filename = filepath.Join(f.chunksPath, strconv.FormatUint(uint64(i), 10)+".chunk")
f.chunks[i].completed = make(chan struct{})
}
err := os.Mkdir(f.chunksPath, 0700)
......@@ -233,10 +244,134 @@ func getOrNewFlowJSFile(job *importer.Job, flowId string, chunk, totalChunks, to
//***
func getFlowJSParameterFromMultipart(r *http.Request) (id string, chunk, totalChunks, chunkSize, totalSize uint64, data io.Reader, err error) {
var mr *multipart.Reader
if mr, err = r.MultipartReader(); err != nil {
return
}
var p *multipart.Part
var pd []byte
for {
if p, err = mr.NextPart(); err == io.EOF {
err = nil
break
}
if err != nil {
return
}
pn := p.FormName()
if pn == "file" {
data = p // base64.NewDecoder(base64.StdEncoding, p)
break
}
if pd, err = ioutil.ReadAll(p); err != nil {
return
}
switch pn {
case "flowIdentifier":
id = string(pd)
case "flowChunkNumber":
if chunk, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = errors.New("invalid parameter 'flowChunkNumber': " + err.Error())
return
}
case "flowTotalChunks":
if totalChunks, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = errors.New("invalid parameter 'flowTotalChunk': " + err.Error())
return
}
case "flowCurrentChunkSize":
if chunkSize, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = errors.New("invalid parameter 'flowCurrentChunkSize': " + err.Error())
return
}
case "flowTotalSize":
if totalSize, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = errors.New("invalid parameter 'flowTotalSize': " + err.Error())
return
}
}
}
var missing []string
if id == "" {
missing = append(missing, "flowIdentifier")
}
if chunk == 0 {
missing = append(missing, "flowChunkNumber")
}
if totalChunks == 0 {
missing = append(missing, "flowTotalChunks")
}
if chunkSize == 0 {
missing = append(missing, "flowCurrentChunkSize")
}
if totalSize == 0 {
missing = append(missing, "flowTotalSize")
}
if data == nil {
missing = append(missing, "file")
}
if len(missing) > 0 {
err = errors.New("missing mandatory parameter: " + strings.Join(missing, ", "))
}
return
}
func (api *API) UploadFileFlowJS() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// TODO: implement this
sendWebResponse(w, http.StatusNotImplemented, ErrorResponse{"flow.js upload is not yet implemented", nil})
vars := mux.Vars(r)
id, err := idFromString(vars["file-id"])
if err != nil {
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
return
}
flowId, chunk, totalChunks, chunkSize, totalSize, data, err := getFlowJSParameterFromMultipart(r)
if err != nil {
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: err.Error()})
return
}
job, err := api.importer.GetJob(vars["group-id"], id)
if err != nil {
sendError(w, err)
return
}
file, err := getOrNewFlowJSFile(job, flowId, chunk, totalChunks, totalSize)
if file == nil {
sendError(w, err)
}
if chunk > uint64(len(file.chunks)) {
errStr := fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks))
sendWebResponse(w, http.StatusConflict, ErrorResponse{Error: errStr})
return
}
switch file.chunks[chunk-1].GetState() {
case FlowJSFileChunkWriting:
fallthrough
case FlowJSFileChunkCompleted:
sendWebResponse(w, http.StatusOK, nil)
}
n, err := file.chunks[chunk-1].ReadFrom(data)
if err != nil {
if err := file.chunks[chunk-1].Reset(); err != nil {
sendWebResponse(w, http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()})
return
}
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: "chunk upload failed: " + err.Error()})
return
}
if n != int64(chunkSize) {
errStr := fmt.Sprintf("chunk upload is complete but has incorrect size, got %d bytes but expected %d bytes", n, chunkSize)
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: errStr})
return
}
sendWebResponse(w, http.StatusOK, nil)
})
}
......
......@@ -25,11 +25,18 @@
package v1
import (
"errors"
"gitlab.servus.at/autoradio/tank/importer"
"gitlab.servus.at/autoradio/tank/store"
)
// common
var (
ErrNotFlowJSUpload = errors.New("this is not a flow.js upload")
ErrFlowJSChunkAlreadUploading = errors.New("chunk is already uploading or done")
)
type ErrorResponse struct {
Error string `json:"error,omitempty"`
Details interface{} `json:"details,omitempty"`
......
......@@ -26,7 +26,6 @@ package v1
import (
"encoding/json"
"errors"
"net/http"
"strconv"
......@@ -34,10 +33,6 @@ import (
"gitlab.servus.at/autoradio/tank/store"
)
var (
ErrNotFlowJSUpload = errors.New("this is not a flow.js upload")
)
func idFromString(s string) (uint64, error) {
return strconv.ParseUint(s, 10, 64)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment