Skip to content
Snippets Groups Projects
files_upload.go 12.6 KiB
Newer Older
  • Learn to ignore specific revisions
  • //
    //  tank
    //
    //  Import and Playlist Daemon for autoradio project
    //
    //
    
    //  Copyright (C) 2017-2019 Christian Pointner <equinox@helsinki.at>
    
    //
    //  This file is part of tank.
    //
    //  tank is free software: you can redistribute it and/or modify
    //  it under the terms of the GNU General Public License as published by
    //  the Free Software Foundation, either version 3 of the License, or
    //  any later version.
    //
    //  tank is distributed in the hope that it will be useful,
    //  but WITHOUT ANY WARRANTY; without even the implied warranty of
    //  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    //  GNU General Public License for more details.
    //
    //  You should have received a copy of the GNU General Public License
    //  along with tank. If not, see <http://www.gnu.org/licenses/>.
    //
    
    package v1
    
    import (
    	"errors"
    
    	"io/ioutil"
    	"mime/multipart"
    
    	"net/http"
    	"os"
    	"path/filepath"
    
    	"sync/atomic"
    
    	"github.com/gin-gonic/gin"
    
    	"gitlab.servus.at/autoradio/tank/importer"
    )
    
    //******* simple binary http upload
    
    
    func (api *API) UploadFileSimple(c *gin.Context) {
    	showID := c.Param("show-id")
    	if authorized, _ := authorizeRequest(c, showID); !authorized {
    		return
    	}
    
    	id, err := idFromString(c.Param("file-id"))
    	if err != nil {
    		c.JSON(http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
    		return
    	}
    	job, err := api.importer.GetJob(showID, id)
    	if err != nil {
    		sendError(c, err)
    		return
    	}
    
    	done, err := job.AttachUploader(uint64(c.Request.ContentLength), "simple HTTP uploader", c.Request.Body)
    	if err != nil {
    		sendError(c, err)
    		return
    	}
    	result := <-done
    	if result == nil {
    		sendError(c, errors.New("upload failed for unknown reason"))
    		return
    
    	if result.Err != nil {
    		sendError(c, result)
    		return
    	}
    	c.JSON(http.StatusOK, result)
    
    type FlowJSFileChunkState uint32
    
    const (
    	FlowJSFileChunkPending FlowJSFileChunkState = iota
    	FlowJSFileChunkWriting
    	FlowJSFileChunkCompleted
    )
    
    
    type FlowJSFileChunk struct {
    
    	state     FlowJSFileChunkState
    
    	filename  string
    
    	file      *os.File
    	completed chan struct{}
    }
    
    
    func (ch *FlowJSFileChunk) GetState() FlowJSFileChunkState {
    	return FlowJSFileChunkState(atomic.LoadUint32((*uint32)(&ch.state)))
    }
    
    
    func (ch *FlowJSFileChunk) Read(p []byte) (int, error) {
    	return ch.file.Read(p)
    }
    
    
    func (ch *FlowJSFileChunk) ReadFrom(r io.Reader) (n int64, err error) {
    
    	if !atomic.CompareAndSwapUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending), uint32(FlowJSFileChunkWriting)) {
    
    		return 0, ErrFlowJSChunkAlreadUploading
    
    	// fmt.Printf("starting upload: %s\n", ch.filename)
    
    	if ch.file, err = os.OpenFile(ch.filename, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil {
    
    	return io.Copy(ch.file, r)
    }
    
    func (ch *FlowJSFileChunk) Reset() (err error) {
    
    	// fmt.Printf("reseting chunk: %s\n", ch.filename)
    
    	if err = ch.file.Truncate(0); err != nil {
    		return
    	}
    	if _, err = ch.file.Seek(0, io.SeekStart); err != nil {
    		return
    	}
    	atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending))
    	return
    
    }
    
    func (ch *FlowJSFileChunk) Complete() (err error) {
    
    	// fmt.Printf("completing chunk: %s\n", ch.filename)
    
    	if _, err = ch.file.Seek(0, io.SeekStart); err != nil {
    		return
    	}
    
    	atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkCompleted))
    
    	close(ch.completed)
    	return
    }
    
    func (ch *FlowJSFileChunk) Cleanup() (err error) {
    
    	// fmt.Printf("cleaning up chunk: %s\n", ch.filename)
    
    	if err = ch.file.Close(); err != nil {
    		return err
    	}
    
    	return os.Remove(ch.filename)
    
    type FlowJSFile struct {
    
    	size       uint64
    	chunks     []FlowJSFileChunk
    	chunksPath string
    	readOffset int
    	job        *importer.Job
    }
    
    
    func newFlowJSFile(id string, numChunks uint64, size uint64, job *importer.Job) (*FlowJSFile, error) {
    	f := &FlowJSFile{id: id, size: size, job: job}
    
    	f.chunksPath = filepath.Join(job.WorkDir, "chunks")
    	f.chunks = make([]FlowJSFileChunk, numChunks)
    	for i := range f.chunks {
    
    		f.chunks[i].filename = filepath.Join(f.chunksPath, strconv.FormatUint(uint64(i), 10)+".chunk")
    
    		f.chunks[i].completed = make(chan struct{})
    	}
    	err := os.Mkdir(f.chunksPath, 0700)
    
    	if os.IsExist(err) {
    		err = nil
    	}
    
    	return f, err
    }
    
    func (f *FlowJSFile) Read(p []byte) (n int, err error) {
    
    	for {
    		if f.readOffset >= len(f.chunks) {
    
    			// fmt.Printf("all chunks are read. we are done!\n")
    
    			// we already read all the chunks
    			return 0, io.EOF
    		}
    
    		// wait for chunk to be completed
    		select {
    		case <-f.job.Ctx.Done():
    			return 0, f.job.Ctx.Err()
    		case <-f.chunks[f.readOffset].completed:
    		}
    
    		if n, err = f.chunks[f.readOffset].Read(p); err == nil || err == io.EOF {
    			if n > 0 {
    
    				// fmt.Printf("successfully read %d bytes from: %s\n", n, f.chunks[f.readOffset].filename)
    
    				err = nil
    				return
    			}
    
    			// we are at the end of the chunk but haven't read anything yet... so just move on
    
    		// fmt.Printf("done reading chunk: %s\n", f.chunks[f.readOffset].filename)
    
    
    		// we are done with this chunk, clean it up...
    		if err = f.chunks[f.readOffset].Cleanup(); err != nil {
    			return
    		}
    		// ...and proceed to the next
    		f.readOffset++
    		if n > 0 {
    			// if this was the last chunk the next call to Read() will return (0, io.EOF)
    			// for now we will return (n, nil)
    			return
    		}
    
    Christian Pointner's avatar
    Christian Pointner committed
    //***
    
    func getOrNewFlowJSFile(job *importer.Job, flowId string, chunk, totalChunks, totalSize uint64) (file *FlowJSFile, err error) {
    	var src io.Reader
    	if src, err = job.GetAttachedUploader(); err != nil && err != importer.ErrSourceNotYetAttached {
    		return
    	}
    
    	if err == importer.ErrSourceNotYetAttached {
    		if file, err = newFlowJSFile(flowId, totalChunks, totalSize, job); err != nil {
    			return
    		}
    
    
    		_, err = job.AttachUploader(file.size, "flow.js uploader: "+file.id, file)
    
    Christian Pointner's avatar
    Christian Pointner committed
    		switch err {
    		case nil:
    		case importer.ErrSourceAlreadyAttached:
    			// there has been a race and the other thread won!
    			file = nil
    			if src, err = job.GetAttachedUploader(); err != nil {
    				return
    			}
    		default:
    			return
    		}
    	}
    
    	if file == nil {
    		var ok bool
    		if file, ok = src.(*FlowJSFile); !ok {
    			err = ErrNotFlowJSUpload
    			return
    		}
    	}
    	return
    }
    
    //***
    
    
    Christian Pointner's avatar
    Christian Pointner committed
    const (
    	FlowJSParamID               string = "flowIdentifier"
    	FlowJSParamChunk            string = "flowChunkNumber"
    	FlowJSParamTotalChunks      string = "flowTotalChunks"
    	FlowJSParamCurrentChunkSize string = "flowCurrentChunkSize"
    	FlowJSParamTotalSize        string = "flowTotalSize"
    	FlowJSParamFile             string = "file"
    )
    
    
    func getFlowJSParameterFromMultipart(r *http.Request) (id string, chunk, totalChunks, chunkSize, totalSize uint64, data io.Reader, err error) {
    	var mr *multipart.Reader
    	if mr, err = r.MultipartReader(); err != nil {
    		return
    	}
    	var p *multipart.Part
    	var pd []byte
    	for {
    		if p, err = mr.NextPart(); err == io.EOF {
    			err = nil
    			break
    		}
    		if err != nil {
    			return
    		}
    
    		pn := p.FormName()
    
    Christian Pointner's avatar
    Christian Pointner committed
    		if pn == FlowJSParamFile {
    
    			break
    		}
    
    		if pd, err = ioutil.ReadAll(p); err != nil {
    			return
    		}
    		switch pn {
    
    Christian Pointner's avatar
    Christian Pointner committed
    		case FlowJSParamID:
    
    			id = string(pd)
    
    Christian Pointner's avatar
    Christian Pointner committed
    		case FlowJSParamChunk:
    
    			if chunk, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
    
    Christian Pointner's avatar
    Christian Pointner committed
    				err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamChunk, err.Error())
    
    Christian Pointner's avatar
    Christian Pointner committed
    		case FlowJSParamTotalChunks:
    
    			if totalChunks, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
    
    Christian Pointner's avatar
    Christian Pointner committed
    				err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalChunks, err.Error())
    
    Christian Pointner's avatar
    Christian Pointner committed
    		case FlowJSParamCurrentChunkSize:
    
    			if chunkSize, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
    
    Christian Pointner's avatar
    Christian Pointner committed
    				err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamCurrentChunkSize, err.Error())
    
    Christian Pointner's avatar
    Christian Pointner committed
    		case FlowJSParamTotalSize:
    
    			if totalSize, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
    
    Christian Pointner's avatar
    Christian Pointner committed
    				err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalSize, err.Error())
    
    Christian Pointner's avatar
    Christian Pointner committed
    
    
    	var missing []string
    	if id == "" {
    
    Christian Pointner's avatar
    Christian Pointner committed
    		missing = append(missing, FlowJSParamID)
    
    	}
    	if chunk == 0 {
    
    Christian Pointner's avatar
    Christian Pointner committed
    		missing = append(missing, FlowJSParamChunk)
    
    	}
    	if totalChunks == 0 {
    
    Christian Pointner's avatar
    Christian Pointner committed
    		missing = append(missing, FlowJSParamTotalChunks)
    
    	}
    	if chunkSize == 0 {
    
    Christian Pointner's avatar
    Christian Pointner committed
    		missing = append(missing, FlowJSParamCurrentChunkSize)
    
    	}
    	if totalSize == 0 {
    
    Christian Pointner's avatar
    Christian Pointner committed
    		missing = append(missing, FlowJSParamTotalSize)
    
    	}
    	if data == nil {
    
    Christian Pointner's avatar
    Christian Pointner committed
    		missing = append(missing, FlowJSParamFile)
    	}
    	if len(missing) > 0 {
    		err = errors.New("missing/invalid mandatory parameter: " + strings.Join(missing, ", "))
    	}
    	return
    }
    
    //***
    
    func getFlowJSParameterFromQuery(r *http.Request) (id string, chunk, totalChunks, totalSize uint64, err error) {
    	q := r.URL.Query()
    	id = q.Get(FlowJSParamID)
    	if chunk, err = strconv.ParseUint(q.Get(FlowJSParamChunk), 10, 64); err != nil {
    		err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamChunk, err.Error())
    		return
    	}
    	if totalChunks, err = strconv.ParseUint(q.Get(FlowJSParamTotalChunks), 10, 64); err != nil {
    		err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalChunks, err.Error())
    		return
    	}
    	if totalSize, err = strconv.ParseUint(q.Get(FlowJSParamTotalSize), 10, 64); err != nil {
    		err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalSize, err.Error())
    		return
    	}
    
    	var missing []string
    	if id == "" {
    		missing = append(missing, FlowJSParamID)
    	}
    	if chunk == 0 {
    		missing = append(missing, FlowJSParamChunk)
    	}
    	if totalChunks == 0 {
    		missing = append(missing, FlowJSParamTotalChunks)
    	}
    	if totalSize == 0 {
    		missing = append(missing, FlowJSParamTotalSize)
    
    	}
    	if len(missing) > 0 {
    
    Christian Pointner's avatar
    Christian Pointner committed
    		err = errors.New("missing/invalid mandatory parameter: " + strings.Join(missing, ", "))
    
    func (api *API) UploadFileFlowJS(c *gin.Context) {
    	showID := c.Param("show-id")
    	if authorized, _ := authorizeRequest(c, showID); !authorized {
    		return
    	}
    
    	id, err := idFromString(c.Param("file-id"))
    	if err != nil {
    		c.JSON(http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
    		return
    	}
    	flowId, chunk, totalChunks, chunkSize, totalSize, data, err := getFlowJSParameterFromMultipart(c.Request)
    	if err != nil {
    		c.JSON(http.StatusBadRequest, ErrorResponse{Error: err.Error()})
    		return
    	}
    
    	job, err := api.importer.GetJob(showID, id)
    	if err != nil {
    		sendError(c, err)
    		return
    	}
    
    	file, err := getOrNewFlowJSFile(job, flowId, chunk, totalChunks, totalSize)
    	if file == nil {
    		sendError(c, err)
    	}
    
    	if chunk > uint64(len(file.chunks)) {
    		errStr := fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks))
    		c.JSON(http.StatusConflict, ErrorResponse{Error: errStr})
    		return
    	}
    	switch file.chunks[chunk-1].GetState() {
    	case FlowJSFileChunkWriting:
    		fallthrough
    	case FlowJSFileChunkCompleted:
    		c.JSON(http.StatusOK, nil)
    	}
    
    	n, err := file.chunks[chunk-1].ReadFrom(data)
    	if err != nil {
    		if err := file.chunks[chunk-1].Reset(); err != nil {
    			c.JSON(http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()})
    
    		c.JSON(http.StatusBadRequest, ErrorResponse{Error: "chunk upload failed: " + err.Error()})
    		return
    	}
    	if n != int64(chunkSize) {
    		if err := file.chunks[chunk-1].Reset(); err != nil {
    			c.JSON(http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()})
    
    		errStr := fmt.Sprintf("chunk upload is complete but has incorrect size, got %d bytes but expected %d bytes", n, chunkSize)
    		c.JSON(http.StatusBadRequest, ErrorResponse{Error: errStr})
    		return
    
    	if err := file.chunks[chunk-1].Complete(); err != nil {
    		c.JSON(http.StatusInternalServerError, ErrorResponse{Error: "failed to mark chunk completed: " + err.Error()})
    		return
    	}
    	c.JSON(http.StatusOK, nil)
    
    func (api *API) TestFileFlowJS(c *gin.Context) {
    	showID := c.Param("show-id")
    	if authorized, _ := authorizeRequest(c, showID); !authorized {
    		return
    	}
    
    	id, err := idFromString(c.Param("file-id"))
    	if err != nil {
    		c.JSON(http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
    		return
    	}
    	flowId, chunk, totalChunks, totalSize, err := getFlowJSParameterFromQuery(c.Request)
    	if err != nil {
    		c.JSON(http.StatusBadRequest, ErrorResponse{Error: err.Error()})
    		return
    	}
    
    	job, err := api.importer.GetJob(showID, id)
    	if err != nil {
    		sendError(c, err)
    		return
    	}
    
    	file, err := getOrNewFlowJSFile(job, flowId, chunk, totalChunks, totalSize)
    	if file == nil {
    		sendError(c, err)
    	}
    
    	if chunk > uint64(len(file.chunks)) {
    		errStr := fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks))
    		c.JSON(http.StatusConflict, ErrorResponse{Error: errStr})
    		return
    	}
    	switch file.chunks[chunk-1].GetState() {
    	case FlowJSFileChunkPending:
    		c.JSON(http.StatusNoContent, nil)
    	case FlowJSFileChunkWriting:
    		fallthrough
    	case FlowJSFileChunkCompleted:
    		c.JSON(http.StatusOK, nil)