diff --git a/api/v1/api_uploads.go b/api/v1/api_uploads.go index b3fa26f5c96db9edb058e1d73a5f7b832b2960c3..d79e17274983b89b1d98db76c7f045110c91a06a 100644 --- a/api/v1/api_uploads.go +++ b/api/v1/api_uploads.go @@ -25,7 +25,6 @@ package v1 import ( - //"encoding/base64" "errors" "fmt" "io" @@ -106,13 +105,15 @@ func (ch *FlowJSFileChunk) ReadFrom(r io.Reader) (n int64, err error) { return 0, ErrFlowJSChunkAlreadUploading } - if ch.file, err = os.OpenFile(ch.filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0600); err != nil { + fmt.Printf("starting upload: %s\n", ch.filename) + if ch.file, err = os.OpenFile(ch.filename, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600); err != nil { return } return io.Copy(ch.file, r) } func (ch *FlowJSFileChunk) Reset() (err error) { + fmt.Printf("reseting chunk: %s\n", ch.filename) if err = ch.file.Truncate(0); err != nil { return } @@ -124,6 +125,7 @@ func (ch *FlowJSFileChunk) Reset() (err error) { } func (ch *FlowJSFileChunk) Complete() (err error) { + fmt.Printf("completing chunk: %s\n", ch.filename) if _, err = ch.file.Seek(0, io.SeekStart); err != nil { return } @@ -133,11 +135,11 @@ func (ch *FlowJSFileChunk) Complete() (err error) { } func (ch *FlowJSFileChunk) Cleanup() (err error) { - filename := ch.file.Name() + fmt.Printf("cleaning up chunk: %s\n", ch.filename) if err = ch.file.Close(); err != nil { return err } - return os.Remove(filename) + return os.Remove(ch.filename) } //*** @@ -169,6 +171,7 @@ func newFlowJSFile(id string, numChunks uint64, size uint64, job *importer.Job) func (f *FlowJSFile) Read(p []byte) (n int, err error) { for { if f.readOffset >= len(f.chunks) { + fmt.Printf("all chunks are read. we are done!\n") // we already read all the chunks return 0, io.EOF } @@ -182,14 +185,13 @@ func (f *FlowJSFile) Read(p []byte) (n int, err error) { if n, err = f.chunks[f.readOffset].Read(p); err == nil || err == io.EOF { if n > 0 { + fmt.Printf("successfully read %d bytes from: %s\n", n, f.chunks[f.readOffset].filename) err = nil return } - // we are at the end of the chunk but haven't read anything yet... - // .. move on to the next chunk - f.readOffset++ - continue + // we are at the end of the chunk but haven't read anything yet... so just move on } + fmt.Printf("done reading chunk: %s\n", f.chunks[f.readOffset].filename) // we are done with this chunk, clean it up... if err = f.chunks[f.readOffset].Cleanup(); err != nil { @@ -262,7 +264,7 @@ func getFlowJSParameterFromMultipart(r *http.Request) (id string, chunk, totalCh pn := p.FormName() if pn == "file" { - data = p // base64.NewDecoder(base64.StdEncoding, p) + data = p break } @@ -366,11 +368,19 @@ func (api *API) UploadFileFlowJS() http.Handler { return } if n != int64(chunkSize) { + if err := file.chunks[chunk-1].Reset(); err != nil { + sendWebResponse(w, http.StatusInternalServerError, ErrorResponse{Error: "failed to reset chunk after failed upload attempt: " + err.Error()}) + return + } errStr := fmt.Sprintf("chunk upload is complete but has incorrect size, got %d bytes but expected %d bytes", n, chunkSize) sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: errStr}) return } - + if err := file.chunks[chunk-1].Complete(); err != nil { + sendWebResponse(w, http.StatusInternalServerError, ErrorResponse{Error: "failed to mark chunk completed: " + err.Error()}) + return + } + // TODO: if this is the last chunk to be uploaded wait for fetch converter to finish? sendWebResponse(w, http.StatusOK, nil) }) } diff --git a/importer/fetch.go b/importer/fetch.go index 45f52f66adc384e656f80cc7388141549f6ce939..e20233341818efe41436e4c5f7bab0361e82c908 100644 --- a/importer/fetch.go +++ b/importer/fetch.go @@ -25,10 +25,12 @@ package importer import ( - "encoding/base64" + "crypto/sha256" + //"encoding/base64" + "encoding/hex" "io" - - "golang.org/x/crypto/blake2b" + // TODO: make hashing algo configurable + // "golang.org/x/crypto/blake2b" ) type copyFromSourceResult struct { @@ -39,17 +41,19 @@ type copyFromSourceResult struct { func (job *Job) copyFromSource(w io.Writer, done chan<- copyFromSourceResult) { defer close(done) - hash, err := blake2b.New256(nil) - if err != nil { - panic("creating hash function failed: " + err.Error()) - } + // hash, err := blake2b.New256(nil) + // if err != nil { + // panic("creating hash function failed: " + err.Error()) + // } + hash := sha256.New() src := io.TeeReader(job.source, hash) written, err := io.Copy(&progressWriter{job, StepFetching, job.source.Len(), 0, w}, src) if err != nil { done <- copyFromSourceResult{err, ""} return } - hashStr := "blake2b_256:" + base64.URLEncoding.EncodeToString(hash.Sum(nil)) + //hashStr := "blake2b_256:" + base64.URLEncoding.EncodeToString(hash.Sum(nil)) + hashStr := "sha256:" + hex.EncodeToString(hash.Sum(nil)) job.im.dbgLog.Printf("fetch(): done copying %d bytes from source (%s)", written, hashStr) _, err = job.im.store.UpdateFileSourceHash(job.Group, job.ID, hashStr) done <- copyFromSourceResult{err, hashStr}