抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

题外话

一款进度条库

pip install tqdm

多线程与多进程

什么是进程和线程

简单概述进程和线程的关系

  • 一个工厂,至少有一个车间,一个车间至少有一个工人,最终是工人在工作。
  • 一个程序,至少有一个进程,一个车间至少有一个线程,最终是线程在工作。

    进程

    是计算机资源分配的最小单元,为线程提供资源

    线程

    是计算机中可以被CPU执行的最小单元,是真正在工作的单位

    多线程和多进程

    多线程

    多线程常见方法

  • start(): 当前线程准备就绪,等待CPU调度,具体执行时间由CPU决定。主线程不会等待子线程执行完毕,直接往下走。
  • join(): 当前线程等待子线程执行完毕,再往下走。
  • setDaemon(boolean): 设置当前线程是否为守护线程。(主线程执行完毕是否等待子线程执行完毕,true为不等待,false为等待)
  • setName(String): 设置当前线程的名称。

    自定义多线程

    import time
    import threading
    class MyThread(threading.Thread):
      # 实例化执行的方法
      def run(self):
          print("执行此线程",self._args)
    t= MyThread(args=("我是参数1","我是参数2"))
    t.start()
    
    自定义多线程实列
    ```python

    自定义线程实列

    import requests # 导入requests
    import threading
    url_list = [
    {‘name’:’1.mp4’,’url’:’https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
    {‘name’:’2.mp4’,’url’:’https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
    {‘name’:’3.mp4’,’url’:’https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
    ]

class DowBili(threading.Thread):
def run(self):
name,url = self._args
r = requests.get(url)
with open(name,’wb’) as f:
f.write(r.content)

for i in url_list:
dow = DowBili(target=requests.get,args=(i[‘name’],i[‘url’]))
dow.start()

#### 线程安全
##### 设置线程锁
* 在多线程环境下,线程安全是一个重要的问题。如果两个线程同时修改一个变量,可能会出现不可预料的结果。在Python中,通过使用锁机制来解决线程安全问题。
```python
# 手动上锁解锁
import threading
lock_object = threading.RLock()
num = 0
def add(count):
    lock_object.acquire() # 申请锁(没有申请到就等待)
    global num
    for i in range(count):
        num += 1
    lock_object.release() # 释放锁
def sub(count):
    lock_object.acquire() # 申请锁(没有申请到就等待)
    global num
    for i in range(count):
        num -= 1
    lock_object.release() # 释放锁
