连接池实现
mysql连接池
头文件:
#ifndef DBPOOL_H_
#define DBPOOL_H_
#include <iostream>
#include <list>
#include <mutex>
#include <condition_variable>
#include <map>
#include <stdint.h>
#include <mysql.h>
// Capacity of CDBConn::m_escape_string, which is declared with one extra byte on top of this.
#define MAX_ESCAPE_STRING_LEN 10240
// NOTE(review): the declarations below use unqualified string/map/list and rely on this
// directive; because it sits in a header it also applies to every file that includes it.
using namespace std;
// Result set returned by a SELECT: wraps a MYSQL_RES and exposes columns by field name.
class CResultSet {
public:
// Takes the MYSQL_RES; the destructor releases it with mysql_free_result.
CResultSet(MYSQL_RES* res);
virtual ~CResultSet();
// Fetches the next row into m_row; returns false when no row was fetched.
bool Next();
// Value of column `key` in the current row converted with atoi; 0 if the column name is unknown.
int GetInt(const char* key);
// Raw pointer to column `key` in the current row; NULL if the column name is unknown.
char* GetString(const char* key);
private:
// Column index for `key`, or -1 when the name is not in m_key_map.
int _GetIndex(const char* key);
MYSQL_RES* m_res;
MYSQL_ROW m_row;
// Field name -> column index, filled by the constructor.
map<string, int> m_key_map;
};
// Prepared statement wrapper, used for inserting data.
class CPrepareStatement {
public:
CPrepareStatement();
virtual ~CPrepareStatement();
// Prepares `sql` on `mysql` and allocates one MYSQL_BIND per placeholder; false on failure.
bool Init(MYSQL* mysql, string& sql);
// SetParam stores the ADDRESS of `value` in the bind array (no copy is made),
// so `value` has to stay alive and unchanged until ExecuteUpdate() has run.
void SetParam(uint32_t index, int& value);
void SetParam(uint32_t index, uint32_t& value);
void SetParam(uint32_t index, string& value);
void SetParam(uint32_t index, const string& value);
// Binds the parameters and executes; returns false on error or when no row was affected.
bool ExecuteUpdate();
uint32_t GetInsertId();
private:
MYSQL_STMT* m_stmt;
MYSQL_BIND* m_param_bind; // array of m_param_cnt entries, released with delete[]
uint32_t m_param_cnt;
};
// Forward declaration: CDBConn only keeps a pointer back to its pool.
class CDBPool;
// One MySQL connection; reads its server settings from the CDBPool it was created with.
class CDBConn {
public:
CDBConn(CDBPool* pDBPool);
virtual ~CDBConn();
// Creates the MYSQL handle and connects; returns 0 on success, 1 or 2 on failure.
int Init();
// Create a table
bool ExecuteCreate(const char* sql_query);
// Drop a table
bool ExecuteDrop(const char* sql_query);
// Run a query; returns a CResultSet allocated with new, or NULL on failure.
CResultSet* ExecuteQuery(const char* sql_query);
/**
* Execute a DB update / modification.
*
* @param sql_query sql
* @param care_affected_rows whether the affected-row count matters; false: ignore it; true: it matters
*
* @return true on success, false on failure
*/
bool ExecuteUpdate(const char* sql_query, bool care_affected_rows = true);
uint32_t GetInsertId();
// Start a transaction
bool StartTransaction();
// Commit the transaction
bool Commit();
// Roll back the transaction
bool Rollback();
// Name of the owning connection pool
const char* GetPoolName();
MYSQL* GetMysql() { return m_mysql; }
private:
CDBPool* m_pDBPool; // to get MySQL server information
MYSQL* m_mysql; // the underlying connection handle
// NOTE(review): not referenced by any method in the visible source.
char m_escape_string[MAX_ESCAPE_STRING_LEN + 1];
};
// Connection pool: only manages CDBConn objects; the SQL work itself is done by CDBConn.
class CDBPool {
public:
    // Default construction leaves the pool without server settings; the numeric
    // members are zero-initialized below so they are never read uninitialized.
    CDBPool() {}
    CDBPool(const char* pool_name, const char* db_server_ip, uint16_t db_server_port,
            const char* username, const char* password, const char* db_name,
            int max_conn_cnt);
    virtual ~CDBPool();

    int Init(); // connect to the database and create the initial connections
    // Acquire a connection. timeout_ms < 0 waits indefinitely, otherwise waits at most
    // timeout_ms milliseconds; returns NULL on timeout, abort, or connect failure.
    CDBConn* GetDBConn(const int timeout_ms = -1);
    void RelDBConn(CDBConn* pConn); // hand a connection back to the pool

