Back
Featured image of post 连接池

连接池

基础知识

连接池实现

mysql连接池

头文件:

#ifndef DBPOOL_H_
#define DBPOOL_H_

#include <iostream>
#include <list>
#include <mutex>
#include <condition_variable>
#include <map>
#include <stdint.h>

#include <mysql.h>

#define MAX_ESCAPE_STRING_LEN	10240

using namespace std;

// 返回结果 select的时候用
class CResultSet {
public:
	CResultSet(MYSQL_RES* res);
	virtual ~CResultSet();

	bool Next();
	int GetInt(const char* key);
	char* GetString(const char* key);
private:
	int _GetIndex(const char* key);

	MYSQL_RES* 			m_res;
	MYSQL_ROW			m_row;
	map<string, int>	m_key_map;
};

// 插入数据用
class CPrepareStatement {
public:
	CPrepareStatement();
	virtual ~CPrepareStatement();

	bool Init(MYSQL* mysql, string& sql);

	void SetParam(uint32_t index, int& value);
	void SetParam(uint32_t index, uint32_t& value);
    void SetParam(uint32_t index, string& value);
    void SetParam(uint32_t index, const string& value);

	bool ExecuteUpdate();
	uint32_t GetInsertId();
private:
	MYSQL_STMT*	m_stmt;
	MYSQL_BIND*	m_param_bind;
	uint32_t	m_param_cnt;
};

class CDBPool;

class CDBConn {
public:
	CDBConn(CDBPool* pDBPool);
	virtual ~CDBConn();
	int Init();

	// 创建表
	bool ExecuteCreate(const char* sql_query);
	// 删除表
	bool ExecuteDrop(const char* sql_query);
	// 查询
	CResultSet* ExecuteQuery(const char* sql_query);

    /**
    *  执行DB更新,修改
    *
    *  @param sql_query     sql
    *  @param care_affected_rows  是否在意影响的行数,false:不在意;true:在意
    *
    *  @return 成功返回true 失败返回false
    */
	bool ExecuteUpdate(const char* sql_query, bool care_affected_rows = true);
	uint32_t GetInsertId();

	// 开启事务
	bool StartTransaction();
	// 提交事务
	bool Commit();
	// 回滚事务
	bool Rollback();
	// 获取连接池名
	const char* GetPoolName();
	MYSQL* GetMysql() { return m_mysql; }
private:
	CDBPool* 	m_pDBPool;	// to get MySQL server information
	MYSQL* 		m_mysql;	// 对应一个连接
	char		m_escape_string[MAX_ESCAPE_STRING_LEN + 1];
};

class CDBPool {	// 只是负责管理连接CDBConn,真正干活的是CDBConn
public:
	CDBPool() {}
	CDBPool(const char* pool_name, const char* db_server_ip, uint16_t db_server_port,
			const char* username, const char* password, const char* db_name, 
			int max_conn_cnt);
	virtual 	~CDBPool();

	int 		Init();		// 连接数据库,创建连接
	CDBConn* 	GetDBConn(const int timeout_ms = -1);	// 获取连接资源
	void 		RelDBConn(CDBConn* pConn);	// 归还连接资源

	const char* GetPoolName() { return m_pool_name.c_str(); }
	const char* GetDBServerIP() { return m_db_server_ip.c_str(); }
	uint16_t 	GetDBServerPort() { return m_db_server_port; }
	const char* GetUsername() { return m_username.c_str(); }
	const char* GetPasswrod() { return m_password.c_str(); }
	const char* GetDBName() { return m_db_name.c_str(); }
private:
	string 		m_pool_name;	// 连接池名称
	string 		m_db_server_ip;	// 数据库ip
	uint16_t	m_db_server_port; // 数据库端口
	string 		m_username;  	// 用户名
	string 		m_password;		// 用户密码
	string 		m_db_name;		// db名称
	int			m_db_cur_conn_cnt;	// 当前启用的连接数量
	int 		m_db_max_conn_cnt;	// 最大连接数量
	list<CDBConn*>	m_free_list;	// 空闲的连接

	list<CDBConn*>	m_used_list;		// 记录已经被请求的连接
	std::mutex m_mutex;
    std::condition_variable m_cond_var;
	bool m_abort_request = false;
	// CThreadNotify	m_free_notify;	// 信号量
};

#endif /* DBPOOL_H_ */

};

实现:


#include "DBPool.h"
#include <string.h>

#define log_error printf
#define log_warn printf
#define log_info printf
#define MIN_DB_CONN_CNT 2
#define MAX_DB_CONN_FAIL_NUM 10


