Skip to content
Snippets Groups Projects
Commit c92deb30 authored by Christian Pointner's avatar Christian Pointner
Browse files

importer.JobSource is now an Interface

parent 09a567b2
No related branches found
No related tags found
No related merge requests found
......@@ -28,26 +28,10 @@ import (
"encoding/base64"
"io"
"sync/atomic"
"time"
"unsafe"
"golang.org/x/crypto/blake2b"
)
type devNull uint64
func (d *devNull) Read(p []byte) (n int, err error) {
l := len(p)
if *d > devNull(l) {
*d = *d - devNull(l)
time.Sleep(20 * time.Millisecond)
return l, nil
}
l = int(*d)
*d = 0
return l, io.EOF
}
func (job *Job) prepareSource() {
switch job.Source.Scheme {
case SourceSchemeAttachment:
......@@ -58,11 +42,8 @@ func (job *Job) prepareSource() {
default:
// simulate a 10 MB file...
l := uint64(10 * 1024 * 1024)
r := devNull(l)
src := &JobSource{len: l, r: &r}
src.done = make(chan *JobSourceResult, 1)
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(src))
atomic.StoreUint32(&job.sourceSet, 1)
job.source = newJobSourceNull(10 * 1024 * 1024)
close(job.subC.sourceAttached)
}
}
......@@ -83,7 +64,7 @@ func (job *Job) fetch() (interface{}, error) {
}
// job.source is now initialized and points to a valid source
// make sure a potentially connected source gets notified in any case
defer close(job.source.done)
defer job.source.Done(nil)
conv, err := job.newFetchConverter()
if err != nil {
......@@ -99,8 +80,8 @@ func (job *Job) fetch() (interface{}, error) {
if err != nil {
panic("creating hash function failed: " + err.Error())
}
src := io.TeeReader(job.source.r, hash)
written, err := io.Copy(&progressWriter{job, StepFetching, job.source.len, 0, conv}, src)
src := io.TeeReader(job.source, hash)
written, err := io.Copy(&progressWriter{job, StepFetching, job.source.Len(), 0, conv}, src)
if err != nil {
done <- copyResult{err, ""}
return
......@@ -117,7 +98,7 @@ func (job *Job) fetch() (interface{}, error) {
conv.Close()
go conv.Wait() // do the zombie reaping in seperate go routine since we are not interested in the result anyway
err = job.ctx.Err()
job.source.done <- &JobSourceResult{Err: err}
job.source.Done(&JobSourceResult{Err: err})
return nil, err
case res = <-done:
}
......@@ -133,6 +114,6 @@ func (job *Job) fetch() (interface{}, error) {
job.im.dbgLog.Println(l.Line)
}
job.Progress.set(StepFetching, 1)
job.source.done <- &JobSourceResult{err, res.hash, convLog}
job.source.Done(&JobSourceResult{err, res.hash, convLog})
return loudness, err
}
......@@ -52,7 +52,10 @@ func (im *Importer) ListJobs(group string) (Jobs, error) {
func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID string) (*Job, error) {
// TODO: update this list once we implemented other sources
if src.Scheme != SourceSchemeAttachment {
switch src.Scheme {
case SourceSchemeAttachment:
// case SourceSchemeFlowJS:
default:
return nil, ErrSourceNotSupported
}
......
......@@ -32,7 +32,6 @@ import (
"os"
"sync/atomic"
"time"
"unsafe"
"gitlab.servus.at/autoradio/tank/store"
)
......@@ -51,7 +50,8 @@ type Job struct {
Source SourceURL `json:"source"`
RefID string `json:"ref-id,omitempty"`
Progress JobProgress `json:"progress"`
source *JobSource
source JobSource
sourceSet uint32
workDir string
subC struct {
sourceAttached chan struct{}
......@@ -159,11 +159,11 @@ func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResul
return nil, ErrImportNotRunning
}
src := &JobSource{len: length, r: r}
src.done = make(chan *JobSourceResult, 1)
if ok := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&job.source)), unsafe.Pointer(nil), unsafe.Pointer(src)); !ok {
if ok := atomic.CompareAndSwapUint32(&job.sourceSet, 0, 1); !ok {
return nil, ErrSourceAlreadyAttached
}
src := newJobSourceAttachment(length, r)
job.source = src
close(job.subC.sourceAttached)
return src.done, nil
}
......
//
// tank
//
// Import and Playlist Daemon for autoradio project
//
//
// Copyright (C) 2017-2018 Christian Pointner <equinox@helsinki.at>
//
// This file is part of tank.
//
// tank is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// tank is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with tank. If not, see <http://www.gnu.org/licenses/>.
//
package importer
import (
"io"
"time"
)
//******* null
type JobSourceNull uint64
func newJobSourceNull(len uint64) *JobSourceNull {
src := JobSourceNull(len)
return &src
}
func (src *JobSourceNull) Len() uint64 {
return uint64(*src)
}
func (src *JobSourceNull) Read(p []byte) (n int, err error) {
l := len(p)
if *src > JobSourceNull(l) {
*src = *src - JobSourceNull(l)
time.Sleep(20 * time.Millisecond)
return l, nil
}
l = int(*src)
*src = 0
return l, io.EOF
}
func (src *JobSourceNull) Done(result *JobSourceResult) {
return
}
//******* Attachment
type JobSourceAttachment struct {
len uint64
r io.Reader
done chan *JobSourceResult
}
func newJobSourceAttachment(len uint64, r io.Reader) *JobSourceAttachment {
src := &JobSourceAttachment{len: len, r: r}
src.done = make(chan *JobSourceResult, 1)
return src
}
func (src *JobSourceAttachment) Len() uint64 {
return src.len
}
func (src *JobSourceAttachment) Read(p []byte) (n int, err error) {
return src.r.Read(p)
}
func (src *JobSourceAttachment) Done(result *JobSourceResult) {
if result == nil {
close(src.done)
return
}
src.done <- result
}
......@@ -39,6 +39,7 @@ const (
DefaultBacklog = 100
SourceSchemeAttachment = "attachment"
SourceSchemeFlowJS = "flowjs"
)
//******* Errors
......@@ -98,10 +99,10 @@ func (r *JobSourceResult) Error() string {
return r.Err.Error()
}
type JobSource struct {
len uint64
r io.Reader
done chan *JobSourceResult
type JobSource interface {
Len() uint64
io.Reader
Done(*JobSourceResult)
}
//******* State
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment