Newer
Older
//
// tank
//
// Import and Playlist Daemon for autoradio project
//
//
// Copyright (C) 2017-2019 Christian Pointner <equinox@helsinki.at>
//
// This file is part of tank.
//
// tank is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// tank is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with tank. If not, see <http://www.gnu.org/licenses/>.
//
package v1
import (
"errors"
"net/http"
"os"
"path/filepath"
"github.com/gin-gonic/gin"
"gitlab.servus.at/autoradio/tank/importer"
)
//******* simple binary http upload
// UploadFileSimple handles a plain binary HTTP upload for the file addressed by
// the "show-id" and "file-id" route parameters. The request body is attached to
// the matching import job as its uploader; the handler then blocks until the job
// delivers a result on the channel returned by AttachUploader and reports that
// result to the client.
func (api *API) UploadFileSimple(c *gin.Context) {
showID := c.Param("show-id")
// presumably authorizeRequest has already written the error response when it
// reports !authorized — verify against its definition
if authorized, _ := authorizeRequest(c, showID); !authorized {
return
}
id, err := idFromString(c.Param("file-id"))
if err != nil {
c.JSON(http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
return
}
job, err := api.importer.GetJob(showID, id)
if err != nil {
sendError(c, err)
return
}
// NOTE(review): net/http reports an unknown body length as ContentLength == -1,
// which this conversion turns into a very large uint64 — confirm that
// AttachUploader copes with that.
done, err := job.AttachUploader(uint64(c.Request.ContentLength), "simple HTTP uploader", c.Request.Body)
if err != nil {
sendError(c, err)
return
}
// block until the import job has consumed the request body
result := <-done
if result == nil {
sendError(c, errors.New("upload failed for unknown reason"))
return
if result.Err != nil {
sendError(c, result)
return
}
c.JSON(http.StatusOK, result)
// FlowJSFileChunkState is the upload state of a single flow.js chunk. The
// underlying type is uint32, which is what the sync/atomic helpers used on the
// state field operate on.
type FlowJSFileChunkState uint32

// The states of a chunk, in the order an upload moves through them.
const (
	// FlowJSFileChunkPending is the zero value: nothing has been written yet.
	FlowJSFileChunkPending FlowJSFileChunkState = 0
	// FlowJSFileChunkWriting marks a chunk that is currently being written.
	FlowJSFileChunkWriting FlowJSFileChunkState = 1
	// FlowJSFileChunkCompleted marks a chunk that has been written completely.
	FlowJSFileChunkCompleted FlowJSFileChunkState = 2
)
file *os.File
completed chan struct{}
}
// GetState returns the chunk's current state. The value is read with an atomic
// load of the state field.
func (ch *FlowJSFileChunk) GetState() FlowJSFileChunkState {
	raw := atomic.LoadUint32((*uint32)(&ch.state))
	return FlowJSFileChunkState(raw)
}
// Read reads up to len(p) bytes from the chunk's backing file into p and
// returns whatever the file's own Read reports.
func (ch *FlowJSFileChunk) Read(p []byte) (int, error) {
	n, err := ch.file.Read(p)
	return n, err
}
// ReadFrom stores the data read from r in the chunk's backing file and returns
// the number of bytes copied. Only a chunk that is still pending can be written:
// the state is switched from pending to writing with a compare-and-swap, and a
// caller that loses that swap gets ErrFlowJSChunkAlreadUploading.
func (ch *FlowJSFileChunk) ReadFrom(r io.Reader) (n int64, err error) {
// claim the chunk: exactly one caller can move it from pending to writing
if !atomic.CompareAndSwapUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending), uint32(FlowJSFileChunkWriting)) {
return 0, ErrFlowJSChunkAlreadUploading
// fmt.Printf("starting upload: %s\n", ch.filename)
// O_CREATE|O_EXCL: the open fails if a file with this name already exists
if ch.file, err = os.OpenFile(ch.filename, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil {
return io.Copy(ch.file, r)
}
// Reset discards whatever has been written for this chunk and puts the chunk
// back into the pending state so that the upload can be attempted again.
//
// ReadFrom creates the backing file with O_CREATE|O_EXCL, so an emptied file
// that is left on disk would make every further attempt fail. Reset therefore
// closes and removes the file instead of truncating it. It also copes with
// ch.file being nil, which is the case when the preceding ReadFrom could not
// open the file at all.
//
// The state only changes to pending when the file is gone; otherwise the
// chunk's state is left untouched and the removal error is returned.
func (ch *FlowJSFileChunk) Reset() (err error) {
	// fmt.Printf("resetting chunk: %s\n", ch.filename)
	if ch.file != nil {
		// The content is thrown away, so a failing Close does not matter here;
		// whether a retry can work is decided by the removal below.
		_ = ch.file.Close()
		ch.file = nil
	}
	// A file that does not exist is exactly the state we want to end up in.
	if err = os.Remove(ch.filename); err != nil && !os.IsNotExist(err) {
		return err
	}
	atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending))
	return nil
}
// Complete rewinds the backing file to its start, marks the chunk as completed
// and closes the completed channel, which releases anything receiving from it.
// If the seek fails the chunk is left as it is and the error is returned.
//
// Closing a channel twice panics, so Complete must not succeed more than once
// for the same chunk.
func (ch *FlowJSFileChunk) Complete() (err error) {
	// fmt.Printf("completing chunk: %s\n", ch.filename)
	_, err = ch.file.Seek(0, io.SeekStart)
	if err == nil {
		atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkCompleted))
		close(ch.completed)
	}
	return err
}
// Cleanup closes the chunk's backing file and removes it from disk. If closing
// fails that error is returned and the file is not removed.
func (ch *FlowJSFileChunk) Cleanup() (err error) {
// fmt.Printf("cleaning up chunk: %s\n", ch.filename)
if err = ch.file.Close(); err != nil {
return err
}
return os.Remove(ch.filename)
size uint64
chunks []FlowJSFileChunk
chunksPath string
readOffset int
job *importer.Job
}
// newFlowJSFile creates the bookkeeping for a flow.js upload of numChunks
// chunks with a total of size bytes that feeds the given import job.
//
// The chunk files live in the "chunks" directory below job.WorkDir. The
// directory is created here; finding it already present is not an error. Chunk
// i (0-based) is stored as "<i>.chunk" and starts out pending with an open
// completed channel.
//
// On failure the returned file is nil, so callers may test either return value.
func newFlowJSFile(id string, numChunks uint64, size uint64, job *importer.Job) (*FlowJSFile, error) {
	chunksPath := filepath.Join(job.WorkDir, "chunks")
	if err := os.Mkdir(chunksPath, 0700); err != nil && !os.IsExist(err) {
		return nil, err
	}

	f := &FlowJSFile{id: id, size: size, job: job, chunksPath: chunksPath}
	// NOTE(review): numChunks is taken from request parameters by the handlers
	// in this file and is not bounded here — consider an upper limit.
	f.chunks = make([]FlowJSFileChunk, numChunks)
	for i := range f.chunks {
		name := strconv.FormatUint(uint64(i), 10) + ".chunk"
		f.chunks[i].filename = filepath.Join(chunksPath, name)
		f.chunks[i].completed = make(chan struct{})
	}
	return f, nil
}
// Read implements io.Reader on top of the chunk list. It serves the chunks in
// order, starting at readOffset: it waits until the current chunk has been
// completed (or the job's context is done), reads from that chunk, and once a
// chunk is exhausted cleans it up and moves on to the next one. After the last
// chunk it returns (0, io.EOF).
func (f *FlowJSFile) Read(p []byte) (n int, err error) {
// fmt.Printf("all chunks are read. we are done!\n")
// we already read all the chunks
return 0, io.EOF
}
// wait for chunk to be completed
select {
case <-f.job.Ctx.Done():
return 0, f.job.Ctx.Err()
case <-f.chunks[f.readOffset].completed:
}
// io.EOF from the chunk is not an error for the caller: it only means that this
// chunk has been read completely
if n, err = f.chunks[f.readOffset].Read(p); err == nil || err == io.EOF {
if n > 0 {
// fmt.Printf("successfully read %d bytes from: %s\n", n, f.chunks[f.readOffset].filename)
// we are at the end of the chunk but haven't read anything yet... so just move on
// fmt.Printf("done reading chunk: %s\n", f.chunks[f.readOffset].filename)
// we are done with this chunk, clean it up...
if err = f.chunks[f.readOffset].Cleanup(); err != nil {
return
}
// ...and proceed to the next
f.readOffset++
if n > 0 {
// if this was the last chunk the next call to Read() will return (0, io.EOF)
// for now we will return (n, nil)
return
}
//***
// getOrNewFlowJSFile returns the flow.js upload attached to job, creating and
// attaching a new one when the job has no uploader yet.
//
// flowId, totalChunks and totalSize are only used when a new upload has to be
// created; chunk is currently not used at all.
//
// If another request attaches an uploader between the lookup and the attach,
// the freshly created file is dropped and the one that won the race is used.
// ErrNotFlowJSUpload is returned when the attached uploader is not a flow.js
// upload.
//
// Whenever err is non-nil the returned file is nil. The handlers in this file
// test the file for nil, so no error path may hand back a half-initialised or
// unattached file.
func getOrNewFlowJSFile(job *importer.Job, flowId string, chunk, totalChunks, totalSize uint64) (file *FlowJSFile, err error) {
	var src io.Reader
	src, err = job.GetAttachedUploader()
	if errors.Is(err, importer.ErrSourceNotYetAttached) {
		var fresh *FlowJSFile
		if fresh, err = newFlowJSFile(flowId, totalChunks, totalSize, job); err != nil {
			return nil, err
		}
		_, err = job.AttachUploader(fresh.size, "flow.js uploader: "+fresh.id, fresh)
		switch {
		case err == nil:
			return fresh, nil
		case errors.Is(err, importer.ErrSourceAlreadyAttached):
			// there has been a race and the other thread won: use its uploader
			src, err = job.GetAttachedUploader()
		}
	}
	if err != nil {
		return nil, err
	}

	existing, ok := src.(*FlowJSFile)
	if !ok {
		return nil, ErrNotFlowJSUpload
	}
	return existing, nil
}
//***
// Names of the request parameters of a flow.js upload. They are used as form
// names of the multipart upload request and as keys of the query string of the
// test request.
const (
	// FlowJSParamID names the parameter holding the identifier of the upload.
	FlowJSParamID string = "flowIdentifier"
	// FlowJSParamChunk names the parameter holding the number of the chunk; the
	// handlers treat it as 1-based.
	FlowJSParamChunk string = "flowChunkNumber"
	// FlowJSParamTotalChunks names the parameter holding the number of chunks
	// the upload consists of.
	FlowJSParamTotalChunks string = "flowTotalChunks"
	// FlowJSParamCurrentChunkSize names the parameter holding the size of the
	// chunk that is being sent.
	FlowJSParamCurrentChunkSize string = "flowCurrentChunkSize"
	// FlowJSParamTotalSize names the parameter holding the size of the whole
	// upload.
	FlowJSParamTotalSize string = "flowTotalSize"
	// FlowJSParamFile names the multipart part that carries the chunk data.
	FlowJSParamFile string = "file"
)
// getFlowJSParameterFromMultipart reads the flow.js parameters from the parts
// of a multipart request. The numeric parameters have to be unsigned decimal
// integers; a value that does not parse yields an "invalid parameter" error
// naming the parameter. Mandatory parameters that are absent are collected and
// reported together in one "missing/invalid mandatory parameter" error.
func getFlowJSParameterFromMultipart(r *http.Request) (id string, chunk, totalChunks, chunkSize, totalSize uint64, data io.Reader, err error) {
var mr *multipart.Reader
if mr, err = r.MultipartReader(); err != nil {
return
}
var p *multipart.Part
var pd []byte
for {
// io.EOF from NextPart only means that there are no more parts
if p, err = mr.NextPart(); err == io.EOF {
err = nil
break
}
if err != nil {
return
}
pn := p.FormName()
break
}
if pd, err = ioutil.ReadAll(p); err != nil {
return
}
switch pn {
if chunk, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamChunk, err.Error())
if totalChunks, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalChunks, err.Error())
if chunkSize, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamCurrentChunkSize, err.Error())
if totalSize, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalSize, err.Error())
var missing []string
if id == "" {
missing = append(missing, FlowJSParamCurrentChunkSize)
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
missing = append(missing, FlowJSParamFile)
}
if len(missing) > 0 {
err = errors.New("missing/invalid mandatory parameter: " + strings.Join(missing, ", "))
}
return
}
//***
// getFlowJSParameterFromQuery reads the flow.js parameters from the query
// string of r. The chunk number, the number of chunks and the total size have
// to be unsigned decimal integers; the first one that does not parse yields an
// "invalid parameter" error naming it. An empty identifier and numeric values
// of 0 are collected and reported together in one "missing/invalid mandatory
// parameter" error.
func getFlowJSParameterFromQuery(r *http.Request) (id string, chunk, totalChunks, totalSize uint64, err error) {
q := r.URL.Query()
id = q.Get(FlowJSParamID)
// q.Get returns "" for an absent key and ParseUint rejects "", so an absent
// numeric parameter is reported as invalid right here and never reaches the
// missing-parameter check further down
if chunk, err = strconv.ParseUint(q.Get(FlowJSParamChunk), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamChunk, err.Error())
return
}
if totalChunks, err = strconv.ParseUint(q.Get(FlowJSParamTotalChunks), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalChunks, err.Error())
return
}
if totalSize, err = strconv.ParseUint(q.Get(FlowJSParamTotalSize), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalSize, err.Error())
return
}
var missing []string
if id == "" {
missing = append(missing, FlowJSParamID)
}
// chunk numbers start at 1, so 0 is treated like a missing value
if chunk == 0 {
missing = append(missing, FlowJSParamChunk)
}
if totalChunks == 0 {
missing = append(missing, FlowJSParamTotalChunks)
}
if totalSize == 0 {
missing = append(missing, FlowJSParamTotalSize)
err = errors.New("missing/invalid mandatory parameter: " + strings.Join(missing, ", "))
// UploadFileFlowJS handles the upload of one chunk of a flow.js upload for the
// file addressed by the "show-id" and "file-id" route parameters. The flow.js
// parameters and the chunk data are taken from the multipart request body. The
// data is written to the chunk with the given (1-based) number; the chunk is
// marked completed when the number of bytes received matches the announced
// chunk size and is reset otherwise.
func (api *API) UploadFileFlowJS(c *gin.Context) {
showID := c.Param("show-id")
if authorized, _ := authorizeRequest(c, showID); !authorized {
return
}
id, err := idFromString(c.Param("file-id"))
if err != nil {
c.JSON(http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
return
}
flowId, chunk, totalChunks, chunkSize, totalSize, data, err := getFlowJSParameterFromMultipart(c.Request)
if err != nil {
c.JSON(http.StatusBadRequest, ErrorResponse{Error: err.Error()})
return
}
job, err := api.importer.GetJob(showID, id)
if err != nil {
sendError(c, err)
return
}
file, err := getOrNewFlowJSFile(job, flowId, chunk, totalChunks, totalSize)
// NOTE(review): as shown here execution continues after sendError, and the
// next statement dereferences file — confirm that a return follows.
if file == nil {
sendError(c, err)
}
if chunk > uint64(len(file.chunks)) {
errStr := fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks))
c.JSON(http.StatusConflict, ErrorResponse{Error: errStr})
return
}
// chunk numbers are 1-based, the chunks slice is 0-based
switch file.chunks[chunk-1].GetState() {
case FlowJSFileChunkWriting:
fallthrough
case FlowJSFileChunkCompleted:
c.JSON(http.StatusOK, nil)
}
n, err := file.chunks[chunk-1].ReadFrom(data)
if err != nil {
if err := file.chunks[chunk-1].Reset(); err != nil {
c.JSON(http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()})
c.JSON(http.StatusBadRequest, ErrorResponse{Error: "chunk upload failed: " + err.Error()})
return
}
// the upload only counts when exactly the announced number of bytes arrived
if n != int64(chunkSize) {
if err := file.chunks[chunk-1].Reset(); err != nil {
c.JSON(http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()})
errStr := fmt.Sprintf("chunk upload is complete but has incorrect size, got %d bytes but expected %d bytes", n, chunkSize)
c.JSON(http.StatusBadRequest, ErrorResponse{Error: errStr})
return
if err := file.chunks[chunk-1].Complete(); err != nil {
c.JSON(http.StatusInternalServerError, ErrorResponse{Error: "failed to mark chunk completed: " + err.Error()})
return
}
c.JSON(http.StatusOK, nil)
func (api *API) TestFileFlowJS(c *gin.Context) {
showID := c.Param("show-id")
if authorized, _ := authorizeRequest(c, showID); !authorized {
return
}
id, err := idFromString(c.Param("file-id"))
if err != nil {
c.JSON(http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
return
}
flowId, chunk, totalChunks, totalSize, err := getFlowJSParameterFromQuery(c.Request)
if err != nil {
c.JSON(http.StatusBadRequest, ErrorResponse{Error: err.Error()})
return
}
job, err := api.importer.GetJob(showID, id)
if err != nil {
sendError(c, err)
return
}
file, err := getOrNewFlowJSFile(job, flowId, chunk, totalChunks, totalSize)
if file == nil {
sendError(c, err)
}
if chunk > uint64(len(file.chunks)) {
errStr := fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks))
c.JSON(http.StatusConflict, ErrorResponse{Error: errStr})
return
}
switch file.chunks[chunk-1].GetState() {
case FlowJSFileChunkPending:
c.JSON(http.StatusNoContent, nil)
case FlowJSFileChunkWriting:
fallthrough
case FlowJSFileChunkCompleted:
c.JSON(http.StatusOK, nil)