CResultSet::CResultSet(MYSQL_RES *res)
{
	m_res = res;

	// map table field key to index in the result array
	int num_fields = mysql_num_fields(m_res);
	MYSQL_FIELD *fields = mysql_fetch_fields(m_res);
	for (int i = 0; i < num_fields; i++)
	{
		// 多行
		m_key_map.insert(make_pair(fields[i].name, i));
	}
}

CResultSet::~CResultSet()
{
	if (m_res)
	{
		mysql_free_result(m_res);
		m_res = NULL;
	}
}

bool CResultSet::Next()
{
	m_row = mysql_fetch_row(m_res);
	if (m_row)
	{
		return true;
	}
	else
	{
		return false;
	}
}

int CResultSet::_GetIndex(const char *key)
{
	map<string, int>::iterator it = m_key_map.find(key);
	if (it == m_key_map.end())
	{
		return -1;
	}
	else
	{
		return it->second;
	}
}

int CResultSet::GetInt(const char *key)
{
	int idx = _GetIndex(key);
	if (idx == -1)
	{
		return 0;
	}
	else
	{
		return atoi(m_row[idx]); // 有索引
	}
}

char *CResultSet::GetString(const char *key)
{
	int idx = _GetIndex(key);
	if (idx == -1)
	{
		return NULL;
	}
	else
	{
		return m_row[idx];		// 列
	}
}

/////////////////////////////////////////
CPrepareStatement::CPrepareStatement()
{
	m_stmt = NULL;
	m_param_bind = NULL;
	m_param_cnt = 0;
}

CPrepareStatement::~CPrepareStatement()
{
	if (m_stmt)
	{
		mysql_stmt_close(m_stmt);
		m_stmt = NULL;
	}

	if (m_param_bind)
	{
		delete[] m_param_bind;
		m_param_bind = NULL;
	}
}

bool CPrepareStatement::Init(MYSQL *mysql, string &sql)
{
	mysql_ping(mysql);	// 当mysql连接丢失的时候,使用mysql_ping能够自动重连数据库

	//g_master_conn_fail_num ++;
	m_stmt = mysql_stmt_init(mysql);
	if (!m_stmt)
	{
		log_error("mysql_stmt_init failed\n");
		return false;
	}

	if (mysql_stmt_prepare(m_stmt, sql.c_str(), sql.size()))
	{
		log_error("mysql_stmt_prepare failed: %s\n", mysql_stmt_error(m_stmt));
		return false;
	}

	m_param_cnt = mysql_stmt_param_count(m_stmt);
	if (m_param_cnt > 0)
	{
		m_param_bind = new MYSQL_BIND[m_param_cnt];
		if (!m_param_bind)
		{
			log_error("new failed\n");
			return false;
		}

		memset(m_param_bind, 0, sizeof(MYSQL_BIND) * m_param_cnt);
	}

	return true;
}

void CPrepareStatement::SetParam(uint32_t index, int &value)
{
	if (index >= m_param_cnt)
	{
		log_error("index too large: %d\n", index);
		return;
	}

	m_param_bind[index].buffer_type = MYSQL_TYPE_LONG;
	m_param_bind[index].buffer = &value;
}

void CPrepareStatement::SetParam(uint32_t index, uint32_t &value)
{
	if (index >= m_param_cnt)
	{
		log_error("index too large: %d\n", index);
		return;
	}

	m_param_bind[index].buffer_type = MYSQL_TYPE_LONG;
	m_param_bind[index].buffer = &value;
}

void CPrepareStatement::SetParam(uint32_t index, string &value)
{
	if (index >= m_param_cnt)
	{
		log_error("index too large: %d\n", index);
		return;
	}

	m_param_bind[index].buffer_type = MYSQL_TYPE_STRING;
	m_param_bind[index].buffer = (char *)value.c_str();
	m_param_bind[index].buffer_length = value.size();
}

void CPrepareStatement::SetParam(uint32_t index, const string &value)
{
	if (index >= m_param_cnt)
	{
		log_error("index too large: %d\n", index);
		return;
	}

	m_param_bind[index].buffer_type = MYSQL_TYPE_STRING;
	m_param_bind[index].buffer = (char *)value.c_str();
	m_param_bind[index].buffer_length = value.size();
}

bool CPrepareStatement::ExecuteUpdate()
{
	if (!m_stmt)
	{
		log_error("no m_stmt\n");
		return false;
	}

	if (mysql_stmt_bind_param(m_stmt, m_param_bind))
	{
		log_error("mysql_stmt_bind_param failed: %s\n", mysql_stmt_error(m_stmt));
		return false;
	}

	if (mysql_stmt_execute(m_stmt))
	{
		log_error("mysql_stmt_execute failed: %s\n", mysql_stmt_error(m_stmt));
		return false;
	}

	if (mysql_stmt_affected_rows(m_stmt) == 0)
	{
		log_error("ExecuteUpdate have no effect\n");
		return false;
	}

	return true;
}

