1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use std::hash::Hash;
use std::collections::HashSet;
use futures::{Future, Sink, Stream, Poll, Async, AsyncSink};
use super::*;

pub struct Broadcast<I, C>
    where I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
          C: Sink + Stream + 'static,
          C::SinkItem: Clone
{
    room: Option<Room<I, C>>,
    msg: C::SinkItem,
    start_send_list: HashSet<I>,
    poll_complete_list: Vec<I>,
}

impl<I, C> Broadcast<I, C>
    where I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
          C: Sink + Stream + 'static,
          C::SinkItem: Clone
{
    #[doc(hidden)]
    pub fn new(room: Room<I, C>, msg: C::SinkItem, ids: HashSet<I>) -> Broadcast<I, C> {
        Broadcast {
            room: Some(room),
            msg: msg,
            start_send_list: ids,
            poll_complete_list: vec![],
        }
    }

    pub fn into_inner(mut self) -> Room<I, C> {
        self.room.take().unwrap()
    }
}

impl<I, C> Future for Broadcast<I, C>
    where I: Clone + Send + PartialEq + Eq + Hash + Debug + 'static,
          C: Sink + Stream + 'static,
          C::SinkItem: Clone
{
    type Item = Room<I, C>;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let mut room = self.room.take().unwrap();

        let start_send_list = self.start_send_list.drain().collect::<Vec<_>>();
        for id in start_send_list {
            let ready_client = match room.client_mut(&id) {
                Some(ready_client) => ready_client,
                None => continue,
            };
            match ready_client.start_send(self.msg.clone()) {
                Ok(AsyncSink::NotReady(_)) => {
                    self.start_send_list.insert(id);
                }
                Ok(AsyncSink::Ready) => {
                    self.poll_complete_list.push(id);
                }
                Err(_) => {}
            }
        }

        let poll_complete_list = self.poll_complete_list.drain(..).collect::<Vec<_>>();
        for id in poll_complete_list {
            let ready_client = match room.client_mut(&id) {
                Some(ready_client) => ready_client,
                None => continue,
            };
            match ready_client.poll_complete() {
                Ok(Async::NotReady) => {
                    self.poll_complete_list.push(id);
                }
                Ok(Async::Ready(())) | Err(_) => {}
            }
        }

        if self.start_send_list.is_empty() && self.poll_complete_list.is_empty() {
            Ok(Async::Ready(room))
        } else {
            self.room = Some(room);
            Ok(Async::NotReady)
        }
    }
}