From e30c038a4b03d3968854fe1fa063a82271d86634 Mon Sep 17 00:00:00 2001 From: Christian Pointner <equinox@helsinki.at> Date: Fri, 1 Dec 2017 21:44:54 +0100 Subject: [PATCH] added importer --- importer/.gitignore | 1 + importer/Makefile | 47 +++++ importer/config.go | 29 +++ importer/importer.go | 108 ++++++++++ importer/session.go | 411 ++++++++++++++++++++++++++++++++++++++ importer/session_store.go | 395 ++++++++++++++++++++++++++++++++++++ 6 files changed, 991 insertions(+) create mode 100644 importer/.gitignore create mode 100644 importer/Makefile create mode 100644 importer/config.go create mode 100644 importer/importer.go create mode 100644 importer/session.go create mode 100644 importer/session_store.go diff --git a/importer/.gitignore b/importer/.gitignore new file mode 100644 index 0000000..2d83068 --- /dev/null +++ b/importer/.gitignore @@ -0,0 +1 @@ +coverage.out diff --git a/importer/Makefile b/importer/Makefile new file mode 100644 index 0000000..0596704 --- /dev/null +++ b/importer/Makefile @@ -0,0 +1,47 @@ +## +## tank +## +## Import and Playlist Daemon for autoradio project +## +## +## Copyright (C) 2017 Christian Pointner <equinox@helsinki.at> +## +## This file is part of tank. +## +## tank is free software: you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## any later version. +## +## tank is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with tank. If not, see <http://www.gnu.org/licenses/>. +## + +GOCMD := go +ifdef GOROOT +GOCMD = $(GOROOT)/bin/go +endif + +all: test +.PHONY: vet format test cover bench + +vet: + $(GOCMD) vet + +format: + $(GOCMD) fmt + +test: + $(GOCMD) test + +bench: + $(GOCMD) test -bench=. + +cover: + $(GOCMD) test -coverprofile=coverage.out + $(GOCMD) tool cover -html=coverage.out diff --git a/importer/config.go b/importer/config.go new file mode 100644 index 0000000..64d0f4b --- /dev/null +++ b/importer/config.go @@ -0,0 +1,29 @@ +// +// tank +// +// Import and Playlist Daemon for autoradio project +// +// +// Copyright (C) 2017 Christian Pointner <equinox@helsinki.at> +// +// This file is part of tank. +// +// tank is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// tank is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with tank. If not, see <http://www.gnu.org/licenses/>. +// + +package importer + +type Config struct { + TempPath string `json:"temp-path" yaml:"temp-path" toml:"temp-path"` +} diff --git a/importer/importer.go b/importer/importer.go new file mode 100644 index 0000000..0aa29d3 --- /dev/null +++ b/importer/importer.go @@ -0,0 +1,108 @@ +// +// tank +// +// Import and Playlist Daemon for autoradio project +// +// +// Copyright (C) 2017 Christian Pointner <equinox@helsinki.at> +// +// This file is part of tank. +// +// tank is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// tank is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with tank. If not, see <http://www.gnu.org/licenses/>. +// + +package importer + +import ( + "io/ioutil" + "log" + "os" +) + +type ProgressCB func(step int, stepName string, current, total float64, userdata interface{}) bool +type DoneCB func(result Result, userdata interface{}) bool + +type ProgressData struct { + Step int + StepName string + Current float64 + Total float64 +} + +type Result struct { + ResponseCode int + ErrorString string +} + +type AttachmentChunk struct { + Data []byte + Error error +} + +type ImportContext struct { + conf *Config + stdlog *log.Logger + dbglog *log.Logger + GroupName string + WorkDir string + AttachmentChan chan AttachmentChunk + ProgressCallBack ProgressCB + ProgressCallBackData interface{} + Cancel <-chan bool + + // TODO: add source config +} + +func NewImportContext(conf *Config, stdlog, dbglog *log.Logger) *ImportContext { + if stdlog == nil { + stdlog = log.New(ioutil.Discard, "", 0) + } + if dbglog == nil { + dbglog = log.New(ioutil.Discard, "", 0) + } + + ctx := &ImportContext{} + ctx.conf = conf + ctx.stdlog = stdlog + ctx.dbglog = dbglog + ctx.GroupName = "" + ctx.AttachmentChan = make(chan AttachmentChunk, 32) + ctx.ProgressCallBack = nil + ctx.Cancel = nil + return ctx +} + +func (ctx *ImportContext) createTempWorkDir() (err error) { + ctx.WorkDir, err = ioutil.TempDir(ctx.conf.TempPath, "tank-") + return +} + +func (ctx *ImportContext) removeTempWorkDir() { + if err := os.RemoveAll(ctx.WorkDir); err != nil { + ctx.stdlog.Printf("Error removing WorkDir: %s", err) + } + return +} + +func (ctx *ImportContext) reportProgress(step int, stepName string, current, total float64) { + if ctx.ProgressCallBack != nil { + if keep := ctx.ProgressCallBack(step, stepName, current, total, ctx.ProgressCallBackData); !keep { + ctx.ProgressCallBack = nil + } + } +} + +func (ctx *ImportContext) isCanceled() bool { + return ctx.Cancel != nil && len(ctx.Cancel) > 0 +} diff --git a/importer/session.go b/importer/session.go new file mode 100644 index 0000000..793f797 --- /dev/null +++ b/importer/session.go @@ -0,0 +1,411 @@ +// +// tank +// +// Import and Playlist Daemon for autoradio project +// +// +// Copyright (C) 2017 Christian Pointner <equinox@helsinki.at> +// +// This file is part of tank. +// +// tank is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// tank is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with tank. If not, see <http://www.gnu.org/licenses/>. +// + +package importer + +import ( + "fmt" + "net/http" + "time" +) + +type SessionState uint8 + +const ( + StateNew SessionState = iota + StatePending + StateRunning + StateCanceled + StateDone + StateTimeout +) + +func (u SessionState) String() string { + switch u { + case StateNew: + return "new" + case StatePending: + return "pending" + case StateRunning: + return "running" + case StateCanceled: + return "canceled" + case StateDone: + return "done" + case StateTimeout: + return "timeout" + } + return "unknown" +} + +func (u SessionState) MarshalText() (data []byte, err error) { + data = []byte(u.String()) + return +} + +type sessionProgressCB struct { + cb ProgressCB + userdata interface{} +} + +type sessionDoneCB struct { + cb DoneCB + userdata interface{} +} + +type sessionAddProgressHandlerResponse struct { + err error +} + +type sessionAddProgressHandlerRequest struct { + userdata interface{} + callback ProgressCB + response chan<- sessionAddProgressHandlerResponse +} + +type sessionAddDoneHandlerResponse struct { + err error +} + +type sessionAddDoneHandlerRequest struct { + userdata interface{} + callback DoneCB + response chan<- sessionAddDoneHandlerResponse +} + +type attachUploaderResponse struct { + cancel <-chan bool + attachment chan<- AttachmentChunk +} + +type attachUploaderRequest struct { + response chan<- attachUploaderResponse +} + +type session struct { + ctx ImportContext + state SessionState + removeFunc func() + done chan bool + quit chan bool + timer *time.Timer + cancelIntChan chan bool + progressRateLimit time.Duration + progressIntChan chan ProgressData + doneIntChan chan Result + runChan chan time.Duration + cancelChan chan bool + addProgressChan chan sessionAddProgressHandlerRequest + addDoneChan chan sessionAddDoneHandlerRequest + attachUploaderChan chan attachUploaderRequest + progressCBs []*sessionProgressCB + doneCBs []*sessionDoneCB + cancelUploader chan bool +} + +func sessionProgressCallback(step int, stepName string, current, total float64, userdata interface{}) bool { + out := userdata.(chan<- ProgressData) + out <- ProgressData{step, stepName, current, total} + return true +} + +func (s *session) runner() { + // TODO: acutall run the import + s.doneIntChan <- Result{ResponseCode: http.StatusNotImplemented, ErrorString: "not yet implemented"} +} + +func (s *session) run(timeout time.Duration) { + s.ctx.ProgressCallBack = sessionProgressCallback + s.ctx.ProgressCallBackData = (chan<- ProgressData)(s.progressIntChan) + s.ctx.Cancel = s.cancelIntChan + go s.runner() + s.state = StateRunning + if timeout <= 0 || timeout > 3*time.Hour { + s.ctx.stdlog.Printf("requested session timeout (%v) is invalid or too high - setting it to 3h", timeout) + timeout = 3 * time.Hour + } + s.timer.Reset(timeout) + return +} + +func (s *session) cancel() { + s.ctx.dbglog.Println("Session: canceling running import") + select { + case s.cancelIntChan <- true: + default: // session got canceled already?? + } + s.state = StateCanceled +} + +func (s *session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) { + if s.state != StateNew && s.state != StateRunning { + resp.err = fmt.Errorf("session is already done/canceled") + } + s.progressCBs = append(s.progressCBs, &sessionProgressCB{cb, userdata}) + return +} + +func (s *session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) { + if s.state != StateNew && s.state != StateRunning { + resp.err = fmt.Errorf("session is already done/canceled") + } + s.doneCBs = append(s.doneCBs, &sessionDoneCB{cb, userdata}) + return +} + +func (s *session) callProgressHandler(p *ProgressData) { + for _, cb := range s.progressCBs { + if cb.cb != nil { + if keep := cb.cb(p.Step, p.StepName, p.Current, p.Total, cb.userdata); !keep { + cb.cb = nil + } + } + } +} + +func (s *session) callDoneHandler(r *Result) { + for _, cb := range s.doneCBs { + if cb.cb != nil { + if keep := cb.cb(*r, cb.userdata); !keep { + cb.cb = nil + } + } + } +} + +func (s *session) attachUploader() (resp attachUploaderResponse) { + if s.cancelUploader != nil { + return + } + s.cancelUploader = make(chan bool, 1) + resp.cancel = s.cancelUploader + resp.attachment = s.ctx.AttachmentChan + return +} + +func (s *session) dispatchRequests() { + defer func() { + if s.cancelUploader != nil { + close(s.cancelUploader) + } + s.done <- true + }() + + var lastProgress *ProgressData + progressPending := 0 + pt := time.NewTimer(s.progressRateLimit) + pt.Stop() + + for { + select { + case <-s.quit: + if s.state == StateRunning { + s.cancel() + } + return + case <-s.timer.C: + if s.state == StateRunning { + s.cancel() + } + s.state = StateTimeout + r := &Result{ResponseCode: http.StatusInternalServerError, ErrorString: "session timed out"} + s.callDoneHandler(r) + if s.removeFunc != nil { + s.removeFunc() + } + case t := <-s.runChan: + if s.state == StateNew { + s.run(t) + } + case <-s.cancelChan: + if s.state == StateRunning { + s.cancel() + } + case req := <-s.addProgressChan: + req.response <- s.addProgressHandler(req.userdata, req.callback) + case req := <-s.addDoneChan: + req.response <- s.addDoneHandler(req.userdata, req.callback) + case <-pt.C: + if progressPending > 1 && lastProgress != nil { + s.callProgressHandler(lastProgress) + } + progressPending = 0 + lastProgress = nil + case p := <-s.progressIntChan: + if s.state == StateRunning { + if lastProgress == nil { + s.callProgressHandler(&p) + pt.Reset(s.progressRateLimit) + } else if lastProgress.Step != p.Step { + s.callProgressHandler(lastProgress) + s.callProgressHandler(&p) + pt.Reset(s.progressRateLimit) + } + lastProgress = &p + progressPending++ + } + case r := <-s.doneIntChan: + if s.state != StateTimeout { + s.timer.Stop() + s.state = StateDone + s.callDoneHandler(&r) + if s.removeFunc != nil { + s.removeFunc() + } + } + case req := <-s.attachUploaderChan: + req.response <- s.attachUploader() + } + } +} + +// ********************************************************* +// Public Interface + +type Session struct { + runChan chan<- time.Duration + cancelChan chan<- bool + addProgressChan chan<- sessionAddProgressHandlerRequest + addDoneChan chan<- sessionAddDoneHandlerRequest + attachUploaderChan chan<- attachUploaderRequest +} + +func (s *Session) Run(timeout time.Duration) { + select { + case s.runChan <- timeout: + default: // command is already pending or session is about to be closed/removed + } +} + +func (s *Session) Cancel() { + select { + case s.cancelChan <- true: + default: // cancel is already pending or session is about to be closed/removed + } +} + +func (s *Session) AddProgressHandler(userdata interface{}, cb ProgressCB) error { + resCh := make(chan sessionAddProgressHandlerResponse) + req := sessionAddProgressHandlerRequest{} + req.userdata = userdata + req.callback = cb + req.response = resCh + select { + case s.addProgressChan <- req: + default: + return fmt.Errorf("session is about to be closed/removed") + } + + res := <-resCh + return res.err +} + +func (s *Session) AddDoneHandler(userdata interface{}, cb DoneCB) error { + resCh := make(chan sessionAddDoneHandlerResponse) + req := sessionAddDoneHandlerRequest{} + req.userdata = userdata + req.callback = cb + req.response = resCh + select { + case s.addDoneChan <- req: + default: + return fmt.Errorf("session is about to be closed/removed") + } + + res := <-resCh + return res.err +} + +func (s *Session) AttachUploader() (<-chan bool, chan<- AttachmentChunk) { + resCh := make(chan attachUploaderResponse) + req := attachUploaderRequest{} + req.response = resCh + select { + case s.attachUploaderChan <- req: + default: + // session is about to be closed/removed + return nil, nil + } + + res := <-resCh + return res.cancel, res.attachment +} + +// ********************************************************* +// Semi-Public Interface (only used by sessionStore) + +func (s *session) getInterface() *Session { + ch := &Session{} + ch.runChan = s.runChan + ch.cancelChan = s.cancelChan + ch.addProgressChan = s.addProgressChan + ch.addDoneChan = s.addDoneChan + ch.attachUploaderChan = s.attachUploaderChan + return ch +} + +func (s *session) cleanup() { + s.quit <- true + s.ctx.dbglog.Printf("Session: waiting for session to close") + <-s.done + close(s.quit) + close(s.done) + s.timer.Stop() + // don't close the channels we give out because this might lead to a panic if + // somebody wites to an already removed session + // close(s.cancelIntChan) + // close(s.progressIntChan) + // close(s.doneIntChan) + // close(s.runChan) + // close(s.cancelChan) + // close(s.addProgressChan) + // close(s.addDoneChan) + // close(s.attachUploader) + s.ctx.removeTempWorkDir() + s.ctx.dbglog.Printf("Session: cleanup is now done") +} + +func newSession(ctx *ImportContext, removeFunc func()) (s *session, err error) { + s = &session{} + s.state = StateNew + s.removeFunc = removeFunc + s.ctx = *ctx + if err = s.ctx.createTempWorkDir(); err != nil { + return + } + s.quit = make(chan bool, 1) + s.done = make(chan bool) + s.timer = time.NewTimer(10 * time.Second) + s.cancelIntChan = make(chan bool, 1) + s.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value + s.progressIntChan = make(chan ProgressData, 10) + s.doneIntChan = make(chan Result, 1) + s.runChan = make(chan time.Duration, 1) + s.cancelChan = make(chan bool, 1) + s.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10) + s.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10) + s.attachUploaderChan = make(chan attachUploaderRequest, 1) + go s.dispatchRequests() + return +} diff --git a/importer/session_store.go b/importer/session_store.go new file mode 100644 index 0000000..6eb6242 --- /dev/null +++ b/importer/session_store.go @@ -0,0 +1,395 @@ +// +// tank +// +// Import and Playlist Daemon for autoradio project +// +// +// Copyright (C) 2017 Christian Pointner <equinox@helsinki.at> +// +// This file is part of tank. +// +// tank is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// tank is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with tank. If not, see <http://www.gnu.org/licenses/>. +// + +package importer + +import ( + "crypto/rand" + "encoding/base64" + "fmt" + "io/ioutil" + "log" + "net/http" + "strings" + "time" +) + +type newSessionResponse struct { + id string + session *Session + responsecode int + errorstring string +} + +type newSessionRequest struct { + ctx *ImportContext + refId string + response chan newSessionResponse +} + +type getSessionResponse struct { + session *Session + refId string + responsecode int + errorstring string +} + +type getSessionRequest struct { + group string + id string + refId string + response chan getSessionResponse +} + +type SessionsUpdateCB func(added, removed map[string]string, userdata interface{}) bool + +type SessionsListCB struct { + cb SessionsUpdateCB + userdata interface{} +} + +type listSessionsResponse struct { + sessions map[string]string + responsecode int + errorstring string +} + +type listSessionsRequest struct { + group string + password string + trusted bool + callback SessionsUpdateCB + userdata interface{} + response chan listSessionsResponse +} + +type removeSessionResponse struct { + responsecode int + errorstring string +} + +type removeSessionRequest struct { + group string + id string + response chan removeSessionResponse +} + +type sessionStoreSessionElement struct { + s *session + refId string +} + +type sessionStoreGroupElement struct { + sessions map[string]*sessionStoreSessionElement + updateCBs []SessionsListCB +} + +func (group *sessionStoreGroupElement) callUpdateHandler(added, removed map[string]string) { + var keptCBs []SessionsListCB + for _, cb := range group.updateCBs { + if cb.cb != nil { + if keep := cb.cb(added, removed, cb.userdata); keep { + keptCBs = append(keptCBs, cb) + } + } + } + group.updateCBs = keptCBs +} + +func (group *sessionStoreGroupElement) callUpdateHandlerAdd(id, refId string) { + added := make(map[string]string) + added[id] = refId + group.callUpdateHandler(added, nil) +} + +func (group *sessionStoreGroupElement) callUpdateHandlerRemove(id, refId string) { + removed := make(map[string]string) + removed[id] = refId + group.callUpdateHandler(nil, removed) +} + +type sessionStore struct { + store map[string]*sessionStoreGroupElement + conf *Config + stdlog *log.Logger + dbglog *log.Logger + quit chan bool + done chan bool + newChan chan newSessionRequest + getChan chan getSessionRequest + listChan chan listSessionsRequest + removeChan chan removeSessionRequest +} + +func generateSessionId() (string, error) { + var b [32]byte + if _, err := rand.Read(b[:]); err != nil { + return "", err + } + return base64.RawURLEncoding.EncodeToString(b[:]), nil +} + +func (store *sessionStore) new(ctx *ImportContext, refId string) (resp newSessionResponse) { + resp.responsecode = http.StatusOK + resp.errorstring = "OK" + id, err := generateSessionId() + if err != nil { + resp.responsecode = http.StatusInternalServerError + resp.errorstring = err.Error() + return + } + + resp.id = id + if _, exists := store.store[ctx.GroupName]; !exists { + newgroup := &sessionStoreGroupElement{} + newgroup.sessions = make(map[string]*sessionStoreSessionElement) + store.store[ctx.GroupName] = newgroup + } + ctx.conf = store.conf + if pref := ctx.stdlog.Prefix(); strings.Contains(pref, "%s") { + ctx.stdlog.SetPrefix(fmt.Sprintf(pref, resp.id)) + } + if pref := ctx.dbglog.Prefix(); strings.Contains(pref, "%s") { + ctx.dbglog.SetPrefix(fmt.Sprintf(pref, resp.id)) + } + + s, err := newSession(ctx, func() { store.GetInterface().Remove(ctx.GroupName, resp.id) }) + if err != nil { + resp.responsecode = http.StatusInternalServerError + resp.errorstring = err.Error() + return + } + store.store[ctx.GroupName].sessions[resp.id] = &sessionStoreSessionElement{s, refId} + resp.session = store.store[ctx.GroupName].sessions[resp.id].s.getInterface() + store.dbglog.Printf("SessionStore: created session for '%s' -> %s", ctx.GroupName, resp.id) + store.store[ctx.GroupName].callUpdateHandlerAdd(resp.id, refId) + return +} + +func (store *sessionStore) get(groupname, id string) (resp getSessionResponse) { + resp.responsecode = http.StatusOK + resp.errorstring = "OK" + + group, exists := store.store[groupname] + if !exists { + resp.responsecode = http.StatusNotFound + resp.errorstring = fmt.Sprintf("SessionStore: session '%s/%s' not found", groupname, id) + return + } + + if session, exists := group.sessions[id]; exists { + resp.session = session.s.getInterface() + resp.refId = session.refId + return + } + resp.responsecode = http.StatusNotFound + resp.errorstring = fmt.Sprintf("SessionStore: session '%s/%s' not found", groupname, id) + return +} + +func (store *sessionStore) list(groupname, password string, trusted bool, userdata interface{}, cb SessionsUpdateCB) (resp listSessionsResponse) { + resp.responsecode = http.StatusOK + resp.errorstring = "OK" + resp.sessions = make(map[string]string) + if group, exists := store.store[groupname]; exists { + for id, e := range group.sessions { + resp.sessions[id] = e.refId + } + if cb != nil { + group.updateCBs = append(group.updateCBs, SessionsListCB{cb, userdata}) + } + } else if cb != nil { + newgroup := &sessionStoreGroupElement{} + newgroup.sessions = make(map[string]*sessionStoreSessionElement) + newgroup.updateCBs = []SessionsListCB{SessionsListCB{cb, userdata}} + store.store[groupname] = newgroup + } + return +} + +func (store *sessionStore) remove(groupname, id string) (resp removeSessionResponse) { + resp.responsecode = http.StatusOK + resp.errorstring = "OK" + + group, exists := store.store[groupname] + if !exists { + resp.responsecode = http.StatusNotFound + resp.errorstring = fmt.Sprintf("SessionStore: session '%s/%s' not found", groupname, id) + return + } + + if session, exists := group.sessions[id]; exists { + go session.s.cleanup() // cleanup could take a while -> don't block all the other stuff + refId := session.refId + delete(group.sessions, id) + store.dbglog.Printf("SessionStore: removed session '%s/%s'", groupname, id) + group.callUpdateHandlerRemove(id, refId) + + if len(group.sessions) == 0 && len(group.updateCBs) == 0 { + delete(store.store, groupname) + store.dbglog.Printf("SessionStore: removed group '%s'", groupname) + } + } else { + resp.responsecode = http.StatusNotFound + resp.errorstring = fmt.Sprintf("SessionStore: session '%s/%s' not found", groupname, id) + } + return +} + +func (store *sessionStore) maintenanceTask() { + for name, group := range store.store { + group.callUpdateHandler(nil, nil) + if len(group.sessions) == 0 && len(group.updateCBs) == 0 { + delete(store.store, name) + store.dbglog.Printf("SessionStore: removed group '%s'", name) + } + } +} + +func (store *sessionStore) dispatchRequests() { + defer func() { store.done <- true }() + + mt := time.NewTicker(1 * time.Minute) + for { + select { + case <-store.quit: + return + case <-mt.C: + store.maintenanceTask() + case req := <-store.newChan: + req.response <- store.new(req.ctx, req.refId) + case req := <-store.getChan: + req.response <- store.get(req.group, req.id) + case req := <-store.listChan: + req.response <- store.list(req.group, req.password, req.trusted, req.userdata, req.callback) + case req := <-store.removeChan: + req.response <- store.remove(req.group, req.id) + } + } +} + +// ********************************************************* +// Public Interface + +type SessionStore struct { + newChan chan<- newSessionRequest + getChan chan<- getSessionRequest + listChan chan listSessionsRequest + removeChan chan<- removeSessionRequest +} + +func (store *SessionStore) New(ctx *ImportContext, refId string) (string, *Session, int, string) { + resCh := make(chan newSessionResponse) + req := newSessionRequest{} + req.ctx = ctx + req.refId = refId + req.response = resCh + store.newChan <- req + + res := <-resCh + return res.id, res.session, res.responsecode, res.errorstring +} + +func (store *SessionStore) Get(group, id string) (*Session, string, int, string) { + resCh := make(chan getSessionResponse) + req := getSessionRequest{} + req.group = group + req.id = id + req.response = resCh + store.getChan <- req + + res := <-resCh + return res.session, res.refId, res.responsecode, res.errorstring +} + +func (store *SessionStore) List(group, password string, trusted bool, userdata interface{}, cb SessionsUpdateCB) (map[string]string, int, string) { + resCh := make(chan listSessionsResponse) + req := listSessionsRequest{} + req.group = group + req.password = password + req.trusted = trusted + req.response = resCh + req.callback = cb + req.userdata = userdata + store.listChan <- req + + res := <-resCh + return res.sessions, res.responsecode, res.errorstring +} + +func (store *SessionStore) Remove(group, id string) (int, string) { + resCh := make(chan removeSessionResponse) + req := removeSessionRequest{} + req.group = group + req.id = id + req.response = resCh + store.removeChan <- req + + res := <-resCh + return res.responsecode, res.errorstring +} + +func (store *sessionStore) GetInterface() *SessionStore { + ch := &SessionStore{} + ch.newChan = store.newChan + ch.getChan = store.getChan + ch.listChan = store.listChan + ch.removeChan = store.removeChan + return ch +} + +func (store *sessionStore) Cleanup() { + store.quit <- true + <-store.done + close(store.quit) + close(store.done) + close(store.newChan) + close(store.getChan) + close(store.listChan) + close(store.removeChan) +} + +func NewSessionStore(conf *Config, stdlog, dbglog *log.Logger) (store *sessionStore, err error) { + if stdlog == nil { + stdlog = log.New(ioutil.Discard, "", 0) + } + if dbglog == nil { + dbglog = log.New(ioutil.Discard, "", 0) + } + + store = &sessionStore{} + store.conf = conf + store.stdlog = stdlog + store.dbglog = dbglog + store.quit = make(chan bool, 1) + store.done = make(chan bool) + store.store = make(map[string]*sessionStoreGroupElement) + store.newChan = make(chan newSessionRequest, 10) + store.getChan = make(chan getSessionRequest, 10) + store.listChan = make(chan listSessionsRequest, 10) + store.removeChan = make(chan removeSessionRequest, 10) + + go store.dispatchRequests() + return +} -- GitLab