uint32_t CPrepareStatement::GetInsertId()
{
	return mysql_stmt_insert_id(m_stmt);
}

/////////////////////
CDBConn::CDBConn(CDBPool *pPool)
{
	m_pDBPool = pPool;
	m_mysql = NULL;
}

CDBConn::~CDBConn()
{
	if (m_mysql)
	{
		mysql_close(m_mysql);
	}
}

int CDBConn::Init()
{
	m_mysql = mysql_init(NULL);	// mysql_标准的mysql c client对应的api
	if (!m_mysql)
	{
		log_error("mysql_init failed\n");
		return 1;
	}

	my_bool reconnect = true;
	mysql_options(m_mysql, MYSQL_OPT_RECONNECT, &reconnect);	// 配合mysql_ping实现自动重连
	mysql_options(m_mysql, MYSQL_SET_CHARSET_NAME, "utf8mb4");	// utf8mb4和utf8区别

	// ip 端口 用户名 密码 数据库名
	if (!mysql_real_connect(m_mysql, m_pDBPool->GetDBServerIP(), m_pDBPool->GetUsername(), m_pDBPool->GetPasswrod(),
							m_pDBPool->GetDBName(), m_pDBPool->GetDBServerPort(), NULL, 0))
	{
		log_error("mysql_real_connect failed: %s\n", mysql_error(m_mysql));
		return 2;
	}

	return 0;
}

const char *CDBConn::GetPoolName()
{
	return m_pDBPool->GetPoolName();
}

bool CDBConn::ExecuteCreate(const char *sql_query)
{
	mysql_ping(m_mysql);
	// mysql_real_query 实际就是执行了SQL
	if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
	{
		log_error("mysql_real_query failed: %s, sql: start transaction\n", mysql_error(m_mysql));
		return false;
	}

	return true;
}
bool CDBConn::ExecuteDrop(const char *sql_query)
{
	mysql_ping(m_mysql);	// 如果端开了,能够自动重连

	if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
	{
		log_error("mysql_real_query failed: %s, sql: start transaction\n", mysql_error(m_mysql));
		return false;
	}

	return true;
}

CResultSet *CDBConn::ExecuteQuery(const char *sql_query)
{
	mysql_ping(m_mysql);

	if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
	{
		log_error("mysql_real_query failed: %s, sql: %s\n", mysql_error(m_mysql), sql_query);
		return NULL;
	}
	// 返回结果
	MYSQL_RES *res = mysql_store_result(m_mysql);	// 返回结果
	if (!res)
	{
		log_error("mysql_store_result failed: %s\n", mysql_error(m_mysql));
		return NULL;
	}

	CResultSet *result_set = new CResultSet(res);	// 存储到CResultSet
	return result_set;
}

/*
1.执行成功,则返回受影响的行的数目,如果最近一次查询失败的话,函数返回 -1

2.对于delete,将返回实际删除的行数.

3.对于update,如果更新的列值原值和新值一样,如update tables set col1=10 where id=1;
id=1该条记录原值就是10的话,则返回0。

mysql_affected_rows返回的是实际更新的行数,而不是匹配到的行数。
*/
bool CDBConn::ExecuteUpdate(const char *sql_query, bool care_affected_rows)
{
	mysql_ping(m_mysql);

	if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
	{
		log_error("mysql_real_query failed: %s, sql: %s\n", mysql_error(m_mysql), sql_query);
		//g_master_conn_fail_num ++;
		return false;
	}

	if (mysql_affected_rows(m_mysql) > 0)
	{
		return true;
	}
	else
	{ // 影响的行数为0时
		if (care_affected_rows)
		{ // 如果在意影响的行数时, 返回false, 否则返回true
			log_error("mysql_real_query failed: %s, sql: %s\n\n", mysql_error(m_mysql), sql_query);
			return false;
		}
		else
		{
			log_warn("affected_rows=0, sql: %s\n\n", sql_query);
			return true;
		}
	}
}

bool CDBConn::StartTransaction()
{
	mysql_ping(m_mysql);

	if (mysql_real_query(m_mysql, "start transaction\n", 17))
	{
		log_error("mysql_real_query failed: %s, sql: start transaction\n", mysql_error(m_mysql));
		return false;
	}

	return true;
}

bool CDBConn::Rollback()
{
	mysql_ping(m_mysql);

	if (mysql_real_query(m_mysql, "rollback\n", 8))
	{
		log_error("mysql_real_query failed: %s, sql: rollback\n", mysql_error(m_mysql));
		return false;
	}

	return true;
}

bool CDBConn::Commit()
{
	mysql_ping(m_mysql);

	if (mysql_real_query(m_mysql, "commit\n", 6))
	{
		log_error("mysql_real_query failed: %s, sql: commit\n", mysql_error(m_mysql));
		return false;
	}

	return true;
}
uint32_t CDBConn::GetInsertId()
{
	return (uint32_t)mysql_insert_id(m_mysql);
}

