Changeset 3471792
- Timestamp:
- 02/28/2026 09:07:49 PM (5 weeks ago)
- Location:
- flowsystems-webhook-actions/trunk
- Files:
-
- 3 added
- 2 deleted
- 14 edited
-
.gitignore (added)
-
README.txt (modified) (4 diffs)
-
admin/dist/.vite/manifest.json (modified) (2 diffs)
-
admin/dist/assets/main-BEzXuheK.js (deleted)
-
admin/dist/assets/main-C1P6l3fn.js (added)
-
admin/dist/assets/style-DPbNsgg4.css (added)
-
admin/dist/assets/style-DQRXYd6m.css (deleted)
-
flowsystems-webhook-actions.php (modified) (2 diffs)
-
src/Activation.php (modified) (3 diffs)
-
src/Api/HealthController.php (modified) (4 diffs)
-
src/Api/LogsController.php (modified) (7 diffs)
-
src/Api/QueueController.php (modified) (10 diffs)
-
src/App.php (modified) (1 diff)
-
src/Database/Migrator.php (modified) (3 diffs)
-
src/Repositories/LogRepository.php (modified) (9 diffs)
-
src/Repositories/QueueRepository.php (modified) (3 diffs)
-
src/Services/Dispatcher.php (modified) (18 diffs)
-
src/Services/LogService.php (modified) (4 diffs)
-
src/Services/QueueService.php (modified) (11 diffs)
Legend:
- Unmodified
- Added
- Removed
-
flowsystems-webhook-actions/trunk/README.txt
r3464666 r3471792 1 1 === Flow Systems Webhook Actions === 2 2 Contributors: mateuszflowsystems 3 Tags: webhook, woocommerce, automation, hooks, n8n3 Tags: webhook, woocommerce, automation, n8n, integration 4 4 Requires at least: 6.0 5 5 Tested up to: 6.9 6 6 Requires PHP: 8.0 7 Stable tag: 1. 0.17 Stable tag: 1.1.0 8 8 License: GPLv2 or later 9 9 License URI: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html 10 10 11 WordPress webhook plugin for developers. Trigger HTTP webhooks from any WordPress or WooCommerce action with async retries and payload mapping.11 Production-safe WordPress webhooks with retries, event IDs, queue processing, and full delivery observability. 12 12 13 13 == Description == 14 14 15 Flow Systems Webhook Actions is a WordPress webhook plugin that lets you trigger HTTP webhooks from any WordPress or WooCommerce action (`do_action`). 16 17 Instead of writing custom integration code, you can configure webhook endpoints directly from the admin panel and send structured JSON payloads to automation tools or external APIs. 18 19 Webhooks are dispatched asynchronously with background processing, retry logic, and delivery logging to ensure reliable and non-blocking execution. 20 21 = Typical use cases = 22 23 - Send WooCommerce order data to n8n 24 - Sync new WordPress users to a CRM 25 - Trigger Slack notifications when a post is published 26 - Send form submissions to an external API 27 - Automate membership or subscription workflows 28 - Connect WordPress events to internal backend systems 29 30 = Webhook Triggering = 31 32 - Trigger webhooks from any WordPress action (`do_action`) 33 - Support for core, custom, and WooCommerce hooks 34 - JSON payload including hook name, arguments, timestamp, and site URL 35 - Configurable webhook URL and optional Authorization header 36 - HTTPS enforcement by default (configurable via filter) 37 38 = Queue System = 39 40 - Asynchronous background processing via WP-Cron 41 - Non-blocking execution to avoid slowing down user requests 42 - Automatic retry with exponential backoff 15 Flow Systems Webhook Actions is a developer-focused WordPress webhook delivery layer designed for reliable automation workflows. 16 17 Trigger HTTP webhooks from any WordPress or WooCommerce action (`do_action`) and dispatch them asynchronously through a persistent queue with smart retries, event identity, and full delivery visibility. 18 19 Unlike basic “fire-and-forget” webhook implementations, this plugin ensures: 20 21 - Delivery attempts are tracked 22 - Failures are visible 23 - Retries are automatic and intelligent 24 - Events include stable identity metadata for idempotency 25 26 Built for production environments where losing events is not acceptable. 27 28 = Typical Use Cases = 29 30 - Send WooCommerce orders to n8n with retry protection 31 - Sync WordPress users to external CRMs safely 32 - Trigger backend microservices from WP hooks 33 - Send event-driven data to internal APIs 34 - Replace fragile custom `wp_remote_post()` integrations 35 - Build idempotent WordPress automation pipelines 36 37 = Event Identity & Idempotency = 38 39 Every dispatched webhook includes: 40 41 - Unique UUID (v4) per event 42 - ISO 8601 UTC timestamp 43 - Embedded `event.id`, `event.timestamp`, `event.version` in the payload 44 - HTTP headers: `X-Event-Id`, `X-Event-Timestamp` 45 46 This enables downstream deduplication, idempotent workflow design, and reliable debugging across systems. 47 48 = Reliable Queue & Smart Retry = 49 50 Webhooks are never sent directly from request execution. Instead: 51 52 - Events are stored in a persistent database queue 53 - Processed asynchronously via background jobs 54 - Dispatched in batches to avoid performance impact 55 56 Smart retry routing: 57 58 - 5xx and 429 responses → automatic exponential backoff retry 59 - 4xx and 3xx responses → immediately marked as `permanently_failed` 60 - Configurable maximum retry attempts 61 - Full attempt history stored per event 62 63 No silent failures. 64 65 = Delivery Observability = 66 67 Operational visibility built into the admin panel: 68 69 Status states: `pending`, `processing`, `success`, `failed` (retrying), `permanently_failed` 70 71 - Attempt timeline per event 72 - HTTP status codes and response bodies 73 - Manual retry (single or bulk) 74 75 Filter by: event UUID, target URL, date range, status 76 77 Queue health metrics: 78 79 - Average attempts per event 80 - Oldest pending job age 81 - Queue stuck detection 82 - WP-Cron-only warning 83 84 Designed as an operations console — not just a webhook sender. 43 85 44 86 = Payload Mapping = 45 87 46 - Transform payload structure before dispatch 88 Adapt outgoing JSON payloads to match any external API: 89 47 90 - Rename fields using dot notation 48 - Exclude selected fields from webhook payload 49 - Restructure payload to match external API requirements 50 - Store example payloads to assist configuration 51 52 = Logging = 53 54 - Log webhook delivery attempts 55 - Store HTTP status codes and response bodies 56 - View delivery history per webhook 57 - Automatic cleanup based on retention settings 91 - Restructure nested objects 92 - Exclude sensitive or unnecessary data 93 - Store example payloads for configuration 94 - Modify via `fswa_payload` filter 95 96 Payloads always include stable event metadata for consistency. 58 97 59 98 = Developer Friendly = 60 99 61 - Internal REST endpoints used by the admin interface62 - Extensible via WordPress filters and actions63 - Clean namespace and unique prefixes to avoid conflicts64 - Built following WordPress.org coding standards65 66 = Why choose Flow Systems Webhook Actions? =67 68 100 - Works with any WordPress or WooCommerce action 69 - Reliable background dispatch with retry logic 70 - Payload mapping for adapting data to external systems 71 - Transparent logging and delivery tracking 72 - Designed for automation builders and developers 101 - Internal REST endpoints power the admin interface 102 - Fully extensible via filters and actions 103 - Clean namespace and unique prefixes 104 - Built according to WordPress.org standards 105 - Supports system cron for improved reliability 106 107 = Why Choose Flow Systems Webhook Actions? = 108 109 Most WordPress webhook setups fire once, don't retry intelligently, don't provide delivery visibility, and don't expose event identity. 110 111 Flow Systems Webhook Actions provides: 112 113 - Persistent queue 114 - Smart retry logic 115 - Permanent failure state handling 116 - Event UUIDs and timestamps 117 - Full delivery logging and metrics 118 119 Built for developers who need production-grade automation reliability. 73 120 74 121 = Available Filters = … … 103 150 An action is a WordPress hook triggered at a specific moment, such as when a user is created, a post is saved, or an order is completed. 104 151 105 = Can I use this plugin with n8n ? =106 107 Yes. This plugin works seamlessly with n8n webhook triggers and is designed with automation workflows in mind.152 = Can I use this plugin with n8n or other automation tools? = 153 154 This plugin works seamlessly with n8n webhook triggers and can be used with any automation platform or external API that accepts HTTP webhooks. 108 155 109 156 = Does this plugin support WooCommerce hooks? = … … 139 186 == Changelog == 140 187 188 = 1.1.0 = 189 - Added event identity: each trigger dispatch generates a shared UUID and timestamp sent as `X-Event-Id` / `X-Event-Timestamp` headers and embedded in the payload under `event.{id,timestamp,version}` 190 - Added smart retry routing: 5xx and 429 responses trigger an automatic retry with exponential backoff; 4xx and 3xx responses are immediately marked as permanently failed 191 - Added `permanently_failed` status for non-retryable delivery failures 192 - Added attempt history: each delivery attempt is recorded as a JSON array on the log entry, visible in the admin timeline view 193 - Added per-log retry and bulk retry REST endpoints (`POST /logs/{id}/retry`, `POST /logs/bulk-retry`) 194 - Added `event_uuid` and `target_url` filter parameters to logs and queue REST endpoints 195 - Added date range filtering (`date_from`, `date_to`) to logs and queue list views with a shadcn-style calendar date/time picker 196 - Added health observability metrics: average attempts per event, oldest pending age, queue stuck detection, WP-Cron-only warning 197 - Added `queue.log_id` column linking queue jobs to their log entries 198 - Updated admin UI: permanently failed badge, attempt timeline, per-row retry button, bulk retry, observability warning banners, new filter inputs 199 - Updated footer with a review prompt linking to WordPress.org 200 141 201 = 1.0.1 = 142 202 - Fixed preview freezing when mapping fields from objects with numeric string keys (e.g. WooCommerce line_items) … … 155 215 == Upgrade Notice == 156 216 217 = 1.1.0 = 218 This release adds new database columns (`event_uuid`, `event_timestamp`, `attempt_history`, `next_attempt_at` on logs; `log_id` on queue). The migration runs automatically on plugin activation or update. No manual steps required. 219 157 220 = 1.0.0 = 158 221 Initial stable release. -
flowsystems-webhook-actions/trunk/admin/dist/.vite/manifest.json
r3464666 r3471792 1 1 { 2 2 "src/main.js": { 3 "file": "assets/main- BEzXuheK.js",3 "file": "assets/main-C1P6l3fn.js", 4 4 "name": "main", 5 5 "src": "src/main.js", … … 7 7 }, 8 8 "style.css": { 9 "file": "assets/style-D QRXYd6m.css",9 "file": "assets/style-DPbNsgg4.css", 10 10 "src": "style.css" 11 11 } -
flowsystems-webhook-actions/trunk/flowsystems-webhook-actions.php
r3464666 r3471792 4 4 * Plugin URI: https://flowsystems.pl/wordpress-webhook-actions 5 5 * Description: Trigger HTTP webhooks from WordPress actions (do_action). Easily connect WordPress with n8n, Zapier, Make, or custom workflows. 6 * Version: 1. 0.16 * Version: 1.1.0 7 7 * Author: Mateusz Skorupa 8 8 * Author URI: https://flowsystems.pl … … 17 17 defined('ABSPATH') || exit; 18 18 19 define('FSWA_VERSION', '1. 0.1');19 define('FSWA_VERSION', '1.1.0'); 20 20 define('FSWA_FILE', __FILE__); 21 21 -
flowsystems-webhook-actions/trunk/src/Activation.php
r3462891 r3471792 64 64 http_code SMALLINT UNSIGNED DEFAULT NULL, 65 65 request_payload LONGTEXT, 66 original_payload LONGTEXT DEFAULT NULL, 67 mapping_applied TINYINT(1) NOT NULL DEFAULT 0, 66 68 response_body LONGTEXT, 67 69 error_message TEXT, 68 70 duration_ms INT UNSIGNED DEFAULT NULL, 71 event_uuid VARCHAR(36) DEFAULT NULL, 72 event_timestamp DATETIME DEFAULT NULL, 73 attempt_history LONGTEXT DEFAULT NULL, 74 next_attempt_at DATETIME DEFAULT NULL, 69 75 created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 70 76 PRIMARY KEY (id), … … 72 78 KEY idx_status (status), 73 79 KEY idx_created (created_at), 74 KEY idx_webhook_created (webhook_id, created_at) 80 KEY idx_webhook_created (webhook_id, created_at), 81 KEY idx_event_uuid (event_uuid) 75 82 ) {$charsetCollate};"; 76 83 … … 89 96 locked_by VARCHAR(64) DEFAULT NULL, 90 97 scheduled_at DATETIME NOT NULL, 98 log_id BIGINT UNSIGNED DEFAULT NULL, 91 99 created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 92 100 PRIMARY KEY (id), 93 101 KEY idx_status_scheduled (status, scheduled_at), 94 102 KEY idx_locked (locked_at, locked_by), 95 KEY idx_webhook (webhook_id) 103 KEY idx_webhook (webhook_id), 104 KEY idx_log_id (log_id) 96 105 ) {$charsetCollate};"; 97 106 98 107 dbDelta($sqlQueue); 99 108 100 101 update_option('fswa_db_version', '1.0.0'); 109 update_option('fswa_db_version', '1.1.0'); 102 110 } 103 111 -
flowsystems-webhook-actions/trunk/src/Api/HealthController.php
r3462891 r3471792 84 84 $velocityStats = $this->logRepository->getVelocityStats(); 85 85 86 $oldestPendingAge = $this->logRepository->getOldestPendingAgeSeconds(); 87 86 88 return rest_ensure_response([ 87 89 'success_rate' => $successRate, … … 98 100 'pending' => $recentLogStats['pending'], 99 101 'retry' => $recentLogStats['retry'], 102 'permanently_failed' => $recentLogStats['permanently_failed'] ?? 0, 100 103 ], 101 104 'queue' => [ … … 104 107 'completed' => $queueStats['completed'], 105 108 'failed' => $queueStats['failed'], 109 'permanently_failed' => $queueStats['permanently_failed'] ?? 0, 106 110 'total' => $queueStats['total'], 107 111 'due_now' => $queueStats['due_now'], … … 112 116 'avg_duration_ms' => $velocityStats['avg_duration_ms'], 113 117 ], 118 'observability' => [ 119 'avg_attempts_per_event' => $this->logRepository->getAvgAttemptsPerEvent(), 120 'oldest_pending_age_seconds' => $oldestPendingAge, 121 'queue_stuck' => ($oldestPendingAge ?? 0) > 600, 122 'wp_cron_only' => (int) get_option('fswa_last_cron_run', 0) === 0, 123 ], 114 124 ]); 115 125 } -
flowsystems-webhook-actions/trunk/src/Api/LogsController.php
r3462891 r3471792 11 11 use WP_Error; 12 12 use FlowSystems\WebhookActions\Repositories\LogRepository; 13 use FlowSystems\WebhookActions\Repositories\QueueRepository; 14 use FlowSystems\WebhookActions\Services\QueueService; 13 15 14 16 class LogsController extends WP_REST_Controller { … … 17 19 18 20 private LogRepository $repository; 21 private QueueRepository $queueRepository; 22 private QueueService $queueService; 19 23 20 24 public function __construct() { 21 25 $this->repository = new LogRepository(); 26 $this->queueRepository = new QueueRepository(); 27 $this->queueService = new QueueService($this->queueRepository); 22 28 } 23 29 … … 59 65 'callback' => [$this, 'deleteItem'], 60 66 'permission_callback' => [$this, 'deleteItemPermissionsCheck'], 67 ], 68 ]); 69 70 // Retry the queue job associated with a single log 71 register_rest_route($this->namespace, '/' . $this->rest_base . '/(?P<id>[\d]+)/retry', [ 72 [ 73 'methods' => WP_REST_Server::CREATABLE, 74 'callback' => [$this, 'retryItem'], 75 'permission_callback' => [$this, 'getItemPermissionsCheck'], 76 ], 77 ]); 78 79 // Bulk retry queue jobs associated with multiple logs 80 register_rest_route($this->namespace, '/' . $this->rest_base . '/bulk-retry', [ 81 [ 82 'methods' => WP_REST_Server::CREATABLE, 83 'callback' => [$this, 'bulkRetry'], 84 'permission_callback' => [$this, 'getItemsPermissionsCheck'], 85 'args' => [ 86 'ids' => [ 87 'description' => __('Array of log IDs to retry.', 'flowsystems-webhook-actions'), 88 'type' => 'array', 89 'items' => ['type' => 'integer'], 90 'required' => true, 91 'minItems' => 1, 92 ], 93 ], 61 94 ], 62 95 ]); … … 145 178 } 146 179 180 if ($request->get_param('event_uuid')) { 181 $filters['event_uuid'] = sanitize_text_field($request->get_param('event_uuid')); 182 } 183 184 if ($request->get_param('target_url')) { 185 $filters['target_url'] = sanitize_text_field($request->get_param('target_url')); 186 } 187 147 188 $page = (int) ($request->get_param('page') ?: 1); 148 189 $perPage = (int) ($request->get_param('per_page') ?: 20); … … 224 265 'deleted' => $deleted, 225 266 'older_than' => $date, 267 ]); 268 } 269 270 /** 271 * Retry the queue job associated with a log entry 272 */ 273 public function retryItem($request): WP_REST_Response|WP_Error { 274 $logId = (int) $request->get_param('id'); 275 276 $log = $this->repository->find($logId); 277 278 if (!$log) { 279 return new WP_Error( 280 'rest_log_not_found', 281 __('Log not found.', 'flowsystems-webhook-actions'), 282 ['status' => 404] 283 ); 284 } 285 286 $job = $this->queueRepository->findByLogId($logId); 287 288 if (!$job) { 289 return new WP_Error( 290 'rest_job_not_found', 291 __('No queue job found for this log entry.', 'flowsystems-webhook-actions'), 292 ['status' => 404] 293 ); 294 } 295 296 if (!in_array($job['status'], ['failed', 'permanently_failed'], true)) { 297 return new WP_Error( 298 'rest_job_not_retryable', 299 __('Only failed or permanently failed jobs can be retried.', 'flowsystems-webhook-actions'), 300 ['status' => 409] 301 ); 302 } 303 304 $result = $this->queueService->forceRetry((int) $job['id']); 305 306 if (!$result) { 307 return new WP_Error( 308 'rest_retry_failed', 309 __('Failed to retry job.', 'flowsystems-webhook-actions'), 310 ['status' => 500] 311 ); 312 } 313 314 return rest_ensure_response([ 315 'success' => true, 316 'job_id' => (int) $job['id'], 317 ]); 318 } 319 320 /** 321 * Bulk retry queue jobs for multiple log entries 322 */ 323 public function bulkRetry($request): WP_REST_Response { 324 $ids = (array) $request->get_param('ids'); 325 $retried = 0; 326 $skipped = 0; 327 328 foreach ($ids as $logId) { 329 $logId = (int) $logId; 330 $job = $this->queueRepository->findByLogId($logId); 331 332 if (!$job || !in_array($job['status'], ['failed', 'permanently_failed'], true)) { 333 $skipped++; 334 continue; 335 } 336 337 if ($this->queueService->forceRetry((int) $job['id'])) { 338 $retried++; 339 } else { 340 $skipped++; 341 } 342 } 343 344 return rest_ensure_response([ 345 'retried' => $retried, 346 'skipped' => $skipped, 226 347 ]); 227 348 } … … 282 403 'description' => __('Filter by status.', 'flowsystems-webhook-actions'), 283 404 'type' => 'string', 284 'enum' => ['success', 'error', 'retry', 'pending' ],405 'enum' => ['success', 'error', 'retry', 'pending', 'permanently_failed'], 285 406 ], 286 407 'trigger_name' => [ … … 291 412 'description' => __('Filter logs from this date.', 'flowsystems-webhook-actions'), 292 413 'type' => 'string', 293 'format' => 'date-time',294 414 ], 295 415 'date_to' => [ 296 416 'description' => __('Filter logs until this date.', 'flowsystems-webhook-actions'), 297 417 'type' => 'string', 298 'format' => 'date-time', 418 ], 419 'event_uuid' => [ 420 'description' => __('Filter by event UUID (exact match).', 'flowsystems-webhook-actions'), 421 'type' => 'string', 422 ], 423 'target_url' => [ 424 'description' => __('Filter by target URL (partial match).', 'flowsystems-webhook-actions'), 425 'type' => 'string', 299 426 ], 300 427 ]; -
flowsystems-webhook-actions/trunk/src/Api/QueueController.php
r3462891 r3471792 35 35 'status' => [ 36 36 'type' => 'string', 37 'enum' => ['pending', 'processing', 'completed', 'failed' ],37 'enum' => ['pending', 'processing', 'completed', 'failed', 'permanently_failed'], 38 38 ], 39 39 'webhook_id' => [ 40 40 'type' => 'integer', 41 ], 42 'event_uuid' => [ 43 'type' => 'string', 44 ], 45 'target_url' => [ 46 'type' => 'string', 47 ], 48 'date_from' => [ 49 'type' => 'string', 50 ], 51 'date_to' => [ 52 'type' => 'string', 41 53 ], 42 54 'per_page' => [ … … 110 122 } 111 123 112 public function permissionsCheck(WP_REST_Request $ request): bool {124 public function permissionsCheck(WP_REST_Request $_request): bool { 113 125 return current_user_can('manage_options'); 114 126 } … … 132 144 } 133 145 146 if ($request->get_param('event_uuid')) { 147 $filters['event_uuid'] = sanitize_text_field($request->get_param('event_uuid')); 148 } 149 150 if ($request->get_param('target_url')) { 151 $filters['target_url'] = sanitize_text_field($request->get_param('target_url')); 152 } 153 154 if ($request->get_param('date_from')) { 155 $filters['date_from'] = sanitize_text_field($request->get_param('date_from')); 156 } 157 158 if ($request->get_param('date_to')) { 159 $filters['date_to'] = sanitize_text_field($request->get_param('date_to')); 160 } 161 134 162 $jobs = $this->queueService->getJobs($filters, $perPage, $offset); 135 163 $total = $this->queueService->countJobs($filters); … … 152 180 * Get queue statistics 153 181 */ 154 public function getStats($request): WP_REST_Response { 182 /** @noinspection PhpUnusedParameterInspection */ 183 public function getStats(WP_REST_Request $_request): WP_REST_Response { 155 184 $stats = $this->queueService->getStats(); 156 185 … … 190 219 } 191 220 192 // Lock and process the job 221 // Decode payload 222 $jobData = json_decode($job['payload'], true); 223 if (!$jobData || !isset($jobData['webhook']) || !isset($jobData['payload'])) { 224 return new WP_Error( 225 'rest_job_invalid', 226 __('Job has invalid payload data.', 'flowsystems-webhook-actions'), 227 ['status' => 500] 228 ); 229 } 230 231 // Lock the job 193 232 $lockId = wp_generate_uuid4(); 194 233 if (!$this->queueService->lockJob($jobId, $lockId)) { … … 200 239 } 201 240 202 // Process the job203 $jobData = json_decode($job['payload'], true);204 if (!$jobData || !isset($jobData['webhook']) || !isset($jobData['payload'])) {205 $this->queueService->unlockJob($jobId);206 return new WP_Error(207 'rest_job_invalid',208 __('Job has invalid payload data.', 'flowsystems-webhook-actions'),209 ['status' => 500]210 );211 }212 213 241 $webhook = $jobData['webhook']; 214 242 $payload = $jobData['payload']; 215 243 $trigger = $job['trigger_name']; 244 $logId = !empty($job['log_id']) ? (int) $job['log_id'] : (!empty($jobData['log_id']) ? (int) $jobData['log_id'] : null); 245 $attempts = (int) ($job['attempts'] ?? 0); 216 246 217 247 $transport = new WPHttpTransport(); 218 $queueService = new QueueService(); 219 $dispatcher = new Dispatcher($transport, $queueService); 220 221 $success = $dispatcher->sendToWebhook($webhook, $payload, $trigger); 222 223 if ($success) { 248 $dispatcher = new Dispatcher($transport, $this->queueService); 249 250 $result = $dispatcher->sendToWebhook($webhook, $payload, $trigger, $logId, $attempts); 251 252 if ($result['success']) { 224 253 $this->queueService->markCompleted($jobId); 225 254 return rest_ensure_response([ … … 227 256 'message' => __('Job executed successfully.', 'flowsystems-webhook-actions'), 228 257 ]); 229 } else { 230 // Check if we should reschedule or mark as failed 231 if ($this->queueService->rescheduleWithBackoff($jobId)) { 232 return rest_ensure_response([ 233 'success' => false, 234 'message' => __('Job failed and has been rescheduled for retry.', 'flowsystems-webhook-actions'), 235 'rescheduled' => true, 236 ]); 237 } else { 238 return rest_ensure_response([ 239 'success' => false, 240 'message' => __('Job failed and has exceeded maximum retry attempts.', 'flowsystems-webhook-actions'), 241 'rescheduled' => false, 242 ]); 243 } 244 } 258 } 259 260 if ($result['shouldRetry']) { 261 $reschedule = $this->queueService->rescheduleWithBackoff($jobId); 262 return rest_ensure_response([ 263 'success' => false, 264 'rescheduled' => $reschedule['rescheduled'], 265 'message' => $reschedule['rescheduled'] 266 ? __('Job failed and has been rescheduled for retry.', 'flowsystems-webhook-actions') 267 : __('Job failed and has exceeded maximum retry attempts.', 'flowsystems-webhook-actions'), 268 ]); 269 } 270 271 // Non-retryable failure (4xx, bad config, etc.) 272 $this->queueService->markPermanentlyFailed($jobId); 273 return rest_ensure_response([ 274 'success' => false, 275 'rescheduled' => false, 276 'permanently_failed' => true, 277 'message' => __('Job failed with a non-retryable error.', 'flowsystems-webhook-actions'), 278 ]); 245 279 } 246 280 … … 301 335 } 302 336 303 if ( $job['status'] !== 'failed') {304 return new WP_Error( 305 'rest_job_not_ failed',306 __('Only failed jobs can be retried.', 'flowsystems-webhook-actions'),337 if (!in_array($job['status'], ['failed', 'permanently_failed'], true)) { 338 return new WP_Error( 339 'rest_job_not_retryable', 340 __('Only failed or permanently failed jobs can be retried.', 'flowsystems-webhook-actions'), 307 341 ['status' => 409] 308 342 ); … … 331 365 $jobData = json_decode($job['payload'], true); 332 366 $webhook = $jobData['webhook'] ?? []; 367 $eventPayload = $jobData['payload'] ?? []; 333 368 334 369 $scheduledTimestamp = strtotime($job['scheduled_at']); … … 337 372 return [ 338 373 'id' => (int) $job['id'], 374 'log_id' => isset($job['log_id']) ? (int) $job['log_id'] : null, 375 'event_uuid' => $eventPayload['event']['id'] ?? null, 339 376 'webhook_id' => (int) $job['webhook_id'], 340 377 'webhook_name' => $webhook['name'] ?? null, -
flowsystems-webhook-actions/trunk/src/App.php
r3462891 r3471792 12 12 13 13 class App { 14 const VERSION = '1. 0.0';14 const VERSION = '1.1.0'; 15 15 const SLUG = 'flowsystems-webhook-actions'; 16 16 -
flowsystems-webhook-actions/trunk/src/Database/Migrator.php
r3462891 r3471792 5 5 class Migrator { 6 6 private const OPTION_KEY = 'fswa_db_version'; 7 private const CURRENT_VERSION = '1. 0.0';7 private const CURRENT_VERSION = '1.1.0'; 8 8 9 9 /** … … 65 65 return [ 66 66 '1.0.0' => [self::class, 'migration_1_0_0'], 67 '1.1.0' => [self::class, 'migration_1_1_0'], 67 68 ]; 68 69 } … … 174 175 175 176 /** 177 * Migration 1.1.0 - Add event identity and attempt history columns 178 */ 179 public static function migration_1_1_0(): void { 180 global $wpdb; 181 182 $logsTable = $wpdb->prefix . 'fswa_logs'; 183 $queueTable = $wpdb->prefix . 'fswa_queue'; 184 185 // Columns to add to fswa_logs 186 $logsColumns = [ 187 'event_uuid' => "ALTER TABLE {$logsTable} ADD COLUMN event_uuid VARCHAR(36) DEFAULT NULL", 188 'event_timestamp' => "ALTER TABLE {$logsTable} ADD COLUMN event_timestamp DATETIME DEFAULT NULL", 189 'attempt_history' => "ALTER TABLE {$logsTable} ADD COLUMN attempt_history LONGTEXT DEFAULT NULL", 190 'next_attempt_at' => "ALTER TABLE {$logsTable} ADD COLUMN next_attempt_at DATETIME DEFAULT NULL", 191 ]; 192 193 // phpcs:disable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching, WordPress.DB.DirectDatabaseQuery.SchemaChange, WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQL.NotPrepared, PluginCheck.Security.DirectDB.UnescapedDBParameter 194 foreach ($logsColumns as $column => $sql) { 195 $exists = $wpdb->get_var($wpdb->prepare( 196 "SHOW COLUMNS FROM {$logsTable} LIKE %s", 197 $column 198 )); 199 if (!$exists) { 200 $wpdb->query($sql); 201 } 202 } 203 204 // Add index on event_uuid if not exists 205 $indexExists = $wpdb->get_var( 206 "SHOW INDEX FROM {$logsTable} WHERE Key_name = 'idx_event_uuid'" 207 ); 208 if (!$indexExists) { 209 $wpdb->query("ALTER TABLE {$logsTable} ADD KEY idx_event_uuid (event_uuid)"); 210 } 211 212 // Add log_id column to fswa_queue 213 $logIdExists = $wpdb->get_var($wpdb->prepare( 214 "SHOW COLUMNS FROM {$queueTable} LIKE %s", 215 'log_id' 216 )); 217 if (!$logIdExists) { 218 $wpdb->query("ALTER TABLE {$queueTable} ADD COLUMN log_id BIGINT UNSIGNED DEFAULT NULL"); 219 $wpdb->query("ALTER TABLE {$queueTable} ADD KEY idx_log_id (log_id)"); 220 } 221 // phpcs:enable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching, WordPress.DB.DirectDatabaseQuery.SchemaChange, WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQL.NotPrepared, PluginCheck.Security.DirectDB.UnescapedDBParameter 222 } 223 224 /** 176 225 * Get current database version 177 226 */ -
flowsystems-webhook-actions/trunk/src/Repositories/LogRepository.php
r3462891 r3471792 54 54 } 55 55 56 if (!empty($filters['event_uuid'])) { 57 $whereClauses[] = "l.event_uuid LIKE %s"; 58 $whereValues[] = '%' . $wpdb->esc_like($filters['event_uuid']) . '%'; 59 } 60 61 if (!empty($filters['target_url'])) { 62 $whereClauses[] = "w.endpoint_url LIKE %s"; 63 $whereValues[] = '%' . $wpdb->esc_like($filters['target_url']) . '%'; 64 } 65 56 66 $whereSql = !empty($whereClauses) 57 67 ? "WHERE " . implode(' AND ', $whereClauses) 58 68 : ""; 59 69 70 // Always join so target_url filter works in both count and items queries 71 $joinSql = "LEFT JOIN {$this->webhooksTable} w ON l.webhook_id = w.id"; 72 60 73 // Count total 61 $countQuery = "SELECT COUNT(*) FROM {$this->logsTable} l {$ whereSql}";74 $countQuery = "SELECT COUNT(*) FROM {$this->logsTable} l {$joinSql} {$whereSql}"; 62 75 if (!empty($whereValues)) { 63 76 // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQL.NotPrepared … … 73 86 $items = $wpdb->get_results( 74 87 $wpdb->prepare( 75 "SELECT l.*, w.name as webhook_name 88 "SELECT l.*, w.name as webhook_name, w.endpoint_url as target_url 76 89 FROM {$this->logsTable} l 77 LEFT JOIN {$this->webhooksTable} w ON l.webhook_id = w.id90 {$joinSql} 78 91 {$whereSql} 79 92 ORDER BY l.created_at DESC … … 96 109 $item['response_body'] = $decoded !== null ? $decoded : $item['response_body']; 97 110 } 111 if (!empty($item['attempt_history'])) { 112 $decoded = json_decode($item['attempt_history'], true); 113 $item['attempt_history'] = $decoded !== null ? $decoded : []; 114 } 98 115 $item['mapping_applied'] = (bool) ($item['mapping_applied'] ?? false); 99 116 } … … 120 137 $log = $wpdb->get_row( 121 138 $wpdb->prepare( 122 "SELECT l.*, w.name as webhook_name 139 "SELECT l.*, w.name as webhook_name, w.endpoint_url as target_url 123 140 FROM {$this->logsTable} l 124 141 LEFT JOIN {$this->webhooksTable} w ON l.webhook_id = w.id … … 143 160 $decoded = json_decode($log['response_body'], true); 144 161 $log['response_body'] = $decoded !== null ? $decoded : $log['response_body']; 162 } 163 if (!empty($log['attempt_history'])) { 164 $decoded = json_decode($log['attempt_history'], true); 165 $log['attempt_history'] = $decoded !== null ? $decoded : []; 145 166 } 146 167 $log['mapping_applied'] = (bool) ($log['mapping_applied'] ?? false); … … 176 197 'error_message' => $data['error_message'] ?? null, 177 198 'duration_ms' => $data['duration_ms'] ?? null, 199 'event_uuid' => $data['event_uuid'] ?? null, 200 'event_timestamp' => $data['event_timestamp'] ?? null, 201 'attempt_history' => isset($data['attempt_history']) 202 ? (is_array($data['attempt_history']) ? wp_json_encode($data['attempt_history']) : $data['attempt_history']) 203 : null, 204 'next_attempt_at' => $data['next_attempt_at'] ?? null, 178 205 ], 179 ['%d', '%s', '%s', '%d', '%s', '%s', '%d', '%s', '%s', '%d' ]206 ['%d', '%s', '%s', '%d', '%s', '%s', '%d', '%s', '%s', '%d', '%s', '%s', '%s', '%s'] 180 207 ); 181 208 … … 219 246 $updateData['duration_ms'] = $data['duration_ms']; 220 247 $format[] = '%d'; 248 } 249 250 if (isset($data['attempt_history'])) { 251 $updateData['attempt_history'] = is_array($data['attempt_history']) 252 ? wp_json_encode($data['attempt_history']) 253 : $data['attempt_history']; 254 $format[] = '%s'; 255 } 256 257 if (array_key_exists('next_attempt_at', $data)) { 258 $updateData['next_attempt_at'] = $data['next_attempt_at']; 259 $format[] = '%s'; 221 260 } 222 261 … … 340 379 'pending' => 0, 341 380 'retry' => 0, 381 'permanently_failed' => 0, 342 382 ]; 343 383 344 384 foreach ($stats as $stat) { 345 $result[$stat['status']] = (int) $stat['count']; 385 if (array_key_exists($stat['status'], $result)) { 386 $result[$stat['status']] = (int) $stat['count']; 387 } 346 388 } 347 389 … … 462 504 ]; 463 505 } 506 507 /** 508 * Get average number of attempts per event (last 7 days) 509 * 510 * @return float 511 */ 512 public function getAvgAttemptsPerEvent(): float { 513 global $wpdb; 514 515 $queueTable = $wpdb->prefix . 'fswa_queue'; 516 $sevenDaysAgo = gmdate('Y-m-d H:i:s', strtotime('-7 days')); 517 518 // phpcs:disable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching, WordPress.DB.PreparedSQL.InterpolatedNotPrepared, PluginCheck.Security.DirectDB.UnescapedDBParameter 519 $avg = $wpdb->get_var( 520 $wpdb->prepare( 521 "SELECT AVG(q.attempts + 1) 522 FROM {$this->logsTable} l 523 INNER JOIN {$queueTable} q ON q.log_id = l.id 524 WHERE l.status IN ('success', 'error', 'permanently_failed') 525 AND l.created_at >= %s", 526 $sevenDaysAgo 527 ) 528 ); 529 // phpcs:enable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching, WordPress.DB.PreparedSQL.InterpolatedNotPrepared, PluginCheck.Security.DirectDB.UnescapedDBParameter 530 531 return $avg !== null ? round((float) $avg, 2) : 0.0; 532 } 533 534 /** 535 * Get age in seconds of the oldest pending log 536 * 537 * @return int|null Null if no pending logs 538 */ 539 public function getOldestPendingAgeSeconds(): ?int { 540 global $wpdb; 541 542 // phpcs:disable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching, WordPress.DB.PreparedSQL.InterpolatedNotPrepared, PluginCheck.Security.DirectDB.UnescapedDBParameter 543 $oldest = $wpdb->get_var( 544 "SELECT MIN(created_at) FROM {$this->logsTable} WHERE status = 'pending'" 545 ); 546 // phpcs:enable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching, WordPress.DB.PreparedSQL.InterpolatedNotPrepared, PluginCheck.Security.DirectDB.UnescapedDBParameter 547 548 if ($oldest === null) { 549 return null; 550 } 551 552 return (int) (time() - strtotime($oldest)); 553 } 554 555 /** 556 * Find log IDs by status (for bulk operations) 557 * 558 * @param array $statuses 559 * @param int $limit 560 * @return int[] 561 */ 562 public function findIdsByStatus(array $statuses, int $limit = 100): array { 563 global $wpdb; 564 565 if (empty($statuses)) { 566 return []; 567 } 568 569 $placeholders = implode(', ', array_fill(0, count($statuses), '%s')); 570 571 // phpcs:disable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching, WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber, PluginCheck.Security.DirectDB.UnescapedDBParameter 572 $ids = $wpdb->get_col( 573 $wpdb->prepare( 574 "SELECT id FROM {$this->logsTable} WHERE status IN ({$placeholders}) ORDER BY id ASC LIMIT %d", 575 array_merge($statuses, [$limit]) 576 ) 577 ); 578 // phpcs:enable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching, WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber, PluginCheck.Security.DirectDB.UnescapedDBParameter 579 580 return array_map('intval', $ids ?: []); 581 } 464 582 } -
flowsystems-webhook-actions/trunk/src/Repositories/QueueRepository.php
r3462891 r3471792 261 261 } 262 262 263 if (!empty($filters['event_uuid'])) { 264 $where[] = "JSON_UNQUOTE(JSON_EXTRACT(payload, '$.payload.event.id')) LIKE %s"; 265 $params[] = '%' . $wpdb->esc_like($filters['event_uuid']) . '%'; 266 } 267 268 if (!empty($filters['target_url'])) { 269 $where[] = "JSON_UNQUOTE(JSON_EXTRACT(payload, '$.webhook.endpoint_url')) LIKE %s"; 270 $params[] = '%' . $wpdb->esc_like($filters['target_url']) . '%'; 271 } 272 273 if (!empty($filters['date_from'])) { 274 $where[] = 'created_at >= %s'; 275 $params[] = $filters['date_from']; 276 } 277 278 if (!empty($filters['date_to'])) { 279 $where[] = 'created_at <= %s'; 280 $params[] = $filters['date_to']; 281 } 282 263 283 $whereClause = implode(' AND ', $where); 264 284 … … 300 320 $where[] = 'webhook_id = %d'; 301 321 $params[] = (int) $filters['webhook_id']; 322 } 323 324 if (!empty($filters['event_uuid'])) { 325 $where[] = "JSON_UNQUOTE(JSON_EXTRACT(payload, '$.payload.event.id')) LIKE %s"; 326 $params[] = '%' . $wpdb->esc_like($filters['event_uuid']) . '%'; 327 } 328 329 if (!empty($filters['target_url'])) { 330 $where[] = "JSON_UNQUOTE(JSON_EXTRACT(payload, '$.webhook.endpoint_url')) LIKE %s"; 331 $params[] = '%' . $wpdb->esc_like($filters['target_url']) . '%'; 332 } 333 334 if (!empty($filters['date_from'])) { 335 $where[] = 'created_at >= %s'; 336 $params[] = $filters['date_from']; 337 } 338 339 if (!empty($filters['date_to'])) { 340 $where[] = 'created_at <= %s'; 341 $params[] = $filters['date_to']; 302 342 } 303 343 … … 323 363 324 364 /** 365 * Find a queue job by its associated log ID 366 * 367 * @param int $logId 368 * @return array|null 369 */ 370 public function findByLogId(int $logId): ?array { 371 global $wpdb; 372 373 // phpcs:disable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching, WordPress.DB.PreparedSQL.InterpolatedNotPrepared 374 $job = $wpdb->get_row( 375 $wpdb->prepare( 376 "SELECT * FROM {$this->table} WHERE log_id = %d LIMIT 1", 377 $logId 378 ), 379 ARRAY_A 380 ); 381 // phpcs:enable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching, WordPress.DB.PreparedSQL.InterpolatedNotPrepared 382 383 return $job ?: null; 384 } 385 386 /** 325 387 * Delete old completed jobs 326 388 * -
flowsystems-webhook-actions/trunk/src/Services/Dispatcher.php
r3464666 r3471792 57 57 } 58 58 59 // Generate event identity once per trigger dispatch, shared across all webhooks 60 $eventUuid = wp_generate_uuid4(); 61 $eventTimestamp = gmdate('Y-m-d\TH:i:s\Z'); 62 59 63 /** 60 64 * Filter the webhook payload before dispatching. … … 67 71 'fswa_payload', 68 72 [ 69 'hook' => $trigger, 70 'args' => $this->normalizeArgs($args), 73 'event' => [ 74 'id' => $eventUuid, 75 'timestamp' => $eventTimestamp, 76 'version' => '1.0', 77 ], 78 'hook' => $trigger, 79 'args' => $this->normalizeArgs($args), 71 80 'timestamp' => time(), 72 'site' => [81 'site' => [ 73 82 'url' => home_url(), 74 83 ], … … 106 115 $transformedPayload, 107 116 $originalPayload, 108 $mappingApplied 117 $mappingApplied, 118 $eventUuid, 119 $eventTimestamp 109 120 ); 110 121 111 $this->queueService->enqueue($webhookId, $trigger, [ 112 'webhook' => $webhook, 113 'payload' => $transformedPayload, 114 'log_id' => $logId, 115 'mapping_applied' => $mappingApplied, 116 'original_payload' => $originalPayload, 117 ]); 122 $this->queueService->enqueue( 123 $webhookId, 124 $trigger, 125 [ 126 'webhook' => $webhook, 127 'payload' => $transformedPayload, 128 'log_id' => $logId, 129 'mapping_applied' => $mappingApplied, 130 'original_payload' => $originalPayload, 131 ], 132 null, 133 $logId ?: null 134 ); 118 135 } 119 136 } … … 127 144 public function process(int $batchSize = 10): array { 128 145 $result = [ 129 'processed' => 0,130 'succeeded' => 0,131 'failed' => 0,132 'rescheduled' => 0,146 'processed' => 0, 147 'succeeded' => 0, 148 'failed' => 0, 149 'rescheduled' => 0, 133 150 'stale_cleaned' => 0, 134 151 ]; 152 153 // Record that the queue processor has run (used by health observability) 154 update_option('fswa_last_cron_run', time(), false); 135 155 136 156 // Step 1: Cleanup stale locks … … 158 178 $result['processed']++; 159 179 160 // Process the job 161 $success = $this->processJob($job); 162 163 if ($success) { 180 $resultData = $this->processJob($job); 181 182 if ($resultData['success']) { 164 183 $this->queueService->markCompleted($jobId); 165 184 $result['succeeded']++; 166 } else { 167 // Try to reschedule with backoff 168 if ($this->queueService->rescheduleWithBackoff($jobId)) { 185 } elseif ($resultData['shouldRetry']) { 186 $rescheduleResult = $this->queueService->rescheduleWithBackoff($jobId); 187 if ($rescheduleResult['rescheduled']) { 188 $logId = $this->extractLogIdFromJob($job); 189 if ($logId) { 190 $this->logService->updateLog($logId, [ 191 'status' => 'retry', 192 'next_attempt_at' => $rescheduleResult['scheduled_at'], 193 ]); 194 } 169 195 $result['rescheduled']++; 170 196 } else { 171 // Max attempts reached, already marked as failed 197 // Max attempts reached 198 $this->queueService->markPermanentlyFailed($jobId); 199 $logId = $this->extractLogIdFromJob($job); 200 if ($logId) { 201 $this->logService->updateLog($logId, [ 202 'status' => 'permanently_failed', 203 'next_attempt_at' => null, 204 ]); 205 } 172 206 $result['failed']++; 173 207 } 208 } else { 209 // Non-retryable failure (4xx, 3xx, config error) 210 $this->queueService->markPermanentlyFailed($jobId); 211 $logId = $this->extractLogIdFromJob($job); 212 if ($logId) { 213 $this->logService->updateLog($logId, [ 214 'status' => 'permanently_failed', 215 'next_attempt_at' => null, 216 ]); 217 } 218 $result['failed']++; 174 219 } 175 220 } … … 182 227 * 183 228 * @param array $job Job data from queue 184 * @return bool True if successful185 */ 186 private function processJob(array $job): bool{229 * @return array{success: bool, shouldRetry: bool} 230 */ 231 private function processJob(array $job): array { 187 232 $jobData = json_decode($job['payload'], true); 188 233 189 234 if (!$jobData || !isset($jobData['webhook']) || !isset($jobData['payload'])) { 190 return false;235 return ['success' => false, 'shouldRetry' => false]; 191 236 } 192 237 … … 194 239 $payload = $jobData['payload']; 195 240 $trigger = $job['trigger_name']; 196 $logId = !empty($jobData['log_id']) ? (int) $jobData['log_id'] : null;241 $logId = $this->extractLogIdFromJob($job); 197 242 $mappingApplied = (bool) ($jobData['mapping_applied'] ?? false); 198 243 $originalPayload = $jobData['original_payload'] ?? null; … … 214 259 } 215 260 216 return $this->sendToWebhook($webhook, $payload, $trigger, $logId); 261 $attemptNumber = (int) ($job['attempts'] ?? 0); 262 263 return $this->sendToWebhook($webhook, $payload, $trigger, $logId, $attemptNumber); 264 } 265 266 /** 267 * Extract log ID from a queue job (column-first, payload fallback) 268 * 269 * @param array $job 270 * @return int|null 271 */ 272 private function extractLogIdFromJob(array $job): ?int { 273 if (!empty($job['log_id'])) { 274 return (int) $job['log_id']; 275 } 276 277 $jobData = json_decode($job['payload'], true); 278 $logId = $jobData['log_id'] ?? null; 279 280 return $logId ? (int) $logId : null; 217 281 } 218 282 … … 224 288 * @param string $trigger The trigger event name 225 289 * @param int|null $logId Existing log ID to update (null to create new) 226 * @return bool True if successful 290 * @param int $attemptNumber Current attempt number (0-indexed) 291 * @return array{success: bool, shouldRetry: bool} 227 292 */ 228 293 public function sendToWebhook( … … 230 295 array $payload, 231 296 string $trigger, 232 ?int $logId = null 233 ): bool { 297 ?int $logId = null, 298 int $attemptNumber = 0 299 ): array { 234 300 if (empty($webhook['endpoint_url']) || !is_string($webhook['endpoint_url'])) { 235 return false;301 return ['success' => false, 'shouldRetry' => false]; 236 302 } 237 303 … … 244 310 if (!$this->isValidUrl($url)) { 245 311 $this->logError($trigger, $url, 'Invalid URL format', $webhookId, $payload, null, null, null, $logId); 246 return false;312 return ['success' => false, 'shouldRetry' => false]; 247 313 } 248 314 … … 254 320 $headers['Authorization'] = $authHeader; 255 321 } 322 323 // Add event identity headers before fswa_headers filter 324 $headers['X-Event-Id'] = $payload['event']['id'] ?? ''; 325 $headers['X-Event-Timestamp'] = $payload['event']['timestamp'] ?? ''; 256 326 257 327 /** … … 269 339 270 340 if (is_wp_error($result)) { 271 $this->handleError($result, $trigger, $url, $webhookId, $payload, $durationMs, $logId); 272 return $this->isTransientError($result) ? false : false; // All errors mean job failed 273 } 274 275 if (!$this->isSuccessResponse($result)) { 276 $responseCode = wp_remote_retrieve_response_code($result); 277 $responseBody = wp_remote_retrieve_body($result); 278 $errorMessage = sprintf("HTTP %d: %s", (int) $responseCode, (string) $responseBody); 279 280 $this->logError($trigger, $url, $errorMessage, $webhookId, $payload, (int) $responseCode, $responseBody, $durationMs, $logId); 281 return false; 282 } 283 284 $this->logSuccess($trigger, $url, $payload, $result, $webhookId, $durationMs, $logId); 285 return true; 341 $errorMessage = $result->get_error_message(); 342 $this->logError($trigger, $url, (string) $errorMessage, $webhookId, $payload, null, null, $durationMs, $logId); 343 344 if ($logId !== null) { 345 $this->logService->appendAttemptHistory($logId, [ 346 'attempt' => $attemptNumber, 347 'attempted_at' => gmdate('Y-m-d\TH:i:s\Z'), 348 'http_code' => null, 349 'status' => 'error', 350 'error_message' => (string) $errorMessage, 351 'duration_ms' => $durationMs, 352 'should_retry' => true, 353 ]); 354 } 355 356 /** 357 * Fires after a webhook delivery fails. 358 * 359 * @param string $trigger The trigger event name 360 * @param string $url The webhook endpoint URL 361 * @param string $error The error message 362 */ 363 do_action('fswa_error', $trigger, $url, (string) $errorMessage); 364 365 return ['success' => false, 'shouldRetry' => true]; 366 } 367 368 $responseCode = (int) wp_remote_retrieve_response_code($result); 369 $responseBody = wp_remote_retrieve_body($result); 370 371 $success = $responseCode >= 200 && $responseCode < 300; 372 $shouldRetry = !$success && ($responseCode >= 500 || $responseCode === 429); 373 374 if ($success) { 375 $this->logSuccess($trigger, $url, $payload, $result, $webhookId, $durationMs, $logId); 376 } else { 377 $errorMessage = sprintf("HTTP %d: %s", $responseCode, (string) $responseBody); 378 $this->logError($trigger, $url, $errorMessage, $webhookId, $payload, $responseCode, $responseBody, $durationMs, $logId); 379 } 380 381 if ($logId !== null) { 382 $this->logService->appendAttemptHistory($logId, [ 383 'attempt' => $attemptNumber, 384 'attempted_at' => gmdate('Y-m-d\TH:i:s\Z'), 385 'http_code' => $responseCode, 386 'status' => $success ? 'success' : 'error', 387 'error_message' => $success ? null : sprintf("HTTP %d", $responseCode), 388 'duration_ms' => $durationMs, 389 'should_retry' => $shouldRetry, 390 ]); 391 } 392 393 if ($success) { 394 /** 395 * Fires after a successful webhook delivery. 396 * 397 * @param string $trigger The trigger event name 398 * @param string $url The webhook endpoint URL 399 * @param array $payload The payload data that was sent 400 * @param array $response The HTTP response from wp_remote_post 401 */ 402 do_action('fswa_success', $trigger, $url, $payload, $result); 403 } else { 404 /** 405 * Fires after a webhook delivery fails. 406 * 407 * @param string $trigger The trigger event name 408 * @param string $url The webhook endpoint URL 409 * @param string $error The error message 410 */ 411 do_action('fswa_error', $trigger, $url, sprintf("HTTP %d", $responseCode)); 412 } 413 414 return ['success' => $success, 'shouldRetry' => $shouldRetry]; 286 415 } 287 416 … … 362 491 363 492 return true; 364 }365 366 /**367 * Check if HTTP response indicates success (2xx status code)368 *369 * @param array<string, mixed>|WP_Error $response WordPress HTTP response array370 * @return bool True if response is successful371 */372 private function isSuccessResponse($response): bool {373 $code = wp_remote_retrieve_response_code($response);374 return (int) $code >= 200 && (int) $code < 300;375 }376 377 /**378 * Handle HTTP errors379 *380 * @param WP_Error $error The WP_Error object381 * @param string $trigger The trigger event name382 * @param string $url The webhook URL383 * @param int $webhookId Webhook ID384 * @param array<string, mixed> $payload Payload data385 * @param int $durationMs Request duration in milliseconds386 * @param int|null $logId Existing log ID to update387 * @return void388 */389 private function handleError(390 WP_Error $error,391 string $trigger,392 string $url,393 int $webhookId,394 array $payload,395 int $durationMs = 0,396 ?int $logId = null397 ): void {398 $errorMessage = $error->get_error_message();399 $this->logError($trigger, $url, (string) $errorMessage, $webhookId, $payload, null, null, $durationMs, $logId);400 }401 402 /**403 * Determine if an error is transient and should be retried404 *405 * @param WP_Error $error The WP_Error object406 * @return bool True if error is transient407 */408 private function isTransientError(WP_Error $error): bool {409 $transientCodes = [410 'http_request_failed',411 'http_request_timeout'412 ];413 414 return count(array_intersect(415 $error->get_error_codes(),416 $transientCodes417 )) > 0;418 493 } 419 494 … … 446 521 if ($logId !== null) { 447 522 $this->logService->updateLog($logId, [ 448 'status' => 'error',449 'http_code' => $httpCode,523 'status' => 'error', 524 'http_code' => $httpCode, 450 525 'response_body' => $responseBody, 451 526 'error_message' => $error, 452 'duration_ms' => $durationMs,527 'duration_ms' => $durationMs, 453 528 ]); 454 529 } else { … … 464 539 } 465 540 } 466 467 /**468 * Fires after a webhook delivery fails.469 *470 * @param string $trigger The trigger event name471 * @param string $url The webhook endpoint URL472 * @param string $error The error message473 */474 do_action('fswa_error', $trigger, $url, $error);475 541 } 476 542 … … 502 568 if ($logId !== null) { 503 569 $this->logService->updateLog($logId, [ 504 'status' => 'success',505 'http_code' => (int) $responseCode,570 'status' => 'success', 571 'http_code' => (int) $responseCode, 506 572 'response_body' => (string) $responseBody, 507 'duration_ms' => $durationMs,573 'duration_ms' => $durationMs, 508 574 ]); 509 575 } else { … … 518 584 } 519 585 } 520 521 /**522 * Fires after a successful webhook delivery.523 *524 * @param string $trigger The trigger event name525 * @param string $url The webhook endpoint URL526 * @param array $payload The payload data that was sent527 * @param array $response The HTTP response from wp_remote_post528 */529 do_action('fswa_success', $trigger, $url, $payload, $response);530 586 } 531 587 -
flowsystems-webhook-actions/trunk/src/Services/LogService.php
r3462891 r3471792 22 22 * @param array|null $originalPayload The original payload before transformation (null if no mapping) 23 23 * @param bool $mappingApplied Whether field mapping was applied 24 * @param string|null $eventUuid UUID shared across all webhooks for this trigger event 25 * @param string|null $eventTimestamp ISO 8601 UTC timestamp of the event 24 26 * @return int|false Log ID or false on failure 25 27 */ … … 29 31 array $payload, 30 32 ?array $originalPayload = null, 31 bool $mappingApplied = false 33 bool $mappingApplied = false, 34 ?string $eventUuid = null, 35 ?string $eventTimestamp = null 32 36 ) { 33 37 return $this->repository->create([ … … 38 42 'original_payload' => $originalPayload, 39 43 'mapping_applied' => $mappingApplied, 44 'event_uuid' => $eventUuid, 45 'event_timestamp' => $eventTimestamp, 40 46 ]); 41 47 } … … 140 146 141 147 /** 148 * Append an attempt entry to the log's attempt history 149 * 150 * Caps history at fswa_max_attempts entries to prevent unbounded growth. 151 * 152 * @param int $logId 153 * @param array $attemptData 154 * @return bool 155 */ 156 public function appendAttemptHistory(int $logId, array $attemptData): bool { 157 $log = $this->repository->find($logId); 158 if (!$log) { 159 return false; 160 } 161 162 $history = is_array($log['attempt_history']) ? $log['attempt_history'] : []; 163 $history[] = $attemptData; 164 165 $maxAttempts = (int) apply_filters('fswa_max_attempts', 5); 166 if (count($history) > $maxAttempts) { 167 $history = array_slice($history, -$maxAttempts); 168 } 169 170 return $this->repository->update($logId, ['attempt_history' => $history]); 171 } 172 173 /** 142 174 * Get the repository instance 143 175 * -
flowsystems-webhook-actions/trunk/src/Services/QueueService.php
r3462891 r3471792 23 23 * @param array $payload 24 24 * @param DateTime|null $scheduledAt When to process (null = now) 25 * @param int|null $logId Associated log ID 25 26 * @return int Job ID 26 27 */ 27 public function enqueue(int $webhookId, string $trigger, array $payload, ?DateTime $scheduledAt = null ): int {28 public function enqueue(int $webhookId, string $trigger, array $payload, ?DateTime $scheduledAt = null, ?int $logId = null): int { 28 29 if ($scheduledAt === null) { 29 30 $scheduledAt = new DateTime('now', new DateTimeZone('UTC')); … … 37 38 $maxAttempts = (int) apply_filters('fswa_max_attempts', 5); 38 39 39 return $this->repository->insert([40 $data = [ 40 41 'webhook_id' => $webhookId, 41 42 'trigger_name' => $trigger, … … 46 47 'scheduled_at' => $scheduledAt->format('Y-m-d H:i:s'), 47 48 'created_at' => current_time('mysql', true), 48 ]); 49 ]; 50 51 if ($logId !== null) { 52 $data['log_id'] = $logId; 53 } 54 55 return $this->repository->insert($data); 49 56 } 50 57 … … 112 119 113 120 /** 121 * Mark a job as permanently failed (non-retryable or max attempts exceeded) 122 * 123 * @param int $jobId 124 */ 125 public function markPermanentlyFailed(int $jobId): void { 126 $this->repository->update($jobId, [ 127 'status' => 'permanently_failed', 128 'locked_at' => null, 129 'locked_by' => null, 130 ]); 131 } 132 133 /** 114 134 * Reschedule a job with exponential backoff 115 135 * 116 136 * @param int $jobId 117 * @return bool True if rescheduled, false if max attempts reached118 */ 119 public function rescheduleWithBackoff(int $jobId): bool{137 * @return array{rescheduled: bool, scheduled_at: string|null} 138 */ 139 public function rescheduleWithBackoff(int $jobId): array { 120 140 $job = $this->repository->find($jobId); 121 141 122 142 if (!$job) { 123 return false;143 return ['rescheduled' => false, 'scheduled_at' => null]; 124 144 } 125 145 … … 128 148 129 149 if ($newAttempts >= $maxAttempts) { 130 $this->markFailed($jobId, 'Max retry attempts exceeded'); 131 return false; 150 return ['rescheduled' => false, 'scheduled_at' => null]; 132 151 } 133 152 … … 136 155 $scheduledAt = new DateTime('now', new DateTimeZone('UTC')); 137 156 $scheduledAt->modify("+{$delaySeconds} seconds"); 157 $scheduledAtStr = $scheduledAt->format('Y-m-d H:i:s'); 138 158 139 159 $this->repository->update($jobId, [ … … 142 162 'locked_at' => null, 143 163 'locked_by' => null, 144 'scheduled_at' => $scheduledAt ->format('Y-m-d H:i:s'),145 ]); 146 147 return true;164 'scheduled_at' => $scheduledAtStr, 165 ]); 166 167 return ['rescheduled' => true, 'scheduled_at' => $scheduledAtStr]; 148 168 } 149 169 … … 174 194 'completed' => 0, 175 195 'failed' => 0, 196 'permanently_failed' => 0, 176 197 'total' => 0, 177 198 ]; 178 199 179 200 foreach ($stats as $row) { 180 $result[$row['status']] = (int) $row['count']; 201 if (array_key_exists($row['status'], $result)) { 202 $result[$row['status']] = (int) $row['count']; 203 } 181 204 $result['total'] += (int) $row['count']; 182 205 } … … 244 267 245 268 /** 246 * Force retry a failed job immediately269 * Force retry a failed or permanently failed job immediately 247 270 * 248 271 * @param int $jobId … … 252 275 $job = $this->repository->find($jobId); 253 276 254 if (!$job || $job['status'] !== 'failed') {277 if (!$job || !in_array($job['status'], ['failed', 'permanently_failed'], true)) { 255 278 return false; 256 279 } … … 266 289 ]); 267 290 } 291 292 /** 293 * Get the repository instance 294 * 295 * @return QueueRepository 296 */ 297 public function getRepository(): QueueRepository { 298 return $this->repository; 299 } 268 300 }
Note: See TracChangeset
for help on using the changeset viewer.