Browse Source

0.2.0-pre: upgrade to ww-rs-eta 0.5.0-pre1

- We now use the fancy new improved WhatsApp connection logic
  in ww-rs-eta 0.5.0-pre1, which is nice.
- As a result, the handling of failed and superseded connections
  should be vastly improved; no more random QR code requests from
  lingering connections!
- In addition, the user now gets notified whenever they *remove*
  the connection from their mobile app (although it could also
  conceivably remove the state from the DB as well)
- History fetching was removed, as it never worked.
- The `ignore_other_libraries` config key now doesn't ignore
  whatsappweb_eta, because it's useful to have that in there
  for debugging as well.
- Some `dyn`s were dotted about the place to satisfy Rust 2018 lints.
master
eta 4 years ago
parent
commit
8d8729dc3d
  1. 824
      Cargo.lock
  2. 2
      Cargo.toml
  3. 15
      src/comm.rs
  4. 4
      src/config.rs
  5. 2
      src/contact_factory.rs
  6. 8
      src/logging.rs
  7. 1
      src/main.rs
  8. 4
      src/modem.rs
  9. 2
      src/store.rs
  10. 345
      src/whatsapp.rs
  11. 159
      src/whatsapp_conn.rs

824
Cargo.lock generated

File diff suppressed because it is too large Load Diff

2
Cargo.toml

@ -2,7 +2,7 @@ @@ -2,7 +2,7 @@
authors = ["eeeeeta <eeeeeta@users.noreply.github.com>"]
name = "sms-irc"
edition = "2018"
version = "0.1.0"
version = "0.2.0-pre"
description = "A WhatsApp/SMS to IRC bridge"
license = "AGPL-3.0"

15
src/comm.rs

