MongoDB 连接池的 C++ 封装
Linux 平台下的 MongoDB C++ 连接池封装,线程安全
//函数返回0:成功 >0 出错 class cmongo{ public: //默认构造函数,默认连接数为1 cmongo(); //传入连接数到构造函数,默认连接数为size cmongo(int size); //析构函数 ~cmongo(); public: //设置tcp读写超时时间 int set_wr_timeout(double t); //连接 int conn(string mhost="127.0.0.1",int mport=27017); //设置db collection int setdb(string mdb,string mcollection); int setindex(string key); //查询 int get(map<string,string>& out,vector<string> in,string key,string key_val); //投递一批要查询的字段,fields为要查询哪些字段 int gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key); //dump key-value dumpkey对应一个value int dumpkey(map< string,string >& rout,string key,string val); //dump key->map<key,value> dumpkey对应一组value int dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key); //写入 int set(map<string,string> in,string key,string key_val); //批量写入 //更新接口,批量更新key="id" // "123456":<key,value>,<key,value> // "123457":<key,value>,<key,value> int sets(map< string,map<string,string> > in,string key); //删除 int remove(string key,string key_val); private: string doc; //tcp读写超时时间 double wr_timeout; pthread_mutex_t _jobmux; sem_t _jobsem; map<DBClientConnection*,bool> _joblst; pthread_mutex_t _dbmux; }; cmongo::cmongo(int size){ //doc doc=string(DB_DB)+"."+string(DB_COLLECTION); wr_timeout=3; //最大连接0-200 if(size<0){ size=1; } if(size>200){ size=200; } if(_joblst.size()>0){ return; } bool auto_conn=true; pthread_mutex_init(&_jobmux,NULL); if((sem_init(&_jobsem,0,0))<0){ return; } pthread_mutex_lock(&_jobmux); for(int i=0;i<size;++i){ DBClientConnection* pconn = new DBClientConnection(auto_conn,0,wr_timeout); if(pconn != NULL){ _joblst[pconn]=false; } } pthread_mutex_unlock(&_jobmux); } cmongo::~cmongo(){ doc=""; pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it != _joblst.end()){ delete it->first; it++; } pthread_mutex_unlock(&_jobmux); } int cmongo::set_wr_timeout(double t){ wr_timeout=t; return RET_OK; } int cmongo::conn(string mhost,int mport){ 
pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ string errmsg=""; HostAndPort hp(mhost,mport); if(!(it->first->connect(hp,errmsg))){ cerr<<"connect mhost:"<<mhost<<" mport:"<<mport<<" msg:"<<errmsg<<endl; it->second=true; } sem_post(&_jobsem); it++; } pthread_mutex_unlock(&_jobmux); return RET_OK; } int cmongo::setdb(string mdb,string mcollection){ if(mdb.empty() || mcollection.empty()){ return RET_PARERR; } doc=mdb+"."+mcollection; return RET_OK; } int cmongo::setindex(string key){ if(key.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); string bindex="{"+key+":1}"; it->first->ensureIndex(doc,fromjson(bindex)); pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK; } //out为检索出来的key-value数据对应,in 为要检索的字段,key,key_value为要检索的条件,暂不支持多条件检索 //单列查询 int cmongo::get(map<string,string>& out,vector<string> in,string key,string key_val){ //key key_val 要检索字段 if(key.empty() || key_val.empty() || in.size()<=0){ return RET_PARERR; } BSONObjBuilder b; for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){ b.append(*iter,1); } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); BSONObj ob=b.obj(); BSONObj p=it->first->findOne(doc,QUERY(key<<key_val),&ob); map<string,string> temp; for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){ string mkey=*iter; temp[*iter]=p.getStringField(mkey.c_str()); } out=temp; pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK; } //查询key为key的一批数据的 某些字段 
//fields为要查询的字段集 //key="id" 值为in 一批key //返回key->map<key,value> int cmongo::gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key){ if(key.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); BSONObjBuilder b; b.append(key,1); for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){ b.append(*iter,1); } BSONObj p=b.obj(); for(vector<string>::iterator iter2=in.begin();iter2!=in.end();++iter2){ BSONObj ob=it->first->findOne(doc,QUERY(key<<*iter2),&p); map<string,string> temp; for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){ string mkey=*iter; temp[*iter]=ob.getStringField(mkey.c_str()); } rout[ob.getStringField(key.c_str())]=temp; } pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK; } //dumpkey key-value 返回 key对应的val值 //key val int cmongo::dumpkey(map< string,string >& rout,string key,string val){ if(key.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); BSONObjBuilder b; b.append(key,1); if(!val.empty()){ b.append(val,1); } BSONObj p=b.obj(); pthread_mutex_lock(&_dbmux); auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p); while(cursor->more()){ BSONObj ob=cursor->next(); rout[ob.getStringField(key.c_str())]=ob.getStringField(val.c_str()); } pthread_mutex_unlock(&_dbmux); pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK; } //dumpkey key对应多个value //key->map<key,value>. 
//其实dumpvals接口完全可以包含dumpkey,为了方便运用独立出来 //out 返回的key 对应的map<key,value> //in 每个key需要对应的返回哪些字段 //key="id" int cmongo::dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key){ if(key.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); BSONObjBuilder b; b.append(key,1); for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){ b.append(*iter,1); } BSONObj p=b.obj(); pthread_mutex_lock(&_dbmux); auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p); while(cursor->more()){ BSONObj ob=cursor->next(); map<string,string> temp; for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){ string val=*iter; temp[val]=ob.getStringField(val.c_str()); } rout[ob.getStringField(key.c_str())]=temp; temp.clear(); } pthread_mutex_unlock(&_dbmux); pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK; } //更新接口,暂不支持key对应多条记录的更新 int cmongo::set(map<string,string> in,string key,string key_val){ //如果map没有数据,返回参数错误 if(in.size()<=0 || key.empty() || key_val.empty()){ return RET_PARERR; } BSONObjBuilder b; map<string,string>::iterator iter; for(iter=in.begin();iter!=in.end();++iter){ b.append(iter->first,iter->second); } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); BSONObj ob=b.obj(); it->first->update(doc,QUERY(key<<key_val),BSON("$set"<<ob),true); int ret=RET_OK; string errmsg=it->first->getLastError(); if(!errmsg.empty()){ ret=RET_ERR; } pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return ret; } //更新接口,批量更新key="id" // 
"123456":<key,value>,<key,value> // "123457":<key,value>,<key,value> int cmongo::sets(map< string,map<string,string> > in,string key){ //如果map没有数据,返回参数错误 if(in.size()<=0 || key.empty() ){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); int ret=RET_OK; map< string,map<string,string> >::iterator iter; for(iter=in.begin();iter!=in.end();++iter){ BSONObjBuilder b; for(map<string,string>::iterator iter2=iter->second.begin();iter2!=iter->second.end();++iter2){ b.append(iter2->first,iter2->second); } BSONObj ob=b.obj(); it->first->update(doc,QUERY(key<<iter->first),BSON("$set"<<ob),true); string errmsg=it->first->getLastError(); if(!errmsg.empty()){ ret=RET_ERR; } } pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return ret; } //删除接口,删除记录 key=id key_val=587.即删除id="587"的记录 int cmongo::remove(string key,string key_val){ if(key.empty() || key_val.empty()){ return RET_PARERR; } sem_wait(&_jobsem); pthread_mutex_lock(&_jobmux); map<DBClientConnection*,bool>::iterator it=_joblst.begin(); while(it!=_joblst.end()){ if(it->second == false){ it->second=true; break; } it++; } pthread_mutex_unlock(&_jobmux); it->first->remove(doc,BSON(key << key_val)); pthread_mutex_lock(&_jobmux); it->second=false; pthread_mutex_unlock(&_jobmux); sem_post(&_jobsem); return RET_OK; }