| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
mx3y
9年前发布

mongodb连接池c++ 封装

linux平台下mongodb c++连接池封装,线程安全

//函数返回0:成功 >0 出错  class cmongo{      public:          //默认构造函数,默认连接数为1          cmongo();          //传入连接数到构造函数,默认连接数为size          cmongo(int size);          //析构函数          ~cmongo();      public:          //设置tcp读写超时时间          int set_wr_timeout(double t);          //连接          int conn(string mhost="127.0.0.1",int mport=27017);          //设置db collection          int setdb(string mdb,string mcollection);             int setindex(string key);          //查询          int get(map<string,string>& out,vector<string> in,string key,string key_val);          //投递一批要查询的字段,fields为要查询哪些字段          int gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key);          //dump key-value dumpkey对应一个value          int dumpkey(map< string,string >& rout,string key,string val);          //dump key->map<key,value> dumpkey对应一组value          int dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key);          //写入          int set(map<string,string> in,string key,string key_val);          //批量写入          //更新接口,批量更新key="id"          //  "123456":<key,value>,<key,value>          //  "123457":<key,value>,<key,value>          int sets(map< string,map<string,string> > in,string key);          //删除          int remove(string key,string key_val);      private:          string doc;          //tcp读写超时时间          double wr_timeout;          pthread_mutex_t _jobmux;          sem_t _jobsem;          map<DBClientConnection*,bool> _joblst;          pthread_mutex_t _dbmux;     };           cmongo::cmongo(int size){      //doc      doc=string(DB_DB)+"."+string(DB_COLLECTION);      wr_timeout=3;      //最大连接0-200      if(size<0){          size=1;      }      if(size>200){          size=200;      }      if(_joblst.size()>0){          return;      }      bool auto_conn=true;      pthread_mutex_init(&_jobmux,NULL);      if((sem_init(&_jobsem,0,0))<0){          return;      }      pthread_mutex_lock(&_jobmux);      for(int i=0;i<size;++i){          DBClientConnection* pconn = new DBClientConnection(auto_conn,0,wr_timeout);          if(pconn != NULL){              _joblst[pconn]=false;          }      }      pthread_mutex_unlock(&_jobmux);        }  cmongo::~cmongo(){      doc="";      pthread_mutex_lock(&_jobmux);      map<DBClientConnection*,bool>::iterator it=_joblst.begin();      while(it != _joblst.end()){          delete it->first;          it++;      }      pthread_mutex_unlock(&_jobmux);  }  int cmongo::set_wr_timeout(double t){      wr_timeout=t;      return RET_OK;  }  int cmongo::conn(string mhost,int mport){      pthread_mutex_lock(&_jobmux);      map<DBClientConnection*,bool>::iterator it=_joblst.begin();      while(it!=_joblst.end()){          string errmsg="";          HostAndPort hp(mhost,mport);          if(!(it->first->connect(hp,errmsg))){              cerr<<"connect mhost:"<<mhost<<" mport:"<<mport<<" msg:"<<errmsg<<endl;              it->second=true;          }          sem_post(&_jobsem);          it++;      }      pthread_mutex_unlock(&_jobmux);      return RET_OK;  }  int cmongo::setdb(string mdb,string mcollection){      if(mdb.empty() || mcollection.empty()){          return RET_PARERR;      }      doc=mdb+"."+mcollection;      return RET_OK;  }  int cmongo::setindex(string key){      if(key.empty()){          return RET_PARERR;      }        sem_wait(&_jobsem);      pthread_mutex_lock(&_jobmux);      map<DBClientConnection*,bool>::iterator it=_joblst.begin();      while(it!=_joblst.end()){          if(it->second == false){              it->second=true;              break;          }          it++;      }      pthread_mutex_unlock(&_jobmux);      string bindex="{"+key+":1}";      it->first->ensureIndex(doc,fromjson(bindex));         pthread_mutex_lock(&_jobmux);      it->second=false;      pthread_mutex_unlock(&_jobmux);      sem_post(&_jobsem);         return RET_OK;  }  //out为检索出来的key-value数据对应,in 为要检索的字段,key,key_value为要检索的条件,暂不支持多条件检索  //单列查询  int cmongo::get(map<string,string>& out,vector<string> in,string key,string key_val){      //key key_val 要检索字段      if(key.empty() || key_val.empty() || in.size()<=0){          return RET_PARERR;      }      BSONObjBuilder b;      for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){          b.append(*iter,1);      }         sem_wait(&_jobsem);      pthread_mutex_lock(&_jobmux);      map<DBClientConnection*,bool>::iterator it=_joblst.begin();      while(it!=_joblst.end()){          if(it->second == false){              it->second=true;              break;          }          it++;      }      pthread_mutex_unlock(&_jobmux);      BSONObj ob=b.obj();         BSONObj p=it->first->findOne(doc,QUERY(key<<key_val),&ob);         map<string,string> temp;      for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){          string mkey=*iter;          temp[*iter]=p.getStringField(mkey.c_str());      }      out=temp;         pthread_mutex_lock(&_jobmux);      it->second=false;      pthread_mutex_unlock(&_jobmux);      sem_post(&_jobsem);         return RET_OK;  }  //查询key为key的一批数据的 某些字段  //fields为要查询的字段集  //key="id" 值为in 一批key  //返回key->map<key,value>  int cmongo::gets(map< string,map<string,string> >& rout,vector<string> fields,vector<string> in,string key){      if(key.empty()){          return RET_PARERR;      }      sem_wait(&_jobsem);      pthread_mutex_lock(&_jobmux);      map<DBClientConnection*,bool>::iterator it=_joblst.begin();      while(it!=_joblst.end()){          if(it->second == false){              it->second=true;              break;          }          it++;      }      pthread_mutex_unlock(&_jobmux);         BSONObjBuilder b;      b.append(key,1);      for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){          b.append(*iter,1);      }         BSONObj p=b.obj();      for(vector<string>::iterator iter2=in.begin();iter2!=in.end();++iter2){          BSONObj ob=it->first->findOne(doc,QUERY(key<<*iter2),&p);          map<string,string> temp;          for(vector<string>::iterator iter=fields.begin();iter!=fields.end();++iter){              string mkey=*iter;              temp[*iter]=ob.getStringField(mkey.c_str());             }          rout[ob.getStringField(key.c_str())]=temp;      }         pthread_mutex_lock(&_jobmux);      it->second=false;      pthread_mutex_unlock(&_jobmux);      sem_post(&_jobsem);         return RET_OK;  }  //dumpkey key-value 返回 key对应的val值  //key val  int cmongo::dumpkey(map< string,string >& rout,string key,string val){      if(key.empty()){          return RET_PARERR;      }      sem_wait(&_jobsem);      pthread_mutex_lock(&_jobmux);      map<DBClientConnection*,bool>::iterator it=_joblst.begin();      while(it!=_joblst.end()){          if(it->second == false){              it->second=true;              break;          }          it++;      }      pthread_mutex_unlock(&_jobmux);         BSONObjBuilder b;      b.append(key,1);      if(!val.empty()){          b.append(val,1);      }         BSONObj p=b.obj();         pthread_mutex_lock(&_dbmux);      auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);      while(cursor->more()){          BSONObj ob=cursor->next();          rout[ob.getStringField(key.c_str())]=ob.getStringField(val.c_str());      }      pthread_mutex_unlock(&_dbmux);         pthread_mutex_lock(&_jobmux);      it->second=false;      pthread_mutex_unlock(&_jobmux);      sem_post(&_jobsem);         return RET_OK;  }  //dumpkey key对应多个value  //key->map<key,value>.  //其实dumpvals接口完全可以包含dumpkey,为了方便运用独立出来  //out 返回的key 对应的map<key,value>  //in 每个key需要对应的返回哪些字段  //key="id"  int cmongo::dumpvals(map< string,map<string,string> >& rout,vector<string> in,string key){      if(key.empty()){          return RET_PARERR;      }      sem_wait(&_jobsem);      pthread_mutex_lock(&_jobmux);      map<DBClientConnection*,bool>::iterator it=_joblst.begin();      while(it!=_joblst.end()){          if(it->second == false){              it->second=true;              break;          }          it++;      }      pthread_mutex_unlock(&_jobmux);         BSONObjBuilder b;      b.append(key,1);      for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){          b.append(*iter,1);      }         BSONObj p=b.obj();         pthread_mutex_lock(&_dbmux);      auto_ptr<DBClientCursor> cursor = it->first->query(doc,Query(),0,0,&p);      while(cursor->more()){          BSONObj ob=cursor->next();          map<string,string> temp;          for(vector<string>::iterator iter=in.begin();iter!=in.end();++iter){              string val=*iter;              temp[val]=ob.getStringField(val.c_str());          }          rout[ob.getStringField(key.c_str())]=temp;          temp.clear();      }      pthread_mutex_unlock(&_dbmux);         pthread_mutex_lock(&_jobmux);      it->second=false;      pthread_mutex_unlock(&_jobmux);      sem_post(&_jobsem);         return RET_OK;  }  //更新接口,暂不支持key对应多条记录的更新  int cmongo::set(map<string,string> in,string key,string key_val){      //如果map没有数据,返回参数错误      if(in.size()<=0 || key.empty() || key_val.empty()){          return RET_PARERR;      }      BSONObjBuilder b;      map<string,string>::iterator iter;      for(iter=in.begin();iter!=in.end();++iter){          b.append(iter->first,iter->second);      }             sem_wait(&_jobsem);      pthread_mutex_lock(&_jobmux);      map<DBClientConnection*,bool>::iterator it=_joblst.begin();      while(it!=_joblst.end()){          if(it->second == false){              it->second=true;              break;          }          it++;      }      pthread_mutex_unlock(&_jobmux);      BSONObj ob=b.obj();      it->first->update(doc,QUERY(key<<key_val),BSON("$set"<<ob),true);         int ret=RET_OK;      string errmsg=it->first->getLastError();      if(!errmsg.empty()){          ret=RET_ERR;      }         pthread_mutex_lock(&_jobmux);      it->second=false;      pthread_mutex_unlock(&_jobmux);      sem_post(&_jobsem);         return ret;  }  //更新接口,批量更新key="id"  //  "123456":<key,value>,<key,value>  //  "123457":<key,value>,<key,value>  int cmongo::sets(map< string,map<string,string> > in,string key){      //如果map没有数据,返回参数错误      if(in.size()<=0 || key.empty() ){          return RET_PARERR;      }         sem_wait(&_jobsem);      pthread_mutex_lock(&_jobmux);      map<DBClientConnection*,bool>::iterator it=_joblst.begin();      while(it!=_joblst.end()){          if(it->second == false){              it->second=true;              break;          }          it++;      }      pthread_mutex_unlock(&_jobmux);         int ret=RET_OK;      map< string,map<string,string> >::iterator iter;      for(iter=in.begin();iter!=in.end();++iter){          BSONObjBuilder b;          for(map<string,string>::iterator iter2=iter->second.begin();iter2!=iter->second.end();++iter2){              b.append(iter2->first,iter2->second);          }          BSONObj ob=b.obj();          it->first->update(doc,QUERY(key<<iter->first),BSON("$set"<<ob),true);          string errmsg=it->first->getLastError();          if(!errmsg.empty()){              ret=RET_ERR;          }      }                pthread_mutex_lock(&_jobmux);      it->second=false;      pthread_mutex_unlock(&_jobmux);      sem_post(&_jobsem);         return ret;  }  //删除接口,删除记录 key=id key_val=587.即删除id="587"的记录  int cmongo::remove(string key,string key_val){         if(key.empty() || key_val.empty()){          return RET_PARERR;      }      sem_wait(&_jobsem);      pthread_mutex_lock(&_jobmux);      map<DBClientConnection*,bool>::iterator it=_joblst.begin();      while(it!=_joblst.end()){          if(it->second == false){              it->second=true;              break;          }          it++;      }      pthread_mutex_unlock(&_jobmux);         it->first->remove(doc,BSON(key << key_val));         pthread_mutex_lock(&_jobmux);      it->second=false;      pthread_mutex_unlock(&_jobmux);      sem_post(&_jobsem);         return RET_OK;  }