// // tank, Import and Playlist Daemon for Aura project // Copyright (C) 2017-2020 Christian Pointner <equinox@helsinki.at> // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as // published by the Free Software Foundation, either version 3 of the // License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. // package importer import ( "log" "sort" "sync" ) type jobInventoryShow struct { jobs map[uint64]*Job changed chan struct{} } func newJobInventoryShow() *jobInventoryShow { g := &jobInventoryShow{} g.jobs = make(map[uint64]*Job) return g } type jobInventory struct { infoLog *log.Logger errLog *log.Logger dbgLog *log.Logger shows map[uint64]*jobInventoryShow mu sync.RWMutex } func (i *jobInventory) ListJobs(showID uint64, offset, limit int) (jobs []*Job) { i.mu.RLock() defer i.mu.RUnlock() ig, exists := i.shows[showID] if !exists { return []*Job{} } for _, job := range ig.jobs { jobs = append(jobs, job) } sort.Slice(jobs, func(i, j int) bool { return jobs[i].ID < jobs[j].ID }) if offset > 0 { if offset >= len(jobs) { return []*Job{} } jobs = jobs[offset:] } if limit >= 0 && limit < len(jobs) { jobs = jobs[:limit] } return } func (i *jobInventory) ListJobsForShows(showIDs []uint64, offset int, limit int) (jobs []*Job) { i.mu.RLock() defer i.mu.RUnlock() for _, showID := range showIDs { ig, exists := i.shows[showID] if exists { for _, job := range ig.jobs { jobs = append(jobs, job) } sort.Slice(jobs, func(i, j int) bool { return jobs[i].ID < jobs[j].ID }) } } if offset > 0 { if offset >= len(jobs) { return []*Job{} } jobs = jobs[offset:] } if limit >= 0 && limit < len(jobs) { jobs = jobs[:limit] } return } func (i *jobInventory) SubscribeJobs(showID uint64) (jobs []*Job, changed <-chan struct{}) { i.mu.RLock() defer i.mu.RUnlock() ig, exists := i.shows[showID] if !exists { ig = newJobInventoryShow() ig.changed = make(chan struct{}) return jobs, ig.changed } for _, j := range ig.jobs { jobs = append(jobs, j) } if ig.changed == nil { ig.changed = make(chan struct{}) } return jobs, ig.changed } func (i *jobInventory) GetOrNewJob(showID uint64, id uint64, im *Importer, src *SourceURL, user, refID string) (job *Job) { i.mu.Lock() defer i.mu.Unlock() ig, exists := i.shows[showID] if !exists { ig = newJobInventoryShow() i.shows[showID] = ig } if job, exists = ig.jobs[id]; !exists { njob := newJob(im, showID, id, src, user, refID) ig.jobs[id] = njob i.dbgLog.Printf("importer: job-inventory added job(%d/%d)", showID, id) if ig.changed != nil { close(ig.changed) ig.changed = nil } return njob } return } func (i *jobInventory) GetJob(showID uint64, id uint64) (job *Job, err error) { i.mu.RLock() defer i.mu.RUnlock() ig, exists := i.shows[showID] if !exists { return nil, ErrNotFound } if job, exists = ig.jobs[id]; !exists { return nil, ErrNotFound } return } func (i *jobInventory) DeleteJob(showID uint64, id uint64) (err error) { i.mu.Lock() defer i.mu.Unlock() ig, exists := i.shows[showID] if !exists { return ErrNotFound } if _, exists = ig.jobs[id]; !exists { return ErrNotFound } delete(ig.jobs, id) i.dbgLog.Printf("importer: job-inventory removed job(%d/%d)", showID, id) if len(ig.jobs) == 0 { delete(i.shows, showID) i.dbgLog.Printf("importer: job-inventory also removed now empty show %d", showID) } if ig.changed != nil { close(ig.changed) ig.changed = nil } return } func newJobInventory(infoLog, errLog, dbgLog *log.Logger) *jobInventory { i := &jobInventory{infoLog: infoLog, errLog: errLog, dbgLog: dbgLog} i.shows = make(map[uint64]*jobInventoryShow) return i }