python之多线程操作

前言

什么是多线程

多线程是指在一个程序中同时创建和使用多个执行流(thread)来执行不同的任务。这样多个任务就可以同时进行,从而提高程序的执行效率。

在python使用多线程的方法

在 Python 中有两种方法可以使用多线程:使用 Python 自带的 threading 模块,或使用第三方库 multiprocessing

多线程的优势

多线程的好处在于可以利用多核 CPU 的优势,让程序在等待 I/O 操作时使用其他 CPU 核心,从而提高程序的效率。不过由于 Python 的全局解释器锁(Global Interpreter Lock, 简称GIL)的存在,在 Python 中创建的线程并不能真正并行执行。如果你需要在 Python 中创建多个真正并行的线程,可以使用 multiprocessing 库。

线程的安全问题

在使用多线程时,需要注意线程安全问题。如果多个线程同时访问同一个变量,可能会导致数据不一致的问题。为了解决这个问题,可以使用 threading 模块中的锁(Lock)和条件变量(Condition)来控制线程同步

使用锁的方式可以保证线程安全,但是会导致线程的执行效率降低。因此在使用多线程时,需要谨慎考虑是否真的需要使用多线程

什么是阻塞

在多线程编程中,阻塞指的是一个线程被暂停执行,直到某个特定的条件被满足才能继续执行

例如在Python中,Queue类的get()方法会阻塞当前线程,直到有数据可以取出。这意味着,如果你在一个线程中调用get()方法,该线程会被暂停,直到有数据可以取出才能继续执行

同样,Queue类的put()方法也会阻塞当前线程,如果队列已满的话。这意味着,如果你在一个线程中调用put()方法,该线程会被暂停,直到有空位可以插入新的数据项才能继续执行

threading模块常用方法

theading模块常用操作

1.创建线程

首先,我们需要创建一个 Thread 类的实例,并将要执行的函数作为参数传递给该实例。例如:

import threading

def my_function():
    print("Hello from a new thread!")

thread = threading.Thread(target=my_function)

可以使用 Thread 类的 daemon 属性来设置线程为守护线程。守护线程是一种特殊的线程,它在主线程结束时会自动结束

要将线程设置为守护线程,只需将其 daemon 属性设置为 True

import threading

def my_function():
    print("Hello from a new thread!")

thread = threading.Thread(target=my_function, name="MyThread", daemon=True)

2.启动线程

接下来,可以使用 thread.start() 方法来启动新线程,运行 my_function 函数。

import threading

def my_function():
    print("Hello from a new thread!")

thread = threading.Thread(target=my_function)
thread.start()

3.等待线程结束

如果要等待所有线程完成,可以使用 thread.join() 方法。例如:

import threading

def my_function():
    print("Hello from a new thread!")

thread = threading.Thread(target=my_function)
thread.start()
thread.join()

4.判断线程是否运行

使用 is_alive() 方法可以检查线程是否在运行

在下面例子中,我们调用了线程的 is_alive() 方法来检查它是否在运行。如果线程仍在运行,则会输出 "Thread is still running.",否则会输出 "Thread is not running."

import threading

def my_function():
    print("Hello from a new thread!")

thread = threading.Thread(target=my_function)
thread.start()
if thread.is_alive():
    print("Thread is still running.")
else:
    print("Thread is not running.")

5.设置和获取线程名称

可以使用 Thread 类的 setName()getName() 方法来设置和获取线程的名称

import threading

def my_function():
    print("Hello from a new thread!")

thread = threading.Thread(target=my_function)
thread.setName("MyThread")
print(thread.getName())

6.查看当前活动线程数

使用 Python 的内置函数 threading.active_count() 来查看当前活动的线程数

import threading

def my_function():
    print("Hello from a new thread!")

thread1 = threading.Thread(target=my_function)
thread2 = threading.Thread(target=my_function)

thread1.start()
thread2.start()

print(threading.active_count())  #输出2

7.枚举当前活动线程

使用 threading.enumerate() 函数来枚举所有当前活动的线程

import threading

def my_function():
    print("Hello from a new thread!")

thread1 = threading.Thread(target=my_function)
thread2 = threading.Thread(target=my_function)

thread1.start()
thread2.start()

