Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/architecture/providers.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ The Xai provider uses a provider-specific batch lifecycle and response envelope:
- result retrieval from `/v1/batches/{batch_id}/results`
- provider-specific result row key (`custom_id_field_name = "batch_request_id"`)

## sference provider

The sference provider reuses the OpenAI provider implementation with inline batch
submission to `POST /v1/batches` (no `/v1/files` upload step):

- `hostname = "api.sference.com"`
- `is_file_based = False`
- `batchable_endpoints = ("/v1/chat/completions",)` — sference inline batches execute
chat-completion bodies only; `/v1/responses` is not batchable via `/v1/batches`
- `supported_completion_windows = ("24h",)` — sference batches currently expose a
`24h` SLA window only
- poll via `GET /v1/batches/{batch_id}`; results via `GET /v1/batches/{batch_id}/results.jsonl`
- inherits OpenAI terminal states and default JSONL result decoding

## Doubleword provider

The Doubleword provider reuses the OpenAI provider implementation and only changes:
Expand Down
1 change: 1 addition & 0 deletions docs/providers.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The following providers are supported by `batchling`:
- [Groq](providers/groq.md)
- [Mistral](providers/mistral.md)
- [OpenAI](providers/openai.md)
- [Sference](providers/sference.md)
- [Together](providers/together.md)
- [Vertex](providers/vertex.md)
- [XAI](providers/xai.md)
3 changes: 3 additions & 0 deletions docs/providers/_credentials/sference.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<!-- markdownlint-disable-file MD041 MD001 -->
Set `SFERENCE_API_KEY` in `.env` or export it in your shell before running batches.
You can create keys in the sference console.
3 changes: 3 additions & 0 deletions docs/providers/_urls/sference.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<!-- markdownlint-disable-file MD041 MD001 -->
- [Batch inference guide](https://sference.com/docs/guides/batches)
- [Responses & streams guide](https://sference.com/docs/guides/responses)
23 changes: 23 additions & 0 deletions docs/providers/sference.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Sference

`batchling` is compatible with Sference through any [supported framework](../frameworks.md){ data-preview }

The following endpoints are made batch-compatible by Sference:

- `/v1/chat/completions`

!!! warning "Check model support and batch pricing"
    Before sending batches, review the provider's official pricing page for supported models and batch pricing details.

The Batch API docs for Sference can be found at the following URLs:
--8<-- "docs/providers/_urls/sference.md"

## Example Usage

--8<-- "docs/providers/_credentials/sference.md"

Here's an example showing how to use `batchling` with Sference:

```py title="sference_example.py"
--8<-- "examples/providers/sference_example.py"
```
51 changes: 51 additions & 0 deletions examples/providers/sference_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import asyncio
import os

from dotenv import load_dotenv
from openai import AsyncOpenAI

from batchling import batchify

# Load environment variables (e.g. `SFERENCE_API_KEY`) from `.env` before the client reads them.
load_dotenv()


async def build_tasks() -> list:
    """
    Prepare one un-awaited sference chat-completion call per prompt.

    Returns
    -------
    list
        Pending ``chat.completions.create`` calls, in prompt order.
    """
    sference_client = AsyncOpenAI(
        api_key=os.getenv(key="SFERENCE_API_KEY"),
        base_url="https://api.sference.com/v1",
    )
    pending_calls = []
    for prompt in (
        "Who is the best French painter? Answer in one short sentence.",
        "What is the capital of France?",
    ):
        user_message = {
            "role": "user",
            "content": prompt,
        }
        pending_calls.append(
            sference_client.chat.completions.create(
                model="moonshotai/Kimi-K2.6",
                messages=[user_message],
            )
        )
    return pending_calls


async def main() -> None:
    """Await all sference requests together and print each model's answer."""
    pending_calls = await build_tasks()
    completions = await asyncio.gather(*pending_calls)
    for completion in completions:
        answer = completion.choices[0].message.content
        print(f"{completion.model} answer:\n{answer}\n")


async def run_with_batchify() -> None:
    """Execute `main` under a `batchify` context using the `24h` completion window."""
    batching_scope = batchify(completion_window="24h")
    async with batching_scope:
        await main()


if __name__ == "__main__":
    # Direct script execution: drive the batchified example to completion.
    entrypoint = run_with_batchify()
    asyncio.run(entrypoint)
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ nav:
- Groq: providers/groq.md
- Mistral: providers/mistral.md
- OpenAI: providers/openai.md
- Sference: providers/sference.md
- Together: providers/together.md
- Vertex: providers/vertex.md
- XAI: providers/xai.md
Expand Down
151 changes: 151 additions & 0 deletions src/batchling/providers/sference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import json
import typing as t

import httpx

from batchling.providers.base import (
PendingRequestLike,
PollSnapshot,
ProviderRequestSpec,
ResumeContext,
)
from batchling.providers.openai import OpenAIProvider


class SferenceProvider(OpenAIProvider):
    """
    Provider adapter for sference's inline ``POST /v1/batches`` API.

    Requests are embedded directly in the batch-creation payload
    (``is_file_based = False``) and results are downloaded per batch id from
    ``file_content_endpoint``. Behaviour not overridden here comes from
    ``OpenAIProvider``.
    """

    name = "sference"
    hostname = "api.sference.com"
    is_file_based = False
    # Single source of truth for the results route; ``{id}`` is the batch id.
    file_content_endpoint = "/v1/batches/{id}/results.jsonl"
    batch_endpoint = "/v1/batches"
    output_file_field_name: str = "id"
    error_file_field_name: str = "id"
    supported_completion_windows: tuple[str, ...] = ("24h",)
    batchable_endpoints = ("/v1/chat/completions",)

    def matches_url(self, hostname: str) -> bool:
        """
        Tell whether ``hostname`` is the sference API host.

        The comparison is case-insensitive and exact: suffixes and subdomains
        of ``self.hostname`` do not match.
        """
        return hostname.lower() == self.hostname

    def build_jsonl_lines(
        self,
        *,
        requests: t.Sequence[PendingRequestLike],
    ) -> list[dict[str, t.Any]]:
        """
        Convert pending requests into sference inline batch rows.

        Each row holds the request's ``custom_id`` and its UTF-8 JSON body
        decoded into a Python object.
        """
        return [
            {
                "custom_id": request.custom_id,
                "body": json.loads(s=request.params["body"].decode(encoding="utf-8")),
            }
            for request in requests
        ]

    async def build_inline_batch_payload(
        self,
        *,
        jsonl_lines: list[dict[str, t.Any]],
        completion_window: str,
    ) -> dict[str, t.Any]:
        """
        Build the ``POST /v1/batches`` body.

        Returns a mapping with the completion window under ``"window"`` and the
        request rows under ``"requests"``.
        """
        return {
            "window": completion_window,
            "requests": jsonl_lines,
        }

    def build_batch_results_path(self, *, file_id: str | None, batch_id: str) -> str:
        """
        Return the results download path for ``batch_id``.

        ``file_id`` is accepted for interface compatibility and ignored: the
        path is derived from the batch id only.
        """
        del file_id
        # Derive the path from the class-level template so the two cannot drift.
        return self.file_content_endpoint.format(id=batch_id)

    def get_progress_from_poll(
        self,
        *,
        payload: dict[str, t.Any],
        requests_count: int,
    ) -> tuple[int, float]:
        """
        Derive ``(completed_requests, percent)`` from a poll payload.

        Progress is all-or-nothing: ``(requests_count, 100.0)`` once the status
        field equals ``"completed"``, otherwise ``(0, 0.0)``.
        """
        if payload.get(self.batch_status_field_name) == "completed":
            return requests_count, 100.0
        return 0, 0.0

    async def get_result_locator_from_poll_response(
        self,
        *,
        payload: dict[str, t.Any],
    ) -> str:
        """Return the payload's ``id`` as a string, or ``""`` when it is missing or falsy."""
        return str(object=payload.get("id") or "")

    def build_poll_request_spec(
        self,
        *,
        base_url: str,
        api_headers: dict[str, str],
        batch_id: str,
    ) -> ProviderRequestSpec:
        """Explicit pass-through to the inherited implementation; no sference-specific logic."""
        return super().build_poll_request_spec(
            base_url=base_url,
            api_headers=api_headers,
            batch_id=batch_id,
        )

    async def parse_poll_response(
        self,
        *,
        payload: dict[str, t.Any],
        requests_count: int,
    ) -> PollSnapshot:
        """Explicit pass-through to the inherited implementation; no sference-specific logic."""
        return await super().parse_poll_response(
            payload=payload,
            requests_count=requests_count,
        )

    def build_results_request_spec(
        self,
        *,
        base_url: str,
        api_headers: dict[str, str],
        file_id: str | None,
        batch_id: str,
    ) -> ProviderRequestSpec:
        """Explicit pass-through to the inherited implementation; no sference-specific logic."""
        return super().build_results_request_spec(
            base_url=base_url,
            api_headers=api_headers,
            file_id=file_id,
            batch_id=batch_id,
        )

    def decode_results_content(
        self,
        *,
        batch_id: str,
        content: str,
    ) -> dict[str, httpx.Response]:
        """Explicit pass-through to the inherited implementation; no sference-specific logic."""
        return super().decode_results_content(batch_id=batch_id, content=content)

    def from_batch_result(self, result_item: dict[str, t.Any]) -> httpx.Response:
        """
        Turn one sference result row into an ``httpx.Response``.

        - ``result_json`` present: status 200 with that payload.
        - otherwise ``error_json`` present: status 500 with that payload.
        - neither present: status 500 with ``{"error": <row status or "Missing result">}``.
        """
        result_json = result_item.get("result_json")
        error_json = result_item.get("error_json")
        body: t.Any
        if result_json is not None:
            status_code, body = 200, result_json
        elif error_json is not None:
            status_code, body = 500, error_json
        else:
            # Neither payload present: report the row status, or a generic message.
            status_code = 500
            body = {"error": result_item.get("status") or "Missing result"}

        content, content_headers = self.encode_body(body=body)
        return httpx.Response(
            status_code=status_code,
            headers=dict(content_headers),
            content=content,
        )

    def build_resume_context(
        self,
        *,
        host: str,
        headers: dict[str, str] | None,
    ) -> ResumeContext:
        """Explicit pass-through to the inherited implementation; no sference-specific logic."""
        return super().build_resume_context(host=host, headers=headers)
55 changes: 55 additions & 0 deletions tests/test_provider_contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from batchling.providers.groq import GroqProvider
from batchling.providers.mistral import MistralProvider
from batchling.providers.openai import OpenAIProvider
from batchling.providers.sference import SferenceProvider
from batchling.providers.together import TogetherProvider
from batchling.providers.vertex import VertexProvider
from batchling.providers.xai import XaiProvider
Expand All @@ -28,6 +29,7 @@
DoublewordProvider(),
XaiProvider(),
VertexProvider(),
SferenceProvider(),
],
)
def test_build_poll_request_spec_returns_get(provider: t.Any) -> None:
Expand Down Expand Up @@ -61,6 +63,7 @@ def test_build_poll_request_spec_returns_get(provider: t.Any) -> None:
DoublewordProvider(),
XaiProvider(),
VertexProvider(),
SferenceProvider(),
],
)
def test_build_resume_context_adds_internal_header(provider: t.Any) -> None:
Expand Down Expand Up @@ -238,6 +241,58 @@ def test_decode_results_content_maps_custom_ids() -> None:
assert isinstance(vertex_results["req-3"], httpx.Response)