////////////////
CDBPool::CDBPool(const char *pool_name, const char *db_server_ip, uint16_t db_server_port,
				 const char *username, const char *password, const char *db_name, int max_conn_cnt)
{
	m_pool_name = pool_name;
	m_db_server_ip = db_server_ip;
	m_db_server_port = db_server_port;
	m_username = username;
	m_password = password;
	m_db_name = db_name;
	m_db_max_conn_cnt = max_conn_cnt;	// 
	m_db_cur_conn_cnt = MIN_DB_CONN_CNT; // 最小连接数量
}

// 释放连接池
CDBPool::~CDBPool()
{
	std::lock_guard<std::mutex> lock(m_mutex);
	m_abort_request = true;
	m_cond_var.notify_all();		// 通知所有在等待的

	for (list<CDBConn *>::iterator it = m_free_list.begin(); it != m_free_list.end(); it++)
	{
		CDBConn *pConn = *it;
		delete pConn;
	}

	m_free_list.clear();
}

int CDBPool::Init()
{
	// 创建固定最小的连接数量
	for (int i = 0; i < m_db_cur_conn_cnt; i++)
	{
		CDBConn *pDBConn = new CDBConn(this);
		int ret = pDBConn->Init();
		if (ret)
		{
			delete pDBConn;
			return ret;
		}

		m_free_list.push_back(pDBConn);
	}

	// log_error("db pool: %s, size: %d\n", m_pool_name.c_str(), (int)m_free_list.size());
	return 0;
}

/*
 *TODO: 增加保护机制,把分配的连接加入另一个队列,这样获取连接时,如果没有空闲连接,
 *TODO: 检查已经分配的连接多久没有返回,如果超过一定时间,则自动收回连接,放在用户忘了调用释放连接的接口
 * timeout_ms默认为-1死等
 * timeout_ms >=0 则为等待的时间
 */
int wait_cout = 0;
CDBConn *CDBPool::GetDBConn(const int timeout_ms)
{
	std::unique_lock<std::mutex> lock(m_mutex);
	if(m_abort_request) 
	{
		log_warn("have aboort\n");
		return NULL;
	}

	if (m_free_list.empty())		// 当没有连接可以用时
	{
		// 第一步先检测 当前连接数量是否达到最大的连接数量 
		if (m_db_cur_conn_cnt >= m_db_max_conn_cnt)
		{
			// 如果已经到达了,看看是否需要超时等待
			if(timeout_ms < 0)		// 死等,直到有连接可以用 或者 连接池要退出
			{
				log_info("wait ms:%d\n", timeout_ms);
				m_cond_var.wait(lock, [this] 
				{
					// log_info("wait:%d, size:%d\n", wait_cout++, m_free_list.size());
					// 当前连接数量小于最大连接数量 或者请求释放连接池时退出
					return (!m_free_list.empty()) | m_abort_request;
				});
			} else {
				// return如果返回 false,继续wait(或者超时),  如果返回true退出wait
				// 1.m_free_list不为空
				// 2.超时退出
				// 3. m_abort_request被置为true,要释放整个连接池
				m_cond_var.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] {
					// log_info("wait_for:%d, size:%d\n", wait_cout++, m_free_list.size());
					return (!m_free_list.empty()) | m_abort_request;
				});
				// 带超时功能时还要判断是否为空
				if(m_free_list.empty()) 	// 如果连接池还是没有空闲则退出
				{
					return NULL;
				}
			}

			if(m_abort_request) 
			{
				log_warn("have aboort\n");
				return NULL;
			}
		}
		else // 还没有到最大连接则创建连接
		{
			CDBConn *pDBConn = new CDBConn(this);	//新建连接
			int ret = pDBConn->Init();
			if (ret)
			{
				log_error("Init DBConnecton failed\n\n");
				delete pDBConn;
				return NULL;
			}
			else
			{
				m_free_list.push_back(pDBConn);
				m_db_cur_conn_cnt++;
				log_info("new db connection: %s, conn_cnt: %d\n", m_pool_name.c_str(), m_db_cur_conn_cnt);
			}
		}
	}

	CDBConn *pConn = m_free_list.front();	// 获取连接
	m_free_list.pop_front();	// STL 吐出连接,从空闲队列删除
	// pConn->setCurrentTime();  // 伪代码
	m_used_list.push_back(pConn);		// 

	return pConn;
}

