实现消息队列,可以使用比较专业的工具,例如:Apache ActiveMQ、memcacheq…..,下面是两个基本简单的实现方式:
- 使用memcache方法来实现
< ?php /* * @Copyright (c) 2007,上海友邻信息科技有限公司 * @All rights reserved. * * 这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。 * * @filename MemcacheQueue.class.php * @category * @package * @author Xinze
* @date 2010-04-08 03:44:42 */ /** * Class and Function List: * Function list: * - __construct() * - singleton() * - init() * - __get() * - __set() * - isEmpty() * - isFull() * - enQueue() * - deQueue() * - getTop() * - getAll() * - getPage() * - makeEmpty() * - getAllKeys() * - add() * - increment() * - decrement() * - set() * - get() * - delete() * - getKeyByPos() * Classes list: * - Yl_MemcacheQueue */ class Yl_MemcacheQueue { private static $instance; private $memcache; private $name; private $prefix; private $maxSize; private function __construct() { } static function singleton() { if (!(self::$instance instanceof self)) { self::$instance = new Yl_MemcacheQueue(); } return self::$instance; } public function init($max_size, $name, $prefix = "__queue__") { $max_size = 1000; $name = '_war_'; $prefix = '_queue'; $this->memcache = Yl_Memcache::singleton(); $this->name = $name; $this->prefix = $prefix; $this->maxSize = $max_size; $this->add('front', 0); $this->add('rear', 0); $this->add('size', 0); } function isEmpty() { return $this->get('size') == 0; } function isFull() { return $this->get('size') >= $this->maxSize; } function enQueue($data) { if ($this->isFull()) { throw new Exception("Queue is Full"); } $size = $this->increment('size'); $rear = $this->increment('rear'); $this->set(($rear-1) % $this->maxSize, $data); return $this; } function deQueue() { if ($this->isEmpty()) { throw new Exception("Queue is Empty"); } $this->decrement('size'); $front = $this->increment('front'); $this->delete(($front - 1) % $this->maxSize); return $this; } function getTop() { return $this->get($this->get('front') % $this->maxSize); } function getAll() { return $this->getPage(); } function getPage($offset = 0, $limit = 0) { $size = $this->get('size'); if (0==$size || $size< $offset) { return null; } $front = $this->get('front') % $this->maxSize; $rear = $this->get('rear') % $this->maxSize; $keys[] = $this->getKeyByPos(($front + $offset) % $this->maxSize); $num = 1; for ($pos = ($front + $offset + 1) % $this->maxSize; $pos != $rear; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); $num++; if ($limit > 0 && $limit == $num) { break; } } return array_values($this->memcache->get($keys)); } function makeEmpty() { $keys = $this->getAllKeys(); foreach($keys as $value) { $this->delete($value); } $this->delete("rear"); $this->delete("front"); $this->delete('size'); $this->delete("maxSize"); } private function getAllKeys() { if ($this->isEmpty()) { return array(); } $keys[] = $this->get('front'); for ($pos = ($this->get('front') % $this->maxSize + 1) % $this->maxSize; $pos != $this->get('rear') % $this->maxSize; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $pos; } return $keys; } private function add($pos, $data) { $this->memcache->add($this->getKeyByPos($pos) , $data); return $this; } private function increment($pos) { return $this->memcache->increment($this->getKeyByPos($pos)); } private function decrement($pos) { $this->memcache->decrement($this->getKeyByPos($pos)); } private function set($pos, $data) { $this->memcache->save($data, $this->getKeyByPos($pos)); return $this; } private function get($pos) { return $this->memcache->get($this->getKeyByPos($pos)); } private function delete($pos) { return $this->memcache->delete($this->getKeyByPos($pos)); } private function getKeyByPos($pos) { return $this->prefix . $this->name . $pos; } } ?> - 使用共享内存队列实现 点击查看原文地址
< ?php /** * 使用共享内存的PHP循环内存队列实现 * 支持多进程, 支持各种数据类型的存储 * 注: 完成入队或出队操作,尽快使用unset(), 以释放临界区 * * @author wangbinandi@gmail.com * @created 2009-12-23 */ class ShmQueue { private $maxQSize = 0; // 队列最大长度 private $front = 0; // 队头指针 private $rear = 0; // 队尾指针 private $blockSize = 256; // 块的大小(byte) private $memSize = 25600; // 最大共享内存(byte) private $shmId = 0; private $filePtr = './shmq.ptr'; private $semId = 0; public function __construct() { $shmkey = ftok(__FILE__, 't'); $this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize ); $this->maxQSize = $this->memSize / $this->blockSize; // 申請一个信号量 $this->semId = sem_get($shmkey, 1); sem_acquire($this->semId); // 申请进入临界区 $this->init(); } private function init() { if ( file_exists($this->filePtr) ){ $contents = file_get_contents($this->filePtr); $data = explode( '|', $contents ); if ( isset($data[0]) && isset($data[1])){ $this->front = (int)$data[0]; $this->rear = (int)$data[1]; } } } public function getLength() { return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize; } public function enQueue( $value ) { if ( $this->ptrInc($this->rear) == $this->front ){ // 队满 return false; } $data = $this->encode($value); shmop_write($this->shmId, $data, $this->rear ); $this->rear = $this->ptrInc($this->rear); return true; } public function deQueue() { if ( $this->front == $this->rear ){ // 队空 return false; } $value = shmop_read($this->shmId, $this->front, $this->blockSize-1); $this->front = $this->ptrInc($this->front); return $this->decode($value); } private function ptrInc( $ptr ) { return ($ptr + $this->blockSize) % ($this->memSize); } private function encode( $value ) { $data = serialize($value) . "__eof"; echo ''; echo strlen($data); echo ''; echo $this->blockSize -1; echo ''; if ( strlen($data) > $this->blockSize -1 ){ throw new Exception(strlen($data)." is overload block size!"); } return $data; } private function decode( $value ) { $data = explode("__eof", $value); return unserialize($data[0]); } public function __destruct() { $data = $this->front . '|' . $this->rear; file_put_contents($this->filePtr, $data); sem_release($this->semId); // 出临界区, 释放信号量 } } /* // 进队操作 $shmq = new ShmQueue(); $data = 'test data'; $shmq->enQueue($data); unset($shmq); // 出队操作 $shmq = new ShmQueue(); $data = $shmq->deQueue(); unset($shmq); */ ?>
近期评论