当前位置: 首页 > news >正文

PHPPHP与消息队列RabbitMQ集成

PHP与消息队列RabbitMQ集成

RabbitMQ是流行的消息中间件。PHP通过AMQP扩展或php-amqplib库连接RabbitMQ。今天说说PHP与RabbitMQ的集成。

连接RabbitMQ。

```php
// composer require php-amqplib/php-amqplib

require 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

echo "已连接到RabbitMQ\n";

$channel->close();
$connection->close();
?>

发送消息到队列。

```php
function sendToQueue(string $queueName, array $data): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare($queueName, false, true, false, false);

$message = new AMQPMessage(json_encode($data), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);

$channel->basic_publish($message, '', $queueName);
echo "消息已发送到队列: $queueName\n";

$channel->close();
$connection->close();
}

sendToQueue('task_queue', ['task' => 'send_email', 'to' => 'user@example.com']);
?>

消费消息。

```php
function consumeQueue(string $queueName, callable $handler): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare($queueName, false, true, false, false);
echo "等待消息...\n";

$channel->basic_consume($queueName, '', false, false, false, false, function ($msg) use ($handler) {
echo "收到消息\n";
$data = json_decode($msg->body, true);

try {
$handler($data);
$msg->ack();
echo "处理完成\n";
} catch (Exception $e) {
echo "处理失败: {$e->getMessage()}\n";
$msg->nack(true);
}
});

while ($channel->is_consuming()) {
$channel->wait();
}

$channel->close();
$connection->close();
}

consumeQueue('task_queue', function ($data) {
echo "处理: {$data['task']}\n";
});
?>

发布订阅模式。

```php
// 发布者
function publish(string $exchangeName, array $data): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare($exchangeName, 'fanout', false, false, false);
$message = new AMQPMessage(json_encode($data));
$channel->basic_publish($message, $exchangeName);

echo "已发布到交换器: $exchangeName\n";

$channel->close();
$connection->close();
}

// 订阅者
function subscribe(string $exchangeName, string $queueName, callable $handler): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare($exchangeName, 'fanout', false, false, false);
$channel->queue_declare($queueName, false, false, false, false);
$channel->queue_bind($queueName, $exchangeName);

$channel->basic_consume($queueName, '', false, true, false, false, function ($msg) use ($handler) {
$handler(json_decode($msg->body, true));
});

while ($channel->is_consuming()) {
$channel->wait();
}
}
?>

延迟队列的实现。

```php
function sendDelayed(string $queueName, array $data, int $delayMs): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$message = new AMQPMessage(json_encode($data), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable([
'x-delay' => $delayMs,
]),
]);

$channel->basic_publish($message, '', $queueName);
echo "延迟消息已发送 ({$delayMs}ms后执行)\n";
}
?>

RabbitMQ是功能完整的消息队列系统。支持多种消息模式、消息持久化、ACK确认、延迟队列。PHP通过php-amqplib库可以方便地集成RabbitMQ,适合需要可靠消息传递的场景。

http://www.rkmt.cn/news/1470958.html

相关文章:

  • 2026年Q2重庆黄金回收店核心技术与服务全景解析 - 优质品牌商家
  • 告别定位漂移:用Python+开源IGNav库,手把手实现你的第一个RTK/INS紧组合算法
  • 给TMS320F28377D做个‘心脏搭桥’:手把手教你配置双工程Bootloader的CMD文件
  • 从智能车竞赛到DIY电源:固态电容如何解决我的大功率电路‘发烧’难题
  • 别再自己造轮子了!手把手教你用Cadence/Synopsys VIP加速SoC验证(附自研VIP开发避坑指南)
  • 别再瞎试了!用FFmpeg -buildconf 命令读懂编译选项,定制你的专属音视频工具链
  • 别再只用if-else了!用Python的异或运算符(^)让你的代码更简洁高效
  • 油气管道石蜡沉积动态仿真工具:MATLAB GUI版,含温度/流速影响分析与可视化结果
  • LIO-SAM保姆级调试笔记:从IMU标定到地图保存的完整避坑指南
  • 别再死记硬背了!用生活中的例子秒懂Wi-Fi信号为啥时好时坏(直射/反射/绕射全解析)
  • 西门子博图比较操作避坑指南:为什么你的‘值不在范围内’指令总是不触发?(基于TIA V17)
  • 用PDDL给AI定规矩:手把手教你设计一个自动化的‘快递分拣’规划问题
  • 从PLC到上位机:深入聊聊C#/Python中byte、char处理串口数据的那些坑
  • 别再直接读ADC了!手把手教你用STM32F103和LM358给PT100搭个高精度测温电路
  • 安全实验室搭建笔记:如何用中兴ZXR10-3928A的端口镜像功能部署IDS
  • 开源AI编程的安全性:MonkeyCode 容器沙箱隔离方案深度解析
  • OpenCore Legacy Patcher:让老旧Mac焕发新生的5个关键步骤
  • 信号系统学不动了?试试用Python的SymPy库5分钟搞定拉普拉斯变换(附常见信号变换表)
  • 从傅里叶到拉普拉斯:搞懂‘收敛域’才是信号分析入门的钥匙(避坑指南)
  • 2014-2026年我国POI兴趣点数据
  • 别再傻等Github Action定时任务了!我用腾讯云函数SCF+workflow_dispatch,实现了毫秒级精准触发
  • 大模型SFT监督微调完全解析:原理、数据集、训练流程、实战调优、避坑指南
  • 2026长春市洋酒回收评测:沈阳名酒回收/沈阳白酒大类回收/沈阳茅台酒回收/靠谱商家核心维度对比 - 优质品牌商家
  • 别再死记硬背公式了!用Python的NumPy和Matplotlib亲手‘画’出傅里叶级数(附完整代码)
  • ROS开发者的福音:手把手教你汉化RViz界面,告别英文菜单困扰
  • OpenClaw Windows全流程实操安装指南
  • ADC0809老矣?深入对比STM32的ADC多通道采集,聊聊精度、速度与易用性的那些事儿
  • 从Qt5到Qt6:MainWindow状态栏API的细微变化与迁移避坑指南
  • 循环结构.
  • 如何用LRCGET批量下载工具,为你的离线音乐库一键添加精准同步歌词