@pytest.mark.asyncio
async def test_sference_build_inline_batch_payload_includes_window() -> None:
    """
    Ensure the inline batch payload carries the sference SLA ``window`` and the request rows.
    """
    request_rows = [{"custom_id": "req-1", "body": {"model": "demo", "messages": []}}]
    built_payload = await SferenceProvider().build_inline_batch_payload(
        jsonl_lines=request_rows,
        completion_window="24h",
    )
    expected_payload = {
        "window": "24h",
        "requests": [{"custom_id": "req-1", "body": {"model": "demo", "messages": []}}],
    }
    assert built_payload == expected_payload


def test_sference_build_batch_results_path_uses_batch_id() -> None:
    """
    Ensure the sference results path is built from the batch id alone.
    """
    results_path = SferenceProvider().build_batch_results_path(
        file_id=None,
        batch_id="batch-123",
    )
    assert results_path == "/v1/batches/batch-123/results.jsonl"


def test_sference_from_batch_result_decodes_result_json() -> None:
    """
    Ensure sference batch rows decode from ``result_json`` / ``error_json``.

    A row carrying neither payload must also surface as a failed response.
    """
    provider = SferenceProvider()
    success = provider.from_batch_result(
        result_item={
            "custom_id": "req-1",
            "status": "completed",
            "result_json": {"object": "chat.completion", "choices": []},
            "error_json": None,
        },
    )
    assert success.status_code == 200

    failure = provider.from_batch_result(
        result_item={
            "custom_id": "req-2",
            "status": "failed",
            "result_json": None,
            "error_json": {"detail": "boom"},
        },
    )
    assert failure.status_code == 500

    # Fallback branch: neither ``result_json`` nor ``error_json`` is present.
    missing = provider.from_batch_result(
        result_item={
            "custom_id": "req-3",
            "status": "failed",
            "result_json": None,
            "error_json": None,
        },
    )
    assert missing.status_code == 500


