| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
jopen
8年前发布

基于消息机制的异步架构之消息队列

消息队列的头文件msgqueue.h    /*   * msgqueue.h   *   */      #ifndef MSGQUEUE_H_  #define MSGQUEUE_H_      #include "conn.h"      #define MAX_MSG_LENGTH (1024)  typedef struct MSG {  char buf[MAX_MSG_LENGTH];  uint16 buf_len;  CONN* c;  struct MSG* next;      } MSG;      typedef struct MSG_QUEUE {  MSG *head;  MSG *tail;  int size;  } MSG_QUEUE;      MSG_QUEUE* create_msg_queue();      extern int push_msg(MSG_QUEUE *q,CONN* c);      int empty_msg_queue( MSG_QUEUE *q);      int  get_msg(MSG_QUEUE *q ,MSG* temp_msg);      #endif /* MSGQUEUE_H_ */      消息队列的实现文件 * msgqueue.c    /*   * msgqueue.c   *   */      #include "msgqueue.h"      MSG_QUEUE* create_msg_queue()  {      MSG_QUEUE* qmsg=malloc(sizeof(MSG_QUEUE)); /*申请头尾指针结点*/  if(qmsg==NULL){  fprintf(stderr, "create_msg_queue malloc error, errno: %d %m\n", errno);  return NULL;  }  MSG* msg=malloc(sizeof(MSG)); /*申请链队头结点*/  if(msg==NULL){  fprintf(stderr, "create_head_msg malloc error, errno: %d %m\n", errno);  free(qmsg);  return NULL;  }      msg->next=NULL;  qmsg->head=qmsg->tail=msg;  qmsg->size=0;  return qmsg;  }           int  push_msg(MSG_QUEUE *q,CONN* c)  {  if(q->size>10240){  fprintf(stderr, "push_msg error,msg_queue size is over %d ,errno: %d %m\n",q->size, errno);  return -1;  }      MSG *msg;  msg=malloc(sizeof(MSG)); /*申请新结点*/  if(msg==NULL){  fprintf(stderr, "create_new_msg malloc error, errno: %d %m\n", errno);  return -1;  }  memcpy(msg->buf,c->in_buf,c->in_buf_len);  msg->buf_len=c->in_buf_len;  msg->c=c;  msg->next=NULL;  q->tail->next=msg;  q->tail=msg;  q->size++;      return 0;  }      int empty_msg_queue( MSG_QUEUE *q)  {  if (q->head==q->tail)  return 0;  else  return 1;  }          int  get_msg(MSG_QUEUE *q ,MSG* temp_msg)  {  if (q->head==q->tail){  return -1;  }  MSG *temp=q->head->next;      if(q->head->next == q->tail) //如果要出队的结点为最后一个结点,使q->rear指向头结点防止出现悬空的指针          q->tail = q->head;         // msg_buf = temp->buf; //将出队的数据元素存入*e      temp_msg->c=temp->c;      memcpy(temp_msg->buf,temp->buf,temp->buf_len);      temp_msg->buf_len=temp->buf_len;          q->head->next = temp->next; //使下一个结点成为队头,如果没有下一个结点则为NULL  q->size--;  //int buf_len=temp->buf_len;      free(temp); //删除要出队的结点  return 0;      }