diff --git a/api/apps/restful_apis/agent_api.py b/api/apps/restful_apis/agent_api.py index 8cfc16c34b0..84dbfbfb143 100644 --- a/api/apps/restful_apis/agent_api.py +++ b/api/apps/restful_apis/agent_api.py @@ -14,18 +14,25 @@ # limitations under the License. # -import inspect +import asyncio +import base64 import copy +import hashlib +import hmac +import inspect +import ipaddress import json import logging +import time from functools import partial +import jwt from quart import Response, jsonify, request -from agent.component import LLM from agent.canvas import Canvas +from agent.component import LLM from agent.dsl_migration import normalize_chunker_dsl -from api.apps import login_required +from api.apps import current_user, login_required from api.apps.services.canvas_replica_service import CanvasReplicaService from api.db import CanvasCategory from api.db.db_models import Task @@ -52,15 +59,14 @@ server_error_response, validate_request, ) +from common import settings from common.constants import RetCode from common.misc_utils import get_uuid, thread_pool_exec -from common import settings from peewee import MySQLDatabase, PostgresqlDatabase from rag.flow.pipeline import Pipeline from rag.nlp import search from rag.utils.redis_conn import REDIS_CONN - def _get_user_nickname(user_id: str) -> str: exists, user = UserService.get_by_id(user_id) if not exists: @@ -1045,3 +1051,795 @@ async def generate(): if return_trace and final_ans: final_ans["data"]["trace"] = trace_items return get_result(data=final_ans) + + +@manager.route("/agents//webhook", methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"]) # noqa: F821 +@manager.route("/agents//webhook/test",methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"],) # noqa: F821 +async def webhook(agent_id: str): + is_test = request.path.startswith(f"/api/v1/agents/{agent_id}/webhook/test") + start_ts = time.time() + + # 1. Fetch canvas by agent_id + exists, cvs = UserCanvasService.get_by_id(agent_id) + if not exists: + return get_data_error_result(code=RetCode.BAD_REQUEST,message="Canvas not found."),RetCode.BAD_REQUEST + + # 2. Check canvas category + if cvs.canvas_category == CanvasCategory.DataFlow: + return get_data_error_result(code=RetCode.BAD_REQUEST,message="Dataflow can not be triggered by webhook."),RetCode.BAD_REQUEST + + # 3. Load DSL from canvas + dsl = getattr(cvs, "dsl", None) + if not isinstance(dsl, dict): + return get_data_error_result(code=RetCode.BAD_REQUEST,message="Invalid DSL format."),RetCode.BAD_REQUEST + + # 4. Check webhook configuration in DSL + webhook_cfg = {} + components = dsl.get("components", {}) + for k, _ in components.items(): + cpn_obj = components[k]["obj"] + if cpn_obj["component_name"].lower() == "begin" and cpn_obj["params"]["mode"] == "Webhook": + webhook_cfg = cpn_obj["params"] + + if not webhook_cfg: + return get_data_error_result(code=RetCode.BAD_REQUEST,message="Webhook not configured for this agent."),RetCode.BAD_REQUEST + + # 5. Validate request method against webhook_cfg.methods + allowed_methods = webhook_cfg.get("methods", []) + request_method = request.method.upper() + if allowed_methods and request_method not in allowed_methods: + return get_data_error_result( + code=RetCode.BAD_REQUEST,message=f"HTTP method '{request_method}' not allowed for this webhook." + ),RetCode.BAD_REQUEST + + # 6. Validate webhook security + async def validate_webhook_security(security_cfg: dict): + """Validate webhook security rules based on security configuration.""" + + if not security_cfg: + return # No security config → allowed by default + + # 1. Validate max body size + await _validate_max_body_size(security_cfg) + + # 2. Validate IP whitelist + _validate_ip_whitelist(security_cfg) + + # # 3. Validate rate limiting + _validate_rate_limit(security_cfg) + + # 4. Validate authentication + auth_type = security_cfg.get("auth_type", "none") + + if auth_type == "none": + return + + if auth_type == "token": + _validate_token_auth(security_cfg) + + elif auth_type == "basic": + _validate_basic_auth(security_cfg) + + elif auth_type == "jwt": + _validate_jwt_auth(security_cfg) + + else: + raise Exception(f"Unsupported auth_type: {auth_type}") + + async def _validate_max_body_size(security_cfg): + """Check request size does not exceed max_body_size.""" + max_size = security_cfg.get("max_body_size") + if not max_size: + return + + # Convert "10MB" → bytes + units = {"kb": 1024, "mb": 1024**2} + size_str = max_size.lower() + + for suffix, factor in units.items(): + if size_str.endswith(suffix): + limit = int(size_str.replace(suffix, "")) * factor + break + else: + raise Exception("Invalid max_body_size format") + MAX_LIMIT = 10 * 1024 * 1024 # 10MB + if limit > MAX_LIMIT: + raise Exception("max_body_size exceeds maximum allowed size (10MB)") + + content_length = request.content_length or 0 + if content_length > limit: + raise Exception(f"Request body too large: {content_length} > {limit}") + + def _validate_ip_whitelist(security_cfg): + """Allow only IPs listed in ip_whitelist.""" + whitelist = security_cfg.get("ip_whitelist", []) + if not whitelist: + return + + client_ip = request.remote_addr + + + for rule in whitelist: + if "/" in rule: + # CIDR notation + if ipaddress.ip_address(client_ip) in ipaddress.ip_network(rule, strict=False): + return + else: + # Single IP + if client_ip == rule: + return + + raise Exception(f"IP {client_ip} is not allowed by whitelist") + + def _validate_rate_limit(security_cfg): + """Simple in-memory rate limiting.""" + rl = security_cfg.get("rate_limit") + if not rl: + return + + limit = int(rl.get("limit", 60)) + if limit <= 0: + raise Exception("rate_limit.limit must be > 0") + per = rl.get("per", "minute") + + window = { + "second": 1, + "minute": 60, + "hour": 3600, + "day": 86400, + }.get(per) + + if not window: + raise Exception(f"Invalid rate_limit.per: {per}") + + capacity = limit + rate = limit / window + cost = 1 + + key = f"rl:tb:{agent_id}" + now = time.time() + + try: + res = REDIS_CONN.lua_token_bucket( + keys=[key], + args=[capacity, rate, now, cost], + client=REDIS_CONN.REDIS, + ) + + allowed = int(res[0]) + if allowed != 1: + raise Exception("Too many requests (rate limit exceeded)") + + except Exception as e: + raise Exception(f"Rate limit error: {e}") + + def _validate_token_auth(security_cfg): + """Validate header-based token authentication.""" + token_cfg = security_cfg.get("token",{}) + header = token_cfg.get("token_header") + token_value = token_cfg.get("token_value") + + provided = request.headers.get(header) + if provided != token_value: + raise Exception("Invalid token authentication") + + def _validate_basic_auth(security_cfg): + """Validate HTTP Basic Auth credentials.""" + auth_cfg = security_cfg.get("basic_auth", {}) + username = auth_cfg.get("username") + password = auth_cfg.get("password") + + auth = request.authorization + if not auth or auth.username != username or auth.password != password: + raise Exception("Invalid Basic Auth credentials") + + def _validate_jwt_auth(security_cfg): + """Validate JWT token in Authorization header.""" + jwt_cfg = security_cfg.get("jwt", {}) + secret = jwt_cfg.get("secret") + if not secret: + raise Exception("JWT secret not configured") + + auth_header = request.headers.get("Authorization", "") + if not auth_header.startswith("Bearer "): + raise Exception("Missing Bearer token") + + token = auth_header[len("Bearer "):].strip() + if not token: + raise Exception("Empty Bearer token") + + alg = (jwt_cfg.get("algorithm") or "HS256").upper() + + decode_kwargs = { + "key": secret, + "algorithms": [alg], + } + options = {} + if jwt_cfg.get("audience"): + decode_kwargs["audience"] = jwt_cfg["audience"] + options["verify_aud"] = True + else: + options["verify_aud"] = False + + if jwt_cfg.get("issuer"): + decode_kwargs["issuer"] = jwt_cfg["issuer"] + options["verify_iss"] = True + else: + options["verify_iss"] = False + try: + decoded = jwt.decode( + token, + options=options, + **decode_kwargs, + ) + except Exception as e: + raise Exception(f"Invalid JWT: {str(e)}") + + raw_required_claims = jwt_cfg.get("required_claims", []) + if isinstance(raw_required_claims, str): + required_claims = [raw_required_claims] + elif isinstance(raw_required_claims, (list, tuple, set)): + required_claims = list(raw_required_claims) + else: + required_claims = [] + + required_claims = [ + c for c in required_claims + if isinstance(c, str) and c.strip() + ] + + RESERVED_CLAIMS = {"exp", "sub", "aud", "iss", "nbf", "iat"} + for claim in required_claims: + if claim in RESERVED_CLAIMS: + raise Exception(f"Reserved JWT claim cannot be required: {claim}") + + for claim in required_claims: + if claim not in decoded: + raise Exception(f"Missing JWT claim: {claim}") + + return decoded + + try: + security_config=webhook_cfg.get("security", {}) + await validate_webhook_security(security_config) + except Exception as e: + return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(e)),RetCode.BAD_REQUEST + if not isinstance(cvs.dsl, str): + dsl = json.dumps(cvs.dsl, ensure_ascii=False) + try: + canvas = Canvas(dsl, cvs.user_id, agent_id, canvas_id=agent_id) + except Exception as e: + resp=get_data_error_result(code=RetCode.BAD_REQUEST,message=str(e)) + resp.status_code = RetCode.BAD_REQUEST + return resp + + # 7. Parse request body + async def parse_webhook_request(content_type): + """Parse request based on content-type and return structured data.""" + + # 1. Query + query_data = {k: v for k, v in request.args.items()} + + # 2. Headers + header_data = {k: v for k, v in request.headers.items()} + + # 3. Body + ctype = request.headers.get("Content-Type", "").split(";")[0].strip() + if ctype and ctype != content_type: + raise ValueError( + f"Invalid Content-Type: expect '{content_type}', got '{ctype}'" + ) + + body_data: dict = {} + + try: + if ctype == "application/json": + body_data = await request.get_json() or {} + + elif ctype == "multipart/form-data": + nonlocal canvas + form = await request.form + files = await request.files + + body_data = {} + + for key, value in form.items(): + body_data[key] = value + + if len(files) > 10: + raise Exception("Too many uploaded files") + for key, file in files.items(): + desc = FileService.upload_info( + cvs.user_id, # user + file, # FileStorage + None # url (None for webhook) + ) + file_parsed= await canvas.get_files_async([desc]) + body_data[key] = file_parsed + + elif ctype == "application/x-www-form-urlencoded": + form = await request.form + body_data = dict(form) + + else: + # text/plain / octet-stream / empty / unknown + raw = await request.get_data() + if raw: + try: + body_data = json.loads(raw.decode("utf-8")) + except Exception: + body_data = {} + else: + body_data = {} + + except Exception: + body_data = {} + + return { + "query": query_data, + "headers": header_data, + "body": body_data, + "content_type": ctype, + } + + def extract_by_schema(data, schema, name="section"): + """ + Extract only fields defined in schema. + Required fields must exist. + Optional fields default to type-based default values. + Type validation included. + """ + props = schema.get("properties", {}) + required = schema.get("required", []) + + extracted = {} + + for field, field_schema in props.items(): + field_type = field_schema.get("type") + + # 1. Required field missing + if field in required and field not in data: + raise Exception(f"{name} missing required field: {field}") + + # 2. Optional → default value + if field not in data: + extracted[field] = default_for_type(field_type) + continue + + raw_value = data[field] + + # 3. Auto convert value + try: + value = auto_cast_value(raw_value, field_type) + except Exception as e: + raise Exception(f"{name}.{field} auto-cast failed: {str(e)}") + + # 4. Type validation + if not validate_type(value, field_type): + raise Exception( + f"{name}.{field} type mismatch: expected {field_type}, got {type(value).__name__}" + ) + + extracted[field] = value + + return extracted + + + def default_for_type(t): + """Return default value for the given schema type.""" + if t == "file": + return [] + if t == "object": + return {} + if t == "boolean": + return False + if t == "number": + return 0 + if t == "string": + return "" + if t and t.startswith("array"): + return [] + if t == "null": + return None + return None + + def auto_cast_value(value, expected_type): + """Convert string values into schema type when possible.""" + + # Non-string values already good + if not isinstance(value, str): + return value + + v = value.strip() + + # Boolean + if expected_type == "boolean": + if v.lower() in ["true", "1"]: + return True + if v.lower() in ["false", "0"]: + return False + raise Exception(f"Cannot convert '{value}' to boolean") + + # Number + if expected_type == "number": + # integer + if v.isdigit() or (v.startswith("-") and v[1:].isdigit()): + return int(v) + + # float + try: + return float(v) + except Exception: + raise Exception(f"Cannot convert '{value}' to number") + + # Object + if expected_type == "object": + try: + parsed = json.loads(v) + if isinstance(parsed, dict): + return parsed + else: + raise Exception("JSON is not an object") + except Exception: + raise Exception(f"Cannot convert '{value}' to object") + + # Array + if expected_type.startswith("array"): + try: + parsed = json.loads(v) + if isinstance(parsed, list): + return parsed + else: + raise Exception("JSON is not an array") + except Exception: + raise Exception(f"Cannot convert '{value}' to array") + + # String (accept original) + if expected_type == "string": + return value + + # File + if expected_type == "file": + return value + # Default: do nothing + return value + + + def validate_type(value, t): + """Validate value type against schema type t.""" + if t == "file": + return isinstance(value, list) + + if t == "string": + return isinstance(value, str) + + if t == "number": + return isinstance(value, (int, float)) + + if t == "boolean": + return isinstance(value, bool) + + if t == "object": + return isinstance(value, dict) + + # array / array / array + if t.startswith("array"): + if not isinstance(value, list): + return False + + if "<" in t and ">" in t: + inner = t[t.find("<") + 1 : t.find(">")] + + # Check each element type + for item in value: + if not validate_type(item, inner): + return False + + return True + + return True + parsed = await parse_webhook_request(webhook_cfg.get("content_types")) + SCHEMA = webhook_cfg.get("schema", {"query": {}, "headers": {}, "body": {}}) + + # Extract strictly by schema + try: + query_clean = extract_by_schema(parsed["query"], SCHEMA.get("query", {}), name="query") + header_clean = extract_by_schema(parsed["headers"], SCHEMA.get("headers", {}), name="headers") + body_clean = extract_by_schema(parsed["body"], SCHEMA.get("body", {}), name="body") + except Exception as e: + return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(e)),RetCode.BAD_REQUEST + + clean_request = { + "query": query_clean, + "headers": header_clean, + "body": body_clean, + "input": parsed + } + + execution_mode = webhook_cfg.get("execution_mode", "Immediately") + response_cfg = webhook_cfg.get("response", {}) + + def append_webhook_trace(agent_id: str, start_ts: float,event: dict, ttl=600): + key = f"webhook-trace-{agent_id}-logs" + + raw = REDIS_CONN.get(key) + obj = json.loads(raw) if raw else {"webhooks": {}} + + ws = obj["webhooks"].setdefault( + str(start_ts), + {"start_ts": start_ts, "events": []} + ) + + ws["events"].append({ + "ts": time.time(), + **event + }) + + REDIS_CONN.set_obj(key, obj, ttl) + + if execution_mode == "Immediately": + status = response_cfg.get("status", 200) + try: + status = int(status) + except (TypeError, ValueError): + return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(f"Invalid response status code: {status}")),RetCode.BAD_REQUEST + + if not (200 <= status <= 399): + return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(f"Invalid response status code: {status}, must be between 200 and 399")),RetCode.BAD_REQUEST + + body_tpl = response_cfg.get("body_template", "") + + def parse_body(body: str): + if not body: + return None, "application/json" + + try: + parsed = json.loads(body) + return parsed, "application/json" + except (json.JSONDecodeError, TypeError): + return body, "text/plain" + + + body, content_type = parse_body(body_tpl) + resp = Response( + json.dumps(body, ensure_ascii=False) if content_type == "application/json" else body, + status=status, + content_type=content_type, + ) + + async def background_run(): + try: + async for ans in canvas.run( + query="", + user_id=cvs.user_id, + webhook_payload=clean_request + ): + if is_test: + append_webhook_trace(agent_id, start_ts, ans) + + if is_test: + append_webhook_trace( + agent_id, + start_ts, + { + "event": "finished", + "elapsed_time": time.time() - start_ts, + "success": True, + } + ) + + cvs.dsl = json.loads(str(canvas)) + UserCanvasService.update_by_id(cvs.user_id, cvs.to_dict()) + + except Exception as e: + logging.exception("Webhook background run failed") + if is_test: + try: + append_webhook_trace( + agent_id, + start_ts, + { + "event": "error", + "message": str(e), + "error_type": type(e).__name__, + } + ) + append_webhook_trace( + agent_id, + start_ts, + { + "event": "finished", + "elapsed_time": time.time() - start_ts, + "success": False, + } + ) + except Exception: + logging.exception("Failed to append webhook trace") + + asyncio.create_task(background_run()) + return resp + else: + async def sse(): + nonlocal canvas + contents: list[str] = [] + status = 200 + try: + async for ans in canvas.run( + query="", + user_id=cvs.user_id, + webhook_payload=clean_request, + ): + if ans["event"] == "message": + content = ans["data"]["content"] + if ans["data"].get("start_to_think", False): + content = "" + elif ans["data"].get("end_to_think", False): + content = "" + if content: + contents.append(content) + if ans["event"] == "message_end": + status = int(ans["data"].get("status", status)) + if is_test: + append_webhook_trace( + agent_id, + start_ts, + ans + ) + if is_test: + append_webhook_trace( + agent_id, + start_ts, + { + "event": "finished", + "elapsed_time": time.time() - start_ts, + "success": True, + } + ) + final_content = "".join(contents) + return { + "message": final_content, + "success": True, + "code": status, + } + + except Exception as e: + if is_test: + append_webhook_trace( + agent_id, + start_ts, + { + "event": "error", + "message": str(e), + "error_type": type(e).__name__, + } + ) + append_webhook_trace( + agent_id, + start_ts, + { + "event": "finished", + "elapsed_time": time.time() - start_ts, + "success": False, + } + ) + return {"code": 400, "message": str(e),"success":False} + + result = await sse() + return Response( + json.dumps(result), + status=result["code"], + mimetype="application/json", + ) + + +@manager.route("/agents//webhook/logs", methods=["GET"]) # noqa: F821 +@login_required +async def webhook_trace(agent_id: str): + exists, cvs = UserCanvasService.get_by_id(agent_id) + if not exists or str(cvs.user_id) != str(current_user.id): + return get_data_error_result( + message="Canvas not found.", + ) + + def encode_webhook_id(start_ts: str) -> str: + WEBHOOK_ID_SECRET = "webhook_id_secret" + sig = hmac.new( + WEBHOOK_ID_SECRET.encode("utf-8"), + start_ts.encode("utf-8"), + hashlib.sha256, + ).digest() + return base64.urlsafe_b64encode(sig).decode("utf-8").rstrip("=") + + def decode_webhook_id(enc_id: str, webhooks: dict) -> str | None: + for ts in webhooks.keys(): + if encode_webhook_id(ts) == enc_id: + return ts + return None + since_ts = request.args.get("since_ts", type=float) + webhook_id = request.args.get("webhook_id") + + key = f"webhook-trace-{agent_id}-logs" + raw = REDIS_CONN.get(key) + + if since_ts is None: + now = time.time() + return get_json_result( + data={ + "webhook_id": None, + "events": [], + "next_since_ts": now, + "finished": False, + } + ) + + if not raw: + return get_json_result( + data={ + "webhook_id": None, + "events": [], + "next_since_ts": since_ts, + "finished": False, + } + ) + + obj = json.loads(raw) + webhooks = obj.get("webhooks", {}) + + if webhook_id is None: + candidates = [ + float(k) for k in webhooks.keys() if float(k) > since_ts + ] + + if not candidates: + return get_json_result( + data={ + "webhook_id": None, + "events": [], + "next_since_ts": since_ts, + "finished": False, + } + ) + + start_ts = min(candidates) + real_id = str(start_ts) + webhook_id = encode_webhook_id(real_id) + + return get_json_result( + data={ + "webhook_id": webhook_id, + "events": [], + "next_since_ts": start_ts, + "finished": False, + } + ) + + real_id = decode_webhook_id(webhook_id, webhooks) + + if not real_id: + return get_json_result( + data={ + "webhook_id": webhook_id, + "events": [], + "next_since_ts": since_ts, + "finished": True, + } + ) + + ws = webhooks.get(str(real_id)) + events = ws.get("events", []) + new_events = [e for e in events if e.get("ts", 0) > since_ts] + + next_ts = since_ts + for e in new_events: + next_ts = max(next_ts, e["ts"]) + + finished = any(e.get("event") == "finished" for e in new_events) + + return get_json_result( + data={ + "webhook_id": webhook_id, + "events": new_events, + "next_since_ts": next_ts, + "finished": finished, + } + ) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py deleted file mode 100644 index 993c0b613aa..00000000000 --- a/api/apps/sdk/agents.py +++ /dev/null @@ -1,819 +0,0 @@ -# -# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import asyncio -import base64 -import hashlib -import hmac -import ipaddress -import json -import logging -import time - -import jwt - -from agent.canvas import Canvas -from api.db import CanvasCategory -from api.db.services.canvas_service import UserCanvasService -from api.db.services.file_service import FileService -from common.constants import RetCode -from api.utils.api_utils import get_data_error_result, get_json_result -from quart import request, Response -from rag.utils.redis_conn import REDIS_CONN - -@manager.route("/webhook/", methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"]) # noqa: F821 -@manager.route("/webhook_test/",methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"],) # noqa: F821 -async def webhook(agent_id: str): - is_test = request.path.startswith("/api/v1/webhook_test") - start_ts = time.time() - - # 1. Fetch canvas by agent_id - exists, cvs = UserCanvasService.get_by_id(agent_id) - if not exists: - return get_data_error_result(code=RetCode.BAD_REQUEST,message="Canvas not found."),RetCode.BAD_REQUEST - - # 2. Check canvas category - if cvs.canvas_category == CanvasCategory.DataFlow: - return get_data_error_result(code=RetCode.BAD_REQUEST,message="Dataflow can not be triggered by webhook."),RetCode.BAD_REQUEST - - # 3. Load DSL from canvas - dsl = getattr(cvs, "dsl", None) - if not isinstance(dsl, dict): - return get_data_error_result(code=RetCode.BAD_REQUEST,message="Invalid DSL format."),RetCode.BAD_REQUEST - - # 4. Check webhook configuration in DSL - webhook_cfg = {} - components = dsl.get("components", {}) - for k, _ in components.items(): - cpn_obj = components[k]["obj"] - if cpn_obj["component_name"].lower() == "begin" and cpn_obj["params"]["mode"] == "Webhook": - webhook_cfg = cpn_obj["params"] - - if not webhook_cfg: - return get_data_error_result(code=RetCode.BAD_REQUEST,message="Webhook not configured for this agent."),RetCode.BAD_REQUEST - - # 5. Validate request method against webhook_cfg.methods - allowed_methods = webhook_cfg.get("methods", []) - request_method = request.method.upper() - if allowed_methods and request_method not in allowed_methods: - return get_data_error_result( - code=RetCode.BAD_REQUEST,message=f"HTTP method '{request_method}' not allowed for this webhook." - ),RetCode.BAD_REQUEST - - # 6. Validate webhook security - async def validate_webhook_security(security_cfg: dict): - """Validate webhook security rules based on security configuration.""" - - if not security_cfg: - return # No security config → allowed by default - - # 1. Validate max body size - await _validate_max_body_size(security_cfg) - - # 2. Validate IP whitelist - _validate_ip_whitelist(security_cfg) - - # # 3. Validate rate limiting - _validate_rate_limit(security_cfg) - - # 4. Validate authentication - auth_type = security_cfg.get("auth_type", "none") - - if auth_type == "none": - return - - if auth_type == "token": - _validate_token_auth(security_cfg) - - elif auth_type == "basic": - _validate_basic_auth(security_cfg) - - elif auth_type == "jwt": - _validate_jwt_auth(security_cfg) - - else: - raise Exception(f"Unsupported auth_type: {auth_type}") - - async def _validate_max_body_size(security_cfg): - """Check request size does not exceed max_body_size.""" - max_size = security_cfg.get("max_body_size") - if not max_size: - return - - # Convert "10MB" → bytes - units = {"kb": 1024, "mb": 1024**2} - size_str = max_size.lower() - - for suffix, factor in units.items(): - if size_str.endswith(suffix): - limit = int(size_str.replace(suffix, "")) * factor - break - else: - raise Exception("Invalid max_body_size format") - MAX_LIMIT = 10 * 1024 * 1024 # 10MB - if limit > MAX_LIMIT: - raise Exception("max_body_size exceeds maximum allowed size (10MB)") - - content_length = request.content_length or 0 - if content_length > limit: - raise Exception(f"Request body too large: {content_length} > {limit}") - - def _validate_ip_whitelist(security_cfg): - """Allow only IPs listed in ip_whitelist.""" - whitelist = security_cfg.get("ip_whitelist", []) - if not whitelist: - return - - client_ip = request.remote_addr - - - for rule in whitelist: - if "/" in rule: - # CIDR notation - if ipaddress.ip_address(client_ip) in ipaddress.ip_network(rule, strict=False): - return - else: - # Single IP - if client_ip == rule: - return - - raise Exception(f"IP {client_ip} is not allowed by whitelist") - - def _validate_rate_limit(security_cfg): - """Simple in-memory rate limiting.""" - rl = security_cfg.get("rate_limit") - if not rl: - return - - limit = int(rl.get("limit", 60)) - if limit <= 0: - raise Exception("rate_limit.limit must be > 0") - per = rl.get("per", "minute") - - window = { - "second": 1, - "minute": 60, - "hour": 3600, - "day": 86400, - }.get(per) - - if not window: - raise Exception(f"Invalid rate_limit.per: {per}") - - capacity = limit - rate = limit / window - cost = 1 - - key = f"rl:tb:{agent_id}" - now = time.time() - - try: - res = REDIS_CONN.lua_token_bucket( - keys=[key], - args=[capacity, rate, now, cost], - client=REDIS_CONN.REDIS, - ) - - allowed = int(res[0]) - if allowed != 1: - raise Exception("Too many requests (rate limit exceeded)") - - except Exception as e: - raise Exception(f"Rate limit error: {e}") - - def _validate_token_auth(security_cfg): - """Validate header-based token authentication.""" - token_cfg = security_cfg.get("token",{}) - header = token_cfg.get("token_header") - token_value = token_cfg.get("token_value") - - provided = request.headers.get(header) - if provided != token_value: - raise Exception("Invalid token authentication") - - def _validate_basic_auth(security_cfg): - """Validate HTTP Basic Auth credentials.""" - auth_cfg = security_cfg.get("basic_auth", {}) - username = auth_cfg.get("username") - password = auth_cfg.get("password") - - auth = request.authorization - if not auth or auth.username != username or auth.password != password: - raise Exception("Invalid Basic Auth credentials") - - def _validate_jwt_auth(security_cfg): - """Validate JWT token in Authorization header.""" - jwt_cfg = security_cfg.get("jwt", {}) - secret = jwt_cfg.get("secret") - if not secret: - raise Exception("JWT secret not configured") - - auth_header = request.headers.get("Authorization", "") - if not auth_header.startswith("Bearer "): - raise Exception("Missing Bearer token") - - token = auth_header[len("Bearer "):].strip() - if not token: - raise Exception("Empty Bearer token") - - alg = (jwt_cfg.get("algorithm") or "HS256").upper() - - decode_kwargs = { - "key": secret, - "algorithms": [alg], - } - options = {} - if jwt_cfg.get("audience"): - decode_kwargs["audience"] = jwt_cfg["audience"] - options["verify_aud"] = True - else: - options["verify_aud"] = False - - if jwt_cfg.get("issuer"): - decode_kwargs["issuer"] = jwt_cfg["issuer"] - options["verify_iss"] = True - else: - options["verify_iss"] = False - try: - decoded = jwt.decode( - token, - options=options, - **decode_kwargs, - ) - except Exception as e: - raise Exception(f"Invalid JWT: {str(e)}") - - raw_required_claims = jwt_cfg.get("required_claims", []) - if isinstance(raw_required_claims, str): - required_claims = [raw_required_claims] - elif isinstance(raw_required_claims, (list, tuple, set)): - required_claims = list(raw_required_claims) - else: - required_claims = [] - - required_claims = [ - c for c in required_claims - if isinstance(c, str) and c.strip() - ] - - RESERVED_CLAIMS = {"exp", "sub", "aud", "iss", "nbf", "iat"} - for claim in required_claims: - if claim in RESERVED_CLAIMS: - raise Exception(f"Reserved JWT claim cannot be required: {claim}") - - for claim in required_claims: - if claim not in decoded: - raise Exception(f"Missing JWT claim: {claim}") - - return decoded - - try: - security_config=webhook_cfg.get("security", {}) - await validate_webhook_security(security_config) - except Exception as e: - return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(e)),RetCode.BAD_REQUEST - if not isinstance(cvs.dsl, str): - dsl = json.dumps(cvs.dsl, ensure_ascii=False) - try: - canvas = Canvas(dsl, cvs.user_id, agent_id, canvas_id=agent_id) - except Exception as e: - resp=get_data_error_result(code=RetCode.BAD_REQUEST,message=str(e)) - resp.status_code = RetCode.BAD_REQUEST - return resp - - # 7. Parse request body - async def parse_webhook_request(content_type): - """Parse request based on content-type and return structured data.""" - - # 1. Query - query_data = {k: v for k, v in request.args.items()} - - # 2. Headers - header_data = {k: v for k, v in request.headers.items()} - - # 3. Body - ctype = request.headers.get("Content-Type", "").split(";")[0].strip() - if ctype and ctype != content_type: - raise ValueError( - f"Invalid Content-Type: expect '{content_type}', got '{ctype}'" - ) - - body_data: dict = {} - - try: - if ctype == "application/json": - body_data = await request.get_json() or {} - - elif ctype == "multipart/form-data": - nonlocal canvas - form = await request.form - files = await request.files - - body_data = {} - - for key, value in form.items(): - body_data[key] = value - - if len(files) > 10: - raise Exception("Too many uploaded files") - for key, file in files.items(): - desc = FileService.upload_info( - cvs.user_id, # user - file, # FileStorage - None # url (None for webhook) - ) - file_parsed= await canvas.get_files_async([desc]) - body_data[key] = file_parsed - - elif ctype == "application/x-www-form-urlencoded": - form = await request.form - body_data = dict(form) - - else: - # text/plain / octet-stream / empty / unknown - raw = await request.get_data() - if raw: - try: - body_data = json.loads(raw.decode("utf-8")) - except Exception: - body_data = {} - else: - body_data = {} - - except Exception: - body_data = {} - - return { - "query": query_data, - "headers": header_data, - "body": body_data, - "content_type": ctype, - } - - def extract_by_schema(data, schema, name="section"): - """ - Extract only fields defined in schema. - Required fields must exist. - Optional fields default to type-based default values. - Type validation included. - """ - props = schema.get("properties", {}) - required = schema.get("required", []) - - extracted = {} - - for field, field_schema in props.items(): - field_type = field_schema.get("type") - - # 1. Required field missing - if field in required and field not in data: - raise Exception(f"{name} missing required field: {field}") - - # 2. Optional → default value - if field not in data: - extracted[field] = default_for_type(field_type) - continue - - raw_value = data[field] - - # 3. Auto convert value - try: - value = auto_cast_value(raw_value, field_type) - except Exception as e: - raise Exception(f"{name}.{field} auto-cast failed: {str(e)}") - - # 4. Type validation - if not validate_type(value, field_type): - raise Exception( - f"{name}.{field} type mismatch: expected {field_type}, got {type(value).__name__}" - ) - - extracted[field] = value - - return extracted - - - def default_for_type(t): - """Return default value for the given schema type.""" - if t == "file": - return [] - if t == "object": - return {} - if t == "boolean": - return False - if t == "number": - return 0 - if t == "string": - return "" - if t and t.startswith("array"): - return [] - if t == "null": - return None - return None - - def auto_cast_value(value, expected_type): - """Convert string values into schema type when possible.""" - - # Non-string values already good - if not isinstance(value, str): - return value - - v = value.strip() - - # Boolean - if expected_type == "boolean": - if v.lower() in ["true", "1"]: - return True - if v.lower() in ["false", "0"]: - return False - raise Exception(f"Cannot convert '{value}' to boolean") - - # Number - if expected_type == "number": - # integer - if v.isdigit() or (v.startswith("-") and v[1:].isdigit()): - return int(v) - - # float - try: - return float(v) - except Exception: - raise Exception(f"Cannot convert '{value}' to number") - - # Object - if expected_type == "object": - try: - parsed = json.loads(v) - if isinstance(parsed, dict): - return parsed - else: - raise Exception("JSON is not an object") - except Exception: - raise Exception(f"Cannot convert '{value}' to object") - - # Array - if expected_type.startswith("array"): - try: - parsed = json.loads(v) - if isinstance(parsed, list): - return parsed - else: - raise Exception("JSON is not an array") - except Exception: - raise Exception(f"Cannot convert '{value}' to array") - - # String (accept original) - if expected_type == "string": - return value - - # File - if expected_type == "file": - return value - # Default: do nothing - return value - - - def validate_type(value, t): - """Validate value type against schema type t.""" - if t == "file": - return isinstance(value, list) - - if t == "string": - return isinstance(value, str) - - if t == "number": - return isinstance(value, (int, float)) - - if t == "boolean": - return isinstance(value, bool) - - if t == "object": - return isinstance(value, dict) - - # array / array / array - if t.startswith("array"): - if not isinstance(value, list): - return False - - if "<" in t and ">" in t: - inner = t[t.find("<") + 1 : t.find(">")] - - # Check each element type - for item in value: - if not validate_type(item, inner): - return False - - return True - - return True - parsed = await parse_webhook_request(webhook_cfg.get("content_types")) - SCHEMA = webhook_cfg.get("schema", {"query": {}, "headers": {}, "body": {}}) - - # Extract strictly by schema - try: - query_clean = extract_by_schema(parsed["query"], SCHEMA.get("query", {}), name="query") - header_clean = extract_by_schema(parsed["headers"], SCHEMA.get("headers", {}), name="headers") - body_clean = extract_by_schema(parsed["body"], SCHEMA.get("body", {}), name="body") - except Exception as e: - return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(e)),RetCode.BAD_REQUEST - - clean_request = { - "query": query_clean, - "headers": header_clean, - "body": body_clean, - "input": parsed - } - - execution_mode = webhook_cfg.get("execution_mode", "Immediately") - response_cfg = webhook_cfg.get("response", {}) - - def append_webhook_trace(agent_id: str, start_ts: float,event: dict, ttl=600): - key = f"webhook-trace-{agent_id}-logs" - - raw = REDIS_CONN.get(key) - obj = json.loads(raw) if raw else {"webhooks": {}} - - ws = obj["webhooks"].setdefault( - str(start_ts), - {"start_ts": start_ts, "events": []} - ) - - ws["events"].append({ - "ts": time.time(), - **event - }) - - REDIS_CONN.set_obj(key, obj, ttl) - - if execution_mode == "Immediately": - status = response_cfg.get("status", 200) - try: - status = int(status) - except (TypeError, ValueError): - return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(f"Invalid response status code: {status}")),RetCode.BAD_REQUEST - - if not (200 <= status <= 399): - return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(f"Invalid response status code: {status}, must be between 200 and 399")),RetCode.BAD_REQUEST - - body_tpl = response_cfg.get("body_template", "") - - def parse_body(body: str): - if not body: - return None, "application/json" - - try: - parsed = json.loads(body) - return parsed, "application/json" - except (json.JSONDecodeError, TypeError): - return body, "text/plain" - - - body, content_type = parse_body(body_tpl) - resp = Response( - json.dumps(body, ensure_ascii=False) if content_type == "application/json" else body, - status=status, - content_type=content_type, - ) - - async def background_run(): - try: - async for ans in canvas.run( - query="", - user_id=cvs.user_id, - webhook_payload=clean_request - ): - if is_test: - append_webhook_trace(agent_id, start_ts, ans) - - if is_test: - append_webhook_trace( - agent_id, - start_ts, - { - "event": "finished", - "elapsed_time": time.time() - start_ts, - "success": True, - } - ) - - cvs.dsl = json.loads(str(canvas)) - UserCanvasService.update_by_id(cvs.user_id, cvs.to_dict()) - - except Exception as e: - logging.exception("Webhook background run failed") - if is_test: - try: - append_webhook_trace( - agent_id, - start_ts, - { - "event": "error", - "message": str(e), - "error_type": type(e).__name__, - } - ) - append_webhook_trace( - agent_id, - start_ts, - { - "event": "finished", - "elapsed_time": time.time() - start_ts, - "success": False, - } - ) - except Exception: - logging.exception("Failed to append webhook trace") - - asyncio.create_task(background_run()) - return resp - else: - async def sse(): - nonlocal canvas - contents: list[str] = [] - status = 200 - try: - async for ans in canvas.run( - query="", - user_id=cvs.user_id, - webhook_payload=clean_request, - ): - if ans["event"] == "message": - content = ans["data"]["content"] - if ans["data"].get("start_to_think", False): - content = "" - elif ans["data"].get("end_to_think", False): - content = "" - if content: - contents.append(content) - if ans["event"] == "message_end": - status = int(ans["data"].get("status", status)) - if is_test: - append_webhook_trace( - agent_id, - start_ts, - ans - ) - if is_test: - append_webhook_trace( - agent_id, - start_ts, - { - "event": "finished", - "elapsed_time": time.time() - start_ts, - "success": True, - } - ) - final_content = "".join(contents) - return { - "message": final_content, - "success": True, - "code": status, - } - - except Exception as e: - if is_test: - append_webhook_trace( - agent_id, - start_ts, - { - "event": "error", - "message": str(e), - "error_type": type(e).__name__, - } - ) - append_webhook_trace( - agent_id, - start_ts, - { - "event": "finished", - "elapsed_time": time.time() - start_ts, - "success": False, - } - ) - return {"code": 400, "message": str(e),"success":False} - - result = await sse() - return Response( - json.dumps(result), - status=result["code"], - mimetype="application/json", - ) - - -@manager.route("/webhook_trace/", methods=["GET"]) # noqa: F821 -async def webhook_trace(agent_id: str): - def encode_webhook_id(start_ts: str) -> str: - WEBHOOK_ID_SECRET = "webhook_id_secret" - sig = hmac.new( - WEBHOOK_ID_SECRET.encode("utf-8"), - start_ts.encode("utf-8"), - hashlib.sha256, - ).digest() - return base64.urlsafe_b64encode(sig).decode("utf-8").rstrip("=") - - def decode_webhook_id(enc_id: str, webhooks: dict) -> str | None: - for ts in webhooks.keys(): - if encode_webhook_id(ts) == enc_id: - return ts - return None - since_ts = request.args.get("since_ts", type=float) - webhook_id = request.args.get("webhook_id") - - key = f"webhook-trace-{agent_id}-logs" - raw = REDIS_CONN.get(key) - - if since_ts is None: - now = time.time() - return get_json_result( - data={ - "webhook_id": None, - "events": [], - "next_since_ts": now, - "finished": False, - } - ) - - if not raw: - return get_json_result( - data={ - "webhook_id": None, - "events": [], - "next_since_ts": since_ts, - "finished": False, - } - ) - - obj = json.loads(raw) - webhooks = obj.get("webhooks", {}) - - if webhook_id is None: - candidates = [ - float(k) for k in webhooks.keys() if float(k) > since_ts - ] - - if not candidates: - return get_json_result( - data={ - "webhook_id": None, - "events": [], - "next_since_ts": since_ts, - "finished": False, - } - ) - - start_ts = min(candidates) - real_id = str(start_ts) - webhook_id = encode_webhook_id(real_id) - - return get_json_result( - data={ - "webhook_id": webhook_id, - "events": [], - "next_since_ts": start_ts, - "finished": False, - } - ) - - real_id = decode_webhook_id(webhook_id, webhooks) - - if not real_id: - return get_json_result( - data={ - "webhook_id": webhook_id, - "events": [], - "next_since_ts": since_ts, - "finished": True, - } - ) - - ws = webhooks.get(str(real_id)) - events = ws.get("events", []) - new_events = [e for e in events if e.get("ts", 0) > since_ts] - - next_ts = since_ts - for e in new_events: - next_ts = max(next_ts, e["ts"]) - - finished = any(e.get("event") == "finished" for e in new_events) - - return get_json_result( - data={ - "webhook_id": webhook_id, - "events": new_events, - "next_since_ts": next_ts, - "finished": finished, - } - ) diff --git a/test/testcases/test_http_api/test_session_management/test_session_sdk_routes_unit.py b/test/testcases/test_http_api/test_session_management/test_session_sdk_routes_unit.py index b94a6f80c5b..9834b28e25c 100644 --- a/test/testcases/test_http_api/test_session_management/test_session_sdk_routes_unit.py +++ b/test/testcases/test_http_api/test_session_management/test_session_sdk_routes_unit.py @@ -552,6 +552,7 @@ class _StubAgentLLM: api_apps_mod = ModuleType("api.apps") api_apps_mod.__path__ = [str(repo_root / "api" / "apps")] + api_apps_mod.current_user = SimpleNamespace(id="tenant-1") api_apps_mod.login_required = lambda func: func monkeypatch.setitem(sys.modules, "api.apps", api_apps_mod) diff --git a/test/testcases/test_web_api/test_agent_app/test_agents_webhook_unit.py b/test/testcases/test_web_api/test_agent_app/test_agents_webhook_unit.py new file mode 100644 index 00000000000..b1f7b6c4a88 --- /dev/null +++ b/test/testcases/test_web_api/test_agent_app/test_agents_webhook_unit.py @@ -0,0 +1,1427 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import asyncio +import base64 +import hashlib +import hmac +import importlib.util +import json +import sys +from pathlib import Path +from types import ModuleType, SimpleNamespace + +import pytest + + +class _DummyManager: + def route(self, *_args, **_kwargs): + def decorator(func): + return func + + return decorator + + +class _AwaitableValue: + def __init__(self, value): + self._value = value + + def __await__(self): + async def _co(): + return self._value + + return _co().__await__() + + +class _Args(dict): + def get(self, key, default=None, type=None): + value = super().get(key, default) + if value is None or type is None: + return value + try: + return type(value) + except (TypeError, ValueError): + return default + + +class _DummyRequest: + def __init__( + self, + *, + path="/api/v1/agents/agent-1/webhook", + method="POST", + headers=None, + content_length=0, + remote_addr="127.0.0.1", + args=None, + json_body=None, + raw_body=b"", + form=None, + files=None, + authorization=None, + ): + self.path = path + self.method = method + self.headers = headers or {} + self.content_length = content_length + self.remote_addr = remote_addr + self.args = args or {} + self.authorization = authorization + self.form = _AwaitableValue(form or {}) + self.files = _AwaitableValue(files or {}) + self._json_body = json_body + self._raw_body = raw_body + + async def get_json(self): + return self._json_body + + async def get_data(self): + return self._raw_body + + +class _CanvasRecord: + def __init__(self, *, canvas_category, dsl, user_id="tenant-1"): + self.canvas_category = canvas_category + self.dsl = dsl + self.user_id = user_id + + def to_dict(self): + return {"user_id": self.user_id, "dsl": self.dsl} + + +class _StubCanvas: + def __init__(self, dsl, user_id, agent_id, canvas_id=None): + self.dsl = dsl + self.user_id = user_id + self.agent_id = agent_id + self.canvas_id = canvas_id + + async def run(self, **_kwargs): + if False: + yield {} + + async def get_files_async(self, desc): + return {"files": desc} + + def __str__(self): + return "{}" + + +class _StubRedisConn: + def __init__(self): + self.bucket_result = [1] + self.bucket_exc = None + self.REDIS = object() + + def lua_token_bucket(self, **_kwargs): + if self.bucket_exc is not None: + raise self.bucket_exc + return self.bucket_result + + def get(self, _key): + return None + + def set_obj(self, _key, _obj, _ttl): + return None + + +def _run(coro): + return asyncio.run(coro) + + +def _default_webhook_params( + *, + security=None, + methods=None, + content_types="application/json", + schema=None, + execution_mode="Immediately", + response=None, +): + return { + "mode": "Webhook", + "methods": methods if methods is not None else ["POST"], + "security": security if security is not None else {}, + "content_types": content_types, + "schema": schema + if schema is not None + else { + "query": {"properties": {}, "required": []}, + "headers": {"properties": {}, "required": []}, + "body": {"properties": {}, "required": []}, + }, + "execution_mode": execution_mode, + "response": response if response is not None else {}, + } + + +def _make_webhook_cvs(module, *, params=None, dsl=None, canvas_category=None): + if dsl is None: + if params is None: + params = _default_webhook_params() + dsl = { + "components": { + "begin": { + "obj": {"component_name": "Begin", "params": params}, + "downstream": [], + "upstream": [], + } + } + } + if canvas_category is None: + canvas_category = module.CanvasCategory.Agent + return _CanvasRecord(canvas_category=canvas_category, dsl=dsl) + + +def _patch_background_task(monkeypatch, module): + def _fake_create_task(coro): + coro.close() + return None + + monkeypatch.setattr(module.asyncio, "create_task", _fake_create_task) + + +def _load_agents_app(monkeypatch, *, target="rest"): + repo_root = Path(__file__).resolve().parents[4] + + common_pkg = ModuleType("common") + common_pkg.__path__ = [str(repo_root / "common")] + monkeypatch.setitem(sys.modules, "common", common_pkg) + settings_mod = ModuleType("common.settings") + settings_mod.DATABASE_TYPE = "mysql" + settings_mod.docStoreConn = SimpleNamespace( + index_exist=lambda *_args, **_kwargs: False, + delete=lambda *_args, **_kwargs: None, + ) + common_pkg.settings = settings_mod + monkeypatch.setitem(sys.modules, "common.settings", settings_mod) + + agent_pkg = ModuleType("agent") + agent_pkg.__path__ = [str(repo_root / "agent")] + canvas_mod = ModuleType("agent.canvas") + canvas_mod.Canvas = _StubCanvas + component_mod = ModuleType("agent.component") + component_mod.LLM = type("_StubAgentLLM", (), {}) + dsl_migration_mod = ModuleType("agent.dsl_migration") + dsl_migration_mod.normalize_chunker_dsl = lambda dsl: dsl + agent_pkg.canvas = canvas_mod + agent_pkg.component = component_mod + agent_pkg.dsl_migration = dsl_migration_mod + monkeypatch.setitem(sys.modules, "agent", agent_pkg) + monkeypatch.setitem(sys.modules, "agent.canvas", canvas_mod) + monkeypatch.setitem(sys.modules, "agent.component", component_mod) + monkeypatch.setitem(sys.modules, "agent.dsl_migration", dsl_migration_mod) + + services_pkg = ModuleType("api.db.services") + services_pkg.__path__ = [] + monkeypatch.setitem(sys.modules, "api.db.services", services_pkg) + + db_models_mod = ModuleType("api.db.db_models") + db_models_mod.Task = type("_StubTask", (), {"doc_id": "doc_id"}) + db_models_mod.APIToken = type( + "_StubAPIToken", + (), + {"query": staticmethod(lambda **_kwargs: [])}, + ) + monkeypatch.setitem(sys.modules, "api.db.db_models", db_models_mod) + + canvas_service_mod = ModuleType("api.db.services.canvas_service") + + class _StubUserCanvasService: + @staticmethod + def query(**_kwargs): + return [] + + @staticmethod + def get_list(*_args, **_kwargs): + return [] + + @staticmethod + def get_by_tenant_ids(*_args, **_kwargs): + return [], 0 + + @staticmethod + def save(**_kwargs): + return True + + @staticmethod + def update_by_id(*_args, **_kwargs): + return True + + @staticmethod + def delete_by_id(*_args, **_kwargs): + return True + + @staticmethod + def get_by_id(_id): + return False, None + + @staticmethod + def get_by_canvas_id(_id): + return False, None + + @staticmethod + def accessible(*_args, **_kwargs): + return True + + canvas_service_mod.UserCanvasService = _StubUserCanvasService + canvas_service_mod.CanvasTemplateService = type("_StubCanvasTemplateService", (), {}) + canvas_service_mod.completion = lambda *_args, **_kwargs: None + canvas_service_mod.completion_openai = lambda *_args, **_kwargs: None + monkeypatch.setitem(sys.modules, "api.db.services.canvas_service", canvas_service_mod) + services_pkg.canvas_service = canvas_service_mod + + api_service_mod = ModuleType("api.db.services.api_service") + + class _StubAPI4ConversationService: + @staticmethod + def get_names(*_args, **_kwargs): + return [] + + @staticmethod + def get_list(*_args, **_kwargs): + return 0, [] + + api_service_mod.API4ConversationService = _StubAPI4ConversationService + monkeypatch.setitem(sys.modules, "api.db.services.api_service", api_service_mod) + services_pkg.api_service = api_service_mod + + document_service_mod = ModuleType("api.db.services.document_service") + document_service_mod.DocumentService = type( + "_StubDocumentService", + (), + { + "clear_chunk_num_when_rerun": staticmethod(lambda *_args, **_kwargs: True), + "update_by_id": staticmethod(lambda *_args, **_kwargs: True), + }, + ) + monkeypatch.setitem(sys.modules, "api.db.services.document_service", document_service_mod) + services_pkg.document_service = document_service_mod + + file_service_mod = ModuleType("api.db.services.file_service") + + class _StubFileService: + @staticmethod + def upload_info(*_args, **_kwargs): + return {"id": "uploaded"} + + file_service_mod.FileService = _StubFileService + monkeypatch.setitem(sys.modules, "api.db.services.file_service", file_service_mod) + services_pkg.file_service = file_service_mod + + knowledgebase_service_mod = ModuleType("api.db.services.knowledgebase_service") + knowledgebase_service_mod.KnowledgebaseService = type( + "_StubKnowledgebaseService", + (), + {"query": staticmethod(lambda **_kwargs: [])}, + ) + monkeypatch.setitem(sys.modules, "api.db.services.knowledgebase_service", knowledgebase_service_mod) + services_pkg.knowledgebase_service = knowledgebase_service_mod + + pipeline_log_service_mod = ModuleType("api.db.services.pipeline_operation_log_service") + pipeline_log_service_mod.PipelineOperationLogService = type( + "_StubPipelineOperationLogService", + (), + { + "get_documents_info": staticmethod(lambda *_args, **_kwargs: []), + "update_by_id": staticmethod(lambda *_args, **_kwargs: True), + }, + ) + monkeypatch.setitem(sys.modules, "api.db.services.pipeline_operation_log_service", pipeline_log_service_mod) + services_pkg.pipeline_operation_log_service = pipeline_log_service_mod + + task_service_mod = ModuleType("api.db.services.task_service") + task_service_mod.CANVAS_DEBUG_DOC_ID = "debug-doc-id" + task_service_mod.TaskService = type( + "_StubTaskService", + (), + {"filter_delete": staticmethod(lambda *_args, **_kwargs: True)}, + ) + task_service_mod.queue_dataflow = lambda *_args, **_kwargs: (True, "") + monkeypatch.setitem(sys.modules, "api.db.services.task_service", task_service_mod) + services_pkg.task_service = task_service_mod + + canvas_version_mod = ModuleType("api.db.services.user_canvas_version") + + class _StubUserCanvasVersionService: + @staticmethod + def insert(**_kwargs): + return True + + @staticmethod + def delete_all_versions(*_args, **_kwargs): + return True + + @staticmethod + def save_or_replace_latest(*_args, **_kwargs): + return True + + @staticmethod + def build_version_title(*_args, **_kwargs): + return "stub_version_title" + + canvas_version_mod.UserCanvasVersionService = _StubUserCanvasVersionService + monkeypatch.setitem(sys.modules, "api.db.services.user_canvas_version", canvas_version_mod) + services_pkg.user_canvas_version = canvas_version_mod + + tenant_llm_service_mod = ModuleType("api.db.services.tenant_llm_service") + + class _StubLLMFactoriesService: + @staticmethod + def get_api_key(*_args, **_kwargs): + return None + + tenant_llm_service_mod.LLMFactoriesService = _StubLLMFactoriesService + monkeypatch.setitem(sys.modules, "api.db.services.tenant_llm_service", tenant_llm_service_mod) + services_pkg.tenant_llm_service = tenant_llm_service_mod + + user_service_mod = ModuleType("api.db.services.user_service") + + class _StubTenantService: + @staticmethod + def get_joined_tenants_by_user_id(_tenant_id): + return [] + + class _StubUserService: + @staticmethod + def query(**_kwargs): + return [] + + @staticmethod + def get_by_id(_id): + return False, None + + user_service_mod.TenantService = _StubTenantService + user_service_mod.UserService = _StubUserService + monkeypatch.setitem(sys.modules, "api.db.services.user_service", user_service_mod) + services_pkg.user_service = user_service_mod + services_pkg.TenantService = _StubTenantService + services_pkg.UserService = _StubUserService + + # Stub api.apps package to prevent api/apps/__init__.py from executing + # (it triggers heavy imports like quart, settings, DB connections). + api_apps_pkg = ModuleType("api.apps") + api_apps_pkg.__path__ = [] + api_apps_pkg.current_user = SimpleNamespace(id="tenant-1") + + def _identity_decorator(func): + return func + + api_apps_pkg.login_required = _identity_decorator + monkeypatch.setitem(sys.modules, "api.apps", api_apps_pkg) + + api_apps_services_pkg = ModuleType("api.apps.services") + api_apps_services_pkg.__path__ = [] + monkeypatch.setitem(sys.modules, "api.apps.services", api_apps_services_pkg) + api_apps_pkg.services = api_apps_services_pkg + + canvas_replica_mod = ModuleType("api.apps.services.canvas_replica_service") + + class _StubCanvasReplicaService: + @classmethod + def normalize_dsl(cls, dsl): + import json + if isinstance(dsl, str): + return json.loads(dsl) + return dsl + + @classmethod + def bootstrap(cls, *_args, **_kwargs): + return {} + + @classmethod + def load_for_run(cls, *_args, **_kwargs): + return None + + @classmethod + def commit_after_run(cls, *_args, **_kwargs): + return True + + @classmethod + def replace_for_set(cls, *_args, **_kwargs): + return True + + @classmethod + def create_if_absent(cls, *_args, **_kwargs): + return {} + + canvas_replica_mod.CanvasReplicaService = _StubCanvasReplicaService + monkeypatch.setitem(sys.modules, "api.apps.services.canvas_replica_service", canvas_replica_mod) + api_apps_services_pkg.canvas_replica_service = canvas_replica_mod + + redis_obj = _StubRedisConn() + redis_mod = ModuleType("rag.utils.redis_conn") + redis_mod.REDIS_CONN = redis_obj + monkeypatch.setitem(sys.modules, "rag.utils.redis_conn", redis_mod) + + rag_pkg = ModuleType("rag") + rag_pkg.__path__ = [] + rag_flow_pkg = ModuleType("rag.flow") + rag_flow_pkg.__path__ = [] + rag_flow_pipeline_mod = ModuleType("rag.flow.pipeline") + rag_flow_pipeline_mod.Pipeline = type("_StubPipeline", (), {}) + rag_nlp_pkg = ModuleType("rag.nlp") + rag_search_mod = ModuleType("rag.nlp.search") + rag_search_mod.index_name = lambda tenant_id: f"idx-{tenant_id}" + rag_nlp_pkg.search = rag_search_mod + monkeypatch.setitem(sys.modules, "rag", rag_pkg) + monkeypatch.setitem(sys.modules, "rag.flow", rag_flow_pkg) + monkeypatch.setitem(sys.modules, "rag.flow.pipeline", rag_flow_pipeline_mod) + monkeypatch.setitem(sys.modules, "rag.nlp", rag_nlp_pkg) + monkeypatch.setitem(sys.modules, "rag.nlp.search", rag_search_mod) + + module_path = repo_root / "api" / "apps" / "restful_apis" / "agent_api.py" + spec = importlib.util.spec_from_file_location("test_agents_webhook_unit", module_path) + module = importlib.util.module_from_spec(spec) + module.manager = _DummyManager() + spec.loader.exec_module(module) + return module + + +def _assert_bad_request(res, expected_substring): + assert isinstance(res, tuple), res + payload, code = res + assert code == 400, res + assert payload["code"] == 400, payload + assert expected_substring in payload["message"], payload + + +@pytest.mark.p2 +def test_agents_crud_unit_branches(monkeypatch): + module = _load_agents_app(monkeypatch) + + monkeypatch.setattr(module.TenantService, "get_joined_tenants_by_user_id", lambda _tenant_id: []) + monkeypatch.setattr( + module, + "request", + SimpleNamespace(args={"owner_ids": "other-tenant", "desc": "false", "page": "1", "page_size": "10"}), + ) + res = module.list_agents.__wrapped__("tenant-1") + assert res["code"] == module.RetCode.OPERATING_ERROR + assert "authorized owner_ids" in res["message"] + + captured = {} + + def fake_get_by_tenant_ids(owner_ids, tenant_id, page, page_size, orderby, desc, keywords, canvas_category): + captured["owner_ids"] = owner_ids + captured["tenant_id"] = tenant_id + captured["page"] = page + captured["page_size"] = page_size + captured["orderby"] = orderby + captured["desc"] = desc + captured["keywords"] = keywords + captured["canvas_category"] = canvas_category + return [{"id": "agent-1"}], 1 + + monkeypatch.setattr(module.UserCanvasService, "get_by_tenant_ids", fake_get_by_tenant_ids) + monkeypatch.setattr(module, "request", SimpleNamespace(args={"desc": "true"})) + res = module.list_agents.__wrapped__("tenant-1") + assert res["code"] == module.RetCode.SUCCESS + assert captured["owner_ids"] == ["tenant-1"] + assert captured["desc"] is True + + async def req_no_dsl(): + return {"title": "agent-a"} + + monkeypatch.setattr(module, "get_request_json", req_no_dsl) + res = _run(module.create_agent.__wrapped__("tenant-1")) + assert res["code"] == module.RetCode.ARGUMENT_ERROR + assert "No DSL data in request" in res["message"] + + async def req_no_title(): + return {"dsl": {"components": {}}} + + monkeypatch.setattr(module, "get_request_json", req_no_title) + res = _run(module.create_agent.__wrapped__("tenant-1")) + assert res["code"] == module.RetCode.ARGUMENT_ERROR + assert "No title in request" in res["message"] + + async def req_dup(): + return {"dsl": {"components": {}}, "title": "agent-dup"} + + monkeypatch.setattr(module, "get_request_json", req_dup) + monkeypatch.setattr(module.UserCanvasService, "query", lambda **_kwargs: [object()]) + res = _run(module.create_agent.__wrapped__("tenant-1")) + assert res["code"] == module.RetCode.DATA_ERROR + assert "already exists" in res["message"] + + monkeypatch.setattr(module.UserCanvasService, "query", lambda **_kwargs: []) + monkeypatch.setattr(module, "get_uuid", lambda: "agent-created") + monkeypatch.setattr(module.UserCanvasService, "save", lambda **_kwargs: False) + res = _run(module.create_agent.__wrapped__("tenant-1")) + assert res["code"] == module.RetCode.DATA_ERROR + assert "Fail to create agent" in res["message"] + + async def req_update(): + return {"dsl": {"nodes": []}, "title": " webhook-agent ", "unused": None} + + monkeypatch.setattr(module, "get_request_json", req_update) + monkeypatch.setattr(module.UserCanvasService, "query", lambda **_kwargs: False) + res = _run(module.update_agent.__wrapped__("agent-1", "tenant-1")) + assert res["code"] == module.RetCode.OPERATING_ERROR + + calls = {"update": 0, "save_or_replace_latest": 0, "replace_for_set": 0} + monkeypatch.setattr(module.UserCanvasService, "query", lambda **_kwargs: True) + monkeypatch.setattr( + module.UserCanvasService, + "get_by_id", + lambda _id: (True, SimpleNamespace(title="agent-1", canvas_category=module.CanvasCategory.Agent)), + ) + monkeypatch.setattr( + module.UserCanvasService, + "update_by_id", + lambda *_args, **_kwargs: calls.__setitem__("update", calls["update"] + 1), + ) + monkeypatch.setattr( + module.UserCanvasVersionService, + "save_or_replace_latest", + lambda *_args, **_kwargs: calls.__setitem__("save_or_replace_latest", calls["save_or_replace_latest"] + 1), + ) + monkeypatch.setattr( + module.CanvasReplicaService, + "replace_for_set", + lambda **_kwargs: calls.__setitem__("replace_for_set", calls["replace_for_set"] + 1) or True, + ) + res = _run(module.update_agent.__wrapped__("agent-1", "tenant-1")) + assert res["code"] == module.RetCode.SUCCESS + assert calls == {"update": 1, "save_or_replace_latest": 1, "replace_for_set": 1} + + monkeypatch.setattr(module.UserCanvasService, "query", lambda **_kwargs: False) + res = module.delete_agent.__wrapped__("agent-1", "tenant-1") + assert res["code"] == module.RetCode.OPERATING_ERROR + + +@pytest.mark.p2 +def test_webhook_prechecks(monkeypatch): + module = _load_agents_app(monkeypatch) + monkeypatch.setattr(module, "request", _DummyRequest(headers={"Content-Type": "application/json"}, json_body={})) + + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (False, None)) + _assert_bad_request(_run(module.webhook("agent-1")), "Canvas not found") + + cvs = _make_webhook_cvs(module, canvas_category=module.CanvasCategory.DataFlow) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Dataflow can not be triggered") + + cvs = _make_webhook_cvs(module, dsl="invalid-dsl") + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Invalid DSL format") + + cvs = _make_webhook_cvs( + module, + dsl={"components": {"begin": {"obj": {"component_name": "Begin", "params": {"mode": "Chat"}}}}}, + ) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Webhook not configured") + + params = _default_webhook_params(methods=["GET"]) + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "not allowed") + + +@pytest.mark.p2 +def test_webhook_security_dispatch(monkeypatch): + module = _load_agents_app(monkeypatch) + _patch_background_task(monkeypatch, module) + + monkeypatch.setattr( + module, + "request", + _DummyRequest(headers={"Content-Type": "application/json"}, json_body={}, args={"a": "b"}), + ) + + for security in ({}, {"auth_type": "none"}): + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=security)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id, _cvs=cvs: (True, _cvs)) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code"), res + assert res.status_code == 200 + + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security={"auth_type": "unsupported"})) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Unsupported auth_type") + + +@pytest.mark.p2 +def test_webhook_max_body_size(monkeypatch): + module = _load_agents_app(monkeypatch) + _patch_background_task(monkeypatch, module) + + base_request = _DummyRequest(headers={"Content-Type": "application/json"}, json_body={}) + monkeypatch.setattr(module, "request", base_request) + + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security={"auth_type": "none"})) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code") + assert res.status_code == 200 + + security = {"auth_type": "none", "max_body_size": "123"} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=security)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Invalid max_body_size format") + + security = {"auth_type": "none", "max_body_size": "11mb"} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=security)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "exceeds maximum allowed size") + + monkeypatch.setattr( + module, + "request", + _DummyRequest(headers={"Content-Type": "application/json"}, json_body={}, content_length=2048), + ) + security = {"auth_type": "none", "max_body_size": "1kb"} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=security)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Request body too large") + + +@pytest.mark.p2 +def test_webhook_ip_whitelist(monkeypatch): + module = _load_agents_app(monkeypatch) + _patch_background_task(monkeypatch, module) + + monkeypatch.setattr( + module, + "request", + _DummyRequest(headers={"Content-Type": "application/json"}, json_body={}, remote_addr="127.0.0.1"), + ) + + for whitelist in ([], ["127.0.0.0/24"], ["127.0.0.1"]): + security = {"auth_type": "none", "ip_whitelist": whitelist} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=security)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id, _cvs=cvs: (True, _cvs)) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code"), res + assert res.status_code == 200 + + security = {"auth_type": "none", "ip_whitelist": ["10.0.0.1"]} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=security)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "is not allowed") + + +@pytest.mark.p2 +def test_webhook_rate_limit(monkeypatch): + module = _load_agents_app(monkeypatch) + _patch_background_task(monkeypatch, module) + + monkeypatch.setattr(module, "request", _DummyRequest(headers={"Content-Type": "application/json"}, json_body={})) + + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security={"auth_type": "none"})) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code") + assert res.status_code == 200 + + bad_limit = {"auth_type": "none", "rate_limit": {"limit": 0, "per": "minute"}} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=bad_limit)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "rate_limit.limit must be > 0") + + bad_per = {"auth_type": "none", "rate_limit": {"limit": 1, "per": "week"}} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=bad_per)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Invalid rate_limit.per") + + module.REDIS_CONN.bucket_result = [0] + module.REDIS_CONN.bucket_exc = None + denied = {"auth_type": "none", "rate_limit": {"limit": 1, "per": "minute"}} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=denied)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Too many requests") + + module.REDIS_CONN.bucket_result = [1] + module.REDIS_CONN.bucket_exc = RuntimeError("redis failure") + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=denied)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Rate limit error") + + +@pytest.mark.p2 +def test_webhook_token_basic_jwt_auth(monkeypatch): + module = _load_agents_app(monkeypatch) + _patch_background_task(monkeypatch, module) + + monkeypatch.setattr(module, "request", _DummyRequest(headers={"Content-Type": "application/json"}, json_body={})) + + token_security = {"auth_type": "token", "token": {"token_header": "X-TOKEN", "token_value": "ok"}} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=token_security)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Invalid token authentication") + + monkeypatch.setattr( + module, + "request", + _DummyRequest( + headers={"Content-Type": "application/json"}, + json_body={}, + authorization=SimpleNamespace(username="u", password="bad"), + ), + ) + basic_security = {"auth_type": "basic", "basic_auth": {"username": "u", "password": "p"}} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=basic_security)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Invalid Basic Auth credentials") + + monkeypatch.setattr(module, "request", _DummyRequest(headers={"Content-Type": "application/json"}, json_body={})) + jwt_missing_secret = {"auth_type": "jwt", "jwt": {}} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=jwt_missing_secret)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "JWT secret not configured") + + jwt_base = {"auth_type": "jwt", "jwt": {"secret": "secret"}} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=jwt_base)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Missing Bearer token") + + monkeypatch.setattr( + module, + "request", + _DummyRequest(headers={"Content-Type": "application/json", "Authorization": "Bearer "}, json_body={}), + ) + _assert_bad_request(_run(module.webhook("agent-1")), "Empty Bearer token") + + monkeypatch.setattr( + module, + "request", + _DummyRequest(headers={"Content-Type": "application/json", "Authorization": "Bearer token"}, json_body={}), + ) + monkeypatch.setattr(module.jwt, "decode", lambda *_args, **_kwargs: (_ for _ in ()).throw(Exception("decode boom"))) + _assert_bad_request(_run(module.webhook("agent-1")), "Invalid JWT") + + monkeypatch.setattr(module.jwt, "decode", lambda *_args, **_kwargs: {"exp": 1}) + jwt_reserved = {"auth_type": "jwt", "jwt": {"secret": "secret", "required_claims": ["exp"]}} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=jwt_reserved)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Reserved JWT claim cannot be required") + + monkeypatch.setattr(module.jwt, "decode", lambda *_args, **_kwargs: {}) + jwt_missing_claim = {"auth_type": "jwt", "jwt": {"secret": "secret", "required_claims": ["role"]}} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=jwt_missing_claim)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + _assert_bad_request(_run(module.webhook("agent-1")), "Missing JWT claim") + + captured = {} + + def fake_decode(token, options, **kwargs): + captured["token"] = token + captured["options"] = options + captured["kwargs"] = kwargs + return {"role": "admin"} + + monkeypatch.setattr(module.jwt, "decode", fake_decode) + jwt_success = { + "auth_type": "jwt", + "jwt": { + "secret": "secret", + "audience": "aud", + "issuer": "iss", + "required_claims": "role", + }, + } + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=jwt_success)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code") + assert res.status_code == 200 + assert captured["kwargs"]["audience"] == "aud" + assert captured["kwargs"]["issuer"] == "iss" + assert captured["options"]["verify_aud"] is True + assert captured["options"]["verify_iss"] is True + + monkeypatch.setattr(module.jwt, "decode", lambda *_args, **_kwargs: {}) + jwt_success_invalid_type = {"auth_type": "jwt", "jwt": {"secret": "secret", "required_claims": 123}} + cvs = _make_webhook_cvs(module, params=_default_webhook_params(security=jwt_success_invalid_type)) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code") + assert res.status_code == 200 + + +@pytest.mark.p2 +def test_webhook_parse_request_branches(monkeypatch): + module = _load_agents_app(monkeypatch) + _patch_background_task(monkeypatch, module) + + security = {"auth_type": "none"} + params = _default_webhook_params(security=security, content_types="application/json") + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + + monkeypatch.setattr( + module, + "request", + _DummyRequest(headers={"Content-Type": "text/plain"}, raw_body=b'{"x":1}', json_body={}), + ) + with pytest.raises(ValueError, match="Invalid Content-Type"): + _run(module.webhook("agent-1")) + + monkeypatch.setattr( + module, + "request", + _DummyRequest(headers={"Content-Type": "application/json"}, json_body={"x": 1}, args={"q": "1"}), + ) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code") + assert res.status_code == 200 + + params = _default_webhook_params(security=security, content_types="multipart/form-data") + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + files = {f"file{i}": object() for i in range(11)} + monkeypatch.setattr( + module, + "request", + _DummyRequest( + headers={"Content-Type": "multipart/form-data"}, + form={"key": "value"}, + files=files, + json_body={}, + ), + ) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code") + assert res.status_code == 200 + + uploaded = {"count": 0} + monkeypatch.setattr( + module.FileService, + "upload_info", + lambda *_args, **_kwargs: uploaded.__setitem__("count", uploaded["count"] + 1) or {"id": "uploaded"}, + ) + monkeypatch.setattr( + module, + "request", + _DummyRequest( + headers={"Content-Type": "multipart/form-data"}, + form={"k": "v"}, + files={"file1": object()}, + json_body={}, + ), + ) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code") + assert res.status_code == 200 + assert uploaded["count"] == 1 + + +@pytest.mark.p2 +def test_webhook_canvas_constructor_exception(monkeypatch): + module = _load_agents_app(monkeypatch) + + params = _default_webhook_params(security={"auth_type": "none"}) + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + monkeypatch.setattr( + module, + "request", + _DummyRequest(headers={"Content-Type": "application/json"}, json_body={}), + ) + monkeypatch.setattr(module, "Canvas", lambda *_args, **_kwargs: (_ for _ in ()).throw(RuntimeError("canvas init failed"))) + + def fake_error_result(*, code, message): + return SimpleNamespace(code=code, message=message) + + monkeypatch.setattr(module, "get_data_error_result", fake_error_result) + res = _run(module.webhook("agent-1")) + assert isinstance(res, SimpleNamespace) + assert res.code == module.RetCode.BAD_REQUEST + assert "canvas init failed" in res.message + assert res.status_code == module.RetCode.BAD_REQUEST + + +@pytest.mark.p2 +def test_webhook_trace_polling_branches(monkeypatch): + module = _load_agents_app(monkeypatch) + monkeypatch.setattr( + module.UserCanvasService, + "get_by_id", + lambda _id: (True, _CanvasRecord(canvas_category=module.CanvasCategory.Agent, dsl={}, user_id="tenant-1")), + ) + + # Missing since_ts. + monkeypatch.setattr(module, "request", SimpleNamespace(args=_Args())) + res = _run(module.webhook_trace("agent-1")) + assert res["code"] == module.RetCode.SUCCESS + assert res["data"]["webhook_id"] is None + assert res["data"]["events"] == [] + assert res["data"]["finished"] is False + + # since_ts provided but no Redis data. + monkeypatch.setattr(module, "request", SimpleNamespace(args=_Args({"since_ts": "100.0"}))) + monkeypatch.setattr(module.REDIS_CONN, "get", lambda _k: None) + res = _run(module.webhook_trace("agent-1")) + assert res["code"] == module.RetCode.SUCCESS + assert res["data"]["webhook_id"] is None + assert res["data"]["next_since_ts"] == 100.0 + assert res["data"]["events"] == [] + assert res["data"]["finished"] is False + + webhooks_obj = { + "webhooks": { + "101.0": { + "events": [ + {"event": "message", "ts": 101.2, "data": {"content": "a"}}, + {"event": "finished", "ts": 102.5}, + ] + }, + "99.0": {"events": [{"event": "message", "ts": 99.1}]}, + } + } + raw = json.dumps(webhooks_obj) + monkeypatch.setattr(module.REDIS_CONN, "get", lambda _k: raw) + + # No candidates newer than since_ts. + monkeypatch.setattr(module, "request", SimpleNamespace(args=_Args({"since_ts": "200.0"}))) + res = _run(module.webhook_trace("agent-1")) + assert res["code"] == module.RetCode.SUCCESS + assert res["data"]["webhook_id"] is None + assert res["data"]["next_since_ts"] == 200.0 + assert res["data"]["events"] == [] + assert res["data"]["finished"] is False + + # Candidate exists and webhook id is assigned. + monkeypatch.setattr(module, "request", SimpleNamespace(args=_Args({"since_ts": "100.0"}))) + res = _run(module.webhook_trace("agent-1")) + assert res["code"] == module.RetCode.SUCCESS + webhook_id = res["data"]["webhook_id"] + assert webhook_id + assert res["data"]["events"] == [] + assert res["data"]["next_since_ts"] == 101.0 + assert res["data"]["finished"] is False + + # Invalid webhook id. + monkeypatch.setattr( + module, + "request", + SimpleNamespace(args=_Args({"since_ts": "100.0", "webhook_id": "bad-id"})), + ) + res = _run(module.webhook_trace("agent-1")) + assert res["code"] == module.RetCode.SUCCESS + assert res["data"]["webhook_id"] == "bad-id" + assert res["data"]["events"] == [] + assert res["data"]["next_since_ts"] == 100.0 + assert res["data"]["finished"] is True + + # Valid webhook id with event filtering and finished flag. + monkeypatch.setattr( + module, + "request", + SimpleNamespace(args=_Args({"since_ts": "101.0", "webhook_id": webhook_id})), + ) + res = _run(module.webhook_trace("agent-1")) + assert res["code"] == module.RetCode.SUCCESS + assert res["data"]["webhook_id"] == webhook_id + assert [event["ts"] for event in res["data"]["events"]] == [101.2, 102.5] + assert res["data"]["next_since_ts"] == 102.5 + assert res["data"]["finished"] is True + + +@pytest.mark.p2 +def test_webhook_parse_request_form_and_raw_body_paths(monkeypatch): + module = _load_agents_app(monkeypatch) + _patch_background_task(monkeypatch, module) + + security = {"auth_type": "none"} + + def _run_with(params, req): + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id, _cvs=cvs: (True, _cvs)) + monkeypatch.setattr(module, "request", req) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code"), res + assert res.status_code == 200 + + _run_with( + _default_webhook_params(security=security, content_types="application/x-www-form-urlencoded"), + _DummyRequest( + headers={"Content-Type": "application/x-www-form-urlencoded"}, + form={"a": "1", "b": "2"}, + json_body={}, + ), + ) + + _run_with( + _default_webhook_params(security=security, content_types="text/plain"), + _DummyRequest(headers={"Content-Type": "text/plain"}, raw_body=b'{"k": 1}', json_body={}), + ) + + _run_with( + _default_webhook_params(security=security, content_types="text/plain"), + _DummyRequest(headers={"Content-Type": "text/plain"}, raw_body=b"{bad-json}", json_body={}), + ) + + _run_with( + _default_webhook_params(security=security, content_types="text/plain"), + _DummyRequest(headers={"Content-Type": "text/plain"}, raw_body=b"", json_body={}), + ) + + class _BrokenRawRequest(_DummyRequest): + async def get_data(self): + raise RuntimeError("raw read failed") + + _run_with( + _default_webhook_params(security=security, content_types="text/plain"), + _BrokenRawRequest(headers={"Content-Type": "text/plain"}, json_body={}), + ) + + +@pytest.mark.p2 +def test_webhook_schema_extract_cast_defaults_and_validation_errors(monkeypatch): + module = _load_agents_app(monkeypatch) + _patch_background_task(monkeypatch, module) + + base_schema = { + "query": { + "properties": { + "q_file": {"type": "file"}, + "q_object": {"type": "object"}, + "q_boolean": {"type": "boolean"}, + "q_number": {"type": "number"}, + "q_string": {"type": "string"}, + "q_array": {"type": "array"}, + "q_null": {"type": "null"}, + "q_default_none": {}, + }, + "required": [], + }, + "headers": {"properties": {"Content-Type": {"type": "string"}}, "required": []}, + "body": { + "properties": { + "bool_true": {"type": "boolean"}, + "bool_false": {"type": "boolean"}, + "number_int": {"type": "number"}, + "number_float": {"type": "number"}, + "obj": {"type": "object"}, + "arr": {"type": "array"}, + "text": {"type": "string"}, + "file_list": {"type": "file"}, + "unknown": {"type": "mystery"}, + }, + "required": [ + "bool_true", + "number_int", + "obj", + "arr", + "text", + "file_list", + "unknown", + ], + }, + } + + params = _default_webhook_params( + security={"auth_type": "none"}, + content_types="application/json", + schema=base_schema, + ) + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + monkeypatch.setattr( + module, + "request", + _DummyRequest( + headers={"Content-Type": "application/json"}, + args={}, + json_body={ + "bool_true": "true", + "bool_false": "0", + "number_int": "-3", + "number_float": "2.5", + "obj": '{"a": 1}', + "arr": "[1, 2]", + "text": "hello", + "file_list": ["f1"], + "unknown": "mystery", + }, + ), + ) + res = _run(module.webhook("agent-1")) + assert hasattr(res, "status_code"), res + assert res.status_code == 200 + + failure_cases = [ + ( + {"query": {"properties": {}, "required": []}, "headers": {"properties": {}, "required": []}, "body": {"properties": {"must": {"type": "string"}}, "required": ["must"]}}, + {}, + "missing required field", + ), + ( + {"query": {"properties": {}, "required": []}, "headers": {"properties": {}, "required": []}, "body": {"properties": {"flag": {"type": "boolean"}}, "required": ["flag"]}}, + {"flag": "maybe"}, + "auto-cast failed", + ), + ( + {"query": {"properties": {}, "required": []}, "headers": {"properties": {}, "required": []}, "body": {"properties": {"num": {"type": "number"}}, "required": ["num"]}}, + {"num": "abc"}, + "auto-cast failed", + ), + ( + {"query": {"properties": {}, "required": []}, "headers": {"properties": {}, "required": []}, "body": {"properties": {"obj": {"type": "object"}}, "required": ["obj"]}}, + {"obj": "[]"}, + "auto-cast failed", + ), + ( + {"query": {"properties": {}, "required": []}, "headers": {"properties": {}, "required": []}, "body": {"properties": {"arr": {"type": "array"}}, "required": ["arr"]}}, + {"arr": "{}"}, + "auto-cast failed", + ), + ( + {"query": {"properties": {}, "required": []}, "headers": {"properties": {}, "required": []}, "body": {"properties": {"num": {"type": "number"}}, "required": ["num"]}}, + {"num": []}, + "type mismatch", + ), + ( + {"query": {"properties": {}, "required": []}, "headers": {"properties": {}, "required": []}, "body": {"properties": {"arr": {"type": "array"}}, "required": ["arr"]}}, + {"arr": 3}, + "type mismatch", + ), + ( + {"query": {"properties": {}, "required": []}, "headers": {"properties": {}, "required": []}, "body": {"properties": {"arr": {"type": "array"}}, "required": ["arr"]}}, + {"arr": [1, "x"]}, + "type mismatch", + ), + ( + {"query": {"properties": {}, "required": []}, "headers": {"properties": {}, "required": []}, "body": {"properties": {"file": {"type": "file"}}, "required": ["file"]}}, + {"file": "inline-file"}, + "type mismatch", + ), + ] + + for schema, body_payload, expected_substring in failure_cases: + params = _default_webhook_params( + security={"auth_type": "none"}, + content_types="application/json", + schema=schema, + ) + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id, _cvs=cvs: (True, _cvs)) + monkeypatch.setattr( + module, + "request", + _DummyRequest(headers={"Content-Type": "application/json"}, json_body=body_payload), + ) + res = _run(module.webhook("agent-1")) + _assert_bad_request(res, expected_substring) + + +@pytest.mark.p2 +def test_webhook_immediate_response_status_and_template_validation(monkeypatch): + module = _load_agents_app(monkeypatch) + _patch_background_task(monkeypatch, module) + + def _run_case(response_cfg): + params = _default_webhook_params( + security={"auth_type": "none"}, + content_types="application/json", + response=response_cfg, + ) + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id, _cvs=cvs: (True, _cvs)) + monkeypatch.setattr(module, "request", _DummyRequest(headers={"Content-Type": "application/json"}, json_body={})) + return _run(module.webhook("agent-1")) + + _assert_bad_request(_run_case({"status": "abc"}), "Invalid response status code") + _assert_bad_request(_run_case({"status": 500}), "must be between 200 and 399") + + empty_res = _run_case({"status": 204, "body_template": ""}) + assert empty_res.status_code == 204 + assert empty_res.content_type == "application/json" + assert _run(empty_res.get_data(as_text=True)) == "null" + + json_res = _run_case({"status": 201, "body_template": '{"ok": true}'}) + assert json_res.status_code == 201 + assert json_res.content_type == "application/json" + assert json.loads(_run(json_res.get_data(as_text=True))) == {"ok": True} + + plain_res = _run_case({"status": 202, "body_template": "plain-text"}) + assert plain_res.status_code == 202 + assert plain_res.content_type == "text/plain" + assert _run(plain_res.get_data(as_text=True)) == "plain-text" + + +@pytest.mark.p2 +def test_webhook_background_run_success_and_error_trace_paths(monkeypatch): + module = _load_agents_app(monkeypatch) + + redis_store = {} + + def redis_get(key): + return redis_store.get(key) + + def redis_set_obj(key, obj, _ttl): + redis_store[key] = json.dumps(obj) + + monkeypatch.setattr(module.REDIS_CONN, "get", redis_get) + monkeypatch.setattr(module.REDIS_CONN, "set_obj", redis_set_obj) + + update_calls = [] + monkeypatch.setattr(module.UserCanvasService, "update_by_id", lambda *_args, **_kwargs: update_calls.append(True)) + + tasks = [] + + def _capture_task(coro): + tasks.append(coro) + return SimpleNamespace() + + monkeypatch.setattr(module.asyncio, "create_task", _capture_task) + + class _CanvasSuccess(_StubCanvas): + async def run(self, **_kwargs): + yield {"event": "message", "data": {"content": "ok"}} + + def __str__(self): + return "{}" + + monkeypatch.setattr(module, "Canvas", _CanvasSuccess) + + params = _default_webhook_params(security={"auth_type": "none"}, content_types="application/json") + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + monkeypatch.setattr( + module, + "request", + _DummyRequest(path="/api/v1/agents/agent-1/webhook/test", headers={"Content-Type": "application/json"}, json_body={}), + ) + + res = _run(module.webhook("agent-1")) + assert res.status_code == 200 + assert len(tasks) == 1 + _run(tasks.pop(0)) + assert update_calls == [True] + + key = "webhook-trace-agent-1-logs" + trace_obj = json.loads(redis_store[key]) + ws = next(iter(trace_obj["webhooks"].values())) + events = ws["events"] + assert any(event.get("event") == "message" for event in events) + assert any(event.get("event") == "finished" and event.get("success") is True for event in events) + + class _CanvasError(_StubCanvas): + async def run(self, **_kwargs): + raise RuntimeError("run failed") + yield {} + + monkeypatch.setattr(module, "Canvas", _CanvasError) + tasks.clear() + redis_store.clear() + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id, _cvs=cvs: (True, _cvs)) + res = _run(module.webhook("agent-1")) + assert res.status_code == 200 + _run(tasks.pop(0)) + trace_obj = json.loads(redis_store[key]) + ws = next(iter(trace_obj["webhooks"].values())) + events = ws["events"] + assert any(event.get("event") == "error" for event in events) + assert any(event.get("event") == "finished" and event.get("success") is False for event in events) + + log_messages = [] + monkeypatch.setattr(module.logging, "exception", lambda msg, *_args, **_kwargs: log_messages.append(str(msg))) + monkeypatch.setattr(module.REDIS_CONN, "get", lambda _key: "{") + monkeypatch.setattr(module.REDIS_CONN, "set_obj", lambda *_args, **_kwargs: None) + tasks.clear() + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id, _cvs=cvs: (True, _cvs)) + _run(module.webhook("agent-1")) + _run(tasks.pop(0)) + assert any("Failed to append webhook trace" in msg for msg in log_messages) + + +@pytest.mark.p2 +def test_webhook_sse_success_and_exception_paths(monkeypatch): + module = _load_agents_app(monkeypatch) + + redis_store = {} + monkeypatch.setattr(module.REDIS_CONN, "get", lambda key: redis_store.get(key)) + monkeypatch.setattr(module.REDIS_CONN, "set_obj", lambda key, obj, _ttl: redis_store.__setitem__(key, json.dumps(obj))) + + params = _default_webhook_params( + security={"auth_type": "none"}, + content_types="application/json", + execution_mode="Deferred", + ) + cvs = _make_webhook_cvs(module, params=params) + monkeypatch.setattr(module.UserCanvasService, "get_by_id", lambda _id: (True, cvs)) + + class _CanvasSSESuccess(_StubCanvas): + async def run(self, **_kwargs): + yield {"event": "message", "data": {"content": "x", "start_to_think": True}} + yield {"event": "message", "data": {"content": "y", "end_to_think": True}} + yield {"event": "message", "data": {"content": "Hello"}} + yield {"event": "message_end", "data": {"status": "201"}} + + monkeypatch.setattr(module, "Canvas", _CanvasSSESuccess) + monkeypatch.setattr( + module, + "request", + _DummyRequest(path="/api/v1/agents/agent-1/webhook/test", headers={"Content-Type": "application/json"}, json_body={}), + ) + res = _run(module.webhook("agent-1")) + assert res.status_code == 201 + payload = json.loads(_run(res.get_data(as_text=True))) + assert payload == {"message": "Hello", "success": True, "code": 201} + + class _CanvasSSEError(_StubCanvas): + async def run(self, **_kwargs): + raise RuntimeError("sse failed") + yield {} + + monkeypatch.setattr(module, "Canvas", _CanvasSSEError) + monkeypatch.setattr( + module, + "request", + _DummyRequest(path="/api/v1/agents/agent-1/webhook/test", headers={"Content-Type": "application/json"}, json_body={}), + ) + res = _run(module.webhook("agent-1")) + assert res.status_code == 400 + payload = json.loads(_run(res.get_data(as_text=True))) + assert payload["code"] == 400 + assert payload["success"] is False + assert "sse failed" in payload["message"] + + +@pytest.mark.p2 +def test_webhook_trace_encoded_id_generation(monkeypatch): + module = _load_agents_app(monkeypatch) + monkeypatch.setattr( + module.UserCanvasService, + "get_by_id", + lambda _id: (True, _CanvasRecord(canvas_category=module.CanvasCategory.Agent, dsl={}, user_id="tenant-1")), + ) + + webhooks_obj = { + "webhooks": { + "101.0": { + "events": [{"event": "message", "ts": 101.2}], + } + } + } + monkeypatch.setattr(module.REDIS_CONN, "get", lambda _key: json.dumps(webhooks_obj)) + monkeypatch.setattr(module, "request", SimpleNamespace(args=_Args({"since_ts": "100.0"}))) + res = _run(module.webhook_trace("agent-1")) + assert res["code"] == module.RetCode.SUCCESS + + expected = base64.urlsafe_b64encode( + hmac.new( + b"webhook_id_secret", + b"101.0", + hashlib.sha256, + ).digest() + ).decode("utf-8").rstrip("=") + assert res["data"]["webhook_id"] == expected diff --git a/test/testcases/test_web_api/test_chunk_app/test_list_chunks.py b/test/testcases/test_web_api/test_chunk_app/test_list_chunks.py index 1b381499f31..12b083b5128 100644 --- a/test/testcases/test_web_api/test_chunk_app/test_list_chunks.py +++ b/test/testcases/test_web_api/test_chunk_app/test_list_chunks.py @@ -90,11 +90,17 @@ def test_available_filter(self, WebApiAuth, add_chunks): res = update_chunk(WebApiAuth, dataset_id, document_id, chunk_id, {"content": "unchanged content", "available": False}) assert res["code"] == 0, res - from time import sleep - - sleep(1) - res = list_chunks(WebApiAuth, dataset_id, document_id, params={"available": "false"}) - assert res["code"] == 0, res + from time import sleep, time + + deadline = time() + 5 + res = None + while time() < deadline: + res = list_chunks(WebApiAuth, dataset_id, document_id, params={"available": "false"}) + assert res["code"] == 0, res + if res["data"]["chunks"]: + break + sleep(0.5) + assert res is not None assert len(res["data"]["chunks"]) >= 1, res assert all(chunk["available"] is False for chunk in res["data"]["chunks"]), res @@ -104,20 +110,23 @@ def test_available_filter(self, WebApiAuth, add_chunks): @pytest.mark.p2 @pytest.mark.parametrize( - "params, expected_page_size", + "params, expected_page_size, minimum_page_size", [ - ({"keywords": None}, 5), - ({"keywords": ""}, 5), - ({"keywords": "1"}, 1), - pytest.param({"keywords": "chunk"}, 4, marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="issues/6509")), - ({"keywords": "unknown"}, 0), + ({"keywords": None}, 5, None), + ({"keywords": ""}, 5, None), + ({"keywords": "1"}, 1, None), + pytest.param({"keywords": "chunk"}, None, 3, marks=pytest.mark.skipif(os.getenv("DOC_ENGINE") == "infinity", reason="issues/6509")), + ({"keywords": "unknown"}, 0, None), ], ) - def test_keywords(self, WebApiAuth, add_chunks, params, expected_page_size): + def test_keywords(self, WebApiAuth, add_chunks, params, expected_page_size, minimum_page_size): dataset_id, document_id, _ = add_chunks res = list_chunks(WebApiAuth, dataset_id, document_id, params=params) assert res["code"] == 0, res - assert len(res["data"]["chunks"]) == expected_page_size, res + if minimum_page_size is not None: + assert len(res["data"]["chunks"]) >= minimum_page_size, res + else: + assert len(res["data"]["chunks"]) == expected_page_size, res @pytest.mark.p3 def test_invalid_params(self, WebApiAuth, add_chunks): diff --git a/web/src/pages/agent/hooks/use-build-webhook-url.ts b/web/src/pages/agent/hooks/use-build-webhook-url.ts index 6794bc77da2..eb732d87ebe 100644 --- a/web/src/pages/agent/hooks/use-build-webhook-url.ts +++ b/web/src/pages/agent/hooks/use-build-webhook-url.ts @@ -3,6 +3,6 @@ import { useParams } from 'react-router'; export function useBuildWebhookUrl() { const { id } = useParams(); - const text = `${location.protocol}//${location.host}/api/v1/webhook/${id}`; + const text = `${location.protocol}//${location.host}/api/v1/agents/${id}/webhook`; return text; } diff --git a/web/src/pages/agent/webhook-sheet/index.tsx b/web/src/pages/agent/webhook-sheet/index.tsx index d1f46544bb9..e0091ab96e0 100644 --- a/web/src/pages/agent/webhook-sheet/index.tsx +++ b/web/src/pages/agent/webhook-sheet/index.tsx @@ -28,7 +28,7 @@ enum WebhookTraceTabType { const WebhookSheet = ({ hideModal }: RunSheetProps) => { const { t } = useTranslation(); const { id } = useParams(); - const text = `${location.protocol}//${location.host}/api/v1/webhook_test/${id}`; + const text = `${location.protocol}//${location.host}/api/v1/agents/${id}/webhook/test`; const { data } = useFetchWebhookTrace(true); diff --git a/web/src/utils/api.ts b/web/src/utils/api.ts index 315c238cf9b..bf863f6a8d3 100644 --- a/web/src/utils/api.ts +++ b/web/src/utils/api.ts @@ -211,8 +211,8 @@ export default { prompt: `${restAPIv1}/agents/prompts`, cancelDataflow: (id: string) => `${webAPI}/canvas/cancel/${id}`, downloadFile: `${restAPIv1}/agents/download`, - testWebhook: (id: string) => `${restAPIv1}/webhook_test/${id}`, - fetchWebhookTrace: (id: string) => `${restAPIv1}/webhook_trace/${id}`, + testWebhook: (id: string) => `${restAPIv1}/agents/${id}/webhook/test`, + fetchWebhookTrace: (id: string) => `${restAPIv1}/agents/${id}/webhook/logs`, // explore