Task Queue
The Task Queue file provides a queue management system to handle asynchronous jobs. It uses Redis to store jobs in the queue and supports retries when a job fails. Below is a detailed explanation of each function:
push()
push()
public static function push($queueName, $eventName, $payload = null, $attempts = 0) {}
Functionality: Pushes a job into the queue.
Parameters:
$queueName
: The name of the queue (e.g.,'email'
,'notifications'
).$eventName
: The name of the event to be triggered when the job is processed (e.g.,'SendEmailEvent'
).$payload
: Data associated with the job (e.g., email information to be sent).$attempts
: The number of retry attempts (default is0
).
Example:
<?php
TaskQueue::push(
'email',
'SendEmailEvent',
['to' => '[email protected]', 'subject' => 'Welcome'],
0
);
// Result: The job is added to the task_queue:email queue.
pop()
pop()
public static function pop($queueName, $timeout = 5) {}
Functionality: Retrieves a job from the queue (uses blocking pop if available).
Parameters:
$queueName
: The name of the queue.$timeout
: Timeout duration (in seconds) if the queue is empty.
Example:
<?php
$job = TaskQueue::pop('email');
print_r($job);
// Output
// [
// 'event' => 'SendEmailEvent',
// 'payload' => ['to' => '[email protected]', 'subject' => 'Welcome'],
// 'timestamp' => 1681111111,
// 'attempts' => 0
// ]
del()
del()
Completely clears all session data, effectively logging out the user and resetting the session
Example:
Session::del('user_id'); // Deletes the user_id session variable
processJob()
processJob()
public static function processJob($queueName, $maxRetries = 3) {}
Functionality: Processes a job from the queue.
Parameters:
$queueName
: The name of the queue.$maxRetries
: The maximum number of retry attempts (default is3
).
Example:
<?php
TaskQueue::processJob('email');
// Result:
// If the job succeeds: The event is processed.
// If the job fails: The job is retried or moved to the failed queue.
pushFailed()
pushFailed()
public static function pushFailed($queueName, $job) {}
Functionality: Pushes a failed job into the failed queue.
Parameters:
$queueName
: The name of the original queue.$job
: The job as an array.
Example:
<?php
$job = [
'event' => 'SendEmailEvent',
'payload' => ['to' => '[email protected]', 'subject' => 'Welcome'],
'timestamp' => 1681111111,
'attempts' => 4
];
TaskQueue::pushFailed('email', $job);
// Result: The job is stored in the failed_task_queue:email queue
runWorker()
runWorker()
public static function runWorker($queueName, $maxRetries = 3, $timeout = 5) {}
Functionality: Runs a worker to continuously retrieve and process jobs from the queue.
Parameters:
$queueName
: The name of the queue.$maxRetries
: The maximum number of retry attempts.$timeout
: Timeout duration (in seconds) for blocking pop.
Example:
<?php
TaskQueue::runWorker('email');
// Result: The worker continuously processes jobs in the email queue
Example
<?php
// Register the event
Events::on('SendEmailEvent', function ($payload) {
echo "Sending email to {$payload['to']} with subject '{$payload['subject']}'<br>";
});
// Push a job into the queue
TaskQueue::push('email', 'SendEmailEvent', ['to' => '[email protected]', 'subject' => 'Welcome']);
// Run the worker to process the job
TaskQueue::runWorker('email');
// Output
// Sending email to [email protected] with subject 'Welcome'
Summary
push()
: Pushes a job into the queue.pop()
: Retrieves a job from the queue.processJob()
: Processes a single job.pushFailed()
: Pushes a failed job into the failed queue.runWorker()
: Runs a worker to continuously process jobs.
Last updated
Was this helpful?