void CDBPool::RelDBConn(CDBConn *pConn)
{
	std::lock_guard<std::mutex> lock(m_mutex);

	list<CDBConn *>::iterator it = m_free_list.begin();
	for (; it != m_free_list.end(); it++)	// 避免重复归还
	{
		if (*it == pConn)	
		{
			break;
		}
	}

	if (it == m_free_list.end())
	{
		m_used_list.remove(pConn);
		m_free_list.push_back(pConn);
		m_cond_var.notify_one();		// 通知取队列
	} else 
	{
		log_error("RelDBConn failed\n");
	}
}
// 遍历检测是否超时未归还
// pConn->isTimeout(); // 当前时间 - 被请求的时间
// 强制回收  从m_used_list 放回 m_free_list

redis连接池

=============

头文件


/*
 * @Author: your name
 * @Date: 2019-12-07 10:54:57
 * @LastEditTime : 2020-01-10 16:35:13
 * @LastEditors  : Please set LastEditors
 * @Description: In User Settings Edit
 * @FilePath: \src\cache_pool\CachePool.h
 */
#ifndef CACHEPOOL_H_
#define CACHEPOOL_H_

#include <iostream>
#include <vector>
#include <map>
#include <list>

#include "Thread.h"

#include "hiredis.h"


using std::string;
using std::list;
using std::map; 
using std::vector; 

class CachePool;

class CacheConn {
public:
	CacheConn(const char* server_ip, int server_port, int db_index, const char* password, 
		const char *pool_name ="");
	CacheConn(CachePool* pCachePool);	
	virtual ~CacheConn();
	
	int Init();
	void DeInit();
	const char* GetPoolName();
    // 通用操作
    // 判断一个key是否存在
    bool isExists(string &key);
    // 删除某个key
    long del(string &key);

    // ------------------- 字符串相关 -------------------
	string get(string key);
    string set(string key, string& value);
	string setex(string key, int timeout, string value);
	
	// string mset(string key, map);
    //批量获取
    bool mget(const vector<string>& keys, map<string, string>& ret_value);
	//原子加减1
    long incr(string key);
    long decr(string key);


	// ---------------- 哈希相关 ------------------------
	long hdel(string key, string field);
	string hget(string key, string field);
	bool hgetAll(string key, map<string, string>& ret_value);
	long hset(string key, string field, string value);

	long hincrBy(string key, string field, long value);
    long incrBy(string key, long value);
	string hmset(string key, map<string, string>& hash);
	bool hmget(string key, list<string>& fields, list<string>& ret_value);
    
    

	// ------------ 链表相关 ------------
	long lpush(string key, string value);
	long rpush(string key, string value);
	long llen(string key);
	bool lrange(string key, long start, long end, list<string>& ret_value);

	
    bool flushdb();

private:
	CachePool* 		m_pCachePool;
	redisContext* 	m_pContext;
	uint64_t		m_last_connect_time;
	uint16_t 		m_server_port;
	string 			m_server_ip;
    string          m_password;
	uint16_t        m_db_index;
	string 			m_pool_name;
};


class CachePool {
public:
	// db_index和mysql不同的地方 
	CachePool(const char* pool_name, const char* server_ip, int server_port, int db_index, 
		const char *password, int max_conn_cnt);
	virtual ~CachePool();

	int Init();
    // 获取空闲的连接资源
	CacheConn* GetCacheConn();
    // Pool回收连接资源
	void RelCacheConn(CacheConn* pCacheConn);

	const char* GetPoolName() { return m_pool_name.c_str(); }
	const char* GetServerIP() { return m_server_ip.c_str(); }
	const char* GetPassword() { return m_password.c_str(); }
	int GetServerPort() { return m_server_port; }
	int GetDBIndex() { return m_db_index; }
private:
	string 		m_pool_name;
	string		m_server_ip;
	string 		m_password;
	int			m_server_port;
	int			m_db_index;	// mysql 数据库名字, redis db index

	int			m_cur_conn_cnt;
	int 		m_max_conn_cnt;
	list<CacheConn*>	m_free_list;
	CThreadNotify		m_free_notify;
};



#endif /* CACHEPOOL_H_ */

实现


#include "CachePool.h"

#include <stdlib.h>
#include <string.h>
#include "Thread.h"

#define log_error printf
#define log_info printf

#define MIN_CACHE_CONN_CNT 2
#define MAX_CACHE_CONN_FAIL_NUM 10

CacheConn::CacheConn(const char *server_ip, int server_port, int db_index, const char *password,
					 const char *pool_name)
{
	m_server_ip = server_ip;
	m_server_port = server_port;

	m_db_index = db_index;
	m_password = password;
	m_pool_name = pool_name;
	m_pContext = NULL;
	m_last_connect_time = 0;
}

