1. 線程創(chuàng)建與管理
省流:python多線程效率堪憂,想了解這方面的去看第2小節(jié)GIL,想繼續(xù)看看怎么使用的繼續(xù)接著看。
1.1 創(chuàng)建線程
Python提供了thread、threading等模塊來(lái)進(jìn)行線程的創(chuàng)建與管理,后者在線程管理能力上更進(jìn)一步,因此我們通常使用threading模塊。創(chuàng)建一個(gè)線程需要指定該線程執(zhí)行的任務(wù)(函數(shù)名)、以及該函數(shù)需要的參數(shù),示例代碼如下所示:
from threading import Thread, current_thread
def target01(args1, args2):
print("這里是{}".format(current_thread().name))
# 創(chuàng)建線程
thread01 = Thread(target=target01, args="參數(shù)", name="線程1")
# 設(shè)置守護(hù)線程【可選】
thread01.setDaemon(True)
# 啟動(dòng)線程
thread01.start()
1.2 設(shè)置守護(hù)線程
線程是程序執(zhí)行的最小單位,Python在進(jìn)程啟動(dòng)起來(lái)后,會(huì)自動(dòng)創(chuàng)建一個(gè)主線程,之后使用多線程機(jī)制可以在此基礎(chǔ)上進(jìn)行分支,產(chǎn)生新的子線程。子線程啟動(dòng)起來(lái)后,主線程默認(rèn)會(huì)等待所有線程執(zhí)行完成之后再退出。但是我們可以將子線程設(shè)置為守護(hù)線程,此時(shí)主線程任務(wù)一旦完成,所有子線程將會(huì)和主線程一起結(jié)束(就算子線程沒(méi)有執(zhí)行完也會(huì)退出)。
守護(hù)線程可以在線程啟動(dòng)之前,通過(guò)setDaemon(True)的形式進(jìn)行設(shè)置,或者在創(chuàng)建子線程對(duì)象時(shí),以參數(shù)的形式指定:
thread01 = Thread(target=target01, args="", name="線程1", daemon=True)
但是需要注意,如果希望主程序不等待任何線程直接退出,只有所有的線程都被設(shè)置為守護(hù)線程才有用。
1.3 設(shè)置線程阻塞
我們可以用join()方法使主線程陷入阻塞,以等待某個(gè)線程執(zhí)行完畢。因此這也是實(shí)現(xiàn)線程同步的一種方式。參數(shù) t i m e o u t timeouttimeout 可以用來(lái)設(shè)置主線程陷入阻塞的時(shí)間,如果線程不是守護(hù)線程,即沒(méi)有設(shè)置daemon為T(mén)rue,那么參數(shù) t i m e o u t timeouttimeout 是無(wú)效的,主線程會(huì)一直阻塞,直到子線程執(zhí)行結(jié)束。
測(cè)試代碼如下:
import time
from threading import Thread, current_thread
def target():
if current_thread().name == "1":
time.sleep(5)
else:
time.sleep(6)
print("線程{}已退出".format(current_thread().name))
thread01 = Thread(target=target, daemon=True, name="1")
thread02 = Thread(target=target, daemon=True, name="2")
thread01.start()
thread02.start()
print("程序因線程1陷入阻塞")
thread01.join(timeout=3)
print("程序因線程2陷入阻塞")
thread02.join(timeout=3)
print("主線程已退出")
1.4 線程間通信的方法
我們知道,線程之間共享同一塊內(nèi)存。子線程雖然可以通過(guò)指定target來(lái)執(zhí)行一個(gè)函數(shù),但是這個(gè)函數(shù)的返回值是沒(méi)有辦法直接傳回主線程的。我們使用多線程一般是用于并行執(zhí)行一些其他任務(wù),因此獲取子線程的執(zhí)行結(jié)果十分有必要。
直接使用全局變量雖然可行,但是資源的并發(fā)讀寫(xiě)會(huì)引來(lái)線程安全問(wèn)題。下面給出常用的兩種處理方式:
1.4.1 線程鎖
其一是可以考慮使用鎖來(lái)處理,當(dāng)多個(gè)線程對(duì)同一份資源進(jìn)行讀寫(xiě)操作時(shí),我們可以通過(guò)加鎖來(lái)確保數(shù)據(jù)安全。Python中給出了多種鎖的實(shí)現(xiàn),例如:同步鎖 Lock,遞歸鎖 RLock,條件鎖 Condition,事件鎖 Event,信號(hào)量鎖 Semaphore,這里只給出 Lock 的使用方式,其余的大家感興趣可以自己查閱。
可以通過(guò)threading.lock類(lèi)來(lái)創(chuàng)建鎖對(duì)象,一旦一個(gè)線程獲得一個(gè)鎖,會(huì)阻塞之后所有嘗試獲得該鎖對(duì)象的線程,直到它被重新釋放。這里舉一個(gè)例子,通過(guò)加鎖來(lái)確保兩個(gè)線程在對(duì)同一個(gè)全局變量進(jìn)行讀寫(xiě)時(shí)的數(shù)據(jù)安全:
from threading import Thread, Lock
from time import sleep
book_num = 100 # 圖書(shū)館最開(kāi)始有100本圖書(shū)
bookLock = Lock()
def books_return():
global book_num
while True:
bookLock.acquire()
book_num += 1
print("歸還1本,現(xiàn)有圖書(shū){}本".format(book_num))
bookLock.release()
sleep(1) # 模擬事件發(fā)生周期
def books_lease():
global book_num
while True:
bookLock.acquire()
book_num -= 1
print("借走1本,現(xiàn)有圖書(shū){}本".format(book_num))
bookLock.release()
sleep(2) # 模擬事件發(fā)生周期
if __name__ == "__main__":
thread_lease = Thread(target=books_lease)
thread_return = Thread(target=books_return)
thread_lease.start()
thread_return.start()
從結(jié)果中可以看出,其中沒(méi)有出現(xiàn)由于讀寫(xiě)沖突導(dǎo)致的數(shù)據(jù)錯(cuò)誤。
1.4.2 queue模塊(同步隊(duì)列類(lèi))
或者,我們可以采用Python的queue模塊來(lái)實(shí)現(xiàn)線程通信。Python中的 q u e u e queuequeue 模塊實(shí)現(xiàn)了多生產(chǎn)者、多消費(fèi)者隊(duì)列,特別適用于在多線程間安全的進(jìn)行信息交換。該模塊提供了4種我們可以利用的隊(duì)列容器,分別 Q u e u e QueueQueue(先進(jìn)先出隊(duì)列)、L i f o Q u e u e LifoQueueLifoQueue(先進(jìn)后出隊(duì)列)、P r i o r t y Q u e u e PriortyQueuePriortyQueue(優(yōu)先級(jí)隊(duì)列)、S i m p l e Q u e u e SimpleQueueSimpleQueue(無(wú)界的先進(jìn)先出隊(duì)列,簡(jiǎn)單實(shí)現(xiàn),缺少Q(mào)ueue中的任務(wù)跟蹤等高級(jí)功能)。下面我們以 Q u e u e QueueQueue 為例介紹其使用方法,其他容器請(qǐng)自行查閱。
Queue(maxsize=5) # 創(chuàng)建一個(gè)FIFO隊(duì)列,并制定隊(duì)列大小,若maxsize被指定為小于等于0,則隊(duì)列無(wú)限大
Queue.qsize() # 返回隊(duì)列的大致大小,注意并不是確切值,所以不能被用來(lái)當(dāng)做后續(xù)線程是否會(huì)被阻塞的依據(jù)
Queue.empty() # 判斷隊(duì)列為空是否成立,同樣不能作為阻塞依據(jù)
Queue.full() # 判斷隊(duì)列為滿是否成立,同樣不能作為阻塞依據(jù)
Queue.put(item, block=True, timeout=None) # 投放元素進(jìn)入隊(duì)列,block為T(mén)rue表示如果隊(duì)列滿了投放失敗,將阻塞該線程,timeout可用來(lái)設(shè)置線程阻塞的時(shí)間長(zhǎng)短(秒);
# 注意,如果block為False,如果隊(duì)列為滿,則將直接引發(fā)Full異常,timeout將被忽略(在外界用try處理異常即可)
Queue.put_nowait(item) # 相當(dāng)于put(item, block=False)
Queue.get(block=True, timeout=False) # 從隊(duì)列中取出元素,block為False而隊(duì)列為空時(shí),會(huì)引發(fā)Empty異常
Queue.get_nowait() # 相當(dāng)于get(block=False)
Queue.task_done() # 每個(gè)線程使用get方法從隊(duì)列中獲取一個(gè)元素,該線程通過(guò)調(diào)用task_done()表示該元素已處理完成。
Queue.join() # 阻塞至隊(duì)列中所有元素都被處理完成,即隊(duì)列中所有元素都已被接收,且接收線程全已調(diào)用task_done()。
下面給出一個(gè)例子,場(chǎng)景是3個(gè)廚師給4個(gè)客人上菜,這是對(duì)多生產(chǎn)者多消費(fèi)者場(chǎng)景的模擬:
import queue
from random import choice
from threading import Thread
q = queue.Queue(maxsize=5)
dealList = ["紅燒豬蹄", "鹵雞爪", "酸菜魚(yú)", "糖醋里脊", "九轉(zhuǎn)大腸", "陽(yáng)春面", "烤鴨", "燒雞", "剁椒魚(yú)頭", "酸湯肥牛", "燉羊肉"]
def cooking(chefname: str):
for i in range(4):
deal = choice(dealList)
q.put(deal, block=True)
print("廚師{}給大家?guī)?lái)一道:{} ".format(chefname, deal))
def eating(custname: str):
for i in range(3):
deal = q.get(block=True)
print("顧客{}吃掉了:{} ".format(custname, deal))
q.task_done()
if __name__ == "__main__":
# 創(chuàng)建并啟動(dòng)廚師ABC線程,創(chuàng)建并啟動(dòng)顧客1234線程
threadlist_chef = [Thread(target=cooking, args=chefname).start() for chefname in ["A", "B", "C"]]
threadlist_cust = [Thread(target=eating, args=str(custname)).start() for custname in range(4)]
# 隊(duì)列阻塞,直到所有線程對(duì)每個(gè)元素都調(diào)用了task_done
q.join()
上述程序執(zhí)行結(jié)果如下圖所示:
1.5 殺死線程
在一些場(chǎng)景下,我們可能需要?dú)⑺滥硞€(gè)線程,但是在這之前,請(qǐng)仔細(xì)的考量代碼的上下文環(huán)境。強(qiáng)制殺死線程可能會(huì)帶來(lái)一些意想不到的結(jié)果,并且從程序設(shè)計(jì)來(lái)講,這本身就是不合理的。而且,鎖資源并不會(huì)因?yàn)楫?dāng)前線程的退出而釋放,這在程序運(yùn)行過(guò)程中,可能會(huì)成為典型的死鎖場(chǎng)景。所以殺死線程之前,請(qǐng)一定慎重。殺死線程的方法網(wǎng)上有好幾種,我這里給出一種我覺(jué)得比較穩(wěn)妥的方式。
前面我們提到過(guò)如何做線程通信,這里可以用全局變量給出一個(gè)flag,線程任務(wù)采用循環(huán)形式進(jìn)行,每次循環(huán)都會(huì)檢查該flag,外界可以通過(guò)修改這一flag來(lái)通知這一線程退出循環(huán),結(jié)束任務(wù),從而起到殺死線程的目的,但請(qǐng)注意,為了線程安全,退出前一定要釋放這一線程所占用的資源。下面給出一個(gè)示例程序:
from threading import Lock, Thread
from time import sleep
flag = True
lock = Lock()
def tar():
global flag, lock
while True:
lock.acquire()
"線程任務(wù)邏輯"
if flag is False:
break
lock.release()
lock.release()
if __name__ == "__main__":
thread = Thread(target=tar)
thread.start()
print("3秒后線程會(huì)被殺死")
sleep(3)
flag = False
print("線程已被殺死")
執(zhí)行結(jié)果如圖所示,如果需要其他的方法請(qǐng)自行查閱,網(wǎng)上有不少。
1.6 線程池的使用
在程序運(yùn)行過(guò)程之中,臨時(shí)創(chuàng)建一個(gè)線程需要耗費(fèi)不小的代價(jià)(包括與操作系統(tǒng)的交互部分),尤其是我們只對(duì)一個(gè)線程分配一個(gè)簡(jiǎn)短的任務(wù),此時(shí),頻繁的線程創(chuàng)建將會(huì)嚴(yán)重拖垮程序的執(zhí)行的效率。
因此,在這種情形下,我們可以選擇采用線程池技術(shù),即通過(guò)預(yù)先創(chuàng)建幾個(gè)空閑線程,在需要多線程來(lái)處理任務(wù)時(shí),將任務(wù)分配給一個(gè)處于空閑狀態(tài)的線程,該線程在執(zhí)行完成后,將會(huì)回歸空閑狀態(tài),而不是直接銷(xiāo)毀;而如果申請(qǐng)從線程池中分配一個(gè)空閑線程時(shí),遇到所有線程均處于運(yùn)行狀態(tài),則當(dāng)前線程可以選擇阻塞來(lái)等待線程資源的空閑。如此一來(lái),程序?qū)τ诰€程的管理將會(huì)更加靈活。
Python從3.2開(kāi)始,就將線程池作為內(nèi)置模塊包含了進(jìn)來(lái),可以通過(guò)concurrent.futures.ThreadPoolExecutor來(lái)調(diào)用,使用方法也很簡(jiǎn)單。下面給出線程池的程序例子:
from concurrent.futures import ThreadPoolExecutor
from time import sleep
tasklist = ["任務(wù)1", "任務(wù)2", "任務(wù)3", "任務(wù)4"]
def task(taskname: str):
sleep(5)
print(taskname + " 已完成\n")
return taskname + " 的執(zhí)行結(jié)果"
executor = ThreadPoolExecutor(max_workers=3) # 創(chuàng)建線程池(是一個(gè)ThreadPoolExecutor對(duì)象),線程數(shù)為3
future_a = executor.submit(task, tasklist[0]) # 通過(guò)submit方法向線程池提交任務(wù),返回一個(gè)對(duì)應(yīng)的Future對(duì)象
future_b = executor.submit(task, tasklist[1])
future_c = executor.submit(task, tasklist[2])
future_d = executor.submit(task, tasklist[3]) # 如果提交時(shí),線程池中沒(méi)有空余線程,則該線程會(huì)進(jìn)入等待狀態(tài),主線程不會(huì)阻塞
print(future_a.result(), future_b.result()) # 通過(guò)Future對(duì)象的result()方法獲取任務(wù)的返回值,若沒(méi)有執(zhí)行完,則會(huì)陷入阻塞
有關(guān)于線程池的詳細(xì)使用方法,我后面還會(huì)出一篇文章,大家這里沒(méi)理解的可以去看一下。
2. GIL 全局解釋器鎖
2.1 GIL是什么?
G I L GILGIL(G l o b a l I n t e r p r e t e r L o c k Global Interpreter LockGlobalInterpreterLock,全局解釋器鎖)是CPython中采用的一種機(jī)制,它確保同一時(shí)刻只有一個(gè)線程在執(zhí)行Python字節(jié)碼。給整個(gè)解釋器加鎖使得解釋器多線程運(yùn)行更方便,而且開(kāi)發(fā)的CPython也更易于維護(hù),但是代價(jià)是犧牲了在多處理器上的并行性。因此,在相當(dāng)多的場(chǎng)景中,CPython解釋器下的多線程機(jī)制的性能都不盡如人意。
2.2 GIL給Python帶來(lái)的影響?
上圖是David Beazley的UnderstandGIL幻燈片中的一張,用于描述GIL的執(zhí)行模型。
從這套幻燈篇的介紹中,我們可以得知,GIL本質(zhì)上是條件鎖與互斥鎖結(jié)合的一種二值信號(hào)量類(lèi)的一個(gè)實(shí)例,在程序執(zhí)行過(guò)程中,一個(gè)線程通過(guò)acquire操作獲得GIL,從而執(zhí)行其字節(jié)碼,而當(dāng)其遇到IO操作時(shí),他將會(huì)release釋放掉GIL鎖資源,GIL這時(shí)可以被其他線程獲得以執(zhí)行該線程的任務(wù),而原先線程的IO操作將會(huì)同時(shí)進(jìn)行。
由此我們可以看到,GIL使得Python多線程程序在計(jì)算密集的場(chǎng)景下,不能充分利用多核心并發(fā)的優(yōu)勢(shì)(因?yàn)闊o(wú)論機(jī)器有多少核心,并且無(wú)論有多少線程來(lái)執(zhí)行計(jì)算任務(wù),同時(shí)運(yùn)行的只有1個(gè)),而在IO密集的場(chǎng)景下,其性能受到的影響則較小。
2.3 如何繞過(guò)GIL?
那么如何避免GIL對(duì)多線程性能帶來(lái)的影響呢?
1. 繞過(guò)CPython,使用JPython(Java實(shí)現(xiàn)的)等別的Python解釋器
首先,GIL是CPython解釋器中的實(shí)現(xiàn),在JPython等其他解釋器中并沒(méi)有采用,因此我們可以考慮更換解釋器實(shí)現(xiàn)。我們現(xiàn)在從官網(wǎng)下載,或者通過(guò)Anaconda部署的解釋器普遍采用的是CPython,可以通過(guò)下面的方法安裝其它實(shí)現(xiàn):以JPython為例,去JPython官網(wǎng)下載其安裝包(jar包),然后用 java -jar (前提是你的電腦安裝了Java環(huán)境)去執(zhí)行它,最后再配置一下環(huán)境變量即可。
2. 把關(guān)鍵性能代碼,放到別的語(yǔ)言(一般是C++)中實(shí)現(xiàn)
這個(gè)是常用的一種方式,很多追求執(zhí)行性能的模塊例如Numpy、Pytorch等都是將自身性能代碼放在C語(yǔ)言擴(kuò)展中來(lái)完成的,如何開(kāi)發(fā)Python模塊的C語(yǔ)言擴(kuò)展部分,可以參考這個(gè)鏈接http://t.csdn.cn/4YuDO。
3. 并行改并發(fā),在Python中,多進(jìn)程有時(shí)比多線程管用
Python程序的每個(gè)進(jìn)程都有自己的GIL鎖,互補(bǔ)干涉,因此我們也可以直接使用多線程來(lái)處理一些計(jì)算任務(wù),Python的多線程可以使用m u l t i p r o c e s s i n g multiprocessingmultiprocessing模塊來(lái)完成,示例程序如下。
from multiprocessing import Process
def task(procName: int):
print("這是線程{}".format(procName))
if __name__ == "__main__":
proc1 = Process(target=task, args=(1,))
proc2 = Process(target=task, args=(2,))
proc1.start()
proc2.start()
2.4 使用GIL并非絕對(duì)線程安全
前面講了這么多,總而言之就是Python的多線程機(jī)制很奇怪,就算有多個(gè)物理核心,在任何時(shí)刻只會(huì)有一個(gè)線程在執(zhí)行。那么有人就會(huì)問(wèn),那我還要給公共資源加鎖干什么?
各位記住,Python的GIL只是負(fù)責(zé)Python解釋器的線程安全,也只能保證同時(shí)只有一個(gè)線程在執(zhí)行字節(jié)碼,而Python程序本身的線程安全,Python一概不負(fù)責(zé)。想了解詳情的,可以參考下網(wǎng)上的討論,比如這個(gè):https://www.zhihu.com/question/521650365