延时队列

  • DelayProducer.php

  • AmqpBuilder.php

AmqpBuilder.php

<?php

declare(strict_types = 1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Builder\Builder;
use Hyperf\Amqp\Builder\QueueBuilder;

/**
 * Queue builder that accumulates queue arguments instead of overwriting them
 * and offers a shortcut for configuring a TTL + dead-letter ("delayed") queue.
 */
class AmqpBuilder extends QueueBuilder
{
    /**
     * Merge the given arguments into the ones already set on this builder.
     *
     * Keys present in both sets take the newly supplied value (array_merge
     * semantics for string keys).
     *
     * @param array $arguments Queue arguments keyed by name. Only plain arrays
     *                         can be merged; any other value is rejected.
     *
     * @return \Hyperf\Amqp\Builder\Builder
     * @throws \InvalidArgumentException When $arguments is not an array.
     */
    public function setArguments($arguments) : Builder
    {
        // array_merge() only accepts arrays. Without this guard a non-array
        // value either raises an opaque TypeError or, on older PHP versions,
        // replaces the stored arguments with null.
        if (! is_array($arguments)) {
            throw new \InvalidArgumentException('AmqpBuilder::setArguments() expects an array of queue arguments.');
        }

        $this->arguments = array_merge($this->arguments, $arguments);

        return $this;
    }

    /**
     * Configure this builder as a delayed queue.
     *
     * Sets the message TTL and the dead-letter exchange / routing key as
     * queue arguments, then sets the queue name.
     *
     * @param string $queueName             Name of the queue to declare.
     * @param int    $xMessageTtl           Message time-to-live, in seconds.
     * @param string $xDeadLetterExchange   Value for "x-dead-letter-exchange".
     * @param string $xDeadLetterRoutingKey Value for "x-dead-letter-routing-key".
     *
     * @return $this
     */
    public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    {
        $this->setArguments([
            // Seconds are converted to milliseconds.
            // NOTE(review): the 'I' tag is presumably a signed 32-bit integer,
            // which would cap the TTL at roughly 24 days - confirm.
            'x-message-ttl'             => ['I', $xMessageTtl * 1000],
            'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
            'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
        ]);

        $this->setQueue($queueName);

        return $this;
    }
}

DelayProducer.php

<?php

declare(strict_types = 1);

namespace App\Components\Amqp;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;

/**
 * Producer that declares the queue described by an AmqpBuilder, declares the
 * message's exchange, binds the two and then publishes the message.
 */
class DelayProducer extends Builder
{
    /**
     * Publish a message through the queue described by $queueBuilder.
     *
     * The actual work is delegated to produceMessage(), wrapped in retry().
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
     * @param bool                     $confirm         Use a confirm channel and report whether an ack arrived.
     * @param int                      $timeout         Timeout passed to the wait for pending acks/returns.
     *
     * @return bool
     * @throws \Throwable
     */
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        $attempt = function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        };

        return retry(1, $attempt);
    }

    /**
     * Declare queue and exchange, bind them, and publish the message.
     *
     * @param ProducerMessageInterface $producerMessage Message to publish.
     * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
     * @param bool                     $confirm         Use a confirm channel and report whether an ack arrived.
     * @param int                      $timeout         Timeout passed to the wait for pending acks/returns.
     *
     * @return bool Always true without $confirm; with $confirm, true only if the ack handler fired.
     * @throws \Throwable Anything thrown while declaring or publishing is rethrown after reconnecting.
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        $acked = false;

        $this->injectMessageProperty($producerMessage);

        $amqpMessage = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());

        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $this->getConnectionPool($producerMessage->getPoolName())->get();

        $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();

        // Flipped to true by the channel when the broker acknowledges the publish.
        $channel->set_ack_handler(function () use (&$acked) {
            $acked = true;
        });

        try {
            $exchangeBuilder = $producerMessage->getExchangeBuilder();

            // Declare the queue from the builder's settings.
            $channel->queue_declare(
                $queueBuilder->getQueue(),
                $queueBuilder->isPassive(),
                $queueBuilder->isDurable(),
                $queueBuilder->isExclusive(),
                $queueBuilder->isAutoDelete(),
                $queueBuilder->isNowait(),
                $queueBuilder->getArguments(),
                $queueBuilder->getTicket()
            );

            // Declare the exchange the message is published to.
            $channel->exchange_declare(
                $exchangeBuilder->getExchange(),
                $exchangeBuilder->getType(),
                $exchangeBuilder->isPassive(),
                $exchangeBuilder->isDurable(),
                $exchangeBuilder->isAutoDelete(),
                $exchangeBuilder->isInternal(),
                $exchangeBuilder->isNowait(),
                $exchangeBuilder->getArguments(),
                $exchangeBuilder->getTicket()
            );

            $exchange   = $producerMessage->getExchange();
            $routingKey = $producerMessage->getRoutingKey();

            // Bind the queue to the exchange, then publish.
            $channel->queue_bind($queueBuilder->getQueue(), $exchange, $routingKey);
            $channel->basic_publish($amqpMessage, $exchange, $routingKey);
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();

            throw $exception;
        } finally {
            $connection->release();
        }

        return ! $confirm || $acked;
    }

    /**
     * Copy routing key and exchange from the message class's Producer
     * annotation onto the message, when such an annotation exists.
     *
     * @param ProducerMessageInterface $producerMessage
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
        if (! class_exists(AnnotationCollector::class)) {
            return;
        }

        /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
        $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
        if (! $annotation) {
            return;
        }

        // Empty annotation values leave the message's own settings untouched.
        if ($annotation->routingKey) {
            $producerMessage->setRoutingKey($annotation->routingKey);
        }

        if ($annotation->exchange) {
            $producerMessage->setExchange($annotation->exchange);
        }
    }
}

处理超时订单

  • OrderQueueConsumer.php

  • OrderQueueProducer.php

OrderQueueProducer.php

<?php

declare(strict_types = 1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder\ExchangeBuilder;
use Hyperf\Amqp\Message\ProducerMessage;

/**
 * Producer message for order events, published to "order_exchange" with the
 * routing key "order_exchange".
 *
 * @Producer(exchange="order_exchange", routingKey="order_exchange")
 */