@ -4,16 +4,9 @@ use futures::sync::mpsc::{self, UnboundedSender, UnboundedReceiver}; @@ -4,16 +4,9 @@ use futures::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use huawei_modem::cmd::sms::SmsMessage;
use huawei_modem::pdu::PduAddress;
use whatsappweb::Jid;
use whatsappweb::GroupMetadata;
use whatsappweb::connection::State as WaState;
use whatsappweb::connection::UserData as WaUserData;
use whatsappweb::connection::PersistentSession as WaPersistentSession;
use whatsappweb::connection::DisconnectReason as WaDisconnectReason;
use whatsappweb::message::ChatMessage as WaMessage;
use crate::config::Config;
use crate::store::Store;
use tokio_core::reactor::Handle;
use qrcode::QrCode;
use crate::whatsapp_media::MediaResult;
pub enum ModemCommand {
@ -31,24 +24,16 @@ pub enum ModemCommand { @@ -31,24 +24,16 @@ pub enum ModemCommand {
pub enum WhatsappCommand {
StartRegistration,
LogonIfSaved,
QrCode(QrCode),
SendGroupMessage(String, String),
SendDirectMessage(PduAddress, String),
GroupAssociate(Jid, String),
GroupList,
GroupRemove(String),
GroupUpdateAll,
GotGroupMetadata(GroupMetadata),
StateChanged(WaState),
UserDataChanged(WaUserData),
PersistentChanged(WaPersistentSession),
Disconnect(WaDisconnectReason),
Message(bool, Box<WaMessage>),
MediaFinished(MediaResult),
CheckAcks,
PrintAcks,
MakeContact(PduAddress),
HistoryResponse(Jid, Option<Vec<WaMessage>>)
}
#[allow(dead_code)]
pub enum ContactFactoryCommand {

4
src/config.rs

@ -60,7 +60,9 @@ pub struct WhatsappConfig { @@ -60,7 +60,9 @@ pub struct WhatsappConfig {
#[serde(default)]
pub autoupdate_nicks: bool,
#[serde(default)]
pub load_history_messages: Option<u16>
pub load_history_messages: Option<u16>,
#[serde(default)]
pub backoff_time_ms: Option<u64>
}
#[derive(Deserialize, Debug, Clone)]
pub struct IrcClientConfig {

2
src/contact_factory.rs

@ -21,7 +21,7 @@ pub struct ContactFactory { @@ -21,7 +21,7 @@ pub struct ContactFactory {
cb_tx: UnboundedSender<ControlBotCommand>,
wa_tx: UnboundedSender<WhatsappCommand>,
m_tx: UnboundedSender<ModemCommand>,
contacts_starting: HashMap<PduAddress, Box<Future<Item = ContactManager, Error = Error>>>,
contacts_starting: HashMap<PduAddress, Box<dyn Future<Item = ContactManager, Error = Error>>>,
contacts: HashMap<PduAddress, ContactManager>,
contacts_presence: HashMap<PduAddress, Option<String>>,
failed_contacts: HashSet<PduAddress>,

8
src/logging.rs

@ -29,7 +29,9 @@ impl Logger { @@ -29,7 +29,9 @@ impl Logger {
}
impl log::Log for Logger {
fn enabled(&self, metadata: &Metadata) -> bool {
if self.ignore_other_libraries && !metadata.target().starts_with("sms_irc") {
if self.ignore_other_libraries
&& !metadata.target().starts_with("sms_irc")
&& !metadata.target().starts_with("whatsappweb_eta") {
return false;
}
let lvl = metadata.level();
@ -37,7 +39,9 @@ impl log::Log for Logger { @@ -37,7 +39,9 @@ impl log::Log for Logger {
}
fn log(&self, rec: &Record) {
use log::Level::*;
if self.ignore_other_libraries && !rec.target().starts_with("sms_irc") {
if self.ignore_other_libraries
&& !rec.target().starts_with("sms_irc")
&& !rec.target().starts_with("whatsappweb_eta") {
return;
}
if rec.level() <= self.stdout_loglevel {

1
src/main.rs

@ -26,6 +26,7 @@ mod control_common; @@ -26,6 +26,7 @@ mod control_common;
mod sender_common;
mod whatsapp;
mod whatsapp_media;
mod whatsapp_conn;
mod insp_s2s;
mod insp_user;
mod irc_s2c;

4
src/modem.rs

@ -32,14 +32,14 @@ enum ModemInner { @@ -32,14 +32,14 @@ enum ModemInner {
Uninitialized,
Disabled,
Waiting(Delay),
Initializing(Box<Future<Item = HuaweiModem, Error = Error>>),
Initializing(Box<dyn Future<Item = HuaweiModem, Error = Error>>),
Running {
modem: HuaweiModem,
urc_rx: UnboundedReceiver<AtResponse>
},
}
impl ModemInner {
fn init_future(path: &str, hdl: &Handle, timeout_ms: u32) -> Box<Future<Item = HuaweiModem, Error = Error>> {
fn init_future(path: &str, hdl: &Handle, timeout_ms: u32) -> Box<dyn Future<Item = HuaweiModem, Error = Error>> {
info!("Initializing modem {}", path);
let modem = HuaweiModem::new_from_path(path, hdl);
let fut = modem.into_future()

2
src/store.rs

@ -8,7 +8,7 @@ use std::sync::Arc; @@ -8,7 +8,7 @@ use std::sync::Arc;
use huawei_modem::pdu::PduAddress;
use diesel::prelude::*;
use serde_json;
use whatsappweb::connection::PersistentSession;
use whatsappweb::session::PersistentSession;
use whatsappweb::Jid;
use crate::util::{self, Result};
use chrono::NaiveDateTime;

345
src/whatsapp.rs

@ -1,60 +1,38 @@ @@ -1,60 +1,38 @@
//! Experimental support for WhatsApp.
use whatsappweb::connection::{WhatsappWebConnection, WhatsappWebHandler};
use whatsappweb::connection::State as WaState;
use whatsappweb::Jid;
use whatsappweb::Contact as WaContact;
use whatsappweb::Chat as WaChat;
use whatsappweb::GroupMetadata;
use whatsappweb::connection::UserData as WaUserData;
use whatsappweb::connection::PersistentSession as WaPersistentSession;
use whatsappweb::connection::DisconnectReason as WaDisconnectReason;
use whatsappweb::message::ChatMessage as WaMessage;
use whatsappweb::message::{ChatMessageContent, MessageId, Peer, MessageAckLevel};
use whatsappweb::session::PersistentSession as WaPersistentSession;
use whatsappweb::event::WaEvent;
use whatsappweb::req::WaRequest;
use whatsappweb::errors::WaError;
use whatsappweb::errors::DisconnectReason as WaDisconnectReason;
use huawei_modem::pdu::PduAddress;
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver};
use std::collections::HashMap;
use crate::store::Store;
use std::sync::Arc;
use crate::comm::{WhatsappCommand, ContactFactoryCommand, ControlBotCommand, InitParameters};
use crate::util::{self, Result};
use image::Luma;
use qrcode::QrCode;
use futures::{Future, Async, Poll, Stream};
use futures::{Future, Async, Poll, Stream, Sink};
use failure::Error;
use crate::models::Recipient;
use crate::whatsapp_media::{MediaInfo, MediaResult};
use regex::{Regex, Captures};
use chrono::prelude::*;
use tokio_timer::Interval;
use std::time::{Instant, Duration};
use unicode_segmentation::UnicodeSegmentation;
use std::collections::VecDeque;
use crate::comm::{WhatsappCommand, ContactFactoryCommand, ControlBotCommand, InitParameters};
use crate::util::{self, Result};
use crate::models::Recipient;
use crate::whatsapp_media::{MediaInfo, MediaResult};
use crate::store::Store;
use crate::whatsapp_conn::{WebConnectionWrapper, WebConnectionWrapperConfig};
struct WhatsappHandler {
tx: Arc<UnboundedSender<WhatsappCommand>>
}
impl WhatsappWebHandler for WhatsappHandler {
fn on_state_changed(&self, _: &WhatsappWebConnection<Self>, state: WaState) {
self.tx.unbounded_send(WhatsappCommand::StateChanged(state))
.unwrap();
}
fn on_user_data_changed(&self, _: &WhatsappWebConnection<Self>, user_data: WaUserData) {
self.tx.unbounded_send(WhatsappCommand::UserDataChanged(user_data))
.unwrap();
}
fn on_persistent_session_data_changed(&self, ps: WaPersistentSession) {
self.tx.unbounded_send(WhatsappCommand::PersistentChanged(ps))
.unwrap();
}
fn on_disconnect(&self, reason: WaDisconnectReason) {
self.tx.unbounded_send(WhatsappCommand::Disconnect(reason))
.unwrap();
}
fn on_message(&self, _: &WhatsappWebConnection<Self>, new: bool, message: Box<WaMessage>) {
self.tx.unbounded_send(WhatsappCommand::Message(new, message))
.unwrap();
}
}
struct MessageSendStatus {
ack_level: Option<MessageAckLevel>,
sent_ts: DateTime<Utc>,
@ -64,7 +42,7 @@ struct MessageSendStatus { @@ -64,7 +42,7 @@ struct MessageSendStatus {
alerted_pending: bool,
}
pub struct WhatsappManager {
conn: Option<WhatsappWebConnection<WhatsappHandler>>,
conn: WebConnectionWrapper,
rx: UnboundedReceiver<WhatsappCommand>,
wa_tx: Arc<UnboundedSender<WhatsappCommand>>,
cf_tx: UnboundedSender<ContactFactoryCommand>,
@ -72,7 +50,6 @@ pub struct WhatsappManager { @@ -72,7 +50,6 @@ pub struct WhatsappManager {
contacts: HashMap<Jid, WaContact>,
chats: HashMap<Jid, WaChat>,
outgoing_messages: HashMap<String, MessageSendStatus>,
state: WaState,
ack_warn: u64,
ack_warn_pending: u64,
ack_expiry: u64,
@ -86,17 +63,48 @@ pub struct WhatsappManager { @@ -86,17 +63,48 @@ pub struct WhatsappManager {
autocreate: Option<String>,
autoupdate_nicks: bool,
mark_read: bool,
load_history_messages: Option<u16>,
our_jid: Option<Jid>
our_jid: Option<Jid>,
outbox: VecDeque<WaRequest>
}
impl Future for WhatsappManager {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
while let Async::Ready(com) = self.rx.poll().unwrap() {
let com = com.ok_or(format_err!("whatsappmanager rx died"))?;
self.handle_int_rx(com)?;
// We use this `cont` variable to handle the case where
// we change the state of `self.conn` during `handle_int_rx()`,
// and therefore need to poll it again.
let mut cont = true;
while cont {
cont = false;
while let Async::Ready(evt) = self.conn.poll()? {
match evt.expect("None from wrapper impossible") {
Ok(e) => self.on_wa_event(e)?,
Err(e) => self.on_wa_error(e)
}
}
while let Async::Ready(com) = self.rx.poll().unwrap() {
let com = com.ok_or(format_err!("whatsappmanager rx died"))?;
self.handle_int_rx(com)?;
cont = true;
}
if self.outbox.len() > 0 {
if self.conn.is_connected() {
while let Some(req) = self.outbox.pop_front() {
if let Err(e) = self.conn.start_send(req) {
self.on_wa_error(e);
break;
}
}
}
else {
warn!("Disconnected, so discarding messages in outbox");
self.outbox.clear();
}
}
if self.conn.is_connected() {
self.conn.poll_complete()?;
}
}
Ok(Async::NotReady)
}
@ -116,11 +124,11 @@ impl WhatsappManager { @@ -116,11 +124,11 @@ impl WhatsappManager {
let ack_warn_ms = p.cfg.whatsapp.ack_warn_ms.unwrap_or(5000);
let ack_warn_pending_ms = p.cfg.whatsapp.ack_warn_pending_ms.unwrap_or(ack_warn_ms * 2);
let ack_expiry_ms = p.cfg.whatsapp.ack_expiry_ms.unwrap_or(60000);
let load_history_messages = p.cfg.whatsapp.load_history_messages;
let ack_resend_ms = p.cfg.whatsapp.ack_resend_ms.clone();
let backlog_start = p.cfg.whatsapp.backlog_start.clone();
let mark_read = p.cfg.whatsapp.mark_read;
let autoupdate_nicks = p.cfg.whatsapp.autoupdate_nicks;
let backoff_time_ms = p.cfg.whatsapp.backoff_time_ms.unwrap_or(10000);
wa_tx.unbounded_send(WhatsappCommand::LogonIfSaved)
.unwrap();
let wa_tx = Arc::new(wa_tx);
@ -134,21 +142,24 @@ impl WhatsappManager { @@ -134,21 +142,24 @@ impl WhatsappManager {
Ok(())
});
p.hdl.spawn(timer);
let conn = WebConnectionWrapper::new(WebConnectionWrapperConfig {
backoff_time_ms
});
Self {
conn: None,
conn,
contacts: HashMap::new(),
chats: HashMap::new(),
outgoing_messages: HashMap::new(),
state: WaState::Uninitialized,
connected: false,
our_jid: None,
ack_warn: ack_warn_ms,
ack_warn_pending: ack_warn_pending_ms,
ack_expiry: ack_expiry_ms,
ack_resend: ack_resend_ms,
outbox: VecDeque::new(),
wa_tx, backlog_start,
rx, cf_tx, cb_tx, qr_path, store, media_path, dl_path, autocreate,
mark_read, autoupdate_nicks, load_history_messages
mark_read, autoupdate_nicks
}
}
fn handle_int_rx(&mut self, c: WhatsappCommand) -> Result<()> {
@ -157,26 +168,16 @@ impl WhatsappManager { @@ -157,26 +168,16 @@ impl WhatsappManager {
match c {
StartRegistration => self.start_registration()?,
LogonIfSaved => self.logon_if_saved()?,
QrCode(qr) => self.on_qr(qr)?,
SendGroupMessage(to, cont) => self.send_group_message(to, cont)?,
SendDirectMessage(to, cont) => self.send_direct_message(to, cont)?,
GroupAssociate(jid, to) => self.group_associate_handler(jid, to)?,
GroupList => self.group_list()?,
GroupUpdateAll => self.group_update_all()?,
GroupRemove(grp) => self.group_remove(grp)?,
StateChanged(was) => self.on_state_changed(was),
UserDataChanged(wau) => self.on_user_data_changed(wau)?,
PersistentChanged(wap) => self.on_persistent_session_data_changed(wap)?,
Disconnect(war) => self.on_disconnect(war),
GotGroupMetadata(meta) => self.on_got_group_metadata(meta)?,
Message(new, msg) => {
self.on_message(*msg, new)?;
},
MediaFinished(r) => self.media_finished(r)?,
CheckAcks => self.check_acks()?,
PrintAcks => self.print_acks()?,
MakeContact(a) => self.make_contact(a)?,
HistoryResponse(j, h) => self.history_response(j, h)?
MakeContact(a) => self.make_contact(a)?
}
Ok(())
}
@ -285,22 +286,19 @@ impl WhatsappManager { @@ -285,22 +286,19 @@ impl WhatsappManager {
warn!("Failed to store WA msgid {} after media download: {}", r.mi.0, e);
}
if self.mark_read {
if let Some(ref mut conn) = self.conn {
if let Some(p) = r.peer {
conn.send_message_read(r.mi, p);
}
if let Some(p) = r.peer {
self.outbox.push_back(WaRequest::MessageRead {
mid: r.mi,
peer: p
});
}
}
Ok(())
}
fn logon_if_saved(&mut self) -> Result<()> {
use whatsappweb::connection;
if let Some(wap) = self.store.get_wa_persistence_opt()? {
info!("Logging on to WhatsApp Web using stored persistence data");
let tx = self.wa_tx.clone();
let (conn, _) = connection::with_persistent_session(wap, WhatsappHandler { tx });
self.conn = Some(conn);
self.conn.connect_persistent(wap);
}
else {
info!("WhatsApp is not configured");
@ -308,16 +306,8 @@ impl WhatsappManager { @@ -308,16 +306,8 @@ impl WhatsappManager {
Ok(())
}
fn start_registration(&mut self) -> Result<()> {
use whatsappweb::connection;
info!("Beginning WhatsApp Web registration process");
let tx = self.wa_tx.clone();
let tx2 = self.wa_tx.clone();
let (conn, _) = connection::new(move |qr| {
tx2.unbounded_send(WhatsappCommand::QrCode(qr))
.unwrap()
}, WhatsappHandler { tx });
self.conn = Some(conn);
info!("Creating a new WhatsApp Web session");
self.conn.connect_new();
Ok(())
}
fn cb_respond(&mut self, s: String) {
@ -345,8 +335,9 @@ impl WhatsappManager { @@ -345,8 +335,9 @@ impl WhatsappManager {
alerted: false,
alerted_pending: false
};
let mid = self.conn.as_mut().unwrap()
.send_message(c, j);
let m = WaMessage::new(j, c);
let mid = m.id.clone();
self.outbox.push_back(WaRequest::SendMessage(m));
debug!("Send to {}: message ID {}", mss.destination.to_string(), mid.0);
if let Err(e) = self.store.store_wa_msgid(mid.0.clone()) {
warn!("Failed to store outgoing msgid {}: {}", mid.0, e);
@ -357,7 +348,7 @@ impl WhatsappManager { @@ -357,7 +348,7 @@ impl WhatsappManager {
fn send_direct_message(&mut self, addr: PduAddress, content: String) -> Result<()> {
debug!("Sending direct message to {}...", addr);
trace!("Message contents: {}", content);
if self.conn.is_none() || !self.connected {
if !self.connected || !self.conn.is_connected() {
warn!("Tried to send WA message to {} while disconnected!", addr);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Failed to send to WA contact {}: disconnected from server", addr)))
.unwrap();
@ -379,7 +370,7 @@ impl WhatsappManager { @@ -379,7 +370,7 @@ impl WhatsappManager {
fn send_group_message(&mut self, chan: String, content: String) -> Result<()> {
debug!("Sending message to group with chan {}...", chan);
trace!("Message contents: {}", content);
if self.conn.is_none() || !self.connected {
if !self.connected || !self.conn.is_connected() {
warn!("Tried to send WA message to group {} while disconnected!", chan);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Failed to send to group {}: disconnected from server", chan)))
.unwrap();
@ -483,7 +474,7 @@ impl WhatsappManager { @@ -483,7 +474,7 @@ impl WhatsappManager {
}
else {
if self.autocreate.is_some() {
info!("Attempting to autocreate channel for unbridged group {}...", gid.to_string());
info!("Attempting to autocreate channel for unbridged group {}...", gid);
match self.group_autocreate_from_unbridged(gid) {
Ok(id) => Some(id),
Err(e) => {
@ -493,7 +484,7 @@ impl WhatsappManager { @@ -493,7 +484,7 @@ impl WhatsappManager {
}
}
else {
info!("Received message for unbridged group {}, ignoring...", gid.to_string());
info!("Received message for unbridged group {}, ignoring...", gid);
return Ok(());
}
}
@ -591,10 +582,11 @@ impl WhatsappManager { @@ -591,10 +582,11 @@ impl WhatsappManager {
}
}
if let Some(p) = peer {
if let Some(ref mut conn) = self.conn {
if !is_media && !is_ours && self.mark_read {
conn.send_message_read(id, p);
}
if !is_media && !is_ours && self.mark_read {
self.outbox.push_back(WaRequest::MessageRead {
mid: id,
peer: p
});
}
}
Ok(())
@ -611,34 +603,6 @@ impl WhatsappManager { @@ -611,34 +603,6 @@ impl WhatsappManager {
}
Ok(())
}
fn load_history(&mut self, msgs: u16) -> Result<()> {
info!("Loading {} lines of history per chat for {} chats", msgs, self.chats.len());
for (jid, _) in self.chats.iter() {
let tx = self.wa_tx.clone();
let j = jid.clone();
debug!("Loading history for {}", j.to_string());
self.conn.as_mut().unwrap()
.get_messages_before(j.clone(), "".into(), msgs, Box::new(move |history| {
tx.unbounded_send(WhatsappCommand::HistoryResponse(j.clone(), history))
.unwrap();
}));
}
Ok(())
}
fn history_response(&mut self, jid: Jid, history: Option<Vec<WaMessage>>) -> Result<()> {
match history {
Some(msgs) => {
debug!("Got history response for {} - {} lines", jid.to_string(), msgs.len());
for msg in msgs {
self.on_message(msg, false)?;
}
},
_ => {
warn!("History response for {} was empty", jid.to_string());
}
}
Ok(())
}
fn process_media(&mut self, id: MessageId, peer: Option<Peer>, from: Jid, group: Option<i32>, ct: ChatMessageContent, ts: NaiveDateTime) -> Result<()> {
use whatsappweb::MediaType;
@ -674,7 +638,7 @@ impl WhatsappManager { @@ -674,7 +638,7 @@ impl WhatsappManager {
format!("\x021-to-1 chat\x0f")
}
};
list.push(format!("- '{}' (jid {}) - {}", gmeta.name.as_ref().map(|x| x as &str).unwrap_or("<unnamed>"), jid.to_string(), bstatus));
list.push(format!("- '{}' (jid {}) - {}", gmeta.name.as_ref().map(|x| x as &str).unwrap_or("<unnamed>"), jid, bstatus));
}
if list.len() == 0 {
self.cb_respond("no WhatsApp chats (yet?)".into());
@ -703,7 +667,7 @@ impl WhatsappManager { @@ -703,7 +667,7 @@ impl WhatsappManager {
let addr = match util::jid_to_address(jid) {
Some(a) => a,
None => {
return Err(format_err!("couldn't translate jid {} to address", jid.to_string()));
return Err(format_err!("couldn't translate jid {} to address", jid));
}
};
if let Some(recip) = self.store.get_recipient_by_addr_opt(&addr)? {
@ -754,18 +718,8 @@ impl WhatsappManager { @@ -754,18 +718,8 @@ impl WhatsappManager {
Ok(())
}
fn request_update_group(&mut self, jid: Jid) -> Result<()> {
info!("Getting metadata for jid {}", jid.to_string());
let tx = self.wa_tx.clone();
self.conn.as_mut().unwrap()
.get_group_metadata(&jid, Box::new(move |m| {
if let Some(m) = m {
tx.unbounded_send(WhatsappCommand::GotGroupMetadata(m))
.unwrap();
}
else {
warn!("Got empty group metadata, for some reason");
}
}));
info!("Getting metadata for jid {}", jid);
self.outbox.push_back(WaRequest::GetGroupMetadata(jid));
Ok(())
}
/// Auto-create a group after a GroupIntroduction message.
@ -818,13 +772,13 @@ impl WhatsappManager { @@ -818,13 +772,13 @@ impl WhatsappManager {
if let Some(grp) = self.store.get_group_by_chan_opt(&chan)? {
bail!("that channel is already used for a group (jid {})!", grp.jid);
}
if self.conn.is_none() || !self.connected {
if !self.connected || !self.conn.is_connected() {
bail!("we aren't connected to WhatsApp!");
}
if !jid.is_group {
bail!("that jid isn't a group!");
}
info!("Bridging WA group {} to channel {}", jid.to_string(), chan);
info!("Bridging WA group {} to channel {}", jid, chan);
let grp = self.store.store_group(&jid, &chan, vec![], vec![], "*** Group setup in progress, please wait... ***")?;
if request_update {
self.request_update_group(jid)?;
@ -857,24 +811,31 @@ impl WhatsappManager { @@ -857,24 +811,31 @@ impl WhatsappManager {
}
Ok(())
}
fn on_persistent_session_data_changed(&mut self, ps: WaPersistentSession) -> Result<()> {
self.store.store_wa_persistence(ps)?;
info!("Persistent session data updated");
fn on_established(&mut self, jid: Jid, ps: WaPersistentSession) -> Result<()> {
info!("Logged in as {}.", jid);
self.store.store_wa_persistence(ps.clone())?;
self.conn.set_persistent(Some(ps));
self.our_jid = Some(jid);
self.connected = true;
Ok(())
}
fn on_state_changed(&mut self, state: WaState) {
self.connected = state == WaState::Connected;
self.state = state;
info!("State changed to {:?}", state);
}
fn on_disconnect(&mut self, reason: WaDisconnectReason) {
use self::WaDisconnectReason::*;
let reason = match reason {
Replaced => "connection replaced by another",
Removed => "connection removed from mobile app"
};
warn!("Disconnected from WhatsApp - reason: {:?}", reason);
fn on_wa_error(&mut self, err: WaError) {
warn!("WA connection failed: {}", err);
if let WaError::Disconnected(reason) = err {
use self::WaDisconnectReason::*;
let reason_text = match reason {
Replaced => "connection replaced by another",
Removed => "connection removed from mobile app"
};
warn!("Disconnected from WhatsApp - reason: {:?}", reason_text);
if let Removed = reason {
let err = "Error: WhatsApp Web connection removed in the mobile app! Use the WHATSAPP SETUP command to restore connectivity.";
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(err.into()))
.unwrap();
self.conn.disable();
}
}
self.our_jid = None;
self.connected = false;
}
fn on_contact_change(&mut self, ct: WaContact) -> Result<()> {
@ -904,61 +865,57 @@ impl WhatsappManager { @@ -904,61 +865,57 @@ impl WhatsappManager {
}
Ok(())
}
fn on_user_data_changed(&mut self, ud: WaUserData) -> Result<()> {
trace!("user data changed: {:?}", ud);
use self::WaUserData::*;
match ud {
ContactsInitial(cts) => {
fn on_wa_event(&mut self, evt: WaEvent) -> Result<()> {
use self::WaEvent::*;
match evt {
WebsocketConnected => {},
ScanCode(qr) => self.on_qr(qr)?,
SessionEstablished { jid, persistent } => self.on_established(jid, persistent)?,
Message { is_new, msg } => self.on_message(msg, is_new)?,
InitialContacts(cts) => {
info!("Received initial contact list");
for ct in cts {
self.on_contact_change(ct)?;
}
},
ContactAddChange(ct) => {
debug!("Contact {} added or modified", ct.jid.to_string());
AddContact(ct) => {
debug!("Contact {} added or modified", ct.jid);
self.on_contact_change(ct)?;
},
ContactDelete(jid) => {
DeleteContact(jid) => {
// NOTE: I don't see any real reason to ever delete contacts.
// They might contain useful names for us to use later, and WA
// seems to send contact deletion messages on the regular as
// people send stuff, which is stupid.
//
// Therefore, we just don't.
debug!("Contact {} deleted", jid.to_string());
debug!("Contact {} deleted", jid);
//self.contacts.remove(&jid);
},
Chats(cts) => {
InitialChats(cts) => {
info!("Received initial chat list");
for ct in cts {
self.chats.insert(ct.jid.clone(), ct);
}
if let Some(n) = self.load_history_messages {
self.load_history(n)?;
}
},
ChatAction(jid, act) => {
ChatEvent { jid, event } => {
use whatsappweb::ChatAction::*;
match act {
match event {
Remove => {
debug!("Chat {} removed", jid.to_string());
debug!("Chat {} removed", jid);
self.chats.remove(&jid);
},
act => debug!("Chat {} action: {:?}", jid.to_string(), act)
act => debug!("Chat {} action: {:?}", jid, act)
}
},
UserJid(jid) => {
info!("Our jid is: {}", jid.to_string());
self.our_jid = Some(jid);
},
PresenceChange(jid, ps, dt) => {
PresenceChange { jid, presence, ts } => {
use whatsappweb::PresenceStatus::*;
debug!("JID {} changed presence to {:?} (ts {:?})", jid.to_string(), ps, dt);
debug!("JID {} changed presence to {:?} (ts {:?})", jid, presence, ts);
if let Some(num) = util::jid_to_address(&jid) {
let away = match ps {
let away = match presence {
Unavailable => {
if let Some(ts) = dt {
if let Some(ts) = ts {
Some(format!("last seen {}", ts))
}
else {
@ -993,19 +950,27 @@ impl WhatsappManager { @@ -993,19 +950,27 @@ impl WhatsappManager {
self.group_autocreate_from_intro(meta)?;
}
},
GroupSubjectChange { group, subject, subject_owner, .. } => {
if let Some(id) = self.store.get_group_by_jid_opt(&group)?.map(|x| x.id) {
GroupMetadata { meta } => {
match meta {
Ok(m) => self.on_got_group_metadata(m)?,
Err(e) => {
warn!("Group metadata query failed: {}", e);
}
}
},
GroupSubjectChange { jid, subject, inducer, .. } => {
if let Some(id) = self.store.get_group_by_jid_opt(&jid)?.map(|x| x.id) {
let ts = Utc::now().naive_utc();
self.store_message(&subject_owner, &format!("\x01ACTION changed the subject to '{}'\x01", subject), Some(id), ts)?;
self.request_update_group(group)?;
self.store_message(&inducer, &format!("\x01ACTION changed the subject to '{}'\x01", subject), Some(id), ts)?;
self.request_update_group(jid)?;
}
},
GroupParticipantsChange { group, change, inducer, participants } => {
GroupParticipantsChange { jid, change, inducer, participants } => {
use whatsappweb::GroupParticipantsChange;
debug!("Participants {:?} in group {} changed: {:?} (by {:?})", participants, group.to_string(), change, inducer);
debug!("Participants {:?} in group {} changed: {:?} (by {:?})", participants, jid, change, inducer);
let ts = Utc::now().naive_utc();
if let Some(id) = self.store.get_group_by_jid_opt(&group)?.map(|x| x.id) {
if let Some(id) = self.store.get_group_by_jid_opt(&jid)?.map(|x| x.id) {
if let Some(inducer) = inducer {
// Special-case: someone removing themself is just them leaving.
if participants.len() == 1 && participants[0] == inducer {
@ -1026,12 +991,14 @@ impl WhatsappManager { @@ -1026,12 +991,14 @@ impl WhatsappManager {
self.store_message(&inducer, &format!("\x01ACTION {}: {}\x01", action, nicks.join(", ")), Some(id), ts)?;
}
}
self.request_update_group(group)?;
self.request_update_group(jid)?;
}
},
StatusChange(user, status) => {
let recip = self.get_wa_recipient(&user)?;
info!("{} changed their status to: {}", recip.nick, status);
ProfileStatus { jid, status, was_request } => {
let recip = self.get_wa_recipient(&jid)?;
if !was_request {
info!("{} changed their status to: {}", recip.nick, status);
}
},
PictureChange { jid, removed } => {
let recip = self.get_wa_recipient(&jid)?;
@ -1042,9 +1009,17 @@ impl WhatsappManager { @@ -1042,9 +1009,17 @@ impl WhatsappManager {
info!("{} removed their profile photo.", recip.nick);
}
},
Battery(level) => {
MessageSendFail { mid, status } => {
error!("Got a MessageSendFail (status {}) for mid {}", status, mid.0);
let err = format!("Error: Sending WhatsApp message ID {} failed with code {}!", mid.0, status);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(err.into()))
.unwrap();
},
BatteryLevel(level) => {
// FIXME: warn when this gets low?
debug!("Phone battery level: {}", level);
}
},
_ => {}
}
Ok(())
}

159
src/whatsapp_conn.rs

@ -0,0 +1,159 @@ @@ -0,0 +1,159 @@
//! Connecting to WhatsApp Web, and handling the intermediate states when doing so.
//!
//! FIXME: maybe find a way to share code between this and `ModemInner` from
//! `src/modem.rs`.
use whatsappweb::conn::WebConnection;
use whatsappweb::event::WaEvent;
use whatsappweb::req::WaRequest;
use whatsappweb::errors::WaError;
use whatsappweb::session::PersistentSession;
use tokio_timer::Delay;
use std::time::{Duration, Instant};
use failure::Error;
use futures::{self, Future, Stream, Poll, Async, Sink, StartSend};
pub struct WebConnectionWrapperConfig {
pub backoff_time_ms: u64
}
enum WrapperState {
Disabled,
Waiting(Delay),
Initializing(Box<dyn Future<Item = WebConnection, Error = WaError>>),
Running(WebConnection)
}
pub struct WebConnectionWrapper {
inner: WrapperState,
persist: Option<PersistentSession>,
cfg: WebConnectionWrapperConfig
}
impl WebConnectionWrapper {
pub fn new(cfg: WebConnectionWrapperConfig) -> Self {
Self {
inner: WrapperState::Disabled,
persist: None,
cfg
}
}
pub fn disable(&mut self) {
self.inner = WrapperState::Disabled;
}
pub fn connect_new(&mut self) {
self.set_persistent(None);
self.initialize();
}
pub fn connect_persistent(&mut self, persist: PersistentSession) {
self.set_persistent(Some(persist));
self.initialize();
}
pub fn set_persistent(&mut self, persist: Option<PersistentSession>) {
self.persist = persist;
}
pub fn is_connected(&self) -> bool {
if let WrapperState::Running(_) = self.inner {
true
}
else {
false
}
}
fn initialize(&mut self) {
info!("Connecting to WhatsApp Web...");
let fut: Box<dyn Future<Item = WebConnection, Error = WaError>> = match self.persist.clone() {
Some(p) => Box::new(WebConnection::connect_persistent(p)),
None => Box::new(WebConnection::connect_new())
};
self.inner = WrapperState::Initializing(fut);
}
fn backoff(&mut self) {
let delay = Delay::new(Instant::now() + Duration::from_millis(self.cfg.backoff_time_ms));
self.inner = WrapperState::Waiting(delay);
}
}
impl Stream for WebConnectionWrapper {
type Item = Result<WaEvent, WaError>;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Result<WaEvent, WaError>>, Error> {
use self::WrapperState::*;
loop {
match std::mem::replace(&mut self.inner, Disabled) {
Disabled => return Ok(Async::NotReady),
Waiting(mut delay) => {
match delay.poll()? {
Async::Ready(_) => self.initialize(),
Async::NotReady => {
self.inner = Waiting(delay);
return Ok(Async::NotReady);
},
}
},
Initializing(mut fut) => {
match fut.poll() {
Ok(Async::Ready(c)) => {
info!("Connected to WhatsApp Web.");
self.inner = Running(c);
},
Ok(Async::NotReady) => {
self.inner = Initializing(fut);
return Ok(Async::NotReady);
},
Err(e) => {
warn!("Failed to connect to WA: {}", e);
self.backoff();
}
}
},
Running(mut fut) => {
match fut.poll() {
Ok(Async::NotReady) => {
self.inner = Running(fut);
return Ok(Async::NotReady);
},
Ok(Async::Ready(Some(x))) => {
self.inner = Running(fut);
return Ok(Async::Ready(Some(Ok(x))));
},
Ok(Async::Ready(None)) => {
unreachable!()
},
Err(e) => {
self.backoff();
return Ok(Async::Ready(Some(Err(e))));
}
}
},
}
}
}
}
impl Sink for WebConnectionWrapper {
type SinkItem = WaRequest;
type SinkError = WaError;
fn start_send(&mut self, item: WaRequest) -> StartSend<WaRequest, WaError> {
if let WrapperState::Running(ref mut c) = self.inner {
match c.start_send(item) {
Err(e) => {
warn!("WA sink failed: {}", e);
self.backoff();
return Err(e);
},
x => x
}
}
else {
panic!("WebConnectionWrapper should not be used as a sink while disconnected");
}
}
fn poll_complete(&mut self) -> Poll<(), WaError> {
if let WrapperState::Running(ref mut c) = self.inner {
c.poll_complete()
}
else {
panic!("WebConnectionWrapper should not be used as a sink while disconnected");
}
}
}
Loading…
Cancel
Save