96 changes: 96 additions & 0 deletions conf/tir.yaml
@@ -0,0 +1,96 @@
defaults:
- base
- override rewards: success_and_format
- _self_

actor:
rollout_policy: pipelinerl.domains.tir.generate_tir_rollout
system_prompt: |
You are a math-focused AI Agent. Solve problems by combining clear symbolic reasoning
with short, deterministic Python code.
Keep your replies concise and direct. Prioritize clarity and avoid over-elaboration.
Always present the final answer in LaTeX \boxed{}.
Do not express emotions or opinions about user questions.

Workflow:
1. Draft a brief plan in plain text.
2. Execute one run_python_code call to compute or verify the result.
3. Finalize by calling MathAnswer with the LaTeX-formatted answer.

Python execution policy (run_python_code):
- Use Python strictly for pure computation to verify and validate the final answer.
- No network, file system, OS, or environment access.
- Keep snippets minimal and self-contained; print only the final result.

Validation:
- Cross-check results (alternative derivation, invariants, higher precision) before finalizing.
- If execution fails, propose the minimal fix and retry.
Always verify with run_python_code before invoking MathAnswer.
task_template: "{task}"
agent_max_loops: 3
llm_max_rollouts: 128
max_rollout_retries: 20
rollout_workers: 8
shared_memory_entry_size: 1000000000

rewards:
correct_answer_not_finished: 0.0
buffer_tokens: 0

# Math verifier environment
environments:
- key: math
mode: remote
_target_: pipelinerl.domains.math.MathEnvironment
environment_key: math
dataset_loader: pipelinerl.domains.math.load_datasets

train_dataset_names:
- open_reasoner_zero_57k
- open_reasoner_zero_extended_72k
test_dataset_names:
- aime_2025

# SandboxFusion config
sandbox_endpoint: ${oc.env:SANDBOX_ENDPOINT,http://127.0.0.1:8080}
sandbox_timeout: 10.0

# Optional reward shaping
python_tool_shaping:
bonus_on_correct_with_python: 0.1
penalty_on_incorrect_without_python: 0.1
max_abs: 0.2

# vLLM tool-call parser config
vllm_config:
vllm_kwargs:
enable-auto-tool-choice: ""
tool-call-parser: rl_tool
tool-parser-plugin: ${hydra:runtime.cwd}/pipelinerl/rl_tool_parser_plugin.py
max_model_len: 32000

llm:
parameters:
max_tokens: 16000
temperature: 1.0

test_llm:
parameters:
max_tokens: 16000
temperature: 1.0
top_p: 0.95
top_k: 50

finetune:
seq_length: 32000
seq_parallel: 8
gradient_accumulation_passes: 1024
rl:
policy_loss: gspo
overlong_filtering: true

preprocess:
input: actor
output: training_data
n_workers: 8
shared_memory_entry_size: 1000000000
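
The python_tool_shaping block above is consumed by the TIR rollout code, which this diff only references. As a reading aid, here is a minimal sketch of the semantics those three values suggest, assuming the bonus and penalty adjust the scalar reward and the net adjustment is clipped to max_abs; the function name and signature are hypothetical, not code from this PR:

def shape_reward(
    base_reward: float,
    used_python: bool,
    is_correct: bool,
    bonus: float = 0.1,      # bonus_on_correct_with_python
    penalty: float = 0.1,    # penalty_on_incorrect_without_python
    max_abs: float = 0.2,    # cap on the total shaping adjustment
) -> float:
    # Hypothetical sketch of how the config values could shape the reward.
    delta = 0.0
    if is_correct and used_python:
        delta += bonus       # reward answers that were verified with code
    if not is_correct and not used_python:
        delta -= penalty     # discourage skipping the Python check
    delta = max(-max_abs, min(max_abs, delta))
    return base_reward + delta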
4 changes: 2 additions & 2 deletions pipelinerl/actor.py
@@ -24,7 +24,7 @@
from pipelinerl.finetune_loop import calculate_train_steps
from pipelinerl.finetune.logging_ import flatten_dict_config, init_wandb
from pipelinerl.llm import TrainableLLM
from pipelinerl.rollouts import BaseMetrics, RolloutResult
from pipelinerl.rollouts import BaseMetrics, RolloutResult, rollout_has_overflow
from pipelinerl.shared_memory_array import SharedMemoryQueue
from pipelinerl.state import TrainerState
from pipelinerl.streams import (
@@ -393,7 +393,7 @@ def init_stats(self):
def compute_domain_agnostic_metrics(self, result: RolloutResult) -> Dict[str, float]:
metrics = {}

metrics['overflow'] = all([not training_text.finished for training_text in result.training_texts ])
metrics['overflow'] = rollout_has_overflow(result.training_texts)
metrics['num_turns'] = len(result.training_texts)
metrics['prompt_tokens'] = [training_text.prompt_tokens for training_text in result.training_texts]
metrics['output_tokens'] = [training_text.output_tokens for training_text in result.training_texts]
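
For reference, rollout_has_overflow lives in pipelinerl.rollouts and its body is not part of this diff. Assuming it matches the inline expression it replaces here, a plausible sketch (whether multi-turn rollouts should use all() or any() is not visible in this PR):

def rollout_has_overflow(training_texts) -> bool:
    # Plausible sketch, assuming the semantics of the replaced line:
    # a rollout overflows when no turn finished.
    return all(not text.finished for text in training_texts)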
58 changes: 48 additions & 10 deletions pipelinerl/async_llm.py
@@ -3,12 +3,13 @@
import logging

import aiohttp
import litellm
import numpy as np
from PIL import Image
from pipelinerl.llm import LLMCall, LLMOutput, Prompt, TokenLogprob, TrainableLLM

from pipelinerl.finetune.data import MASKED_TOKEN_ID
from pipelinerl.rollouts import TrainingText
from pipelinerl.rollouts import TrainingText, apply_rollout_reward
from pipelinerl.processor_factory import get_processor
from omegaconf import DictConfig, ListConfig, OmegaConf

@@ -54,7 +55,10 @@ def _to_plain_obj(value):


async def llm_async_generate(
llm: TrainableLLM, prompt: Prompt, session: aiohttp.ClientSession
llm: TrainableLLM,
prompt: Prompt,
session: aiohttp.ClientSession,
max_tokens_override: int | None = None,
) -> LLMCall:
llm.load_tokenizer()
headers = {"Content-Type": "application/json"}
@@ -85,6 +89,12 @@ async def llm_async_generate(

logger.debug(f"POST request to {llm.base_url}/v1/chat/completions")

if prompt.tools:
data["tools"] = _to_plain_obj(prompt.tools)

if max_tokens_override is not None:
data["max_tokens"] = max_tokens_override

# Merge extra_parameters first so that data (model, messages, logprobs settings) takes precedence
payload = _to_plain_obj({**extra_parameters, **data})
async with session.post(
@@ -101,7 +111,8 @@

try:
content = data["choices"][0]["message"]["content"]
if not content:
raw_tool_calls = data["choices"][0]["message"].get("tool_calls", [])
if not content and not raw_tool_calls:
logger.warning(f"Empty completion {data}")

parsed_logprobs = []
@@ -128,7 +139,9 @@
logger.exception(f"Failed to parse llm response: {data}")
raise

output = LLMOutput(content=content)
output = LLMOutput(content=content or "")
if raw_tool_calls:
output.tool_calls = [litellm.ChatCompletionMessageToolCall(**tc) for tc in raw_tool_calls]
llm_call = llm.log_output(prompt, output, count_tokens=False)
llm_call.prompt_length_tokens = data["usage"]["prompt_tokens"]
llm_call.output_length_tokens = data["usage"]["completion_tokens"]
@@ -144,9 +157,20 @@ def make_training_text(llm: TrainableLLM, llm_call: LLMCall) -> TrainingText:
images = []
use_processor = False
visual_features = None
full_messages = llm_call.prompt.messages + [
{"role": "assistant", "content": llm_call.output.content}
]
assistant_msg: dict = {"role": "assistant", "content": llm_call.output.content or ""}
if llm_call.output.tool_calls:
assistant_msg["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
}
for tc in llm_call.output.tool_calls
]
full_messages = llm_call.prompt.messages + [assistant_msg]

if hasattr(llm_call.prompt, "messages"):
images = extract_images_from_messages(llm_call.prompt.messages)
@@ -197,25 +221,27 @@ def make_training_text(llm: TrainableLLM, llm_call: LLMCall) -> TrainingText:
except Exception as e:
raise ValueError(f"Failed to process with vision-language processor: {e}")
else:
# Use tokenizer for text-only models
tools_kwarg = {"tools": llm_call.prompt.tools} if llm_call.prompt.tools else {}
prompt_text = llm.tokenizer.apply_chat_template(
conversation=llm_call.prompt.messages,
tokenize=False,
add_generation_prompt=True,
**tools_kwarg,
)
text = llm.tokenizer.apply_chat_template(
full_messages,
tokenize=False,
**tools_kwarg,
)
prompt_token_ids = llm.tokenizer.apply_chat_template(
llm_call.prompt.messages,
add_special_tokens=True,
add_generation_prompt=True,
**tools_kwarg,
)

output_text = text[len(prompt_text) :]

# Get the appropriate tokenizer (from processor if using vision model)
tokenizer = processor.tokenizer if use_processor else llm.tokenizer

if tokenizer.bos_token and text.startswith(tokenizer.bos_token):
@@ -235,7 +261,7 @@ def make_training_text(llm: TrainableLLM, llm_call: LLMCall) -> TrainingText:
finished = finish_reason != "length"
else:
eos_token = tokenizer.eos_token or ""
finished = bool(eos_token) and llm_call.output.content.endswith(eos_token)
finished = bool(eos_token) and (llm_call.output.content or "").endswith(eos_token)
prompt_tokens = llm_call.prompt_length_tokens
output_tokens = llm_call.output_length_tokens

@@ -250,3 +276,15 @@
output_tokens=output_tokens,
visual_features=visual_features,
)


def make_training_texts_from_llm_calls(
llm: TrainableLLM,
llm_calls: list[LLMCall],
reward: float | None = None,
) -> list[TrainingText]:
training_texts = [make_training_text(llm, llm_call) for llm_call in llm_calls]
if reward is not None:
training_texts = apply_rollout_reward(training_texts, reward)
return training_texts
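
apply_rollout_reward is imported from pipelinerl.rollouts and is not shown in this diff. Judging from the per-text loop it replaces in miniwob/rollouts.py below, a minimal sketch would be:

def apply_rollout_reward(training_texts, reward):
    # Minimal sketch, assuming it mirrors the loop it replaces:
    # stamp the same scalar reward onto every training text.
    for text in training_texts:
        text.reward = reward
    return training_texts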

2 changes: 1 addition & 1 deletion pipelinerl/domains/math/__init__.py
@@ -1,3 +1,3 @@
from .load_datasets import load_datasets
from .rollouts import generate_math_rollout, RewardTable
from .rollouts import generate_math_rollout, RewardTable, get_reward, length_penalty
from .verifier_api import MathEnvironment, verify_answer, verify_answer_rpc
42 changes: 23 additions & 19 deletions pipelinerl/domains/math/rollouts.py
@@ -55,6 +55,28 @@ def log_config(self, domain: str = "unknown") -> None:
f"buffer_tokens={self.buffer_tokens}"
)

def get_reward(answer_status: str, finished: bool, reward_table: RewardTable) -> float:
match (answer_status, finished):
case ("wrong", False):
return reward_table.wrong_answer_not_finished
case ("wrong", True):
return reward_table.wrong_answer_finished
case ("no_answer", False):
return reward_table.no_answer_not_finished
case ("no_answer", True):
return reward_table.no_answer_finished
case ("unparsable", False):
return reward_table.unparsable_not_finished
case ("unparsable", True):
return reward_table.unparsable_finished
case ("correct", False):
return reward_table.correct_answer_not_finished
case ("correct", True):
return reward_table.correct_answer_finished
case _:
raise ValueError(f"Invalid answer_status/finished combination: {answer_status}/{finished}")


def length_penalty(max_length: int, sequence_length: int, buffer_tokens: int) -> float:
"""
Compute the overlong penalty
@@ -100,25 +122,7 @@ async def generate_math_rollout(

trace = make_training_text(llm, llm_call)
# Determine reward based on answer status and finished state
match (answer_status, trace.finished):
case ("wrong", False):
reward = rewards.wrong_answer_not_finished
case ("wrong", True):
reward = rewards.wrong_answer_finished
case ("no_answer", False):
reward = rewards.no_answer_not_finished
case ("no_answer", True):
reward = rewards.no_answer_finished
case ("unparsable", False):
reward = rewards.unparsable_not_finished
case ("unparsable", True):
reward = rewards.unparsable_finished
case ("correct", False):
reward = rewards.correct_answer_not_finished
case ("correct", True):
reward = rewards.correct_answer_finished
case _:
raise ValueError(f"Invalid answer_status/finished combination: {answer_status}/{trace.finished}")
reward = get_reward(answer_status, trace.finished, rewards)

# Apply discount factor based on output length
reward *= discount_factor**llm_call.output_length_tokens
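
To make the discount above concrete: the scaling is exponential in completion length. A small numeric example, assuming a hypothetical discount_factor of 0.999 (the actual value is configured elsewhere, not in this diff):

discount_factor = 0.999          # assumed value for illustration
output_length_tokens = 1000
scale = discount_factor ** output_length_tokens
print(f"reward scaled by {scale:.3f}")  # ≈ 0.368, so long outputs earn less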
19 changes: 5 additions & 14 deletions pipelinerl/domains/miniwob/rollouts.py
@@ -18,9 +18,9 @@
from tapeagents.remote_environment import AsyncRemoteEnvironment
from tapeagents.tools.simple_browser import PageObservation

from pipelinerl.async_llm import make_training_text
from pipelinerl.async_llm import make_training_texts_from_llm_calls
from pipelinerl.llm import LLMCall, TrainableLLM
from pipelinerl.rollouts import BaseMetrics, RolloutResult
from pipelinerl.rollouts import BaseMetrics, RolloutResult, summarize_training_texts
from pipelinerl.world import Job

from .steps import WebTape
@@ -271,13 +271,8 @@ async def _execute_rollout_with_timeout(
]

# (4) # For each LLM interaction in the tape, make a training example.
all_finished = 1
prompt_tokens = [llm_call.prompt_length_tokens for llm_call in llm_calls]
output_tokens = [llm_call.output_length_tokens for llm_call in llm_calls]
training_texts = [make_training_text(llm, llm_call) for llm_call in llm_calls]
for text in training_texts:
text.reward = reward
all_finished &= 1 if text.input_ids[-1] == llm.tokenizer.eos_token_id else 0
training_texts = make_training_texts_from_llm_calls(llm, llm_calls, reward=reward)
training_summary = summarize_training_texts(training_texts)

latency = time.time() - start_time
agent_time = tape.metadata.result.get("agent_execution_time", -1.0)
@@ -289,7 +284,7 @@
success=reward > 0.5,
no_error=no_error,
no_answer=reward < 0,
overflow=not all_finished,
overflow=training_summary.overflow,
n_llm_calls=n_llm_calls,
n_step_errors=n_step_errors,
n_page_observations=n_page_observations,
@@ -307,8 +302,6 @@ async def _execute_rollout_with_timeout(
latency=latency,
dataset_name=problem["dataset"],
domain="miniwob",
prompt_tokens=prompt_tokens,
output_tokens=output_tokens,
)


@@ -340,6 +333,4 @@ def _create_failed_rollout_result(problem: dict, start_time: float, error_type:
latency=latency,
dataset_name=problem["dataset"],
domain="miniwob",
prompt_tokens=[],
output_tokens=[],
)
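
summarize_training_texts also comes from pipelinerl.rollouts and is not defined in this diff. Given that it replaces the inline overflow bookkeeping above, and that the per-rollout prompt_tokens/output_tokens lists are dropped from the RolloutResult here, a plausible sketch (all names and fields are inferred, not confirmed by this PR):

from dataclasses import dataclass

@dataclass
class TrainingTextSummary:
    overflow: bool
    prompt_tokens: list[int]
    output_tokens: list[int]

def summarize_training_texts(training_texts) -> TrainingTextSummary:
    # Overflow semantics assumed to follow rollout_has_overflow; whether a
    # single unfinished turn should count as overflow is not visible here.
    return TrainingTextSummary(
        overflow=all(not t.finished for t in training_texts),
        prompt_tokens=[t.prompt_tokens for t in training_texts],
        output_tokens=[t.output_tokens for t in training_texts],
    )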
1 change: 1 addition & 0 deletions pipelinerl/domains/tir/__init__.py
@@ -0,0 +1 @@
from .rollouts import generate_tir_rollout
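
This export is what the rollout_policy entry in conf/tir.yaml (pipelinerl.domains.tir.generate_tir_rollout) points at. The actor presumably resolves that dotted path at startup; a generic sketch of such resolution, offered as an assumption rather than code from this PR:

import importlib

def resolve(path: str):
    # Resolve a dotted "pkg.module.attr" path to the attribute it names.
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)

# e.g. rollout_fn = resolve("pipelinerl.domains.tir.generate_tir_rollout")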