Что может компьютер, но не может большинство людей, — лежать на складе, запечатанным в коробке.
Джек Хэнди
Эта глава, а также две последующие — более сложные, чем остальные. Сейчас мы рассмотрим данные во времени (последовательный и конкурентный доступ на одном компьютере), в главе 16 поговорим о данных в коробке (хранение и извлечение данных с помощью особых файлов и баз данных), а в главе 17 — о данных в пространстве (сети).
Когда вы запускаете отдельную программу, ваша операционная система создает один процесс, который использует системные ресурсы (центральный процессор, память, место на диске) и структуры данных в ядре операционной системы (файлы и сетевые соединения, статистику использования и т.д.). Процесс изолирован от других процессов — он не может видеть, что делают другие процессы, или мешать им.
Операционная система отслеживает все запущенные процессы, выделяя каждому из них немного времени и затем переключаясь на другие, для того чтобы справедливо распределять работу и реагировать на действия пользователя. Вы можете увидеть состояние своих процессов с помощью графического интерфейса, такого как Mac’s Activity Monitor в OS X, диспетчера задач в Windows или команды top в Linux.
Вы также можете получить доступ к данным процесса ваших собственных программ. Модуль стандартной библиотеки os предоставляет общий способ доступа к определенной системной информации. Например, следующие функции получают идентификатор процесса и текущую рабочую папку запущенного интерпретатора Python:
>>> import os
>>> os.getpid()
76051
>>> os.getcwd()
'/Users/williamlubanovic'
А это — мои идентификаторы пользователя и группы:
>>> os.getuid()
501
>>> os.getgid()
20
Все программы, с которыми вы сталкивались до этого момента, представляли собой отдельные процессы. Запускать и останавливать другие существующие в Python программы можно, используя модуль subprocess из стандартной библиотеки. Если вы хотите просто запустить другую программу в оболочке и получить результат ее работы (как стандартный отчет, так и отчет об ошибках), используйте функцию getoutput(). В этом примере мы получим результат работы программы date системы Unix:
>>> import subprocess
>>> ret = subprocess.getoutput('date')
>>> ret
'Sun Mar 30 22:54:37 CDT 2014'
Вы не получите результат, пока процесс не завершится. Если вы хотите запустить еще какую-нибудь программу или процесс, которые займут много времени, обратитесь к разделу «Конкурентность» на с. 308. Поскольку аргументом функции getoutput() является строка, представляющая собой команду оболочки, вы можете включить в эту строку аргументы, каналы, перенаправление ввода/вывода и т.д.:
>>> ret = subprocess.getoutput('date -u')
>>> ret
'Mon Mar 31 03:55:01 UTC 2014'
Передача строки-отчета команде wc насчитывает одну строку, шесть «слов» и 29 символов:
>>> ret = subprocess.getoutput('date -u | wc')
>>> ret
' 1 6 29'
Метод check_output() принимает список команд и аргументов. По умолчанию он возвращает не строку, а объект типа bytes и не использует оболочку:
>>> ret = subprocess.check_output(['date', '-u'])
>>> ret
b'Mon Mar 31 04:01:50 UTC 2014\n'
Чтобы показать статус завершения другой программы, используйте функцию getstatusoutput(), которая возвращает кортеж с кодом статуса и результатом работы:
>>> ret = subprocess.getstatusoutput('date')
>>> ret
(0, 'Sat Jan 18 21:36:23 CST 2014')
Если вам нужен не результат работы программы, а только код, используйте функцию call():
>>> ret = subprocess.call('date')
Sat Jan 18 21:33:11 CST 2014
>>> ret
0
(В системах семейства Unix 0 обычно является статусом, сигнализирующим об успехе.)
Эти дата и время были выведены на экран, но не получены нашей программой. Поэтому мы сохраняем код возврата как ret.
Запускать программы с аргументами можно двумя способами. Первый заключается в том, чтобы разместить аргументы в одной строке. Для примера возьмем команду date-u, которая выводит на экран дату и время в UTC (о UTC мы поговорим немного позже):
>>> ret = subprocess.call('date -u', shell=True)
Tue Jan 21 04:40:04 UTC 2014
Вам нужно использовать shell=True, чтобы распознать команду date-u, разбив ее на отдельные строки и, возможно, развернув любые символы подстановки, такие как * (в нашем примере мы их не использовали).
Второй метод создает список аргументов, поэтому нет необходимости вызывать оболочку:
>>> ret = subprocess.call(['date', '-u'])
Tue Jan 21 04:41:59 UTC 2014
Запустить функцию Python как отдельный процесс или даже создать несколько независимых процессов можно с помощью модуля multiprocessing. Рассмотрим короткий и простой пример 15.1. Сохраните его под именем mp.py, а затем запустите, введя команду pythonmp.py:
Пример 15.1. mp.py
import multiprocessing
import os
def whoami(what):
print("Process %s says: %s" % (os.getpid(), what))
if __name__ == "__main__":
whoami("I'm the main program")
for n in range(4):
p = multiprocessing.Process(target=do_this,
args=("I'm function %s" % n,))
p.start()
Когда я запускаю этот пример, то вижу на экране следующее:
Process 6224 says: I'm the main program
Process 6225 says: I'm function 0
Process 6226 says: I'm function 1
Process 6227 says: I'm function 2
Process 6228 says: I'm function 3
Функция Process() вызвала новый процесс и запустила в нем функцию do_this(). Поскольку мы делали это в цикле с четырьмя итерациями, сгенерировалось четыре новых процесса, которые выполнили do_this() и завершились.
У модуля multiprocessing есть много возможностей. Он прекрасно подходит для тех случаев, когда в целях экономии времени нужно распределить какие-то задачи между несколькими процессами: это может быть, например, загрузка веб-страницы, изменение размера изображений или что-то другое. В модуле содержатся способы постановки задач в очередь, включения взаимодействия между процессами и ожидания завершения всех процессов.
В разделе «Конкурентность» на с. 308 содержится более подробная информация.
Если вы создали один или несколько процессов, а теперь по какой-то причине хотите их завершить (возможно, они зависли в цикле или же вам стало скучно, а может, просто захотелось побыть жестоким правителем), используйте функцию terminate(). В примере 15.2 процесс должен досчитать до миллиона, замирая после каждого шага на секунду и выводя раздражающее сообщение. Однако у нашей основной программы заканчивается терпение, и она сбивает его с орбиты:
Пример 15.2. mp2.py
import multiprocessing
import time
import os
def whoami(name):
print("I'm %s, in process %s" % (name, os.getpid()))
def loopy(name):
whoami(name)
start = 1
stop = 1000000
for num in range(start, stop):
print("\tNumber %s of %s. Honk!" % (num, stop))
time.sleep(1)
if __name__ == "__main__":
whoami("main")
p = multiprocessing.Process(target=loopy, args=("loopy",))
p.start()
time.sleep(5)
p.terminate()
Когда я запускаю эту программу, я вижу следующее:
I'm main, in process 97080
I'm loopy, in process 97081
Number 1 of 1000000. Honk!
Number 2 of 1000000. Honk!
Number 3 of 1000000. Honk!
Number 4 of 1000000. Honk!
Number 5 of 1000000. Honk!
Стандартный пакет os предоставляет подробную информацию о вашей системе и позволяет управлять некоторыми функциями, запуская скрипт, написанный на Python, от лица привилегированного пользователя (например, администратора). Помимо функций файлов и каталогов, которые мы рассмотрели в главе 14, в нем есть такие информационные функции (они запускаются на iMac):
>>> import os
>>> os.uname()
posix.uname_result(sysname='Darwin',
nodename='iMac.local',
release='18.5.0',
version='Darwin Kernel Version 18.5.0: Mon Mar 11 20:40:32 PDT 2019;
root:xnu-4903.251.3~3/RELEASE_X86_64',
machine='x86_64')
>>> os.getloadavg()
(1.794921875, 1.93115234375, 2.2587890625)
>>> os.cpu_count()
4
Имеется также полезная функция system(), которая выполняет командную строку так, как если бы вы ввели ее в консоли:
>>> import os
>>> os.system('date –u')
Tue Apr 30 13:10:09 UTC 2019
0
В этом пакете много полезного. Обратитесь к документации (), чтобы узнать все самое интересное.
Сторонний пакет psutil () также предоставляет информацию о системе и процессах для ОС Linux, Unix, macOS, и Windows.
Нетрудно догадаться, как он устанавливается:
$ pip install psutil
В нем содержатся следующие разделы:
•Система. ЦП, память, диск, сеть, сенсоры.
•Процессы. Идентификатор, родительский идентификатор, ЦП, память, открытые файлы, потоки.
Мы уже видели (в разделе, посвященном пакету os), что мой компьютер имеет четыре процессора. Сколько времени (в секундах) они использовали?
>>> import psutil
>>> psutil.cpu_times(True)
[scputimes(user=62306.49, nice=0.0, system=19872.71, idle=256097.64),
scputimes(user=19928.3, nice=0.0, system=6934.29, idle=311407.28),
scputimes(user=57311.41, nice=0.0, system=15472.99, idle=265485.56),
scputimes(user=14399.49, nice=0.0, system=4848.84, idle=319017.87)]
Насколько они заняты сейчас?
>>> import psutil
>>> psutil.cpu_percent(True)
26.1
>>> psutil.cpu_percent(percpu=True)
[39.7, 16.2, 50.5, 6.0]
Вам, возможно, никогда не понадобятся подобные данные, но всегда полезно знать, как их можно получить.
Вы часто выполняете команды из оболочки (вводя их вручную или запуская скрипты оболочки). Однако в Python имеется несколько качественных сторонних инструментов для управления запуском. Связанная с этим тема — очереди задач — рассматривается в подразделе «Очереди» на с. 309.
Первая версия инструмента fabric позволяет вам определить локальные и удаленные (сетевые) задачи в коде Python. Разработчик разбил этот пакет на fabric2 (для удаленных задач) и invoke (для локальных задач).
Вы можете установить invoke с помощью следующей команды:
$ pip install invoke
Одним из вариантов использования пакета invoke является возможность сделать функции доступными в качестве аргументов командной строки. Давайте создадим файл tasks.py, который содержит строки, показанные в примере 15.3.
Пример 15.3. tasks.py
from invoke import task
@task
def mytime(ctx):
import time
now = time.time()
time_str = time.asctime(time.localtime(now))
print("Local time is", timestr)
(Аргумент ctx — первый аргумент для каждой преобразуемой функции, но используется он только самим пакетом invoke. Неважно, с каким именем, но этот аргумент там должен быть.)
$ invoke mytime
Local time is Thu May 2 13:16:23 2019
Используйте аргументы –l или –list, чтобы увидеть, какие задачи доступны:
$ invoke –l
Available tasks:
mytime
Задачи могут иметь аргументы, и вы можете вызвать из командной строки несколько задач одновременно (аналогично использованию оператора && в командной строке).
Другие варианты использования:
• запуск локальных скриптов оболочки с помощью функции run;
• ответ на возвращаемые другими программами строковые шаблоны.
Этот пакет мы рассмотрели поверхностно. Для получения более подробной информации обратитесь к документации (/).
Следующие пакеты немного похожи на invoke, но в определенных ситуациях лучшее решение может предложить один из таких пакетов:
• click (/);
• doit (/);
• sh (/);
• delegator ();
• pypeln (/).
Официальный сайт Python рассматривает тему конкурентности в целом и с точки зрения стандартной библиотеки (). На страницах сайта содержится множество ссылок на различные пакеты и приемы, а я в этой главе остановлюсь на самых полезных пакетах.
Когда речь идет о компьютерах, находиться в ожидании приходится по одной из двух причин:
• из-за ограничения ввода-вывода. Это наиболее распространенная причина. Процессоры компьютеров безумно быстры — в сотни раз быстрее, чем компьютерная память, и в тысячи — чем диски или сети;
• из-за ограничения процессора. Это случается при выполнении таких объемных задач, как научные или графические расчеты.
С конкурентностью связаны еще два термина:
•синхронность — одна вещь следует за другой, как гусята, семенящие за родителями;
•асинхронность — задачи независимы, как гуси, которые плавают в пруду.
По мере того как от простых систем и задач вы будете переходить к проблемам реальной жизни, вам все чаще придется иметь дело с конкурентностью. Возьмем, например, сайт. Обычно вы можете довольно быстро предоставлять веб-клиентам статическую и динамическую страницы. Если ожидание длится долю секунды, приложение считается интерактивным, однако если время до отображения или взаимодействия более продолжительное, люди становятся нетерпеливыми. Тесты, проведенные компаниями Google и Amazon, показали, что трафик быстро падает, если страница загружается хотя бы немного медленнее обычного.
Ну а если вы не можете повлиять на то, что долго выполняется? Например, на загрузку файла на сервер, на изменение размеров изображения или запрос к базе данных? Вы больше не можете делать это при помощи синхронного кода вашего веб-сервера, поскольку кто-то уже ждет.
Если вам нужно выполнить несколько задач как можно быстрее на одном компьютере, вы можете сделать их независимыми, и тогда медленные задачи не будут блокировать остальные.
Ранее в этой главе было показано, как использовать многопроцессорную обработку для параллельной работы на одной машине. Если нужно, например, изменить размер изображения, ваш веб-сервер может создать отдельный процесс для данной задачи и запустить его асинхронно. Можно масштабировать приложение горизонтально, вызвав несколько процессов изменения размера.
Идея заключается в том, чтобы заставить их работать друг с другом. Наличие любого общего элемента управления или состояния означает, что будут возникать узкие места. Особый подход нужен для обработки ошибок, поскольку конкурентные вычисления сложнее обычных. Многое может пойти не так, и ваши шансы на успех будут ниже.
Какие же методы могут помочь справиться с этими сложностями? Начнем с хорошего способа для управления несколькими задачами — это очереди.
Очередь похожа на список: элементы добавляются с одного ее конца и удаляются с другого. Часто такой принцип называют FIFO (first in, first out — «первым пришел, первым ушел»).
Представьте, что вы моете посуду. Если вы хотите совершить полный цикл, вам нужно вымыть тарелку, высушить ее и отложить в сторону. Сделать это можно несколькими способами. Можно вымыть, высушить и отложить первую тарелку и повторить эти действия со второй и последующими тарелками. Или же можно сгруппировать операции: сначала вымыть всю посуду, затем ее высушить и отложить в сторону. При этом подразумевается, что в раковине и сушилке достаточно места, чтобы разместить там всю посуду на каждом этапе. Оба подхода являются синхронными — один работник выполняет одно действие в любой момент времени.
В качестве альтернативы вы могли бы найти одного-двух помощников. Предположим, вы моете тарелку, передаете ее тому, кто тарелки сушит, а сушильщик передаст тарелку тому, кто отложит ее в сторону. Если все работают в одном темпе, вы должны закончить работу гораздо быстрее, чем если бы выполняли ее полностью самостоятельно.
Но что, если вы моете посуду быстрее, чем сушильщик успевает с ней справляться? Тогда или вымытая посуда будет падать на пол, или вы будете складывать ее между собой и сушильщиком. Есть вариант, что вы просто начнете что-нибудь насвистывать до тех пор, пока сушильщик не примет от вас очередную тарелку. А если последний помощник работает медленнее сушильщика, падать на пол или накапливаться будет уже сухая посуда либо насвистывать начнет уже сушильщик. Хоть у вас и есть несколько работников, но общая задача все еще синхронна и может выполняться только со скоростью самого медленного работника.
«Берись дружно, не будет грузно» — гласит старая пословица. Увеличение количества работников поможет быстрее построить сарай или вымыть посуду. При этом будут задействованы очереди.
В целом, очереди переносят сообщения, которые могут содержать любую информацию. В данном случае нас интересуют очереди для распределенного управления задачами, также известные как рабочие очереди или очереди заданий. Каждая тарелка из раковины выдается доступному мойщику: он ее моет и передает первому доступному сушильщику. Тот, в свою очередь, отдает тарелку первому доступному работнику, в чьи обязанности входит убрать ее в сторону. Этот процесс может быть синхронным (работники сначала ждут, когда им дадут тарелку, а потом ждут, когда освободится следующий в очереди работник) или асинхронным (посуда поступает от работников с разной скоростью). Если у вас есть достаточно работников и они трудятся в одном темпе, задача будет выполнена гораздо быстрее.
Реализовать очередь можно разными способами. Для одного компьютера модуль стандартной библиотеки multiprocessing (с которым вы ранее уже встречались) содержит функцию Queue. Смоделируем ситуацию только с одним мойщиком посуды и несколькими сушильщиками (кто-то позже отложит посуду в сторону) и создадим промежуточную очередь dish_queue. Назовем эту программу dishes.py (пример 15.4).
Пример 15.4. dishes.py
import multiprocessing as mp
def washer(dishes, output):
for dish in dishes:
print('Washing', dish, 'dish')
output.put(dish)
def dryer(input):
while True:
dish = input.get()
print('Drying', dish, 'dish')
input.task_done()
dish_queue = mp.JoinableQueue()
dryer_proc = mp.Process(target=dryer, args=(dish_queue,))
dryer_proc.daemon = True
dryer_proc.start()
dishes = ['salad', 'bread', 'entree', 'dessert']
washer(dishes, dish_queue)
dish_queue.join()
Запускаем нашу новую программу:
$ python dishes.py
Washing salad dish
Washing bread dish
Washing entree dish
Washing dessert dish
Drying salad dish
Drying bread dish
Drying entree dish
Drying dessert dish
Эта очередь похожа на простой итератор, производящий наборы тарелок. В действительности здесь создаются отдельные процессы, которые общаются между собой. Я использовал JoinableQueue и финальный метод join(), чтобы сообщить мойщику о том, что вся посуда высушена. У модуля multiprocessing есть очереди и других типов: узнать о них вы можете из документации ().
Поток запускается внутри процесса и имеет доступ ко всему, что находится в процессе, — это можно сравнить с раздвоением личности. Модуль multiprocessing имеет «кузена» по имени threading, который использует потоки вместо процессов (на самом деле модуль multiprocessing был разработан позже своего собрата, основанного на процессах). Переделаем наш пример с процессами для применения потоков (пример 15.5).
Пример 15.5. thread1.py
import threading
def do_this(what):
whoami(what)
def whoami(what):
print("Thread %s says: %s" % (threading.current_thread(), what))
if __name__ == "__main__":
whoami("I'm the main program")
for n in range(4):
p = threading.Thread(target=do_this,
args=("I'm function %s" % n,))
p.start()
Вот что я вижу на своем экране:
Thread <_MainThread(MainThread, started 140735207346960)> says: I'm the main
program
Thread <Thread(Thread-1, started 4326629376)> says: I'm function 0
Thread <Thread(Thread-2, started 4342157312)> says: I'm function 1
Thread <Thread(Thread-3, started 4347412480)> says: I'm function 2
Thread <Thread(Thread-4, started 4342157312)> says: I'm function 3
Мы можем воспроизвести пример с посудой, основанный на процессах, используя потоки (пример 15.6).
Пример 15.6. thread_dishes.py
import threading, queue
import time
def washer(dishes, dish_queue):
for dish in dishes:
print ("Washing", dish)
time.sleep(5)
dish_queue.put(dish)
def dryer(dish_queue):
while True:
dish = dish_queue.get()
print ("Drying", dish)
time.sleep(10)
dish_queue.task_done()
dish_queue = queue.Queue()
for n in range(2):
dryer_thread = threading.Thread(target=dryer, args=(dish_queue,))
dryer_thread.start()
dishes = ['salad', 'bread', 'entree', 'desert']
washer(dishes, dish_queue)
dish_queue.join()
Различие между модулями multiprocessing и threading заключается в том, что модуль threading не имеет функции terminate(). Не существует простого способа завершить запущенный поток, поскольку это может вызвать разнообразные проблемы в коде и, возможно, даже в пространственно-временном континууме.
Потоки могут быть опасны. Как и управление памятью вручную в таких языках, как С и С++, они могут вызвать появление ошибок, которые ужасно трудно найти и исправить. Для того чтобы задействовать потоки, весь код программы — и код внешних библиотек, которые он использует, — должен быть потокобезопасным. В предыдущем примере кода потоки не имели дела с глобальными переменными, поэтому могли работать независимо, ничего не разрушая.
Представьте, что вы исследуете паранормальную активность в доме с привидениями. Привидения скитаются по коридорам, но ни одно из них не знает о существовании другого, и в любой момент каждое привидение может просматривать, добавлять, удалять или перемещать любую вещь из домашней обстановки.
Вы настороженно идете по дому, снимая показания своими впечатляющими инструментами. И внезапно замечаете, что подсвечник, мимо которого вы проходили несколько секунд назад, пропал.
Содержимое дома похоже на переменные в программе. Привидения — это потоки процесса (дома). Если бы привидения только просматривали содержимое дома, это не было бы проблемой — как поток, который просто читает значение константы или переменной, не пытаясь его изменить.
Однако некая невидимая сущность может схватить ваш фонарик, дунуть холодной струей воздуха на вашу шею, рассыпать шарики на ступеньках или заставить вспыхнуть огонь в камине. Особо утонченные привидения могут так изменить вещи в других комнатах, что вы даже не заметите.
Несмотря на все ваши причудливые инструменты, вам будет очень трудно разобраться в том, кто, когда и как это сделал.
Если бы вместо потоков вы использовали несколько процессов, это можно было бы сравнить с несколькими домами, в каждом из которых обитает только одно (живое) существо. Если поставить бутылку бренди перед камином, через час она все еще будет там. Возможно, немного жидкости испарится, но сама бутылка останется на том же месте.
Потоки могут быть полезны и безопасны, когда речь не идет о глобальных данных. В частности, рекомендуется использовать потоки для экономии времени при ожидании завершения некоторых операций ввода/вывода. В этом случае потокам не нужно сражаться за данные, поскольку у каждого из них имеется свой набор переменных.
Но у потоков иногда есть веские причины для изменения глобальных данных. Фактически самая распространенная причина для использования нескольких потоков — это возможность разделить между ними работу над определенными данными, поэтому ожидается, что какие-то данные будут изменены.
Классический способ разделить данные безопасно — разместить программную блокировку перед изменением переменной в потоке. Это позволит оградить ее значение от других потоков и внести свои изменения. В примере с домом вы бы просто оставили бригаду охотников за привидениями в той комнате, которая должна остаться свободной от привидений. Нужно лишь не забыть ее разблокировать. Блокировки также могут быть вложенными — что, если и другая бригада охотников за привидениями будет наблюдать за этой же комнатой или за всем домом? Использование блокировок достаточно традиционно, но его трудно организовать правильно.
В Python потоки не ускоряют задачи, связанные с ограничениями процессора, из-за одной детали реализации стандартной системы Python, которая называется Global Interpreter Lock (GIL). Она предназначена для того, чтобы избежать потоковых проблем в интерпретаторе Python, и действительно может замедлить многопоточную программу по сравнению с однопоточной или даже многопроцессорной версией.
Итак, для Python рекомендации следующие:
• используйте потоки для задач, связанных с ограничениями ввода-вывода;
• используйте процессы, сетевые вычисления или события (которые мы рассмотрим в следующем подразделе) для задач, связанных с ограничениями процессора.
Как вы только что видели, использование потоков или нескольких процессов требует отслеживания большого количества деталей. Модуль concurrent.futures был добавлен в стандартную библиотеку в версии Python 3.2 для того, чтобы упростить эту задачу. Он позволяет вам спроектировать асинхронный пул так называемых работников с помощью потоков (с ограничением по вводу-выводу) и процессов (с ограничением по ЦП). Вы получаете объект типа future, который позволяет отслеживать состояние заданий и собирать результаты работы.
В примере 15.7 вы видите тестовую программу, которую можно сохранить под именем cf.py. Функция-задача calc() спит в течение одной секунды (наш способ симулировать бурную деятельность), вычисляет квадратный корень своего аргумента и возвращает его. Программа принимает необязательный аргумент командной строки — количество работников, которых следует задействовать (по умолчанию –3), запускает заданное количество из пула потоков, затем из пула процессов, а затем выводит на экран затраченное время. Список values содержит пять чисел, отправленных функции calc() по одному с помощью работника-потока или работника-процесса.
Пример 15.7. cf.py
from concurrent import futures
import math
import time
import sys
def calc(val):
time.sleep(1)
result = math.sqrt(float(val))
return result
def use_threads(num, values):
t1 = time.time()
with futures.ThreadPoolExecutor(num) as tex:
results = tex.map(calc, values)
t2 = time.time()
return t2 – t1
def use_processes(num, values):
t1 = time.time()
with futures.ProcessPoolExecutor(num) as pex:
results = pex.map(calc, values)
t2 = time.time()
return t2 – t1
def main(workers, values):
print(f"Using {workers} workers for {len(values)} values")
t_sec = use_threads(workers, values)
print(f"Threads took {t_sec:.4f} seconds")
p_sec = use_processes(workers, values)
print(f"Processes took {p_sec:.4f} seconds")
if __name__ == '__main__':
workers = int(sys.argv[1])
values = list(range(1, 6)) # 1 .. 5
main(workers, values)
Я получил следующие результаты:
$ python cf.py 1
Using 1 workers for 5 values
Threads took 5.0736 seconds
Processes took 5.5395 seconds
$ python cf.py 3
Using 3 workers for 5 values
Threads took 2.0040 seconds
Processes took 2.0351 seconds
$ python cf.py 5
Using 5 workers for 5 values
Threads took 1.0052 seconds
Processes took 1.0444 seconds
Вызов функции sleep() заставил каждого работника потратить секунду на каждое вычисление.
• Если мы задействуем всего одного работника, вычисления выполнятся последовательно, а общее время составит более пяти секунд.
• Если мы задействуем пять работников, то их количество совпадет с количеством проверяемых значений, поэтому программа будет выполняться чуть дольше чем одну секунду.
• Если мы задействуем трех работников, нам понадобятся два запуска, чтобы обработать все пять значений, поэтому пройдет всего две секунды.
В программе я проигнорировал реальные результаты (рассчитанные нами квадратные корни) для того, чтобы сделать акцент на затраченном времени. Важно отметить, что использование функции map() для определения пула заставит нас дожидаться выполнения действий всех работников перед тем, как возвращать результаты. Если вы хотите получать каждый отдельный результат по завершении работы, создайте еще одну тестовую программу (назовем ее cf2.py), в которой каждый работник будет возвращать значения по мере их вычисления (пример 15.8).
Пример 15.8. cf2.py
from concurrent import futures
import math
import sys
def calc(val):
result = math.sqrt(float(val))
return val, result
def use_threads(num, values):
with futures.ThreadPoolExecutor(num) as tex:
tasks = [tex.submit(calc, value) for value in values]
for f in futures.as_completed(tasks):
yield f.result()
def use_processes(num, values):
with futures.ProcessPoolExecutor(num) as pex:
tasks = [pex.submit(calc, value) for value in values]
for f in futures.as_completed(tasks):
yield f.result()
def main(workers, values):
print(f"Using {workers} workers for {len(values)} values")
print("Using threads:")
for val, result in use_threads(workers, values):
print(f'{val} {result:.4f}')
print("Using processes:")
for val, result in use_processes(workers, values):
print(f'{val} {result:.4f}')
if __name__ == '__main__':
workers = 3
if len(sys.argv) > 1:
workers = int(sys.argv[1])
values = list(range(1, 6)) # 1 .. 5
main(workers, values)
Наши функции use_threads() и use_processes() теперь являются функциями-генераторами, которые вызывают yield для того, чтобы вернуть значение на каждой итерации. По результатам одного запуска на моей машине вы можете увидеть, что работники не всегда выполняют задачи в порядке от 1 до 5:
$ python cf2.py 5
Using 5 workers for 5 values
Using threads:
3 1.7321
1 1.0000
2 1.4142
4 2.0000
5 2.2361
Using processes:
1 1.0000
2 1.4142
3 1.7321
4 2.0000
5 2.2361
Вы можете использовать модуль concurrent.futures всякий раз, когда вам нужно запустить несколько конкурирующих задач, например таких, как:
• поиск (crawling) URL в Интернете;
• обработка файлов, например изменение размера изображений;
• вызов API какой-либо службы.
Как обычно, в документации () вы можете найти дополнительную информацию с большим количеством технических деталей.
Как вы уже видели, разработчики стремятся избежать медленных мест в программах, запуская их в отдельных потоках или процессах. Примером такого дизайна является веб-сервер Apache.
Альтернативой является программирование, основанное на событиях (event-based programming). Программа, основанная на событиях, запускает центральный цикл обработки событий, раздает задачи и повторяет цикл. Так устроен веб-сервер NGINX, работающий быстрее, чем Apache.
Библиотека gevent основана на событиях и позволяет достичь следующего: вы пишете обычный императивный код, и волшебным образом его части превращаются в сопрограммы. Они похожи на генераторы, которые могут взаимодействовать друг с другом и отслеживать свое текущее состояние. Библиотека gevent модифицирует многие стандартные объекты Python, такие как socket, для того чтобы использовать их механизм вместо блокирования. Это не работает для кода надстроек Python, который написан на С, например для некоторых драйверов баз данных.
Вы можете установить библиотеку gevent с помощью pip:
$ pip install gevent
Вот вариант примера кода на сайте библиотеки gevent (/). Вы увидите функцию gethostbyname() класса socket в следующем разделе DNS. Эта функция работает синхронно, поэтому вам придется подождать (возможно, много секунд), пока она не получит имена серверов со всего мира, чтобы найти нужный адрес. Но вы можете использовать версию gevent, чтобы искать несколько сайтов независимо друг от друга. Сохраните этот файл как gevent_test.py (пример 15.9).
Пример 15.9. gevent_test.py
import gevent
from gevent import socket
hosts = ['', '',
'']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
print(job.value)
В этом примере вы можете увидеть однострочный цикл for. Каждое имя хоста по очереди передается в вызов gethostbyname(), но запустить их можно асинхронно благодаря версии функции gethostbyname() из библиотеки gevent.
Запустите файл gevent_test.py:
$ python gevent_test.py
66.6.44.4
74.125.142.121
78.136.12.50
Функция gevent.spawn() создает гринлет (greenlet) (называемый также зеленым потоком и микропотоком) для выполнения каждого вызова gevent.socket.gethostbyname(url).
Разница между ним и обычным потоком заключается в том, что зеленый поток не блокируется. Если произошло какое-то событие, которое заблокировало обычный поток, gevent переключит управление на другой зеленый поток.
Метод gevent.joinall() ожидает завершения всех созданных задач. Наконец, мы выводим на экран IP-адреса, полученные для заданных имен хостов.
Вместо версии socket из модуля gevent вы можете использовать его функции, называемые monkey-patching (обезьяний патч). Они модифицируют стандартные модули, такие как socket, для использования гринлетов вместо того, чтобы каждый раз вызывалась версия модуля gevent. Это полезно, если вы хотите использовать gevent везде, даже в коде, к которому вы можете не иметь доступа.
Добавьте в начало программы следующий вызов:
from gevent import monkey
monkey.patch_socket()
Таким образом, все обычные сокеты заменятся на сокеты gevent даже в стандартной библиотеке. Опять же это работает только для кода Python, но не для библиотек, написанных на С.
Еще одна функция monkey-patching даже больше модулей стандартной библиотеки:
from gevent import monkey
monkey.patch_all()
Разместите этот код в начале программы, чтобы максимально воспользоваться ускорением, обеспечиваемым gevent.
Сохраните программу под именем gevent_monkey.py (пример 15.10).
Пример 15.10. gevent_monkey.py
import gevent
from gevent import monkey; monkey.patch_all()
import socket
hosts = ['', '',
'']
jobs = [gevent.spawn(socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
print(job.value)
Запустите программу:
$ python gevent_monkey.py
66.6.44.4
74.125.192.121
78.136.12.50
Использование gevent может нести потенциальную опасность. Как и в случае с любой другой системой, основанной на событиях, каждый исполняемый вами фрагмент кода должен быть относительно быстрым. Несмотря на то что код, который выполняет много работы, не блокируется, он будет работать медленно.
Сама идея monkey-patching заставляет нервничать некоторых людей. Несмотря на это, многие крупные сайты, такие как Pinterest, используют gevent для значительного ускорения своей работы. Используйте gevent строго по назначению, как таблетки по рецепту.
Чтобы увидеть больше примеров, обратитесь к этому руководству — /.
Существует два других популярных фреймворка, основанных на событиях, — tornado (/) и gunicorn (/). Они помогают обрабатывать события на низком уровне, а также предоставляют быстрый веб-сервер. Их стоит рассмотреть, если вы хотите создать быстрый сайт без применения традиционных веб-серверов, таких как Apache.
twisted (/) — это асинхронный фреймворк, управляемый событиями, для работы с сетями. Вы подключаете функции к таким событиям, как получение данных или закрытие соединения, и функции вызываются, когда событие происходит. В данном случае речь идет о функциях обратного вызова. Если вы уже писали код на языке JavaScript, это может показаться вам знакомым. По мере роста приложения некоторым разработчикам бывает сложнее управлять кодом, основанным на функциях обратного вызова.
Чтобы установить фреймворк, введите следующую команду:
$ pip install twisted
twisted — это крупный пакет, который поддерживает множество интернет-протоколов на базе TCP и UDP. Для краткости мы рассмотрим небольшой сервер и клиент, созданные на базе примеров для twisted (). Сначала обратимся к серверу knock_server.py (пример 15.11).
Пример 15.11. knock_server.py
from twisted.internet import protocol, reactor
class Knock(protocol.Protocol):
def dataReceived(self, data):
print 'Client:', data
if data.startswith("Knock knock"):
response = "Who's there?"
else:
response = data + " who?"
print 'Server:', response
self.transport.write(response)
class KnockFactory(protocol.Factory):
def buildProtocol(self, addr):
return Knock()
reactor.listenTCP(8000, KnockFactory())
reactor.run()
Теперь взглянем на его верного компаньона knock_client.py (пример 15.12).
Пример 15.12. knock_client.py
from twisted.internet import reactor, protocol
class KnockClient(protocol.Protocol):
def connectionMade(self):
self.transport.write("Knock knock")
def dataReceived(self, data):
if data.startswith("Who's there?"):
response = "Disappearing client"
self.transport.write(response)
else:
self.transport.loseConnection()
reactor.stop()
class KnockFactory(protocol.ClientFactory):
protocol = KnockClient
def main():
f = KnockFactory()
reactor.connectTCP("localhost", 8000, f)
reactor.run()
if __name__ == '__main__':
main()
Первым запустим сервер:
$ python knock_server.py
Вторым — клиент:
$ python knock_client.py
Сервер и клиент обмениваются сообщениями, и сервер выводит весь диалог:
Client: Knock knock
Server: Who's there?
Client: Disappearing client
Server: Disappearing client who?
Наш клиент-шутник завершает работу, оставляя сервер ждать ударной реплики.
Если вы хотите забраться в дебри twisted, попробуйте запустить другие примеры из его документации.
Библиотека asyncio была добавлена в версию Python 3.4. Она представляет собой способ определения конкурентного кода с помощью новой функциональности, предоставляемой ключевыми словами async и await. Это очень обширная тема с большим количеством деталей. Чтобы не перегружать эту главу, я переместил рассмотрение библиотеки asyncio и связанных с ней тем в приложение В.
Приведенные ранее примеры кода для мытья посуды, где использовались процессы или потоки, запускались на одной машине. Рассмотрим еще один подход к очередям, которые могут запускаться на одной машине или во всей сети. Иногда даже для нескольких процессов и/или потоков одной машины бывает недостаточно. Вы можете воспринимать этот подраздел как мостик между конкурентным использованием одного блока (компьютера) и конкурентным использованием нескольких блоков.
Для запуска примеров из этого раздела вам понадобятся сервер Redis и его модуль для Python. Чтобы узнать, как их скачать, обратитесь к подразделу «Redis» на с. 256. В этой главе Redis используется как база данных. Здесь же мы рассмотрим его возможности для работы с конкурентностью.
Создать очередь можно с помощью списка Redis. Сервер Redis работает на одной машине, на которой могут быть запущены и клиенты. Возможно также, что никакие клиенты на ней не запускаются, а остальные машины получают доступ к серверу по сети. В любом случае клиент общается с сервером с помощью протокола TCP. Один или несколько клиентов-провайдеров помещают сообщения в конец списка. Один или несколько клиентов-работников наблюдают за списком и используют операцию «блокирующее выталкивание» (blocking pop). Если список пуст, то все они просто тратят время впустую. Как только сообщение появляется, его получает первый ожидающий работник.
Как и в предыдущих, основанных на процессах и потоках примерах, код файла redis_washer.py генерирует последовательность посуды (пример 15.13).
Пример 15.13. redis_washer.py
import redis
conn = redis.Redis()
print('Washer is starting')
dishes = ['salad', 'bread', 'entree', 'dessert']
for dish in dishes:
msg = dish.encode('utf-8')
conn.rpush('dishes', msg)
print('Washed', num)
conn.rpush('dishes', 'quit')
print('Washer is done')
Цикл генерирует четыре сообщения с названиями тарелок, за которыми следует заключительное сообщение со словом quit. Каждое сообщение добавляется в список тарелок на сервере Redis по принципу, сходному с принципами Python.
Как только первая тарелка готова, в работу вступает код файла redis_dryer.py (пример 15.14).
Пример 15.14. redis_dryer.py
import redis
conn = redis.Redis()
print('Dryer is starting')
while True:
msg = conn.blpop('dishes')
if not msg:
break
val = msg[1].decode('utf-8')
if val == 'quit':
break
print('Dried', val)
print('Dishes are dried')
Этот код ожидает сообщения, в котором первым токеном будет слово dishes, и выводит сообщение о каждой высушенной тарелке. Подчиняясь сообщению quit, он завершает цикл.
Запустите сначала сушильщика, а затем мойщика. Использование символа & в конце команды ставит первую программу в фоновый режим: она продолжает выполняться, но больше не принимает команды с клавиатуры. Это работает для операционных систем Linux, OS X и Windows, однако вы можете получить разные результаты в следующей строке. В нашем случае (OS X) этим результатом является некоторая информация о фоновом процессе сушильщика. Далее мы запускаем процесс мойщика как обычно (на переднем плане). Вы увидите смешанную выходную информацию двух процессов:
$ python redis_dryer.py &
[2] 81691
Dryer is starting
$ python redis_washer.py
Washer is starting
Washed salad
Dried salad
Washed bread
Dried bread
Washed entree
Dried entree
Washed dessert
Washer is done
Dried dessert
Dishes are dried
[2]+ Done python redis_dryer.py
Как только идентификаторы посуды начинают приходить от мойщика, наш трудолюбивый процесс сушильщика начинает их обрабатывать. Каждый идентификатор посуды, за исключением финального контрольного значения (строки quit) является числом. После того как процесс сушильщика считывает этот идентификатор quit, он завершает работу и выводит на терминал дополнительную информацию о фоновом процессе (что также зависит от системы). Вы можете использовать контрольное значение (или по-другому — некорректное значение), чтобы указать на что-то особенное в потоке данных. В нашей ситуации мы говорим, что закончили работу. В противном случае придется добавить больше программной логики, например:
• заранее оговорить некоторое максимальное количество посуды, что также будет похоже на контрольное значение;
• выполнять определенную специфическую коммуникацию вне потока данных между процессами;
• завершать работу по прошествии какого-то времени, если данных не поступало.
Внесем еще несколько изменений.
• Создадим несколько процессов-сушильщиков.
• Заставим их завершаться по прошествии некоторого времени вместо того, чтобы ожидать контрольного значения.
Новый файл redis_dryer2.py показан в примере 15.15.
Пример 15.15. redis_dryer2.py
def dryer():
import redis
import os
import time
conn = redis.Redis()
pid = os.getpid()
timeout = 20
print('Dryer process %s is starting' % pid)
while True:
msg = conn.blpop('dishes', timeout)
if not msg:
break
val = msg[1].decode('utf-8')
if val == 'quit':
break
print('%s: dried %s' % (pid, val))
time.sleep(0.1)
print('Dryer process %s is done' % pid)
import multiprocessing
DRYERS=3
for num in range(DRYERS):
p = multiprocessing.Process(target=dryer)
p.start()
Запустим процессы сушильщиков в фоновом режиме и процесс мойщика на переднем плане:
$ python redis_dryer2.py &
Dryer process 44447 is starting
Dryer process 44448 is starting
Dryer process 44446 is starting
$ python redis_washer.py
Washer is starting
Washed salad
44447: dried salad
Washed bread
44448: dried bread
Washed entree
44446: dried entree
Washed dessert
Washer is done
44447: dried dessert
Один процесс сушильщика считывает идентификатор quit и завершает работу:
Dryer process 44448 is done
Через 20 секунд другие процессы-сушильщики получают значение None от вызова blpop (это указывает на то, что они завершились по таймеру). Процессы выводят свои последние сообщения и завершаются:
Dryer process 44447 is done
Dryer process 44446 is done
После того как завершается последний подпроцесс-сушильщик, завершается и основная программа-сушильщик:
[1]+ Done python redis_dryer2.py
С увеличением числа работающих элементов растет вероятность сбоев в работе нашего конвейера. Хватит ли нам работников, если нужно будет вымыть посуду после банкета? А что, если сушильщики напьются до чертиков? А если забьется раковина? Ох уж эти проблемы!
Как же с ними справиться? К счастью, есть некоторые приемы.
•Запустить и забыть. Просто передавайте обработанные объекты дальше и не заботьтесь о последствиях, даже если там никого нет. Этот подход похож на сбрасывание посуды на пол.
• Запрос — ответ. Мойщик получает подтверждение от сушильщика, а сушильщик — от того, кто откладывает посуду в сторону. Все это выполняется для каждой тарелки.
•Регулирование нагрузки. Этот прием указывает самому быстрому работнику притормозить, если один из работников, стоящих после него, не поспевает за ним.
В реальных системах вам нужно внимательно следить за тем, чтобы все работники успевали за графиком, в противном случае вы услышите звук бьющейся посуды. Вы можете добавлять в список ожидания новые задачи, а какой-то процесс будет доставать из этого списка последнее сообщение и помещать его в список обработки. Обработанное сообщение будет удалено из списка обработки и добавлено в список завершенных задач. Это позволит вам узнать, какие задачи не были выполнены или затрачивают слишком много времени. Вы можете сделать это самостоятельно с помощью Redis или использовать систему, которую кто-то другой уже написал и протестировал. Некоторые основанные на Python пакеты для работы с очередями (часть из них используют Redis) позволяют удобно управлять процессом:
• celery (/) может выполнять распределенные задачи как синхронно, так и асинхронно, используя рассмотренные нами методы — multiprocessing, gevent и др.;
• rq (/) — это библиотека Python для очередей задач, также основанная на Redis.
В этой главе мы пропустили данные сквозь процессы. В следующей главе вы увидите, как сохранять и получать данные, используя разные форматы файлов и баз данных.
15.1. Используйте модуль multiprocessing, чтобы создать три отдельных процесса. Заставьте каждый из них подождать случайное количество секунд между нулем и единицей, вывести текущее время, а затем завершить работу.