You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

167 lines
5.5 KiB

//! Connecting to WhatsApp Web, and handling the intermediate states when doing so.
//!
//! FIXME: maybe find a way to share code between this and `ModemInner` from
//! `src/modem.rs`.
use whatsappweb::conn::WebConnection;
use whatsappweb::event::WaEvent;
use whatsappweb::req::WaRequest;
use whatsappweb::errors::WaError;
use whatsappweb::session::PersistentSession;
use tokio_timer::Delay;
use std::time::{Duration, Instant};
use failure::Error;
use futures::{self, Future, Stream, Poll, Async, Sink, StartSend};
pub struct WebConnectionWrapperConfig {
pub backoff_time_ms: u64
}
enum WrapperState {
Disabled,
Waiting(Delay),
Initializing(Box<dyn Future<Item = WebConnection, Error = WaError>>),
Running(WebConnection)
}
pub struct WebConnectionWrapper {
inner: WrapperState,
persist: Option<PersistentSession>,
cfg: WebConnectionWrapperConfig
}
impl WebConnectionWrapper {
pub fn new(cfg: WebConnectionWrapperConfig) -> Self {
Self {
inner: WrapperState::Disabled,
persist: None,
cfg
}
}
pub fn is_disabled(&self) -> bool {
if let WrapperState::Disabled = self.inner {
true
}
else {
false
}
}
pub fn disable(&mut self) {
self.inner = WrapperState::Disabled;
}
pub fn connect_new(&mut self) {
self.set_persistent(None);
self.initialize();
}
pub fn connect_persistent(&mut self, persist: PersistentSession) {
self.set_persistent(Some(persist));
self.initialize();
}
pub fn set_persistent(&mut self, persist: Option<PersistentSession>) {
self.persist = persist;
}
pub fn is_connected(&self) -> bool {
if let WrapperState::Running(_) = self.inner {
true
}
else {
false
}
}
fn initialize(&mut self) {
debug!("Connecting to WhatsApp Web...");
let fut: Box<dyn Future<Item = WebConnection, Error = WaError>> = match self.persist.clone() {
Some(p) => Box::new(WebConnection::connect_persistent(p)),
None => Box::new(WebConnection::connect_new())
};
self.inner = WrapperState::Initializing(fut);
}
fn backoff(&mut self) {
let delay = Delay::new(Instant::now() + Duration::from_millis(self.cfg.backoff_time_ms));
self.inner = WrapperState::Waiting(delay);
}
}
impl Stream for WebConnectionWrapper {
type Item = Result<WaEvent, WaError>;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Result<WaEvent, WaError>>, Error> {
use self::WrapperState::*;
loop {
match std::mem::replace(&mut self.inner, Disabled) {
Disabled => return Ok(Async::NotReady),
Waiting(mut delay) => {
match delay.poll()? {
Async::Ready(_) => self.initialize(),
Async::NotReady => {
self.inner = Waiting(delay);
return Ok(Async::NotReady);
},
}
},
Initializing(mut fut) => {
match fut.poll() {
Ok(Async::Ready(c)) => {
debug!("Connected to WhatsApp Web.");
self.inner = Running(c);
},
Ok(Async::NotReady) => {
self.inner = Initializing(fut);
return Ok(Async::NotReady);
},
Err(e) => {
warn!("Failed to connect to WhatsApp: {}", e);
self.backoff();
}
}
},
Running(mut fut) => {
match fut.poll() {
Ok(Async::NotReady) => {
self.inner = Running(fut);
return Ok(Async::NotReady);
},
Ok(Async::Ready(Some(x))) => {
self.inner = Running(fut);
return Ok(Async::Ready(Some(Ok(x))));
},
Ok(Async::Ready(None)) => {
unreachable!()
},
Err(e) => {
self.backoff();
return Ok(Async::Ready(Some(Err(e))));
}
}
},
}
}
}
}
impl Sink for WebConnectionWrapper {
type SinkItem = WaRequest;
type SinkError = WaError;
fn start_send(&mut self, item: WaRequest) -> StartSend<WaRequest, WaError> {
if let WrapperState::Running(ref mut c) = self.inner {
match c.start_send(item) {
Err(e) => {
warn!("WA sink failed: {}", e);
self.backoff();
return Err(e);
},
x => x
}
}
else {
panic!("WebConnectionWrapper should not be used as a sink while disconnected");
}
}
fn poll_complete(&mut self) -> Poll<(), WaError> {
if let WrapperState::Running(ref mut c) = self.inner {
c.poll_complete()
}
else {
panic!("WebConnectionWrapper should not be used as a sink while disconnected");
}
}
}