1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use std::collections::VecDeque;
use futures::{Future, Sink, Stream, Poll, Async, AsyncSink};
use futures::sync::mpsc;
use comms::Room;
use net::*;
pub struct Spectators {
spectator_rx: mpsc::Receiver<MsgClient<String>>,
spectators: MsgRoom<String>,
msg_rx: mpsc::Receiver<Msg>,
msg_queue: VecDeque<Msg>,
}
impl Spectators {
pub fn new(spectator_rx: mpsc::Receiver<MsgClient<String>>,
msg_rx: mpsc::Receiver<Msg>)
-> Spectators {
Spectators {
spectator_rx: spectator_rx,
spectators: Room::default(),
msg_rx: msg_rx,
msg_queue: VecDeque::new(),
}
}
}
impl Future for Spectators {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
println!("spectators wakeup {:?}", self.spectators.ids());
loop {
match self.spectator_rx.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(client))) => {
self.spectators.insert(client);
}
Ok(Async::Ready(None)) => {
self.spectator_rx.close();
self.spectators.close_all();
}
Err(_) => {}
}
}
loop {
match self.msg_rx.poll() {
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(msg))) => self.msg_queue.push_back(msg),
Ok(Async::Ready(None)) => {
self.spectator_rx.close();
self.spectators.close_all();
}
Err(_) => {}
}
}
match self.spectators.poll() {
Ok(Async::Ready(Some((id, _)))) => {
self.spectators.close(vec![id].into_iter().collect());
}
_ => {}
}
match self.spectators.poll_complete() {
Ok(Async::Ready(())) => {}
Ok(Async::NotReady) => {}
Err(_) => {}
}
while let Some(msg) = self.msg_queue.pop_front() {
for id in self.spectators.ids() {
match self.spectators.start_send((id.clone(), msg.clone())) {
Ok(AsyncSink::NotReady(_)) |
Err(_) => {
self.spectators.close(vec![id].into_iter().collect());
}
Ok(AsyncSink::Ready) => {}
}
}
match self.spectators.poll_complete() {
Ok(Async::Ready(())) => continue,
Ok(Async::NotReady) => break,
Err(_) => {}
}
}
Ok(Async::NotReady)
}
}