CacheConn::CacheConn(CachePool *pCachePool)
{
	m_pCachePool = pCachePool;
	if (pCachePool)
	{
		m_server_ip = pCachePool->GetServerIP();
		m_server_port = pCachePool->GetServerPort();
		m_db_index = pCachePool->GetDBIndex();
		m_password = pCachePool->GetPassword();
		m_pool_name = pCachePool->GetPoolName();
	}
	else
	{
		log_error("pCachePool is NULL\n");
	}

	m_pContext = NULL;
	m_last_connect_time = 0;
}

CacheConn::~CacheConn()
{
	if (m_pContext)
	{
		redisFree(m_pContext);
		m_pContext = NULL;
	}
}

/*
 * redis初始化连接和重连操作,类似mysql_ping()
 */
int CacheConn::Init()
{
	if (m_pContext)	// 非空,连接是正常的
	{
		return 0;
	}

	// 1s 尝试重连一次
	uint64_t cur_time = (uint64_t)time(NULL);
	if (cur_time < m_last_connect_time + 1) 		// 重连尝试 间隔1秒 
	{
		printf("cur_time:%lu, m_last_connect_time:%lu\n", cur_time, m_last_connect_time);
		return 1;
	}
	// printf("m_last_connect_time = cur_time\n");
	m_last_connect_time = cur_time;

	// 1000ms超时
	struct timeval timeout = {0, 1000000};
	// 建立连接后使用 redisContext 来保存连接状态。
	// redisContext 在每次操作后会修改其中的 err 和  errstr 字段来表示发生的错误码(大于0)和对应的描述。
	m_pContext = redisConnectWithTimeout(m_server_ip.c_str(), m_server_port, timeout);

	if (!m_pContext || m_pContext->err)
	{
		if (m_pContext)
		{
			log_error("redisConnect failed: %s\n", m_pContext->errstr);
			redisFree(m_pContext);
			m_pContext = NULL;
		}
		else
		{
			log_error("redisConnect failed\n");
		}

		return 1;
	}

	redisReply *reply;
	// 验证
	if (!m_password.empty())
	{
		reply = (redisReply *)redisCommand(m_pContext, "AUTH %s", m_password.c_str());

		if (!reply || reply->type == REDIS_REPLY_ERROR)
		{
			log_error("Authentication failure:%p\n", reply);
			if (reply)
				freeReplyObject(reply);
			return -1;
		}
		else
		{
			// log_info("Authentication success\n");
		}

		freeReplyObject(reply);
	}

	reply = (redisReply *)redisCommand(m_pContext, "SELECT %d", 0);

	if (reply && (reply->type == REDIS_REPLY_STATUS) && (strncmp(reply->str, "OK", 2) == 0))
	{
		freeReplyObject(reply);
		return 0;
	}
	else
	{
		if (reply)
			log_error("select cache db failed:%s\n", reply->str);
		return 2;
	}
}

void CacheConn::DeInit()
{
	if (m_pContext)
	{
		redisFree(m_pContext);
		m_pContext = NULL;
	}
}

const char *CacheConn::GetPoolName()
{
	return m_pool_name.c_str();
}

string CacheConn::get(string key)
{
	string value;

	if (Init())
	{
		return value;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "GET %s", key.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return value;
	}

	if (reply->type == REDIS_REPLY_STRING)
	{
		value.append(reply->str, reply->len);
	}

	freeReplyObject(reply);
	return value;
}

string CacheConn::set(string key, string &value)
{
	string ret_value;

	if (Init())
	{
		return ret_value;
	}
	// 返回的结果存放在redisReply
	redisReply *reply = (redisReply *)redisCommand(m_pContext, "SET %s %s", key.c_str(), value.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return ret_value;
	}

	ret_value.append(reply->str, reply->len);
	freeReplyObject(reply); // 释放资源
	return ret_value;
}

string CacheConn::setex(string key, int timeout, string value)
{
	string ret_value;

	if (Init())
	{
		return ret_value;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "SETEX %s %d %s", key.c_str(), timeout, value.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return ret_value;
	}

	ret_value.append(reply->str, reply->len);
	freeReplyObject(reply);
	return ret_value;
}

