// // tank // // Import and Playlist Daemon for autoradio project // // // Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at> // // This file is part of tank. // // tank is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // tank is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with tank. If not, see <http://www.gnu.org/licenses/>. // package importer import ( "io/ioutil" "log" "net/http" "os" "runtime" "sync" "time" "gitlab.servus.at/autoradio/tank/store" ) type ProgressCB func(step int, stepName string, current, total float64, userdata interface{}) bool type DoneCB func(result Result, userdata interface{}) bool type ProgressData struct { Step int StepName string Current float64 Total float64 } type Result struct { ResponseCode int ErrorString string } type AttachmentChunk struct { Data []byte Error error } type ImportContext struct { conf *Config infoLog *log.Logger errLog *log.Logger dbgLog *log.Logger UserName string GroupName string WorkDir string AttachmentChan chan AttachmentChunk ProgressCallBack ProgressCB ProgressCallBackData interface{} Cancel <-chan bool doneChan chan<- Result // TODO: add source config } func NewImportContext(conf *Config, infoLog, errLog, dbgLog *log.Logger) *ImportContext { if infoLog == nil { infoLog = log.New(ioutil.Discard, "", 0) } if errLog == nil { errLog = log.New(ioutil.Discard, "", 0) } if dbgLog == nil { dbgLog = log.New(ioutil.Discard, "", 0) } ctx := &ImportContext{} ctx.conf = conf ctx.infoLog = infoLog ctx.errLog = errLog ctx.dbgLog = dbgLog ctx.UserName = "" ctx.GroupName = "" ctx.AttachmentChan = make(chan AttachmentChunk, 32) ctx.ProgressCallBack = nil ctx.Cancel = nil return ctx } func (ctx *ImportContext) createTempWorkDir() (err error) { ctx.WorkDir, err = ioutil.TempDir(ctx.conf.TempPath, "tank-") return } func (ctx *ImportContext) removeTempWorkDir() { if err := os.RemoveAll(ctx.WorkDir); err != nil { ctx.errLog.Printf("Error removing WorkDir: %s", err) } return } func (ctx *ImportContext) reportProgress(step int, stepName string, current, total float64) { if ctx.ProgressCallBack != nil { if keep := ctx.ProgressCallBack(step, stepName, current, total, ctx.ProgressCallBackData); !keep { ctx.ProgressCallBack = nil } } } func (ctx *ImportContext) isCanceled() bool { return ctx.Cancel != nil && len(ctx.Cancel) > 0 } type Importer struct { infoLog *log.Logger errLog *log.Logger dbgLog *log.Logger sessions *SessionInventory numWorker int wgWorker sync.WaitGroup workerChan chan workerRequest } type workerRequest struct { ctx *ImportContext enqueuedChan chan<- bool } func (im *Importer) runWorker(idx int) { defer im.dbgLog.Printf("importer: worker %d has stopped", idx) im.dbgLog.Printf("importer: worker %d is running", idx) for { select { case req := <-im.workerChan: if req.ctx.isCanceled() { im.dbgLog.Printf("importer: worker %d got canceled request", idx) continue } req.enqueuedChan <- true im.dbgLog.Printf("importer: worker %d got request: %+v", idx, req.ctx) // TODO: check for potential race between cancel and enqueue // TODO: actually do import time.Sleep(60 * time.Second) req.ctx.doneChan <- Result{ResponseCode: http.StatusOK, ErrorString: "import success"} } } } func NewImporter(conf *Config, st *store.Store, infoLog, errLog, dbgLog *log.Logger) (im *Importer, err error) { if infoLog == nil { infoLog = log.New(ioutil.Discard, "", 0) } if errLog == nil { errLog = log.New(ioutil.Discard, "", 0) } if dbgLog == nil { dbgLog = log.New(ioutil.Discard, "", 0) } im = &Importer{infoLog: infoLog, errLog: errLog, dbgLog: dbgLog} im.numWorker = runtime.NumCPU() if conf.Workers > 0 { im.numWorker = conf.Workers } im.workerChan = make(chan workerRequest, im.numWorker) for i := 0; i < im.numWorker; i = i + 1 { im.wgWorker.Add(1) go func(idx int) { defer im.wgWorker.Done() im.runWorker(idx) }(i) } if im.sessions, err = newSessionInventory(conf, st, infoLog, errLog, dbgLog, im.workerChan); err != nil { return } infoLog.Printf("importer: started with %d worker", im.numWorker) return }