diff --git a/.golangci.yml b/.golangci.yml index b8b7a3f..06c746a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -29,6 +29,7 @@ linters: - gocognit - goconst - gocritic + - godoclint - godot - goheader - goprintffuncname @@ -68,6 +69,8 @@ linters: - usetesting - wastedassign settings: + godoclint: + default: all goheader: template: |- SPDX-License-Identifier: Apache-2.0 diff --git a/README.md b/README.md index 58b1694..5a0d918 100644 --- a/README.md +++ b/README.md @@ -40,11 +40,19 @@ import ( ) // IsPrime returns true if n is prime, false otherwise. -func IsPrime(n int64) bool { +func IsPrime(ctx context.Context, n int64) bool { if n < 2 { return false } for p := int64(2); p*p <= n; p++ { + // Check for cancellation periodically (every 10000 iterations) + if p%10000 == 0 { + select { + case <-ctx.Done(): + return false + default: + } + } if n%p == 0 { return false } @@ -54,13 +62,18 @@ func IsPrime(n int64) bool { func main() { wp := workerpool.New(runtime.NumCPU()) + // Defer Close to ensure cleanup on early return (e.g., errors during Submit). + // Close sends cancellation to running tasks and waits for them to complete. + // It's safe to call Close multiple times; subsequent calls return ErrClosed. + defer func() { _ = wp.Close() }() + for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 { id := fmt.Sprintf("task #%d", i) // Use Submit to submit tasks for processing. Submit blocks when no // worker is available to pick up the task. - err := wp.Submit(id, func(_ context.Context) error { + err := wp.Submit(id, func(ctx context.Context) error { fmt.Println("isprime", n) - if IsPrime(n) { + if IsPrime(ctx, n) { fmt.Println(n, "is prime!") } return nil @@ -90,7 +103,8 @@ func main() { } } - // Close should be called once the worker pool is no longer necessary. + // Close is called here explicitly to check for errors. The deferred Close + // will also run but returns ErrClosed (which we can ignore on defer). 
if err := wp.Close(); err != nil { fmt.Fprintln(os.Stderr, err) } @@ -111,7 +125,6 @@ package main import ( "context" "fmt" - "log" "os" "runtime" @@ -126,24 +139,26 @@ func main() { fmt.Printf("task %s completed in %s\n", r, r.Duration()) } })) + defer func() { _ = wp.Close() }() for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 { id := fmt.Sprintf("task #%d", i) - err := wp.Submit(id, func(_ context.Context) error { - if IsPrime(n) { + err := wp.Submit(id, func(ctx context.Context) error { + if IsPrime(ctx, n) { fmt.Println(n, "is prime!") } return nil }) if err != nil { - log.Fatal(err) + fmt.Fprintln(os.Stderr, err) + return } } // Close waits for all in-flight tasks to complete before returning, // ensuring all callback invocations have finished. if err := wp.Close(); err != nil { - log.Fatal(err) + fmt.Fprintln(os.Stderr, err) } } ``` diff --git a/example_test.go b/example_test.go index d33b26c..2eb22f2 100644 --- a/example_test.go +++ b/example_test.go @@ -6,7 +6,6 @@ package workerpool_test import ( "context" "fmt" - "log" "os" "runtime" @@ -14,11 +13,19 @@ import ( ) // IsPrime returns true if n is prime, false otherwise. -func IsPrime(n int64) bool { +func IsPrime(ctx context.Context, n int64) bool { if n < 2 { return false } for p := int64(2); p*p <= n; p++ { + // Check for cancellation periodically (every 10000 iterations) + if p%10000 == 0 { + select { + case <-ctx.Done(): + return false + default: + } + } if n%p == 0 { return false } @@ -26,21 +33,28 @@ func IsPrime(n int64) bool { return true } +// Example demonstrates basic usage of a worker pool with Drain and Close. func Example() { wp := workerpool.New(runtime.NumCPU()) + // Defer Close to ensure cleanup on early return (e.g., errors during Submit). + // Close sends cancellation to running tasks and waits for them to complete. + // It's safe to call Close multiple times; subsequent calls return [ErrClosed]. 
+ defer func() { _ = wp.Close() }() + for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 { id := fmt.Sprintf("task #%d", i) // Use Submit to submit tasks for processing. Submit blocks when no // worker is available to pick up the task. - err := wp.Submit(id, func(_ context.Context) error { + err := wp.Submit(id, func(ctx context.Context) error { fmt.Println("isprime", n) - if IsPrime(n) { + if IsPrime(ctx, n) { fmt.Println(n, "is prime!") } return nil }) - // Submit fails when the pool is closed (ErrClosed) or being drained - // (ErrDrained). Check for the error when appropriate. + // Submit fails when the pool is closed ([ErrClosed]), being drained + // ([ErrDraining]), or the parent context is done ([context.Canceled]). + // Check for the error when appropriate. if err != nil { fmt.Fprintln(os.Stderr, err) return @@ -63,12 +77,15 @@ func Example() { } } - // Close should be called once the worker pool is no longer necessary. + // Close is called here explicitly to check for errors. The deferred Close + // will also run but returns [ErrClosed] (which we can ignore on defer). if err := wp.Close(); err != nil { fmt.Fprintln(os.Stderr, err) } } +// ExampleWithResultCallback demonstrates using a result callback to process +// task results immediately without accumulation. 
func ExampleWithResultCallback() { wp := workerpool.New(runtime.NumCPU(), workerpool.WithResultCallback(func(r workerpool.Result) { if err := r.Err(); err != nil { @@ -77,23 +94,25 @@ func ExampleWithResultCallback() { fmt.Printf("task %s completed in %s\n", r, r.Duration()) } })) + defer func() { _ = wp.Close() }() for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 { id := fmt.Sprintf("task #%d", i) - err := wp.Submit(id, func(_ context.Context) error { - if IsPrime(n) { + err := wp.Submit(id, func(ctx context.Context) error { + if IsPrime(ctx, n) { fmt.Println(n, "is prime!") } return nil }) if err != nil { - log.Fatal(err) + fmt.Fprintln(os.Stderr, err) + return } } // Close waits for all in-flight tasks to complete before returning, // ensuring all callback invocations have finished. if err := wp.Close(); err != nil { - log.Fatal(err) + fmt.Fprintln(os.Stderr, err) } } diff --git a/task.go b/task.go index c7a3207..dd4d147 100644 --- a/task.go +++ b/task.go @@ -19,7 +19,7 @@ type Task interface { } // Result is a completed Task that also reports its execution duration. -// It is passed to the callback registered with WithResultCallback. +// It is passed to the callback registered with [WithResultCallback]. type Result interface { Task // Duration returns the time taken to execute the task. @@ -40,7 +40,7 @@ type taskResult struct { // Ensure that taskResult implements the Result interface. var _ Result = &taskResult{} -// String implements fmt.Stringer for taskResult. +// String implements [fmt.Stringer] for taskResult. func (t *taskResult) String() string { return t.id } diff --git a/workerpool.go b/workerpool.go index 91080b1..9ea8271 100644 --- a/workerpool.go +++ b/workerpool.go @@ -15,7 +15,7 @@ // limited, task results are not and they accumulate until they are collected. // Therefore, if a large number of tasks can be expected, the workerpool should // be periodically drained (e.g. every 10k tasks). 
-// Alternatively, use WithResultCallback to process results as they complete +// Alternatively, use [WithResultCallback] to process results as they complete // without accumulation. package workerpool @@ -31,10 +31,10 @@ var ( // ErrDraining is returned when an operation is not possible because // draining is in progress. ErrDraining = errors.New("drain operation in progress") - // ErrClosed is returned when operations are attempted after a call to Close. + // ErrClosed is returned when operations are attempted after a call to [WorkerPool.Close]. ErrClosed = errors.New("worker pool is closed") - // ErrCallbackSet is returned by Drain when a result callback has been - // registered via WithResultCallback. + // ErrCallbackSet is returned by [WorkerPool.Drain] when a result callback has been + // registered via [WithResultCallback]. ErrCallbackSet = errors.New("a result callback is set") ) @@ -42,9 +42,15 @@ var ( type Option func(*WorkerPool) // WithResultCallback registers fn to be called each time a task completes. -// When a callback is set, results are not accumulated internally and Drain -// returns ErrCallbackSet. The callback may be invoked concurrently from -// multiple goroutines; fn must be safe for concurrent use. +// +// When a callback is set, results are NOT accumulated internally. This +// means: +// - [WorkerPool.Drain] will return [ErrCallbackSet] instead of collecting results +// - Results are processed immediately upon completion, avoiding memory +// buildup +// - The callback fn may be invoked concurrently from multiple goroutines +// +// The callback fn must be safe for concurrent use. // WithResultCallback panics if fn is nil. func WithResultCallback(fn func(Result)) Option { // TODO(v2): New/NewWithContext should return an error so that option @@ -80,8 +86,10 @@ func New(n int, opts ...Option) *WorkerPool { return NewWithContext(context.Background(), n, opts...) 
} -// NewWithContext creates a new pool of workers where at most n workers process submitted -// tasks concurrently. New panics if n ≤ 0. The context is used as the parent context to the context of the task func passed to Submit. +// NewWithContext creates a new pool of workers where at most n workers +// process submitted tasks concurrently. NewWithContext panics if n ≤ 0. The +// context is used as the parent context to the context of the task func passed +// to [WorkerPool.Submit]. func NewWithContext(ctx context.Context, n int, opts ...Option) *WorkerPool { if n <= 0 { panic(fmt.Sprintf("workerpool.New: n must be > 0, got %d", n)) @@ -100,7 +108,7 @@ func NewWithContext(ctx context.Context, n int, opts ...Option) *WorkerPool { return wp } -// Cap returns the concurrent workers capacity, see New(). +// Cap returns the concurrent workers capacity, see [New]. func (wp *WorkerPool) Cap() int { return cap(wp.workers) } @@ -111,18 +119,20 @@ func (wp *WorkerPool) Len() int { } // Submit submits f for processing by a worker. The given id is useful for -// identifying the task once it is completed. The task f must return when the -// context ctx is cancelled. The context passed to task f is cancelled when -// Close is called or when the parent context passed to NewWithContext is done. +// identifying the task once it is completed. // -// Submit blocks until a routine start processing the task. +// The task function f receives a context that is cancelled when [WorkerPool.Close] is +// called or when the parent context passed to [NewWithContext] is done. Tasks +// MUST respect context cancellation and return promptly when ctx.Done() is +// signaled. Tasks that ignore cancellation will cause [WorkerPool.Close] to block +// indefinitely waiting for them to complete. Use context-aware operations +// (e.g., select with ctx.Done()) to ensure timely shutdown. // -// If a drain operation is in progress, ErrDraining is returned and the task -// is not submitted for processing. 
-// If the worker pool is closed, ErrClosed is returned and the task is not -// submitted for processing. -// If the parent context is done, context.Canceled is returned and the task is -// not submitted for processing. +// Submit blocks until a routine starts processing the task. +// +// [ErrDraining] is returned if a drain operation is in progress. +// [ErrClosed] is returned if the worker pool is closed. +// [context.Canceled] is returned if the parent context is done. func (wp *WorkerPool) Submit(id string, f func(ctx context.Context) error) error { wp.mu.Lock() if wp.closed { @@ -151,9 +161,18 @@ func (wp *WorkerPool) Submit(id string, f func(ctx context.Context) error) error // Drain waits until all tasks are completed. This operation prevents // submitting new tasks to the worker pool. Drain returns the results of the // tasks that have been processed. -// If a drain operation is already in progress, ErrDraining is returned. -// If the worker pool is closed, ErrClosed is returned. -// If a result callback is set via WithResultCallback, ErrCallbackSet is returned. +// +// Drain is incompatible with the [WithResultCallback] option. When a result +// callback is configured, results are processed immediately upon completion +// rather than being accumulated, so Drain returns [ErrCallbackSet]. +// +// Unlike [WorkerPool.Close], Drain does not cancel task contexts. Tasks run to completion +// naturally. After Drain, the pool can be closed with [WorkerPool.Close] (which will not +// cancel any tasks since none are running) or more tasks can be submitted. +// +// [ErrCallbackSet] is returned if the [WithResultCallback] option is used. +// [ErrDraining] is returned if a drain operation is already in progress. +// [ErrClosed] is returned if the worker pool is closed. 
func (wp *WorkerPool) Drain() ([]Task, error) { wp.mu.Lock() if wp.closed { @@ -174,9 +193,13 @@ func (wp *WorkerPool) Drain() ([]Task, error) { wp.wg.Wait() - // NOTE: It's not necessary to hold a lock when reading or writing - // wp.results as no other routine is running at this point besides the - // "run" routine which should be waiting on the tasks channel. + // NOTE: No lock is needed here due to the following synchronization: + // 1. Only the single run() goroutine writes to wp.results. + // 2. run() appends each result BEFORE spawning its worker goroutine. + // 3. Each worker calls wg.Done() upon completion. + // 4. wg.Wait() above ensures all workers (and thus all appends) completed. + // 5. run() is now blocked waiting for tasks on the channel. + // Therefore, no concurrent access to wp.results is possible here. res := wp.results wp.results = nil @@ -188,11 +211,20 @@ func (wp *WorkerPool) Drain() ([]Task, error) { } // Close closes the worker pool, rendering it unable to process new tasks. -// Close sends the cancellation signal to any running task and waits for all -// workers, if any, to return. When a result callback is set via -// WithResultCallback, all callback invocations are guaranteed to have completed -// before Close returns. -// Close will return ErrClosed if it has already been called. +// Close sends the cancellation signal to any running task via context +// cancellation and waits indefinitely for all workers to return. If tasks do +// not respect context cancellation, Close will block until they complete. +// When a result callback is set via [WithResultCallback], all callback +// invocations are guaranteed to have completed before Close returns. +// +// Close will return [ErrClosed] if it has already been called. This makes +// it safe to use with defer immediately after creating the pool (for +// cleanup on early returns) while still calling Close explicitly to check +// for errors. 
+// +// Note: Close cancels running tasks via context, while [WorkerPool.Drain] waits for +// tasks to complete without cancellation. If you want tasks to finish +// naturally, call [WorkerPool.Drain] before Close. func (wp *WorkerPool) Close() error { wp.mu.Lock() if wp.closed { @@ -216,6 +248,10 @@ func (wp *WorkerPool) Close() error { // run loops over the tasks channel and starts processing routines. It should // only be called once during the lifetime of a WorkerPool. +// This is the sole goroutine that writes to wp.results, making it safe to +// append without a lock. The append happens before spawning each worker, +// establishing a happens-before relationship that ensures [WorkerPool.Drain] can safely +// read wp.results after wg.Wait() completes. func (wp *WorkerPool) run(ctx context.Context) { for t := range wp.tasks { result := taskResult{id: t.id} diff --git a/workerpool_test.go b/workerpool_test.go index 085f32c..ba108a4 100644 --- a/workerpool_test.go +++ b/workerpool_test.go @@ -17,6 +17,7 @@ import ( var errTask = errors.New("task error") +// TestWorkerPoolNewPanics verifies that New panics when n ≤ 0. func TestWorkerPoolNewPanics(t *testing.T) { // helper expecting New(n) to panic. testWorkerPoolNewPanics := func(n int) { @@ -32,6 +33,8 @@ func TestWorkerPoolNewPanics(t *testing.T) { testWorkerPoolNewPanics(-1) } +// TestWithResultCallbackNilPanics verifies that WithResultCallback panics when +// fn is nil. func TestWithResultCallbackNilPanics(t *testing.T) { defer func() { if r := recover(); r == nil { @@ -41,6 +44,7 @@ func TestWithResultCallbackNilPanics(t *testing.T) { workerpool.WithResultCallback(nil) } +// TestWorkerPoolTasksCapacity verifies that the tasks channel is unbuffered. func TestWorkerPoolTasksCapacity(t *testing.T) { wp := workerpool.New(runtime.NumCPU()) defer func() { @@ -54,6 +58,7 @@ func TestWorkerPoolTasksCapacity(t *testing.T) { } } +// TestWorkerPoolCap verifies that Cap returns the concurrent workers capacity. 
func TestWorkerPoolCap(t *testing.T) { one := workerpool.New(1) defer func() { @@ -87,6 +92,7 @@ func TestWorkerPoolCap(t *testing.T) { } } +// TestWorkerPoolLen verifies that Len returns the count of running workers. func TestWorkerPoolLen(t *testing.T) { wp := workerpool.New(1) if l := wp.Len(); l != 0 { @@ -165,6 +171,8 @@ func TestWorkerPoolConcurrentTasksCount(t *testing.T) { } } +// TestWorkerPool verifies basic worker pool functionality including Submit and +// Drain. func TestWorkerPool(t *testing.T) { n := runtime.NumCPU() wp := workerpool.New(n) @@ -257,6 +265,8 @@ func TestWorkerPool(t *testing.T) { } } +// TestConcurrentDrain verifies that concurrent Drain and Submit calls return +// appropriate errors. func TestConcurrentDrain(t *testing.T) { n := runtime.NumCPU() wp := workerpool.New(n) @@ -336,6 +346,8 @@ func TestConcurrentDrain(t *testing.T) { } } +// TestWorkerPoolDrainAfterClose verifies that Drain returns ErrClosed after +// Close. func TestWorkerPoolDrainAfterClose(t *testing.T) { wp := workerpool.New(runtime.NumCPU()) if err := wp.Close(); err != nil { @@ -350,6 +362,8 @@ func TestWorkerPoolDrainAfterClose(t *testing.T) { } } +// TestWorkerPoolDrainAfterCloseWithCallback verifies that ErrClosed takes +// precedence over ErrCallbackSet. func TestWorkerPoolDrainAfterCloseWithCallback(t *testing.T) { wp := workerpool.New(runtime.NumCPU(), workerpool.WithResultCallback(func(workerpool.Result) {})) if err := wp.Close(); err != nil { @@ -365,6 +379,7 @@ func TestWorkerPoolDrainAfterCloseWithCallback(t *testing.T) { } } +// TestWorkerPoolSubmitNil verifies that submitting a nil task func is valid. func TestWorkerPoolSubmitNil(t *testing.T) { wp := workerpool.New(runtime.NumCPU()) defer func() { @@ -393,6 +408,8 @@ func TestWorkerPoolSubmitNil(t *testing.T) { } +// TestWorkerPoolSubmitNilWithCallback verifies that a nil task func works with +// result callbacks. 
func TestWorkerPoolSubmitNilWithCallback(t *testing.T) { id := "nothing" var got workerpool.Result @@ -419,6 +436,8 @@ func TestWorkerPoolSubmitNilWithCallback(t *testing.T) { } } +// TestWorkerPoolSubmitAfterClose verifies that Submit returns ErrClosed after +// Close. func TestWorkerPoolSubmitAfterClose(t *testing.T) { wp := workerpool.New(runtime.NumCPU()) if err := wp.Close(); err != nil { @@ -429,6 +448,8 @@ func TestWorkerPoolSubmitAfterClose(t *testing.T) { } } +// TestWorkerPoolManyClose verifies that calling Close multiple times returns +// ErrClosed. func TestWorkerPoolManyClose(t *testing.T) { wp := workerpool.New(runtime.NumCPU()) @@ -446,6 +467,7 @@ func TestWorkerPoolManyClose(t *testing.T) { } } +// TestWorkerPoolClose verifies that Close cancels running tasks via context. func TestWorkerPoolClose(t *testing.T) { n := runtime.NumCPU() wp := workerpool.New(n) @@ -478,6 +500,8 @@ func TestWorkerPoolClose(t *testing.T) { wg.Wait() // all routines should have returned } +// TestWorkerPoolNewWithContext verifies that cancelling the parent context +// cancels running tasks. func TestWorkerPoolNewWithContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) n := runtime.NumCPU() @@ -535,6 +559,8 @@ func TestWorkerPoolNewWithContext(t *testing.T) { } } +// TestWorkerPoolNewWithCancelledContext verifies that Submit returns +// [context.Canceled] when the parent context is already cancelled. func TestWorkerPoolNewWithCancelledContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() // cancel before creating the pool @@ -565,6 +591,8 @@ func TestWorkerPoolNewWithCancelledContext(t *testing.T) { } } +// TestWorkerPoolWithResultCallback verifies that result callbacks are invoked +// and Drain returns ErrCallbackSet. func TestWorkerPoolWithResultCallback(t *testing.T) { n := runtime.NumCPU()