threads = threading.enumerate()
for thread in threads:
    print(thread.getName())

多线程实现共享资源访问

设置线程执行的先后顺序

1.使用join()

首先举个没有使用join()函数的线程例子, 在下述代码的输出可知, 线程的执行是并发的(一起执行的), 并没有明确要求先执行完work1线程后再执行work2线程

import threading
import time
def work1():
    for i in range(4):
        time.sleep(0.5)
        print("work1")

def work2():
    for i in range(4):
        time.sleep(0.5)
        print("work2")

def main():
    thread_add1 = threading.Thread(target=work1)
    thread_add1.start()
    thread_add2 = threading.Thread(target=work2)
    thread_add2.start()
main()

下面是使用了join()方法的代码, 从输出结果上看, 可以发现程序先等work1线程执行完后才能执行work2线程

import threading
import time
def work1():
    for i in range(4):
        time.sleep(0.5)
        print("work1")

def work2():
    for i in range(4):
        time.sleep(0.5)
        print("work2")

def main():
    thread_add1 = threading.Thread(target=work1,name="work1") #name参数:给线程命名
    thread_add1.start()
    thread_add1.join()
    thread_add2 = threading.Thread(target=work2,name="work2")
    thread_add2.start()
main()

2.使用threading.Lock()

当多个线程试图访问同一个资源时,可能会发生资源竞争。为了避免这种情况,您可以使用 lock() 方法来锁定资源,并在访问完成后解锁

在下面的代码中, 我们使用 lock.acquire() 方法锁定资源,然后使用 lock.release() 方法在访问完成后解锁。这样只有一个线程可以访问资源,其他线程必须等待

import threading
import time
lock = threading.Lock()
def work1():
    lock.acquire()
    for i in range(4):
        time.sleep(0.5)
        print("work1")
    lock.release()

def work2():
    lock.acquire()
    for i in range(4):
        time.sleep(0.5)
        print("work2")
    lock.release()

def main():
    thread_add1 = threading.Thread(target=work1,name="work1") #name参数:给线程命名
    thread_add1.start()
    thread_add2 = threading.Thread(target=work2,name="work2")
    thread_add2.start()


main()

3.使用with函数

利用with函数你就可以不写上锁解锁这两行代码了,和文件打开和关闭差不多

import threading
import time
lock = threading.Lock()
def work1():
    with lock:
        for i in range(4):
            time.sleep(0.5)
            print("work1")


def work2():
    with lock:
        for i in range(4):
            time.sleep(0.5)
            print("work2")


def main():
    thread_add1 = threading.Thread(target=work1,name="work1") #name参数:给线程命名
    thread_add1.start()
    thread_add2 = threading.Thread(target=work2,name="work2")
    thread_add2.start()

main()

控制线程同时执行的数量

使用threading.BoundedSemaphore()方法来控制线程同时执行的数量

import threading
import os
import time
semaphore = threading.BoundedSemaphore(5) #设置只能允许5个线程同时进行

def work1(se):
    se.acquire()
    print("test")
    se.release()

def main():
    for i in range(1,15):
        thread = threading.Thread(target=work1,args=(semaphore,)) #这里要注意,args参数需传递一个元组
        thread.start()
main()

创建自定义线程类

在 Python 中,线程的工作是通过实现一个名为 run 的方法来进行的。这个方法是在线程启动时自动运行的,并且是线程中执行的主要任务

你可以通过继承 Thread 类并实现 run 方法来创建自定义线程类。例如下面的代码:

import threading

class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"{self.name} starting")
        print(f"{self.name} finishing")

然后你可以使用这个类来创建线程

thread = MyThread("MyThread")
thread.start()

在这种情况下,当调用 thread.start() 时,将会自动调用 MyThread 类的 run 方法

需要注意的是,你不应该直接调用 run 方法,而应该使用 start 方法来启动线程。如果你直接调用 run 方法,那么它将在主线程中运行,而不是在单独的线程中运行

还有一点需要注意的是,线程的 run 方法是可以被重写的

线程优先级队列

什么是Queue类

Queue是Python中的一个线程安全的队列类, 它支持在队列的两端执行高效的插入和删除操作, 还支持阻塞和超时功能

Queue类位于Python标准库中的queue模块中。你可以使用如下代码导入Queue类

