dify/api/controllers/trigger/webhook.py

158 lines
6.5 KiB
Python
Raw Normal View History

import logging
import time
import uuid
from flask import jsonify, request
from werkzeug.exceptions import NotFound, RequestEntityTooLarge
from controllers.trigger import bp
from core.trigger.debug.event_bus import TriggerDebugEventBus
from core.trigger.debug.events import WebhookDebugEvent, build_webhook_pool_key
from services.trigger.webhook_service import WebhookService
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def _prepare_webhook_execution(webhook_id: str, is_debug: bool = False):
    """Resolve the webhook trigger and parse the current request for it.

    The trigger, workflow and node configuration are looked up first; that
    lookup happens outside the ``try`` block, so any exception it raises
    propagates to the caller unchanged. The unified extract-and-validate step
    is then run against the resolved trigger and node configuration.

    Args:
        webhook_id: The webhook ID to process.
        is_debug: Forwarded to the trigger lookup. If True, status validation
            is skipped for debug mode.

    Returns:
        A 5-tuple ``(webhook_trigger, workflow, node_config, webhook_data,
        error)``. ``error`` is ``None`` when extraction succeeded. When
        extraction raised ``ValueError``, ``error`` is the exception message
        and ``webhook_data`` is a minimal snapshot of the request.
    """
    webhook_trigger, workflow, node_config = WebhookService.get_webhook_trigger_and_workflow(
        webhook_id, is_debug=is_debug
    )

    try:
        # Unified extraction and validation of the incoming request.
        parsed = WebhookService.extract_and_validate_webhook_data(webhook_trigger, node_config)
    except ValueError as exc:
        # Report only what can be read without parsing the body again, so the
        # error path cannot itself fail on the same malformed payload.
        minimal_context = {
            "method": request.method,
            "headers": dict(request.headers),
            "query_params": dict(request.args),
            "body": {},
            "files": {},
        }
        return webhook_trigger, workflow, node_config, minimal_context, str(exc)

    return webhook_trigger, workflow, node_config, parsed, None
@bp.route("/webhook/<string:webhook_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
def handle_webhook(webhook_id: str):
    """Handle webhook trigger calls.

    This endpoint receives webhook calls and processes them according to the
    configured webhook trigger settings.

    Query Parameters:
        workflow_id (optional): Specific workflow version ID to execute. An
            empty string is treated as absent; any other value must parse as
            a UUID.

    Returns:
        A ``(response, status_code)`` pair:
        * 400 when request extraction/validation failed or ``workflow_id`` is
          not a valid UUID.
        * The response configured on the node (or ``{}``/200 when the service
          does not return a 2-tuple) on success.
        * 500 with a generic message on any unexpected error.

    Raises:
        NotFound: When a ``ValueError`` escapes processing (for example from
            the trigger lookup).
        RequestEntityTooLarge: Re-raised untouched so the framework can
            produce its own response for it.
    """
    try:
        webhook_trigger, workflow, node_config, webhook_data, error = _prepare_webhook_execution(webhook_id)
        if error:
            return jsonify({"error": "Bad Request", "message": error}), 400

        # Extract workflow_id from query parameters (treat empty string as None)
        raw_workflow_id = request.args.get("workflow_id")
        workflow_id = raw_workflow_id if raw_workflow_id not in (None, "") else None
        if workflow_id is not None:
            # Handled locally so a malformed ID yields 400 instead of falling
            # through to the outer ValueError -> NotFound mapping.
            try:
                uuid.UUID(workflow_id)
            except ValueError:
                return jsonify({"error": "Bad Request", "message": "Invalid workflow_id format."}), 400

        # Process webhook call (send to Celery) with optional workflow_id
        WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow, workflow_id)

        # Return configured response with safe fallback when mocked service doesn't provide a tuple
        result = WebhookService.generate_webhook_response(node_config)
        if isinstance(result, tuple) and len(result) == 2:
            response_data, status_code = result
        else:
            # Default fallback: empty body and 200 OK
            response_data, status_code = {}, 200
        return jsonify(response_data), status_code
    except ValueError as e:
        raise NotFound(str(e)) from e
    except RequestEntityTooLarge:
        raise
    except Exception:
        # The full traceback goes to the server log only. The client receives
        # a fixed message so internal details (paths, SQL, configuration) are
        # never echoed back to an external caller.
        logger.exception("Webhook processing failed for %s", webhook_id)
        return jsonify({"error": "Internal server error", "message": "An internal error has occurred."}), 500
