//
// tank
//
// Import and Playlist Daemon for autoradio project
//
//
// Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
//
// This file is part of tank.
//
// tank is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// tank is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with tank. If not, see <http://www.gnu.org/licenses/>.
//
package v1
import (
"errors"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"github.com/gorilla/mux"
"gitlab.servus.at/autoradio/tank/importer"
)
//******* simple binary http upload
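// UploadFileSimple handles plain binary uploads: the request body is attached
// to the import job as its upload source and the handler waits for the
// resulting status on the returned done channel before responding.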
func (api *API) UploadFileSimple() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := idFromString(vars["file-id"])
if err != nil {
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
return
}
job, err := api.importer.GetJob(vars["group-id"], id)
if err != nil {
sendError(w, err)
return
}
done, err := job.AttachUploader(uint64(r.ContentLength), r.Body)
if err != nil {
sendError(w, err)
return
}
result := <-done
if result == nil {
sendError(w, errors.New("upload failed for unknown reason"))
return
}
if result.Err != nil {
sendError(w, result)
return
}
sendWebResponse(w, http.StatusOK, result)
})
}
//******* flow.js
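// FlowJSFileChunkState is the lifecycle state of a single flow.js chunk:
// pending (not yet uploaded), writing (upload in progress) or completed.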
type FlowJSFileChunkState uint32
const (
FlowJSFileChunkPending FlowJSFileChunkState = iota
FlowJSFileChunkWriting
FlowJSFileChunkCompleted
)
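// FlowJSFileChunk buffers one chunk of a flow.js upload in a temporary file.
// The completed channel is closed once the chunk has been fully received so
// that the reader assembling the file can block on it.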
type FlowJSFileChunk struct {
state FlowJSFileChunkState
filename string
file *os.File
completed chan struct{}
}
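// GetState atomically returns the current state of the chunk.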
func (ch *FlowJSFileChunk) GetState() FlowJSFileChunkState {
return FlowJSFileChunkState(atomic.LoadUint32((*uint32)(&ch.state)))
}
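// Read reads from the chunk's backing file.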
func (ch *FlowJSFileChunk) Read(p []byte) (int, error) {
return ch.file.Read(p)
}
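// ReadFrom atomically claims the chunk for writing and copies the uploaded
// data into its backing file. If the chunk is not pending anymore it returns
// ErrFlowJSChunkAlreadUploading.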
func (ch *FlowJSFileChunk) ReadFrom(r io.Reader) (n int64, err error) {
if !atomic.CompareAndSwapUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending), uint32(FlowJSFileChunkWriting)) {
return 0, ErrFlowJSChunkAlreadUploading
}
// fmt.Printf("starting upload: %s\n", ch.filename)
if ch.file, err = os.OpenFile(ch.filename, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil {
return
}
return io.Copy(ch.file, r)
}
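// Reset truncates and rewinds the backing file and marks the chunk as pending
// again so a failed upload attempt can be retried.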
func (ch *FlowJSFileChunk) Reset() (err error) {
// fmt.Printf("resetting chunk: %s\n", ch.filename)
if err = ch.file.Truncate(0); err != nil {
return
}
if _, err = ch.file.Seek(0, io.SeekStart); err != nil {
return
}
atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkPending))
return
}
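// Complete rewinds the backing file, marks the chunk as completed and closes
// the completed channel to unblock waiting readers.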
func (ch *FlowJSFileChunk) Complete() (err error) {
// fmt.Printf("completing chunk: %s\n", ch.filename)
if _, err = ch.file.Seek(0, io.SeekStart); err != nil {
return
}
atomic.StoreUint32((*uint32)(&ch.state), uint32(FlowJSFileChunkCompleted))
close(ch.completed)
return
}
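// Cleanup closes and removes the chunk's backing file.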
func (ch *FlowJSFileChunk) Cleanup() (err error) {
// fmt.Printf("cleaning up chunk: %s\n", ch.filename)
if err = ch.file.Close(); err != nil {
return err
}
return os.Remove(ch.filename)
}
//***
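// FlowJSFile assembles the chunks of a flow.js upload into a single sequential
// stream which is attached to the import job as its upload source.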
type FlowJSFile struct {
id string
size uint64
chunks []FlowJSFileChunk
chunksPath string
readOffset int
job *importer.Job
}
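// newFlowJSFile sets up the chunk bookkeeping for a flow.js upload and creates
// the job's chunk directory if it does not exist yet.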
func newFlowJSFile(id string, numChunks uint64, size uint64, job *importer.Job) (*FlowJSFile, error) {
f := &FlowJSFile{id: id, size: size, job: job}
f.chunksPath = filepath.Join(job.WorkDir, "chunks")
f.chunks = make([]FlowJSFileChunk, numChunks)
for i := range f.chunks {
f.chunks[i].filename = filepath.Join(f.chunksPath, strconv.FormatUint(uint64(i), 10)+".chunk")
f.chunks[i].completed = make(chan struct{})
}
err := os.Mkdir(f.chunksPath, 0700)
if os.IsExist(err) {
err = nil
}
return f, err
}
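// Read implements io.Reader over the chunk sequence: it waits for the current
// chunk to be completed (or the job context to be cancelled), reads it to the
// end, cleans it up and then moves on to the next one.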
func (f *FlowJSFile) Read(p []byte) (n int, err error) {
for {
if f.readOffset >= len(f.chunks) {
// fmt.Printf("all chunks are read. we are done!\n")
// we already read all the chunks
return 0, io.EOF
}
// wait for chunk to be completed
select {
case <-f.job.Ctx.Done():
return 0, f.job.Ctx.Err()
case <-f.chunks[f.readOffset].completed:
}
if n, err = f.chunks[f.readOffset].Read(p); err == nil || err == io.EOF {
if n > 0 {
// fmt.Printf("successfully read %d bytes from: %s\n", n, f.chunks[f.readOffset].filename)
err = nil
return
}
// we are at the end of the chunk but haven't read anything yet... so just move on
}
// fmt.Printf("done reading chunk: %s\n", f.chunks[f.readOffset].filename)
// we are done with this chunk, clean it up...
if err = f.chunks[f.readOffset].Cleanup(); err != nil {
return
}
// ...and proceed to the next
f.readOffset++
if n > 0 {
// if this was the last chunk the next call to Read() will return (0, io.EOF)
// for now we will return (n, nil)
return
}
}
}
//***
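// getOrNewFlowJSFile returns the FlowJSFile that is already attached to the
// job as its uploader or creates and attaches a new one. If another request
// wins the race to attach an uploader, the existing one is reused; if the
// attached uploader is not a FlowJSFile, ErrNotFlowJSUpload is returned.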
func getOrNewFlowJSFile(job *importer.Job, flowId string, chunk, totalChunks, totalSize uint64) (file *FlowJSFile, err error) {
var src io.Reader
if src, err = job.GetAttachedUploader(); err != nil && err != importer.ErrSourceNotYetAttached {
return
}
if err == importer.ErrSourceNotYetAttached {
if file, err = newFlowJSFile(flowId, totalChunks, totalSize, job); err != nil {
return
}
_, err = job.AttachUploader(file.size, file)
switch err {
case nil:
case importer.ErrSourceAlreadyAttached:
// there has been a race and the other thread won!
file = nil
if src, err = job.GetAttachedUploader(); err != nil {
return
}
default:
return
}
}
if file == nil {
var ok bool
if file, ok = src.(*FlowJSFile); !ok {
err = ErrNotFlowJSUpload
return
}
}
return
}
//***
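// Names of the form/query parameters sent by flow.js for every chunk.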
const (
FlowJSParamID string = "flowIdentifier"
FlowJSParamChunk string = "flowChunkNumber"
FlowJSParamTotalChunks string = "flowTotalChunks"
FlowJSParamCurrentChunkSize string = "flowCurrentChunkSize"
FlowJSParamTotalSize string = "flowTotalSize"
FlowJSParamFile string = "file"
)
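// getFlowJSParameterFromMultipart extracts the flow.js parameters from a
// multipart/form-data request. Parsing stops at the file part so its contents
// can be streamed directly, which means all other parameters have to precede
// it; every listed parameter is mandatory.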
func getFlowJSParameterFromMultipart(r *http.Request) (id string, chunk, totalChunks, chunkSize, totalSize uint64, data io.Reader, err error) {
var mr *multipart.Reader
if mr, err = r.MultipartReader(); err != nil {
return
}
var p *multipart.Part
var pd []byte
for {
if p, err = mr.NextPart(); err == io.EOF {
err = nil
break
}
if err != nil {
return
}
pn := p.FormName()
if pn == FlowJSParamFile {
data = p
break
}
if pd, err = ioutil.ReadAll(p); err != nil {
return
}
switch pn {
case FlowJSParamID:
id = string(pd)
case FlowJSParamChunk:
if chunk, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamChunk, err.Error())
return
}
case FlowJSParamTotalChunks:
if totalChunks, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalChunks, err.Error())
return
}
case FlowJSParamCurrentChunkSize:
if chunkSize, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamCurrentChunkSize, err.Error())
return
}
case FlowJSParamTotalSize:
if totalSize, err = strconv.ParseUint(string(pd), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalSize, err.Error())
return
}
}
}
var missing []string
if id == "" {
missing = append(missing, FlowJSParamID)
}
if chunk == 0 {
missing = append(missing, FlowJSParamChunk)
}
if totalChunks == 0 {
missing = append(missing, FlowJSParamTotalChunks)
}
if chunkSize == 0 {
missing = append(missing, FlowJSParamCurrentChunkSize)
}
if totalSize == 0 {
missing = append(missing, FlowJSParamTotalSize)
}
if data == nil {
missing = append(missing, FlowJSParamFile)
}
if len(missing) > 0 {
err = errors.New("missing/invalid mandatory parameter: " + strings.Join(missing, ", "))
}
return
}
//***
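// getFlowJSParameterFromQuery extracts the flow.js parameters from the URL
// query string as sent by flow.js test requests.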
func getFlowJSParameterFromQuery(r *http.Request) (id string, chunk, totalChunks, totalSize uint64, err error) {
q := r.URL.Query()
id = q.Get(FlowJSParamID)
if chunk, err = strconv.ParseUint(q.Get(FlowJSParamChunk), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamChunk, err.Error())
return
}
if totalChunks, err = strconv.ParseUint(q.Get(FlowJSParamTotalChunks), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalChunks, err.Error())
return
}
if totalSize, err = strconv.ParseUint(q.Get(FlowJSParamTotalSize), 10, 64); err != nil {
err = fmt.Errorf("invalid parameter '%s': %v", FlowJSParamTotalSize, err.Error())
return
}
var missing []string
if id == "" {
missing = append(missing, FlowJSParamID)
}
if chunk == 0 {
missing = append(missing, FlowJSParamChunk)
}
if totalChunks == 0 {
missing = append(missing, FlowJSParamTotalChunks)
}
if totalSize == 0 {
missing = append(missing, FlowJSParamTotalSize)
}
if len(missing) > 0 {
err = errors.New("missing/invalid mandatory parameter: " + strings.Join(missing, ", "))
}
return
}
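// UploadFileFlowJS handles flow.js chunk uploads: it looks up the import job,
// claims the addressed chunk and streams the multipart file contents into it,
// verifying the number of received bytes against flowCurrentChunkSize.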
func (api *API) UploadFileFlowJS() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := idFromString(vars["file-id"])
if err != nil {
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
return
}
flowId, chunk, totalChunks, chunkSize, totalSize, data, err := getFlowJSParameterFromMultipart(r)
if err != nil {
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: err.Error()})
return
}
job, err := api.importer.GetJob(vars["group-id"], id)
if err != nil {
sendError(w, err)
return
}
file, err := getOrNewFlowJSFile(job, flowId, chunk, totalChunks, totalSize)
if err != nil {
sendError(w, err)
return
}
if chunk > uint64(len(file.chunks)) {
errStr := fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks))
sendWebResponse(w, http.StatusConflict, ErrorResponse{Error: errStr})
return
}
switch file.chunks[chunk-1].GetState() {
case FlowJSFileChunkWriting:
fallthrough
case FlowJSFileChunkCompleted:
// this chunk was already uploaded (or is being uploaded right now):
// acknowledge it and don't touch it again
sendWebResponse(w, http.StatusOK, nil)
return
}
n, err := file.chunks[chunk-1].ReadFrom(data)
if err != nil {
if err := file.chunks[chunk-1].Reset(); err != nil {
sendWebResponse(w, http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()})
return
}
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: "chunk upload failed: " + err.Error()})
return
}
if n != int64(chunkSize) {
if err := file.chunks[chunk-1].Reset(); err != nil {
sendWebResponse(w, http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()})
return
}
errStr := fmt.Sprintf("chunk upload is complete but has incorrect size, got %d bytes but expected %d bytes", n, chunkSize)
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: errStr})
return
}
if err := file.chunks[chunk-1].Complete(); err != nil {
sendWebResponse(w, http.StatusInternalServerError, ErrorResponse{Error: "failed to mark chunk completed: " + err.Error()})
return
}
sendWebResponse(w, http.StatusOK, nil)
})
}
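// TestFileFlowJS handles flow.js test requests: it responds with 200 if the
// addressed chunk has already been (or is currently being) uploaded and with
// 204 if the chunk still needs to be sent.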
func (api *API) TestFileFlowJS() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := idFromString(vars["file-id"])
if err != nil {
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
return
}
flowId, chunk, totalChunks, totalSize, err := getFlowJSParameterFromQuery(r)
if err != nil {
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: err.Error()})
return
}
job, err := api.importer.GetJob(vars["group-id"], id)
if err != nil {
sendError(w, err)
return
}
file, err := getOrNewFlowJSFile(job, flowId, chunk, totalChunks, totalSize)
if err != nil {
sendError(w, err)
return
}
if chunk > uint64(len(file.chunks)) {
errStr := fmt.Sprintf("invalid chunk number %d, the flow.js upload is only %d chunks long", chunk, len(file.chunks))
sendWebResponse(w, http.StatusConflict, ErrorResponse{Error: errStr})
return
}
switch file.chunks[chunk-1].GetState() {
case FlowJSFileChunkPending:
sendWebResponse(w, http.StatusNoContent, nil)
case FlowJSFileChunkWriting:
fallthrough
case FlowJSFileChunkCompleted:
sendWebResponse(w, http.StatusOK, nil)
}
})
}