from queue import Queue

Queue类常用方法

Queue类的简单使用

创建一个Queue对象,并选择是否指定队列的最大容量

q = Queue()          # 创建一个无限容量的队列
q = Queue(maxsize=5) # 创建一个最大容量为5的队列

然后你就可以使用put()get()方法来存取数据项了, 可以使用以下代码将数据项放入队列中

q.put(1)
q.put(2)
q.put(3)

要从队列中取出数据项,可以使用get()方法

print(q.get()) # 1
print(q.get()) # 2
print(q.get()) # 3

可以使用以下代码检查队列是否为空

if q.empty():
    print('Queue is empty')
else:
    print('Queue is not empty')

可以使用以下代码检查队列是否已满

if q.full():
    print('Queue is full')
else:
    print('Queue is not full')

线程池实现

假设我们想要实现一个线程池,其中有若干个工作线程来执行任务,并且有一个任务队列来存储待执行的任务。

我们可以使用Queue类来实现这个任务队列, 下面是一个使用Queue类实现线程池的例子

import threading
from queue import Queue

# 定义一个工作线程类
class Worker(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            # 从队列中取出一个任务
            task = self.queue.get()
            # 执行任务
            print(f'{threading.current_thread().name} is executing task: {task}')
            # 通知队列任务已经完成
            self.queue.task_done()

# 创建一个任务队列
queue = Queue()

# 创建3个工作线程
for i in range(3):
    worker = Worker(queue)
    worker.start()

# 将任务放入队列
for i in range(10):
    queue.put(i)

# 阻塞直到队列中的所有任务都完成
queue.join()

print('All tasks are done!')

'''
输出结果如下:
Thread-1 is executing task: 0
Thread-1 is executing task: 1
Thread-1 is executing task: 2
Thread-1 is executing task: 3
Thread-1 is executing task: 4
Thread-1 is executing task: 5
Thread-1 is executing task: 6
Thread-3 is executing task: 7
Thread-3 is executing task: 8
Thread-2 is executing task: 9
All tasks are done!
'''

在上面的例子中,我们只是创建了3个工作线程,但这不一定是最优的做法。如果任务队列中的任务数量很多,3个工作线程可能不够。同时,如果任务数量很少,3个工作线程可能过于浪费资源。

因此,我们可以在线程池中动态地增加或减少工作线程的数量。这样,我们就可以根据实际需求来动态调整工作线程的数量,从而提高线程池的效率。

下面是一个动态调整工作线程数量的例子:

import threading
from queue import Queue

# 定义一个工作线程类
class Worker(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            # 从队列中取出一个任务
            task = self.queue.get()
            # 执行任务
            print(f'{threading.current_thread().name} is executing task: {task}')
            # 通知队列任务已经完成
            self.queue.task_done()

# 创建一个任务队列
queue = Queue()

# 初始时,创建3个工作线程
for i in range(3):
    worker = Worker(queue)
    worker.start()

# 将任务放入队列
for i in range(10):
    queue.put(i)

# 当任务队列中的任务数量大于5时,增加工作线程的数量
while queue.unfinished_tasks > 5:
    worker = Worker(queue)
    worker.start()

# 等待所有任务完成
queue.join()
print("All tasks complete")

注意

  • 使用threading模块的python文件名称不能取"threading", 否则导入threading模块时会报错, 例如: module 'threading' has no attribute '_shutdown'

  • 一旦线程启动,就不能重新启动。如果需要重新启动线程,则必须创建一个新的线程

  • 在使用线程时,需要注意资源竞争。在多线程环境中,各个线程可能会竞争共享资源,因此需要使用适当的同步机制来避免冲突

  • 线程是不能被强制终止的,因此您必须在线程中提供一个退出机制

  • 守护线程在后台运行,不会阻止程序的退出。如果您希望在线程中的任务执行完成后再退出程序,则不应使用守护线程

总结

总结一下,在 Python 中使用多线程的方法如下:

  1. 导入 threading 模块。

  2. 创建一个 Thread 类的实例,并将要执行的函数作为参数传递给该实例。

  3. 调用 thread.start() 方法启动新线程。

  4. 调用 thread.join() 方法等待所有线程完成。

最后更新于