    const char* GetPoolName() { return m_pool_name.c_str(); }
    const char* GetDBServerIP() { return m_db_server_ip.c_str(); }
    uint16_t GetDBServerPort() { return m_db_server_port; }
    const char* GetUsername() { return m_username.c_str(); }
    // Historical misspelling kept for existing callers; prefer GetPassword().
    const char* GetPasswrod() { return m_password.c_str(); }
    const char* GetPassword() { return m_password.c_str(); }
    const char* GetDBName() { return m_db_name.c_str(); }

private:
    string m_pool_name;            // pool name
    string m_db_server_ip;         // database ip
    uint16_t m_db_server_port = 0; // database port
    string m_username;             // user name
    string m_password;             // user password
    string m_db_name;              // database name
    int m_db_cur_conn_cnt = 0;     // number of connections currently created
    int m_db_max_conn_cnt = 0;     // upper bound on connections
    list<CDBConn*> m_free_list;    // idle connections
    list<CDBConn*> m_used_list;    // connections currently handed out
    std::mutex m_mutex;
    std::condition_variable m_cond_var;
    bool m_abort_request = false;
    // CThreadNotify m_free_notify; // semaphore
};
#endif /* DBPOOL_H_ */
};
实现:
#include "DBPool.h"
#include <string.h>
// Logging shims: every level is forwarded to printf.
#define log_error printf
#define log_warn printf
#define log_info printf
// Number of connections CDBPool::Init() creates up front.
#define MIN_DB_CONN_CNT 2
// NOTE(review): not referenced in the visible part of this file.
#define MAX_DB_CONN_FAIL_NUM 10
// Wraps `res` and builds the field-name -> column-index map.
// m_row starts out NULL so accessors can detect "no row fetched yet";
// a NULL `res` yields an empty set instead of dereferencing it.
CResultSet::CResultSet(MYSQL_RES *res)
{
    m_res = res;
    m_row = NULL;
    if (!m_res)
    {
        return;
    }
    // map table field key to index in the result array
    const int num_fields = mysql_num_fields(m_res);
    MYSQL_FIELD *fields = mysql_fetch_fields(m_res);
    for (int i = 0; i < num_fields; i++)
    {
        m_key_map.insert(make_pair(fields[i].name, i));
    }
}
// Releases the wrapped MYSQL_RES, if there is one.
CResultSet::~CResultSet()
{
    if (m_res == NULL)
    {
        return;
    }
    mysql_free_result(m_res);
    m_res = NULL;
}
// Advances to the next row. Returns true if a row was fetched, false when
// there is no result set or mysql_fetch_row returned no row.
bool CResultSet::Next()
{
    if (!m_res)
    {
        // The destructor already treats a NULL result as possible; do the same here.
        m_row = NULL;
        return false;
    }
    m_row = mysql_fetch_row(m_res);
    return m_row != NULL;
}
// Looks up the column index registered for `key`; -1 when the name is unknown.
int CResultSet::_GetIndex(const char *key)
{
    const auto found = m_key_map.find(key);
    return (found != m_key_map.end()) ? found->second : -1;
}
// Returns column `key` of the current row converted with atoi.
// Returns 0 when the column name is unknown, when no row is currently
// fetched, or when the column value is SQL NULL (a NULL pointer in the row).
int CResultSet::GetInt(const char *key)
{
    const int idx = _GetIndex(key);
    if (idx == -1 || !m_row || !m_row[idx])
    {
        return 0;
    }
    return atoi(m_row[idx]);
}
// Returns the raw value of column `key` in the current row.
// Returns NULL when the column name is unknown or no row is currently fetched;
// the row entry itself may also be NULL (SQL NULL), so callers must check.
char *CResultSet::GetString(const char *key)
{
    const int idx = _GetIndex(key);
    if (idx == -1 || !m_row)
    {
        return NULL;
    }
    return m_row[idx];
}
/////////////////////////////////////////
// Starts with no statement handle and no parameter bindings.
CPrepareStatement::CPrepareStatement()
    : m_stmt(NULL), m_param_bind(NULL), m_param_cnt(0)
{
}
// Closes the statement handle first, then frees the bind array.
CPrepareStatement::~CPrepareStatement()
{
    if (m_stmt != NULL)
    {
        mysql_stmt_close(m_stmt);
        m_stmt = NULL;
    }
    if (m_param_bind != NULL)
    {
        delete[] m_param_bind;
        m_param_bind = NULL;
    }
}
// Prepares `sql` on connection `mysql` and allocates a zeroed MYSQL_BIND
// array with one entry per statement placeholder.
// Returns true on success; false if the handle cannot be created or the
// statement fails to prepare.
bool CPrepareStatement::Init(MYSQL *mysql, string &sql)
{
    // Original author's note: mysql_ping lets the client reconnect if the connection was lost.
    mysql_ping(mysql);

    m_stmt = mysql_stmt_init(mysql);
    if (m_stmt == NULL)
    {
        log_error("mysql_stmt_init failed\n");
        return false;
    }

    const bool prepare_failed = mysql_stmt_prepare(m_stmt, sql.c_str(), sql.size());
    if (prepare_failed)
    {
        log_error("mysql_stmt_prepare failed: %s\n", mysql_stmt_error(m_stmt));
        return false;
    }

    m_param_cnt = mysql_stmt_param_count(m_stmt);
    if (m_param_cnt == 0)
    {
        // No placeholders, so no bind array is needed.
        return true;
    }

    m_param_bind = new MYSQL_BIND[m_param_cnt];
    if (m_param_bind == NULL)
    {
        log_error("new failed\n");
        return false;
    }
    memset(m_param_bind, 0, sizeof(MYSQL_BIND) * m_param_cnt);
    return true;
}
void CPrepareStatement::SetParam(uint32_t index, int &value)
{
if (index >= m_param_cnt)
{
log_error("index too large: %d\n", index);
return;
}
m_param_bind[index].buffer_type = MYSQL_TYPE_LONG;
m_param_bind[index].buffer = &value;
}
// Binds placeholder `index` to the uint32_t referenced by `value`.
// Only the address is stored, so `value` must outlive ExecuteUpdate().
// is_unsigned is set so values above INT32_MAX are not sent as negative numbers.
void CPrepareStatement::SetParam(uint32_t index, uint32_t &value)
{
    if (index >= m_param_cnt)
    {
        log_error("index too large: %u\n", index);
        return;
    }
    m_param_bind[index].buffer_type = MYSQL_TYPE_LONG;
    m_param_bind[index].buffer = &value;
    m_param_bind[index].is_unsigned = true;
}
void CPrepareStatement::SetParam(uint32_t index, string &value)
{
if (index >= m_param_cnt)
{
log_error("index too large: %d\n", index);
return;
}
m_param_bind[index].buffer_type = MYSQL_TYPE_STRING;
m_param_bind[index].buffer = (char *)value.c_str();
m_param_bind[index].buffer_length = value.size();
}
void CPrepareStatement::SetParam(uint32_t index, const string &value)
{
if (index >= m_param_cnt)
{
log_error("index too large: %d\n", index);
return;
}
m_param_bind[index].buffer_type = MYSQL_TYPE_STRING;
m_param_bind[index].buffer = (char *)value.c_str();
m_param_bind[index].buffer_length = value.size();
}
// Binds the collected parameters and executes the prepared statement.
// Returns false if there is no statement, if binding or execution fails,
// or if the statement affected zero rows.
bool CPrepareStatement::ExecuteUpdate()
{
    if (m_stmt == NULL)
    {
        log_error("no m_stmt\n");
        return false;
    }

    const bool bind_failed = mysql_stmt_bind_param(m_stmt, m_param_bind);
    if (bind_failed)
    {
        log_error("mysql_stmt_bind_param failed: %s\n", mysql_stmt_error(m_stmt));
        return false;
    }

    const bool exec_failed = mysql_stmt_execute(m_stmt);
    if (exec_failed)
    {
        log_error("mysql_stmt_execute failed: %s\n", mysql_stmt_error(m_stmt));
        return false;
    }

    const bool no_rows = (mysql_stmt_affected_rows(m_stmt) == 0);
    if (no_rows)
    {
        log_error("ExecuteUpdate have no effect\n");
        return false;
    }
    return true;
}
// Insert id reported by mysql_stmt_insert_id for this statement, narrowed to 32 bits.
uint32_t CPrepareStatement::GetInsertId()
{
    const uint32_t insert_id = mysql_stmt_insert_id(m_stmt);
    return insert_id;
}
/////////////////////
// Remembers the owning pool; the MYSQL handle is created later by Init().
CDBConn::CDBConn(CDBPool *pPool)
    : m_pDBPool(pPool), m_mysql(NULL)
{
}
// Closes the MYSQL handle if Init() created one.
CDBConn::~CDBConn()
{
    if (m_mysql == NULL)
    {
        return;
    }
    mysql_close(m_mysql);
}
int CDBConn::Init()
{
m_mysql = mysql_init(NULL); // mysql_标准的mysql c client对应的api
if (!m_mysql)
{
log_error("mysql_init failed\n");
return 1;
}
my_bool reconnect = true;
mysql_options(m_mysql, MYSQL_OPT_RECONNECT, &reconnect); // 配合mysql_ping实现自动重连
mysql_options(m_mysql, MYSQL_SET_CHARSET_NAME, "utf8mb4"); // utf8mb4和utf8区别
// ip 端口 用户名 密码 数据库名
if (!mysql_real_connect(m_mysql, m_pDBPool->GetDBServerIP(), m_pDBPool->GetUsername(), m_pDBPool->GetPasswrod(),
m_pDBPool->GetDBName(), m_pDBPool->GetDBServerPort(), NULL, 0))
{
log_error("mysql_real_connect failed: %s\n", mysql_error(m_mysql));
return 2;
}
return 0;
}
// Forwards to the owning pool's GetPoolName().
const char *CDBConn::GetPoolName()
{
    const char *pool_name = m_pDBPool->GetPoolName();
    return pool_name;
}
// Executes a CREATE statement. Returns true on success, false if
// mysql_real_query reports an error (the failing SQL is logged).
bool CDBConn::ExecuteCreate(const char *sql_query)
{
    mysql_ping(m_mysql);
    // mysql_real_query is what actually runs the SQL
    if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
    {
        // Log the real statement; the old text was a copy-paste of "start transaction".
        log_error("mysql_real_query failed: %s, sql: %s\n", mysql_error(m_mysql), sql_query);
        return false;
    }
    return true;
}
// Executes a DROP statement. Returns true on success, false if
// mysql_real_query reports an error (the failing SQL is logged).
bool CDBConn::ExecuteDrop(const char *sql_query)
{
    mysql_ping(m_mysql); // original author's note: reconnects if the link was dropped
    if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
    {
        // Log the real statement; the old text was a copy-paste of "start transaction".
        log_error("mysql_real_query failed: %s, sql: %s\n", mysql_error(m_mysql), sql_query);
        return false;
    }
    return true;
}
// Runs `sql_query` and returns its stored result wrapped in a CResultSet
// allocated with new. Returns NULL if the query fails or no result could be stored.
CResultSet *CDBConn::ExecuteQuery(const char *sql_query)
{
    mysql_ping(m_mysql);

    const bool query_failed = mysql_real_query(m_mysql, sql_query, strlen(sql_query));
    if (query_failed)
    {
        log_error("mysql_real_query failed: %s, sql: %s\n", mysql_error(m_mysql), sql_query);
        return NULL;
    }

    MYSQL_RES *stored = mysql_store_result(m_mysql);
    if (stored == NULL)
    {
        log_error("mysql_store_result failed: %s\n", mysql_error(m_mysql));
        return NULL;
    }

    // The CResultSet takes over `stored` and frees it in its destructor.
    return new CResultSet(stored);
}
/*
1. On success mysql_affected_rows returns the number of affected rows; on error it returns -1
   (as an unsigned value).
2. For DELETE it returns the number of rows actually deleted.
3. For UPDATE, if the new column value equals the old one, e.g.
   update tables set col1=10 where id=1; where that row already holds 10, it returns 0.
   mysql_affected_rows counts rows actually changed, not rows matched.
*/
// Executes an update/modify statement.
// Returns false if the statement fails or the affected-row count is the error value.
// When zero rows are affected, returns false if care_affected_rows is true, else true.
bool CDBConn::ExecuteUpdate(const char *sql_query, bool care_affected_rows)
{
    mysql_ping(m_mysql);
    if (mysql_real_query(m_mysql, sql_query, strlen(sql_query)))
    {
        log_error("mysql_real_query failed: %s, sql: %s\n", mysql_error(m_mysql), sql_query);
        //g_master_conn_fail_num ++;
        return false;
    }

    const uint64_t affected = mysql_affected_rows(m_mysql);
    if (affected == static_cast<uint64_t>(-1))
    {
        // The unsigned return value makes -1 look like a huge row count; treat it as the error it is.
        log_error("mysql_affected_rows failed: %s, sql: %s\n", mysql_error(m_mysql), sql_query);
        return false;
    }
    if (affected > 0)
    {
        return true;
    }

    // Zero rows affected: the caller decides whether that counts as a failure.
    if (care_affected_rows)
    {
        log_error("affected_rows=0 treated as failure, sql: %s\n\n", sql_query);
        return false;
    }
    log_warn("affected_rows=0, sql: %s\n\n", sql_query);
    return true;
}
// Sends "start transaction" on this connection; false if the server rejects it.
bool CDBConn::StartTransaction()
{
    mysql_ping(m_mysql);
    // Length 17 covers the statement text only, not the trailing newline.
    const bool failed = mysql_real_query(m_mysql, "start transaction\n", 17);
    if (failed)
    {
        log_error("mysql_real_query failed: %s, sql: start transaction\n", mysql_error(m_mysql));
    }
    return !failed;
}
// Sends "rollback" on this connection; false if the server rejects it.
bool CDBConn::Rollback()
{
    mysql_ping(m_mysql);
    // Length 8 covers the statement text only, not the trailing newline.
    const bool failed = mysql_real_query(m_mysql, "rollback\n", 8);
    if (failed)
    {
        log_error("mysql_real_query failed: %s, sql: rollback\n", mysql_error(m_mysql));
    }
    return !failed;
}
// Sends "commit" on this connection; false if the server rejects it.
bool CDBConn::Commit()
{
    mysql_ping(m_mysql);
    // Length 6 covers the statement text only, not the trailing newline.
    const bool failed = mysql_real_query(m_mysql, "commit\n", 6);
    if (failed)
    {
        log_error("mysql_real_query failed: %s, sql: commit\n", mysql_error(m_mysql));
    }
    return !failed;
}
// Value of mysql_insert_id for this connection, narrowed to 32 bits.
uint32_t CDBConn::GetInsertId()
{
    const uint32_t insert_id = static_cast<uint32_t>(mysql_insert_id(m_mysql));
    return insert_id;
}
////////////////
// Stores the server settings. The current connection count starts at
// MIN_DB_CONN_CNT, which is how many connections Init() creates.
CDBPool::CDBPool(const char *pool_name, const char *db_server_ip, uint16_t db_server_port,
                 const char *username, const char *password, const char *db_name, int max_conn_cnt)
    : m_pool_name(pool_name),
      m_db_server_ip(db_server_ip),
      m_db_server_port(db_server_port),
      m_username(username),
      m_password(password),
      m_db_name(db_name),
      m_db_cur_conn_cnt(MIN_DB_CONN_CNT),
      m_db_max_conn_cnt(max_conn_cnt)
{
}
// Tears the pool down: flags the abort, wakes every waiter, then deletes
// the idle connections. Connections still in m_used_list are not deleted here.
CDBPool::~CDBPool()
{
    std::lock_guard<std::mutex> guard(m_mutex);
    m_abort_request = true;
    m_cond_var.notify_all(); // wake everyone blocked in GetDBConn
    for (CDBConn *idle : m_free_list)
    {
        delete idle;
    }
    m_free_list.clear();
}
int CDBPool::Init()
{
// 创建固定最小的连接数量
for (int i = 0; i < m_db_cur_conn_cnt; i++)
{
CDBConn *pDBConn = new CDBConn(this);
int ret = pDBConn->Init();
if (ret)
{
delete pDBConn;
return ret;
}
m_free_list.push_back(pDBConn);
}
// log_error("db pool: %s, size: %d\n", m_pool_name.c_str(), (int)m_free_list.size());
return 0;
}
/*
*TODO: add a safeguard: track handed-out connections in a second queue, so that when no
*TODO: idle connection is available the pool can check how long each handed-out connection
*      has been away and reclaim it after a limit, in case the user forgot to release it.
* timeout_ms defaults to -1: wait indefinitely
* timeout_ms >= 0: maximum time to wait
*/
// Debug counter; only referenced from commented-out log lines in GetDBConn.
int wait_cout = 0;
// Hands out an idle connection, creating one if the pool is below its maximum.
//
// timeout_ms < 0  : block until a connection is released or the pool is aborted.
// timeout_ms >= 0 : wait at most that many milliseconds.
//
// Returns NULL when the pool is being torn down, when the wait times out with
// no idle connection, or when a new connection fails to initialize.
CDBConn *CDBPool::GetDBConn(const int timeout_ms)
{
    std::unique_lock<std::mutex> lock(m_mutex);
    if (m_abort_request)
    {
        log_warn("have aboort\n");
        return NULL;
    }

    if (m_free_list.empty()) // no idle connection available
    {
        if (m_db_cur_conn_cnt >= m_db_max_conn_cnt)
        {
            // Already at the maximum: wait for a release (or for the abort flag).
            if (timeout_ms < 0)
            {
                log_info("wait ms:%d\n", timeout_ms);
                m_cond_var.wait(lock, [this] {
                    // Logical (not bitwise) OR: stop waiting on a free connection or an abort.
                    return !m_free_list.empty() || m_abort_request;
                });
            }
            else
            {
                // The predicate returning true ends the wait; otherwise it ends on timeout.
                m_cond_var.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this] {
                    return !m_free_list.empty() || m_abort_request;
                });
                // With a timeout the list can still be empty afterwards.
                if (m_free_list.empty())
                {
                    return NULL;
                }
            }
            if (m_abort_request)
            {
                log_warn("have aboort\n");
                return NULL;
            }
        }
        else
        {
            // Below the maximum: create one more connection.
            CDBConn *pDBConn = new CDBConn(this);
            const int ret = pDBConn->Init();
            if (ret)
            {
                log_error("Init DBConnecton failed\n\n");
                delete pDBConn;
                return NULL;
            }
            m_free_list.push_back(pDBConn);
            m_db_cur_conn_cnt++;
            log_info("new db connection: %s, conn_cnt: %d\n", m_pool_name.c_str(), m_db_cur_conn_cnt);
        }
    }

    CDBConn *pConn = m_free_list.front();
    m_free_list.pop_front();
    // pConn->setCurrentTime(); // pseudo code
    m_used_list.push_back(pConn); // remember it as handed out
    return pConn;
}
// Returns a connection to the pool and wakes one waiter.
// A NULL pointer or a connection that is already in the free list is
// rejected with an error log, so it can never be handed out later.
void CDBPool::RelDBConn(CDBConn *pConn)
{
    if (!pConn)
    {
        log_error("RelDBConn failed\n");
        return;
    }

    std::lock_guard<std::mutex> lock(m_mutex);
    for (list<CDBConn *>::iterator it = m_free_list.begin(); it != m_free_list.end(); ++it)
    {
        if (*it == pConn) // already returned once
        {
            log_error("RelDBConn failed\n");
            return;
        }
    }

    m_used_list.remove(pConn);
    m_free_list.push_back(pConn);
    m_cond_var.notify_one(); // wake one thread blocked in GetDBConn
}
// 遍历检测是否超时未归还
// pConn->isTimeout(); // 当前时间 - 被请求的时间
// 强制回收 从m_used_list 放回 m_free_list
redis连接池
=============
头文件
/*
* @Author: your name
* @Date: 2019-12-07 10:54:57
* @LastEditTime : 2020-01-10 16:35:13
* @LastEditors : Please set LastEditors
* @Description: In User Settings Edit
* @FilePath: \src\cache_pool\CachePool.h
*/
#ifndef CACHEPOOL_H_
#define CACHEPOOL_H_
#include <iostream>
#include <vector>
#include <map>
#include <list>
#include "Thread.h"
#include "hiredis.h"
using std::string;
using std::list;
using std::map;
using std::vector;
// Forward declaration: CacheConn only keeps a pointer to the pool.
class CachePool;
// One Redis connection (hiredis redisContext) plus thin command wrappers.
// Each command wrapper calls Init() first, which connects if there is no context yet.
class CacheConn {
public:
CacheConn(const char* server_ip, int server_port, int db_index, const char* password,
const char *pool_name ="");
CacheConn(CachePool* pCachePool);
virtual ~CacheConn();
// Connects if not connected; returns 0 when a context is available, non-zero otherwise.
int Init();
// Frees the context, if any.
void DeInit();
const char* GetPoolName();
// General operations
// Check whether a key exists
bool isExists(string &key);
// Delete a key
long del(string &key);
// ------------------- strings -------------------
string get(string key);
string set(string key, string& value);
string setex(string key, int timeout, string value);
// string mset(string key, map);
// Batch get
bool mget(const vector<string>& keys, map<string, string>& ret_value);
// Increment / decrement by 1
long incr(string key);
long decr(string key);
// ---------------- hashes ------------------------
long hdel(string key, string field);
string hget(string key, string field);
bool hgetAll(string key, map<string, string>& ret_value);
long hset(string key, string field, string value);
long hincrBy(string key, string field, long value);
long incrBy(string key, long value);
string hmset(string key, map<string, string>& hash);
bool hmget(string key, list<string>& fields, list<string>& ret_value);
// ------------ lists ------------
long lpush(string key, string value);
long rpush(string key, string value);
long llen(string key);
bool lrange(string key, long start, long end, list<string>& ret_value);
bool flushdb();
private:
CachePool* m_pCachePool;
redisContext* m_pContext; // NULL while disconnected
uint64_t m_last_connect_time; // seconds from time(NULL) at the last connect attempt
uint16_t m_server_port;
string m_server_ip;
string m_password;
uint16_t m_db_index;
string m_pool_name;
};
// Pool of CacheConn objects guarded by a CThreadNotify.
class CachePool {
public:
// db_index is where this differs from the MySQL pool
CachePool(const char* pool_name, const char* server_ip, int server_port, int db_index,
const char *password, int max_conn_cnt);
virtual ~CachePool();
// Creates the initial connections; returns 0 on success, 1 if one fails.
int Init();
// Acquire an idle connection
CacheConn* GetCacheConn();
// Hand a connection back to the pool
void RelCacheConn(CacheConn* pCacheConn);
const char* GetPoolName() { return m_pool_name.c_str(); }
const char* GetServerIP() { return m_server_ip.c_str(); }
const char* GetPassword() { return m_password.c_str(); }
int GetServerPort() { return m_server_port; }
int GetDBIndex() { return m_db_index; }
private:
string m_pool_name;
string m_server_ip;
string m_password;
int m_server_port;
int m_db_index; // MySQL uses a database name, Redis uses a db index
int m_cur_conn_cnt;
int m_max_conn_cnt;
list<CacheConn*> m_free_list; // idle connections
CThreadNotify m_free_notify; // used for Lock/Unlock and Wait/Signal around m_free_list
};
#endif /* CACHEPOOL_H_ */
实现
#include "CachePool.h"
#include <stdlib.h>
#include <string.h>
#include "Thread.h"
// Logging shims: both levels are forwarded to printf.
#define log_error printf
#define log_info printf
// Initial value of CachePool::m_cur_conn_cnt, i.e. how many connections CachePool::Init() creates.
#define MIN_CACHE_CONN_CNT 2
// NOTE(review): not referenced in the visible part of this file.
#define MAX_CACHE_CONN_FAIL_NUM 10
// Builds a stand-alone connection description (no owning pool).
// No network activity happens here; Init() performs the connect.
CacheConn::CacheConn(const char *server_ip, int server_port, int db_index, const char *password,
                     const char *pool_name)
{
    m_pCachePool = NULL; // this constructor has no pool; don't leave the pointer indeterminate
    m_server_ip = server_ip;
    m_server_port = server_port;
    m_db_index = db_index;
    m_password = password;
    m_pool_name = pool_name;
    m_pContext = NULL;
    m_last_connect_time = 0;
}
// Copies the connection settings from `pCachePool`.
// With a NULL pool an error is logged and the numeric settings are zeroed
// instead of being left uninitialized.
CacheConn::CacheConn(CachePool *pCachePool)
{
    m_pCachePool = pCachePool;
    m_server_port = 0;
    m_db_index = 0;
    if (pCachePool)
    {
        m_server_ip = pCachePool->GetServerIP();
        m_server_port = pCachePool->GetServerPort();
        m_db_index = pCachePool->GetDBIndex();
        m_password = pCachePool->GetPassword();
        m_pool_name = pCachePool->GetPoolName();
    }
    else
    {
        log_error("pCachePool is NULL\n");
    }
    m_pContext = NULL;
    m_last_connect_time = 0;
}
// Frees the hiredis context if one is open.
CacheConn::~CacheConn()
{
    if (m_pContext == NULL)
    {
        return;
    }
    redisFree(m_pContext);
    m_pContext = NULL;
}
/*
 * Redis connect / reconnect, playing the role mysql_ping() plays on the MySQL side.
 *
 * Returns 0 when a usable context exists. Otherwise:
 *   1  reconnect throttled (less than 1s since the last attempt) or connect failed
 *  -1  AUTH failed
 *   2  SELECT of the configured db index failed
 * On every failure the context is freed, so the next call retries from scratch
 * instead of reporting a half-initialized connection as healthy.
 */
