Skip to content
Snippets Groups Projects
Commit 1a2c27e6 authored by Christian Pointner's avatar Christian Pointner
Browse files

attaching source works now

parent 50684dde
No related branches found
No related tags found
No related merge requests found
...@@ -62,7 +62,26 @@ func (api *API) ReadImportOfFile() http.Handler { ...@@ -62,7 +62,26 @@ func (api *API) ReadImportOfFile() http.Handler {
func (api *API) UploadFile() http.Handler { func (api *API) UploadFile() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sendWebResponse(w, http.StatusNotImplemented, ErrorResponse{Error: "uploading files not yet implemented"}) vars := mux.Vars(r)
id, err := idFromString(vars["file-id"])
if err != nil {
sendWebResponse(w, http.StatusBadRequest, ErrorResponse{Error: "invalid file-id: " + err.Error()})
return
}
job, err := api.importer.GetJob(vars["group-id"], id)
if err != nil {
sendError(w, err)
return
}
if err = job.AttachSource(uint64(r.ContentLength), r.Body); err != nil {
sendError(w, err)
return
}
<-job.Done()
// TODO: get result from job
sendWebResponse(w, http.StatusOK, nil)
}) })
} }
......
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
package importer package importer
import ( import (
"crypto/sha256"
"encoding/hex"
"io" "io"
"io/ioutil" "io/ioutil"
"sync/atomic" "sync/atomic"
...@@ -47,18 +49,20 @@ func (d *devNull) Read(p []byte) (n int, err error) { ...@@ -47,18 +49,20 @@ func (d *devNull) Read(p []byte) (n int, err error) {
} }
func (job *Job) prepareSource() { func (job *Job) prepareSource() {
// TODO: enable this as soon as attachment source is implemented switch job.Source.Scheme {
// if job.Source.Scheme == "attachment" { case SourceSchemeAttachment:
// the source will be attached using AttachSource() // the source will be attached using AttachSource()
// return return
// }
// TODO: implement other sources // TODO: implement other sources
// simulate a 10 MB file... default:
l := uint64(10 * 1024 * 1024) // simulate a 10 MB file...
r := devNull(l) l := uint64(10 * 1024 * 1024)
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(&JobSource{l, &r})) r := devNull(l)
close(job.subC.sourceAttached) atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(&JobSource{l, &r}))
close(job.subC.sourceAttached)
}
} }
type progressWriter struct { type progressWriter struct {
...@@ -85,14 +89,17 @@ func (job *Job) fetch() (err error) { ...@@ -85,14 +89,17 @@ func (job *Job) fetch() (err error) {
return job.ctx.Err() return job.ctx.Err()
case <-job.subC.sourceAttached: case <-job.subC.sourceAttached:
} }
// job.source is now initialized and points to a valid source
// TODO: use an actual converter here // TODO: use an actual converter here
conv := ioutil.Discard conv := ioutil.Discard
done := make(chan error) done := make(chan error)
go func() { go func() {
// TODO: wrap job.source using io.TeeReader to compute source file hash hash := sha256.New()
_, err := io.Copy(&progressWriter{job, 0, conv}, job.source.r) src := io.TeeReader(job.source.r, hash)
written, err := io.Copy(&progressWriter{job, 0, conv}, src)
job.im.dbgLog.Printf("fetch(): done copying %d bytes from source (SHA256: %s)", written, hex.EncodeToString(hash.Sum(nil)))
done <- err done <- err
}() }()
......
...@@ -52,7 +52,7 @@ func (im *Importer) ListJobs(group string) (Jobs, error) { ...@@ -52,7 +52,7 @@ func (im *Importer) ListJobs(group string) (Jobs, error) {
func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID string) (*Job, error) { func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID string) (*Job, error) {
// TODO: update this list once we implemented other sources // TODO: update this list once we implemented other sources
if src.Scheme != "attachment" { if src.Scheme != SourceSchemeAttachment {
return nil, ErrSourceNotSupported return nil, ErrSourceNotSupported
} }
......
...@@ -54,7 +54,7 @@ type Job struct { ...@@ -54,7 +54,7 @@ type Job struct {
sourceAttached chan struct{} sourceAttached chan struct{}
running chan struct{} running chan struct{}
done chan struct{} done chan struct{}
} `json:"-"` }
} }
type Jobs []*Job type Jobs []*Job
......
...@@ -36,6 +36,8 @@ import ( ...@@ -36,6 +36,8 @@ import (
const ( const (
DefaultBacklog = 100 DefaultBacklog = 100
SourceSchemeAttachment = "attachment"
) )
//******* Errors //******* Errors
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment