Linux线程的创建和使用

cqm

贡献于2011-05-20

字数:23892 关键词: Linux Apache C/C++

线程的创建和使用 线程的创建是用下面的几个函数来实现的.   #include int pthread_create(pthread_t *thread,pthread_attr_t *attr, void *(*start_routine)(void *),void *arg); void pthread_exit(void *retval); int pthread_join(pthread *thread,void **thread_return);   pthread_create创建一个线程,thread是用来表明创建线程的ID,attr指出线程创建时候的属性,我们用NULL来表明使用缺省属性.start_routine函数指针是线程创建成功后开始执行的函数,arg是这个函数的唯一一个参数.表明传递给start_routine的参数. pthread_exit函数和exit函数类似用来退出线程.这个函数结束线程,释放函数的资源,并在最后阻塞,直到其他线程使用pthread_join函数等待它.然后将*retval的值传递给**thread_return.由于这个函数释放所以的函数资源,所以retval不能够指向函数的局部变量. pthread_join和wait调用一样用来等待指定的线程. 下面我们使用一个实例来解释一下使用方法.在实践中,我们经常要备份一些文件.下面这个程序可以实现当前目录下的所有文件备份.备份后的后缀名为bak   #include #include #include #include #include #include #include #include #include #include #include #define BUFFER 512 struct copy_file { int infile; int outfile; }; void *copy(void *arg) { int infile,outfile; int bytes_read,bytes_write,*bytes_copy_p; char buffer[BUFFER],*buffer_p; struct copy_file *file=(struct copy_file *)arg; infile=file->infile; outfile=file->outfile; /* 因为线程退出时,所有的变量空间都要被释放,所以我们只好自己分配内存了 */ if((bytes_copy_p=(int *)malloc(sizeof(int)))==NULL) pthread_exit(NULL); bytes_read=bytes_write=0; *bytes_copy_p=0; while((bytes_read=read(infile,buffer,BUFFER))!=0) { if((bytes_read==-1)&&(errno!=EINTR))break; else if(bytes_read>0) { buffer_p=buffer; while((bytes_write=write(outfile,buffer_p,bytes_read))!=0) { if((bytes_write==-1)&&(errno!=EINTR))break; else if(bytes_write==bytes_read)break; else if(bytes_write>0) { buffer_p+=bytes_write; bytes_read-=bytes_write; } } if(bytes_write==-1)break; *bytes_copy_p+=bytes_read; } } close(infile); close(outfile); pthread_exit(bytes_copy_p); } int main(int argc,char **argv) { pthread_t *thread; struct copy_file *file; int byte_copy,*byte_copy_p,num,i,j; char filename[BUFFER]; struct dirent **namelist; struct stat filestat; /* 得到当前路径下面所有的文件(包含目录)的个数 */ if((num=scandir(".",&namelist,0,alphasort))<0) { fprintf(stderr,"Get File Num Error:%s\n\a",strerror(errno)); exit(1); } /* 给线程分配空间,其实没有必要这么多的 */ if(((thread=(pthread_t *)malloc(sizeof(pthread_t)*num))==NULL)|| ((file=(struct copy_file *)malloc(sizeof(struct copy_file)*num))==NULL) ) { fprintf(stderr,"Out Of Memory!\n\a"); exit(1); } for(i=0,j=0;id_name); if(stat(filename,&filestat)==-1) { fprintf(stderr,"Get File Information:%s\n\a",strerror(errno)); exit(1); } /* 我们忽略目录 */ if(!S_ISREG(filestat.st_mode))continue; if((file[j].infile=open(filename,O_RDONLY))<0) { fprintf(stderr,"Open %s Error:%s\n\a",filename,strerror(errno)); continue; } strcat(filename,".bak"); if((file[j].outfile=open(filename,O_WRONLY|O_CREAT,S_IRUSR|S_IWUSR)) <0) { fprintf(stderr,"Creat %s Error:%s\n\a",filename,strerror(errno )); continue; } /* 创建线程,进行文件拷贝 */ if(pthread_create(&thread[j],NULL,copy,(void *)&file[j])!=0) fprintf(stderr,"Create Thread[%d] Error:%s\n\a",i,strerror(errno)); j++; } byte_copy=0; for(i=0;i   int do_fork(unsigned long clone_flags, unsigned long stack_start,   struct pt_regs *regs, unsigned long stack_size)   其中的clone_flags取自以下宏的"或"值:      #define CSIGNAL0x000000ff/* signal mask to be sent at exit */   #define CLONE_VM0x00000100/* set if VM shared between processes */   #define CLONE_FS        0x00000200/* set if fs info shared between processes */   #define CLONE_FILES     0x00000400/* set if open files shared between processes */   #define CLONE_SIGHAND0x00000800/* set if signal handlers and blocked signals shared */   #define CLONE_PID0x00001000/* set if pid shared */   #define CLONE_PTRACE0x00002000/* set if we want to let tracing continue on the child too */   #define CLONE_VFORK0x00004000/* set if the parent wants the child to wake it up on mm_release */   #define CLONE_PARENT0x00008000/* set if we want to have the same parent as the cloner */   #define CLONE_THREAD0x00010000/* Same thread group? */   #define CLONE_NEWNS0x00020000/* New namespace group? */   #define CLONE_SIGNAL (CLONE_SIGHAND | CLONE_THREAD)   在do_fork()中,不同的clone_flags将导致不同的行为,对于LinuxThreads,它使用(CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND)参数来调用clone()创建"线程",表示共享内存、共享文件系统访问计数、共享文件描述符表,以及共享信号处理方式。本节就针对这几个参数,看看Linux内核是如何实现这些资源的共享的。   1.CLONE_VM   do_fork()需要调用copy_mm()来设置task_struct中的mm和active_mm项,这两个mm_struct数据与进程所关联的内存空间相对应。如果do_fork()时指定了CLONE_VM开关,copy_mm()将把新的task_struct中的mm和active_mm设置成与current的相同,同时提高该mm_struct的使用者数目(mm_struct::mm_users)。也就是说,轻量级进程与父进程共享内存地址空间,由下图示意可以看出mm_struct在进程中的地位:   2.CLONE_FS   task_struct中利用fs(struct fs_struct *)记录了进程所在文件系统的根目录和当前目录信息,do_fork()时调用copy_fs()复制了这个结构;而对于轻量级进程则仅增加fs->count计数,与父进程共享相同的fs_struct。也就是说,轻量级进程没有独立的文件系统相关的信息,进程中任何一个线程改变当前目录、根目录等信息都将直接影响到其他线程。   3.CLONE_FILES   一个进程可能打开了一些文件,在进程结构task_struct中利用files(struct files_struct *)来保存进程打开的文件结构(struct file)信息,do_fork()中调用了copy_files()来处理这个进程属性;轻量级进程与父进程是共享该结构的,copy_files()时仅增加files->count计数。这一共享使得任何线程都能访问进程所维护的打开文件,对它们的操作会直接反映到进程中的其他线程。   4.CLONE_SIGHAND   每一个Linux进程都可以自行定义对信号的处理方式,在task_struct中的sig(struct signal_struct)中使用一个struct k_sigaction结构的数组来保存这个配置信息,do_fork()中的copy_sighand()负责复制该信息;轻量级进程不进行复制,而仅仅增加signal_struct::count计数,与父进程共享该结构。也就是说,子进程与父进程的信号处理方式完全相同,而且可以相互更改。   do_fork()中所做的工作很多,在此不详细描述。对于SMP系统,所有的进程fork出来后,都被分配到与父进程相同的cpu上,一直到该进程被调度时才会进行cpu选择。   尽管Linux支持轻量级进程,但并不能说它就支持核心级线程,因为Linux的"线程"和"进程"实际上处于一个调度层次,共享一个进程标识符空间,这种限制使得不可能在Linux上实现完全意义上的POSIX线程机制,因此众多的Linux线程库实现尝试都只能尽可能实现POSIX的绝大部分语义,并在功能上尽可能逼近。   三.LinuxThread的线程机制   LinuxThreads是目前Linux平台上使用最为广泛的线程库,由Xavier Leroy (Xavier.Leroy@inria.fr)负责开发完成,并已绑定在GLIBC中发行。它所实现的就是基于核心轻量级进程的"一对一"线程模型,一个线程实体对应一个核心轻量级进程,而线程之间的管理在核外函数库中实现。   1.线程描述数据结构及实现限制   LinuxThreads定义了一个struct _pthread_descr_struct数据结构来描述线程,并使用全局数组变量__pthread_handles来描述和引用进程所辖线程。在__pthread_handles中的前两项,LinuxThreads定义了两个全局的系统线程:__pthread_initial_thread和__pthread_manager_thread,并用__pthread_main_thread表征__pthread_manager_thread的父线程(初始为__pthread_initial_thread)。   struct _pthread_descr_struct是一个双环链表结构,__pthread_manager_thread所在的链表仅包括它一个元素,实际上,__pthread_manager_thread是一个特殊线程,LinuxThreads仅使用了其中的errno、p_pid、p_priority等三个域。而__pthread_main_thread所在的链则将进程中所有用户线程串在了一起。经过一系列pthread_create()之后形成的__pthread_handles数组将如下图所示:   新创建的线程将首先在__pthread_handles数组中占据一项,然后通过数据结构中的链指针连入以__pthread_main_thread为首指针的链表中。这个链表的使用在介绍线程的创建和释放的时候将提到。   LinuxThreads遵循POSIX1003.1c标准,其中对线程库的实现进行了一些范围限制,比如进程最大线程数,线程私有数据区大小等等。在LinuxThreads的实现中,基本遵循这些限制,但也进行了一定的改动,改动的趋势是放松或者说扩大这些限制,使编程更加方便。这些限定宏主要集中在sysdeps/unix/sysv/linux/bits/local_lim.h(不同平台使用的文件位置不同)中,包括如下几个:   每进程的私有数据key数,POSIX定义_POSIX_THREAD_KEYS_MAX为128,LinuxThreads使用PTHREAD_KEYS_MAX,1024;私有数据释放时允许执行的操作数,LinuxThreads与POSIX一致,定义PTHREAD_DESTRUCTOR_ITERATIONS为4;每进程的线程数,POSIX定义为64,LinuxThreads增大到1024(PTHREAD_THREADS_MAX);线程运行栈最小空间大小,POSIX未指定,LinuxThreads使用PTHREAD_STACK_MIN,16384(字节)。   2.管理线程   "一对一"模型的好处之一是线程的调度由核心完成了,而其他诸如线程取消、线程间的同步等工作,都是在核外线程库中完成的。在LinuxThreads中,专门为每一个进程构造了一个管理线程,负责处理线程相关的管理工作。当进程第一次调用pthread_create()创建一个线程的时候就会创建(__clone())并启动管理线程。   在一个进程空间内,管理线程与其他线程之间通过一对"管理管道(manager_pipe[2])"来通讯,该管道在创建管理线程之前创建,在成功启动了管理线程之后,管理管道的读端和写端分别赋给两个全局变量__pthread_manager_reader和__pthread_manager_request,之后,每个用户线程都通过__pthread_manager_request向管理线程发请求,但管理线程本身并没有直接使用__pthread_manager_reader,管道的读端(manager_pipe[0])是作为__clone()的参数之一传给管理线程的,管理线程的工作主要就是监听管道读端,并对从中取出的请求作出反应。   创建管理线程的流程如下所示: (全局变量pthread_manager_request初值为-1)   初始化结束后,在__pthread_manager_thread中记录了轻量级进程号以及核外分配和管理的线程id,2*PTHREAD_THREADS_MAX+1这个数值不会与任何常规用户线程id冲突。管理线程作为pthread_create()的调用者线程的子线程运行,而pthread_create()所创建的那个用户线程则是由管理线程来调用clone()创建,因此实际上是管理线程的子线程。(此处子线程的概念应该当作子进程来理解。)   __pthread_manager()就是管理线程的主循环所在,在进行一系列初始化工作后,进入while(1)循环。在循环中,线程以2秒为timeout查询(__poll())管理管道的读端。在处理请求前,检查其父线程(也就是创建manager的主线程)是否已退出,如果已退出就退出整个进程。如果有退出的子线程需要清理,则调用pthread_reap_children()清理。   然后才是读取管道中的请求,根据请求类型执行相应操作(switch-case)。具体的请求处理,源码中比较清楚,这里就不赘述了。 3.线程栈   在LinuxThreads中,管理线程的栈和用户线程的栈是分离的,管理线程在进程堆中通过malloc()分配一个THREAD_MANAGER_STACK_SIZE字节的区域作为自己的运行栈。   用户线程的栈分配办法随着体系结构的不同而不同,主要根据两个宏定义来区分,一个是NEED_SEPARATE_REGISTER_STACK,这个属性仅在IA64平台上使用;另一个是FLOATING_STACK宏,在i386等少数平台上使用,此时用户线程栈由系统决定具体位置并提供保护。与此同时,用户还可以通过线程属性结构来指定使用用户自定义的栈。因篇幅所限,这里只能分析i386平台所使用的两种栈组织方式:FLOATING_STACK方式和用户自定义方式。   在FLOATING_STACK方式下,LinuxThreads利用mmap()从内核空间中分配8MB空间(i386系统缺省的最大栈空间大小,如果有运行限制(rlimit),则按照运行限制设置),使用mprotect()设置其中第一页为非访问区。该8M空间的功能分配如下图:   低地址被保护的页面用来监测栈溢出。   对于用户指定的栈,在按照指针对界后,设置线程栈顶,并计算出栈底,不做保护,正确性由用户自己保证。   不论哪种组织方式,线程描述结构总是位于栈顶紧邻堆栈的位置。   4.线程id和进程id   每个LinuxThreads线程都同时具有线程id和进程id,其中进程id就是内核所维护的进程号,而线程id则由LinuxThreads分配和维护。   __pthread_initial_thread的线程id为PTHREAD_THREADS_MAX,__pthread_manager_thread的是2*PTHREAD_THREADS_MAX+1,第一个用户线程的线程id为PTHREAD_THREADS_MAX+2,此后第n个用户线程的线程id遵循以下公式:   tid=n*PTHREAD_THREADS_MAX+n+1   这种分配方式保证了进程中所有的线程(包括已经退出)都不会有相同的线程id,而线程id的类型pthread_t定义为无符号长整型(unsigned long int),也保证了有理由的运行时间内线程id不会重复。   从线程id查找线程数据结构是在pthread_handle()函数中完成的,实际上只是将线程号按PTHREAD_THREADS_MAX取模,得到的就是该线程在__pthread_handles中的索引。   5.线程的创建   在pthread_create()向管理线程发送REQ_CREATE请求之后,管理线程即调用pthread_handle_create()创建新线程。分配栈、设置thread属性后,以pthread_start_thread()为函数入口调用__clone()创建并启动新线程。pthread_start_thread()读取自身的进程id号存入线程描述结构中,并根据其中记录的调度方法配置调度。一切准备就绪后,再调用真正的线程执行函数,并在此函数返回后调用pthread_exit()清理现场。   6.LinuxThreads的不足   由于Linux内核的限制以及实现难度等等原因,LinuxThreads并不是完全POSIX兼容的,在它的发行README中有说明。   1)进程id问题   这个不足是最关键的不足,引起的原因牵涉到LinuxThreads的"一对一"模型。   Linux内核并不支持真正意义上的线程,LinuxThreads是用与普通进程具有同样内核调度视图的轻量级进程来实现线程支持的。这些轻量级进程拥有独立的进程id,在进程调度、信号处理、IO等方面享有与普通进程一样的能力。在源码阅读者看来,就是Linux内核的clone()没有实现对CLONE_PID参数的支持。   在内核do_fork()中对CLONE_PID的处理是这样的:   if (clone_flags & CLONE_PID) {   if (current->pid)   goto fork_out;   }   这段代码表明,目前的Linux内核仅在pid为0的时候认可CLONE_PID参数,实际上,仅在SMP初始化,手工创建进程的时候才会使用CLONE_PID参数。   按照POSIX定义,同一进程的所有线程应该共享一个进程id和父进程id,这在目前的"一对一"模型下是无法实现的。   2)信号处理问题   由于异步信号是内核以进程为单位分发的,而LinuxThreads的每个线程对内核来说都是一个进程,且没有实现"线程组",因此,某些语义不符合POSIX标准,比如没有实现向进程中所有线程发送信号,README对此作了说明。   如果核心不提供实时信号,LinuxThreads将使用SIGUSR1和SIGUSR2作为内部使用的restart和cancel信号,这样应用程序就不能使用这两个原本为用户保留的信号了。在Linux kernel 2.1.60以后的版本都支持扩展的实时信号(从_SIGRTMIN到_SIGRTMAX),因此不存在这个问题。   某些信号的缺省动作难以在现行体系上实现,比如SIGSTOP和SIGCONT,LinuxThreads只能将一个线程挂起,而无法挂起整个进程。 3)线程总数问题   LinuxThreads将每个进程的线程最大数目定义为1024,但实际上这个数值还受到整个系统的总进程数限制,这又是由于线程其实是核心进程。   在kernel 2.4.x中,采用一套全新的总进程数计算方法,使得总进程数基本上仅受限于物理内存的大小,计算公式在kernel/fork.c的fork_init()函数中:   max_threads = mempages / (THREAD_SIZE/PAGE_SIZE) / 8   在i386上,THREAD_SIZE=2*PAGE_SIZE,PAGE_SIZE=2^12(4KB),mempages=物理内存大小/PAGE_SIZE,对于256M的内存的机器,mempages=256*2^20/2^12=256*2^8,此时最大线程数为4096。   但为了保证每个用户(除了root)的进程总数不至于占用一半以上物理内存,fork_init()中继续指定:   init_task.rlim[RLIMIT_NPROC].rlim_cur = max_threads/2;   init_task.rlim[RLIMIT_NPROC].rlim_max = max_threads/2;   这些进程数目的检查都在do_fork()中进行,因此,对于LinuxThreads来说,线程总数同时受这三个因素的限制。   4)管理线程问题   管理线程容易成为瓶颈,这是这种结构的通病;同时,管理线程又负责用户线程的清理工作,因此,尽管管理线程已经屏蔽了大部分的信号,但一旦管理线程死亡,用户线程就不得不手工清理了,而且用户线程并不知道管理线程的状态,之后的线程创建等请求将无人处理。   5)同步问题   LinuxThreads中的线程同步很大程度上是建立在信号基础上的,这种通过内核复杂的信号处理机制的同步方式,效率一直是个问题。   6)其他POSIX兼容性问题   Linux中很多系统调用,按照语义都是与进程相关的,比如nice、setuid、setrlimit等,在目前的LinuxThreads中,这些调用都仅仅影响调用者线程。   7)实时性问题   线程的引入有一定的实时性考虑,但LinuxThreads暂时不支持,比如调度选项,目前还没有实现。不仅LinuxThreads如此,标准的Linux在实时性上考虑都很少。   四.其他的线程实现机制   LinuxThreads的问题,特别是兼容性上的问题,严重阻碍了Linux上的跨平台应用(如Apache)采用多线程设计,从而使得Linux上的线程应用一直保持在比较低的水平。在Linux社区中,已经有很多人在为改进线程性能而努力,其中既包括用户级线程库,也包括核心级和用户级配合改进的线程库。目前最为人看好的有两个项目,一个是RedHat公司牵头研发的NPTL(Native Posix Thread Library),另一个则是IBM投资开发的NGPT(Next Generation Posix Threading),二者都是围绕完全兼容POSIX 1003.1c,同时在核内和核外做工作以而实现多对多线程模型。这两种模型都在一定程度上弥补了LinuxThreads的缺点,且都是重起炉灶全新设计的。   1.NPTL   NPTL的设计目标归纳可归纳为以下几点:   POSIX兼容性   SMP结构的利用   低启动开销   低链接开销(即不使用线程的程序不应当受线程库的影响)   与LinuxThreads应用的二进制兼容性   软硬件的可扩展能力   多体系结构支持   NUMA支持   与C++集成   在技术实现上,NPTL仍然采用1:1的线程模型,并配合glibc和最新的Linux Kernel2.5.x开发版在信号处理、线程同步、存储管理等多方面进行了优化。和LinuxThreads不同,NPTL没有使用管理线程,核心线程的管理直接放在核内进行,这也带了性能的优化。   主要是因为核心的问题,NPTL仍然不是100%POSIX兼容的,但就性能而言相对LinuxThreads已经有很大程度上的改进了。   2.NGPT   IBM的开放源码项目NGPT在2003年1月10日推出了稳定的2.2.0版,但相关的文档工作还差很多。就目前所知,NGPT是基于GNU Pth(GNU Portable Threads)项目而实现的M:N模型,而GNU Pth是一个经典的用户级线程库实现。   按照2003年3月NGPT官方网站上的通知,NGPT考虑到NPTL日益广泛地为人所接受,为避免不同的线程库版本引起的混乱,今后将不再进行进一步开发,而今进行支持性的维护工作。也就是说,NGPT已经放弃与NPTL竞争下一代Linux POSIX线程库标准。   3.其他高效线程机制   此处不能不提到Scheduler Activations。这个1991年在ACM上发表的多线程内核结构影响了很多多线程内核的设计,其中包括Mach3.0、NetBSD和商业版本Digital Unix(现在叫Compaq True64 Unix)。它的实质是在使用用户级线程调度的同时,尽可能地减少用户级对核心的系统调用请求,而后者往往是运行开销的重要来源。采用这种结构的线程机制,实际上是结合了用户级线程的灵活高效和核心级线程的实用性,因此,包括Linux、FreeBSD在内的多个开放源码操作系统设计社区都在进行相关研究,力图在本系统中实现Scheduler Activations。 ________________________________________________ linux多线程机制线程同步 1.引言   目前,许多流行的多任务操作系统都提供线程机制,线程就是程序中的 单个顺序控制流。利用多线程进行程序设计,就是将一个程序(进程)的任务划分为执行的多个部分(线程) ,每一个线程为一个顺序的单控制流,而所有线程都是并发执行的,这样,多线程程序就可以实现并行计算,高效利用多处理器。线程可分为用户级线程和内核级线 程两种基本类型。用户级线程不需要内核支持,可以在用户程序中实现,线程调度、同步与互斥都需要用户程序自己完成。内核级线程需要内核参与,由内核完成线 程调度并提供相应的系统调用,用户程序可以通过这些接口函数对线程进行一定的控制和管理。Linux操作系统提供了LinuxThreads库,它是符合POSIX1003.1c标准的内核级多线程函数库。在linuxthreads库中提供了一些多线程编程的关键函数,在多线程编程时应包括pthread.h文件。   2.LinuxThread中的关键库函数   2.1线程的创建和终止   int pthread_create(pthread_t * pthread,const pthread_attr_t *attr,void *(*start_routine(*void)),void *arg);调用此函数可以创建一个新的线程,新线程创建后执行start_routine 指定的程序。其中参数attr是用户希望创建线程的属性,当为NULL时表示以默认的属性创建线程。arg是向start_routine 传递的参数。当成功创建一个新的线程时,系统会自动为新线程分配一个线程ID号,并通过pthread 返回给调用者。   void pthread_exit(void *value_ptr);调用该函数可以退出线程,参数value_ptr是一个指向返回状态值的指针。   2.2线程控制函数   pthread_self(void);为了区分线程,在线程创建时系统为其分配一个唯一的ID号,由pthread_create()返回给调用者,也可以通过pthread_self()获取自己的线程ID。   Int pthread_join (pthread- t thread , void * *status);这个函数的作用是等待一个线程的结束。调用pthread_join()的线程将被挂起直到线程ID为参数thread 指定的线程终止。   int pthread_detach(pthread_t pthread);参数pthread代表的线程一旦终止,立即释放调该线程占有的所有资源。   2.3线程间的互斥   互斥量和临界区类似,只有拥有互斥量的线程才具有访问资源的权限, 由于互斥对象只有一个,这就决定了任何情况下共享资源(代码或变量)都不会被多个线程同时访问。使用互斥不仅能够在同一应用程序的不同线程中实现资源的安 全共享,而且可以在不同应用程序的线程之间实现对资源的安全共享。Linux中通过pthread_mutex_t来定义互斥体机制完成互斥操作。具体的操作函数如下   pthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutexattr_t *attr);初使化一个互斥体变量mutex,参数attr表示按照attr属性创建互斥体变量mutex,如果参数attr为NULL,则以默认的方式创建。   pthread_mutex_lock(pthread_mutex_t *mutex);给一个互斥体变量上锁,如果mutex指定的互斥体已经被锁住,则调用线程将被阻塞直到拥有mutex的线程对mutex解锁为止。   Pthread_mutex_unlock(pthread_mutex_t *mutex);对参数mutex指定的互斥体变量解锁。   2.4线程间的同步   同步就是线程等待某一个事件的发生,当等待的事件发生时,被等待的线程和事件一起继续执行。如果等待的事件未到达则挂起。在linux操作系统中是通过条件变量来实现同步的。   Pthread_cond_init(pthread_cond_t *cond,const pthread_cond_t *attr);这个函数按参数attr指定的属性初使化一个条件变量cond。   Pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);等待一个事件(条件变量)的发生,发出调用的线程自动阻塞,直到相应的条件变量被置1。等待状态的线程不占用CPU时间。   pthread_cond_signal(pthread_cond_t *cond);解除一个等待参数cond指定的条件变量的线程的阻塞状态。 3.多线程编程的应用实例   在这里利用多线程技术实现生产者和消费者问题,生产者线程向一缓冲区中写数据, 消费者线程从缓冲区中读取数据,由于生产者线程和消费者线程共享同一缓冲区,为了正确读写数据,在使用缓冲队列时必须保持互斥。生产者线程和消费者线程必 须满足:生产者写入缓冲区的数目不能超过缓冲区容量,消费者读取的数目不能超过生产者写入的数目。在程序中使用了一个小技巧来判断缓冲区是空还是满。在初 始化时读指针和写指针为0;如果读指针等于写指针,则缓冲区是空的;如果(写指针+ 1) % N 等于读指针,则缓冲区是满的,%表示取余数,这时实际上有一个单元空出未用。下面是完整的程序段和注释。   #include   #include   #define BUFFER_SIZE 8   struct prodcons {   int buffer[BUFFER_SIZE];   pthread_mutex_t lock; //互斥LOCK   int readpos , writepos;   pthread_cond_t notempty; //缓冲区非空条件判断   pthread_cond_t notfull; //缓冲区未满条件判断   };   void init(struct prodcons * b){   pthread_mutex_init(&b->lock,NULL);   pthread_cond_init(&b->notempty,NULL);   pthread_cond_init(&b->notfull,NULL);   b->readpos=0;   b->writepos=0;   }   void put(struct prodcons* b,int data){   pthread-_mutex_lock(&b->lock);   if((b->writepos + 1) % BUFFER_SIZE == b->readpos)   {   pthread_cond_wait(&b->notfull, &b->lock) ;   }   b->buffer[b->writepos]=data;   b->writepos++;   if(b->writepos >= BUFFER_SIZE)   b->writepos=0;   pthread_cond_signal(&b->notempty);   pthread_mutex_unlock(&b->lock);   }   int get(struct prodcons *b){   int data; pthread_mutex_lock(&b->lock);   if(b->writepos == b->readpos)   {   pthread_cond _wait(&b->notempty, &b->lock);   }   data = b->buffer[b->readpos];   b->readpos++;   if(b->readpos >= BUFFER_SIZE)   b->readpos=0;   pthread_cond_signal(&b->notfull);   pthread_mutex_unlock(&b->lock);   return data;   } #define OVER (-1)   struct prodcons buffer;   void *producer(void *data)   {   int n;   for(n = 0; n < 10000; n++)   {   printf("%d \n", n) ;   put(&buffer, n);   }   put(&buffer, OVER);   return NULL;   }   void *consumer(void * data)   {   int d;   while(1)   {   d = get(&buffer);   if(d == OVER)   break;   printf("%d\n", d);   }   return NULL;   }   int main(void)   { pthread_t th_a, th_b;   void *retval;   init(&buffer);   pthread_create(&th_a, NULL, producer, 0);   & nbsp; pthread_create(&th_b, NULL, consumer, 0);   pthread_join(th_a, &retval);   pthread_join(th_b, &retval);   return 0;   }   上 面的例子中,生产者负责将1到1000的整数写入缓冲区,而消费者负责从同一个缓冲区中读取写入的整数并打印出来。因为生产者和消费者是两个同时运行的线 程,并且要使用同一个缓冲区进行数据交换,因此必须利用一种机制进行同步。通过上面的例子我们可以看到,多线程的最大好处是,除堆栈之外,几乎所有的数据 均是共享的,因此线程间的通讯效率很高;缺点:因为共享所有数据,从而非常容易导致线程之间互相破坏数据,这一点在编程时必须注意。   4.结束语 Linux中基于POSIX标准的很好的支持了多线程技术,它减少了程序并发执行时的系统开销,提高了计算机的工作效率。在具体编程过程中要了解线程的间的关系,还要考虑共享数据的保护,在互斥和同步机制下保证代码的高效运行,程序编译时用gcc -D –REENTRANT -libpthread.xx.so filename.c编译。 _______________________________________________ linux多线程之线程资源的释放 一般来说,对一段运行代码进行加锁然后解锁,如下所示:   pthread_mutex_lock(&mutex);   //运行代码;   pthread_mutex_unlock(&mutex);   如果在运行代码这块发生错误,有异常,导致这个线程异常退出,那么怎么办,pthread_unlock没有得到调用,那么这个锁资源没有解锁。可以用下面的方法修改。   pthread_cleanup_push(pthread_mutex_unlock, (void *) &mutex);   pthread_mutex_lock(&mutex);   /* do some work */   pthread_mutex_unlock(&mutex);   pthread_cleanup_pop(0);   这样假如运行代码发生错误时没有调用到解锁,pthread_cleanup_up会自动来调用,参数为0表示不执行push进来的函数。   但是如果是异常错误的话,这个参数并不影响异常终止时清理函数的执行。   必须要注意的是,如果线程处于PTHREAD_CANCEL_ASYNCHRONOUS状态,上述代码段就有可能出错,因为CANCEL事件有可能在pthread_cleanup_push()和pthread_mutex_lock()之间发生,或者在pthread_mutex_unlock()和pthread_cleanup_pop()之间发生,从而导致清理函数unlock一个并没有加锁的mutex变量,造成错误。因此,在使用清理函数的时候,都应该暂时设置成PTHREAD_CANCEL_DEFERRED模式。为此,POSIX的Linux实现中还提供了一对不保证可移植的pthread_cleanup_push_defer_np()/pthread_cleanup_pop_defer_np()扩展函数,功能与以下代码段相当:   { int oldtype;   pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype);   pthread_cleanup_push(routine, arg);   ...   pthread_cleanup_pop(execute);   pthread_setcanceltype(oldtype, NULL);   }   +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++   设置退出类型pthread_setcanceltype   #include   int pthread_setcanceltype(int type, int *oldtype);   返回值:函数成功返回0。任何其他返回值都表示错误。   将线程退出类型设置为延迟类型或异步类型。参数type的取值为PTHREAD_CANCEL_DEFERRED或PTHREAD_CANCEL_ASYNCHRONOUS。   当一个线程被创建后,缺省值是延迟类型。在异步方式下,线程可以在执行的任何时候被退出 __________________________________________________________ Linux 多线程同步之消息队列 消息队列是消息的链表,存放在内核中并有消息队列标示符标示。   msgget用于创建一个新队列或打开一个现存的队列。msgsnd将新消息加入到消息队列中;每个消息包括一个long型的type;和消息缓存;msgrcv用于从队列中取出消息;取消息很智能,不一定先进先出   ①msgget,创建一个新队列或打开一个现有队列   #include   int msgget ( key_t key, int flag );   //成功返回消息队列ID;错误返回-1   ②msgsnd: 发送消息   #include   int msgsnd( int msgid, const void* ptr, size_t nbytes, int flag )   //成功返回0,错误返回-1   a:   flag可以指定为IPC_NOWAIT;  若消息队列已满,则msgsnd立即出错返回EABAIN;   若没指定IPC_NOWAIT; msgsnd会阻塞,直到消息队列有空间为止   ③msgrcv: 读取消息:   ssize_t msgrcv( int msgid, void* ptr, size_t nbytes, long type, int flag );   a. type == 0; 返回消息队列中第一个消息,先进先出   b. type > 0    返回消息队列中类型为tpye的第一个消息   c. type < 0    返回消息队列中类型 <=  |type| 的数据;若这种消息有若干个,则取类型值最小的消息   消息队列创建步骤:   #define   MSG_FILE "."   struct msgtype {   long mtype;   char buffer[BUFFER+1];   };   if((key=ftok(MSG_FILE,'a'))==-1)   {   fprintf(stderr,"Creat Key Error:%s\n", strerror(errno));   exit(1);   }   if((msgid=msgget(key, IPC_CREAT | 0666/*PERM*/))==-1)   {   fprintf(stderr,"Creat Message  Error:%s\n", strerror(errno));   exit(1);   }   msg.mtype = 1;   strncpy(msg.buffer, argv[1], BUFFER);   msgsnd(msgid, &msg, sizeof(struct msgtype), 0);   msgrcv(msgid, &msg, sizeof(struct msgtype), 1, 0);   示例代码:   #include   #include   #include   #include   #include   #include   #include   #include   #include   #define   MSG_FILE "."   #define   BUFFER 255   #define   PERM S_IRUSR|S_IWUSR   #define IPCKEY 0x111   struct msgtype {   long mtype;   char buffer[BUFFER+1];   }; void* thr_test( void* arg ){   struct msgtype msg;   int msgid;   msgid =  *((int*)arg);   printf("msqid = %d  IPC_NOWAIT = %d\n", msgid, IPC_NOWAIT);   time_t tt = time(0)+8;   //while( time(0) <= tt )   //{   msgrcv(msgid, &msg, sizeof(struct msgtype), 1, 0);   fprintf(stderr,"Server Receive:%s\n", msg.buffer);   msg.mtype = 2;   msgsnd(msgid, &msg, sizeof(struct msgtype), 0);   //}   pthread_exit( (void*)2 );   }   int main(int argc, char **argv)   {   struct msgtype msg;   key_t key;   int msgid;   pthread_t tid;   if(argc != 2)   {   fprintf(stderr,"Usage:%s string\n", argv[0]);   exit(1);   }   /*   char path[256];   sprintf( path, "%s/", (char*)getenv("HOME") );   printf( "path is %s\n", path );   msgid=ftok( path, IPCKEY );   */   if((key=ftok(MSG_FILE,'a'))==-1)   {   fprintf(stderr,"Creat Key Error:%s\n", strerror(errno));   exit(1);   }   if((msgid=msgget(key, IPC_CREAT | 0666/*PERM*/))==-1)   {   fprintf(stderr,"Creat Message  Error:%s\n", strerror(errno));   exit(1);   }   pthread_create( &tid, NULL, thr_test, &msgid );   fprintf(stderr,"msid is :%d\n", msgid);   msg.mtype = 1;   strncpy(msg.buffer, argv[1], BUFFER);   msgsnd(msgid, &msg, sizeof(struct msgtype), 0);   exit(0);   } ____________________________________________________________ Linux 多线程同步之命名管道 命名管道(FIFO)既可用于进程间通信,也可用于线程间通信;   FIFO是一种文件类型,一般文件I/O函数(close,read,write,unlink等)都适用于FIFO   一、管道创建:   #include   int mkfifo( const char* pathname, mode_t mode );   //成功返回0;否则返回-1   //mode为读写文件| 是否阻塞   二、管道默认读写——阻塞   a. 管道读取:如果没有线程进行写管道操作,读线程将一直阻塞,直到有线程往里面写为止   b. 管道写: 如果没有线程进行读操作,写线程将一直阻塞,直到有线程读数据为止   三、设置管道读写——不阻塞(O_NONBLOCK)   a、管道读:如果没有线程进行写管道操作,读线程将立即返回   b、 管道写:如果没有线程进行读操作,写线程将立即返回,返回错误码-1;errno: ENXIO   示例代码:获取vmstat的参数   #include   #include   #include   #include   #include   #include   #include   #include   #include   #include   /*定义FIFO路径*/   #define FIFO "myfifo"   #define FILE_PATH "conf.log"   int ncnt = 0;   int get_siso( char* str, int* si, int* so ){   assert( str != NULL );   char* sub_str;   FILE* fp = fopen( FILE_PATH, "ab+" );   sub_str = strtok( str, " " );   //ncnt = 0;   while( sub_str ){   if( sub_str != NULL && isdigit( sub_str[0] ) ){   fprintf( fp, " %s \t", sub_str );   printf( "substr[%d] = %d \n", ncnt, atoi(sub_str) );   ncnt++;   }   if( ncnt == 16 ){   ncnt = 0;   }   sub_str = strtok( NULL, " " );   //sleep(0.3);   }   fclose( fp );   printf( "nCnt is %d\n\n\n", ncnt );   return 1;   } int mf(){   char buf_r[1025];   int fd;   int nread;   printf("Preparing for reading bytes...\n");   memset(buf_r,0,sizeof(buf_r));   //system( "vmstat 2 > myfifo" );   /*打开FIFO管道,不阻塞方式*/   //fd=open(FIFO,O_RDONLY|O_NONBLOCK,0);   fd=open(FIFO,O_RDONLY,0);   if(fd==-1)   {   perror("open");   exit(1);   }   while(1)   {   memset(buf_r,0,sizeof(buf_r));   if((nread=read(fd,buf_r,1024))==-1){   if(errno==EAGAIN)   printf("no data yet\n");   }   sleep(2);   printf("\n\n%s\n",buf_r);   get_siso( buf_r, NULL, NULL );   //sleep(1);   }   pause();   return 1;   }   void thr_get(){   pthread_detach( pthread_self() );   system( "vmstat 2 > myfifo" );   pthread_exit(0);   }   void thr_read(){   pthread_detach( pthread_self() );   pthread_t cthd;   int stat = pthread_create( &cthd, NULL, thr_get, NULL );   mf();   pthread_exit(0);   }   int main(int argc,char** argv)   {   int pid;   pthread_t cthd, dthd;   void* tret;   /*创建FIFO管道*/   if((mkfifo(FIFO,O_CREAT|O_EXCL)<0)&&(errno!=EEXIST)){   printf("cannot create fifoserver\n");   }   system( "chmod 777 myfifo" );   int tsts = pthread_create( &dthd, NULL, thr_read, NULL );   pthread_join( dthd, &tret );   printf( "tsts is %d\n", tsts );   sleep( 60 );   unlink(FIFO);   }

下载文档,方便阅读与编辑

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 15 金币 [ 分享文档获得金币 ]
0 人已下载

下载文档

相关文档