Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ linters:
- gocognit
- goconst
- gocritic
- godoclint
- godot
- goheader
- goprintffuncname
Expand Down Expand Up @@ -68,6 +69,8 @@ linters:
- usetesting
- wastedassign
settings:
godoclint:
default: all
goheader:
template: |-
SPDX-License-Identifier: Apache-2.0
Expand Down
33 changes: 24 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,19 @@ import (
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(n int64) bool {
func IsPrime(ctx context.Context, n int64) bool {
if n < 2 {
return false
}
for p := int64(2); p*p <= n; p++ {
// Check for cancellation periodically (every 10000 iterations)
if p%10000 == 0 {
select {
case <-ctx.Done():
return false
default:
}
}
if n%p == 0 {
return false
}
Expand All @@ -54,13 +62,18 @@ func IsPrime(n int64) bool {

func main() {
wp := workerpool.New(runtime.NumCPU())
// Defer Close to ensure cleanup on early return (e.g., errors during Submit).
// Close sends cancellation to running tasks and waits for them to complete.
// It's safe to call Close multiple times; subsequent calls return ErrClosed.
defer func() { _ = wp.Close() }()

for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
id := fmt.Sprintf("task #%d", i)
// Use Submit to submit tasks for processing. Submit blocks when no
// worker is available to pick up the task.
err := wp.Submit(id, func(_ context.Context) error {
err := wp.Submit(id, func(ctx context.Context) error {
fmt.Println("isprime", n)
if IsPrime(n) {
if IsPrime(ctx, n) {
fmt.Println(n, "is prime!")
}
return nil
Expand Down Expand Up @@ -90,7 +103,8 @@ func main() {
}
}

// Close should be called once the worker pool is no longer necessary.
// Close is called here explicitly to check for errors. The deferred Close
// will also run but returns ErrClosed (which we can ignore on defer).
if err := wp.Close(); err != nil {
fmt.Fprintln(os.Stderr, err)
}
Expand All @@ -111,7 +125,6 @@ package main
import (
"context"
"fmt"
"log"
"os"
"runtime"

Expand All @@ -126,24 +139,26 @@ func main() {
fmt.Printf("task %s completed in %s\n", r, r.Duration())
}
}))
defer func() { _ = wp.Close() }()

for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
id := fmt.Sprintf("task #%d", i)
err := wp.Submit(id, func(_ context.Context) error {
if IsPrime(n) {
err := wp.Submit(id, func(ctx context.Context) error {
if IsPrime(ctx, n) {
fmt.Println(n, "is prime!")
}
return nil
})
if err != nil {
log.Fatal(err)
fmt.Fprintln(os.Stderr, err)
return
}
}

// Close waits for all in-flight tasks to complete before returning,
// ensuring all callback invocations have finished.
if err := wp.Close(); err != nil {
log.Fatal(err)
fmt.Fprintln(os.Stderr, err)
}
}
```
41 changes: 30 additions & 11 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,55 @@ package workerpool_test
import (
"context"
"fmt"
"log"
"os"
"runtime"

"github.com/cilium/workerpool"
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(n int64) bool {
func IsPrime(ctx context.Context, n int64) bool {
if n < 2 {
return false
}
for p := int64(2); p*p <= n; p++ {
// Check for cancellation periodically (every 10000 iterations)
if p%10000 == 0 {
select {
case <-ctx.Done():
return false
default:
}
}
if n%p == 0 {
return false
}
}
return true
}

// Example demonstrates basic usage of a worker pool with Drain and Close.
func Example() {
wp := workerpool.New(runtime.NumCPU())
// Defer Close to ensure cleanup on early return (e.g., errors during Submit).
// Close sends cancellation to running tasks and waits for them to complete.
// It's safe to call Close multiple times; subsequent calls return [ErrClosed].
defer func() { _ = wp.Close() }()

for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
id := fmt.Sprintf("task #%d", i)
// Use Submit to submit tasks for processing. Submit blocks when no
// worker is available to pick up the task.
err := wp.Submit(id, func(_ context.Context) error {
err := wp.Submit(id, func(ctx context.Context) error {
fmt.Println("isprime", n)
if IsPrime(n) {
if IsPrime(ctx, n) {
fmt.Println(n, "is prime!")
}
return nil
})
// Submit fails when the pool is closed (ErrClosed) or being drained
// (ErrDrained). Check for the error when appropriate.
// Submit fails when the pool is closed ([ErrClosed]), being drained
// ([ErrDraining]), or the parent context is done ([context.Canceled]).
// Check for the error when appropriate.
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
Expand All @@ -63,12 +77,15 @@ func Example() {
}
}

// Close should be called once the worker pool is no longer necessary.
// Close is called here explicitly to check for errors. The deferred Close
// will also run but returns [ErrClosed] (which we can ignore on defer).
if err := wp.Close(); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}

// ExampleWithResultCallback demonstrates using a result callback to process
// task results immediately without accumulation.
func ExampleWithResultCallback() {
wp := workerpool.New(runtime.NumCPU(), workerpool.WithResultCallback(func(r workerpool.Result) {
if err := r.Err(); err != nil {
Expand All @@ -77,23 +94,25 @@ func ExampleWithResultCallback() {
fmt.Printf("task %s completed in %s\n", r, r.Duration())
}
}))
defer func() { _ = wp.Close() }()

for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
id := fmt.Sprintf("task #%d", i)
err := wp.Submit(id, func(_ context.Context) error {
if IsPrime(n) {
err := wp.Submit(id, func(ctx context.Context) error {
if IsPrime(ctx, n) {
fmt.Println(n, "is prime!")
}
return nil
})
if err != nil {
log.Fatal(err)
fmt.Fprintln(os.Stderr, err)
return
}
}

// Close waits for all in-flight tasks to complete before returning,
// ensuring all callback invocations have finished.
if err := wp.Close(); err != nil {
log.Fatal(err)
fmt.Fprintln(os.Stderr, err)
}
}
4 changes: 2 additions & 2 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Task interface {
}

// Result is a completed Task that also reports its execution duration.
// It is passed to the callback registered with WithResultCallback.
// It is passed to the callback registered with [WithResultCallback].
type Result interface {
Task
// Duration returns the time taken to execute the task.
Expand All @@ -40,7 +40,7 @@ type taskResult struct {
// Ensure that taskResult implements the Result interface.
var _ Result = &taskResult{}

// String implements fmt.Stringer for taskResult.
// String implements [fmt.Stringer] for taskResult.
func (t *taskResult) String() string {
return t.id
}
Expand Down
Loading
Loading