标签存档: Message Queue

PHP 通过共享内存实现消息队列和进程通信的两个类

实现消息队列,可以使用比较专业的工具,例如:Apache ActiveMQ、memcacheq…..,下面是两个基本简单的实现方式:

  1. 使用memcache方法来实现
    
    < ?php
    /*
     * @Copyright (c) 2007,上海友邻信息科技有限公司
     * @All	rights reserved.
     *
     * 这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。
     *
     * @filename   MemcacheQueue.class.php
     * @category
     * @package
     * @author     Xinze 
     * @date       2010-04-08 03:44:42
     */
    /**
    * Class and Function List:
    * Function list:
    * - __construct()
    * - singleton()
    * - init()
    * - __get()
    * - __set()
    * - isEmpty()
    * - isFull()
    * - enQueue()
    * - deQueue()
    * - getTop()
    * - getAll()
    * - getPage()
    * - makeEmpty()
    * - getAllKeys()
    * - add()
    * - increment()
    * - decrement()
    * - set()
    * - get()
    * - delete()
    * - getKeyByPos()
    * Classes list:
    * - Yl_MemcacheQueue
    */
    
    class Yl_MemcacheQueue
    {
        private static $instance;
        private $memcache;
        private $name;
        private $prefix;
        private $maxSize;
    
        private function __construct()
        {
        }
    
        static function singleton()
        {
    
            if (!(self::$instance instanceof self))
            {
                self::$instance = new Yl_MemcacheQueue(); 
    
            }
    
            return self::$instance;
        }
    
        public function init($max_size, $name, $prefix = "__queue__")
        {
    		$max_size = 1000;
    		$name     = '_war_';
    		$prefix   = '_queue';
    
            $this->memcache = Yl_Memcache::singleton();
            $this->name    = $name;
            $this->prefix  = $prefix;
            $this->maxSize = $max_size;
    
            $this->add('front', 0);
            $this->add('rear', 0);
            $this->add('size', 0);
        }
    
        function isEmpty()
        {
            return $this->get('size') == 0;
        }
    
        function isFull()
        {
            return $this->get('size') >= $this->maxSize;
        }
    
        function enQueue($data)
        {
            if ($this->isFull())
            {
                throw new Exception("Queue is Full");
            }
    
            $size = $this->increment('size');
            $rear = $this->increment('rear');
    
            $this->set(($rear-1) % $this->maxSize, $data);
    
            return $this;
        }
    
        function deQueue()
        {
            if ($this->isEmpty())
            {
                throw new Exception("Queue is Empty");
            }
    
            $this->decrement('size');
            $front = $this->increment('front');
            $this->delete(($front - 1) % $this->maxSize);
    
            return $this;
        }
    
        function getTop()
        {
            return $this->get($this->get('front') % $this->maxSize);
        }
    
        function getAll()
        {
            return $this->getPage();
        }
    
        function getPage($offset = 0, $limit = 0)
        {
    		$size = $this->get('size');
    
            if (0==$size || $size< $offset)
            {
                return null;
            }
    
    		$front = $this->get('front') % $this->maxSize;
    		$rear = $this->get('rear') % $this->maxSize;
    
            $keys[] = $this->getKeyByPos(($front + $offset) % $this->maxSize);
            $num = 1;
    
            for ($pos = ($front + $offset + 1) % $this->maxSize; $pos != $rear; $pos = ($pos + 1) % $this->maxSize)
            {
                $keys[] = $this->getKeyByPos($pos);
                $num++;
    
                if ($limit > 0 && $limit == $num)
                {
                    break;
                }
            }
    
            return array_values($this->memcache->get($keys));
        }
    
        function makeEmpty()
        {
            $keys = $this->getAllKeys();
    
            foreach($keys as $value)
            {
                $this->delete($value);
            }
    
            $this->delete("rear");
            $this->delete("front");
            $this->delete('size');
            $this->delete("maxSize");
        }
    
        private function getAllKeys()
        {
            if ($this->isEmpty())
            {
                return array();
            }
    
            $keys[] = $this->get('front');
    
            for ($pos = ($this->get('front') % $this->maxSize + 1) % $this->maxSize; $pos != $this->get('rear') % $this->maxSize; $pos = ($pos + 1) % $this->maxSize)
            {
                $keys[] = $pos;
            }
    
            return $keys;
        }
    
        private function add($pos, $data)
        {
            $this->memcache->add($this->getKeyByPos($pos) , $data);
    
            return $this;
        }
    
        private function increment($pos)
        {
    
            return $this->memcache->increment($this->getKeyByPos($pos));
        }
    
        private function decrement($pos)
        {
            $this->memcache->decrement($this->getKeyByPos($pos));
        }
    
        private function set($pos, $data)
        {
            $this->memcache->save($data, $this->getKeyByPos($pos));
    
            return $this;
        }
    
        private function get($pos)
        {
    
            return $this->memcache->get($this->getKeyByPos($pos));
        }
    
        private function delete($pos)
        {
    
            return $this->memcache->delete($this->getKeyByPos($pos));
        }
    
        private function getKeyByPos($pos)
        {
    
            return $this->prefix . $this->name . $pos;
        }
    }
    ?>
    
  2. 使用共享内存队列实现 点击查看原文地址
    < ?php
    /**
     * 使用共享内存的PHP循环内存队列实现
     * 支持多进程, 支持各种数据类型的存储
     * 注: 完成入队或出队操作,尽快使用unset(), 以释放临界区
     *
     * @author wangbinandi@gmail.com
     * @created 2009-12-23
     */
    class ShmQueue
    {
    	private $maxQSize = 0; // 队列最大长度
    
    	private $front = 0; // 队头指针
    	private $rear = 0;  // 队尾指针
    
    	private $blockSize = 256;  // 块的大小(byte)
    	private $memSize = 25600;  // 最大共享内存(byte)
    	private $shmId = 0;
    
    	private $filePtr = './shmq.ptr';
    
    	private $semId = 0;
    	public function __construct()
    	{
    		$shmkey = ftok(__FILE__, 't');
    
    		$this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize );
    		$this->maxQSize = $this->memSize / $this->blockSize;
    
    		 // 申請一个信号量
    		$this->semId = sem_get($shmkey, 1);
    		sem_acquire($this->semId); // 申请进入临界区		
    
    		$this->init();
    	}
    
    	private function init()
    	{
    		if ( file_exists($this->filePtr) ){
    			$contents = file_get_contents($this->filePtr);
    			$data = explode( '|', $contents );
    			if ( isset($data[0]) && isset($data[1])){
    				$this->front = (int)$data[0];
    				$this->rear  = (int)$data[1];
    			}
    		}
    	}
    
    	public function getLength()
    	{
    		return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;
    	}
    
    	public function enQueue( $value )
    	{
    		if ( $this->ptrInc($this->rear) == $this->front ){ // 队满
    			return false;
    		}
    
    		$data = $this->encode($value);
    		shmop_write($this->shmId, $data, $this->rear );
    		$this->rear = $this->ptrInc($this->rear);
    		return true;
    	}
    
    	public function deQueue()
    	{
    		if ( $this->front == $this->rear ){ // 队空
    			return false;
    		}
    		$value = shmop_read($this->shmId, $this->front, $this->blockSize-1);
    		$this->front = $this->ptrInc($this->front);
    		return $this->decode($value);
    	}
    
    	private function ptrInc( $ptr )
    	{
    		return ($ptr + $this->blockSize) % ($this->memSize);
    	}
    
    	private function encode( $value )
    	{
    		$data = serialize($value) . "__eof";
    		echo '';
    
    		echo strlen($data);
    		echo '';
    
    		echo $this->blockSize -1;
    		echo '';
    
    		if ( strlen($data) > $this->blockSize -1 ){
    			throw new Exception(strlen($data)." is overload block size!");
    		}
    		return $data;
    	}
    
    	private function decode( $value )
    	{
    		$data = explode("__eof", $value);
    		return unserialize($data[0]);
    	}
    
    	public function __destruct()
    	{
    		$data = $this->front . '|' . $this->rear;
    		file_put_contents($this->filePtr, $data);
    
    		sem_release($this->semId); // 出临界区, 释放信号量
    	}
    }
    
    /*
    // 进队操作
    $shmq = new ShmQueue();
    $data = 'test data';
    $shmq->enQueue($data);
    unset($shmq);
    // 出队操作
    $shmq = new ShmQueue();
    $data = $shmq->deQueue();
    unset($shmq);
    */
    
    ?>