ВУЗ: Не указан
Категория: Не указан
Дисциплина: Не указана
Добавлен: 11.01.2024
Просмотров: 1132
Скачиваний: 5
ВНИМАНИЕ! Если данный файл нарушает Ваши авторские права, то обязательно сообщите нам.
СОДЕРЖАНИЕ
Листинг 20-17: Передача принимающей части канала "работнику"
Мы внесли несколько небольших и простых изменений: мы передаём принимающую часть канала в
Worker::new
, а затем используем его внутри замыкания.
При попытке проверить код, мы получаем ошибку:
Код пытается передать receiver в несколько экземпляров
Worker
. Это не будет работать, как вы помните из главы 16: реализация канала предоставляемая Rust,
impl
ThreadPool {
// --snip-- pub fn new
(size: usize
) -> ThreadPool { assert!
(size >
0
); let
(sender, receiver) = mpsc::channel(); let mut workers =
Vec
::with_capacity(size); for id in
0
..size { workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
}
// --snip-- impl
Worker { fn new
(id: usize
, receiver: mpsc::Receiver
});
Worker { id, thread }
}
}
$
cargo check
Checking hello v0.1.0 (file:///projects/hello) error[E0382]: use of moved value: `receiver`
-->
src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type
`std::sync::mpsc::Receiver
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
For more information about this error, try `rustc --explain E0382`. error: could not compile `hello` due to previous error
является моделью несколько производителей (multiple producer), один потребитель
(single consumer). Это означает, что мы не можем просто клонировать принимающую часть канала для исправления этого кода. Даже если бы мы это могли, это не техника которую мы хотели бы использовать; вместо этого мы хотим распределить задачи среди потоков, разделяя один receiver среди всех "работников".
Кроме того, удаление задачи из очереди канала включает изменение receiver
, поэтому потокам необходим безопасный способ делиться и изменять receiver
, в противном случае мы можем получить условия гонки (как описано в главе 16).
Вспомните умные указатели, которые обсуждались в главе 16: чтобы делиться владением между несколькими потоками и позволить потокам изменять значение, нам нужно использовать тип
Arc>
. Тип
Arc позволит нескольким "работникам" владеть получателем (receiver), а
Mutex гарантирует что только один "работник" получит задание
(job) от получателя в один момент времени. Листинг 20-18 показывает изменения,
которые мы должны сделать.
Файл: src/lib.rs use std::{ sync::{mpsc, Arc, Mutex}, thread,
};
// --snip-- impl
ThreadPool {
// --snip-- pub fn new
(size: usize
) -> ThreadPool { assert!
(size >
0
); let
(sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers =
Vec
::with_capacity(size); for id in
0
..size { workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
}
// --snip-- impl
Worker { fn new
(id: usize
, receiver: Arc>>) -> Worker {
// --snip--
}
}
(single consumer). Это означает, что мы не можем просто клонировать принимающую часть канала для исправления этого кода. Даже если бы мы это могли, это не техника которую мы хотели бы использовать; вместо этого мы хотим распределить задачи среди потоков, разделяя один receiver среди всех "работников".
Кроме того, удаление задачи из очереди канала включает изменение receiver
, поэтому потокам необходим безопасный способ делиться и изменять receiver
, в противном случае мы можем получить условия гонки (как описано в главе 16).
Вспомните умные указатели, которые обсуждались в главе 16: чтобы делиться владением между несколькими потоками и позволить потокам изменять значение, нам нужно использовать тип
Arc
. Тип
Arc позволит нескольким "работникам" владеть получателем (receiver), а
Mutex гарантирует что только один "работник" получит задание
(job) от получателя в один момент времени. Листинг 20-18 показывает изменения,
которые мы должны сделать.
Файл: src/lib.rs use std::{ sync::{mpsc, Arc, Mutex}, thread,
};
// --snip-- impl
ThreadPool {
// --snip-- pub fn new
(size: usize
) -> ThreadPool { assert!
(size >
0
); let
(sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers =
Vec
::with_capacity(size); for id in
0
..size { workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
}
// --snip-- impl
Worker { fn new
(id: usize
, receiver: Arc
// --snip--
}
}
Листинг 20-18: Совместное использование принимающей стороны канала среди "работников" используя
Arc
и
Mutex
В
ThreadPool::new мы помещаем принимающую сторону канала внутрь
Arc и
Mutex
Для каждого нового "работника" мы клонируем
Arc
, чтобы увеличить счётчик ссылок так, что "работники" могут разделять владение принимающей стороны канала.
С этими изменениями код компилируется! Мы подбираемся к цели!
Реализация метода execute
Давайте реализуем метод execute у структуры
ThreadPool
. Мы также изменим тип
Job со структуры на псевдоним типа для типаж-объекта, который содержит тип замыкания принимаемый методом execute
. Как описано в разделе "Создание синонимов типа с помощью псевдонимов типа"
главы 19, псевдонимы типов позволяют делать длинные типы короче. Посмотрите в листинг 20-19.
Файл: src/lib.rs
Листинг 20-19: Создание псевдонима типа
Job
для
Box
, содержащего каждое замыкание и затем
отправляющее задание (job) в канал
После создания нового экземпляра
Job с помощью замыкания, получаемого в метод execute
, мы отправляем это задание в отправляющую часть канала. Мы вызываем unwrap для send в случае неудачной отправки. Это может произойти, если например,
мы остановим выполнение всех наших потоков, что означает, что принимающая сторона прекратила получение новых сообщений. На данный момент мы не можем остановить выполнение наших потоков: наши потоки продолжают выполняться, пока существует пул. Причина, по которой мы используем unwrap
, заключается в том, что мы знаем, что сбоя не произойдёт, но компилятор этого не знает.
// --snip-- type
Job
=
Box
<
dyn
FnOnce
() +
Send
+
'static
>; impl
ThreadPool {
// --snip-- pub fn execute
self
, f: F) where
F:
FnOnce
() +
Send
+
'static
,
{ let job =
Box
::new(f); self
.sender.send(job).unwrap();
}
}
// --snip--
Но мы ещё не закончили! В "работнике" (worker) наше замыкание, переданное в thread::spawn все ещё ссылается только на принимающую сторону канала. Вместо этого нам нужно, чтобы замыкание работало в бесконечном цикле, запрашивая задание у принимающей части канала и выполняя задание, когда оно принято. Давайте внесём изменения, показанные в листинге 20-20 внутри
Worker::new
Файл: src/lib.rs
Листинг 20-20: Получение и выполнение заданий в потоке "работника"
Здесь мы сначала вызываем lock у receiver
, чтобы получить мьютекс, а затем вызываем unwrap для паники при любых ошибках. Захват блокировки может завершиться неудачей, если мьютекс находится в отравленном state (poisoned state), что может произойти если какой-то другой поток запаниковал, удерживая блокировку,
вместо снятия блокировки. В этой ситуации правильное действие - вызвать unwrap для паники потока. Не стесняйтесь заменить unwrap на expect с сообщением об ошибке,
которое имеет для вас значение.
Если мы получим блокировку мьютекса, мы вызываем recv для получения
Job из канала. Окончательный вызов unwrap проходит мимо любых ошибок, которые могут произойти, если поток удерживающий отправляющую сторону канала, завершил работу подобно тому, как метод send возвращает
Err
, если принимающая сторона закрывается.
Вызов recv блокирующий, поэтому если ещё нет задач (job), то текущий поток будет ждать, пока задача не станет доступной.
Mutex
гарантирует, что только один поток
Worker пытается запросить задачу за раз.
Наш пул потоков теперь находится в рабочем состоянии! Выполните cargo run и
сделайте несколько запросов:
// --snip-- impl
Worker { fn new
(id: usize
, receiver: Arc
move
|| loop
{ let job = receiver.lock().unwrap().recv().unwrap(); println!
(
"Worker {id} got a job; executing."
); job();
});
Worker { id, thread }
}
}
Успех! Теперь у нас есть пул потоков, который обрабатывает соединения асинхронно.
Никогда не создаётся более четырёх потоков, поэтому наша система не будет перегружена, если сервер получает много запросов. Если мы отправим запрос ресурса
/sleep, сервер сможет обслуживать другие запросы, запустив их в другом потоке.
Примечание: если вы запрашиваете /sleep в нескольких окнах браузера одновременно, они могут загружаться по одному с интервалами в 5 секунд.
Некоторые веб-браузеры выполняют несколько экземпляров одного и того же запроса последовательно из-за кэширования. Данное ограничение не вызвано нашим веб-сервером.
После изучения цикла while let в главе 18 вы можете удивиться, почему мы не написали код рабочего потока (worker thread), как показано в листинге 20-22.
$
cargo run
Compiling hello v0.1.0 (file:///projects/hello) warning: field is never read: `workers`
-->
src/lib.rs:7:5
|
7 | workers: Vec
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default warning: field is never read: `id`
-->
src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^ warning: field is never read: `thread`
-->
src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ warning: 3 warnings emitted
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/main`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Файл: src/lib.rs
Листинг 20-22: Альтернативная реализация
Worker::new
с использованием
while let
Этот код компилируется и запускается, но не приводит к желаемому поведению :
медленный запрос всё равно приведёт к тому, что другие запросы будут ждать обработки. Причина здесь несколько тоньше:
Mutex не имеет общедоступного unlock потому что право собственности на блокировку зависит от времени жизни
MutexGuard
в
LockResult
которое возвращает метод lock
. Во время компиляции анализатор заимствований может затем применить правило, согласно которому к ресурсу, охраняемому
Mutex нельзя получить доступ, если мы не удерживаем блокировку. Но эта реализация также может привести к тому, что блокировка будет удерживаться дольше, чем предполагалось, если мы не будем тщательно продумывать время жизни
MutexGuard
Код в листинге 20-20, который использует let job = receiver.lock().unwrap().recv().unwrap();
работает, потому что с let любые временные значения, используемые в выражении справа от знака равенства,
немедленно удаляются после завершения оператора let
. Однако while let
(и if let and match
) не удаляет временные значения до конца связанного блока. В листинге 20-21
блокировка сохраняется на время вызова job()
, что означает, что другие исполнители не могут получать задания.
// --snip-- impl
Worker { fn new
(id: usize
, receiver: Arc
move
|| { while let
Ok
(job) = receiver.lock().unwrap().recv() { println!
(
"Worker {id} got a job; executing."
); job();
}
});
Worker { id, thread }
}
}
Изящное завершение и освобождение ресурсов
Листинг 20-20 асинхронно отвечает на запросы с помощью использования пула потоков,
как мы и хотели. Мы получаем некоторые предупреждения про workers
, id и поля thread
, которые мы не используем напрямую, что напоминает нам о том, что мы не освобождаем все ресурсы. Когда мы используем менее элегантный метод остановки основного потока клавишной комбинацией ctrl-c, все остальные потоки также немедленно останавливаются, даже если они находятся в середине обработки запроса.
Теперь мы будем реализовывать типаж
Drop для вызова join у каждого потока в пуле,
чтобы они могли завершить запросы над которыми они работают до закрытия. Затем мы реализуем способ сообщить потокам, что они должны перестать принимать новые запросы и завершить работу. Чтобы увидеть этот код в действии, мы изменим наш сервер так, чтобы он принимал только два запроса, прежде чем корректно завершить работу его пула потоков.
Реализация типажа Drop для ThreadPool
Давайте начнём с реализации
Drop у нашего пула потоков. Когда пул удаляется, все наши потоки должны объединиться (join), чтобы убедиться, что они завершают свою работу. В листинге 20-22 показана первая попытка реализации
Drop
, код пока не будет работать.
Файл: src/lib.rs
Листинг 20-22: Присоединение (Joining) каждого потока, когда пул потоков выходит из области видимости
Во-первых, мы проходим циклом по каждому workers из пула потоков. Для этого мы используем
&mut
, потому что self является изменяемой ссылкой и нам также нужно иметь возможность изменять экземпляр worker
. Для каждого "работника" мы печатаем сообщение о том, что этот конкретный "работник" завершается, затем вызываем join у
потока этого "работника". Если вызов join происходит с ошибкой, мы используем unwrap
, чтобы вызвать панику в Rust и завершить не совсем красиво.
Ошибка получаемая при компиляции этого кода:
impl
Drop for
ThreadPool { fn drop
(&
mut self
) { for worker in
&
mut self
.workers { println!
(
"Shutting down worker {}"
, worker.id); worker.thread.join().unwrap();
}
}
}
Ошибка говорит, что мы не можем вызвать join
, потому что у нас есть только изменяемое заимствование каждого worker и что join забирает во владение его аргумент. Чтобы решить эту проблему, нужно переместить поток из экземпляра
Worker
,
который владеет thread
, чтобы join мог использовать внутренний поток. Мы сделали это в коде 17-15: если вместо этого
Worker содержит тип
Option<:joinhandle>>
, мы можем вызвать метод take у
Option
, чтобы переместить значение из варианта
Some и оставить вариант
None на его месте. Другими словами, работающий
Worker будет содержать вариант
Some внутри thread
, и когда мы хотим очистить
Worker
, мы заменяем значение варианта
Some на вариант
None
, чтобы у
Worker не было потока для запуска.
Итак, мы хотим обновить объявление
Worker следующим образом:
Файл: src/lib.rs
Теперь давайте опираться на компилятор, чтобы найти другие места, которые нужно изменить. Проверяя код, мы получаем две ошибки:
$
cargo check
Checking hello v0.1.0 (file:///projects/hello) error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
| note: this function takes ownership of the receiver `self`, which moves
`worker.thread`
For more information about this error, try `rustc --explain E0507`. error: could not compile `hello` due to previous error struct
Worker
{ id: usize
, thread:
Option
<:joinhandle>>,
}
Давайте обратимся ко второй ошибке, которая указывает на код в конце
Worker::new
;
нам нужно обернуть значение thread в вариант
Some при создании нового
Worker
Внесите следующие изменения, чтобы исправить эту ошибку:
Файл: src/lib.rs
Первая ошибка находится в нашей реализации
Drop
. Ранее мы упоминали, что намеревались вызвать take для параметра
Option
, чтобы забрать thread из процесса worker
. Следующие изменения делают это:
Файл: src/lib.rs
$
cargo check
Checking hello v0.1.0 (file:///projects/hello) error[E0599]: no method named `join` found for enum `Option` in the current scope
-->
src/lib.rs:52:27
|
52 | worker.thread.join().unwrap();
| ^^^^ method not found in `Option
-->
src/lib.rs:72:22
|
72 | Worker { id, thread }
| ^^^^^^ expected enum `Option`, found struct `JoinHandle`
|
= note: expected enum `Option
|
72 | Worker { id, thread: Some(thread) }
| +++++++++++++ +
Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`. error: could not compile `hello` due to 2 previous errors impl
Worker { fn new
(id: usize
, receiver: Arc
// --snip--
Worker { id, thread:
Some
(thread),
}
}
}
Как уже говорилось в главе 17, метод take у типа
Option забирает значение из варианта
Some и оставляет вариант
None в этом месте. Мы используем if let
, чтобы деструктурировать
Some и получить поток; затем вызываем join у потока. Если поток "работника" уже
None
, мы знаем, что этот "работник" уже очистил свой поток, поэтому в этом случае ничего не происходит.
1 ... 54 55 56 57 58 59 60 61 62