簡單實(shí)現(xiàn)并發(fā):python concurrent模塊
技術(shù)支持服務(wù)電話:15308000360 【7x24提供運(yùn)維服務(wù),解決各類系統(tǒng)/軟硬件疑難技術(shù)問題】
可以使用python 3中的concurrent模塊
如果python環(huán)境是2.7的話,需要下載https://pypi.python.org/packages/source/f/futures/futures-2.1.6.tar.gz#md5=cfab9ac3cd55d6c7ddd0546a9f22f453
此futures包即可食用concurrent模塊。
官方文檔:http://pythonhosted.org//futures/
對(duì)于python來說,作為解釋型語言,Python的解釋器必須做到既安全又高效。我們都知道多線程編程會(huì)遇到的問題,解釋器要留意的是避免在不同的線程操作內(nèi)部共享的數(shù)據(jù),同時(shí)它還要保證在管理用戶線程時(shí)保證總是有最大化的計(jì)算資源。而python是通過使用全局解釋器鎖來保護(hù)數(shù)據(jù)的安全性:
python代碼的執(zhí)行由python虛擬機(jī)來控制,即Python先把代碼(.py文件)編譯成字節(jié)碼(字節(jié)碼在Python虛擬機(jī)程序里對(duì)應(yīng)的是PyCodeObject對(duì)象,.pyc文件是字節(jié)碼在磁盤上的表現(xiàn)形式),交給字節(jié)碼虛擬機(jī),然后虛擬機(jī)一條一條執(zhí)行字節(jié)碼指令,從而完成程序的執(zhí)行。python在設(shè)計(jì)的時(shí)候在虛擬機(jī)中,同時(shí)只能有一個(gè)線程執(zhí)行。同樣地,雖然python解釋器中可以運(yùn)行多個(gè)線程,但在任意時(shí)刻,只有一個(gè)線程在解釋器中運(yùn)行。而對(duì)python虛擬機(jī)的訪問由全局解釋器鎖來控制,正是這個(gè)鎖能保證同一時(shí)刻只有一個(gè)線程在運(yùn)行。在多線程的環(huán)境中,python虛擬機(jī)按一下方式執(zhí)行:
1,設(shè)置GIL(global interpreter lock).
2,切換到一個(gè)線程執(zhí)行。
3,運(yùn)行:
a,指定數(shù)量的字節(jié)碼指令。
b,線程主動(dòng)讓出控制(可以調(diào)用time.sleep(0))。
4,把線程設(shè)置為睡眠狀態(tài)。
5,解鎖GIL.
6,再次重復(fù)以上步驟。
GIL的特性,也就導(dǎo)致了python不能充分利用多核cpu。而對(duì)面向I/O的(會(huì)調(diào)用內(nèi)建操作系統(tǒng)C代碼的)程序來說,GIL會(huì)在這個(gè)I/O調(diào)用之前被釋放,以允許其他線程在這個(gè)線程等待I/O的時(shí)候運(yùn)行。如果線程并為使用很多I/O操作,它會(huì)在自己的時(shí)間片一直占用處理器和GIL。這也就是所說的:I/O密集型python程序比計(jì)算密集型的程序更能充分利用多線程的好處。
總之,不要使用python多線程,使用python多進(jìn)程進(jìn)行并發(fā)編程,就不會(huì)有GIL這種問題存在,并且也能充分利用多核cpu。
一,提供的功能
提供了多線程和多進(jìn)程的并發(fā)功能
二,基本方法
class concurrent.futures.Executor (注:Executor為ThreadPoolExecutor或者ProcessPoolExecutor)
提供的方法如下:
submit(fn, *args, **kwargs)
fn:為需要異步執(zhí)行的函數(shù)
args,kwargs:為給函數(shù)傳遞的參數(shù)
例:
#!/bin/env python
#coding:utf-8
import time,re
import os,datetime
from concurrent import futures
def wait_on_b():
print 5
time.sleep(2)
def wait_on_a():
print 6
time.sleep(2)
ex = futures.ThreadPoolExecutor(max_workers=2)
ex.submit(wait_on_b)
ex.submit(wait_on_a)
wait_on_a和wait_on_b函數(shù)會(huì)同時(shí)執(zhí)行,因?yàn)槭褂昧?個(gè)worker
#####################################
map(func, *iterables, timeout=None)
此map函數(shù)和python自帶的map函數(shù)功能類似,只不過concurrent模塊的map函數(shù)從迭代器獲得參數(shù)后異步執(zhí)行。并且,每一個(gè)異步操作,能用timeout參數(shù)來設(shè)置超時(shí)時(shí)間,timeout的值可以是int或float型,如果操作timeout的話,會(huì)raisesTimeoutError。如果timeout參數(shù)不指定的話,則不設(shè)置超時(shí)間。
func:為需要異步執(zhí)行的函數(shù)
iterables:可以是一個(gè)能迭代的對(duì)象,例如列表等。每一次func執(zhí)行,會(huì)從iterables中取參數(shù)。
timeout:設(shè)置每次異步操作的超時(shí)時(shí)間
例:
#!/bin/env python
#coding:utf-8
import time,re
import os,datetime
from concurrent import futures
data = [‘1‘,‘2‘]
def wait_on(argument):
print argument
time.sleep(2)
return ‘ok‘
ex = futures.ThreadPoolExecutor(max_workers=2)
for i in ex.map(wait_on,data):
print i
map函數(shù)異步執(zhí)行完成之后,結(jié)果也是list,數(shù)據(jù)需要從list中取出
######################################
submit函數(shù)和map函數(shù),根據(jù)需要,選一個(gè)使用即可。
shutdown(wait=True)
此函數(shù)用于釋放異步執(zhí)行操作后的系統(tǒng)資源。
If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed. If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.
You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True):
with ThreadPoolExecutor(max_workers=4) as e:e.submit(shutil.copy, ‘src1.txt‘, ‘dest1.txt‘)
三,完整的concurrent例子:
#!/bin/env python
#coding:utf-8
import time,re,fcntl
import os,datetime
from concurrent import futures
count_list = list()
MinuteNum = 1
StartTime = datetime.datetime(2014, 4, 16, 19, 31, 0, 484870)
NowTime = datetime.datetime.now()
os.system(‘:>new.txt‘)
f_new = open(‘new.txt‘,‘a‘)
def test(CountTimeFormat):
f = open(‘push_slave.stdout‘,‘r‘)
for line in f.readlines():
if re.search(CountTimeFormat,line):
#獲得文件專用鎖
fcntl.flock(f_new, fcntl.LOCK_EX)
f_new.writelines(line)
f_new.flush()
#釋放文件鎖
fcntl.flock(f_new, fcntl.LOCK_UN)
break
while 1:
AfterOneMinute = datetime.timedelta(minutes=MinuteNum)
CountTime = AfterOneMinute+StartTime
CountTimeFormat = CountTime.strftime(‘%Y-%m-%d %H:%M‘)
MinuteNum = MinuteNum+1
count_list.append(CountTimeFormat)
if CountTimeFormat == "2014-04-23 16:00":
break
def exec_cmd():
with futures.ProcessPoolExecutor(max_workers=24) as executor:
dict(( executor.submit(test, times), times) for times in count_list)
if __name__ == ‘__main__‘:
exec_cmd()
f_new.close()