From c92deb306805e87b82a2320c837f202f0bb2b404 Mon Sep 17 00:00:00 2001 From: Christian Pointner <equinox@helsinki.at> Date: Fri, 7 Sep 2018 23:40:24 +0200 Subject: [PATCH] importer.JobSource is now an Interface --- importer/fetch.go | 33 ++++------------ importer/importer.go | 5 ++- importer/job.go | 10 ++--- importer/job_source.go | 89 ++++++++++++++++++++++++++++++++++++++++++ importer/types.go | 9 +++-- 5 files changed, 110 insertions(+), 36 deletions(-) create mode 100644 importer/job_source.go diff --git a/importer/fetch.go b/importer/fetch.go index b8589bc..09e8b57 100644 --- a/importer/fetch.go +++ b/importer/fetch.go @@ -28,26 +28,10 @@ import ( "encoding/base64" "io" "sync/atomic" - "time" - "unsafe" "golang.org/x/crypto/blake2b" ) -type devNull uint64 - -func (d *devNull) Read(p []byte) (n int, err error) { - l := len(p) - if *d > devNull(l) { - *d = *d - devNull(l) - time.Sleep(20 * time.Millisecond) - return l, nil - } - l = int(*d) - *d = 0 - return l, io.EOF -} - func (job *Job) prepareSource() { switch job.Source.Scheme { case SourceSchemeAttachment: @@ -58,11 +42,8 @@ func (job *Job) prepareSource() { default: // simulate a 10 MB file... - l := uint64(10 * 1024 * 1024) - r := devNull(l) - src := &JobSource{len: l, r: &r} - src.done = make(chan *JobSourceResult, 1) - atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(src)) + atomic.StoreUint32(&job.sourceSet, 1) + job.source = newJobSourceNull(10 * 1024 * 1024) close(job.subC.sourceAttached) } } @@ -83,7 +64,7 @@ func (job *Job) fetch() (interface{}, error) { } // job.source is now initialized and points to a valid source // make sure a potentially connected source gets notified in any case - defer close(job.source.done) + defer job.source.Done(nil) conv, err := job.newFetchConverter() if err != nil { @@ -99,8 +80,8 @@ func (job *Job) fetch() (interface{}, error) { if err != nil { panic("creating hash function failed: " + err.Error()) } - src := io.TeeReader(job.source.r, hash) - written, err := io.Copy(&progressWriter{job, StepFetching, job.source.len, 0, conv}, src) + src := io.TeeReader(job.source, hash) + written, err := io.Copy(&progressWriter{job, StepFetching, job.source.Len(), 0, conv}, src) if err != nil { done <- copyResult{err, ""} return @@ -117,7 +98,7 @@ func (job *Job) fetch() (interface{}, error) { conv.Close() go conv.Wait() // do the zombie reaping in seperate go routine since we are not interested in the result anyway err = job.ctx.Err() - job.source.done <- &JobSourceResult{Err: err} + job.source.Done(&JobSourceResult{Err: err}) return nil, err case res = <-done: } @@ -133,6 +114,6 @@ func (job *Job) fetch() (interface{}, error) { job.im.dbgLog.Println(l.Line) } job.Progress.set(StepFetching, 1) - job.source.done <- &JobSourceResult{err, res.hash, convLog} + job.source.Done(&JobSourceResult{err, res.hash, convLog}) return loudness, err } diff --git a/importer/importer.go b/importer/importer.go index 34a8ed2..7991eac 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -52,7 +52,10 @@ func (im *Importer) ListJobs(group string) (Jobs, error) { func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID string) (*Job, error) { // TODO: update this list once we implemented other sources - if src.Scheme != SourceSchemeAttachment { + switch src.Scheme { + case SourceSchemeAttachment: + // case SourceSchemeFlowJS: + default: return nil, ErrSourceNotSupported } diff --git a/importer/job.go b/importer/job.go index 8becf0f..d18332c 100644 --- a/importer/job.go +++ b/importer/job.go @@ -32,7 +32,6 @@ import ( "os" "sync/atomic" "time" - "unsafe" "gitlab.servus.at/autoradio/tank/store" ) @@ -51,7 +50,8 @@ type Job struct { Source SourceURL `json:"source"` RefID string `json:"ref-id,omitempty"` Progress JobProgress `json:"progress"` - source *JobSource + source JobSource + sourceSet uint32 workDir string subC struct { sourceAttached chan struct{} @@ -159,11 +159,11 @@ func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResul return nil, ErrImportNotRunning } - src := &JobSource{len: length, r: r} - src.done = make(chan *JobSourceResult, 1) - if ok := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(nil), unsafe.Pointer(src)); !ok { + if ok := atomic.CompareAndSwapUint32(&job.sourceSet, 0, 1); !ok { return nil, ErrSourceAlreadyAttached } + src := newJobSourceAttachment(length, r) + job.source = src close(job.subC.sourceAttached) return src.done, nil } diff --git a/importer/job_source.go b/importer/job_source.go new file mode 100644 index 0000000..d2559d8 --- /dev/null +++ b/importer/job_source.go @@ -0,0 +1,89 @@ +// +// tank +// +// Import and Playlist Daemon for autoradio project +// +// +// Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at> +// +// This file is part of tank. +// +// tank is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// any later version. +// +// tank is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with tank. If not, see <http://www.gnu.org/licenses/>. +// + +package importer + +import ( + "io" + "time" +) + +//******* null + +type JobSourceNull uint64 + +func newJobSourceNull(len uint64) *JobSourceNull { + src := JobSourceNull(len) + return &src +} + +func (src *JobSourceNull) Len() uint64 { + return uint64(*src) +} + +func (src *JobSourceNull) Read(p []byte) (n int, err error) { + l := len(p) + if *src > JobSourceNull(l) { + *src = *src - JobSourceNull(l) + time.Sleep(20 * time.Millisecond) + return l, nil + } + l = int(*src) + *src = 0 + return l, io.EOF +} + +func (src *JobSourceNull) Done(result *JobSourceResult) { + return +} + +//******* Attachment + +type JobSourceAttachment struct { + len uint64 + r io.Reader + done chan *JobSourceResult +} + +func newJobSourceAttachment(len uint64, r io.Reader) *JobSourceAttachment { + src := &JobSourceAttachment{len: len, r: r} + src.done = make(chan *JobSourceResult, 1) + return src +} + +func (src *JobSourceAttachment) Len() uint64 { + return src.len +} + +func (src *JobSourceAttachment) Read(p []byte) (n int, err error) { + return src.r.Read(p) +} + +func (src *JobSourceAttachment) Done(result *JobSourceResult) { + if result == nil { + close(src.done) + return + } + src.done <- result +} diff --git a/importer/types.go b/importer/types.go index 2b78d46..ecb4dbc 100644 --- a/importer/types.go +++ b/importer/types.go @@ -39,6 +39,7 @@ const ( DefaultBacklog = 100 SourceSchemeAttachment = "attachment" + SourceSchemeFlowJS = "flowjs" ) //******* Errors @@ -98,10 +99,10 @@ func (r *JobSourceResult) Error() string { return r.Err.Error() } -type JobSource struct { - len uint64 - r io.Reader - done chan *JobSourceResult +type JobSource interface { + Len() uint64 + io.Reader + Done(*JobSourceResult) } //******* State -- GitLab