@bp.route("/webhook-debug/<string:webhook_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
def handle_webhook_debug(webhook_id: str):
    """Serve the debug webhook URL by publishing a debug event.

    This endpoint is meant for draft inspection only. It never enqueues Celery
    work for the published workflow; the parsed request is wrapped in a
    ``WebhookDebugEvent`` and dispatched through ``TriggerDebugEventBus`` to an
    active Variable Inspector listener.

    Returns:
        A ``(response, status_code)`` pair:
        * 400 when request extraction/validation failed.
        * 409 when the dispatch reached zero listeners, so the caller is not
          misled by a 200 for a request that was effectively dropped.
        * The response configured on the node otherwise.
        * 500 with a generic message on any unexpected error.

    Raises:
        NotFound: When a ``ValueError`` escapes processing.
        RequestEntityTooLarge: Re-raised untouched.
    """
    try:
        webhook_trigger, _, node_config, webhook_data, error = _prepare_webhook_execution(webhook_id, is_debug=True)
        if error:
            return jsonify({"error": "Bad Request", "message": error}), 400

        workflow_inputs = WebhookService.build_workflow_inputs(webhook_data)

        # The pool key identifies which listener pool the event is sent to.
        pool_key: str = build_webhook_pool_key(
            tenant_id=webhook_trigger.tenant_id,
            app_id=webhook_trigger.app_id,
            node_id=webhook_trigger.node_id,
        )

        # request_id carries a millisecond timestamp; the event timestamp is in
        # whole seconds and is read from the clock separately.
        request_id = f"webhook_debug_{webhook_trigger.webhook_id}_{int(time.time() * 1000)}"
        event_timestamp = int(time.time())
        event_payload = {
            "inputs": workflow_inputs,
            "webhook_data": webhook_data,
            "method": webhook_data.get("method"),
        }
        debug_event = WebhookDebugEvent(
            request_id=request_id,
            timestamp=event_timestamp,
            node_id=webhook_trigger.node_id,
            payload=event_payload,
        )

        listeners_reached = TriggerDebugEventBus.dispatch(
            tenant_id=webhook_trigger.tenant_id,
            event=debug_event,
            pool_key=pool_key,
        )

        if listeners_reached == 0:
            logger.warning(
                "Webhook debug request dropped without an active listener for webhook %s (tenant=%s, app=%s, node=%s)",
                webhook_trigger.webhook_id,
                webhook_trigger.tenant_id,
                webhook_trigger.app_id,
                webhook_trigger.node_id,
            )
            no_listener_body = {
                "error": "No active debug listener",
                "message": (
                    "The webhook debug URL only works while the Variable Inspector is listening. "
                    "Use the published webhook URL to execute the workflow in Celery."
                ),
                "execution_url": webhook_trigger.webhook_url,
            }
            return jsonify(no_listener_body), 409

        configured_body, configured_status = WebhookService.generate_webhook_response(node_config)
        return jsonify(configured_body), configured_status
    except ValueError as exc:
        raise NotFound(str(exc))
    except RequestEntityTooLarge:
        raise
    except Exception:
        # Details stay in the server log; the client only sees a fixed message.
        logger.exception("Webhook debug processing failed for %s", webhook_id)
        return jsonify({"error": "Internal server error", "message": "An internal error has occurred."}), 500