Commit 0e12d396 authored by Christian Pointner

flow.js upload works now, testing and cleanup needed

parent 795b6cdb
@@ -25,7 +25,6 @@
 package v1
 
 import (
-	//"encoding/base64"
 	"errors"
 	"fmt"
 	"io"
@@ -106,13 +105,15 @@ func (ch *FlowJSFileChunk) ReadFrom(r io.Reader) (n int64, err error) {
 		return 0, ErrFlowJSChunkAlreadUploading
 	}
-	if ch.file, err = os.OpenFile(ch.filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600); err != nil {
+	fmt.Printf("starting upload: %s\n", ch.filename)
+	if ch.file, err = os.OpenFile(ch.filename, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil {
 		return
 	}
 	return io.Copy(ch.file, r)
 }
 
 func (ch *FlowJSFileChunk) Reset() (err error) {
+	fmt.Printf("resetting chunk: %s\n", ch.filename)
 	if err = ch.file.Truncate(0); err != nil {
 		return
 	}
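Note (illustrative, not part of this commit): the switch from O_WRONLY to O_RDWR matters because Complete() later seeks the same handle back to the start so the chunk can be read for stitching, which a write-only descriptor would not allow; O_EXCL keeps a duplicate upload of the same chunk from silently overwriting an in-flight file. A minimal standalone sketch of that open pattern, using a hypothetical scratch path:

package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

func main() {
	name := os.TempDir() + "/chunk-demo" // hypothetical scratch file

	// O_RDWR (not O_WRONLY) so the same handle can be read back later;
	// O_EXCL so a second upload of the same chunk fails instead of overwriting.
	f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	defer os.Remove(name)
	defer f.Close()

	io.Copy(f, strings.NewReader("chunk payload"))

	// read back through the same handle, as Complete() does
	f.Seek(0, io.SeekStart)
	buf, _ := io.ReadAll(f)
	fmt.Printf("read back %d bytes\n", len(buf))

	// a second exclusive open of the same name fails with os.ErrExist
	if _, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil {
		fmt.Println("second open rejected:", os.IsExist(err))
	}
}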
@@ -124,6 +125,7 @@ func (ch *FlowJSFileChunk) Reset() (err error) {
 }
 
 func (ch *FlowJSFileChunk) Complete() (err error) {
+	fmt.Printf("completing chunk: %s\n", ch.filename)
 	if _, err = ch.file.Seek(0, io.SeekStart); err != nil {
 		return
 	}
@@ -133,11 +135,11 @@ func (ch *FlowJSFileChunk) Complete() (err error) {
 }
 
 func (ch *FlowJSFileChunk) Cleanup() (err error) {
-	filename := ch.file.Name()
+	fmt.Printf("cleaning up chunk: %s\n", ch.filename)
 	if err = ch.file.Close(); err != nil {
 		return err
 	}
-	return os.Remove(filename)
+	return os.Remove(ch.filename)
 }
 
 //***
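Note (illustrative): taken together, Reset, Complete, and Cleanup give each chunk file a small lifecycle — truncate-and-rewind after a failed attempt, rewind for reading on success, close-and-remove once consumed. A compact sketch of that lifecycle against a hypothetical temp file:

package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
)

func main() {
	name := filepath.Join(os.TempDir(), "chunk-lifecycle-demo") // hypothetical path
	f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}

	f.WriteString("partial data from a failed attempt")

	// Reset(): drop the bad bytes but keep the handle open so the
	// client can simply retry the chunk.
	f.Truncate(0)
	f.Seek(0, io.SeekStart)
	f.WriteString("retried chunk payload")

	// Complete(): rewind so the stitching reader starts at byte 0.
	f.Seek(0, io.SeekStart)
	data, _ := io.ReadAll(f)
	fmt.Printf("chunk ready: %q\n", data)

	// Cleanup(): close first, then remove via the tracked filename
	// (the commit keeps ch.filename instead of asking the handle).
	f.Close()
	os.Remove(name)
}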
@@ -169,6 +171,7 @@ func newFlowJSFile(id string, numChunks uint64, size uint64, job *importer.Job)
 func (f *FlowJSFile) Read(p []byte) (n int, err error) {
 	for {
 		if f.readOffset >= len(f.chunks) {
+			fmt.Printf("all chunks are read. we are done!\n")
 			// we already read all the chunks
 			return 0, io.EOF
 		}
@@ -182,14 +185,13 @@ func (f *FlowJSFile) Read(p []byte) (n int, err error) {
 		if n, err = f.chunks[f.readOffset].Read(p); err == nil || err == io.EOF {
 			if n > 0 {
+				fmt.Printf("successfully read %d bytes from: %s\n", n, f.chunks[f.readOffset].filename)
 				err = nil
 				return
 			}
-			// we are at the end of the chunk but haven't read anything yet...
-			// .. move on to the next chunk
-			f.readOffset++
-			continue
+			// we are at the end of the chunk but haven't read anything yet... so just move on
 		}
+		fmt.Printf("done reading chunk: %s\n", f.chunks[f.readOffset].filename)
 		// we are done with this chunk, clean it up...
 		if err = f.chunks[f.readOffset].Cleanup(); err != nil {
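Note (illustrative): the Read loop above effectively concatenates the chunk files in order, advancing whenever the current chunk is exhausted — io.MultiReader with per-chunk cleanup bolted on. The core pattern in isolation, with hypothetical names:

package main

import (
	"fmt"
	"io"
	"strings"
)

// chainReader reads sequentially from a list of readers, moving to the
// next one whenever the current yields 0 bytes at EOF — the same shape
// as FlowJSFile.Read, minus the chunk cleanup.
type chainReader struct {
	parts []io.Reader
	off   int
}

func (c *chainReader) Read(p []byte) (int, error) {
	for {
		if c.off >= len(c.parts) {
			return 0, io.EOF // all parts consumed
		}
		n, err := c.parts[c.off].Read(p)
		if n > 0 {
			return n, nil // deliver what we got; EOF surfaces on a later call
		}
		if err == nil || err == io.EOF {
			c.off++ // current part exhausted, move on
			continue
		}
		return n, err // real error
	}
}

func main() {
	r := &chainReader{parts: []io.Reader{
		strings.NewReader("chunk-1|"),
		strings.NewReader("chunk-2|"),
		strings.NewReader("chunk-3"),
	}}
	out, _ := io.ReadAll(r)
	fmt.Println(string(out))
}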
@@ -262,7 +264,7 @@ func getFlowJSParameterFromMultipart(r *http.Request) (id string, chunk, totalCh
 		pn := p.FormName()
 		if pn == "file" {
-			data = p // base64.NewDecoder(base64.StdEncoding, p)
+			data = p
 			break
 		}
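Note (illustrative): the handler keeps only a reference to the multipart part for the "file" field, so the chunk payload stays a stream instead of being buffered whole. A self-contained sketch of that streaming approach — the field names follow flow.js defaults and the handler is hypothetical, not this project's code:

package main

import (
	"fmt"
	"io"
	"net/http"
	"strconv"
)

// handleChunk walks the multipart stream, slurping only the small
// metadata fields and stopping at the "file" part so the chunk body
// itself is never held in memory.
func handleChunk(w http.ResponseWriter, r *http.Request) {
	mr, err := r.MultipartReader()
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	var chunk uint64
	var data io.Reader
	for data == nil {
		p, err := mr.NextPart()
		if err == io.EOF {
			break
		}
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		switch p.FormName() {
		case "flowChunkNumber": // tiny value field: safe to read whole
			b, _ := io.ReadAll(io.LimitReader(p, 32))
			chunk, _ = strconv.ParseUint(string(b), 10, 64)
		case "file":
			data = p // chunk payload: leave it as a stream
		}
	}
	if data == nil {
		http.Error(w, "no file part in request", http.StatusBadRequest)
		return
	}
	n, _ := io.Copy(io.Discard, data) // real code would copy into the chunk file
	fmt.Fprintf(w, "received chunk %d (%d bytes)\n", chunk, n)
}

func main() {
	http.ListenAndServe("127.0.0.1:8080", http.HandlerFunc(handleChunk))
}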
@@ -366,11 +368,19 @@ func (api *API) UploadFileFlowJS() http.Handler {
 			return
 		}
 		if n != int64(chunkSize) {
+			if err := file.chunks[chunk-1].Reset(); err != nil {
+				sendWebResponse(w, http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()})
+				return
+			}
 			errStr := fmt.Sprintf("chunk upload is complete but has incorrect size, got %d bytes but expected %d bytes", n, chunkSize)
 			sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: errStr})
 			return
 		}
+		if err := file.chunks[chunk-1].Complete(); err != nil {
+			sendWebResponse(w, http.StatusInternalServerError, ErrorResponse{Error: "failed to mark chunk completed: " + err.Error()})
+			return
+		}
+		// TODO: if this is the last chunk to be uploaded wait for fetch converter to finish?
 		sendWebResponse(w, http.StatusOK, nil)
 	})
 }
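Note (illustrative): from the client's side, each chunk is one multipart POST; with this commit, a size mismatch now resets the chunk before the 400 response (so a retry starts clean), and a 200 means Complete() has already rewound the file for stitching. A hedged sketch of such a client — the endpoint URL and field names are assumptions modeled on flow.js defaults:

package main

import (
	"bytes"
	"fmt"
	"mime/multipart"
	"net/http"
)

// uploadChunk posts one chunk the way a flow.js client would.
func uploadChunk(url, id string, chunk, totalChunks int, payload []byte) error {
	var body bytes.Buffer
	mw := multipart.NewWriter(&body)

	// flow.js sends its metadata as ordinary form fields...
	mw.WriteField("flowIdentifier", id)
	mw.WriteField("flowChunkNumber", fmt.Sprint(chunk))
	mw.WriteField("flowTotalChunks", fmt.Sprint(totalChunks))
	mw.WriteField("flowChunkSize", fmt.Sprint(len(payload)))

	// ...followed by the chunk itself in a part named "file".
	fw, err := mw.CreateFormFile("file", "blob")
	if err != nil {
		return err
	}
	fw.Write(payload)
	mw.Close() // writes the trailing boundary

	resp, err := http.Post(url, mw.FormDataContentType(), &body)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// per the handler above: 200 = chunk accepted, 400 = wrong size
	// (chunk was reset, safe to retry), 5xx = server-side failure.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("chunk %d rejected: %s", chunk, resp.Status)
	}
	return nil
}

func main() {
	err := uploadChunk("http://127.0.0.1:8080/upload", "demo-file", 1, 1, []byte("hello"))
	fmt.Println("upload result:", err)
}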
@@ -25,10 +25,12 @@
 package importer
 
 import (
-	"encoding/base64"
+	"crypto/sha256"
+	//"encoding/base64"
+	"encoding/hex"
 	"io"
 
-	"golang.org/x/crypto/blake2b"
+	// TODO: make hashing algo configurable
+	// "golang.org/x/crypto/blake2b"
 )
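Note (illustrative): one way the new TODO could eventually be addressed — select the hash.Hash constructor and the digest prefix from configuration. The function name here is hypothetical:

package main

import (
	"crypto/sha256"
	"fmt"
	"hash"

	"golang.org/x/crypto/blake2b"
)

// newHash maps a configured algorithm name to a constructor plus the
// prefix used when storing the digest ("sha256:...", "blake2b_256:...").
func newHash(algo string) (hash.Hash, string, error) {
	switch algo {
	case "sha256":
		return sha256.New(), "sha256:", nil
	case "blake2b_256":
		h, err := blake2b.New256(nil) // nil key: plain hashing mode
		return h, "blake2b_256:", err
	default:
		return nil, "", fmt.Errorf("unknown hash algo: %s", algo)
	}
}

func main() {
	h, prefix, err := newHash("sha256")
	if err != nil {
		panic(err)
	}
	h.Write([]byte("example"))
	fmt.Printf("%s%x\n", prefix, h.Sum(nil))
}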
@@ -39,17 +41,19 @@ type copyFromSourceResult struct {
 func (job *Job) copyFromSource(w io.Writer, done chan<- copyFromSourceResult) {
 	defer close(done)
 
-	hash, err := blake2b.New256(nil)
-	if err != nil {
-		panic("creating hash function failed: " + err.Error())
-	}
+	// hash, err := blake2b.New256(nil)
+	// if err != nil {
+	// 	panic("creating hash function failed: " + err.Error())
+	// }
+	hash := sha256.New()
 	src := io.TeeReader(job.source, hash)
 	written, err := io.Copy(&progressWriter{job, StepFetching, job.source.Len(), 0, w}, src)
 	if err != nil {
 		done <- copyFromSourceResult{err, ""}
 		return
 	}
 
-	hashStr := "blake2b_256:" + base64.URLEncoding.EncodeToString(hash.Sum(nil))
+	//hashStr := "blake2b_256:" + base64.URLEncoding.EncodeToString(hash.Sum(nil))
+	hashStr := "sha256:" + hex.EncodeToString(hash.Sum(nil))
 	job.im.dbgLog.Printf("fetch(): done copying %d bytes from source (%s)", written, hashStr)
 	_, err = job.im.store.UpdateFileSourceHash(job.Group, job.ID, hashStr)
 	done <- copyFromSourceResult{err, hashStr}
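Note (illustrative): the TeeReader arrangement above computes the digest during the copy itself, so the source is read exactly once. The same pattern reduced to a standalone example:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

func main() {
	source := strings.NewReader("file contents being fetched")
	hasher := sha256.New()

	// Every byte read from source is also written into hasher, so the
	// digest is ready as soon as the copy finishes — no second pass.
	src := io.TeeReader(source, hasher)

	written, err := io.Copy(io.Discard, src) // stands in for the real destination
	if err != nil {
		panic(err)
	}
	fmt.Printf("copied %d bytes, sha256:%s\n",
		written, hex.EncodeToString(hasher.Sum(nil)))
}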