Skip to content
Snippets Groups Projects
Commit 9a6cf300 authored by Christian Pointner's avatar Christian Pointner
Browse files

added some more job sources

parent c92deb30
No related branches found
No related tags found
No related merge requests found
...@@ -27,27 +27,10 @@ package importer ...@@ -27,27 +27,10 @@ package importer
import ( import (
"encoding/base64" "encoding/base64"
"io" "io"
"sync/atomic"
"golang.org/x/crypto/blake2b" "golang.org/x/crypto/blake2b"
) )
func (job *Job) prepareSource() {
switch job.Source.Scheme {
case SourceSchemeAttachment:
// the source will be attached using AttachSource()
return
// TODO: implement other sources
default:
// simulate a 10 MB file...
atomic.StoreUint32(&job.sourceSet, 1)
job.source = newJobSourceNull(10 * 1024 * 1024)
close(job.subC.sourceAttached)
}
}
type copyResult struct { type copyResult struct {
err error err error
hash string hash string
......
...@@ -54,7 +54,8 @@ func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID ...@@ -54,7 +54,8 @@ func (im *Importer) CreateJob(group string, id uint64, src url.URL, user, refID
// TODO: update this list once we implemented other sources // TODO: update this list once we implemented other sources
switch src.Scheme { switch src.Scheme {
case SourceSchemeAttachment: case SourceSchemeAttachment:
// case SourceSchemeFlowJS: case SourceSchemeFlowJS:
case SourceSchemeFake:
default: default:
return nil, ErrSourceNotSupported return nil, ErrSourceNotSupported
} }
......
...@@ -71,8 +71,6 @@ func (job *Job) run() error { ...@@ -71,8 +71,6 @@ func (job *Job) run() error {
defer job.cleanup() defer job.cleanup()
defer close(job.subC.done) defer close(job.subC.done)
job.prepareSource()
if !atomic.CompareAndSwapUint32((*uint32)(&job.State), uint32(JobPending), uint32(JobRunning)) { if !atomic.CompareAndSwapUint32((*uint32)(&job.State), uint32(JobPending), uint32(JobRunning)) {
job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted) job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted)
// the job was canceled before job.Start() could initialize the context and because of this // the job was canceled before job.Start() could initialize the context and because of this
...@@ -82,21 +80,22 @@ func (job *Job) run() error { ...@@ -82,21 +80,22 @@ func (job *Job) run() error {
close(job.subC.running) close(job.subC.running)
job.StartedAt.set(time.Now()) job.StartedAt.set(time.Now())
job.im.dbgLog.Printf("running import for %s/%d from: %s", job.Group, job.ID, job.Source.String()) job.im.dbgLog.Printf("running import for %s/%d from: %s", job.Group, job.ID, job.Source.String())
job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportRunning) job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportRunning)
if err := job.prepareSource(); err != nil {
job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted)
return err
}
loudness, err := job.fetch() loudness, err := job.fetch()
if err != nil { if err != nil {
job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted) job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted)
// send result to all done subscriptions
return err return err
} }
if err = job.normalize(loudness); err != nil { if err = job.normalize(loudness); err != nil {
job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted) job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportAborted)
// send result to all done subscriptions
return err return err
} }
job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportDone) job.im.store.UpdateFileImportState(job.Group, job.ID, store.ImportDone)
// send result to all done subscriptions
return err return err
} }
...@@ -159,6 +158,10 @@ func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResul ...@@ -159,6 +158,10 @@ func (job *Job) AttachSource(length uint64, r io.Reader) (<-chan *JobSourceResul
return nil, ErrImportNotRunning return nil, ErrImportNotRunning
} }
// only allow to attach external sources if the job's source was in fact an attachment URL
if job.Source.Scheme != SourceSchemeAttachment {
return nil, ErrSourceAlreadyAttached
}
if ok := atomic.CompareAndSwapUint32(&job.sourceSet, 0, 1); !ok { if ok := atomic.CompareAndSwapUint32(&job.sourceSet, 0, 1); !ok {
return nil, ErrSourceAlreadyAttached return nil, ErrSourceAlreadyAttached
} }
......
...@@ -26,27 +26,53 @@ package importer ...@@ -26,27 +26,53 @@ package importer
import ( import (
"io" "io"
"sync/atomic"
"time" "time"
) )
//******* null func (job *Job) prepareSource() error {
if job.Source.Scheme == SourceSchemeAttachment {
// the source will be attached using AttachSource()
return nil
}
atomic.StoreUint32(&job.sourceSet, 1)
switch job.Source.Scheme {
case SourceSchemeFlowJS:
job.source = newJobSourceFlowJS(job.Source)
type JobSourceNull uint64 // TODO: implement other sources
func newJobSourceNull(len uint64) *JobSourceNull { case SourceSchemeFake:
src := JobSourceNull(len) job.source = newJobSourceFake(job.Source)
default:
return ErrSourceNotSupported
}
close(job.subC.sourceAttached)
return nil
}
//******* fake
type JobSourceFake uint64
func newJobSourceFake(srcURL SourceURL) *JobSourceFake {
// TODO: parse size from srcURL
src := JobSourceFake(10 * 1024 * 1024) // simulate a 10 MB file...
return &src return &src
} }
func (src *JobSourceNull) Len() uint64 { func (src *JobSourceFake) Len() uint64 {
return uint64(*src) return uint64(*src)
} }
func (src *JobSourceNull) Read(p []byte) (n int, err error) { func (src *JobSourceFake) Read(p []byte) (n int, err error) {
l := len(p) l := len(p)
if *src > JobSourceNull(l) { if *src > JobSourceFake(l) {
*src = *src - JobSourceNull(l) *src = *src - JobSourceFake(l)
time.Sleep(20 * time.Millisecond) time.Sleep(200 * time.Millisecond)
return l, nil return l, nil
} }
l = int(*src) l = int(*src)
...@@ -54,7 +80,7 @@ func (src *JobSourceNull) Read(p []byte) (n int, err error) { ...@@ -54,7 +80,7 @@ func (src *JobSourceNull) Read(p []byte) (n int, err error) {
return l, io.EOF return l, io.EOF
} }
func (src *JobSourceNull) Done(result *JobSourceResult) { func (src *JobSourceFake) Done(result *JobSourceResult) {
return return
} }
...@@ -87,3 +113,26 @@ func (src *JobSourceAttachment) Done(result *JobSourceResult) { ...@@ -87,3 +113,26 @@ func (src *JobSourceAttachment) Done(result *JobSourceResult) {
} }
src.done <- result src.done <- result
} }
//******* flow.js
type JobSourceFlowJS struct {
// TODO: implement this
}
func newJobSourceFlowJS(srcURL SourceURL) *JobSourceFlowJS {
src := &JobSourceFlowJS{}
return src
}
func (src *JobSourceFlowJS) Len() uint64 {
return 0
}
func (src *JobSourceFlowJS) Read(p []byte) (n int, err error) {
return 0, ErrNotImplemented
}
func (src *JobSourceFlowJS) Done(result *JobSourceResult) {
return
}
...@@ -40,6 +40,7 @@ const ( ...@@ -40,6 +40,7 @@ const (
SourceSchemeAttachment = "attachment" SourceSchemeAttachment = "attachment"
SourceSchemeFlowJS = "flowjs" SourceSchemeFlowJS = "flowjs"
SourceSchemeFake = "fake"
) )
//******* Errors //******* Errors
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment