Browse Source

add whatsapp support, and more

master
eeeeeta 5 years ago
parent
commit
94b1886e2f
  1. 1036
      Cargo.lock
  2. 13
      Cargo.toml
  3. 7
      migrations/20180729152102_whatsapp/down.sql
  4. 17
      migrations/20180729152102_whatsapp/up.sql
  5. 45
      src/comm.rs
  6. 2
      src/config.rs
  7. 87
      src/contact.rs
  8. 26
      src/contact_factory.rs
  9. 89
      src/control.rs
  10. 30
      src/main.rs
  11. 42
      src/models.rs
  12. 4
      src/modem.rs
  13. 21
      src/schema.rs
  14. 123
      src/store.rs
  15. 22
      src/util.rs
  16. 449
      src/whatsapp.rs

1036
Cargo.lock generated

File diff suppressed because it is too large Load Diff

13
Cargo.toml

@ -4,22 +4,29 @@ name = "sms-irc" @@ -4,22 +4,29 @@ name = "sms-irc"
version = "0.1.0"
[dependencies]
derive_builder = "0.5"
failure = "0.1"
log4rs = "0.8"
futures = "0.1"
image = "0.19"
irc = "0.13"
log = "0.4"
log4rs = "0.8"
qrcode = "0.7"
r2d2 = "0.8"
r2d2-diesel = "1.0"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
tokio-core = "0.1"
tokio-timer = "0.2"
toml = "0.4"
[dependencies.diesel]
features = ["postgres"]
features = ["postgres", "serde_json"]
version = "1.0"
[dependencies.huawei-modem]
git = "git://github.com/eeeeeta/huawei-modem"
git = "https://github.com/eeeeeta/huawei-modem"
[dependencies.whatsappweb]
path = "../whatsappweb-rs"

7
migrations/20180729152102_whatsapp/down.sql

@ -0,0 +1,7 @@ @@ -0,0 +1,7 @@
ALTER TABLE messages DROP CONSTRAINT messages_concat_if_pdu;
ALTER TABLE messages DROP CONSTRAINT messages_pdu_or_text;
ALTER TABLE messages DROP COLUMN text;
ALTER TABLE messages DROP COLUMN group_target;
ALTER TABLE messages ALTER COLUMN pdu SET NOT NULL;
DROP TABLE wa_persistence;
DROP TABLE groups;

17
migrations/20180729152102_whatsapp/up.sql

@ -0,0 +1,17 @@ @@ -0,0 +1,17 @@
CREATE TABLE groups (
id SERIAL PRIMARY KEY,
jid VARCHAR UNIQUE NOT NULL,
channel VARCHAR UNIQUE NOT NULL,
participants INT[] NOT NULL,
admins INT[] NOT NULL,
topic VARCHAR NOT NULL
);
CREATE TABLE wa_persistence (
rev INT PRIMARY KEY,
data JSON NOT NULL
);
ALTER TABLE messages ALTER COLUMN pdu DROP NOT NULL;
ALTER TABLE messages ADD COLUMN group_target INT REFERENCES groups ON DELETE CASCADE;
ALTER TABLE messages ADD COLUMN text VARCHAR;
ALTER TABLE messages ADD CONSTRAINT messages_pdu_or_text CHECK ((pdu IS NULL) != (text IS NULL));
ALTER TABLE messages ADD CONSTRAINT messages_concat_if_pdu CHECK ((csms_data IS NULL) OR (pdu IS NULL));

45
src/comm.rs