@pytest.mark.asyncio
@pytest.mark.parametrize(
("payload", "expected_status"),
Expand Down
35 changes: 35 additions & 0 deletions tests/test_provider_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,41 @@ def test_provider_lookup_resolves_xai() -> None:
assert provider.name == "xai"


def test_provider_lookup_does_not_batch_sference_responses() -> None:
    """
    Ensure ``/v1/responses`` on the sference host is not routed through batching.

    Returns
    -------
    None
        This test asserts the lookup yields no provider for the responses route.
    """
    lookup_kwargs = {
        "hostname": "api.sference.com",
        "path": "/v1/responses",
        "method": "POST",
    }
    resolved = get_provider_for_batch_request(**lookup_kwargs)
    assert resolved is None


def test_provider_lookup_resolves_sference_chat_completions() -> None:
    """
    Ensure chat-completion requests to the sference host resolve to the sference provider.

    Returns
    -------
    None
        This test asserts hostname-to-provider mapping.
    """
    lookup_kwargs = {
        "hostname": "api.sference.com",
        "path": "/v1/chat/completions",
        "method": "POST",
    }
    resolved = get_provider_for_batch_request(**lookup_kwargs)
    assert resolved is not None
    assert resolved.name == "sference"


def test_provider_lookup_requires_exact_hostname_match() -> None:
"""
Ensure provider lookup does not match hostname suffixes or subdomains.
Expand Down
Loading