diff --git "a/evn/\347\216\257\345\242\203" "b/evn/\347\216\257\345\242\203" new file mode 100644 index 0000000..65b806d --- /dev/null +++ "b/evn/\347\216\257\345\242\203" @@ -0,0 +1,4 @@ +echo $PATH #环境引用的先后 +/usr/local/bin/python +/usr/bin/python + diff --git a/hashlib_lab/hash_md6.py b/hashlib_lab/hash_md6.py new file mode 100644 index 0000000..88ffdcc --- /dev/null +++ b/hashlib_lab/hash_md6.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' + +import hashlib +h = hashlib.md5() +h.update('cluo') +print h.hexdigest() #字符 +print h.digest() #二进制 + + +h = hashlib.sha1() + + + + diff --git a/os.pyc b/os.pyc deleted file mode 100644 index 695ad58..0000000 Binary files a/os.pyc and /dev/null differ diff --git a/os.py b/os_lab.py similarity index 100% rename from os.py rename to os_lab.py diff --git a/signal.py b/signal_lab.py similarity index 100% rename from signal.py rename to signal_lab.py diff --git a/signal_lab/getsinal_lab.py b/signal_lab/getsinal_lab.py new file mode 100644 index 0000000..afbaea7 --- /dev/null +++ b/signal_lab/getsinal_lab.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' +#获取注册程序 +#每个进程都用一张表来 保存信号量 +#注册函数为SIG_IGN 忽略 +#注册函数为SIG_DEF 默认行为 +#信号是进程间通信 模拟硬件通信的行为 + +import signal + +def alarm_received(n, stack): + return + +signal.signal(signal.SIGALRM, alarm_received) + +signals_to_names = dict( + (getattr(signal, n), n) + for n in dir(signal) + if n.startswith('SIG') and '_' not in n +) + +for s, name in sorted(signals_to_names.items()): + handler = signal.getsignal(s) #获取注册信号的函数 + if handler is signal.SIG_DFL: + handler = 'SIG_DFL' + elif handler is signal.SIG_IGN: + handler = 'SIG_IGN' + print '%-10s (%2d):' % (name, s), handler + +# if __name__ == '__main__': +# print signals_to_names +# print dir(signal) diff --git a/signal_lab/ignore.py b/signal_lab/ignore.py new file mode 100644 index 0000000..09c40d2 --- /dev/null +++ b/signal_lab/ignore.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' +import signal +import os +import time +if __name__ == '__main__': + def do_exit(sig, stack): + raise SystemExit('Exiting') + signal.signal(signal.SIGINT, signal.SIG_IGN) #ctrl + c SIGINT 中断程序 处理函数为 signal.SIG_IGN 时忽略信号 + signal.signal(signal.SIGUSR1, do_exit) #退出程序 + print 'My PID:', os.getpid() + signal.pause() + diff --git a/signal_lab/kill_14_SIGALRM.py b/signal_lab/kill_14_SIGALRM.py new file mode 100644 index 0000000..830fec3 --- /dev/null +++ b/signal_lab/kill_14_SIGALRM.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' + +import signal +import os + + + +# Define signal handler function +def myHandler(signum, frame): + print signum + print os.getpid() + print signal.SIGALRM #14定时唤醒 + print("Now, it's the time") + os.kill(os.getpid(),signal.SIGALRM) + exit() + +# register signal.SIGALRM's handler +signal.signal(signal.SIGALRM, myHandler) +signal.alarm(2) +signal.pause() +print('End of Signal Demo') + + + + diff --git a/signal_lab/kill_15_SIGTERM.py b/signal_lab/kill_15_SIGTERM.py new file mode 100644 index 0000000..4abea85 --- /dev/null +++ b/signal_lab/kill_15_SIGTERM.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' +import os +import signal +#handler为signal.SIG_IGN时,信号被无视(ignore) +#当handler为singal.SIG_DFL,进程采取默认操作(default)。 +# 当handler为一个函数名时,进程采取函数中定义的操作。 +#我们首先使用signal.signal()函数来预设信号处理函数。 +# 然后我们执行signal.pause()来让该进程暂停以等待信号, +#以等待信号。当信号SIGUSR1被传递给该进程时,进程从暂停中 +#并根据预设,执行SIGTSTP的信号处理函数myHandler()。 +#myHandler的两个参数一个用来识别信号(signum), +#另一个用来获得信号发生时,进程栈的状况(stack frame)。 +#这两个参数都是由signal.singnal()函数来传递的。 + +# 通过按下CTRL+Z向该进程发送SIGTSTP信号。 +# 我们可以看到,进程执行了myHandle()函数, +# 随后返回主程序,继续执行。(当然,也可以用$ps查询process ID, 再使用$kill来发出信号。) +# (进程并不一定要使用signal.pause()暂停以等待信号, +# 它也可以在进行工作中接受信号, +# 比如将上面的signal.pause()改为一个需要长时间工作的循环。 +# while true +# doing +# +# ) + + +def sigterm_clean(signum, frame): + try: + print 'alive' + print 'over' + print signal.SIGTERM #15 等待15信号 + os.kill(os.getpid(),15) #相当于 kill -15 pid + except OSError: + print 'error' + pass + +signal.signal(signal.SIGTERM, sigterm_clean) +print os.getpid() + +signal.pause() diff --git a/signal_lab/signal_alarm.py b/signal_lab/signal_alarm.py new file mode 100644 index 0000000..411db35 --- /dev/null +++ b/signal_lab/signal_alarm.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +import signal +import time +#定时触发信号 +__author__ = 'cluo' +def receive_alarm(signum, stack): + print 'Alarm :',time.ctime() +signal.signal(signal.SIGALRM, receive_alarm) +signal.alarm(2) +print 'Before:',time.ctime() +time.sleep(4) +print 'After :',time.ctime() + + + diff --git a/signal_lab/signal_receive_signal.py b/signal_lab/signal_receive_signal.py new file mode 100644 index 0000000..acb2deb --- /dev/null +++ b/signal_lab/signal_receive_signal.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' +import signal +import os +import time + +#信号(signal)是Linux进程间通信的一种机制,全称为软中断信号,也被称为软中断。信号本质上是在软件层次上对硬件中断机制的一种模拟。 +def receive_signal(signum, stack): + print '==============================received:',signum +signal.signal(signal.SIGUSR1, receive_signal) +signal.signal(signal.SIGUSR2, receive_signal) + +#kill -USR1 $pid +#kill -USR1 $pid +#kill -INT $pid ctrl + c +#kill -QUIT $pid crtl + d + + +if __name__ == '__main__': + print 'my pid is :', os.getpid() + while True: + print 'Waiting...' + print os.getpid() + print 'usr1:',signal.SIGUSR1 + print 'usr2:',signal.SIGUSR2 + time.sleep(20) diff --git a/signal_lab/singal_threading.py b/signal_lab/singal_threading.py new file mode 100644 index 0000000..2dee48c --- /dev/null +++ b/signal_lab/singal_threading.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' +import signal +import threading +import os +import time + +#信号一般是线程级别的通信, +#子线程是接不了信号的,子线程pause, 子线程将永远不会退出 +#只有主线程才能接受闹铃 + +if __name__ == '__main__': + #主线程 + def signal_handler(num, stack): + print 'Received signal %d in %s' %(num, threading.currentThread().name) + signal.signal(signal.SIGUSR1, signal_handler) #python 中的信号处理函数 + + + def signal_handler_1(num, stack): + print 'Received signal %d in %s' %(num, threading.currentThread().name) + signal.signal(signal.SIGTERM, signal_handler_1) #python 中的信号处理函数 + + + + + #收信号线程 + def wait_for_signal(): + print 'Waiting for signal in',threading.currentThread().name + signal.pause() #线程 接受不了任何信号 + print 'Done waiting' + + receiver = threading.Thread(target=wait_for_signal,name='receiver') + receiver.start() + time.sleep(0.1) + + #发信号线程 + def send_signal(): + print 'Sending signal in',threading.currentThread().name + os.kill(os.getpid(), signal.SIGUSR1) + os.kill(os.getpid(), signal.SIGTERM) + + sender = threading.Thread(target=send_signal,name='sender') + start = time.time() + sender.start() + sender.join() + + print 'Waiting for',receiver.name + signal.alarm(2) #2秒后自己送送SIGALRM 终止程序(防止阻塞) + print os.getpid() + receiver.join() + end = time.time() + interval = end - start + print interval + diff --git a/signal_lab/stream_lab.py b/signal_lab/stream_lab.py new file mode 100644 index 0000000..ece7118 --- /dev/null +++ b/signal_lab/stream_lab.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' +import signal +import os +# Define signal handler function +def myHandler(signum, frame): + print('I received: ', signum) + print signal.SIGTSTP + print os.getpid() + print 'end' + os.kill(os.getpid(),signal.SIGTERM) + +# register signal.SIGTSTP's handler +signal.signal(signal.SIGTSTP, myHandler) +while True: + print('End of Signal Demo') + +#通过按下CTRL+Z向该进程发送SIGTSTP信号 \ No newline at end of file diff --git a/signal_lab/thread_alarm_handle.py b/signal_lab/thread_alarm_handle.py new file mode 100644 index 0000000..4fa6325 --- /dev/null +++ b/signal_lab/thread_alarm_handle.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' +#尽管线程中能设置闹铃 但总是主线程接受 +import signal +import time +import threading + +def signal_handler(num, stack): + print time.ctime(),'Alarm in ', threading.currentThread().name +signal.signal(signal.SIGALRM, signal_handler) + +def use_alarm(): + t_name = threading.currentThread().name + print time.ctime(), 'Setting alarm in ',t_name + signal.alarm(1) #子线程的闹铃,只主线程接收 + #不会中断3秒的延迟时 + print time.ctime(),'Sleeping in',t_name + time.sleep(3) + print time.ctime(),'Done with sleep in',t_name + +alarm_thread = threading.Thread(target=use_alarm,name='alarm_thread') +alarm_thread.start() +time.sleep(0.1) +print time.ctime(),'Waiting for',alarm_thread.name +alarm_thread.join() +print time.ctime(),'Exting normally' diff --git a/thread/img/ora-lp4e.jpg b/thread/img/ora-lp4e.jpg new file mode 100755 index 0000000..97c2dc8 Binary files /dev/null and b/thread/img/ora-lp4e.jpg differ diff --git a/thread/img/ora-pp4e-large.jpg b/thread/img/ora-pp4e-large.jpg new file mode 100755 index 0000000..8f789d0 Binary files /dev/null and b/thread/img/ora-pp4e-large.jpg differ diff --git a/thread/info b/thread/info new file mode 100644 index 0000000..2964a1f --- /dev/null +++ b/thread/info @@ -0,0 +1,26 @@ +线程池原理及python实现 +为什么需要线程池 +  目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。 +  传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。 +  我们将传统方案中的线程执行过程分为三个过程:T1、T2、T3: +  T1:线程创建时间 +  T2:线程执行时间,包括线程的同步等时间 +  T3:线程销毁时间 + +那么我们可以看出,线程本身的开销所占的比例为(T1+T3) / (T1+T2+T3)。如果线程执行的时间很短的话,这比开销可能占到20%-50%左右。如果任务执行时间很频繁的话,这笔开销将是不可忽略的。 +  除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为2000,那么最坏情况下,系统可能需要产生2000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求。 +  因此线程池的出现正是着眼于减少线程池本身带来的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列 中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。 +  基于这种预创建技术,线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上,执行次数越多,每个任务所分担到的线程本身开销则越小,不过我们另外可能需要考虑进去线程之间同步所带来的开销。 + +构建线程池框架 + +一般线程池都必须具备下面几个组成部分: +  线程池管理器:用于创建并管理线程池 +  : 线程池中实际执行的线程 +  : 尽管线程池大多数情况下是用来支持网络服务器,但是我们将线程执行的任务抽象出来,形成任务接口,从而是的线程池与具体的任务无关。 +  :线程池的概念具体到实现则可能是队列,链表之类的数据结构,其中保存执行线程。 + +  我们把任务放进队列中去,然后开N个线程,每个线程都去队列中取一个任务,执行完了之后告诉系统说我执行完了,然后接着去队列中取下一个任务,直至队列中所有任务取空,退出线程。 + +  这就是一般的线程池实现的原理,下面看一个实际的代码: + diff --git a/thread/more_wokrer_more_question.py b/thread/more_wokrer_more_question.py new file mode 100644 index 0000000..042bd5d --- /dev/null +++ b/thread/more_wokrer_more_question.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-14 +__author__ = 'cluo' + +#Example2.py +''' +A more realistic thread pool example +''' +# 这段代码能正确的运行,但仔细看看我们需要做些什么:构造不同的方法、追踪一系列的线程, +# 还有为了解决恼人的死锁问题,我们需要进行一系列的 join 操作。这还只是开始…… +# +# 至此我们回顾了经典的多线程教程,多少有些空洞不是吗?样板化而且易出错, +# 这样事倍功半的风格显然不那么适合日常使用,好在我们还有更好的方法。 +import time +import threading +import Queue +import urllib2 +import logging + +#日志 +logging.basicConfig( + level=logging.DEBUG, + format='[%(asctime)s](%(threadName)-10s) %(message)s' +) + +class Consumer(threading.Thread): + def __init__(self, queue): + threading.Thread.__init__(self) + self._queue = queue + + def run(self): + while True: + content = self._queue.get() + if isinstance(content, str) and content == 'quit': + break + logging.debug(content) + response = urllib2.urlopen(content) + print response + + + +def Producer(): + urls = [ + 'http://www.yahoo.com', + 'http://www.baidu.com', + 'http://www.163.com', + 'http://www.python.org', + ] + queue = Queue.Queue() + worker_threads = build_worker_pool(queue, 4) + start_time = time.time() + + # Add the urls to process + for url in urls: + queue.put(url) + for worker in worker_threads: # Add the poison pillv + queue.put('quit') + for worker in worker_threads: + worker.join() + + print 'Done! Time taken: {}'.format(time.time() - start_time) + +def build_worker_pool(queue, size): + workers = [] + for _ in range(size): + worker = Consumer(queue) + worker.start() + workers.append(worker) + return workers + +if __name__ == '__main__': + Producer() diff --git a/thread/multiprocessing_dummy.py b/thread/multiprocessing_dummy.py new file mode 100644 index 0000000..68036b7 --- /dev/null +++ b/thread/multiprocessing_dummy.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-15 +__author__ = 'cluo' +# 在 Python 中有个两个库包含了 map 函数: multiprocessing 和它鲜为人知的子库 multiprocessing.dummy. +# +# 这里多扯两句: multiprocessing.dummy? mltiprocessing 库的线程版克隆?这是虾米? +# 即便在 multiprocessing 库的官方文档里关于这一子库也只有一句相关描述。 +# 而这句描述译成人话基本就是说:”嘛,有这么个东西,你知道就成.”相信我,这个库被严重低估了! +# +# dummy 是 multiprocessing 模块的完整克隆,唯一的不同在于 multiprocessing 作用于进程, +# 而 dummy 模块作用于线程(因此也包括了 Python 所有常见的多线程限制)。 +# 所以替换使用这两个库异常容易。你可以针对 IO 密集型任务和 CPU 密集型任务来选择不同的库。2 +# + +# Pool 对象有一些参数,这里我所需要关注的只是它的第一个参数:processes. 这一参数用于设定线程池中的线程数 +# 其默认值为当前机器 CPU 的核数。 + +# 一般来说,执行 CPU 密集型任务时,调用越多的核速度就越快。但是当处理网络密集型任务时, +# 事情有有些难以预计了,通过实验来确定线程池的大小才是明智的。 + +# pool = ThreadPool(4) # Sets the pool size to 4 +# 线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值是个不错的主意。 +# 动手尝试 + +# 虽然只改动了几行代码,我们却明显提高了程序的执行速度。 +# 在生产环境中,我们可以为 CPU 密集型任务和 IO 密集型任务分别选择多进程和多线程库来进一步提高执行速度 +# ——这也是解决死锁问题的良方。 +# 此外,由于 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。 +# 到这里,我们就实现了(基本)通过一行 Python 实现并行化。 + +from multiprocessing.dummy import Pool as ThreadPool #io密集型 +import urllib2 +import time +import logging +from timeit import Timer + +#日志 +logging.basicConfig( + level=logging.DEBUG, + format='[%(asctime)s](%(threadName)-10s) %(message)s' +) + +if __name__ == '__main__': + def test(): + urls = [ + 'http://www.163.com', + 'http://www.baidu.com', + ] + + # Make the Pool of workers + pool = ThreadPool(4) #调整线程池 选出最短耗时 + # Open the urls in their own threads + # and return the results + # start_time = time.time() + results = pool.map(urllib2.urlopen, urls) + #close the pool and wait for the work to finish + pool.close() + pool.join() + # cost = time.time() - start_time + # logging.debug("cost_time %s", cost) + def test1(): + i = 3 + while i > 0: + i -= 1 + t1 = Timer("test1()", 'from __main__ import test1') + print t1.timeit(1) #运行时间 + + t = Timer("test()", 'from __main__ import test') + print t.timeit(1) #运行时间 + + diff --git a/thread/multiprocessing_pool.py b/thread/multiprocessing_pool.py new file mode 100644 index 0000000..ea29340 --- /dev/null +++ b/thread/multiprocessing_pool.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-15 +__author__ = 'cluo' +import os +import PIL + +from multiprocessing import Pool +from PIL import Image + +SIZE = (75, 75) +SAVE_DIRECTORY = 'thumbs' + +def get_image_paths(folder): + return (os.path.join(folder, f) + for f in os.listdir(folder) + if 'jpg' in f) + +def create_thumbnail(filename): + im = Image.open(filename) + im.thumbnail(SIZE, Image.ANTIALIAS) + base, fname = os.path.split(filename) + save_path = os.path.join(base, SAVE_DIRECTORY, fname) + im.save(save_path) + +if __name__ == '__main__': + folder = os.path.abspath('./img') + thumb_path = os.path.join(folder, SAVE_DIRECTORY) + if os.path.exists(thumb_path) == False: + os.mkdir(thumb_path) + images = get_image_paths(folder) + pool = Pool() + pool.map(create_thumbnail, images) + pool.close() + pool.join() + + diff --git a/thread/producer_consumer.py b/thread/producer_consumer.py new file mode 100644 index 0000000..f52b2ac --- /dev/null +++ b/thread/producer_consumer.py @@ -0,0 +1,66 @@ +#-*- coding:utf8 -*- +# 我并不是说使用生产者/消费者模型处理多线程/多进程任务是错误的(事实上,这一模型自有其用武之地)。 +# 只是,处理日常脚本任务时我们可以使用更有效率的模型。 +# +# 问题在于… +# +# 首先,你需要一个样板类; +# 其次,你需要一个队列来传递对象; +# 而且,你还需要在通道两端都构建相应的方法来协助其工作(如果需想要进行双向通信或是保存结果还需要再引入一个队列)。 +#Example.py +''' +Standard Producer/Consumer Threading Pattern +''' + +import time +import threading +import Queue + +class Consumer(threading.Thread): + def __init__(self, queue): + threading.Thread.__init__(self) + self._queue = queue + + def run(self): + while True: + # queue.get() blocks the current thread until + # an item is retrieved. + msg = self._queue.get() #block until receive + # Checks if the current message is + # the "Poison Pill" + if isinstance(msg, str) and msg == 'quit': + # if so, exists the loop + break + # "Processes" (or in our case, prints) the queue item + print "I'm a thread, and I received %s!!" % msg + # Always be friendly! + print 'Bye byes!' + +def Producer(): + # Queue is used to share items between + # the threads. + queue = Queue.Queue() + + # Create an instance of the worker + worker = Consumer(queue) + # start calls the internal run() method to + # kick off the thread + worker.start() + + # variable to keep track of when we started + start_time = time.time() + # While under 5 seconds.. + while time.time() - start_time < 5: + # "Produce" a piece of work and stick it in + # the queue for the Consumer to process + queue.put('something at %s' % time.time()) + # Sleep a bit just to avoid an absurd number of messages + time.sleep(1) + + # This the "poison pill" method of killing a thread. + queue.put('quit') + # wait for the thread to close down + worker.join() + +if __name__ == '__main__': + Producer() \ No newline at end of file diff --git a/thread/thread_args.py b/thread/thread_args.py new file mode 100644 index 0000000..e1da4f6 --- /dev/null +++ b/thread/thread_args.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' + +#程序可以由同一个线程空间并发运行多个进程 +import threading +def worker(num,mark): + """thread worker function """ + print 'Worker %s mark %s' % (num,mark) + print threading.currentThread().getName() + return +threads = [] +for i in range(5): + name = 'cluo' + str(i) + t = threading.Thread(target = worker, name = name, args = (i,name)) + threads.append(t) + t.start() + +print threads \ No newline at end of file diff --git a/thread/thread_basic.py b/thread/thread_basic.py new file mode 100644 index 0000000..65b35bb --- /dev/null +++ b/thread/thread_basic.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-12 +__author__ = 'cluo' + +#程序可以由同一个线程空间并发运行多个进程 +import threading +def worker(): + """thread worker function """ + print 'Worker' + print threading.currentThread().getName() + return +threads = [] +for i in range(5): + t = threading.Thread(target=worker) + threads.append(t) + t.start() + +print threads \ No newline at end of file diff --git a/thread/thread_condition.py b/thread/thread_condition.py new file mode 100644 index 0000000..6d24678 --- /dev/null +++ b/thread/thread_condition.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-14 +__author__ = 'cluo' + +import logging +import threading +import time +#consumer()要设置Condition 才能继续 +#producer() 负责设置条件 并通知其他进程可以继续‰ +logging.basicConfig( + level = logging.DEBUG, + format = '%(asctime)s (%(threadName)-2s) %(message)s' +) + +def consumer(cond): + """wait fro condition and use the resource""" + logging.debug('Starting consumer thread') + t = threading.currentThread() + with cond: #condition 使用一个Lock + cond.wait() + logging.debug('Resource is alailable to consumer') + +def producer(cond): + """set up the resource to be used by the consumer""" + logging.debug('Starting producer thread') + with cond: + logging.debug('Making resource available') + cond.notifyAll() + +condition = threading.Condition() +c1 = threading.Thread(name = 'c1', target = consumer, args = (condition,)) +c2 = threading.Thread(name = 'c2', target = consumer, args = (condition,)) + +p = threading.Thread(name = 'p', target = producer, args = (condition,)) + + +if __name__ == '__main__': + c1.start() + time.sleep(2) + c2.start() + time.sleep(2) + p.start() diff --git a/thread/thread_daemon.py b/thread/thread_daemon.py index 19ee864..1faae24 100644 --- a/thread/thread_daemon.py +++ b/thread/thread_daemon.py @@ -4,34 +4,36 @@ import logging import threading import time +#程序等待所有非守护线程工作后才退出 +#守护线程 会一直运行不会阻塞程序退出 logging.basicConfig( - level=logging.DEBUG, - format='[%(levelname)s] %(message)s' + level=logging.DEBUG, + format='[%(levelname)s] %(message)s' ) def daemon(): - logging.debug('Starting') - time.sleep(3) - logging.debug('Exting') + logging.debug('Starting') + time.sleep(3) + logging.debug('Exting')#没有输出 因为非守护进程 和主程序已经退出 d = threading.Thread(name='daemon', target=daemon) d.setDaemon(True) def non_daemon(): - logging.debug('Starting') - logging.debug('Exiting') - time.sleep(5) + logging.debug('Starting') + logging.debug('Exiting') t = threading.Thread(name='non-daemon', target=non_daemon) d.start() t.start() -start = time.time() -d.join(1.1) -t.join(1.1) -end = time.time() -print (end - start) #join所有线程 +# start = time.time() +# d.join(1.1) +# t.join(1.1) +# end = time.time() +# print (end - start) #join所有线程 + -print 'd.isAlive()', d.isAlive() print 't.isAlive()', t.isAlive() +print 'd.isAlive()', d.isAlive() diff --git a/thread/thread_daemon_join.py b/thread/thread_daemon_join.py new file mode 100644 index 0000000..37b18c9 --- /dev/null +++ b/thread/thread_daemon_join.py @@ -0,0 +1,38 @@ +#-*- coding:utf8 -*- +__author__ = 'admin' + +import logging +import threading +import time +#等待守护进程完成要用join +logging.basicConfig( + level=logging.DEBUG, + format='[%(levelname)s] %(message)s' +) + +def daemon(): + logging.debug('Starting') + time.sleep(3) + logging.debug('Exting')#没有输出 因为非守护进程 和主程序已经退出 +d = threading.Thread(name='daemon', target=daemon) +d.setDaemon(True) + +def non_daemon(): + logging.debug('Starting') + logging.debug('Exiting') +t = threading.Thread(name='non-daemon', target=non_daemon) + +d.start() +t.start() +start = time.time() +d.join() #等待守护进程退出 +t.join() +end = time.time() +print (end - start) #join所有线程 + + +print 't.isAlive()', t.isAlive() +print 'd.isAlive()', d.isAlive() + + + diff --git a/thread/thread_daemon_join_time_out.py b/thread/thread_daemon_join_time_out.py new file mode 100644 index 0000000..2f51c0c --- /dev/null +++ b/thread/thread_daemon_join_time_out.py @@ -0,0 +1,38 @@ +#-*- coding:utf8 -*- +__author__ = 'admin' + +import logging +import threading +import time +#等待守护进程完成要用join,防止无限阻塞用join(sec) +logging.basicConfig( + level=logging.DEBUG, + format='[%(levelname)s] %(message)s' +) + +def daemon(): + logging.debug('Starting') + time.sleep(3) + logging.debug('Exting')#没有输出 因为非守护进程 和主程序已经退出 +d = threading.Thread(name='daemon', target=daemon) +d.setDaemon(True) + +def non_daemon(): + logging.debug('Starting') + logging.debug('Exiting') +t = threading.Thread(name='non-daemon', target=non_daemon) + +d.start() +t.start() +start = time.time() +d.join(2) #等待守护进程退出 +t.join() +end = time.time() +print (end - start) #join所有线程 + + +print 't.isAlive()', t.isAlive() +print 'd.isAlive()', d.isAlive() #阻塞等待程序退出 + + + diff --git a/thread/thread_event.py b/thread/thread_event.py index 30eb0b8..9047746 100644 --- a/thread/thread_event.py +++ b/thread/thread_event.py @@ -1,19 +1,25 @@ +#-*- coding:utf8 -*- __author__ = 'admin' import threading import time import logging +#用event同步线程 +#事件对象是实现线程间通信的简单方法(多个线程同步操作时) +#Event管理一个内部标记 调用者可以用set() clear()方法控制这个标志 +#其他线程可以用wait()暂停知道遇到这个标记,允许继续之前阻塞线程运行 +# 事件运行时线程通信的埋点,主线程的变量可以看作是运行后的? logging.basicConfig(level=logging.DEBUG, - format='(%(threadName)-10s) %(message)s') + format='(%(threadN‰ ame)-10s) %(message)s') def wait_for_event(e): logging.debug('wait_for_event starting') - event_is_set = e.wait() + event_is_set = e.wait() #永久阻塞 知道设置标志位 logging.debug('event set %s',event_is_set) def wait_for_event_timeout(e, t): while not e.isSet(): logging.debug('wait_for_event_timeout starting') - event_is_set = e.wait(t) + event_is_set = e.wait(t) #定时阻塞 logging.debug('event set: %s',event_is_set) if event_is_set: logging.debug('processing event') @@ -23,14 +29,14 @@ def wait_for_event_timeout(e, t): e = threading.Event() t1 = threading.Thread(name='block', target=wait_for_event, - args=(e,)) + args=(e,)) t1.start() t2 = threading.Thread(name='nonblock', target=wait_for_event_timeout, args=(e,2)) t2.start() -logging.debug('Waiting before calling Event set()') +logging.debug('Waiting before calling Event se‰t()') time.sleep(3) e.set() logging.debug('Event is set') \ No newline at end of file diff --git a/thread/thread_lock.py b/thread/thread_lock.py index 84e75aa..cb68700 100644 --- a/thread/thread_lock.py +++ b/thread/thread_lock.py @@ -1,8 +1,12 @@ +#-*- coding:utf8 -*- __author__ = 'admin' + import logging import random import threading import time +#python 内置数据结构(列表,字典)是线程安全,python 用原子字节码来管理这些数据结构的一个副作用更新过程不会释放GIL +#python 中的其他数据结构或者更简单的类型 整数和浮点数贼没有这个保护,要保证同时安全的访问对象可以使用Lock对象 logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s') @@ -11,6 +15,7 @@ class Counter(): def __init__(self, start=0): self.lock = threading.Lock() self.value = start + def increment(self): logging.debug('Waithing for lock') self.lock.acquire() @@ -36,7 +41,7 @@ def worker(c): logging.debug('Waiting for worker threads') main_thread = threading.currentThread() for t in threading.enumerate(): - if t is not main_thread: + if t is not main_thread: #跳过,主进程主塞会陷入死锁 t.join() logging.debug('Counter: %d', counter.value) diff --git a/thread/thread_lock_holder.py b/thread/thread_lock_holder.py index f7cb3e4..9db5f80 100644 --- a/thread/thread_lock_holder.py +++ b/thread/thread_lock_holder.py @@ -1,4 +1,4 @@ -__author__ = 'admin' +_author__ = 'admin' import logging import threading import time @@ -26,7 +26,9 @@ def worker(lock): while num_acquires < 3: time.sleep(0.5) logging.debug('Trying to acquire') - have_it = lock.acquire(0) + have_it = lock.acquire(0) #查看另外一个进程是否请求锁 而不影响当前进程 + #如果lock重复acquire会导致死锁 + #设置了超时0 避免永久阻塞 try: num_tries += 1 if have_it: @@ -41,7 +43,7 @@ def worker(lock): lock = threading.Lock() holder = threading.Thread(target=lock_holder,args=(lock,),name='LockHolder') -holder.setDaemon(True) +holder.setDaemon(True) #守护线程,不阻塞主程序退出,主程序退出依然存在 holder.start() worker = threading.Thread(target=worker,args=(lock,), name='Worker') diff --git a/thread/thread_1.py b/thread/thread_name.py similarity index 83% rename from thread/thread_1.py rename to thread/thread_name.py index 32ac429..e71b028 100644 --- a/thread/thread_1.py +++ b/thread/thread_name.py @@ -10,7 +10,7 @@ def my_service(): time.sleep(2) print threading.currentThread().getName(), 'Exting' -t = threading.Thread(name='my_service', target=my_service) +t = threading.Thread(name='my_service', target=my_service) #注册线程名 和线程函数 w = threading.Thread(name='worker', target=worker) w2 = threading.Thread(target=worker) diff --git a/thread/thread_2.py b/thread/thread_name_log.py similarity index 94% rename from thread/thread_2.py rename to thread/thread_name_log.py index 79b19a4..fe1c382 100644 --- a/thread/thread_2.py +++ b/thread/thread_name_log.py @@ -1,7 +1,10 @@ +#-*- coding:utf8 -*- __author__ = 'admin' import logging import threading import time + +#日志 logging.basicConfig( level=logging.DEBUG, format='[%(levelname)s](%(threadName)-10s) %(message)s' diff --git a/thread/thread_pool.py b/thread/thread_pool.py new file mode 100644 index 0000000..e69de29 diff --git a/thread/thread_relock.py b/thread/thread_relock.py index 49d9a45..b61feae 100644 --- a/thread/thread_relock.py +++ b/thread/thread_relock.py @@ -8,9 +8,10 @@ format='(%(threadName)-10s) %(message)s') lock = threading.Lock() logging.debug('first try : %s' % lock.acquire()) -logging.debug('second try: %s' % lock.acquire(0)) #正常来说对同一线程 不能锁多次 +logging.debug('second try: %s' % lock.acquire(0)) #正常来说对同一线程 不能锁多次 设置了获取锁超时 + -lock = threading.RLock() +lock = threading.RLock() #重新上锁 logging.debug('first try : %s' % lock.acquire()) logging.debug('second try: %s' % lock.acquire(0)) #正常来说对同一线程 不能锁多次 diff --git a/thread/thread_semaphore.py b/thread/thread_semaphore.py new file mode 100644 index 0000000..ff815ee --- /dev/null +++ b/thread/thread_semaphore.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-14 +__author__ = 'cluo' +import logging +import random +import threading +import time +#线程池 线程池 +logging.basicConfig( + level = logging.DEBUG, + format = '%(asctime)s (%(threadName)-2s) %(message)s' +) + +class ActivePool(object): + def __init__(self): + super(ActivePool,self).__init__() + self.active = [] + self.lock = threading.Lock() + + def makeActive(self, name): + with self.lock: + self.active.append(name) + logging.debug('Running:%s', self.active) + + def makeInactive(self, name): + with self.lock: + self.active.remove(name) + logging.debug('Running:%s', self.active) + +def worker(s, pool): + logging.debug('Waiting to join the pool') + with s: + name = threading.currentThread().getName() + pool.makeActive(name) + time.sleep(0.1) + pool.makeInactive(name) + + +pool = ActivePool() +if __name__ == '__main__': + s = threading.Semaphore(2) # 线程池 + for i in range(4): + t = threading.Thread(target = worker, + name = str(i), + args = (s, pool)) + t.start() diff --git a/thread/thread_sub.py b/thread/thread_sub.py index 1ab3a9a..7139821 100644 --- a/thread/thread_sub.py +++ b/thread/thread_sub.py @@ -2,22 +2,30 @@ import threading import logging logging.basicConfig(level=logging.DEBUG, - format='(%(threadName)-10s) %(message)s') + format='(%(threadName)-10s) %(message)s') class MyThread(threading.Thread): - def __init__(self, group=None, target=None, name=None,args=(), kwargs=None, verbose=None): - threading.Thread.__init__(self,group=group, - target=target, - name=name, - verbose=verbose) - self.args = args - self.kwargs = kwargs + def __init__(self, group=None, target=None, name=None,args=(), kwargs=None, verbose=None): + threading.Thread.__init__(self,group=group, + target=target, + name=name, + verbose=verbose) + + self.args = args + self.kwargs = kwargs + print target - def run(self): - logging.debug('running %s and %s', - self.args, - self.kwargs) - return + + def run(self): + logging.debug('running %s and %s', + self.args, + self.kwargs) + return + +def handler(): + # print 'args: %s, %s' %(args, kwargs) + print 'hello world' for i in range(5): - t = MyThread(args=(i,),kwargs={'a':'A', 'b':'B'}) - t.start() + thread_name = 'thread_name: %s' % (i) + t = MyThread(args=(i,), kwargs={'a':'A', 'b':'B'}, target=handler, name=thread_name) + t.start() diff --git a/thread/thread_timmer.py b/thread/thread_timmer.py index 7243b1c..ff41597 100644 --- a/thread/thread_timmer.py +++ b/thread/thread_timmer.py @@ -2,6 +2,8 @@ import threading import time import logging +#延时一定的时候后开始工作,可以再这个延时期间内的任意时刻取消 +#非守护线程 主程序完成时,线程会隐式退出 logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s') def delayed(): diff --git a/thread/thread_with_lock.py b/thread/thread_with_lock.py new file mode 100644 index 0000000..d923353 --- /dev/null +++ b/thread/thread_with_lock.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-14 +__author__ = 'cluo' +import threading +import logging +logging.basicConfig(level=logging.DEBUG, + format='(%(threadName)-10s) %(message)s' + ) +def worker_with(lock): + with lock: #避免了 lock.acquire lock.release + logging.debug('Lock acquired via with') +def worker_no_with(lock): + lock.acquire() + try: + logging.debug('Lock acquired directly') + finally: + lock.release() + + + +if __name__ == '__main__': + lock = threading.Lock() + w = threading.Thread(target = worker_with, args = (lock,)) + nw = threading.Thread(target = worker_no_with, args = (lock,)) + w.start() + nw.start() + diff --git a/thread/threads_list_enumerate.py b/thread/threads_list_enumerate.py index 294b4a6..7f2f503 100644 --- a/thread/threads_list_enumerate.py +++ b/thread/threads_list_enumerate.py @@ -1,9 +1,10 @@ +#-*- coding:utf8 -*- __author__ = 'admin' import random import threading import time import logging - +#列举所有线程 logging.basicConfig( level =logging.DEBUG, format ='(%(threadName)-10s) %(message)s' @@ -24,8 +25,8 @@ def worker(): main_thread = threading.currentThread() print(threading.enumerate()) -for t in threading.enumerate(): - if t is main_thread: +for t in threading.enumerate(): #列出当前所有线程 包括主线程 + if t is main_thread: #等待当前线程完成 会引入死锁必须跳过 continue logging.debug('joining %s', t.getName()) t.join() diff --git a/thread/timeit_lab.py b/thread/timeit_lab.py new file mode 100644 index 0000000..19fec5b --- /dev/null +++ b/thread/timeit_lab.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by cluo on 15-2-15 +__author__ = 'cluo' +import timeit +# timeit 模块 +# +# timeit 模块定义了接受两个参数的 Timer 类。两个参数都是字符串。 +# 第一个参数是你要计时的语句或者函数。 +# 传递给 Timer 的第二个参数是为第一个参数语句构建环境的导入语句。 +# 从内部讲, timeit 构建起一个独立的虚拟环境, 手工地执行建立语句,然后手工地编译和执行被计时语句。 +# 一旦有了 Timer 对象,最简单的事就是调用 timeit(),它接受一个参数为每个测试中调用被计时语句的次数,默认为一百万次; +# 返回所耗费的秒数。 +# Timer 对象的另一个主要方法是 repeat(), 它接受两个可选参数。 第一个参数是重复整个测试的次数, +# 第二个参数是每个测试中调用被计时语句的次数。 两个参数都是可选的,它们的默认值分别是 3 和 1000000。 +# repeat() 方法返回以秒记录的每个测试循环的耗时列表。 +# Python 有一个方便的 min 函数可以把输入的列表返回成最小值,如: min(t.repeat(3, 1000000)) +# 你可以在命令行使用 timeit 模块来测试一个已存在的 Python 程序,而不需要修改代码。 +# 具体可参见文档: http://docs.python.org/library/timeit.html + +def test1(): + n=0 + for i in range(101): + n+=i + return n + +def test2(): + return sum(range(101)) + +def test3(): + return sum(x for x in range(101)) + +if __name__=='__main__': + from timeit import Timer + t1=Timer("test1()","from __main__ import test1") #构建独立的虚拟环境 并且手动编译和执行语句 + t2=Timer("test2()","from __main__ import test2") + t3=Timer("test3()","from __main__ import test3") + print t1.timeit(1000000) + print t2.timeit(1000000) + print t3.timeit(1000000) #调用语句次数 + print t1.repeat(3,1000000) #重复测试次数 调用语句的次数 + print t2.repeat(3,1000000) + print t3.repeat(3,1000000) + +#不推荐 的计算耗时的方法 +from time import clock +def test(): + L=[] + for i in range(100): + L.append(i) +start=clock() +for i in range(1000000): + test() +finish=clock() +print (finish-start)/1000000 \ No newline at end of file diff --git a/thread_lock_holder/__init__.py b/thread_lock_holder/__init__.py deleted file mode 100644 index eabb3c0..0000000 --- a/thread_lock_holder/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__author__ = 'admin' diff --git a/urllib2_lab/map_open.py b/urllib2_lab/map_open.py new file mode 100644 index 0000000..c5d75cc --- /dev/null +++ b/urllib2_lab/map_open.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Created by iFantastic on 15-2-14 +__author__ = 'cluo' +import urllib2 + +# map 这一小巧精致的函数是简捷实现 Python 程序并行化的关键。 +# map 源于 Lisp 这类函数式编程语言。它可以通过一个序列实现两个函数之间的映射。 +urls = ['http://www.yahoo.com', 'http://www.reddit.com'] +results = map(urllib2.urlopen, urls) +print results + + +# 上面的这两行代码将 urls 这一序列中的每个元素作为参数传递到 urlopen 方法中, +# 并将所有结果保存到 results 这一列表中。其结果大致相当于: +results = [] +for url in urls: + results.append(urllib2.urlopen(url)) +print results diff --git a/zmq/zmqpolling.py b/zmq/zmqpolling.py index 90d6263..4b6574f 100644 --- a/zmq/zmqpolling.py +++ b/zmq/zmqpolling.py @@ -3,6 +3,8 @@ # Created by iFantastic on 15-2-11 __author__ = 'cluo' +#poller 注册轮询 + import zmq import time import sys