Skip to content
Snippets Groups Projects
Commit 500c7700 authored by Christian Pointner's avatar Christian Pointner
Browse files

basic structure for new importer

parent 9eabe12a
No related branches found
No related tags found
No related merge requests found
......@@ -25,151 +25,85 @@
package importer
import (
"errors"
"io/ioutil"
"log"
"net/http"
"os"
"runtime"
"sync"
"time"
"gitlab.servus.at/autoradio/tank/store"
)
type ProgressCB func(step int, stepName string, current, total float64, userdata interface{}) bool
type DoneCB func(result Result, userdata interface{}) bool
var (
ErrNotImplemented = errors.New("not implemented")
ErrNotFound = errors.New("not found")
ErrTimeout = errors.New("timeout")
)
type ProgressData struct {
Step int
StepName string
Current float64
Total float64
type Importer struct {
conf *Config
infoLog *log.Logger
errLog *log.Logger
dbgLog *log.Logger
wgWorker sync.WaitGroup
work chan *Job
jobs *jobInventory
}
type Result struct {
ResponseCode int
ErrorString string
func (im *Importer) CreateJob(group string, id uint64, src, user, refID string) (*Job, error) {
// TODO: implement this
// sanity check inputs:
// - parse src: valid URI?
// - does fileID exist and does it belong to group?
// - file.Source.Import.State == new?
//
// add job to inventory -> return existing job if it already exists
// job.Start() will enqueue the job
return nil, ErrNotImplemented
}
type AttachmentChunk struct {
Data []byte
Error error
func (im *Importer) GetJob(group string, id uint64) (*Job, error) {
// TODO: pass this on to job inventory
return nil, ErrNotImplemented
}
type ImportContext struct {
conf *Config
infoLog *log.Logger
errLog *log.Logger
dbgLog *log.Logger
fileID uint64
UserName string
GroupName string
WorkDir string
AttachmentChan chan AttachmentChunk
ProgressCallBack ProgressCB
ProgressCallBackData interface{}
Cancel <-chan bool
doneChan chan<- Result
// TODO: add source config
func (im *Importer) ListJobs(group string) ([]*Job, error) {
// TODO: pass this on to job inventory
return nil, ErrNotImplemented
}
func newImportContext(conf *Config, infoLog, errLog, dbgLog *log.Logger) *ImportContext {
if infoLog == nil {
infoLog = log.New(ioutil.Discard, "", 0)
}
if errLog == nil {
errLog = log.New(ioutil.Discard, "", 0)
}
if dbgLog == nil {
dbgLog = log.New(ioutil.Discard, "", 0)
}
ctx := &ImportContext{}
ctx.conf = conf
ctx.infoLog = infoLog
ctx.errLog = errLog
ctx.dbgLog = dbgLog
ctx.fileID = 0
ctx.UserName = ""
ctx.GroupName = ""
ctx.AttachmentChan = make(chan AttachmentChunk, 32)
ctx.ProgressCallBack = nil
ctx.Cancel = nil
return ctx
}
// handle subscriptions to new jobs-of-group -> pass on to job inventory
func (ctx *ImportContext) createTempWorkDir() (err error) {
ctx.WorkDir, err = ioutil.TempDir(ctx.conf.TempPath, "tank-")
return
func (im *Importer) deleteJob(group string, id uint64) ([]*Job, error) {
// TODO: pass this on to job inventory
return nil, ErrNotImplemented
}
func (ctx *ImportContext) removeTempWorkDir() {
if err := os.RemoveAll(ctx.WorkDir); err != nil {
ctx.errLog.Printf("Error removing WorkDir: %s", err)
}
return
}
func (im *Importer) runWorker(idx int) {
defer im.dbgLog.Printf("importer: worker(%d) has stopped", idx)
func (ctx *ImportContext) reportProgress(step int, stepName string, current, total float64) {
if ctx.ProgressCallBack != nil {
if keep := ctx.ProgressCallBack(step, stepName, current, total, ctx.ProgressCallBackData); !keep {
ctx.ProgressCallBack = nil
im.dbgLog.Printf("importer: worker(%d) is running", idx)
for job := range im.work {
select {
case <-job.ctx.Done():
continue
default:
}
}
}
func (ctx *ImportContext) isCanceled() bool {
return ctx.Cancel != nil && len(ctx.Cancel) > 0
}
type Importer struct {
conf *Config
infoLog *log.Logger
errLog *log.Logger
dbgLog *log.Logger
sessions *SessionInventory
wgWorker sync.WaitGroup
workerChan chan workerRequest
}
type workerRequest struct {
ctx *ImportContext
enqueuedChan chan<- bool
}
func (im *Importer) runWorker(idx int) {
defer im.dbgLog.Printf("importer: worker %d has stopped", idx)
im.infoLog.Printf("importer: worker(%d) starting job(%s/%d) ...", idx, job.Group, job.ID)
im.dbgLog.Printf("importer: worker %d is running", idx)
for {
err := ErrTimeout
select {
case req := <-im.workerChan:
if req.ctx.isCanceled() {
im.dbgLog.Printf("importer: worker %d got canceled request", idx)
continue
}
req.enqueuedChan <- true
im.dbgLog.Printf("importer: worker %d got request: %+v", idx, req.ctx)
// TODO: check for potential race between cancel and enqueue
// TODO: actually do import
time.Sleep(60 * time.Second)
req.ctx.doneChan <- Result{ResponseCode: http.StatusOK, ErrorString: "import success"}
case <-job.ctx.Done():
case err = <-job.start():
}
if err != nil {
im.errLog.Printf("importer: worker(%d) job(%s/%d) failed: %s", idx, job.Group, job.ID, err.Error())
} else {
im.infoLog.Printf("importer: worker(%d) successfully completed job(%s/%d)", idx, job.Group, job.ID)
}
}
}
func (im *Importer) New(fileID uint64, user, group, refID string) (*Session, int, string) {
ctx := newImportContext(im.conf, im.infoLog, im.errLog, im.dbgLog)
ctx.fileID = fileID
ctx.UserName = user
ctx.GroupName = group
return im.sessions.New(ctx, refID)
}
func NewImporter(conf Config, st *store.Store, infoLog, errLog, dbgLog *log.Logger) (im *Importer, err error) {
if infoLog == nil {
infoLog = log.New(ioutil.Discard, "", 0)
......@@ -186,7 +120,7 @@ func NewImporter(conf Config, st *store.Store, infoLog, errLog, dbgLog *log.Logg
if im.conf.Workers <= 0 {
im.conf.Workers = runtime.NumCPU()
}
im.workerChan = make(chan workerRequest, im.conf.Workers)
im.work = make(chan *Job, im.conf.Workers)
for i := 0; i < im.conf.Workers; i = i + 1 {
im.wgWorker.Add(1)
go func(idx int) {
......@@ -194,10 +128,7 @@ func NewImporter(conf Config, st *store.Store, infoLog, errLog, dbgLog *log.Logg
im.runWorker(idx)
}(i)
}
if im.sessions, err = newSessionInventory(&conf, st, infoLog, errLog, dbgLog, im.workerChan); err != nil {
return
}
im.jobs = newJobInventory(infoLog, errLog, dbgLog)
infoLog.Printf("importer: started with %d worker", im.conf.Workers)
return
......
//
// tank
//
// Import and Playlist Daemon for autoradio project
//
//
// Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
//
// This file is part of tank.
//
// tank is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// tank is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with tank. If not, see <http://www.gnu.org/licenses/>.
//
package importer
import (
"context"
"net/url"
"time"
)
type Job struct {
im *Importer `json:"-"`
ctx context.Context `json:"-"`
Cancel context.CancelFunc `json:"-"`
CreatedAt time.Time `json:"created"`
ID uint64 `json:"id"`
Group string `json:"group"`
User string `json:"user"`
Source url.URL `json:"source"`
RefID string `json:"ref-id,omitempty"`
}
func (job *Job) run() (err error) {
// TODO: update store.file: set import state to ImportRunning
// do the actual import here
// use job.ctx.Done() wherever we wait for something
// return ErrTimeout in case job.ctx.Done() is closed
time.Sleep(60 * time.Second)
// update store.file: set import state to ImportDone or ImportAborted
// send result to all done subscriptions
// trigger removal from job inventory
return
}
func (job *Job) start() <-chan error {
done := make(chan error, 1)
go func() {
done <- job.run()
close(done)
}()
return done
}
// handle subscriptions for progress and done
func (job *Job) Start(timeout time.Duration) {
// shall we use a global context here so we can cancel all jobs at once? i.e. for forced shutdown...
if timeout > 0 {
job.ctx, job.Cancel = context.WithTimeout(context.Background(), timeout)
} else {
job.ctx, job.Cancel = context.WithCancel(context.Background())
}
// TODO: enqueue job to importer.work
}
func newJob(im *Importer, group string, id uint64, src url.URL, user, refID string) *Job {
job := &Job{im: im, Group: group, ID: id, Source: src, User: user, RefID: refID}
job.CreatedAt = time.Now()
return job
}
//
// tank
//
// Import and Playlist Daemon for autoradio project
//
//
// Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
//
// This file is part of tank.
//
// tank is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// tank is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with tank. If not, see <http://www.gnu.org/licenses/>.
//
package importer
import (
"log"
)
type jobInventoryGroup struct {
jobs map[uint64]*Job
// add list of subscription channels
}
func newJobInventoryGroup() *jobInventoryGroup {
g := &jobInventoryGroup{}
g.jobs = make(map[uint64]*Job)
return g
}
type jobInventory struct {
infoLog *log.Logger
errLog *log.Logger
dbgLog *log.Logger
groups map[string]*jobInventoryGroup
}
func (i *jobInventory) insertOrGetJob(group string, id uint64, job *Job) (*Job, error) {
// TODO: implement this
return nil, ErrNotImplemented
}
func (i *jobInventory) getJob(group string, id uint64) (*Job, error) {
// TODO: implement this
return nil, ErrNotImplemented
}
// handle subscriptions for new jobs
func (i *jobInventory) deleteJob(group string, id uint64) error {
// TODO: implement this
return ErrNotImplemented
}
func newJobInventory(infoLog, errLog, dbgLog *log.Logger) *jobInventory {
i := &jobInventory{infoLog: infoLog, errLog: errLog, dbgLog: dbgLog}
i.groups = make(map[string]*jobInventoryGroup)
return i
}
//
// tank
//
// Import and Playlist Daemon for autoradio project
//
//
// Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
//
// This file is part of tank.
//
// tank is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// tank is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with tank. If not, see <http://www.gnu.org/licenses/>.
//
package importer
import (
"fmt"
"net/http"
"time"
)
type SessionState uint8
const (
StateNew SessionState = iota
StatePending
StateRunning
StateCanceled
StateDone
StateTimeout
)
func (u SessionState) String() string {
switch u {
case StateNew:
return "new"
case StatePending:
return "pending"
case StateRunning:
return "running"
case StateCanceled:
return "canceled"
case StateDone:
return "done"
case StateTimeout:
return "timeout"
}
return "unknown"
}
func (u SessionState) MarshalText() (data []byte, err error) {
data = []byte(u.String())
return
}
type sessionProgressCB struct {
cb ProgressCB
userdata interface{}
}
type sessionDoneCB struct {
cb DoneCB
userdata interface{}
}
type sessionAddProgressHandlerResponse struct {
err error
}
type sessionAddProgressHandlerRequest struct {
userdata interface{}
callback ProgressCB
response chan<- sessionAddProgressHandlerResponse
}
type sessionAddDoneHandlerResponse struct {
err error
}
type sessionAddDoneHandlerRequest struct {
userdata interface{}
callback DoneCB
response chan<- sessionAddDoneHandlerResponse
}
type attachUploaderResponse struct {
cancel <-chan bool
attachment chan<- AttachmentChunk
}
type attachUploaderRequest struct {
response chan<- attachUploaderResponse
}
type session struct {
id uint64
ctx ImportContext
state SessionState
removeFunc func()
done chan bool
quit chan bool
timer *time.Timer
workerChan chan<- workerRequest
enqueuedIntChan chan bool
cancelIntChan chan bool
progressRateLimit time.Duration
progressIntChan chan ProgressData
doneIntChan chan Result
runChan chan time.Duration
cancelChan chan bool
addProgressChan chan sessionAddProgressHandlerRequest
addDoneChan chan sessionAddDoneHandlerRequest
attachUploaderChan chan attachUploaderRequest
progressCBs []*sessionProgressCB
doneCBs []*sessionDoneCB
cancelUploader chan bool
}
func sessionProgressCallback(step int, stepName string, current, total float64, userdata interface{}) bool {
out := userdata.(chan<- ProgressData)
out <- ProgressData{step, stepName, current, total}
return true
}
func (s *session) run(timeout time.Duration) {
s.ctx.ProgressCallBack = sessionProgressCallback
s.ctx.ProgressCallBackData = (chan<- ProgressData)(s.progressIntChan)
s.ctx.Cancel = s.cancelIntChan
s.ctx.doneChan = s.doneIntChan
s.workerChan <- workerRequest{&(s.ctx), s.enqueuedIntChan}
s.state = StatePending
if timeout <= 0 || timeout > 3*time.Hour {
s.ctx.infoLog.Printf("importer: requested session timeout (%v) is invalid or too high - setting it to 3h", timeout)
timeout = 3 * time.Hour
}
s.timer.Reset(timeout)
return
}
func (s *session) cancel(reason string) {
if s.state == StatePending || s.state == StateRunning {
s.ctx.dbgLog.Printf("importer: canceling pending/running import (%s)", reason)
select {
case s.cancelIntChan <- true:
default: // session got canceled already??
}
}
if s.state == StatePending {
r := &Result{ResponseCode: http.StatusNoContent, ErrorString: reason}
s.callDoneHandler(r)
if s.removeFunc != nil {
s.removeFunc()
}
}
s.state = StateCanceled
}
func (s *session) addProgressHandler(userdata interface{}, cb ProgressCB) (resp sessionAddProgressHandlerResponse) {
if s.state != StateNew && s.state != StatePending && s.state != StateRunning {
resp.err = fmt.Errorf("session is already done/canceled")
}
s.progressCBs = append(s.progressCBs, &sessionProgressCB{cb, userdata})
return
}
func (s *session) addDoneHandler(userdata interface{}, cb DoneCB) (resp sessionAddDoneHandlerResponse) {
if s.state != StateNew && s.state != StatePending && s.state != StateRunning {
resp.err = fmt.Errorf("session is already done/canceled")
}
s.doneCBs = append(s.doneCBs, &sessionDoneCB{cb, userdata})
return
}
func (s *session) callProgressHandler(p *ProgressData) {
for _, cb := range s.progressCBs {
if cb.cb != nil {
if keep := cb.cb(p.Step, p.StepName, p.Current, p.Total, cb.userdata); !keep {
cb.cb = nil
}
}
}
}
func (s *session) callDoneHandler(r *Result) {
for _, cb := range s.doneCBs {
if cb.cb != nil {
if keep := cb.cb(*r, cb.userdata); !keep {
cb.cb = nil
}
}
}
}
func (s *session) attachUploader() (resp attachUploaderResponse) {
if s.cancelUploader != nil {
return
}
s.cancelUploader = make(chan bool, 1)
resp.cancel = s.cancelUploader
resp.attachment = s.ctx.AttachmentChan
return
}
func (s *session) dispatchRequests() {
defer func() {
if s.cancelUploader != nil {
close(s.cancelUploader)
}
s.done <- true
}()
var lastProgress *ProgressData
progressPending := 0
pt := time.NewTimer(s.progressRateLimit)
pt.Stop()
for {
select {
case <-s.quit:
s.cancel("session canceled due to global shutdown")
return
case <-s.timer.C:
s.cancel("session timed out")
case t := <-s.runChan:
if s.state == StateNew {
s.run(t)
}
case <-s.enqueuedIntChan:
if s.state == StatePending {
s.state = StateRunning
}
case <-s.cancelChan:
s.cancel("session canceled by user")
case req := <-s.addProgressChan:
req.response <- s.addProgressHandler(req.userdata, req.callback)
case req := <-s.addDoneChan:
req.response <- s.addDoneHandler(req.userdata, req.callback)
case <-pt.C:
if progressPending > 1 && lastProgress != nil {
s.callProgressHandler(lastProgress)
}
progressPending = 0
lastProgress = nil
case p := <-s.progressIntChan:
if s.state == StateRunning {
if lastProgress == nil {
s.callProgressHandler(&p)
pt.Reset(s.progressRateLimit)
} else if lastProgress.Step != p.Step {
s.callProgressHandler(lastProgress)
s.callProgressHandler(&p)
pt.Reset(s.progressRateLimit)
}
lastProgress = &p
progressPending++
}
case r := <-s.doneIntChan:
if s.state != StateTimeout {
s.timer.Stop()
s.state = StateDone
s.callDoneHandler(&r)
if s.removeFunc != nil {
s.removeFunc()
}
}
case req := <-s.attachUploaderChan:
req.response <- s.attachUploader()
}
}
}
// *********************************************************
// Public Interface
type Session struct {
runChan chan<- time.Duration
cancelChan chan<- bool
addProgressChan chan<- sessionAddProgressHandlerRequest
addDoneChan chan<- sessionAddDoneHandlerRequest
attachUploaderChan chan<- attachUploaderRequest
}
func (s *Session) Run(timeout time.Duration) {
select {
case s.runChan <- timeout:
default: // command is already pending or session is about to be closed/removed
}
}
func (s *Session) Cancel() {
select {
case s.cancelChan <- true:
default: // cancel is already pending or session is about to be closed/removed
}
}
func (s *Session) AddProgressHandler(userdata interface{}, cb ProgressCB) error {
resCh := make(chan sessionAddProgressHandlerResponse)
req := sessionAddProgressHandlerRequest{}
req.userdata = userdata
req.callback = cb
req.response = resCh
select {
case s.addProgressChan <- req:
default:
return fmt.Errorf("session is about to be closed/removed")
}
res := <-resCh
return res.err
}
func (s *Session) AddDoneHandler(userdata interface{}, cb DoneCB) error {
resCh := make(chan sessionAddDoneHandlerResponse)
req := sessionAddDoneHandlerRequest{}
req.userdata = userdata
req.callback = cb
req.response = resCh
select {
case s.addDoneChan <- req:
default:
return fmt.Errorf("session is about to be closed/removed")
}
res := <-resCh
return res.err
}
func (s *Session) AttachUploader() (<-chan bool, chan<- AttachmentChunk) {
resCh := make(chan attachUploaderResponse)
req := attachUploaderRequest{}
req.response = resCh
select {
case s.attachUploaderChan <- req:
default:
// session is about to be closed/removed
return nil, nil
}
res := <-resCh
return res.cancel, res.attachment
}
// *********************************************************
// Semi-Public Interface (only used by sessionInventory)
func (s *session) public() *Session {
ch := &Session{}
ch.runChan = s.runChan
ch.cancelChan = s.cancelChan
ch.addProgressChan = s.addProgressChan
ch.addDoneChan = s.addDoneChan
ch.attachUploaderChan = s.attachUploaderChan
return ch
}
func (s *session) cleanup() {
s.quit <- true
s.ctx.dbgLog.Printf("importer: waiting for session to close")
<-s.done
close(s.quit)
close(s.done)
s.timer.Stop()
// don't close the channels we give out because this might lead to a panic if
// somebody wites to an already removed session
// close(s.cancelIntChan)
// close(s.progressIntChan)
// close(s.doneIntChan)
// close(s.runChan)
// close(s.cancelChan)
// close(s.addProgressChan)
// close(s.addDoneChan)
// close(s.attachUploader)
s.ctx.removeTempWorkDir()
s.ctx.dbgLog.Printf("importer: cleanup is now done")
}
func newSession(ctx *ImportContext, workerChan chan<- workerRequest, id uint64, removeFunc func()) (s *session, err error) {
s = &session{id: id}
s.state = StateNew
s.removeFunc = removeFunc
s.ctx = *ctx
if err = s.ctx.createTempWorkDir(); err != nil {
return
}
s.quit = make(chan bool, 1)
s.done = make(chan bool)
s.timer = time.NewTimer(60 * time.Second)
s.workerChan = workerChan
s.enqueuedIntChan = make(chan bool)
s.cancelIntChan = make(chan bool, 1)
s.progressRateLimit = 100 * time.Millisecond // TODO: hardcoded value
s.progressIntChan = make(chan ProgressData, 10)
s.doneIntChan = make(chan Result, 1)
s.runChan = make(chan time.Duration, 1)
s.cancelChan = make(chan bool, 1)
s.addProgressChan = make(chan sessionAddProgressHandlerRequest, 10)
s.addDoneChan = make(chan sessionAddDoneHandlerRequest, 10)
s.attachUploaderChan = make(chan attachUploaderRequest, 1)
go s.dispatchRequests()
return
}
//
// tank
//
// Import and Playlist Daemon for autoradio project
//
//
// Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
//
// This file is part of tank.
//
// tank is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// tank is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with tank. If not, see <http://www.gnu.org/licenses/>.
//
package importer
import (
"fmt"
"log"
"net/http"
"strings"
"time"
"gitlab.servus.at/autoradio/tank/store"
)
type newSessionResponse struct {
session *Session
responsecode int
errorstring string
}
type newSessionRequest struct {
ctx *ImportContext
refID string
response chan newSessionResponse
}
type getSessionResponse struct {
session *Session
refID string
responsecode int
errorstring string
}
type getSessionRequest struct {
group string
id uint64
response chan getSessionResponse
}
type SessionsUpdateCB func(added, removed map[uint64]string, userdata interface{}) bool
type SessionsListCB struct {
cb SessionsUpdateCB
userdata interface{}
}
type listSessionsResponse struct {
sessions map[uint64]string
responsecode int
errorstring string
}
type listSessionsRequest struct {
group string
callback SessionsUpdateCB
userdata interface{}
response chan listSessionsResponse
}
type removeSessionResponse struct {
responsecode int
errorstring string
}
type removeSessionRequest struct {
group string
id uint64
response chan removeSessionResponse
}
type inventorySession struct {
s *session
refID string
}
type inventoryGroup struct {
sessions map[uint64]*inventorySession
updateCBs []SessionsListCB
}
func (group *inventoryGroup) callUpdateHandler(added, removed map[uint64]string) {
var keptCBs []SessionsListCB
for _, cb := range group.updateCBs {
if cb.cb != nil {
if keep := cb.cb(added, removed, cb.userdata); keep {
keptCBs = append(keptCBs, cb)
}
}
}
group.updateCBs = keptCBs
}
func (group *inventoryGroup) callUpdateHandlerAdd(id uint64, refID string) {
added := make(map[uint64]string)
added[id] = refID
group.callUpdateHandler(added, nil)
}
func (group *inventoryGroup) callUpdateHandlerRemove(id uint64, refID string) {
removed := make(map[uint64]string)
removed[id] = refID
group.callUpdateHandler(nil, removed)
}
type SessionInventory struct {
groups map[string]*inventoryGroup
conf *Config
store *store.Store
infoLog *log.Logger
errLog *log.Logger
dbgLog *log.Logger
quit chan bool
done chan bool
newChan chan newSessionRequest
getChan chan getSessionRequest
listChan chan listSessionsRequest
removeChan chan removeSessionRequest
workerChan chan<- workerRequest
}
func (inventory *SessionInventory) new(ctx *ImportContext, refID string) (resp newSessionResponse) {
resp.responsecode = http.StatusCreated
resp.errorstring = "OK"
if _, exists := inventory.groups[ctx.GroupName]; !exists {
newgroup := &inventoryGroup{}
newgroup.sessions = make(map[uint64]*inventorySession)
inventory.groups[ctx.GroupName] = newgroup
} else {
if _, exists := inventory.groups[ctx.GroupName].sessions[ctx.fileID]; exists {
resp.responsecode = http.StatusConflict
resp.errorstring = fmt.Sprintf("import for file %d of group %s already in progress", ctx.fileID, ctx.GroupName)
return
}
}
ctx.conf = inventory.conf
if pref := ctx.infoLog.Prefix(); strings.Contains(pref, "%s") {
ctx.infoLog.SetPrefix(fmt.Sprintf(pref, ctx.fileID))
}
if pref := ctx.errLog.Prefix(); strings.Contains(pref, "%s") {
ctx.errLog.SetPrefix(fmt.Sprintf(pref, ctx.fileID))
}
if pref := ctx.dbgLog.Prefix(); strings.Contains(pref, "%s") {
ctx.dbgLog.SetPrefix(fmt.Sprintf(pref, ctx.fileID))
}
s, err := newSession(ctx, inventory.workerChan, ctx.fileID, func() { inventory.Remove(ctx.GroupName, ctx.fileID) })
if err != nil {
resp.responsecode = http.StatusInternalServerError
resp.errorstring = err.Error()
return
}
inventory.groups[ctx.GroupName].sessions[ctx.fileID] = &inventorySession{s, refID}
resp.session = inventory.groups[ctx.GroupName].sessions[ctx.fileID].s.public()
inventory.dbgLog.Printf("importer: created session for '%s' -> %d", ctx.GroupName, ctx.fileID)
inventory.groups[ctx.GroupName].callUpdateHandlerAdd(ctx.fileID, refID)
return
}
func (inventory *SessionInventory) get(groupname string, id uint64) (resp getSessionResponse) {
resp.responsecode = http.StatusOK
resp.errorstring = "OK"
group, exists := inventory.groups[groupname]
if !exists {
resp.responsecode = http.StatusNotFound
resp.errorstring = fmt.Sprintf("importer: session '%s/%d' not found", groupname, id)
return
}
if session, exists := group.sessions[id]; exists {
resp.session = session.s.public()
resp.refID = session.refID
return
}
resp.responsecode = http.StatusNotFound
resp.errorstring = fmt.Sprintf("importer: session '%s/%d' not found", groupname, id)
return
}
func (inventory *SessionInventory) list(groupname string, userdata interface{}, cb SessionsUpdateCB) (resp listSessionsResponse) {
resp.responsecode = http.StatusOK
resp.errorstring = "OK"
resp.sessions = make(map[uint64]string)
if group, exists := inventory.groups[groupname]; exists {
for id, e := range group.sessions {
resp.sessions[id] = e.refID
}
if cb != nil {
group.updateCBs = append(group.updateCBs, SessionsListCB{cb, userdata})
}
} else if cb != nil {
newgroup := &inventoryGroup{}
newgroup.sessions = make(map[uint64]*inventorySession)
newgroup.updateCBs = []SessionsListCB{SessionsListCB{cb, userdata}}
inventory.groups[groupname] = newgroup
}
return
}
func (inventory *SessionInventory) remove(groupname string, id uint64) (resp removeSessionResponse) {
resp.responsecode = http.StatusOK
resp.errorstring = "OK"
group, exists := inventory.groups[groupname]
if !exists {
resp.responsecode = http.StatusNotFound
resp.errorstring = fmt.Sprintf("importer: session '%s/%d' not found", groupname, id)
return
}
if session, exists := group.sessions[id]; exists {
go session.s.cleanup() // cleanup could take a while -> don't block all the other stuff
refID := session.refID
delete(group.sessions, id)
inventory.dbgLog.Printf("importer: removed session '%s/%d'", groupname, id)
group.callUpdateHandlerRemove(id, refID)
if len(group.sessions) == 0 && len(group.updateCBs) == 0 {
delete(inventory.groups, groupname)
inventory.dbgLog.Printf("importer: removed group '%s'", groupname)
}
} else {
resp.responsecode = http.StatusNotFound
resp.errorstring = fmt.Sprintf("importer: session '%s/%d' not found", groupname, id)
}
return
}
func (inventory *SessionInventory) maintenanceTask() {
for name, group := range inventory.groups {
group.callUpdateHandler(nil, nil)
if len(group.sessions) == 0 && len(group.updateCBs) == 0 {
delete(inventory.groups, name)
inventory.dbgLog.Printf("importer: removed group '%s'", name)
}
}
}
func (inventory *SessionInventory) dispatchRequests() {
defer func() { inventory.done <- true }()
inventory.dbgLog.Println("importer: session-inventory initialized")
mt := time.NewTicker(1 * time.Minute)
for {
select {
case <-inventory.quit:
return
case <-mt.C:
inventory.maintenanceTask()
case req := <-inventory.newChan:
req.response <- inventory.new(req.ctx, req.refID)
case req := <-inventory.getChan:
req.response <- inventory.get(req.group, req.id)
case req := <-inventory.listChan:
req.response <- inventory.list(req.group, req.userdata, req.callback)
case req := <-inventory.removeChan:
req.response <- inventory.remove(req.group, req.id)
}
}
}
// *********************************************************
// Public Interface
func (inventory *SessionInventory) New(ctx *ImportContext, refID string) (*Session, int, string) {
resCh := make(chan newSessionResponse)
req := newSessionRequest{}
req.ctx = ctx
req.refID = refID
req.response = resCh
inventory.newChan <- req
res := <-resCh
return res.session, res.responsecode, res.errorstring
}
func (inventory *SessionInventory) Get(group string, id uint64) (*Session, string, int, string) {
resCh := make(chan getSessionResponse)
req := getSessionRequest{}
req.group = group
req.id = id
req.response = resCh
inventory.getChan <- req
res := <-resCh
return res.session, res.refID, res.responsecode, res.errorstring
}
func (inventory *SessionInventory) List(group string, userdata interface{}, cb SessionsUpdateCB) (map[uint64]string, int, string) {
resCh := make(chan listSessionsResponse)
req := listSessionsRequest{}
req.group = group
req.response = resCh
req.callback = cb
req.userdata = userdata
inventory.listChan <- req
res := <-resCh
return res.sessions, res.responsecode, res.errorstring
}
func (inventory *SessionInventory) Remove(group string, id uint64) (int, string) {
resCh := make(chan removeSessionResponse)
req := removeSessionRequest{}
req.group = group
req.id = id
req.response = resCh
inventory.removeChan <- req
res := <-resCh
return res.responsecode, res.errorstring
}
func (inventory *SessionInventory) Cleanup() {
inventory.quit <- true
<-inventory.done
close(inventory.quit)
close(inventory.done)
close(inventory.newChan)
close(inventory.getChan)
close(inventory.listChan)
close(inventory.removeChan)
}
func newSessionInventory(conf *Config, st *store.Store, infoLog, errLog, dbgLog *log.Logger, workerChan chan<- workerRequest) (inventory *SessionInventory, err error) {
inventory = &SessionInventory{}
inventory.conf = conf
inventory.store = st
inventory.infoLog = infoLog
inventory.errLog = errLog
inventory.dbgLog = dbgLog
inventory.quit = make(chan bool, 1)
inventory.done = make(chan bool)
inventory.groups = make(map[string]*inventoryGroup)
inventory.newChan = make(chan newSessionRequest, 10)
inventory.getChan = make(chan getSessionRequest, 10)
inventory.listChan = make(chan listSessionsRequest, 10)
inventory.removeChan = make(chan removeSessionRequest, 10)
inventory.workerChan = workerChan
go inventory.dispatchRequests()
return
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment