Skip to content
Snippets Groups Projects
Commit cd1c5109 authored by Christian Pointner's avatar Christian Pointner
Browse files

job source now has a name

parent bc33aa5c
No related branches found
No related tags found
No related merge requests found
......@@ -57,7 +57,7 @@ func (api *API) UploadFileSimple() http.Handler {
return
}
done, err := job.AttachUploader(uint64(r.ContentLength), r.Body)
done, err := job.AttachUploader(uint64(r.ContentLength), "simple HTTP uploader", r.Body)
if err != nil {
sendError(w, err)
return
......@@ -220,7 +220,7 @@ func getOrNewFlowJSFile(job *importer.Job, flowId string, chunk, totalChunks, to
return
}
_, err = job.AttachUploader(file.size, file)
_, err = job.AttachUploader(file.size, "flow.js uploader: "+file.id, file)
switch err {
case nil:
case importer.ErrSourceAlreadyAttached:
......
......@@ -87,6 +87,7 @@ func (job *Job) run() error {
return err
}
// job.source is now initialized and points to a valid source
job.im.dbgLog.Printf("job source is ready, fetching %d bytes from %s", job.source.Len(), job.source.String())
loudness, err := job.fetch()
if err != nil {
job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted)
......@@ -154,7 +155,7 @@ func (job *Job) Start(ctx context.Context, timeout time.Duration) (err error) {
return
}
func (job *Job) AttachUploader(len uint64, r io.Reader) (<-chan *JobSourceResult, error) {
func (job *Job) AttachUploader(len uint64, name string, r io.Reader) (<-chan *JobSourceResult, error) {
if state := atomic.LoadUint32((*uint32)(&job.State)); state != uint32(JobRunning) {
return nil, ErrImportNotRunning
}
......@@ -166,7 +167,7 @@ func (job *Job) AttachUploader(len uint64, r io.Reader) (<-chan *JobSourceResult
if ok := atomic.CompareAndSwapUint32(&job.sourceSet, 0, 1); !ok {
return nil, ErrSourceAlreadyAttached
}
src := newJobSourceUpload(len, r)
src := newJobSourceUpload(len, name, r)
job.source = src
close(job.subC.sourceAttached)
return src.done, nil
......
......@@ -37,7 +37,7 @@ import (
func (job *Job) initializeSource() (err error) {
if job.Source.Scheme == SourceSchemeUpload {
// the source will be attached using job.AttachSource() so all we need to do
// the source will be attached using job.AttachUploader() so all we need to do
// is to wait for it to happen
select {
case <-job.Ctx.Done():
......@@ -78,6 +78,10 @@ func (src *JobSourceFake) Len() uint64 {
return uint64(*src)
}
func (src *JobSourceFake) String() string {
return "fake source"
}
func (src *JobSourceFake) Read(p []byte) (n int, err error) {
l := len(p)
if *src > JobSourceFake(l) {
......@@ -98,12 +102,13 @@ func (src *JobSourceFake) Done(result *JobSourceResult) {
type JobSourceUpload struct {
len uint64
name string
r io.Reader
done chan *JobSourceResult
}
func newJobSourceUpload(len uint64, r io.Reader) *JobSourceUpload {
src := &JobSourceUpload{len: len, r: r}
func newJobSourceUpload(len uint64, name string, r io.Reader) *JobSourceUpload {
src := &JobSourceUpload{len: len, name: name, r: r}
src.done = make(chan *JobSourceResult, 1)
return src
}
......@@ -112,6 +117,10 @@ func (src *JobSourceUpload) Len() uint64 {
return src.len
}
func (src *JobSourceUpload) String() string {
return src.name
}
func (src *JobSourceUpload) Read(p []byte) (n int, err error) {
return src.r.Read(p)
}
......@@ -127,7 +136,8 @@ func (src *JobSourceUpload) Done(result *JobSourceResult) {
//******* http(s)
type JobSourceHTTP struct {
r *http.Response
r *http.Response
filename string
}
func newJobSourceHTTP(client *http.Client, srcURL SourceURL) (*JobSourceHTTP, error) {
......@@ -147,17 +157,14 @@ func newJobSourceHTTP(client *http.Client, srcURL SourceURL) (*JobSourceHTTP, er
err = errors.New("server returned unexpected status: " + src.r.Status)
}
filename := ""
if mediatype, params, err := mime.ParseMediaType(src.r.Header.Get("Content-Disposition")); err == nil {
if mediatype == "attachment" {
filename = params["filename"]
src.filename = params["filename"]
}
}
if filename == "" {
_, filename = path.Split((url.URL)(srcURL).Path)
if src.filename == "" {
_, src.filename = path.Split((url.URL)(srcURL).Path)
}
// TODO: print log line that request was successful...
return src, err
}
......@@ -169,6 +176,10 @@ func (src *JobSourceHTTP) Len() uint64 {
return uint64(cl)
}
func (src *JobSourceHTTP) String() string {
return "HTTP downloader: " + src.filename
}
func (src *JobSourceHTTP) Read(p []byte) (n int, err error) {
return src.r.Body.Read(p)
}
......
......@@ -105,6 +105,7 @@ func (r *JobSourceResult) Error() string {
type JobSource interface {
Len() uint64
String() string
io.Reader
Done(*JobSourceResult)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment