ВУЗ: Не указан
Категория: Не указан
Дисциплина: Не указана
Добавлен: 11.01.2024
Просмотров: 1148
Скачиваний: 5
ВНИМАНИЕ! Если данный файл нарушает Ваши авторские права, то обязательно сообщите нам.
СОДЕРЖАНИЕ
потоков"
, в котором мы решили, что наш пул потоков должен иметь интерфейс,
похожий на thread::spawn
. Кроме того, мы реализуем функцию execute
, чтобы она принимала замыкание и передавала его свободному потоку из пула для запуска.
Мы определим метод execute у
ThreadPool для приёма замыкания в качестве параметра. Вспомните раздел "Хранение замыканий с использованием общих параметров и типажей
Fn
"
главы 13 и о том, что мы можем принимать замыкания в качестве параметров с тремя различными типажами:
Fn
,
FnMut и
FnOnce
. Нам нужно решить, какой тип замыкания использовать здесь. Мы знаем, что в конечном счёте мы сделаем что-то похожее на реализацию стандартной библиотеки thread::spawn
, поэтому мы можем посмотреть, какие ограничения накладывает на его параметр в сигнатуре thread::spawn
. Документация показывает следующее:
Параметр типа
F
- это тот, который нас интересует; параметр типа
T
относится к возвращаемому значению и нам он не интересен. Можно увидеть, что spawn использует
FnOnce в качестве ограничения типажа у
F
. Это, вероятно то, чего мы хотим, потому что мы в конечном итоге передадим получаемый аргумент в execute для spawn
. Мы также можем быть ещё более уверены, что
FnOnce
- это тот типаж, который мы хотим использовать, поскольку поток для выполнения запроса будет выполнять этот запрос только один раз, что соответствует параметру
Once в типаже
FnOnce
Параметр типа
F
также имеет ограничение типажа
Send и ограничение времени жизни 'static
, которые полезны в нашей ситуации: нам нужен
Send для передачи замыкания из одного потока в другой и 'static
, потому что мы не знаем, сколько времени займёт выполнение потока. Давайте создадим метод execute для
ThreadPool
, который будет принимать обобщённый параметр типа
F
со следующими ограничениями:
Файл: src/lib.rs
Мы по-прежнему используем
()
после
FnOnce потому что типаж
FnOnce представляет замыкание, которое не принимает параметров и возвращает единичный тип
()
. Также как при определении функций, тип возвращаемого значения может быть опущен в сигнатуре, но даже если у нас нет параметров, нам все равно нужны скобки.
pub fn spawn
(f: F) -> JoinHandle where
F:
FnOnce
() -> T,
F:
Send
+
'static
,
T:
Send
+
'static
, impl
ThreadPool {
// --snip-- pub fn execute
(&
self
, f: F) where
F:
FnOnce
() +
Send
+
'static
,
{
}
}
, в котором мы решили, что наш пул потоков должен иметь интерфейс,
похожий на thread::spawn
. Кроме того, мы реализуем функцию execute
, чтобы она принимала замыкание и передавала его свободному потоку из пула для запуска.
Мы определим метод execute у
ThreadPool для приёма замыкания в качестве параметра. Вспомните раздел "Хранение замыканий с использованием общих параметров и типажей
Fn
"
главы 13 и о том, что мы можем принимать замыкания в качестве параметров с тремя различными типажами:
Fn
,
FnMut и
FnOnce
. Нам нужно решить, какой тип замыкания использовать здесь. Мы знаем, что в конечном счёте мы сделаем что-то похожее на реализацию стандартной библиотеки thread::spawn
, поэтому мы можем посмотреть, какие ограничения накладывает на его параметр в сигнатуре thread::spawn
. Документация показывает следующее:
Параметр типа
F
- это тот, который нас интересует; параметр типа
T
относится к возвращаемому значению и нам он не интересен. Можно увидеть, что spawn использует
FnOnce в качестве ограничения типажа у
F
. Это, вероятно то, чего мы хотим, потому что мы в конечном итоге передадим получаемый аргумент в execute для spawn
. Мы также можем быть ещё более уверены, что
FnOnce
- это тот типаж, который мы хотим использовать, поскольку поток для выполнения запроса будет выполнять этот запрос только один раз, что соответствует параметру
Once в типаже
FnOnce
Параметр типа
F
также имеет ограничение типажа
Send и ограничение времени жизни 'static
, которые полезны в нашей ситуации: нам нужен
Send для передачи замыкания из одного потока в другой и 'static
, потому что мы не знаем, сколько времени займёт выполнение потока. Давайте создадим метод execute для
ThreadPool
, который будет принимать обобщённый параметр типа
F
со следующими ограничениями:
Файл: src/lib.rs
Мы по-прежнему используем
()
после
FnOnce потому что типаж
FnOnce представляет замыкание, которое не принимает параметров и возвращает единичный тип
()
. Также как при определении функций, тип возвращаемого значения может быть опущен в сигнатуре, но даже если у нас нет параметров, нам все равно нужны скобки.
pub fn spawn
F:
FnOnce
() -> T,
F:
Send
+
'static
,
T:
Send
+
'static
, impl
ThreadPool {
// --snip-- pub fn execute
self
, f: F) where
F:
FnOnce
() +
Send
+
'static
,
{
}
}
Опять же, это самая простая реализация метода execute
: она ничего не делает, мы только пытаемся сделать код компилируемым. Давайте проверим снова:
Сейчас мы получаем только предупреждения, что означает, что код компилируется! Но обратите внимание, если вы попробуете cargo run и сделаете запрос в браузере, вы увидите ошибки в браузере, которые мы видели в начале главы. Наша библиотека на самом деле ещё не вызывает замыкание, переданное в execute
!
Примечание: вы возможно слышали высказывание о языках со строгими компиляторами, таких как Haskell и Rust, которое звучит так: «Если код компилируется, то он работает». Но это высказывание не всегда верно. Наш проект компилируется, но абсолютно ничего не делает! Если бы мы создавали реальный,
законченный проект, это был бы хороший момент начать писать модульные тесты,
чтобы проверять, что код компилируется и имеет желаемое поведение.
Проверка количества потоков в new
Мы продолжим получать предупреждения, потому что мы ничего не делаем с параметрами для new и execute
. Давайте реализуем тела этих функций в соответствии с желаемым поведением. Для начала давайте подумаем о new
. Ранее мы выбирали без знаковый тип для параметра size
, потому что пул с отрицательным числом потоков не имеет смысла. Тем не менее, пул с нулевым значением для потоков также не имеет смысла, но ноль является совершенно корректным для типа usize
. Мы добавим код,
чтобы проверить, что size больше нуля, перед возвращением экземпляра
ThreadPool и
будем паниковать, если программа получит ноль, используя макрос assert!
, как показано в листинге 20-13.
Файл: src/lib.rs
$
cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 0.24s
Листинг 20-13: Реализация
ThreadPool::new
с паникой, если
size
равен нулю
Мы добавили документации в
ThreadPool с помощью комментариев. Обратите внимание, мы следовали хорошим практикам документирования, добавив раздел, в котором указывается ситуация при которой функция может паниковать как обсуждалось в главе 14. Попробуйте запустить cargo doc --open и кликнуть структуру
ThreadPool
,
чтобы увидеть как выглядит сгенерированная документация для new
!
Вместо добавления макроса assert!
, как мы здесь сделали, мы могли бы описать у new возвращать
Result как мы делали в
Config::new проекта ввода/вывода в коде 12-9. Но сейчас мы решили, что попытка создания пула потоков без любого указания количества потоков должно быть не восстанавливаемой ошибкой. Если вы чувствуете себя честолюбивым, попробуйте написать версию new со следующей сигнатурой, чтобы сравнить обе версии:
Создание места для хранения потоков
Теперь у нас есть способ узнать, что задано допустимое число потоков для хранения в пуле и мы можем создать эти потоки и сохранить их в структуре
ThreadPool перед её
возвратом. Но как мы "храним" поток? Давайте ещё раз посмотрим на сигнатуру thread::spawn
:
Функция spawn возвращает тип
JoinHandle
, где
T
является типом, который возвращает замыкание. Давайте попробуем использовать
JoinHandle и посмотрим, что impl
ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new
(size: usize
) -> ThreadPool { assert!
(size >
0
);
ThreadPool
}
// --snip--
} pub fn new
(size: usize
) ->
Result
F:
FnOnce
() -> T,
F:
Send
+
'static
,
T:
Send
+
'static
,
произойдёт. В нашем случае замыкания, которые мы передаём пулу потоков, будут обрабатывать соединение и ничего не будут возвращать, поэтому
T
будет единичным
(unit) типом
()
Листинг 20-14 скомпилируется, но пока не создаёт потоков. Мы изменили объявление
ThreadPool
, чтобы оно содержало вектор экземпляров thread::JoinHandle<()>
,
инициализировали вектор с размером size
, установили цикл for
, который будет запускать некоторый код для создания потоков и вернули экземпляр
ThreadPool содержащий потоки.
Файл: src/lib.rs
Листинг 20-14: Создание вектора в
ThreadPool
для хранения потоков
Мы добавили std::thread в область видимости библиотечного крейта, потому что мы используем thread::JoinHandle в качестве типа элементов вектора в
ThreadPool
После получения корректного значения size, наш
ThreadPool создаёт новый вектор,
который может содержать size элементов. В этой книге мы ещё не использовали функцию with_capacity
, которая выполняет ту же задачу что и
Vec::new
, но с важным отличием: она заранее выделяет указанную память в векторе. Поскольку мы знаем, что нам нужно хранить size элементов в векторе, выполнение этого выделения немного более эффективно, чем использование
Vec::new
, который изменяет размеры при вставке элементов.
Когда вы снова запустите cargo check
, вы получите ещё несколько предупреждений, но все должно завершится успехом.
use std::thread; pub struct
ThreadPool
{ threads:
Vec
<:joinhandle>>,
} impl
ThreadPool {
// --snip-- pub fn new
(size: usize
) -> ThreadPool { assert!
(size >
0
); let mut threads =
Vec
::with_capacity(size); for
_ in
0
..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
}
T
будет единичным
(unit) типом
()
Листинг 20-14 скомпилируется, но пока не создаёт потоков. Мы изменили объявление
ThreadPool
, чтобы оно содержало вектор экземпляров thread::JoinHandle<()>
,
инициализировали вектор с размером size
, установили цикл for
, который будет запускать некоторый код для создания потоков и вернули экземпляр
ThreadPool содержащий потоки.
Файл: src/lib.rs
Листинг 20-14: Создание вектора в
ThreadPool
для хранения потоков
Мы добавили std::thread в область видимости библиотечного крейта, потому что мы используем thread::JoinHandle в качестве типа элементов вектора в
ThreadPool
После получения корректного значения size, наш
ThreadPool создаёт новый вектор,
который может содержать size элементов. В этой книге мы ещё не использовали функцию with_capacity
, которая выполняет ту же задачу что и
Vec::new
, но с важным отличием: она заранее выделяет указанную память в векторе. Поскольку мы знаем, что нам нужно хранить size элементов в векторе, выполнение этого выделения немного более эффективно, чем использование
Vec::new
, который изменяет размеры при вставке элементов.
Когда вы снова запустите cargo check
, вы получите ещё несколько предупреждений, но все должно завершится успехом.
use std::thread; pub struct
ThreadPool
{ threads:
Vec
<:joinhandle>>,
} impl
ThreadPool {
// --snip-- pub fn new
(size: usize
) -> ThreadPool { assert!
(size >
0
); let mut threads =
Vec
::with_capacity(size); for
_ in
0
..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
}
Структура Worker ответственная за отправку кода из ThreadPool в поток
Мы оставили комментарий относительно создания потоков в цикле for кода 20-14.
Здесь мы рассмотрим, как мы на самом деле создаём потоки. Стандартная библиотека предоставляет thread::spawn как способ создания потоков, а thread::spawn ожидает получить некоторый код, который поток должен запустить как только поток создан.
Однако в нашем случае мы хотим создать потоки и заставить их ждать код, который мы отправим им позже. Реализация потоков в стандартной библиотеке не имеет какого то способа это сделать, мы должны реализовать это вручную.
Мы будем реализовывать это поведение с помощью новой структуры данных между
ThreadPool и потоками, которая будет управлять этим новым поведением. Мы назовём эту структуру данных
Worker
, что является общим термином в реализации пулов.
Подумайте о людях, работающих на кухне в ресторане: рабочие ждут пока не поступят заказы от клиентов, а затем они несут ответственность за принятие этих заказов и их выполнение.
Вместо хранения вектора
JoinHandle<()>
в пуле потоков, мы будем сохранять экземпляры структуры
Worker
. Каждый
Worker будет хранить один экземпляр
JoinHandle<()>
. Затем мы реализуем метод у
Worker
, который берет код замыкания для запуска и отправляет его в уже запущенный поток для выполнения. Мы также назначим каждому работнику id
, чтобы мы могли различать разных работников в пуле при ведении журнала или отладке.
Давайте внесём изменения в последовательность действий, которая выполняется при создании
ThreadPool
. Мы реализуем код, который отправляет замыкание в поток после того, как мы настроили
Worker следующим образом:
1. Определим структуру
Worker
(работник), которая содержит id и
JoinHandle<()>
2. Изменим
ThreadPool
, чтобы он содержал вектор экземпляров
Worker
3. Определим функцию
Worker::new
, которая принимает номер id и возвращает экземпляр
Worker
, который содержит id и поток, порождённый пустым замыканием.
4. В
ThreadPool::new используем счётчик цикла for для генерации id
, создаём новый
Worker с этим id и сохраняем экземпляр "работника" в вектор.
Если вы готовы принять вызов, попробуйте реализовать эти изменения самостоятельно,
прежде чем смотреть код листинге 20-15.
Готовы? Вот листинг 20-15 с одним из способов сделать предыдущие модификации.
Файл: src/lib.rs
Листинг 20-15: Изменение
ThreadPool
для хранения экземпляров
Worker
вместо непосредственного
хранения потоков
Мы изменили имя поля в
ThreadPool с threads на workers
, потому что теперь оно содержит экземпляры
Worker вместо экземпляров
JoinHandle<()>
. Мы используем счётчик в цикле for в качестве аргумента для
Worker::new и сохраняем каждый новый
Worker в векторе с именем workers
Внешний код (вроде нашего сервера в src/bin/main.rs) не должен знать подробности реализации касательно использования структуры
Worker внутри
ThreadPool
, поэтому мы делаем структуру
Worker и её новую функцию new приватными. Функция
Worker::new использует заданный нами id и сохраняет экземпляр
JoinHandle<()>
,
который создаётся путём порождение нового потока с пустым замыканием.
Этот код скомпилируется и будет хранить количество экземпляров
Worker
, которое мы указали в качестве аргумента функции
ThreadPool::new
. Но мы все ещё не обрабатываем use std::thread; pub struct
ThreadPool
{ workers:
Vec
} impl
ThreadPool {
// --snip-- pub fn new
(size: usize
) -> ThreadPool { assert!
(size >
0
); let mut workers =
Vec
::with_capacity(size); for id in
0
..size { workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
} struct
Worker
{ id: usize
, thread: thread::JoinHandle<()>,
} impl
Worker { fn new
(id: usize
) -> Worker { let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
замыкание, которое мы получаем в методе execute
. Давайте взглянем на то, как это сделать.
. Давайте взглянем на то, как это сделать.
1 ... 54 55 56 57 58 59 60 61 62
Отправка запросов в потоки через каналы
Теперь мы рассмотрим проблему, заключающуюся в том, что замыкания переданные в thread::spawn абсолютно ничего не делают. Вот мы получаем замыкание, которое хотим выполнить в методе execute
. Но для запуска нам необходимо передать замыкание в метод thread::spawn
, где на каждый
ThreadPool создаётся один
Worker
Мы хотим, чтобы только что созданные структуры
Worker извлекали код для запуска из очереди хранящейся в
ThreadPool и отправляли этот код в свой поток для выполнения.
В главе 16 вы узнали о каналах (channels) - простом способе связи между двумя потоками,
который идеально подойдёт для этого сценария. Мы будем использовать канал в качестве очереди заданий, а команда execute отправит задание из
ThreadPool экземплярам
Worker
, который отправит задание в свой поток. Вот план:
1.
ThreadPool создаст канал и будет удерживать его передающую сторону.
2. Каждый
Worker будет удерживать принимающую сторону канала.
3. Мы создадим новую структуру
Job которая будет содержать замыкания, которые мы хотим отправить в канал.
4. Метод execute отправит задание, которое он хочет выполнить, в отправляющую сторону канала.
5. В своём потоке
Worker будет выполнять цикл с принимающей стороной канала и выполнит замыкание любого получаемого задания.
Давайте начнём с создания канала в
ThreadPool::new и удержания отправляющей стороны в экземпляре
ThreadPool
, как показано в листинге 20-16. В структуре
Job сейчас ничего не содержится, но это будет тип элемента который мы отправляем в канал.
Файл: src/lib.rs
Листинг 20-16: Модификация
ThreadPool
для хранения отправляющей части канала, который отправляет
экземпляры
Job
В
ThreadPool::new мы создаём наш новый канал и пул содержащий отправляющую сторону. Код успешно скомпилируется все ещё с предупреждениями.
Давайте попробуем передавать принимающую сторону канала каждому "работнику"
(структуре woker), когда пул потоков создаёт канал. Мы знаем, что хотим использовать получающую часть канала в потоке порождаемым "работником", поэтому мы будем ссылаться на параметр receiver в замыкании. Код 20-17 пока не компилируется.
Файл: src/lib.rs use std::{sync::mpsc, thread}; pub struct
ThreadPool
{ workers:
Vec
} struct
Job
; impl
ThreadPool {
// --snip-- pub fn new
(size: usize
) -> ThreadPool { assert!
(size >
0
); let
(sender, receiver) = mpsc::channel(); let mut workers =
Vec
::with_capacity(size); for id in
0
..size { workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
}