Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,19 @@ import (
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(n int64) bool {
func IsPrime(ctx context.Context, n int64) bool {
if n < 2 {
return false
}
for p := int64(2); p*p <= n; p++ {
// Check for cancellation periodically (every 10000 iterations)
if p%10000 == 0 {
select {
case <-ctx.Done():
return false
default:
}
}
if n%p == 0 {
return false
}
Expand All @@ -54,13 +62,18 @@ func IsPrime(n int64) bool {

func main() {
wp := workerpool.New(runtime.NumCPU())
// Defer Close to ensure cleanup on early return (e.g., errors during Submit).
// Close sends cancellation to running tasks and waits for them to complete.
// It's safe to call Close multiple times; subsequent calls return ErrClosed.
defer func() { _ = wp.Close() }()

for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
id := fmt.Sprintf("task #%d", i)
// Use Submit to submit tasks for processing. Submit blocks when no
// worker is available to pick up the task.
err := wp.Submit(id, func(_ context.Context) error {
err := wp.Submit(id, func(ctx context.Context) error {
fmt.Println("isprime", n)
if IsPrime(n) {
if IsPrime(ctx, n) {
fmt.Println(n, "is prime!")
}
return nil
Expand Down Expand Up @@ -90,7 +103,8 @@ func main() {
}
}

// Close should be called once the worker pool is no longer necessary.
// Close is called here explicitly to check for errors. The deferred Close
// will also run but returns ErrClosed (which we can ignore on defer).
if err := wp.Close(); err != nil {
fmt.Fprintln(os.Stderr, err)
}
Expand All @@ -111,7 +125,6 @@ package main
import (
"context"
"fmt"
"log"
"os"
"runtime"

Expand All @@ -126,24 +139,26 @@ func main() {
fmt.Printf("task %s completed in %s\n", r, r.Duration())
}
}))
defer func() { _ = wp.Close() }()

for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
id := fmt.Sprintf("task #%d", i)
err := wp.Submit(id, func(_ context.Context) error {
if IsPrime(n) {
err := wp.Submit(id, func(ctx context.Context) error {
if IsPrime(ctx, n) {
fmt.Println(n, "is prime!")
}
return nil
})
if err != nil {
log.Fatal(err)
fmt.Fprintln(os.Stderr, err)
return
}
}

// Close waits for all in-flight tasks to complete before returning,
// ensuring all callback invocations have finished.
if err := wp.Close(); err != nil {
log.Fatal(err)
fmt.Fprintln(os.Stderr, err)
}
}
```
38 changes: 27 additions & 11 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,26 @@ package workerpool_test
import (
"context"
"fmt"
"log"
"os"
"runtime"

"github.com/cilium/workerpool"
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(n int64) bool {
func IsPrime(ctx context.Context, n int64) bool {
if n < 2 {
return false
}
for p := int64(2); p*p <= n; p++ {
// Check for cancellation periodically (every 10000 iterations)
if p%10000 == 0 {
select {
case <-ctx.Done():
return false
default:
}
}
if n%p == 0 {
return false
}
Expand All @@ -28,19 +35,25 @@ func IsPrime(n int64) bool {

func Example() {
wp := workerpool.New(runtime.NumCPU())
// Defer Close to ensure cleanup on early return (e.g., errors during Submit).
// Close sends cancellation to running tasks and waits for them to complete.
// It's safe to call Close multiple times; subsequent calls return ErrClosed.
defer func() { _ = wp.Close() }()

for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
id := fmt.Sprintf("task #%d", i)
// Use Submit to submit tasks for processing. Submit blocks when no
// worker is available to pick up the task.
err := wp.Submit(id, func(_ context.Context) error {
err := wp.Submit(id, func(ctx context.Context) error {
fmt.Println("isprime", n)
if IsPrime(n) {
if IsPrime(ctx, n) {
fmt.Println(n, "is prime!")
}
return nil
})
// Submit fails when the pool is closed (ErrClosed) or being drained
// (ErrDrained). Check for the error when appropriate.
// Submit fails when the pool is closed (ErrClosed), being drained
// (ErrDraining), or the parent context is done (context.Canceled).
// Check for the error when appropriate.
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
Expand All @@ -63,7 +76,8 @@ func Example() {
}
}

// Close should be called once the worker pool is no longer necessary.
// Close is called here explicitly to check for errors. The deferred Close
// will also run but returns ErrClosed (which we can ignore on defer).
if err := wp.Close(); err != nil {
fmt.Fprintln(os.Stderr, err)
}
Expand All @@ -77,23 +91,25 @@ func ExampleWithResultCallback() {
fmt.Printf("task %s completed in %s\n", r, r.Duration())
}
}))
defer func() { _ = wp.Close() }()

for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
id := fmt.Sprintf("task #%d", i)
err := wp.Submit(id, func(_ context.Context) error {
if IsPrime(n) {
err := wp.Submit(id, func(ctx context.Context) error {
if IsPrime(ctx, n) {
fmt.Println(n, "is prime!")
}
return nil
})
if err != nil {
log.Fatal(err)
fmt.Fprintln(os.Stderr, err)
return
}
}

// Close waits for all in-flight tasks to complete before returning,
// ensuring all callback invocations have finished.
if err := wp.Close(); err != nil {
log.Fatal(err)
fmt.Fprintln(os.Stderr, err)
}
}
77 changes: 54 additions & 23 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@ var (
type Option func(*WorkerPool)

// WithResultCallback registers fn to be called each time a task completes.
// When a callback is set, results are not accumulated internally and Drain
// returns ErrCallbackSet. The callback may be invoked concurrently from
// multiple goroutines; fn must be safe for concurrent use.
//
// When a callback is set, results are NOT accumulated internally. This means:
// - Drain() will return ErrCallbackSet instead of collecting results
Comment thread
rolinh marked this conversation as resolved.
Outdated
// - Results are processed immediately upon completion, avoiding memory buildup
// - The callback fn may be invoked concurrently from multiple goroutines
//
// The callback fn must be safe for concurrent use.
// WithResultCallback panics if fn is nil.
func WithResultCallback(fn func(Result)) Option {
// TODO(v2): New/NewWithContext should return an error so that option
Expand Down Expand Up @@ -111,18 +115,20 @@ func (wp *WorkerPool) Len() int {
}

// Submit submits f for processing by a worker. The given id is useful for
// identifying the task once it is completed. The task f must return when the
// context ctx is cancelled. The context passed to task f is cancelled when
// Close is called or when the parent context passed to NewWithContext is done.
// identifying the task once it is completed.
//
// Submit blocks until a routine start processing the task.
// The task function f receives a context that is cancelled when Close is called
// or when the parent context passed to NewWithContext is done. Tasks MUST respect
// context cancellation and return promptly when ctx.Done() is signaled. Tasks that
// ignore cancellation will cause Close to block indefinitely waiting for them to
// complete. Use context-aware operations (e.g., select with ctx.Done()) to ensure
// timely shutdown.
//
// If a drain operation is in progress, ErrDraining is returned and the task
// is not submitted for processing.
// If the worker pool is closed, ErrClosed is returned and the task is not
// submitted for processing.
// If the parent context is done, context.Canceled is returned and the task is
// not submitted for processing.
// Submit blocks until a routine starts processing the task.
//
// ErrDraining is returned if a drain operation is in progress.
// ErrClosed is returned if the worker pool is closed.
Comment thread
rolinh marked this conversation as resolved.
Outdated
// context.Canceled is returned if the parent context is done.
func (wp *WorkerPool) Submit(id string, f func(ctx context.Context) error) error {
wp.mu.Lock()
if wp.closed {
Expand Down Expand Up @@ -151,9 +157,18 @@ func (wp *WorkerPool) Submit(id string, f func(ctx context.Context) error) error
// Drain waits until all tasks are completed. This operation prevents
// submitting new tasks to the worker pool. Drain returns the results of the
// tasks that have been processed.
// If a drain operation is already in progress, ErrDraining is returned.
// If the worker pool is closed, ErrClosed is returned.
// If a result callback is set via WithResultCallback, ErrCallbackSet is returned.
//
// Drain is incompatible with the WithResultCallback option. When a result
// callback is configured, results are processed immediately upon completion
// rather than being accumulated, so Drain returns ErrCallbackSet.
Comment thread
rolinh marked this conversation as resolved.
Outdated
//
// Unlike Close, Drain does not cancel task contexts. Tasks run to completion
Comment thread
rolinh marked this conversation as resolved.
Outdated
// naturally. After Drain, the pool can be closed with Close (which will not
// cancel any tasks since none are running) or more tasks can be submitted.
//
// ErrCallbackSet is returned if the WithResultCallback option is used.
// ErrDraining is returned if a drain operation is already in progress.
// ErrClosed is returned if the worker pool is closed.
Comment thread
rolinh marked this conversation as resolved.
Outdated
func (wp *WorkerPool) Drain() ([]Task, error) {
wp.mu.Lock()
if wp.closed {
Expand All @@ -174,9 +189,13 @@ func (wp *WorkerPool) Drain() ([]Task, error) {

wp.wg.Wait()

// NOTE: It's not necessary to hold a lock when reading or writing
// wp.results as no other routine is running at this point besides the
// "run" routine which should be waiting on the tasks channel.
// NOTE: No lock is needed here due to the following synchronization:
// 1. Only the single run() goroutine writes to wp.results.
// 2. run() appends each result BEFORE spawning its worker goroutine.
// 3. Each worker calls wg.Done() upon completion.
// 4. wg.Wait() above ensures all workers (and thus all appends) completed.
// 5. run() is now blocked waiting for tasks on the channel.
// Therefore, no concurrent access to wp.results is possible here.
res := wp.results
wp.results = nil

Expand All @@ -188,11 +207,19 @@ func (wp *WorkerPool) Drain() ([]Task, error) {
}

// Close closes the worker pool, rendering it unable to process new tasks.
// Close sends the cancellation signal to any running task and waits for all
// workers, if any, to return. When a result callback is set via
// WithResultCallback, all callback invocations are guaranteed to have completed
// Close sends the cancellation signal to any running task via context cancellation
// and waits indefinitely for all workers to return. If tasks do not respect context
// cancellation, Close will block until they complete. When a result callback is set
// via WithResultCallback, all callback invocations are guaranteed to have completed
// before Close returns.
// Close will return ErrClosed if it has already been called.
//
// Close will return ErrClosed if it has already been called. This makes it safe
// to use with defer immediately after creating the pool (for cleanup on early
// returns) while still calling Close explicitly to check for errors.
//
// Note: Close cancels running tasks via context, while Drain waits for tasks to
// complete without cancellation. If you want tasks to finish naturally, call
// Drain before Close.
func (wp *WorkerPool) Close() error {
wp.mu.Lock()
if wp.closed {
Expand All @@ -216,6 +243,10 @@ func (wp *WorkerPool) Close() error {

// run loops over the tasks channel and starts processing routines. It should
// only be called once during the lifetime of a WorkerPool.
// This is the sole goroutine that writes to wp.results, making it safe to
// append without a lock. The append happens before spawning each worker,
// establishing a happens-before relationship that ensures Drain() can safely
// read wp.results after wg.Wait() completes.
func (wp *WorkerPool) run(ctx context.Context) {
for t := range wp.tasks {
result := taskResult{id: t.id}
Expand Down
Loading