t1 = threading.Thread(target=add, args=(1000000,))
t2 = threading.Thread(target=sub, args=(1000000,))
t1.start()
t2.start()
t1.join()  
t2.join()
print(num)
```python
# 上下文自动进行上锁解锁
import threading
lock_object = threading.RLock()
sum = 0
def add(count):
    with lock_object:
        global sum
        for i in range(count):
            sum += 1
    print(sum)

for i in range(10):
    t = threading.Thread(target=add, args=(1000000,))
    t.start()
线程安全类型

是指有一些类型内部已经实现了线程安全的功能,例如:

# 带线程锁的类型实列
import threading

data_list = []

lock = threading.RLock()

def append_data(count):
    for i in range(count):
        data_list.append(i)
    print(len(data_list))

for i in range(2):
    t = threading.Thread(target=append_data, args=(100,))
    t.start()
Lock和RLock的区别
  • Lock: 不可套娃锁,已经被锁的代码块里面不能再被锁。强行加锁会造成死锁(程序卡死)。但是Lock反复上锁解锁效率高。
  • RLock: 可重入锁,已经被锁的代码块里面能再被锁。但是RLock反复上锁解锁效率低。

  • 注:一般碰到调用别人上了锁的代码块或方法,就用RLock。

线程池

python3才提供线程池。线程不是开的越多越好,多了效率会下降,所以要有一个线程池来管理线程。

不等待线程池的任务完成,就直接返回。

# 线程池
import time
from concurrent.futures import ThreadPoolExecutor

def task(name):
    print('{} is running...'.format(name))
    time.sleep(1)

# 创建线程池,最大线程数为10,如果线程池中的线程数超过了最大线程数,那么新加入的任务将会等待线程池中的线程执行完毕后再执行
pool = ThreadPoolExecutor(10)

for i in range(20):
    pool.submit(task, i)

print('all task is done!')

等待线程池的任务完成,才能返回。

# 线程池
import time
from concurrent.futures import ThreadPoolExecutor

def task(name):
    print('{} is running...'.format(name))
    time.sleep(1)

# 创建线程池,最大线程数为10,如果线程池中的线程数超过了最大线程数,那么新加入的任务将会等待线程池中的线程执行完毕后再执行
pool = ThreadPoolExecutor(10)

for i in range(20):
    pool.submit(task, i)
print('wait for all task done...')
pool.shutdown(wait=True)
print('all task is done!')
线程池完成的回调
# 线程池执行完毕的回调
import time
from concurrent.futures import ThreadPoolExecutor
def task(name):
    print('{} is running...'.format(name))
    time.sleep(1)
    return name
pool = ThreadPoolExecutor(10)
for i in range(20):

    # 线程池提交会返回一个futures对象
    future =  pool.submit(task, i)
    # 使用add_done_callback方法添加回调函数
    future.add_done_callback(lambda x: print('{} is done'.format(x.result())))

多进程

多进程常见方法
  • start: 当前进程准备就绪,等待CPU调度,具体执行时间由CPU决定。最终工作的是进程中的线程。
  • join: 等待当前进程的所有线程执行完毕再执行下一步。
  • daemon: 设置当前进程为守护进程。

自定义进程类

# 自定义进程
import multiprocessing

class MyProcess(multiprocessing.Process):
    def run(self):
        print('执行了进程:', self.name)

if __name__ == '__main__':
    p = MyProcess(name='test')
    p.start()

CPU个数

# CPU个数
import multiprocessing

cpu_count = multiprocessing.cpu_count()
print(cpu_count)

进程间数据的共享

键值对共享数据
from multiprocessing import Process,Value,Array

def fun(v1,v2,v3,a1,a2):
    v1.value = 666
    v2.value = 'a'.encode('utf-8')
    v3.value = "嗨"
    a1[0] = 'z'.encode('utf-8')
    a2[0] = 123

if __name__ == '__main__':
    '''可选的数据类型
        "c" -- 字符型 "u: -- 无符号字符型
        "b" -- 布尔   "B" -- 无符号布尔
        "h" -- 短整型 "H" -- 无符号短整型
        "i" -- 整型   "I" -- 无符号整型
        "l" -- 长整型 "L" -- 无符号长整型
        "f" -- 浮点型 "F" -- 无符号浮点型
    '''
    v1 = Value('i',0) # Value(类型,初始值)
    v2 = Value('c')
    v3 = Value('u')
    a1 = Array('c',3) # Array(类型,长度:可以是整数或者数组,长度不可更改)
    a2 = Array('i',[1,2,3])

    p = Process(target=fun,args=(v1,v2,v3,a1,a2))
    p.start()
    p.join()

    print(v1.value)
    print(v2.value)
    print(v3.value)
    print(a1[:])
    print(a2[:])
Manager使用dict和list共享数据
from multiprocessing import Process,Manager

def fun(dict,list):
    dict[1] = '1'
    dict['2'] = 2
    dict[0.25] = None
    list.append(6666)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict({1:'a',"2":'b',0.25:'c'})
        l = manager.list()

        p = Process(target=fun,args=(d,l))
        p.start()
        p.join()

        print(d)
        print(l)
基于队列Queue的进程间通信

怎么选择多进程还是多线程

for i in url_list:
t = threading.Thread(target=download,args=(i[‘name’],i[‘url’]))
t.start()

### 多进程
```python
import time
import requests # 导入requests
import multiprocessing
url_list = [
    {'name':'1.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
    {'name':'2.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
    {'name':'3.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
]
start = time.time()
end = 0
def download(name,url):
    res = requests.get(url)
    with open(name,'wb') as f:
        f.write(res.content)
    end = time.time()
    print("耗时:",end-start)
if __name__ == '__main__': # 多进程必须要有这个判断,如果不需要就加上:multiprocessing.set_start_method('fork')
    for i in url_list:
        t = multiprocessing.Process(target=download,args=(i['name'],i['url']))
        t.start()

多进程和多进程的混合

# 多进程与多进程混合
import time 
import multiprocessing
import threading

# 多线程执行函数
def thread_run(name):
    print('%s is running' % name)

# 多进程执行函数
def process_run(name):
    # 多线程
    t1 = threading.Thread(target=thread_run, args=(name,))
    t1.start()

    t2 = threading.Thread(target=thread_run, args=(name,))
    t2.start()

    t3 = threading.Thread(target=thread_run, args=(name,))
    t3.start()

if __name__ == '__main__':
    # 多进程
    p1 = multiprocessing.Process(target=process_run, args=('process1',))
    p1.start()

    p2 = multiprocessing.Process(target=process_run, args=('process2',))
    p2.start()

GIL锁

是什么是GIL锁

GIL(全局解释器锁),是CPthon特有的一个玩意,让一个进程一个时间单位只能有一个被CPU调用。
由于GIL锁的存在,所以在选择多线程和多进程的时候

如果想利用CPU的多核,让CPU同时处理一些任务,那么就必须要使用多进程,而不能使用多线程,即使资源开销大。
如果不利于CPU的多核,那么就可以使用多线程,而不能使用多进程。

评论