// // tank // // Import and Playlist Daemon for autoradio project // // // Copyright (C) 2017-2019 Christian Pointner <equinox@helsinki.at> // // This file is part of tank. // // tank is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // tank is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with tank. If not, see <http://www.gnu.org/licenses/>. // package v1 import ( "errors" "fmt" "io" "io/ioutil" "mime/multipart" "net/http" "os" "path/filepath" "strconv" "strings" "sync/atomic" "github.com/gin-gonic/gin" "gitlab.servus.at/autoradio/tank/importer" ) //******* simple binary http upload func (api *API) UploadFileSimple(c *gin.Context) { showID := c.Param("show-id") if authorized, _ := authorizeRequest(c, showID); !authorized { return } id, err := idFromString(c.Param("file-id")) if err != nil { c.JSON(http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()}) return } job, err := api.importer.GetJob(showID, id) if err != nil { sendError(c, err) return } done, err := job.AttachUploader(uint64(c.Request.ContentLength), "simple HTTP uploader", c.Request.Body) if err != nil { sendError(c, err) return } result := <-done if result == nil { sendError(c, errors.New("upload failed for unknown reason")) return } if result.Err != nil { sendError(c, result) return } c.JSON(http.StatusOK, result) } //******* flow.js type FlowJSFileChunkState uint32 const ( FlowJSFileChunkPending FlowJSFileChunkState = iota FlowJSFileChunkWriting FlowJSFileChunkCompleted ) type FlowJSFileChunk struct { state FlowJSFileChunkState filename string file *os.File completed chan struct{} } func (ch *FlowJSFileChunk) GetState() FlowJSFileChunkState { return FlowJSFileChunkState(atomic.LoadUint32((*uint32)(&ch.state))) } func (ch *FlowJSFileChunk) Read(p []byte) (int, error) { return ch.file.Read(p) } func (ch *FlowJSFileChunk) ReadFrom(r io.Reader) (n int64, err error) { if !atomic.CompareAndSwapUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending), uint32(FlowJSFileChunkWriting)) { return 0, ErrFlowJSChunkAlreadUploading } // fmt.Printf("starting upload: %s\n", ch.filename) if ch.file, err = os.OpenFile(ch.filename, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil { return } return io.Copy(ch.file, r) } func (ch *FlowJSFileChunk) Reset() (err error) { // fmt.Printf("reseting chunk: %s\n", ch.filename) if err = ch.file.Truncate(0); err != nil { return } if _, err = ch.file.Seek(0, io.SeekStart); err != nil { return } atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending)) return } func (ch *FlowJSFileChunk) Complete() (err error) { // fmt.Printf("completing chunk: %s\n", ch.filename) if _, err = ch.file.Seek(0, io.SeekStart); err != nil { return } atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkCompleted)) close(ch.completed) return } func (ch *FlowJSFileChunk) Cleanup() (err error) { // fmt.Printf("cleaning up chunk: %s\n", ch.filename) if err = ch.file.Close(); err != nil { return err } return os.Remove(ch.filename) } //*** type FlowJSFile struct { id string size uint64 chunks []FlowJSFileChunk chunksPath string readOffset int job *importer.Job } func newFlowJSFile(id string, numChunks uint64, size uint64, job *importer.Job) (*FlowJSFile, error) { f := &FlowJSFile{id: id, size: size, job: job} f.chunksPath = filepath.Join(job.WorkDir, "chunks") f.chunks = make([]FlowJSFileChunk, numChunks) for i := range f.chunks { f.chunks[i].filename = filepath.Join(f.chunksPath, strconv.FormatUint(uint64(i), 10)+".chunk") f.chunks[i].completed = make(chan struct{}) } err := os.Mkdir(f.chunksPath, 0700) if os.IsExist(err) { err = nil } return f, err } func (f *FlowJSFile) Read(p []byte) (n int, err error) { for { if f.readOffset >= len(f.chunks) { // fmt.Printf("all chunks are read. we are done!\n") // we already read all the chunks return 0, io.EOF } // wait for chunk to be completed select { case <-f.job.Ctx.Done(): return 0, f.job.Ctx.Err() case <-f.chunks[f.readOffset].completed: } if n, err = f.chunks[f.readOffset].Read(p); err == nil || err == io.EOF { if n > 0 { // fmt.Printf("successfully read %d bytes from: %s\n", n, f.chunks[f.readOffset].filename) err = nil return } // we are at the end of the chunk but haven't read anything yet... so just move on } // fmt.Printf("done reading chunk: %s\n", f.chunks[f.readOffset].filename) // we are done with this chunk, clean it up... if err = f.chunks[f.readOffset].Cleanup(); err != nil { return } // ...and proceed to the next f.readOffset++ if n > 0 { // if this was the last chunk the next call to Read() will return (0, io.EOF) // for now we will return (n, nil) return } } } //*** func getOrNewFlowJSFile(job *importer.Job, flowId string, chunk, totalChunks, totalSize uint64) (file *FlowJSFile, err error) { var src io.Reader if src, err = job.GetAttachedUploader(); err != nil && err != importer.ErrSourceNotYetAttached { return } if err == importer.ErrSourceNotYetAttached { if file, err = newFlowJSFile(flowId, totalChunks, totalSize, job); err != nil { return } _, err = job.AttachUploader(file.size, "flow.js uploader: "+file.id, file) switch err { case nil: case importer.ErrSourceAlreadyAttached: // there has been a race and the other thread won! file = nil if src, err = job.GetAttachedUploader(); err != nil { return } default: return } } if file == nil { var ok bool if file, ok = src.(*FlowJSFile); !ok { err = ErrNotFlowJSUpload return } } return } //*** const ( FlowJSParamID string = "flowIdentifier" FlowJSParamChunk string = "flowChunkNumber" FlowJSParamTotalChunks string = "flowTotalChunks" FlowJSParamCurrentChunkSize string = "flowCurrentChunkSize" FlowJSParamTotalSize string = "flowTotalSize" FlowJSParamFile string = "file" ) func getFlowJSParameterFromMultipart(r *http.Request) (id string, chunk, totalChunks, chunkSize, totalSize uint64, data io.Reader, err error) { var mr *multipart.Reader if mr, err = r.MultipartReader(); err != nil { return } var p *multipart.Part var pd []byte for { if p, err = mr.NextPart(); err == io.EOF { err = nil break } if err != nil { return } pn := p.FormName() if pn == FlowJSParamFile { data = p break } if pd, err = ioutil.ReadAll(p); err != nil { return } switch pn { case FlowJSParamID: id = string(pd) case FlowJSParamChunk: if chunk, err = strconv.ParseUint(string(pd), 10, 64); err != nil { err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamChunk, err.Error()) return } case FlowJSParamTotalChunks: if totalChunks, err = strconv.ParseUint(string(pd), 10, 64); err != nil { err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalChunks, err.Error()) return } case FlowJSParamCurrentChunkSize: if chunkSize, err = strconv.ParseUint(string(pd), 10, 64); err != nil { err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamCurrentChunkSize, err.Error()) return } case FlowJSParamTotalSize: if totalSize, err = strconv.ParseUint(string(pd), 10, 64); err != nil { err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalSize, err.Error()) return } } } var missing []string if id == "" { missing = append(missing, FlowJSParamID) } if chunk == 0 { missing = append(missing, FlowJSParamChunk) } if totalChunks == 0 { missing = append(missing, FlowJSParamTotalChunks) } if chunkSize == 0 { missing = append(missing, FlowJSParamCurrentChunkSize) } if totalSize == 0 { missing = append(missing, FlowJSParamTotalSize) } if data == nil { missing = append(missing, FlowJSParamFile) } if len(missing) > 0 { err = errors.New("missing/invalid mandatory parameter: " + strings.Join(missing, ", ")) } return } //*** func getFlowJSParameterFromQuery(r *http.Request) (id string, chunk, totalChunks, totalSize uint64, err error) { q := r.URL.Query() id = q.Get(FlowJSParamID) if chunk, err = strconv.ParseUint(q.Get(FlowJSParamChunk), 10, 64); err != nil { err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamChunk, err.Error()) return } if totalChunks, err = strconv.ParseUint(q.Get(FlowJSParamTotalChunks), 10, 64); err != nil { err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalChunks, err.Error()) return } if totalSize, err = strconv.ParseUint(q.Get(FlowJSParamTotalSize), 10, 64); err != nil { err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalSize, err.Error()) return } var missing []string if id == "" { missing = append(missing, FlowJSParamID) } if chunk == 0 { missing = append(missing, FlowJSParamChunk) } if totalChunks == 0 { missing = append(missing, FlowJSParamTotalChunks) } if totalSize == 0 { missing = append(missing, FlowJSParamTotalSize) } if len(missing) > 0 { err = errors.New("missing/invalid mandatory parameter: " + strings.Join(missing, ", ")) } return } func (api *API) UploadFileFlowJS(c *gin.Context) { showID := c.Param("show-id") if authorized, _ := authorizeRequest(c, showID); !authorized { return } id, err := idFromString(c.Param("file-id")) if err != nil { c.JSON(http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()}) return } flowId, chunk, totalChunks, chunkSize, totalSize, data, err := getFlowJSParameterFromMultipart(c.Request) if err != nil { c.JSON(http.StatusBadRequest, ErrorResponse{Error: err.Error()}) return } job, err := api.importer.GetJob(showID, id) if err != nil { sendError(c, err) return } file, err := getOrNewFlowJSFile(job, flowId, chunk, totalChunks, totalSize) if file == nil { sendError(c, err) } if chunk > uint64(len(file.chunks)) { errStr := fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks)) c.JSON(http.StatusConflict, ErrorResponse{Error: errStr}) return } switch file.chunks[chunk-1].GetState() { case FlowJSFileChunkWriting: fallthrough case FlowJSFileChunkCompleted: c.JSON(http.StatusOK, nil) } n, err := file.chunks[chunk-1].ReadFrom(data) if err != nil { if err := file.chunks[chunk-1].Reset(); err != nil { c.JSON(http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()}) return } c.JSON(http.StatusBadRequest, ErrorResponse{Error: "chunk upload failed: " + err.Error()}) return } if n != int64(chunkSize) { if err := file.chunks[chunk-1].Reset(); err != nil { c.JSON(http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()}) return } errStr := fmt.Sprintf("chunk upload is complete but has incorrect size, got %d bytes but expected %d bytes", n, chunkSize) c.JSON(http.StatusBadRequest, ErrorResponse{Error: errStr}) return } if err := file.chunks[chunk-1].Complete(); err != nil { c.JSON(http.StatusInternalServerError, ErrorResponse{Error: "failed to mark chunk completed: " + err.Error()}) return } c.JSON(http.StatusOK, nil) } func (api *API) TestFileFlowJS(c *gin.Context) { showID := c.Param("show-id") if authorized, _ := authorizeRequest(c, showID); !authorized { return } id, err := idFromString(c.Param("file-id")) if err != nil { c.JSON(http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()}) return } flowId, chunk, totalChunks, totalSize, err := getFlowJSParameterFromQuery(c.Request) if err != nil { c.JSON(http.StatusBadRequest, ErrorResponse{Error: err.Error()}) return } job, err := api.importer.GetJob(showID, id) if err != nil { sendError(c, err) return } file, err := getOrNewFlowJSFile(job, flowId, chunk, totalChunks, totalSize) if file == nil { sendError(c, err) } if chunk > uint64(len(file.chunks)) { errStr := fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks)) c.JSON(http.StatusConflict, ErrorResponse{Error: errStr}) return } switch file.chunks[chunk-1].GetState() { case FlowJSFileChunkPending: c.JSON(http.StatusNoContent, nil) case FlowJSFileChunkWriting: fallthrough case FlowJSFileChunkCompleted: c.JSON(http.StatusOK, nil) } }