@ -4,10 +4,17 @@ use futures::sync::mpsc::{self, UnboundedSender, UnboundedReceiver}; @@ -4,10 +4,17 @@ use futures::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use huawei_modem::cmd::sms::SmsMessage;
use huawei_modem::errors::HuaweiError;
use huawei_modem::pdu::PduAddress;
use whatsappweb::Jid;
use whatsappweb::connection::State as WaState;
use whatsappweb::connection::UserData as WaUserData;
use whatsappweb::connection::PersistentSession as WaPersistentSession;
use whatsappweb::connection::DisconnectReason as WaDisconnectReason;
use whatsappweb::message::ChatMessage as WaMessage;
use huawei_modem::cmd::network::{SignalQuality, RegistrationState};
use config::Config;
use store::Store;
use tokio_core::reactor::Handle;
use qrcode::QrCode;
pub enum ModemCommand {
DoCmgl,
@ -17,19 +24,42 @@ pub enum ModemCommand { @@ -17,19 +24,42 @@ pub enum ModemCommand {
RequestCsq,
RequestReg
}
pub enum WhatsappCommand {
StartRegistration,
LogonIfSaved,
QrCode(QrCode),
SendGroupMessage(String, String),
SendDirectMessage(PduAddress, String),
GroupAssociate(Jid, String),
GroupList,
GroupRemove(String),
StateChanged(WaState),
UserDataChanged(WaUserData),
PersistentChanged(WaPersistentSession),
Disconnect(WaDisconnectReason),
Message(bool, Box<WaMessage>)
}
pub enum ContactFactoryCommand {
ProcessMessages,
ProcessGroups,
MakeContact(PduAddress),
DropContact(PduAddress),
LoadRecipients
LoadRecipients,
UpdateAway(PduAddress, Option<String>)
}
pub enum ContactManagerCommand {
ProcessMessages
ProcessMessages,
ProcessGroups,
UpdateAway(Option<String>)
}
pub enum ControlBotCommand {
Log(String),
ReportFailure(String),
// FIXME: unify this and the *Results
CommandResponse(String),
CsqResult(SignalQuality),
RegResult(RegistrationState)
RegResult(RegistrationState),
ProcessGroups
}
pub struct InitParameters<'a> {
pub cfg: &'a Config,
@ -43,20 +73,25 @@ pub struct ChannelMaker { @@ -43,20 +73,25 @@ pub struct ChannelMaker {
pub cf_rx: Option<UnboundedReceiver<ContactFactoryCommand>>,
pub cf_tx: UnboundedSender<ContactFactoryCommand>,
pub cb_rx: Option<UnboundedReceiver<ControlBotCommand>>,
pub cb_tx: UnboundedSender<ControlBotCommand>
pub cb_tx: UnboundedSender<ControlBotCommand>,
pub wa_rx: Option<UnboundedReceiver<WhatsappCommand>>,
pub wa_tx: UnboundedSender<WhatsappCommand>
}
impl ChannelMaker {
pub fn new() -> Self {
let (modem_tx, modem_rx) = mpsc::unbounded();
let (cf_tx, cf_rx) = mpsc::unbounded();
let (cb_tx, cb_rx) = mpsc::unbounded();
let (wa_tx, wa_rx) = mpsc::unbounded();
Self {
modem_rx: Some(modem_rx),
modem_tx,
cf_rx: Some(cf_rx),
cf_tx,
cb_rx: Some(cb_rx),
cb_tx
cb_tx,
wa_rx: Some(wa_rx),
wa_tx
}
}
}

2
src/config.rs

@ -19,5 +19,7 @@ pub struct Config { @@ -19,5 +19,7 @@ pub struct Config {
pub irc_port: Option<u16>,
#[serde(default)]
pub irc_password: Option<String>,
#[serde(default)]
pub qr_path: Option<String>
}

87
src/contact.rs

@ -2,7 +2,7 @@ @@ -2,7 +2,7 @@
use irc::client::PackedIrcClient;
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver, self};
use comm::{ModemCommand, ContactManagerCommand, ContactFactoryCommand, InitParameters};
use comm::{ModemCommand, ContactManagerCommand, ContactFactoryCommand, WhatsappCommand, InitParameters};
use huawei_modem::pdu::{PduAddress, DeliverPdu};
use store::Store;
use failure::Error;
@ -18,6 +18,7 @@ use models::Message as OurMessage; @@ -18,6 +18,7 @@ use models::Message as OurMessage;
use models::Recipient;
use irc::client::data::config::Config as IrcConfig;
use util::{self, Result};
use std::borrow::Cow;
/// The maximum message size sent over IRC.
static MESSAGE_MAX_LEN: usize = 350;
@ -29,9 +30,13 @@ pub struct ContactManager { @@ -29,9 +30,13 @@ pub struct ContactManager {
addr: PduAddress,
store: Store,
id: bool,
wa_mode: bool,
admin_is_online: bool,
connected: bool,
presence: Option<String>,
channels: Vec<String>,
cf_tx: UnboundedSender<ContactFactoryCommand>,
wa_tx: UnboundedSender<WhatsappCommand>,
modem_tx: UnboundedSender<ModemCommand>,
pub tx: UnboundedSender<ContactManagerCommand>,
rx: UnboundedReceiver<ContactManagerCommand>
@ -63,7 +68,27 @@ impl ContactManager { @@ -63,7 +68,27 @@ impl ContactManager {
self.tx.unbounded_send(cmd)
.unwrap()
}
fn send_raw_message(&mut self, msg: &str) -> Result<()> {
fn process_groups(&mut self) -> Result<()> {
let mut chans = vec![];
for grp in self.store.get_groups_for_recipient(&self.addr)? {
self.irc.0.send_join(&grp.channel)?;
chans.push(grp.channel);
}
for ch in ::std::mem::replace(&mut self.channels, chans) {
if !self.channels.contains(&ch) {
self.irc.0.send_part(&ch)?;
}
}
Ok(())
}
fn send_raw_message(&mut self, msg: &str, group_target: Option<i32>) -> Result<()> {
let dest: Cow<str> = if let Some(g) = group_target {
let grp = self.store.get_group_by_id(g)?;
Cow::Owned(grp.channel)
}
else {
Cow::Borrowed(&self.admin)
};
// We need to split messages that are too long to send on IRC up
// into fragments, as well as splitting them at newlines.
//
@ -87,11 +112,17 @@ impl ContactManager { @@ -87,11 +112,17 @@ impl ContactManager {
}
});
for chunk in iter {
self.irc.0.send_privmsg(&self.admin, chunk)?;
self.irc.0.send_privmsg(&dest, chunk)?;
}
}
Ok(())
}
fn process_msg_plain(&mut self, msg: OurMessage) -> Result<()> {
let text = msg.text.as_ref().expect("msg has neither text nor pdu");
self.send_raw_message(text, msg.group_target)?;
self.store.delete_message(msg.id)?;
Ok(())
}
fn process_msg_pdu(&mut self, msg: OurMessage, pdu: DeliverPdu) -> Result<()> {
use huawei_modem::convert::TryFrom;
@ -111,7 +142,7 @@ impl ContactManager { @@ -111,7 +142,7 @@ impl ContactManager {
let mut concatenated = String::new();
let mut pdus = vec![];
for msg in msgs.iter() {
let dec = DeliverPdu::try_from(&msg.pdu)?
let dec = DeliverPdu::try_from(msg.pdu.as_ref().expect("csms message has no pdu"))?
.get_message_data()
.decode_message()?;
pdus.push(dec);
@ -120,13 +151,13 @@ impl ContactManager { @@ -120,13 +151,13 @@ impl ContactManager {
for pdu in pdus {
concatenated.push_str(&pdu.text);
}
self.send_raw_message(&concatenated)?;
self.send_raw_message(&concatenated, msg.group_target)?;
for msg in msgs.iter() {
self.store.delete_message(msg.id)?;
}
}
else {
self.send_raw_message(&m.text)?;
self.send_raw_message(&m.text, msg.group_target)?;
self.store.delete_message(msg.id)?;
}
},
@ -152,15 +183,34 @@ impl ContactManager { @@ -152,15 +183,34 @@ impl ContactManager {
let msgs = self.store.get_messages_for_recipient(&self.addr)?;
for msg in msgs {
debug!("Processing message #{}", msg.id);
let pdu = DeliverPdu::try_from(&msg.pdu)?;
self.process_msg_pdu(msg, pdu)?;
if msg.pdu.is_some() {
let pdu = DeliverPdu::try_from(msg.pdu.as_ref().unwrap())?;
self.process_msg_pdu(msg, pdu)?;
}
else {
self.process_msg_plain(msg)?;
}
}
Ok(())
}
fn update_away(&mut self) -> Result<()> {
if !self.connected {
debug!("Not updating presence yet; not connected");
return Ok(());
}
debug!("Setting away state to {:?}", self.presence);
self.irc.0.send(Command::AWAY(self.presence.clone()))?;
Ok(())
}
fn handle_int_rx(&mut self, cmc: ContactManagerCommand) -> Result<()> {
use self::ContactManagerCommand::*;
match cmc {
ProcessMessages => self.process_messages()?,
ProcessGroups => self.process_groups()?,
UpdateAway(msg) => {
self.presence = msg;
self.update_away()?;
}
}
Ok(())
}
@ -184,6 +234,11 @@ impl ContactManager { @@ -184,6 +234,11 @@ impl ContactManager {
self.change_nick(msg[1].into())?;
self.irc.0.send_notice(&self.admin, "Done.")?;
},
"!wa" => {
self.wa_mode = !self.wa_mode;
let state = if self.wa_mode { "ENABLED" } else { "DISABLED" };
self.irc.0.send_notice(&self.admin, &format!("WhatsApp mode: {}", state))?;
},
"!die" => {
self.cf_tx.unbounded_send(ContactFactoryCommand::DropContact(self.addr.clone()))
.unwrap();
@ -207,6 +262,7 @@ impl ContactManager { @@ -207,6 +262,7 @@ impl ContactManager {
self.connected = true;
self.process_messages()?;
self.initialize_watch()?;
self.update_away()?;
},
Command::NICK(nick) => {
if let Some(from) = im.prefix {
@ -248,7 +304,12 @@ impl ContactManager { @@ -248,7 +304,12 @@ impl ContactManager {
}
if target == self.nick {
debug!("{} -> {}: {}", from[0], self.addr, mesg);
self.modem_tx.unbounded_send(ModemCommand::SendMessage(self.addr.clone(), mesg)).unwrap();
if self.wa_mode {
self.wa_tx.unbounded_send(WhatsappCommand::SendDirectMessage(self.addr.clone(), mesg)).unwrap();
}
else {
self.modem_tx.unbounded_send(ModemCommand::SendMessage(self.addr.clone(), mesg)).unwrap();
}
}
}
},
@ -303,6 +364,7 @@ impl ContactManager { @@ -303,6 +364,7 @@ impl ContactManager {
let (tx, rx) = mpsc::unbounded();
let modem_tx = p.cm.modem_tx.clone();
let cf_tx = p.cm.cf_tx.clone();
let wa_tx = p.cm.wa_tx.clone();
let admin = p.cfg.admin_nick.clone();
let cfg = Box::into_raw(Box::new(IrcConfig {
nickname: Some(recip.nick),
@ -333,14 +395,19 @@ impl ContactManager { @@ -333,14 +395,19 @@ impl ContactManager {
let nick = cli.0.current_nickname().into();
tx.unbounded_send(ContactManagerCommand::ProcessMessages)
.unwrap();
tx.unbounded_send(ContactManagerCommand::ProcessGroups)
.unwrap();
Ok(ContactManager {
irc: cli,
irc_stream,
id: false,
connected: false,
wa_mode: false,
// Assume admin is online to start with
admin_is_online: true,
addr, store, modem_tx, tx, rx, admin, nick, cf_tx
presence: None,
channels: vec![],
addr, store, modem_tx, tx, rx, admin, nick, cf_tx, wa_tx
})
},
Err(e) => {

26
src/contact_factory.rs

@ -18,6 +18,7 @@ pub struct ContactFactory { @@ -18,6 +18,7 @@ pub struct ContactFactory {
rx: UnboundedReceiver<ContactFactoryCommand>,
contacts_starting: HashMap<PduAddress, Box<Future<Item = ContactManager, Error = Error>>>,
contacts: HashMap<PduAddress, ContactManager>,
contacts_presence: HashMap<PduAddress, Option<String>>,
failed_contacts: HashSet<PduAddress>,
failure_int: Interval,
messages_processed: HashSet<i32>,
@ -37,15 +38,20 @@ impl Future for ContactFactory { @@ -37,15 +38,20 @@ impl Future for ContactFactory {
let msg = res.expect("contactfactory rx died");
match msg {
ProcessMessages => self.process_messages()?,
ProcessGroups => self.process_groups()?,
LoadRecipients => self.load_recipients()?,
MakeContact(addr) => self.make_contact(addr)?,
DropContact(addr) => self.drop_contact(addr)?
DropContact(addr) => self.drop_contact(addr)?,
UpdateAway(addr, away) => self.update_away(addr, away)
}
}
let mut to_remove = vec![];
for (addr, fut) in self.contacts_starting.iter_mut() {
match fut.poll() {
Ok(Async::Ready(c)) => {
if let Some(pre) = self.contacts_presence.get(&addr) {
c.add_command(ContactManagerCommand::UpdateAway(pre.clone()));
}
self.contacts.insert(addr.clone(), c);
to_remove.push(addr.clone())
},
@ -91,6 +97,7 @@ impl ContactFactory { @@ -91,6 +97,7 @@ impl ContactFactory {
Self {
rx, failure_int,
contacts_starting: HashMap::new(),
contacts_presence: HashMap::new(),
contacts: HashMap::new(),
failed_contacts: HashSet::new(),
messages_processed: HashSet::new(),
@ -115,6 +122,10 @@ impl ContactFactory { @@ -115,6 +122,10 @@ impl ContactFactory {
let addr = util::un_normalize_address(&recip.phone_number)
.ok_or(format_err!("invalid num {} in db", recip.phone_number))?;
debug!("Setting up recipient for {} (nick {})", addr, recip.nick);
if self.contacts_starting.get(&addr).is_some() || self.contacts.get(&addr).is_some() {
debug!("Not doing anything; contact already exists");
return Ok(());
}
let cfut = {
let ip = self.get_init_parameters();
ContactManager::new(recip, ip)
@ -146,6 +157,19 @@ impl ContactFactory { @@ -146,6 +157,19 @@ impl ContactFactory {
}
Ok(())
}
fn update_away(&mut self, addr: PduAddress, away: Option<String>) {
if let Some(c) = self.contacts.get(&addr) {
c.add_command(ContactManagerCommand::UpdateAway(away.clone()));
}
self.contacts_presence.insert(addr, away);
}
fn process_groups(&mut self) -> Result<()> {
debug!("Processing group updates");
for (_, c) in self.contacts.iter_mut() {
c.add_command(ContactManagerCommand::ProcessGroups);
}
Ok(())
}
fn process_messages(&mut self) -> Result<()> {
for msg in self.store.get_all_messages()? {
if self.messages_processed.insert(msg.id) {

89
src/control.rs

@ -1,10 +1,10 @@ @@ -1,10 +1,10 @@
//! IRC bot that allows users to control the bridge.
//!
//! A lot of this module is a copypasta from src/contact.rs and I don't like it :(
//! FIXME: A lot of this module is a copypasta from src/contact.rs and I don't like it :(
use irc::client::PackedIrcClient;
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver};
use comm::{ControlBotCommand, ModemCommand, ContactFactoryCommand, InitParameters};
use comm::{ControlBotCommand, ModemCommand, ContactFactoryCommand, InitParameters, WhatsappCommand};
use failure::Error;
use futures::{self, Future, Async, Poll, Stream};
use futures::future::Either;
@ -14,6 +14,7 @@ use irc::client::data::config::Config as IrcConfig; @@ -14,6 +14,7 @@ use irc::client::data::config::Config as IrcConfig;
use irc::proto::message::Message;
use irc::client::ext::ClientExt;
use util::Result;
use store::Store;
static HELPTEXT: &str = r#"sms-irc help:
[in this admin room]
@ -21,15 +22,24 @@ static HELPTEXT: &str = r#"sms-irc help: @@ -21,15 +22,24 @@ static HELPTEXT: &str = r#"sms-irc help:
- !reg: check modem registration status
- !sms <num>: start a conversation with a given phone number
[in a /NOTICE to one of the ghosts]
- !nick <nick>: change nickname"#;
- !nick <nick>: change nickname
- !wasetup: set up WhatsApp Web integration
- !walogon: logon to WhatsApp Web using stored credentials
- !wabridge <jid> <#channel>: bridge the WA group <jid> to an IRC channel <#channel>
- !walist: list available WA groups
- !wadel <#channel>: unbridge IRC channel <#channel>
"#;
pub struct ControlBot {
irc: PackedIrcClient,
irc_stream: ClientStream,
chan: String,
admin: String,
channels: Vec<String>,
store: Store,
id: bool,
rx: UnboundedReceiver<ControlBotCommand>,
cf_tx: UnboundedSender<ContactFactoryCommand>,
wa_tx: UnboundedSender<WhatsappCommand>,
m_tx: UnboundedSender<ModemCommand>
}
impl Future for ControlBot {
@ -55,12 +65,61 @@ impl Future for ControlBot { @@ -55,12 +65,61 @@ impl Future for ControlBot {
}
}
impl ControlBot {
// FIXME: this is yet more horrible copypasta :<
fn process_groups(&mut self) -> Result<()> {
let mut chans = vec![];
for grp in self.store.get_all_groups()? {
self.irc.0.send_join(&grp.channel)?;
chans.push(grp.channel);
}
for ch in ::std::mem::replace(&mut self.channels, chans) {
if !self.channels.contains(&ch) {
self.irc.0.send_part(&ch)?;
}
}
Ok(())
}
fn process_admin_command(&mut self, mesg: String) -> Result<()> {
if mesg.len() < 1 || mesg.chars().nth(0) != Some('!') {
return Ok(());
}
let msg = mesg.split(" ").collect::<Vec<_>>();
match msg[0] {
"!wasetup" => {
self.wa_tx.unbounded_send(WhatsappCommand::StartRegistration)
.unwrap();
},
"!walist" => {
self.wa_tx.unbounded_send(WhatsappCommand::GroupList)
.unwrap();
},
"!walogon" => {
self.wa_tx.unbounded_send(WhatsappCommand::LogonIfSaved)
.unwrap();
},
"!wadel" => {
if msg.get(1).is_none() {
self.irc.0.send_privmsg(&self.chan, "!wadel takes an argument.")?;
return Ok(());
}
self.wa_tx.unbounded_send(WhatsappCommand::GroupRemove(msg[1].into()))
.unwrap();
},
"!wabridge" => {
if msg.get(1).is_none() || msg.get(2).is_none() {
self.irc.0.send_privmsg(&self.chan, "!wabridge takes two arguments.")?;
return Ok(());
}
let jid = match msg[1].parse() {
Ok(j) => j,
Err(e) => {
self.irc.0.send_privmsg(&self.chan, &format!("failed to parse jid: {}", e))?;
return Ok(());
}
};
self.wa_tx.unbounded_send(WhatsappCommand::GroupAssociate(jid, msg[2].into()))
.unwrap();
},
"!csq" => {
self.m_tx.unbounded_send(ModemCommand::RequestCsq).unwrap();
},
@ -102,6 +161,14 @@ impl ControlBot { @@ -102,6 +161,14 @@ impl ControlBot {
debug!("Received control command: {}", mesg);
self.process_admin_command(mesg)?;
}
else if self.channels.contains(&target) {
debug!("Received group message in {}: {}", target, mesg);
self.wa_tx.unbounded_send(WhatsappCommand::SendGroupMessage(target, mesg))
.unwrap();
}
else {
warn!("Received unsolicited message to {}: {}", target, mesg);
}
}
},
Command::ERROR(msg) => {
@ -118,21 +185,30 @@ impl ControlBot { @@ -118,21 +185,30 @@ impl ControlBot {
Log(log) => {
self.irc.0.send_notice(&self.chan, &log)?;
},
ReportFailure(err) => {
self.irc.0.send_notice(&self.admin, &format!("\x02\x0304{}\x0f", err))?;
},
CommandResponse(resp) => {
self.irc.0.send_privmsg(&self.chan, &format!("{}: {}", self.admin, resp))?;
},
CsqResult(sq) => {
self.irc.0.send_privmsg(&self.chan, &format!("RSSI: {} | BER: {}", sq.rssi, sq.ber))?;
self.irc.0.send_privmsg(&self.chan, &format!("RSSI: \x02{}\x0f | BER: \x02{}\x0f", sq.rssi, sq.ber))?;
},
RegResult(st) => {
self.irc.0.send_privmsg(&self.chan, &format!("Registration state: {}", st))?;
self.irc.0.send_privmsg(&self.chan, &format!("Registration state: \x02{}\x0f", st))?;
},
ProcessGroups => self.process_groups()?
}
Ok(())
}
pub fn new(p: InitParameters) -> impl Future<Item = Self, Error = Error> {
let cf_tx = p.cm.cf_tx.clone();
let wa_tx = p.cm.wa_tx.clone();
let m_tx = p.cm.modem_tx.clone();
let rx = p.cm.cb_rx.take().unwrap();
let admin = p.cfg.admin_nick.clone();
let chan = p.cfg.irc_channel.clone();
let store = p.store.clone();
let cfg = Box::into_raw(Box::new(IrcConfig {
nickname: Some(p.cfg.control_bot_nick.clone().unwrap_or("smsirc".into())),
realname: Some("smsirc control bot".into()),
@ -158,7 +234,8 @@ impl ControlBot { @@ -158,7 +234,8 @@ impl ControlBot {
irc: cli,
irc_stream,
id: false,
cf_tx, m_tx, rx, admin, chan
channels: vec![],
cf_tx, m_tx, rx, admin, chan, store, wa_tx
})
},
Err(e) => {

30
src/main.rs

@ -12,6 +12,10 @@ extern crate toml; @@ -12,6 +12,10 @@ extern crate toml;
#[macro_use] extern crate log;
extern crate log4rs;
extern crate tokio_timer;
extern crate whatsappweb;
extern crate serde_json;
extern crate image;
extern crate qrcode;
mod config;
mod store;
@ -23,6 +27,7 @@ mod models; @@ -23,6 +27,7 @@ mod models;
mod contact;
mod contact_factory;
mod control;
mod whatsapp;
use config::Config;
use store::Store;
@ -40,6 +45,7 @@ use log4rs::append::Append; @@ -40,6 +45,7 @@ use log4rs::append::Append;
use log4rs::append::console::ConsoleAppender;
use log::Record;
use std::fmt;
use whatsapp::WhatsappManager;
pub struct IrcLogWriter {
sender: UnboundedSender<ControlBotCommand>
@ -51,8 +57,17 @@ impl fmt::Debug for IrcLogWriter { @@ -51,8 +57,17 @@ impl fmt::Debug for IrcLogWriter {
}
impl Append for IrcLogWriter {
fn append(&self, rec: &Record) -> Result<(), Box<::std::error::Error + Sync + Send>> {
use log::Level::*;
let colour = match rec.level() {
Error => "04",
Warn => "07",
Info => "09",
Debug => "10",
Trace => "11"
};
self.sender.unbounded_send(
ControlBotCommand::Log(format!("{}:{} -- {}", rec.target(), rec.level(), rec.args())))
ControlBotCommand::Log(format!("[\x0302{}\x0f] \x02\x03{}{}\x0f -- {}", rec.target(), colour, rec.level(), rec.args())))
.unwrap();
Ok(())
}
@ -112,6 +127,19 @@ fn main() -> Result<(), failure::Error> { @@ -112,6 +127,19 @@ fn main() -> Result<(), failure::Error> {
error!("ControlBot failed: {}", e);
panic!("controlbot failed");
}));
info!("Initializing WhatsApp");
let wa = WhatsappManager::new(InitParameters {
cfg: &config,
store: store.clone(),
cm: &mut cm,
hdl: &hdl
});
hdl.spawn(wa.map_err(|e| {
// FIXME: restartability
error!("WhatsappManager failed: {}", e);
panic!("whatsapp failed");
}));
info!("Initializing contact factory");
let cf = ContactFactory::new(config, store, cm, hdl);
let _ = core.run(cf.map_err(|e| {

42
src/models.rs

@ -1,4 +1,5 @@ @@ -1,4 +1,5 @@
use schema::{recipients, messages};
use schema::{recipients, messages, groups, wa_persistence};
use serde_json::Value;
#[derive(Queryable)]
pub struct Recipient {
@ -16,13 +17,46 @@ pub struct NewRecipient<'a> { @@ -16,13 +17,46 @@ pub struct NewRecipient<'a> {
pub struct Message {
pub id: i32,
pub phone_number: String,
pub pdu: Vec<u8>,
pub csms_data: Option<i32>
pub pdu: Option<Vec<u8>>,
pub csms_data: Option<i32>,
pub group_target: Option<i32>,
pub text: Option<String>,
}
#[derive(Queryable, Debug)]
pub struct Group {
pub id: i32,
pub jid: String,
pub channel: String,
pub participants: Vec<i32>,
pub admins: Vec<i32>,
pub topic: String
}
#[derive(Insertable, Queryable, Debug)]
#[table_name="wa_persistence"]
pub struct PersistenceData {
pub rev: i32,
pub data: Value
}
#[derive(Insertable)]
#[table_name="groups"]
pub struct NewGroup<'a> {
pub jid: &'a str,
pub channel: &'a str,
pub participants: Vec<i32>,
pub admins: Vec<i32>,
pub topic: &'a str
}
#[derive(Insertable)]
#[table_name="messages"]
pub struct NewMessage<'a> {
pub phone_number: &'a str,
pub pdu: &'a [u8],
pub csms_data: Option<i32>
pub csms_data: Option<i32>,
}
#[derive(Insertable)]
#[table_name="messages"]
pub struct NewPlainMessage<'a> {
pub phone_number: &'a str,
pub group_target: Option<i32>,
pub text: &'a str,
}

4
src/modem.rs

@ -188,6 +188,7 @@ impl ModemManager { @@ -188,6 +188,7 @@ impl ModemManager {
.map_err(|e| e.into()));
}
let a1 = addr.clone();
let cb_tx = self.cb_tx.clone();
let fut = futures::future::join_all(futs)
.map(move |res| {
info!("Message to {} sent!", a1);
@ -195,6 +196,9 @@ impl ModemManager { @@ -195,6 +196,9 @@ impl ModemManager {
}).map_err(move |e: ::failure::Error| {
// FIXME: retrying?
warn!("Failed to send message to {}: {}", addr, e);
let emsg = format!("Failed to send message to {}: {}", addr, e);
cb_tx.unbounded_send(ControlBotCommand::ReportFailure(emsg))
.unwrap();
});
self.handle.spawn(fut);
}

21
src/schema.rs

@ -1,11 +1,22 @@ @@ -1,11 +1,22 @@
//! Diesel DSL schema (generated by `diesel print-schema`)
table! {
groups (id) {
id -> Int4,
jid -> Varchar,
channel -> Varchar,
participants -> Array<Int4>,
admins -> Array<Int4>,
topic -> Varchar,
}
}
table! {
messages (id) {
id -> Int4,
phone_number -> Varchar,
pdu -> Bytea,
pdu -> Nullable<Bytea>,
csms_data -> Nullable<Int4>,
group_target -> Nullable<Int4>,
text -> Nullable<Varchar>,
}
}
@ -17,3 +28,9 @@ table! { @@ -17,3 +28,9 @@ table! {
}
}
table! {
wa_persistence (rev) {
rev -> Int4,
data -> Json,
}
}

123
src/store.rs

@ -7,6 +7,9 @@ use config::Config; @@ -7,6 +7,9 @@ use config::Config;
use std::sync::Arc;
use huawei_modem::pdu::PduAddress;
use diesel::prelude::*;
use serde_json;
use whatsappweb::connection::PersistentSession;
use whatsappweb::Jid;
use util::{self, Result};
use models::*;
@ -39,6 +42,70 @@ impl Store { @@ -39,6 +42,70 @@ impl Store {
.get_result(&*conn)?;
Ok(res)
}
pub fn store_plain_message(&mut self, addr: &PduAddress, text: &str, group_target: Option<i32>) -> Result<Message> {
use schema::messages;
let num = util::normalize_address(addr);
let nm = NewPlainMessage {
phone_number: &num,
text,
group_target
};
let conn = self.inner.get()?;
let res = ::diesel::insert_into(messages::table)
.values(&nm)
.get_result(&*conn)?;
Ok(res)
}
pub fn store_wa_persistence(&mut self, p: PersistentSession) -> Result<()> {
use schema::wa_persistence;
use schema::wa_persistence::dsl::*;
let pdata = serde_json::to_value(&p)?;
let pdata = PersistenceData {
rev: 0,
data: pdata
};
let conn = self.inner.get()?;
::diesel::insert_into(wa_persistence::table)
.values(&pdata)
.on_conflict(rev)
.do_update()
.set(data.eq(::diesel::pg::upsert::excluded(data)))
.execute(&*conn)?;
Ok(())
}
pub fn store_group(&mut self, jid: &Jid, channel: &str, participants: Vec<i32>, admins: Vec<i32>, topic: &str) -> Result<Group> {
use schema::groups;
let jid = jid.to_string();
let newgrp = NewGroup {
jid: &jid,
channel, participants, admins, topic
};
let conn = self.inner.get()?;
let res = ::diesel::insert_into(groups::table)
.values(&newgrp)
.get_result(&*conn)?;
Ok(res)
}
pub fn get_wa_persistence_opt(&mut self) -> Result<Option<PersistentSession>> {
use schema::wa_persistence::dsl::*;
let conn = self.inner.get()?;
let res: Option<PersistenceData> = wa_persistence.filter(rev.eq(0))
.first(&*conn)
.optional()?;
let res = match res {
Some(res) => {
let res: PersistentSession = serde_json::from_value(res.data)?;
Some(res)
},
None => None
};
Ok(res)
}
pub fn store_recipient(&mut self, addr: &PduAddress, nick: &str) -> Result<Recipient> {
use schema::recipients;
@ -91,6 +158,51 @@ impl Store { @@ -91,6 +158,51 @@ impl Store {
.load(&*conn)?;
Ok(res)
}
pub fn get_group_by_id(&mut self, gid: i32) -> Result<Group> {
use schema::groups::dsl::*;
let conn = self.inner.get()?;
let res = groups.filter(id.eq(gid))
.first(&*conn)?;
Ok(res)
}
pub fn get_all_groups(&mut self) -> Result<Vec<Group>> {
use schema::groups::dsl::*;
let conn = self.inner.get()?;
let res = groups
.load(&*conn)?;
Ok(res)
}
pub fn get_group_by_jid_opt(&mut self, j: &Jid) -> Result<Option<Group>> {
use schema::groups::dsl::*;
let j = j.to_string();
let conn = self.inner.get()?;
let res = groups.filter(jid.eq(j))
.first(&*conn)
.optional()?;
Ok(res)
}
pub fn get_group_by_chan_opt(&mut self, c: &str) -> Result<Option<Group>> {
use schema::groups::dsl::*;
let conn = self.inner.get()?;
let res = groups.filter(channel.eq(c))
.first(&*conn)
.optional()?;
Ok(res)
}
pub fn get_groups_for_recipient(&mut self, addr: &PduAddress) -> Result<Vec<Group>> {
use schema::groups::dsl::*;
let r = self.get_recipient_by_addr_opt(addr)?
.ok_or(format_err!("get_groups_for_recipient couldn't find recipient"))?;
let conn = self.inner.get()?;
let res = groups.filter(participants.contains(vec![r.id]))
.load(&*conn)?;
Ok(res)
}
pub fn get_messages_for_recipient(&mut self, addr: &PduAddress) -> Result<Vec<Message>> {
use schema::messages::dsl::*;
let conn = self.inner.get()?;
@ -109,6 +221,17 @@ impl Store { @@ -109,6 +221,17 @@ impl Store {
.load(&*conn)?;
Ok(res)
}
pub fn delete_group_with_id(&mut self, i: i32) -> Result<()> {
use schema::groups::dsl::*;
let conn = self.inner.get()?;
let rows_affected = ::diesel::delete(groups.filter(id.eq(i)))
.execute(&*conn)?;
if rows_affected == 0 {
return Err(format_err!("no rows affected deleting gid {}", i));
}
Ok(())
}
pub fn delete_recipient_with_addr(&mut self, addr: &PduAddress) -> Result<()> {
use schema::recipients::dsl::*;
let conn = self.inner.get()?;

22
src/util.rs

@ -1,9 +1,23 @@ @@ -1,9 +1,23 @@
//! Helpful utility functions.
use huawei_modem::pdu::{PduAddress, AddressType, HexData, PhoneNumber};
use huawei_modem::convert::TryFrom;
use whatsappweb::Jid;
pub type Result<T> = ::std::result::Result<T, ::failure::Error>;
pub fn jid_to_address(jid: &Jid) -> Option<PduAddress> {
if let Some(pn) = jid.phonenumber() {
let toa = AddressType::default();
let num = PhoneNumber::from(pn.as_bytes());
Some(PduAddress {
type_addr: toa,
number: num
})
}
else {
None
}
}
pub fn normalize_address(addr: &PduAddress) -> String {
let ton: u8 = addr.type_addr.into();
let mut ret = format!("{:02X}", ton);
@ -23,9 +37,9 @@ pub fn un_normalize_address(addr: &str) -> Option<PduAddress> { @@ -23,9 +37,9 @@ pub fn un_normalize_address(addr: &str) -> Option<PduAddress> {
number: num
})
}
pub fn make_nick_for_address(addr: &PduAddress) -> String {
pub fn string_to_irc_nick(inp: &str) -> String {
let mut ret = "S".to_string();
for ch in addr.to_string().chars() {
for ch in inp.chars() {
match ch {
'+' => ret.push('I'),
'_' => ret.push('_'),
@ -49,3 +63,7 @@ pub fn make_nick_for_address(addr: &PduAddress) -> String { @@ -49,3 +63,7 @@ pub fn make_nick_for_address(addr: &PduAddress) -> String {
}
ret
}
pub fn make_nick_for_address(addr: &PduAddress) -> String {
let inp = addr.to_string();
string_to_irc_nick(&inp)
}

449
src/whatsapp.rs

@ -0,0 +1,449 @@ @@ -0,0 +1,449 @@
//! Experimental support for WhatsApp.
use whatsappweb::connection::{WhatsappWebConnection, WhatsappWebHandler};
use whatsappweb::connection::State as WaState;
use whatsappweb::Jid;
use whatsappweb::Contact as WaContact;
use whatsappweb::Chat as WaChat;
use whatsappweb::GroupMetadata;
use whatsappweb::connection::UserData as WaUserData;
use whatsappweb::connection::PersistentSession as WaPersistentSession;
use whatsappweb::connection::DisconnectReason as WaDisconnectReason;
use whatsappweb::message::ChatMessage as WaMessage;
use huawei_modem::pdu::PduAddress;
use futures::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use std::collections::HashMap;
use store::Store;
use std::sync::Arc;
use comm::{WhatsappCommand, ContactFactoryCommand, ControlBotCommand, InitParameters};
use util::{self, Result};
use image::Luma;
use qrcode::QrCode;
use futures::{Future, Async, Poll, Stream};
use failure::Error;
struct WhatsappHandler {
tx: Arc<UnboundedSender<WhatsappCommand>>
}
impl WhatsappWebHandler for WhatsappHandler {
fn on_state_changed(&self, _: &WhatsappWebConnection<Self>, state: WaState) {
self.tx.unbounded_send(WhatsappCommand::StateChanged(state))
.unwrap();
}
fn on_user_data_changed(&self, _: &WhatsappWebConnection<Self>, user_data: WaUserData) {
self.tx.unbounded_send(WhatsappCommand::UserDataChanged(user_data))
.unwrap();
}
fn on_persistent_session_data_changed(&self, ps: WaPersistentSession) {
self.tx.unbounded_send(WhatsappCommand::PersistentChanged(ps))
.unwrap();
}
fn on_disconnect(&self, reason: WaDisconnectReason) {
self.tx.unbounded_send(WhatsappCommand::Disconnect(reason))
.unwrap();
}
fn on_message(&self, _: &WhatsappWebConnection<Self>, new: bool, message: Box<WaMessage>) {
self.tx.unbounded_send(WhatsappCommand::Message(new, message))
.unwrap();
}
}
pub struct WhatsappManager {
conn: Option<WhatsappWebConnection<WhatsappHandler>>,
rx: UnboundedReceiver<WhatsappCommand>,
wa_tx: Arc<UnboundedSender<WhatsappCommand>>,
cf_tx: UnboundedSender<ContactFactoryCommand>,
cb_tx: UnboundedSender<ControlBotCommand>,
contacts: HashMap<Jid, WaContact>,
chats: HashMap<Jid, WaChat>,
groups: HashMap<Jid, GroupMetadata>,
state: WaState,
connected: bool,
store: Store,
qr_path: String
}
impl Future for WhatsappManager {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
while let Async::Ready(com) = self.rx.poll().unwrap() {
let com = com.ok_or(format_err!("whatsappmanager rx died"))?;
self.handle_int_rx(com)?;
}
Ok(Async::NotReady)
}
}
impl WhatsappManager {
pub fn new(p: InitParameters) -> Self {
let store = p.store;
let (wa_tx, rx) = mpsc::unbounded();
let cf_tx = p.cm.cf_tx.clone();
let cb_tx = p.cm.cb_tx.clone();
let qr_path = p.cfg.qr_path.clone().unwrap_or("/tmp/wa_qr.png".into());
wa_tx.unbounded_send(WhatsappCommand::LogonIfSaved)
.unwrap();
Self {
conn: None,
contacts: HashMap::new(),
chats: HashMap::new(),
groups: HashMap::new(),
state: WaState::Uninitialized,
connected: false,
wa_tx: Arc::new(wa_tx),
rx, cf_tx, cb_tx, qr_path, store
}
}
fn handle_int_rx(&mut self, c: WhatsappCommand) -> Result<()> {
use self::WhatsappCommand::*;
match c {
StartRegistration => self.start_registration()?,
LogonIfSaved => self.logon_if_saved()?,
QrCode(qr) => self.on_qr(qr)?,
SendGroupMessage(to, cont) => self.send_group_message(to, cont)?,
SendDirectMessage(to, cont) => self.send_direct_message(to, cont)?,
GroupAssociate(jid, to) => self.group_associate(jid, to)?,
GroupList => self.group_list()?,
GroupRemove(grp) => self.group_remove(grp)?,
StateChanged(was) => self.on_state_changed(was),
UserDataChanged(wau) => self.on_user_data_changed(wau),
PersistentChanged(wap) => self.on_persistent_session_data_changed(wap)?,
Disconnect(war) => self.on_disconnect(war),
Message(new, msg) => {
if new {
self.on_message(msg)?;
}
}
}
Ok(())
}
fn logon_if_saved(&mut self) -> Result<()> {
use whatsappweb::connection;
if let Some(wap) = self.store.get_wa_persistence_opt()? {
info!("Logging on to WhatsApp Web using stored persistence data");
let tx = self.wa_tx.clone();
let (conn, _) = connection::with_persistent_session(wap, WhatsappHandler { tx });
self.conn = Some(conn);
}
Ok(())
}
fn start_registration(&mut self) -> Result<()> {
use whatsappweb::connection;
info!("Beginning WhatsApp Web registration process");
let tx = self.wa_tx.clone();
let tx2 = self.wa_tx.clone();
let (conn, _) = connection::new(move |qr| {
tx2.unbounded_send(WhatsappCommand::QrCode(qr))
.unwrap()
}, WhatsappHandler { tx });
self.conn = Some(conn);
Ok(())
}
fn cb_respond(&mut self, s: String) {
self.cb_tx.unbounded_send(ControlBotCommand::CommandResponse(s))
.unwrap();
}
fn on_qr(&mut self, qr: QrCode) -> Result<()> {
info!("Processing registration QR code...");
qr.render::<Luma<u8>>()
.module_dimensions(10, 10)
.build()
.save(&self.qr_path)?;
let qrn = format!("Scan the QR code saved at {} to log in!", self.qr_path);
self.cb_respond(qrn);
self.cb_respond(format!("NB: The code is only valid for a few seconds, so scan quickly!"));
Ok(())
}
fn send_direct_message(&mut self, addr: PduAddress, content: String) -> Result<()> {
use whatsappweb::message::ChatMessageContent;
debug!("Sending direct message to {}...", addr);
trace!("Message contents: {}", content);
if self.conn.is_none() || !self.connected {
warn!("Tried to send WA message to {} while disconnected!", addr);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Failed to send to WA contact {}: disconnected from server", addr)))
.unwrap();
return Ok(());
}
match Jid::from_phone_number(format!("{}", addr)) {
Ok(jid) => {
let content = ChatMessageContent::Text(content);
self.conn.as_mut().unwrap()
.send_message(content, jid);
debug!("WA direct message sent (probably)");
},
Err(e) => {
warn!("Couldn't send WA message to {}: {}", addr, e);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Failed to send to WA contact {}: {}", addr, e)))
.unwrap();
}
}
Ok(())
}
fn send_group_message(&mut self, chan: String, content: String) -> Result<()> {
use whatsappweb::message::ChatMessageContent;
debug!("Sending message to group with chan {}...", chan);
trace!("Message contents: {}", content);
if self.conn.is_none() || !self.connected {
warn!("Tried to send WA message to group {} while disconnected!", chan);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Failed to send to group {}: disconnected from server", chan)))
.unwrap();
return Ok(());
}
if let Some(grp) = self.store.get_group_by_chan_opt(&chan)? {
let jid = grp.jid.parse().expect("bad jid in DB");
let content = ChatMessageContent::Text(content);
self.conn.as_mut().unwrap()
.send_message(content, jid);
debug!("WA group message sent (probably)");
}
else {
error!("Tried to send WA message to nonexistent group {}", chan);
}
Ok(())
}
fn on_message(&mut self, msg: Box<WaMessage>) -> Result<()> {
use whatsappweb::message::{Direction, Peer, ChatMessageContent};
trace!("processing WA message: {:?}", msg);
let msg = *msg; // otherwise stupid borrowck gets angry, because Box
let WaMessage { direction, content, .. } = msg;
if let Direction::Receiving(peer) = direction {
let (from, group) = match peer {
Peer::Individual(j) => (j, None),
Peer::Group { group, participant } => (participant, Some(group))
};
let group = match group {
Some(gid) => {
if let Some(grp) = self.store.get_group_by_jid_opt(&gid)? {
Some(grp.id)
}
else {
info!("Received message for unbridged group {}, ignoring...", gid.to_string());
return Ok(());
}
},
None => None
};
let text = match content {
ChatMessageContent::Text(s) => s,
x => format!("[unimplemented message type: {:?}]", x)
};
if let Some(addr) = util::jid_to_address(&from) {
self.store.store_plain_message(&addr, &text, group)?;
self.cf_tx.unbounded_send(ContactFactoryCommand::ProcessMessages)
.unwrap();
}
}
Ok(())
}
fn group_list(&mut self) -> Result<()> {
let mut list = vec![];