1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use std::hash::Hash;
use std::collections::{HashSet, HashMap};
use futures::{Future, Sink, Stream, Poll, Async};
use super::*;
pub struct Receive<I, C>
where I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
C: Sink + Stream + 'static
{
room: Option<Room<I, C>>,
poll_list: HashSet<I>,
replies: Vec<(I, C::Item)>,
}
impl<I, C> Receive<I, C>
where I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
C: Sink + Stream + 'static
{
#[doc(hidden)]
pub fn new(room: Room<I, C>, ids: HashSet<I>) -> Receive<I, C> {
Receive {
room: Some(room),
poll_list: ids,
replies: vec![],
}
}
pub fn into_inner(mut self) -> (Room<I, C>, Vec<(I, C::Item)>) {
(self.room.take().unwrap(), self.replies)
}
}
impl<I, C> Future for Receive<I, C>
where I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
C: Sink + Stream + 'static
{
type Item = (HashMap<I, C::Item>, Room<I, C>);
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut room = self.room.take().unwrap();
let poll_list = self.poll_list.drain().collect::<Vec<_>>();
for id in poll_list {
let ready_client = match room.client_mut(&id) {
Some(ready_client) => ready_client,
None => continue,
};
match ready_client.poll() {
Ok(Async::NotReady) => {
self.poll_list.insert(id);
}
Ok(Async::Ready(Some(msg))) => self.replies.push((id, msg)),
Ok(Async::Ready(None)) |
Err(_) => {}
}
}
if self.poll_list.is_empty() {
let replies_hashmap: HashMap<_, _> = self.replies.drain(..).collect();
Ok(Async::Ready((replies_hashmap, room)))
} else {
self.room = Some(room);
Ok(Async::NotReady)
}
}
}