fix: add group runner's timeout and make some channels buffered.

This commit is contained in:
Juan Pablo Villafáñez
2024-04-29 11:11:04 +02:00
parent 08c47632f0
commit 05f684a537
5 changed files with 229 additions and 52 deletions

View File

@@ -3,6 +3,8 @@ package runner
import (
"context"
"sync"
"sync/atomic"
"time"
)
// GroupRunner represents a group of tasks that need to run together.
@@ -17,21 +19,44 @@ import (
// providing a piece of functionality, however, if any of them fails, the
// feature provided by them would be incomplete or broken.
//
// The interrupt duration for the group can be set through the
// `WithInterruptDuration` option. If the option isn't supplied, the default
// value (15 secs) will be used.
//
// It's recommended that the timeouts are handled by each runner individually,
// meaning that each runner's timeout should be less than the group runner's
// timeout. This way, we can know which runner timed out.
// If the group timeout is reached, the remaining results will have the
// runner's id as "_unknown_".
//
// Note that, as services, the tasks aren't expected to stop by default.
// This means that, if a task finishes naturally, the rest of the tasks will
// be asked to stop as well.
type GroupRunner struct {
runners sync.Map
runnersCount int
isRunning bool
runningMutex sync.Mutex
runners sync.Map
runnersCount int
isRunning bool
interruptDur time.Duration
interrupted atomic.Bool
interruptedCh chan time.Duration
runningMutex sync.Mutex
}
// NewGroup will create a GroupRunner
func NewGroup() *GroupRunner {
func NewGroup(opts ...Option) *GroupRunner {
options := Options{
InterruptDuration: DefaultGroupInterruptDuration,
}
for _, o := range opts {
o(&options)
}
return &GroupRunner{
runners: sync.Map{},
runningMutex: sync.Mutex{},
runners: sync.Map{},
runningMutex: sync.Mutex{},
interruptDur: options.InterruptDuration,
interruptedCh: make(chan time.Duration, 1),
}
}
@@ -85,7 +110,7 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result {
gr.isRunning = true
gr.runningMutex.Unlock()
results := make(map[string]*Result)
results := make([]*Result, 0, gr.runnersCount)
ch := make(chan *Result, gr.runnersCount) // no need to block writing results
gr.runners.Range(func(_, value any) bool {
@@ -94,45 +119,46 @@ func (gr *GroupRunner) Run(ctx context.Context) []*Result {
return true
})
var d time.Duration
// wait for a result or for the context to be done
select {
case result := <-ch:
results[result.RunnerID] = result
results = append(results, result)
case d = <-gr.interruptedCh:
results = append(results, &Result{
RunnerID: "_unknown_",
RunnerError: NewGroupTimeoutError(d),
})
case <-ctx.Done():
// Do nothing
}
// interrupt the rest of the runners
gr.runners.Range(func(_, value any) bool {
r := value.(*Runner)
if _, ok := results[r.ID]; !ok {
select {
case <-r.Finished():
// No data should be sent through the channel, so we'd be
// here only if the channel is closed. This means the task
// has finished and we don't need to interrupt. We do
// nothing in this case
default:
r.Interrupt()
}
}
return true
})
gr.Interrupt()
// Having notified that the context has been finished, we still need to
// wait for the rest of the results
for i := len(results); i < gr.runnersCount; i++ {
result := <-ch
results[result.RunnerID] = result
select {
case result := <-ch:
results = append(results, result)
case d2, ok := <-gr.interruptedCh:
if ok {
d = d2
}
results = append(results, &Result{
RunnerID: "_unknown_",
RunnerError: NewGroupTimeoutError(d),
})
}
}
close(ch)
values := make([]*Result, 0, gr.runnersCount)
for _, val := range results {
values = append(values, val)
}
return values
// Even if we reach the group time out and bail out early, tasks might
// be running and eventually deliver the result through the channel.
// We'll rely on the buffered channel so the tasks won't block and the
// data can be eventually garbage-collected along with the unused
// channel, so we won't close the channel here.
return results
}
// RunAsync will execute the tasks in the group asynchronously.
@@ -158,12 +184,38 @@ func (gr *GroupRunner) RunAsync(ch chan<- *Result) {
})
go func() {
result := <-interCh
var result *Result
var d time.Duration
select {
case result = <-interCh:
// result already assigned, so do nothing
case d = <-gr.interruptedCh:
// we aren't tracking which runners have finished and which are still
// running, so we'll use "_unknown_" as runner id
result = &Result{
RunnerID: "_unknown_",
RunnerError: NewGroupTimeoutError(d),
}
}
gr.Interrupt()
ch <- result
for i := 1; i < gr.runnersCount; i++ {
result = <-interCh
select {
case result = <-interCh:
// result already assigned, so do nothing
case d2, ok := <-gr.interruptedCh:
// if ok is true, d2 will have a good value; if false, the channel
// is closed and we get a default value
if ok {
d = d2
}
result = &Result{
RunnerID: "_unknown_",
RunnerError: NewGroupTimeoutError(d),
}
}
ch <- result
}
}()
@@ -179,14 +231,32 @@ func (gr *GroupRunner) RunAsync(ch chan<- *Result) {
// As said, this will affect ALL the tasks in the group. It isn't possible to
// try to stop just one task.
// If a task has finished, the corresponding stopper won't be called
//
// The interrupt timeout for the group will start after all the runners in the
// group have been notified. Note that, if the task's stopper for a runner
// takes a lot of time to return, it will delay the timeout's start, so it's
// advised that the stopper either returns fast or is run asynchronously.
func (gr *GroupRunner) Interrupt() {
gr.runners.Range(func(_, value any) bool {
r := value.(*Runner)
select {
case <-r.Finished():
default:
r.Interrupt()
}
return true
})
if gr.interrupted.CompareAndSwap(false, true) {
gr.runners.Range(func(_, value any) bool {
r := value.(*Runner)
select {
case <-r.Finished():
// No data should be sent through the channel, so we'd be
// here only if the channel is closed. This means the task
// has finished and we don't need to interrupt. We do
// nothing in this case
default:
r.Interrupt()
}
return true
})
_ = time.AfterFunc(gr.interruptDur, func() {
// timeout reached -> send it through the channel so our runner
// can abort
gr.interruptedCh <- gr.interruptDur
close(gr.interruptedCh)
})
}
}

View File

@@ -62,7 +62,7 @@ var _ = Describe("GroupRunner", func() {
)))
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
task3 := TimedTask(task3Ch, 6*time.Second)
Expect(func() {
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
@@ -77,7 +77,7 @@ var _ = Describe("GroupRunner", func() {
Expect(func() {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
task3 := TimedTask(task3Ch, 6*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
@@ -89,7 +89,7 @@ var _ = Describe("GroupRunner", func() {
Describe("Run", func() {
It("Context is done", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
task3 := TimedTask(task3Ch, 6*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
@@ -141,6 +141,75 @@ var _ = Describe("GroupRunner", func() {
)))
}, SpecTimeout(5*time.Second))
It("Context done and group timeout reached", func(ctx SpecContext) {
gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))
gr.Add(runner.New("task1", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
gr.Add(runner.New("task2", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// spawn a new goroutine and return the result in the channel
ch2 := make(chan []*runner.Result)
go func(ch2 chan []*runner.Result) {
ch2 <- gr.Run(myCtx)
close(ch2)
}(ch2)
// context finishes in 1 sec, tasks will be interrupted
// group timeout will be reached after 2 extra seconds
Eventually(ctx, ch2).Should(Receive(ContainElements(
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
)))
}, SpecTimeout(5*time.Second))
It("Interrupted and group timeout reached", func(ctx SpecContext) {
gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))
gr.Add(runner.New("task1", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
gr.Add(runner.New("task2", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
// context will be done in 10 seconds
myCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// spawn a new goroutine and return the result in the channel
ch2 := make(chan []*runner.Result)
go func(ch2 chan []*runner.Result) {
ch2 <- gr.Run(myCtx)
close(ch2)
}(ch2)
gr.Interrupt()
// tasks will be interrupted
// group timeout will be reached after 2 extra seconds
Eventually(ctx, ch2).Should(Receive(ContainElements(
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)},
)))
}, SpecTimeout(5*time.Second))
It("Doble run panics", func(ctx SpecContext) {
// context will be done in 1 second
myCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
@@ -181,7 +250,7 @@ var _ = Describe("GroupRunner", func() {
It("Interrupt async", func(ctx SpecContext) {
task3Ch := make(chan error)
task3 := TimedTask(task3Ch, 15*time.Second)
task3 := TimedTask(task3Ch, 6*time.Second)
gr.Add(runner.New("task3", task3, func() {
task3Ch <- nil
close(task3Ch)
@@ -196,5 +265,29 @@ var _ = Describe("GroupRunner", func() {
Eventually(ctx, ch2).Should(Receive())
Eventually(ctx, ch2).Should(Receive())
}, SpecTimeout(5*time.Second))
It("Interrupt async group timeout reached", func(ctx SpecContext) {
gr := runner.NewGroup(runner.WithInterruptDuration(2 * time.Second))
gr.Add(runner.New("task1", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
gr.Add(runner.New("task2", func() error {
time.Sleep(6 * time.Second)
return nil
}, func() {
}))
ch2 := make(chan *runner.Result)
gr.RunAsync(ch2)
gr.Interrupt()
// group timeout will be reached after 2 extra seconds
Eventually(ctx, ch2).Should(Receive(Equal(&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)})))
Eventually(ctx, ch2).Should(Receive(Equal(&runner.Result{RunnerID: "_unknown_", RunnerError: runner.NewGroupTimeoutError(2 * time.Second)})))
}, SpecTimeout(5*time.Second))
})
})

View File

@@ -6,8 +6,11 @@ import (
var (
// DefaultInterruptDuration is the default value for the `WithInterruptDuration`
// This global value can be adjusted if needed.
// for the "regular" runners. This global value can be adjusted if needed.
DefaultInterruptDuration = 10 * time.Second
// DefaultGroupInterruptDuration is the default value for the `WithInterruptDuration`
// for the group runners. This global value can be adjusted if needed.
DefaultGroupInterruptDuration = 15 * time.Second
)
// Option defines a single option function.

View File

@@ -32,7 +32,7 @@ type Runner struct {
//
// The interrupt duration, which can be set through the `WithInterruptDuration`
// option, will be used to ensure the runner doesn't block forever. If the
// option isn't suplied, the default value will be used.
// option isn't supplied, the default value (10 secs) will be used.
// The interrupt duration will be used to start a timeout when the
// runner gets interrupted (either the context of the `Run` method is done
// or this runner's `Interrupt` method is called). If the timeout is reached,
@@ -56,7 +56,7 @@ func New(id string, fn Runable, interrupt Stopper, opts ...Option) *Runner {
interruptDur: options.InterruptDuration,
fn: fn,
interrupt: interrupt,
interruptedCh: make(chan time.Duration),
interruptedCh: make(chan time.Duration, 1),
finished: make(chan struct{}),
}
}
@@ -163,7 +163,7 @@ func (r *Runner) Finished() <-chan struct{} {
// A result will be provided when either the task finishes naturally or we
// reach the timeout after being interrupted
func (r *Runner) doTask(ch chan<- *Result, closeChan bool) {
tmpCh := make(chan *Result)
tmpCh := make(chan *Result, 1)
// spawn the task and return the result in a temporary channel
go func(tmpCh chan *Result) {

View File

@@ -55,6 +55,17 @@ func NewTimeoutError(runnerID string, duration time.Duration) *TimeoutError {
}
}
// NewGroupTimeoutError builds the TimeoutError reported when the timeout of
// a whole group runner is reached, as opposed to the timeout of a single
// runner.
//
// The returned error carries the given duration and uses the placeholder
// "_unknown_" as its runner id, since the id of the runner that is still
// missing is not available at that point.
func NewGroupTimeoutError(duration time.Duration) *TimeoutError {
	groupErr := new(TimeoutError)
	groupErr.Duration = duration
	groupErr.RunnerID = "_unknown_"
	return groupErr
}
// Error generates the message for this particular error.
func (te *TimeoutError) Error() string {
var sb strings.Builder