Skip to content
Snippets Groups Projects
Commit d09e781e authored by Christian Pointner's avatar Christian Pointner
Browse files

session cleanup after cancel (still needs testing

parent 5619eabc
No related branches found
No related tags found
No related merge requests found
......@@ -27,9 +27,11 @@ package importer
import (
"io/ioutil"
"log"
"net/http"
"os"
"runtime"
"sync"
"time"
)
type ProgressCB func(step int, stepName string, current, total float64, userdata interface{}) bool
......@@ -63,6 +65,7 @@ type ImportContext struct {
ProgressCallBack ProgressCB
ProgressCallBackData interface{}
Cancel <-chan bool
doneChan chan<- Result
// TODO: add source config
}
......@@ -125,17 +128,30 @@ type Importer struct {
}
type workerRequest struct {
ctx *ImportContext
enquedChan chan<- bool
ctx *ImportContext
enqueuedChan chan<- bool
}
func (im *Importer) runWorker(idx int) {
defer im.dbgLog.Printf("importer: worker %d has stopped", idx)
im.dbgLog.Printf("importer: worker %d is running", idx)
select {
case req := <-im.workerChan:
im.dbgLog.Printf("importer: worker %d got request: %+v", idx, req.ctx)
for {
select {
case req := <-im.workerChan:
if req.ctx.isCanceled() {
im.dbgLog.Printf("importer: worker %d got canceled request")
continue
}
req.enqueuedChan <- true
im.dbgLog.Printf("importer: worker %d got request: %+v", idx, req.ctx)
// TODO: check for potential race between cancel and enqueue
// TODO: actually do import
time.Sleep(60 * time.Second)
req.ctx.doneChan <- Result{ResponseCode: http.StatusOK, ErrorString: "import success"}
}
}
}
......
......@@ -33,8 +33,8 @@ import (
type SessionState uint8
const (
StateNew SessionState = iota
StatePending // TODO: use this when session backlog is implemented
StateNew SessionState = iota
StatePending
StateRunning
StateCanceled
StateDone
......@@ -111,7 +111,7 @@ type session struct {
quit chan bool
timer *time.Timer
workerChan chan<- workerRequest
enquedIntChan chan bool
enqueuedIntChan chan bool
cancelIntChan chan bool
progressRateLimit time.Duration
progressIntChan chan ProgressData
......@@ -136,7 +136,8 @@ func (s *session) run(timeout time.Duration) {
s.ctx.ProgressCallBack = sessionProgressCallback
s.ctx.ProgressCallBackData = (chan<- ProgressData)(s.progressIntChan)
s.ctx.Cancel = s.cancelIntChan
s.workerChan <- workerRequest{&(s.ctx), s.enquedIntChan}
s.ctx.doneChan = s.doneIntChan
s.workerChan <- workerRequest{&(s.ctx), s.enqueuedIntChan}
s.state = StatePending
if timeout <= 0 || timeout > 3*time.Hour {
s.ctx.infoLog.Printf("importer: requested session timeout (%v) is invalid or too high - setting it to 3h", timeout)
......@@ -146,11 +147,20 @@ func (s *session) run(timeout time.Duration) {
return
}
func (s *session) cancel() {
s.ctx.dbgLog.Println("importer: canceling running import")
select {
case s.cancelIntChan <- true:
default: // session got canceled already??
func (s *session) cancel(reason string) {
if s.state == StatePending || s.state == StateRunning {
s.ctx.dbgLog.Printf("importer: canceling running import (%s)", reason)
select {
case s.cancelIntChan <- true:
default: // session got canceled already??
}
}
if s.state == StatePending {
r := &Result{ResponseCode: http.StatusNoContent, ErrorString: reason}
s.callDoneHandler(r)
if s.removeFunc != nil {
s.removeFunc()
}
}
s.state = StateCanceled
}
......@@ -217,32 +227,20 @@ func (s *session) dispatchRequests() {
for {
select {
case <-s.quit:
if s.state == StatePending || s.state == StateRunning {
s.cancel()
}
s.cancel("session canceled due to global shutdown")
return
case <-s.timer.C:
if s.state == StatePending || s.state == StateRunning {
s.cancel()
}
s.state = StateTimeout
r := &Result{ResponseCode: http.StatusInternalServerError, ErrorString: "session timed out"}
s.callDoneHandler(r)
if s.removeFunc != nil {
s.removeFunc()
}
s.cancel("session timed out")
case t := <-s.runChan:
if s.state == StateNew {
s.run(t)
}
case <-s.enquedIntChan:
case <-s.enqueuedIntChan:
if s.state == StatePending {
s.state = StateRunning
}
case <-s.cancelChan:
if s.state == StatePending || s.state == StateRunning {
s.cancel()
}
s.cancel("session canceled by user")
case req := <-s.addProgressChan:
req.response <- s.addProgressHandler(req.userdata, req.callback)
case req := <-s.addDoneChan:
......@@ -399,7 +397,7 @@ func newSession(ctx *ImportContext, workerChan chan<- workerRequest, removeFunc
s.done = make(chan bool)
s.timer = time.NewTimer(60 * time.Second)
s.workerChan = workerChan
s.enquedIntChan = make(chan bool)
s.enqueuedIntChan = make(chan bool)
s.cancelIntChan = make(chan bool, 1)
s.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value
s.progressIntChan = make(chan ProgressData, 10)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment