Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 95 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,72 @@
[![CI](https://github.com/cilium/workerpool/workflows/Tests/badge.svg)](https://github.com/cilium/workerpool/actions?query=workflow%3ATests)
[![Go Report Card](https://goreportcard.com/badge/github.com/cilium/workerpool)](https://goreportcard.com/report/github.com/cilium/workerpool)

Package workerpool implements a concurrency limiting worker pool. Worker
routines are spawned on demand as tasks are submitted; up to the configured
limit of concurrent workers.
**A concurrency-limiting worker pool for Go with backpressure and zero
dependencies.**

When the limit of concurrently running workers is reached, submitting a task
blocks until a worker is able to pick it up. This behavior is intentional as it
prevents from accumulating tasks which could grow unbounded. Therefore, it is
the responsibility of the caller to queue up tasks if that's the intended
behavior.
Perfect for CPU-bound tasks that need controlled parallelism without
unbounded queuing.

One caveat is that while the number of concurrently running workers is limited,
task results are not and they accumulate until they are collected. Therefore,
if a large number of tasks can be expected, the workerpool should be
periodically drained (e.g. every 10k tasks). Alternatively,
`WithResultCallback` can be used to process results as they complete, avoiding
accumulation entirely.
## Features

This package is mostly useful when tasks are CPU bound and spawning too many
routines would be detrimental to performance. It features a straightforward API
and no external dependencies. See the sections below for usage examples.
- ✅ **Backpressure by design** - Blocks on submit when workers are busy
Comment thread
rolinh marked this conversation as resolved.
(no unbounded queues)
- ✅ **On-demand workers** - Spawns workers as needed, up to configured
limit
- ✅ **Two result modes** - Collect via `Drain()` or stream via callback
- ✅ **Context-aware** - Full cancellation support for graceful shutdown
- ✅ **Zero dependencies** - Pure standard library
- ✅ **Simple API** - Submit, Drain, Close. That's it.

## Example with Drain
## Installation

```bash
go get github.com/cilium/workerpool
```

## Quick Start

```go
wp := workerpool.New(runtime.NumCPU())
defer wp.Close()

// Submit tasks (blocks when all workers are busy)
err := wp.Submit("task-1", func(ctx context.Context) error {
// Your CPU-bound work here
return process(data)
})

// Collect results
tasks, _ := wp.Drain()
for _, task := range tasks {
if err := task.Err(); err != nil {
log.Printf("Task %s failed: %v", task, err)
}
}
```

## When to Use This

**Use workerpool when:**
- Tasks are CPU-bound and need parallelism control
- You want backpressure (block submission instead of queuing unbounded
tasks)
- You need simple, predictable concurrency limiting

**Don't use if:**
- You need I/O-bound task handling (consider channels or goroutines
directly)
- You want automatic retries, priorities, or complex scheduling
- You need persistent job queues (use a proper job queue)

## Usage Patterns

### Pattern 1: Batch Processing with Drain

Process tasks in batches and collect all results at once.

<details>
<summary>Click to expand full example</summary>

```go
package main
Expand Down Expand Up @@ -111,13 +155,18 @@ func main() {
}
```

## Example with result callback
</details>

### Pattern 2: Streaming Results with Callback

Use `WithResultCallback` to process each result as it completes rather than
accumulating them for a later `Drain` call. The callback receives a `Result`,
which extends `Task` with a `Duration()` method reporting how long the task
took to execute. This is useful for logging, metrics, or long-running pools
where unbounded result accumulation is undesirable.
Use `WithResultCallback` to process each result as it completes rather
than accumulating them for a later `Drain` call. The callback receives a
`Result`, which extends `Task` with a `Duration()` method reporting how
long the task took to execute. This is useful for logging, metrics, or
long-running pools where unbounded result accumulation is undesirable.

<details>
<summary>Click to expand full example</summary>

```go
package main
Expand Down Expand Up @@ -162,3 +211,25 @@ func main() {
}
}
```

</details>

## Important Notes

> [!WARNING]
> **Result accumulation**: Without `WithResultCallback`, results accumulate
> in memory until drained. For large workloads, drain periodically or use
> the callback mode.

> [!NOTE]
> **Backpressure behavior**: `Submit()` blocks when no workers are
> available. This is intentional to prevent unbounded queuing. Queue tasks
> yourself if needed.

> [!IMPORTANT]
> **Cleanup**: Always `defer wp.Close()` to ensure graceful shutdown and
> context cancellation.

## Documentation

Full API documentation: https://pkg.go.dev/github.com/cilium/workerpool
Loading