int CacheConn::Init()
{
    if (m_pContext) // a context exists: treat the connection as established
    {
        return 0;
    }

    // Retry at most once per second.
    uint64_t cur_time = (uint64_t)time(NULL);
    if (cur_time < m_last_connect_time + 1)
    {
        printf("cur_time:%llu, m_last_connect_time:%llu\n",
               (unsigned long long)cur_time, (unsigned long long)m_last_connect_time);
        return 1;
    }
    m_last_connect_time = cur_time;

    // 1000ms connect timeout, written as one whole second so tv_usec stays below 1000000.
    struct timeval timeout = {1, 0};
    m_pContext = redisConnectWithTimeout(m_server_ip.c_str(), m_server_port, timeout);
    if (!m_pContext || m_pContext->err)
    {
        if (m_pContext)
        {
            log_error("redisConnect failed: %s\n", m_pContext->errstr);
            redisFree(m_pContext);
            m_pContext = NULL;
        }
        else
        {
            log_error("redisConnect failed\n");
        }
        return 1;
    }

    redisReply *reply = NULL;

    // Authenticate when a password is configured.
    if (!m_password.empty())
    {
        reply = (redisReply *)redisCommand(m_pContext, "AUTH %s", m_password.c_str());
        if (!reply || reply->type == REDIS_REPLY_ERROR)
        {
            log_error("Authentication failure:%p\n", reply);
            if (reply)
                freeReplyObject(reply);
            // Drop the unauthenticated context so it is not reused.
            redisFree(m_pContext);
            m_pContext = NULL;
            return -1;
        }
        freeReplyObject(reply);
    }

    // Select the configured database (previously hard-coded to 0).
    reply = (redisReply *)redisCommand(m_pContext, "SELECT %d", (int)m_db_index);
    if (reply && (reply->type == REDIS_REPLY_STATUS) && (strncmp(reply->str, "OK", 2) == 0))
    {
        freeReplyObject(reply);
        return 0;
    }

    if (reply)
    {
        log_error("select cache db failed:%s\n", reply->str);
        freeReplyObject(reply);
    }
    redisFree(m_pContext);
    m_pContext = NULL;
    return 2;
}
// Closes the connection by freeing the hiredis context, if present.
void CacheConn::DeInit()
{
    if (m_pContext == NULL)
    {
        return;
    }
    redisFree(m_pContext);
    m_pContext = NULL;
}
// C-string view of the pool name stored in this connection.
const char *CacheConn::GetPoolName()
{
    const char *name = m_pool_name.c_str();
    return name;
}
// GET key. Returns the value for a string reply, or an empty string when
// the connection is unavailable, the command fails, or the reply is not a string.
string CacheConn::get(string key)
{
    string result;
    if (Init() != 0)
    {
        return result;
    }

    redisReply *reply = static_cast<redisReply *>(redisCommand(m_pContext, "GET %s", key.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return result;
    }

    if (reply->type == REDIS_REPLY_STRING)
    {
        result.append(reply->str, reply->len);
    }
    freeReplyObject(reply);
    return result;
}
// SET key value. Returns the reply text, or an empty string when the
// connection is unavailable or the command produced no reply.
string CacheConn::set(string key, string &value)
{
    string reply_text;
    if (Init() != 0)
    {
        return reply_text;
    }

    redisReply *reply = static_cast<redisReply *>(
        redisCommand(m_pContext, "SET %s %s", key.c_str(), value.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return reply_text;
    }

    reply_text.append(reply->str, reply->len);
    freeReplyObject(reply); // release the reply
    return reply_text;
}
// SETEX key timeout value. Returns the reply text, or an empty string when
// the connection is unavailable or the command produced no reply.
string CacheConn::setex(string key, int timeout, string value)
{
    string reply_text;
    if (Init() != 0)
    {
        return reply_text;
    }

    redisReply *reply = static_cast<redisReply *>(
        redisCommand(m_pContext, "SETEX %s %d %s", key.c_str(), timeout, value.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return reply_text;
    }

    reply_text.append(reply->str, reply->len);
    freeReplyObject(reply);
    return reply_text;
}
bool CacheConn::mget(const vector<string> &keys, map<string, string> &ret_value)
{
if (Init())
{
return false;
}
if (keys.empty())
{
return false;
}
string strKey;
bool bFirst = true;
for (vector<string>::const_iterator it = keys.begin(); it != keys.end(); ++it)
{
if (bFirst)
{
bFirst = false;
strKey = *it;
}
else
{
strKey += " " + *it;
}
}
if (strKey.empty())
{
return false;
}
strKey = "MGET " + strKey;
redisReply *reply = (redisReply *)redisCommand(m_pContext, strKey.c_str());
if (!reply)
{
log_info("redisCommand failed:%s\n", m_pContext->errstr);
redisFree(m_pContext);
m_pContext = NULL;
return false;
}
if (reply->type == REDIS_REPLY_ARRAY)
{
for (size_t i = 0; i < reply->elements; ++i)
{
redisReply *child_reply = reply->element[i];
if (child_reply->type == REDIS_REPLY_STRING)
{
ret_value[keys[i]] = child_reply->str;
}
}
}
freeReplyObject(reply);
return true;
}
// EXISTS key. Returns true when the reply's integer is non-zero; false when
// it is zero, the connection is unavailable, or the command produced no reply.
bool CacheConn::isExists(string &key)
{
    if (Init() != 0)
    {
        return false;
    }

    redisReply *reply = static_cast<redisReply *>(redisCommand(m_pContext, "EXISTS %s", key.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return false;
    }

    const long count = reply->integer;
    freeReplyObject(reply);
    return count != 0;
}
// DEL key. Returns the reply's integer, or 0 when the connection is
// unavailable or the command produced no reply.
long CacheConn::del(string &key)
{
    if (Init() != 0)
    {
        return 0;
    }

    redisReply *reply = static_cast<redisReply *>(redisCommand(m_pContext, "DEL %s", key.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return 0;
    }

    const long removed = reply->integer;
    freeReplyObject(reply);
    return removed;
}
// HDEL key field. Returns the reply's integer, or 0 when the connection is
// unavailable or the command produced no reply.
long CacheConn::hdel(string key, string field)
{
    if (Init() != 0)
    {
        return 0;
    }

    redisReply *reply = static_cast<redisReply *>(
        redisCommand(m_pContext, "HDEL %s %s", key.c_str(), field.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return 0;
    }

    const long removed = reply->integer;
    freeReplyObject(reply);
    return removed;
}
// HGET key field. Returns the value for a string reply, otherwise an empty
// string (connection unavailable, no reply, or a non-string reply).
string CacheConn::hget(string key, string field)
{
    string result;
    if (Init() != 0)
    {
        return result;
    }

    redisReply *reply = static_cast<redisReply *>(
        redisCommand(m_pContext, "HGET %s %s", key.c_str(), field.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return result;
    }

    if (reply->type == REDIS_REPLY_STRING)
    {
        result.append(reply->str, reply->len);
    }
    freeReplyObject(reply);
    return result;
}
// HGETALL key. Inserts every field/value pair of the reply into ret_value.
// Returns false when the connection is unavailable or there is no reply;
// true otherwise, even if the reply was not an even-length array.
bool CacheConn::hgetAll(string key, map<string, string> &ret_value)
{
    if (Init() != 0)
    {
        return false;
    }

    redisReply *reply = static_cast<redisReply *>(redisCommand(m_pContext, "HGETALL %s", key.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return false;
    }

    const bool is_pair_list = (reply->type == REDIS_REPLY_ARRAY) && (reply->elements % 2 == 0);
    if (is_pair_list)
    {
        // Elements alternate: field, value, field, value, ...
        for (size_t pos = 0; pos < reply->elements; pos += 2)
        {
            const redisReply *name_part = reply->element[pos];
            const redisReply *data_part = reply->element[pos + 1];
            ret_value.insert(make_pair(string(name_part->str, name_part->len),
                                       string(data_part->str, data_part->len)));
        }
    }
    freeReplyObject(reply);
    return true;
}
// HSET key field value. Returns the reply's integer, or -1 when the
// connection is unavailable or the command produced no reply.
long CacheConn::hset(string key, string field, string value)
{
    if (Init() != 0)
    {
        return -1;
    }

    redisReply *reply = static_cast<redisReply *>(
        redisCommand(m_pContext, "HSET %s %s %s", key.c_str(), field.c_str(), value.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return -1;
    }

    const long result = reply->integer;
    freeReplyObject(reply);
    return result;
}
// HINCRBY key field value. Returns the reply's integer, or -1 when the
// connection is unavailable or the command produced no reply.
long CacheConn::hincrBy(string key, string field, long value)
{
    if (Init() != 0)
    {
        return -1;
    }

    redisReply *reply = static_cast<redisReply *>(
        redisCommand(m_pContext, "HINCRBY %s %s %ld", key.c_str(), field.c_str(), value));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return -1;
    }

    const long result = reply->integer;
    freeReplyObject(reply);
    return result;
}
// INCRBY key value. Returns the reply's integer, or -1 when the connection
// is unavailable or the command produced no reply.
long CacheConn::incrBy(string key, long value)
{
    if (Init() != 0)
    {
        return -1;
    }

    redisReply *reply = static_cast<redisReply *>(
        redisCommand(m_pContext, "INCRBY %s %ld", key.c_str(), value));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redis Command failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return -1;
    }

    const long result = reply->integer;
    freeReplyObject(reply);
    return result;
}
// HMSET key field value [field value ...] built from `hash`.
// Returns the reply text, or an empty string when the connection is
// unavailable or the command produced no reply.
//
// Arguments are held in vectors (no manual new[]/delete[]) and passed with
// explicit lengths so values containing NUL bytes are sent in full.
string CacheConn::hmset(string key, map<string, string> &hash)
{
    string ret_value;
    if (Init())
    {
        return ret_value;
    }

    const size_t arg_count = hash.size() * 2 + 2;
    vector<const char *> argv;
    vector<size_t> argvlen;
    argv.reserve(arg_count);
    argvlen.reserve(arg_count);

    argv.push_back("HMSET");
    argvlen.push_back(5);
    argv.push_back(key.data());
    argvlen.push_back(key.size());
    for (map<string, string>::iterator it = hash.begin(); it != hash.end(); ++it)
    {
        argv.push_back(it->first.data());
        argvlen.push_back(it->first.size());
        argv.push_back(it->second.data());
        argvlen.push_back(it->second.size());
    }

    redisReply *reply = (redisReply *)redisCommandArgv(m_pContext, (int)argv.size(), &argv[0], &argvlen[0]);
    if (!reply)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return ret_value;
    }

    if (reply->str)
    {
        ret_value.append(reply->str, reply->len);
    }
    freeReplyObject(reply);
    return ret_value;
}
// HMGET key field [field ...]. Appends one entry to ret_value per element
// of the array reply, in reply order; elements without string data (e.g. a
// missing field) are appended as empty strings so positions stay aligned.
// Returns false when the connection is unavailable or there is no reply.
//
// Arguments are held in vectors (no manual new[]/delete[]) and passed with
// explicit lengths so field names containing NUL bytes are sent in full.
bool CacheConn::hmget(string key, list<string> &fields, list<string> &ret_value)
{
    if (Init())
    {
        return false;
    }

    const size_t arg_count = fields.size() + 2;
    vector<const char *> argv;
    vector<size_t> argvlen;
    argv.reserve(arg_count);
    argvlen.reserve(arg_count);

    argv.push_back("HMGET");
    argvlen.push_back(5);
    argv.push_back(key.data());
    argvlen.push_back(key.size());
    for (list<string>::iterator it = fields.begin(); it != fields.end(); ++it)
    {
        argv.push_back(it->data());
        argvlen.push_back(it->size());
    }

    redisReply *reply = (redisReply *)redisCommandArgv(m_pContext, (int)argv.size(), &argv[0], &argvlen[0]);
    if (!reply)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return false;
    }

    if (reply->type == REDIS_REPLY_ARRAY)
    {
        for (size_t i = 0; i < reply->elements; i++)
        {
            redisReply *value_reply = reply->element[i];
            if (value_reply->str)
            {
                ret_value.push_back(string(value_reply->str, value_reply->len));
            }
            else
            {
                ret_value.push_back(string());
            }
        }
    }
    freeReplyObject(reply);
    return true;
}
// INCR key. Returns the reply's integer, or -1 when the connection is
// unavailable or the command produced no reply.
long CacheConn::incr(string key)
{
    if (Init() != 0)
    {
        return -1;
    }

    redisReply *reply = static_cast<redisReply *>(redisCommand(m_pContext, "INCR %s", key.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redis Command failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return -1;
    }

    const long result = reply->integer;
    freeReplyObject(reply);
    return result;
}
// DECR key. Returns the reply's integer, or -1 when the connection is
// unavailable or the command produced no reply.
long CacheConn::decr(string key)
{
    if (Init() != 0)
    {
        return -1;
    }

    redisReply *reply = static_cast<redisReply *>(redisCommand(m_pContext, "DECR %s", key.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redis Command failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return -1;
    }

    const long result = reply->integer;
    freeReplyObject(reply);
    return result;
}
// LPUSH key value. Returns the reply's integer, or -1 when the connection
// is unavailable or the command produced no reply.
long CacheConn::lpush(string key, string value)
{
    if (Init() != 0)
    {
        return -1;
    }

    redisReply *reply = static_cast<redisReply *>(
        redisCommand(m_pContext, "LPUSH %s %s", key.c_str(), value.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return -1;
    }

    const long result = reply->integer;
    freeReplyObject(reply);
    return result;
}
// RPUSH key value. Returns the reply's integer, or -1 when the connection
// is unavailable or the command produced no reply.
long CacheConn::rpush(string key, string value)
{
    if (Init() != 0)
    {
        return -1;
    }

    redisReply *reply = static_cast<redisReply *>(
        redisCommand(m_pContext, "RPUSH %s %s", key.c_str(), value.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return -1;
    }

    const long result = reply->integer;
    freeReplyObject(reply);
    return result;
}
// LLEN key. Returns the reply's integer, or -1 when the connection is
// unavailable or the command produced no reply.
long CacheConn::llen(string key)
{
    if (Init() != 0)
    {
        return -1;
    }

    redisReply *reply = static_cast<redisReply *>(redisCommand(m_pContext, "LLEN %s", key.c_str()));
    if (reply == NULL)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return -1;
    }

    const long result = reply->integer;
    freeReplyObject(reply);
    return result;
}
// LRANGE key start end. Appends each element of the array reply to ret_value.
// Returns false when the connection is unavailable or there is no reply.
bool CacheConn::lrange(string key, long start, long end, list<string> &ret_value)
{
    if (Init())
    {
        return false;
    }

    // start/end are long, so the conversion must be %ld; %d reads them as int,
    // which is a varargs type mismatch.
    redisReply *reply = (redisReply *)redisCommand(m_pContext, "LRANGE %s %ld %ld", key.c_str(), start, end);
    if (!reply)
    {
        // No reply: drop the context so the next call reconnects.
        log_error("redisCommand failed:%s\n", m_pContext->errstr);
        redisFree(m_pContext);
        m_pContext = NULL;
        return false;
    }

    if (reply->type == REDIS_REPLY_ARRAY)
    {
        for (size_t i = 0; i < reply->elements; i++)
        {
            redisReply *value_reply = reply->element[i];
            string value(value_reply->str, value_reply->len);
            ret_value.push_back(value);
        }
    }
    freeReplyObject(reply);
    return true;
}
bool CacheConn::flushdb()
{
bool ret = false;
if (Init())
{
return false;
}
redisReply *reply = (redisReply *)redisCommand(m_pContext, "FLUSHDB");
if (!reply)
{
log_error("redisCommand failed:%s\n", m_pContext->errstr);
redisFree(m_pContext);
m_pContext = NULL;
return false;
}
if (reply->type == REDIS_REPLY_STRING && strncmp(reply->str, "OK", 2) == 0)
{
ret = true;
}
freeReplyObject(reply);
return ret;
}
///////////////
// Stores the server settings. The current connection count starts at
// MIN_CACHE_CONN_CNT, which is how many connections Init() creates.
CachePool::CachePool(const char *pool_name, const char *server_ip, int server_port, int db_index,
                     const char *password, int max_conn_cnt)
    : m_pool_name(pool_name),
      m_server_ip(server_ip),
      m_password(password),
      m_server_port(server_port),
      m_db_index(db_index),
      m_cur_conn_cnt(MIN_CACHE_CONN_CNT),
      m_max_conn_cnt(max_conn_cnt)
{
}
// Deletes every idle connection under the pool lock and resets the counter.
CachePool::~CachePool()
{
    m_free_notify.Lock();
    while (!m_free_list.empty())
    {
        delete m_free_list.front();
        m_free_list.pop_front();
    }
    m_cur_conn_cnt = 0;
    m_free_notify.Unlock();
}
int CachePool::Init()
{
for (int i = 0; i < m_cur_conn_cnt; i++)
{
CacheConn *pConn = new CacheConn(m_server_ip.c_str(), m_server_port,
m_db_index, m_password.c_str(), m_pool_name.c_str());
if (pConn->Init())
{
delete pConn;
return 1;
}
m_free_list.push_back(pConn);
}
log_info("cache pool: %s, list size: %lu\n", m_pool_name.c_str(), m_free_list.size());
return 0;
}
// Hands out an idle connection. While none is idle it either waits on
// m_free_notify (pool already at its maximum) or creates a new connection.
// Returns NULL if a newly created connection fails to initialize.
CacheConn *CachePool::GetCacheConn()
{
    m_free_notify.Lock();

    while (m_free_list.empty())
    {
        if (m_cur_conn_cnt >= m_max_conn_cnt)
        {
            // At capacity: wait for RelCacheConn to signal, then re-check.
            m_free_notify.Wait();
            continue;
        }

        CacheConn *fresh = new CacheConn(m_server_ip.c_str(), m_server_port,
                                         m_db_index, m_password.c_str(), m_pool_name.c_str());
        if (fresh->Init() != 0)
        {
            log_error("Init CacheConn failed\n");
            delete fresh;
            m_free_notify.Unlock();
            return NULL;
        }

        m_free_list.push_back(fresh);
        ++m_cur_conn_cnt;
        log_info("new cache connection: %s, conn_cnt: %d\n", m_pool_name.c_str(), m_cur_conn_cnt);
    }

    CacheConn *handed_out = m_free_list.front();
    m_free_list.pop_front();
    m_free_notify.Unlock();
    return handed_out;
}
// Returns a connection to the pool and signals one waiter.
// A NULL pointer is ignored so it can never end up in the free list and be
// handed out by GetCacheConn; a connection already in the list is not added twice.
void CachePool::RelCacheConn(CacheConn *p_cache_conn)
{
    if (!p_cache_conn)
    {
        log_error("RelCacheConn failed: NULL connection\n");
        return;
    }

    m_free_notify.Lock();
    list<CacheConn *>::iterator it = m_free_list.begin();
    for (; it != m_free_list.end(); ++it)
    {
        if (*it == p_cache_conn)
        {
            break;
        }
    }
    if (it == m_free_list.end())
    {
        m_free_list.push_back(p_cache_conn);
    }
    m_free_notify.Signal();
    m_free_notify.Unlock();
}