Skip to content
Snippets Groups Projects
Commit 2b827464 authored by Christian Pointner's avatar Christian Pointner
Browse files

importer: refactor job timeout handling

parent 9c3beceb
No related branches found
No related tags found
No related merge requests found
......@@ -25,10 +25,8 @@
package v1
import (
"context"
"encoding/json"
"net/http"
"time"
"github.com/gin-gonic/gin"
"gitlab.servus.at/autoradio/tank/importer"
......@@ -97,7 +95,7 @@ func (api *API) CreateFileForShow(c *gin.Context) {
goto create_file_response
}
if err = job.Start(context.Background(), 3*time.Hour); err != nil { // TODO: hardcoded value
if err = job.StartWithTimeout(0); err != nil {
goto create_file_response
}
......
......@@ -28,6 +28,7 @@ import (
"errors"
"os"
"strings"
"time"
)
type NormalizerType int
......@@ -69,6 +70,7 @@ func (c *NormalizerType) UnmarshalText(data []byte) (err error) {
}
type Config struct {
JobTimeout time.Duration `json:"job-timeout" yaml:"job-timeout" toml:"job-timeout"`
TempPath string `json:"temp-path" yaml:"temp-path" toml:"temp-path"`
Workers int `json:"workers" yaml:"workers" toml:"workers"`
Backlog uint `json:"backlog" yaml:"backlog" toml:"backlog"`
......
......@@ -98,33 +98,44 @@ func (job *Job) run() error {
return err
}
// Start adds job to the work queue of the importer. If timeout is > 0 it will create a context
// WithTimeout using ctx as its parent or a WithCancel otherwise. Of course you may pass on a context
// that already has a deadline or timeout set. In this case you may set timeout to 0. But since any
// context.Context you pass on requires you to cleanup resources after all work is done (by calling it's
// CancelFunc). It might be easier to use use context.Background() and a the timeout set to a appropiate
// value. The job will then take care of freeing up the resources.
func (job *Job) Start(ctx context.Context, timeout time.Duration) (err error) {
// StartWithTimeout adds job to the work queue of the importer. If timeout is <= 0 the importers
// default job-timeout will be used. Jobs that get started using this function clean up any
// resources when done.
func (job *Job) StartWithTimeout(timeout time.Duration) (err error) {
if timeout <= 0 {
timeout = job.im.conf.JobTimeout
}
return job.start(context.WithTimeout(context.Background(), timeout))
}
// StartWithContext adds job to the work queue of the importer. You should pass on a context
// that has a deadline or timeout set. Canceling the context will abort the import job. You are
// responsible for cleaning up the context resources (using the CancelFunc) after the job has
// completed.
func (job *Job) StartWithContext(ctx context.Context) (err error) {
return job.start(context.WithCancel(ctx))
}
func (job *Job) start(ctx context.Context, cancel context.CancelFunc) (err error) {
if !atomic.CompareAndSwapUint32((*uint32)(&job.State), uint32(JobNew), uint32(JobInitializing)) {
if atomic.LoadUint32((*uint32)(&job.State)) == uint32(JobCanceled) {
// the job has already been canceled and thus should just be removed from the inventory
cancel()
job.cleanup()
return ErrAlreadyCanceled
}
cancel()
return
}
if job.WorkDir, err = ioutil.TempDir(job.im.conf.TempPath, "tank-job-"); err != nil {
cancel()
job.cleanup()
return
}
// shall we use a global context here so we can cancel all jobs at once? i.e. for forced shutdown...
if timeout > 0 {
job.Ctx, job.cancel = context.WithTimeout(ctx, timeout)
} else {
job.Ctx, job.cancel = context.WithCancel(ctx)
}
job.Ctx = ctx
job.cancel = cancel
// from here on we need to take care that job.cancel() will be called in any case so that resources
// allocated by job.Ctx are freed up.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment