1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
use std::collections::VecDeque;
use futures::{Future, Sink, Stream, Poll, Async, AsyncSink};
use futures::sync::mpsc;
use comms::Room;

use net::*;

pub struct Spectators {
    spectator_rx: mpsc::Receiver<MsgClient<String>>,
    spectators: MsgRoom<String>,
    msg_rx: mpsc::Receiver<Msg>,
    msg_queue: VecDeque<Msg>,
}

impl Spectators {
    pub fn new(spectator_rx: mpsc::Receiver<MsgClient<String>>,
               msg_rx: mpsc::Receiver<Msg>)
               -> Spectators {
        Spectators {
            spectator_rx: spectator_rx,
            spectators: Room::default(),
            msg_rx: msg_rx,
            msg_queue: VecDeque::new(),
        }
    }
}

impl Future for Spectators {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        println!("spectators wakeup {:?}", self.spectators.ids());

        loop {
            match self.spectator_rx.poll() {
                Ok(Async::NotReady) => break,
                Ok(Async::Ready(Some(client))) => {
                    self.spectators.insert(client);
                }
                Ok(Async::Ready(None)) => {
                    // If stream closed, shutdown this future.
                    // @TODO: Guard this a bit better with panics.
                    self.spectator_rx.close();
                    self.spectators.close_all();
                }
                Err(_) => {}
            }
        }

        loop {
            match self.msg_rx.poll() {
                Ok(Async::NotReady) => break,
                Ok(Async::Ready(Some(msg))) => self.msg_queue.push_back(msg),
                Ok(Async::Ready(None)) => {
                    // If stream closed, shutdown this future.
                    // @TODO: Guard this a bit better with panics.
                    self.spectator_rx.close();
                    self.spectators.close_all();
                }
                Err(_) => {}
            }
        }

        // If any spectator sends a message, disconnect them as that behaviour is not
        // consistent with spectating.
        match self.spectators.poll() {
            Ok(Async::Ready(Some((id, _)))) => {
                // @TODO: Would be nice to have a `close_one` method to avoid the heap Vec.
                self.spectators.close(vec![id].into_iter().collect());
            }
            _ => {}
        }

        match self.spectators.poll_complete() {
            Ok(Async::Ready(())) => {}
            Ok(Async::NotReady) => {}
            Err(_) => {}
        }

        while let Some(msg) = self.msg_queue.pop_front() {
            for id in self.spectators.ids() {
                match self.spectators.start_send((id.clone(), msg.clone())) {
                    Ok(AsyncSink::NotReady(_)) |
                    Err(_) => {
                        self.spectators.close(vec![id].into_iter().collect());
                    }
                    Ok(AsyncSink::Ready) => {}
                }
            }

            match self.spectators.poll_complete() {
                Ok(Async::Ready(())) => continue,
                Ok(Async::NotReady) => break,
                Err(_) => {}
            }
        }

        Ok(Async::NotReady)
    }
}