bool CacheConn::mget(const vector<string> &keys, map<string, string> &ret_value)
{
	if (Init())
	{
		return false;
	}
	if (keys.empty())
	{
		return false;
	}

	string strKey;
	bool bFirst = true;
	for (vector<string>::const_iterator it = keys.begin(); it != keys.end(); ++it)
	{
		if (bFirst)
		{
			bFirst = false;
			strKey = *it;
		}
		else
		{
			strKey += " " + *it;
		}
	}

	if (strKey.empty())
	{
		return false;
	}
	strKey = "MGET " + strKey;
	redisReply *reply = (redisReply *)redisCommand(m_pContext, strKey.c_str());
	if (!reply)
	{
		log_info("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return false;
	}
	if (reply->type == REDIS_REPLY_ARRAY)
	{
		for (size_t i = 0; i < reply->elements; ++i)
		{
			redisReply *child_reply = reply->element[i];
			if (child_reply->type == REDIS_REPLY_STRING)
			{
				ret_value[keys[i]] = child_reply->str;
			}
		}
	}
	freeReplyObject(reply);
	return true;
}

bool CacheConn::isExists(string &key)
{
	if (Init())
	{
		return false;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "EXISTS %s", key.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return false;
	}
	long ret_value = reply->integer;
	freeReplyObject(reply);
	if (0 == ret_value)
	{
		return false;
	}
	else
	{
		return true;
	}
}

long CacheConn::del(string &key)
{
	if (Init())
	{
		return 0;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "DEL %s", key.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return 0;
	}

	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

long CacheConn::hdel(string key, string field)
{
	if (Init())
	{
		return 0;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "HDEL %s %s", key.c_str(), field.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return 0;
	}

	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

string CacheConn::hget(string key, string field)
{
	string ret_value;
	if (Init())
	{
		return ret_value;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "HGET %s %s", key.c_str(), field.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return ret_value;
	}

	if (reply->type == REDIS_REPLY_STRING)
	{
		ret_value.append(reply->str, reply->len);
	}

	freeReplyObject(reply);
	return ret_value;
}

bool CacheConn::hgetAll(string key, map<string, string> &ret_value)
{
	if (Init())
	{
		return false;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "HGETALL %s", key.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return false;
	}

	if ((reply->type == REDIS_REPLY_ARRAY) && (reply->elements % 2 == 0))
	{
		for (size_t i = 0; i < reply->elements; i += 2)
		{
			redisReply *field_reply = reply->element[i];
			redisReply *value_reply = reply->element[i + 1];

			string field(field_reply->str, field_reply->len);
			string value(value_reply->str, value_reply->len);
			ret_value.insert(make_pair(field, value));
		}
	}

	freeReplyObject(reply);
	return true;
}

long CacheConn::hset(string key, string field, string value)
{
	if (Init())
	{
		return -1;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "HSET %s %s %s", key.c_str(), field.c_str(), value.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return -1;
	}

	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

long CacheConn::hincrBy(string key, string field, long value)
{
	if (Init())
	{
		return -1;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "HINCRBY %s %s %ld", key.c_str(), field.c_str(), value);
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return -1;
	}

	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

long CacheConn::incrBy(string key, long value)
{
	if (Init())
	{
		return -1;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "INCRBY %s %ld", key.c_str(), value);
	if (!reply)
	{
		log_error("redis Command failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return -1;
	}
	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

string CacheConn::hmset(string key, map<string, string> &hash)
{
	string ret_value;

	if (Init())
	{
		return ret_value;
	}

	int argc = hash.size() * 2 + 2;
	const char **argv = new const char *[argc];
	if (!argv)
	{
		return ret_value;
	}

	argv[0] = "HMSET";
	argv[1] = key.c_str();
	int i = 2;
	for (map<string, string>::iterator it = hash.begin(); it != hash.end(); it++)
	{
		argv[i++] = it->first.c_str();
		argv[i++] = it->second.c_str();
	}

	redisReply *reply = (redisReply *)redisCommandArgv(m_pContext, argc, argv, NULL);
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		delete[] argv;

		redisFree(m_pContext);
		m_pContext = NULL;
		return ret_value;
	}

	ret_value.append(reply->str, reply->len);

	delete[] argv;
	freeReplyObject(reply);
	return ret_value;
}

bool CacheConn::hmget(string key, list<string> &fields, list<string> &ret_value)
{
	if (Init())
	{
		return false;
	}

	int argc = fields.size() + 2;
	const char **argv = new const char *[argc];
	if (!argv)
	{
		return false;
	}

	argv[0] = "HMGET";
	argv[1] = key.c_str();
	int i = 2;
	for (list<string>::iterator it = fields.begin(); it != fields.end(); it++)
	{
		argv[i++] = it->c_str();
	}

	redisReply *reply = (redisReply *)redisCommandArgv(m_pContext, argc, (const char **)argv, NULL);
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		delete[] argv;

		redisFree(m_pContext);
		m_pContext = NULL;

		return false;
	}

	if (reply->type == REDIS_REPLY_ARRAY)
	{
		for (size_t i = 0; i < reply->elements; i++)
		{
			redisReply *value_reply = reply->element[i];
			string value(value_reply->str, value_reply->len);
			ret_value.push_back(value);
		}
	}

	delete[] argv;
	freeReplyObject(reply);
	return true;
}

long CacheConn::incr(string key)
{
	if (Init())
	{
		return -1;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "INCR %s", key.c_str());
	if (!reply)
	{
		log_error("redis Command failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return -1;
	}
	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

long CacheConn::decr(string key)
{
	if (Init())
	{
		return -1;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "DECR %s", key.c_str());
	if (!reply)
	{
		log_error("redis Command failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return -1;
	}
	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

long CacheConn::lpush(string key, string value)
{
	if (Init())
	{
		return -1;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "LPUSH %s %s", key.c_str(), value.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return -1;
	}

	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

long CacheConn::rpush(string key, string value)
{
	if (Init())
	{
		return -1;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "RPUSH %s %s", key.c_str(), value.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return -1;
	}

	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

long CacheConn::llen(string key)
{
	if (Init())
	{
		return -1;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "LLEN %s", key.c_str());
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return -1;
	}

	long ret_value = reply->integer;
	freeReplyObject(reply);
	return ret_value;
}

bool CacheConn::lrange(string key, long start, long end, list<string> &ret_value)
{
	if (Init())
	{
		return false;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "LRANGE %s %d %d", key.c_str(), start, end);
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return false;
	}

	if (reply->type == REDIS_REPLY_ARRAY)
	{
		for (size_t i = 0; i < reply->elements; i++)
		{
			redisReply *value_reply = reply->element[i];
			string value(value_reply->str, value_reply->len);
			ret_value.push_back(value);
		}
	}

	freeReplyObject(reply);
	return true;
}

bool CacheConn::flushdb()
{
	bool ret = false;
	if (Init())
	{
		return false;
	}

	redisReply *reply = (redisReply *)redisCommand(m_pContext, "FLUSHDB");
	if (!reply)
	{
		log_error("redisCommand failed:%s\n", m_pContext->errstr);
		redisFree(m_pContext);
		m_pContext = NULL;
		return false;
	}

	if (reply->type == REDIS_REPLY_STRING && strncmp(reply->str, "OK", 2) == 0)
	{
		ret = true;
	}

	freeReplyObject(reply);

	return ret;
}
///////////////
CachePool::CachePool(const char *pool_name, const char *server_ip, int server_port, int db_index,
					 const char *password, int max_conn_cnt)
{
	m_pool_name = pool_name;
	m_server_ip = server_ip;
	m_server_port = server_port;
	m_db_index = db_index;
	m_password = password;
	m_max_conn_cnt = max_conn_cnt;
	m_cur_conn_cnt = MIN_CACHE_CONN_CNT;
}

CachePool::~CachePool()
{
	m_free_notify.Lock();
	for (list<CacheConn *>::iterator it = m_free_list.begin(); it != m_free_list.end(); it++)
	{
		CacheConn *pConn = *it;
		delete pConn;
	}

	m_free_list.clear();
	m_cur_conn_cnt = 0;
	m_free_notify.Unlock();
}

int CachePool::Init()
{
	for (int i = 0; i < m_cur_conn_cnt; i++)
	{
		CacheConn *pConn = new CacheConn(m_server_ip.c_str(), m_server_port,
										 m_db_index, m_password.c_str(), m_pool_name.c_str());
		if (pConn->Init())
		{
			delete pConn;
			return 1;
		}

		m_free_list.push_back(pConn);
	}

	log_info("cache pool: %s, list size: %lu\n", m_pool_name.c_str(), m_free_list.size());
	return 0;
}

CacheConn *CachePool::GetCacheConn()
{
	m_free_notify.Lock();

	while (m_free_list.empty())
	{
		if (m_cur_conn_cnt >= m_max_conn_cnt)
		{
			m_free_notify.Wait();
		}
		else
		{
			CacheConn *p_cache_conn = new CacheConn(m_server_ip.c_str(), m_server_port,
													m_db_index, m_password.c_str(), m_pool_name.c_str());
			int ret = p_cache_conn->Init();
			if (ret)
			{
				log_error("Init CacheConn failed\n");
				delete p_cache_conn;
				m_free_notify.Unlock();
				return NULL;
			}
			else
			{
				m_free_list.push_back(p_cache_conn);
				m_cur_conn_cnt++;
				log_info("new cache connection: %s, conn_cnt: %d\n", m_pool_name.c_str(), m_cur_conn_cnt);
			}
		}
	}

	CacheConn *pConn = m_free_list.front();
	m_free_list.pop_front();

	m_free_notify.Unlock();

	return pConn;
}

void CachePool::RelCacheConn(CacheConn *p_cache_conn)
{
	m_free_notify.Lock();

	list<CacheConn *>::iterator it = m_free_list.begin();
	for (; it != m_free_list.end(); it++)
	{
		if (*it == p_cache_conn)
		{
			break;
		}
	}

	if (it == m_free_list.end())
	{
		m_free_list.push_back(p_cache_conn);
	}

	m_free_notify.Signal();
	m_free_notify.Unlock();
}

comments powered by Disqus
Built with Hugo
Theme Stack designed by Jimmy