diff --git a/api/v1/api_uploads.go b/api/v1/api_uploads.go index 6f042ef78061ad13cf9fd993ec631c3c13ce0179..b04d1c1405c52a9fddec2b3197dd25fa0c3a2094 100644 --- a/api/v1/api_uploads.go +++ b/api/v1/api_uploads.go @@ -120,30 +120,42 @@ func newFlowJSFile(size uint64, numChunks uint, job *importer.Job) (*FlowJSFile, } func (f *FlowJSFile) Read(p []byte) (n int, err error) { - if f.readOffset >= len(f.chunks) { - // we already read all the chunks - return 0, io.EOF - } + for { + if f.readOffset >= len(f.chunks) { + // we already read all the chunks + return 0, io.EOF + } - // also wait for jobs context !!!? - select { - case <-f.job.Ctx.Done(): - return 0, f.job.Ctx.Err() - case <-f.chunks[f.readOffset].completed: - } - if n, err = f.chunks[f.readOffset].Read(p); err == nil || err == io.EOF { - return - } + // wait for chunk to be completed + select { + case <-f.job.Ctx.Done(): + return 0, f.job.Ctx.Err() + case <-f.chunks[f.readOffset].completed: + } - // we are done with this chunk, clean it up... - if err = f.chunks[f.readOffset].Cleanup(); err != nil { - return + if n, err = f.chunks[f.readOffset].Read(p); err == nil || err == io.EOF { + if n > 0 { + err = nil + return + } + // we are at the end of the chunk but haven't read anything yet... + // .. move on to the next chunk + f.readOffset++ + continue + } + + // we are done with this chunk, clean it up... + if err = f.chunks[f.readOffset].Cleanup(); err != nil { + return + } + // ...and proceed to the next + f.readOffset++ + if n > 0 { + // if this was the last chunk the next call to Read() will return (0, io.EOF) + // for now we will return (n, nil) + return + } } - // ...and proceed to the next - f.readOffset++ - // if this was the last chunk the next call to Read() will return (0, io.EOF) - // for now we will return (n, nil) - return } func (api *API) UploadFileFlowJS() http.Handler {