class OrderQueueProducer extends ProducerMessage
{
    /**
     * @param mixed $data Payload to publish; stored on the message as-is.
     */
    public function __construct($data)
    {
        $this->payload = $data;
    }

    /**
     * Exchange definition for this message.
     *
     * Currently a plain pass-through to the parent implementation; kept as
     * the place to customise the exchange if that becomes necessary.
     */
    public function getExchangeBuilder() : ExchangeBuilder
    {
        $exchangeBuilder = parent::getExchangeBuilder();

        return $exchangeBuilder;
    }
}

OrderQueueConsumer.php

<?php

declare(strict_types = 1);

namespace App\Amqp\Consumer;

use App\Service\CityTransport\OrderService;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;

/**
 * Consumer for "delay_queue", bound to "delay_exchange" with the routing key
 * "delay_route".
 *
 * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
 */
class OrderQueueConsumer extends ConsumerMessage
{
    /**
     * Handle one message taken from the queue.
     *
     * @param mixed $data Message payload as delivered to the consumer.
     *
     * @return string One of the Hyperf\Amqp\Result constants.
     */
    public function consume($data) : string
    {
        // Business logic goes here.

        // The declared string return type requires an explicit result;
        // returning nothing raises a TypeError. Acknowledge the message once
        // it has been handled.
        return Result::ACK;
    }

    /**
     * Whether this consumer is enabled.
     */
    public function isEnable() : bool
    {
        return true;
    }
}

Demo

// Describe the delayed queue: queue name 'order_exchange', a TTL of 1 second,
// and 'delay_exchange' / 'delay_route' as the dead-letter exchange and routing key.
$builder = new AmqpBuilder();
$builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');

// Resolve the producer from the container and publish a sample order message.
$que = ApplicationContext::getContainer()->get(DelayProducer::class);
var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder));

推荐教程:《PHP教程》

更多相关文章

  1. PHP自定义函数xmlToArray的实例解析
  2. PHP中数组规范和自定义集合
  3. PHP DIY系列之自定义配置和路由
  4. PHP预定义接口之Iterator用法示例
  5. 示例PHP购物车类Cart.class.php定义与用法
  6. PHP如何自定义的 printf 函数
  7. 谈谈PHP中的多进程消费队列
  8. PHP自定义的 printf 函数新用途
  9. 5种PHP定义数组的方法

随机推荐

  1. Android启动页
  2. Android 自带图标库 android.R.drawable
  3. Android触控事件
  4. rock960 android box compiling!
  5. [Android实例] android实现顶级圆角弹窗(
  6. Android 问题(一)
  7. android对话框的使用
  8. android的Material Design点击涟漪效果
  9. NoClassDefFoundError with Android Stud
  10. SDK下载地址