Skip to content
Snippets Groups Projects
Commit ba00d48e authored by Christian Pointner's avatar Christian Pointner
Browse files

fix FlowJSFile.Read()

parent 8baee297
No related branches found
No related tags found
No related merge requests found
......@@ -120,30 +120,42 @@ func newFlowJSFile(size uint64, numChunks uint, job *importer.Job) (*FlowJSFile,
}
func (f *FlowJSFile) Read(p []byte) (n int, err error) {
if f.readOffset >= len(f.chunks) {
// we already read all the chunks
return 0, io.EOF
}
for {
if f.readOffset >= len(f.chunks) {
// we already read all the chunks
return 0, io.EOF
}
// also wait for jobs context !!!?
select {
case <-f.job.Ctx.Done():
return 0, f.job.Ctx.Err()
case <-f.chunks[f.readOffset].completed:
}
if n, err = f.chunks[f.readOffset].Read(p); err == nil || err == io.EOF {
return
}
// wait for chunk to be completed
select {
case <-f.job.Ctx.Done():
return 0, f.job.Ctx.Err()
case <-f.chunks[f.readOffset].completed:
}
// we are done with this chunk, clean it up...
if err = f.chunks[f.readOffset].Cleanup(); err != nil {
return
if n, err = f.chunks[f.readOffset].Read(p); err == nil || err == io.EOF {
if n > 0 {
err = nil
return
}
// we are at the end of the chunk but haven't read anything yet...
// .. move on to the next chunk
f.readOffset++
continue
}
// we are done with this chunk, clean it up...
if err = f.chunks[f.readOffset].Cleanup(); err != nil {
return
}
// ...and proceed to the next
f.readOffset++
if n > 0 {
// if this was the last chunk the next call to Read() will return (0, io.EOF)
// for now we will return (n, nil)
return
}
}
// ...and proceed to the next
f.readOffset++
// if this was the last chunk the next call to Read() will return (0, io.EOF)
// for now we will return (n, nil)
return
}
func (api *API) UploadFileFlowJS() http.Handler {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment