diff --git a/importer/importer.go b/importer/importer.go index bb3c7ea15769a0874581d30dcb948ebd26bfd0ba..201c3e951d78ed17c23297da0a807eb19f4a80ad 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -27,9 +27,11 @@ package importer import ( "io/ioutil" "log" + "net/http" "os" "runtime" "sync" + "time" ) type ProgressCB func(step int, stepName string, current, total float64, userdata interface{}) bool @@ -63,6 +65,7 @@ type ImportContext struct { ProgressCallBack ProgressCB ProgressCallBackData interface{} Cancel <-chan bool + doneChan chan<- Result // TODO: add source config } @@ -125,17 +128,30 @@ type Importer struct { } type workerRequest struct { - ctx *ImportContext - enquedChan chan<- bool + ctx *ImportContext + enqueuedChan chan<- bool } func (im *Importer) runWorker(idx int) { defer im.dbgLog.Printf("importer: worker %d has stopped", idx) im.dbgLog.Printf("importer: worker %d is running", idx) - select { - case req := <-im.workerChan: - im.dbgLog.Printf("importer: worker %d got request: %+v", idx, req.ctx) + for { + select { + case req := <-im.workerChan: + if req.ctx.isCanceled() { + im.dbgLog.Printf("importer: worker %d got canceled request") + continue + } + req.enqueuedChan <- true + im.dbgLog.Printf("importer: worker %d got request: %+v", idx, req.ctx) + // TODO: check for potential race between cancel and enqueue + + // TODO: actually do import + time.Sleep(60 * time.Second) + + req.ctx.doneChan <- Result{ResponseCode: http.StatusOK, ErrorString: "import success"} + } } } diff --git a/importer/session.go b/importer/session.go index 8fd50561d1c5549f99826b121e3bee2ec28c166f..21502180b583b58c12153f5965c51b19b9c74fbe 100644 --- a/importer/session.go +++ b/importer/session.go @@ -33,8 +33,8 @@ import ( type SessionState uint8 const ( - StateNew SessionState = iota - StatePending // TODO: use this when session backlog is implemented + StateNew SessionState = iota + StatePending StateRunning StateCanceled StateDone @@ -111,7 +111,7 @@ type session struct { quit chan bool timer *time.Timer workerChan chan<- workerRequest - enquedIntChan chan bool + enqueuedIntChan chan bool cancelIntChan chan bool progressRateLimit time.Duration progressIntChan chan ProgressData @@ -136,7 +136,8 @@ func (s *session) run(timeout time.Duration) { s.ctx.ProgressCallBack = sessionProgressCallback s.ctx.ProgressCallBackData = (chan<- ProgressData)(s.progressIntChan) s.ctx.Cancel = s.cancelIntChan - s.workerChan <- workerRequest{&(s.ctx), s.enquedIntChan} + s.ctx.doneChan = s.doneIntChan + s.workerChan <- workerRequest{&(s.ctx), s.enqueuedIntChan} s.state = StatePending if timeout <= 0 || timeout > 3*time.Hour { s.ctx.infoLog.Printf("importer: requested session timeout (%v) is invalid or too high - setting it to 3h", timeout) @@ -146,11 +147,20 @@ func (s *session) run(timeout time.Duration) { return } -func (s *session) cancel() { - s.ctx.dbgLog.Println("importer: canceling running import") - select { - case s.cancelIntChan <- true: - default: // session got canceled already?? +func (s *session) cancel(reason string) { + if s.state == StatePending || s.state == StateRunning { + s.ctx.dbgLog.Printf("importer: canceling running import (%s)", reason) + select { + case s.cancelIntChan <- true: + default: // session got canceled already?? + } + } + if s.state == StatePending { + r := &Result{ResponseCode: http.StatusNoContent, ErrorString: reason} + s.callDoneHandler(r) + if s.removeFunc != nil { + s.removeFunc() + } } s.state = StateCanceled } @@ -217,32 +227,20 @@ func (s *session) dispatchRequests() { for { select { case <-s.quit: - if s.state == StatePending || s.state == StateRunning { - s.cancel() - } + s.cancel("session canceled due to global shutdown") return case <-s.timer.C: - if s.state == StatePending || s.state == StateRunning { - s.cancel() - } - s.state = StateTimeout - r := &Result{ResponseCode: http.StatusInternalServerError, ErrorString: "session timed out"} - s.callDoneHandler(r) - if s.removeFunc != nil { - s.removeFunc() - } + s.cancel("session timed out") case t := <-s.runChan: if s.state == StateNew { s.run(t) } - case <-s.enquedIntChan: + case <-s.enqueuedIntChan: if s.state == StatePending { s.state = StateRunning } case <-s.cancelChan: - if s.state == StatePending || s.state == StateRunning { - s.cancel() - } + s.cancel("session canceled by user") case req := <-s.addProgressChan: req.response <- s.addProgressHandler(req.userdata, req.callback) case req := <-s.addDoneChan: @@ -399,7 +397,7 @@ func newSession(ctx *ImportContext, workerChan chan<- workerRequest, removeFunc s.done = make(chan bool) s.timer = time.NewTimer(60 * time.Second) s.workerChan = workerChan - s.enquedIntChan = make(chan bool) + s.enqueuedIntChan = make(chan bool) s.cancelIntChan = make(chan bool, 1) s.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value s.progressIntChan = make(chan ProgressData, 10)