You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

937 lines
39 KiB

//! Experimental support for WhatsApp.
use whatsappweb::Jid;
use whatsappweb::Contact as WaContact;
use whatsappweb::Chat as WaChat;
use whatsappweb::GroupMetadata;
use whatsappweb::message::ChatMessage as WaMessage;
use whatsappweb::message::{ChatMessageContent, Peer, MessageId, MessageStubType};
use whatsappweb::session::PersistentSession as WaPersistentSession;
use whatsappweb::event::WaEvent;
use whatsappweb::req::WaRequest;
use whatsappweb::errors::WaError;
use whatsappweb::errors::DisconnectReason as WaDisconnectReason;
use huawei_modem::pdu::PduAddress;
4 years ago
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver};
use std::collections::HashMap;
use std::sync::Arc;
use image::Luma;
use qrcode::QrCode;
use futures::{Future, Async, Poll, Stream, Sink};
use failure::Error;
use chrono::prelude::*;
use std::time::{Instant, Duration};
use std::collections::VecDeque;
use crate::comm::{WhatsappCommand, ContactFactoryCommand, ControlBotCommand, InitParameters};
use crate::util::{self, Result};
use crate::models::Recipient;
use crate::whatsapp_media::MediaResult;
use crate::store::Store;
use crate::whatsapp_conn::{WebConnectionWrapper, WebConnectionWrapperConfig};
use crate::whatsapp_msg::{IncomingMessage, WaMessageProcessor};
use crate::whatsapp_ack::WaAckTracker;
/// Future that owns the WhatsApp Web connection and bridges it to the rest of
/// the application: it reacts to connection events, handles `WhatsappCommand`s
/// received over a channel, and flushes queued requests to the connection.
pub struct WhatsappManager {
/// Wrapper around the WhatsApp Web connection; polled for events and used to send requests.
conn: WebConnectionWrapper,
/// Incoming commands for this manager (dispatched by `handle_int_rx`).
rx: UnboundedReceiver<WhatsappCommand>,
/// Channel for sending commands to the contact factory.
cf_tx: UnboundedSender<ContactFactoryCommand>,
/// Channel for sending responses and failure reports to the control bot.
cb_tx: UnboundedSender<ControlBotCommand>,
/// Known WhatsApp contacts, keyed by jid; consulted when choosing nicks.
contacts: HashMap<Jid, WaContact>,
/// Known WhatsApp chats, keyed by jid; listed by `group_list` and used for autocreation.
chats: HashMap<Jid, WaChat>,
/// When we last requested presence updates for each jid.
presence_requests: HashMap<Jid, Instant>,
/// Converts incoming WhatsApp messages into storable messages.
msgproc: WaMessageProcessor,
/// Tracks sent (and queued) messages; polled at the end of every `poll()`.
ackp: WaAckTracker,
/// If set, backlog (non-new) messages with an earlier timestamp are rejected.
backlog_start: Option<chrono::NaiveDateTime>,
/// Checked together with `conn.is_connected()` before sending.
// NOTE(review): where this flag is set is not visible in this part of the file.
connected: bool,
/// Persistent storage for messages, recipients, groups and message IDs.
store: Store,
/// Filesystem path the registration QR code image is saved to.
qr_path: String,
/// Channel name prefix for automatically bridged groups; `None` disables autocreation.
autocreate: Option<String>,
/// Configuration flag (`whatsapp.autoupdate_nicks`).
// NOTE(review): not read in the visible part of the file.
autoupdate_nicks: bool,
/// Whether to send read receipts (`WaRequest::MessageRead`) for received messages.
mark_read: bool,
/// Configuration flag (`whatsapp.drop_non_group_messages_on_the_floor`); checked in `on_message`.
drop_non_group_messages_on_the_floor: bool,
/// Whether to subscribe to presence updates for 1-to-1 chats we send to.
track_presence: bool,
/// Our own jid; used as the sender for our own messages seen in groups.
our_jid: Option<Jid>,
// NOTE(review): presumably the jid of the previous session — not read in the visible part of the file.
prev_jid: Option<Jid>,
/// Requests waiting to be handed to the connection on the next `poll()`.
outbox: VecDeque<WaRequest>
}
/// Drives the manager: drains connection events, handles queued commands,
/// flushes the outbox, and finally polls the ack tracker.
///
/// This future never resolves successfully; it only ever returns
/// `Async::NotReady` or an error.
impl Future for WhatsappManager {
    type Item = ();
    type Error = Error;

    fn poll(&mut self) -> Poll<(), Error> {
        // `handle_int_rx()` may change the state of `self.conn`, so whenever
        // at least one command was handled we make another full pass in order
        // to poll the connection again.
        loop {
            let mut handled_command = false;

            // Drain everything the connection wrapper has for us.
            while let Async::Ready(evt) = self.conn.poll()? {
                match evt.expect("None from wrapper impossible") {
                    Ok(e) => self.on_wa_event(e)?,
                    Err(e) => self.on_wa_error(e)
                }
            }

            // Handle every command that is currently waiting.
            while let Async::Ready(com) = self.rx.poll().unwrap() {
                let com = com.ok_or_else(|| format_err!("whatsappmanager rx died"))?;
                self.handle_int_rx(com)?;
                handled_command = true;
            }

            // Hand queued requests to the connection, or throw them away if
            // there is no connection to hand them to.
            if !self.outbox.is_empty() {
                if !self.conn.is_connected() {
                    warn!("Disconnected, so discarding messages in outbox");
                    self.outbox.clear();
                }
                else {
                    while let Some(req) = self.outbox.pop_front() {
                        if let Err(e) = self.conn.start_send(req) {
                            self.on_wa_error(e);
                            break;
                        }
                    }
                }
            }

            if self.conn.is_connected() {
                self.conn.poll_complete()?;
            }

            if !handled_command {
                break;
            }
        }
        self.ackp.poll()?;
        Ok(Async::NotReady)
    }
}
impl WhatsappManager {
pub fn new<T>(p: InitParameters<T>) -> Self {
let ackp = WaAckTracker::new(&p);
let store = p.store.clone();
4 years ago
let wa_tx = p.cm.wa_tx.clone();
let rx = p.cm.wa_rx.take().unwrap();
let cf_tx = p.cm.cf_tx.clone();
let cb_tx = p.cm.cb_tx.clone();
let media_path = p.cfg.whatsapp.media_path.clone().unwrap_or("/tmp/wa_media".into());
let qr_path = format!("{}/qr.png", media_path);
let dl_path = p.cfg.whatsapp.dl_path.clone().unwrap_or("file:///tmp/wa_media".into());
let autocreate = p.cfg.whatsapp.autocreate_prefix.clone();
let backlog_start = p.cfg.whatsapp.backlog_start.clone();
let mark_read = p.cfg.whatsapp.mark_read;
let drop_non_group_messages_on_the_floor = p.cfg.whatsapp.drop_non_group_messages_on_the_floor;
let autoupdate_nicks = p.cfg.whatsapp.autoupdate_nicks;
let backoff_time_ms = p.cfg.whatsapp.backoff_time_ms.unwrap_or(10000);
let track_presence = p.cfg.whatsapp.track_presence;
wa_tx.unbounded_send(WhatsappCommand::LogonIfSaved)
.unwrap();
let wa_tx = Arc::new(wa_tx);
let msgproc = WaMessageProcessor { store: store.clone(), media_path, dl_path, wa_tx };
let conn = WebConnectionWrapper::new(WebConnectionWrapperConfig {
backoff_time_ms
});
if drop_non_group_messages_on_the_floor {
error!("I'm going to drop all non-group messages on the floor! Are you sure this is a good idea?!");
}
Self {
conn,
contacts: HashMap::new(),
chats: HashMap::new(),
connected: false,
our_jid: None,
prev_jid: None,
presence_requests: HashMap::new(),
outbox: VecDeque::new(),
backlog_start,
rx, cf_tx, cb_tx, qr_path, store, msgproc, autocreate,
mark_read, autoupdate_nicks, track_presence, ackp,
drop_non_group_messages_on_the_floor
}
}
fn handle_int_rx(&mut self, c: WhatsappCommand) -> Result<()> {
use self::WhatsappCommand::*;
match c {
StartRegistration => self.start_registration()?,
LogonIfSaved => self.logon_if_saved()?,
SendGroupMessage(to, cont) => self.send_group_message(to, cont)?,
SendDirectMessage(to, cont) => self.send_direct_message(to, cont)?,
GroupAssociate(jid, to) => self.group_associate_handler(jid, to)?,
GroupList => self.group_list()?,
GroupUpdateAll => self.group_update_all()?,
GroupRemove(grp) => self.group_remove(grp)?,
MediaFinished(r) => self.media_finished(r)?,
PrintAcks => self.print_acks()?,
MakeContact(a) => self.make_contact(a)?,
SubscribePresence(a) => self.subscribe_presence(a)?
}
Ok(())
}
/// Handles a control-bot request to subscribe to presence updates for `addr`.
///
/// The address is converted to a jid and a recipient is looked up (or
/// created) for it. If we are connected, a `SubscribePresence` request is
/// queued in the outbox; otherwise the control bot is told why nothing
/// happened. An address that cannot be converted is reported to the control
/// bot rather than returned as an error.
///
/// # Errors
///
/// Returns an error if the recipient lookup/creation fails.
fn subscribe_presence(&mut self, addr: PduAddress) -> Result<()> {
    match util::address_to_jid(&addr) {
        Ok(from) => {
            let recip = self.get_wa_recipient(&from)?;
            if self.connected {
                // The message previously ended with a stray, unmatched backtick.
                self.cb_respond(format!("Subscribing to presence updates from '{}' (jid {})", recip.nick, from));
                self.outbox.push_back(WaRequest::SubscribePresence(from));
            }
            else {
                self.cb_respond("Error subscribing: not connected to WA");
            }
        },
        Err(_) => {
            self.cb_respond("Error subscribing: invalid PduAddress");
        }
    }
    Ok(())
}
/// Ensures a recipient exists for `addr` and asks the contact factory to
/// process messages.
///
/// An address that cannot be converted to a jid is logged and otherwise
/// ignored.
fn make_contact(&mut self, addr: PduAddress) -> Result<()> {
    let jid = match util::address_to_jid(&addr) {
        Ok(j) => j,
        Err(e) => {
            error!("Couldn't make contact for {}: {}", addr, e);
            return Ok(());
        }
    };
    // Only the side effect (creating the recipient if needed) matters here.
    self.get_wa_recipient(&jid)?;
    self.cf_tx
        .unbounded_send(ContactFactoryCommand::ProcessMessages)
        .unwrap();
    Ok(())
}
/// Sends every line produced by the ack tracker's `print_acks()` to the
/// control bot.
fn print_acks(&mut self) -> Result<()> {
    let report = self.ackp.print_acks();
    for entry in report {
        self.cb_respond(entry);
    }
    Ok(())
}
fn media_finished(&mut self, r: MediaResult) -> Result<()> {
match r.result {
Ok(ret) => {
debug!("Media download/decryption job for {} / mid {:?} complete.", r.from.to_string(), r.mi);
self.store_message(&r.from, &ret, r.group, r.ts)?;
},
Err(e) => {
// FIXME: We could possibly retry the download somehow.
warn!("Decryption job failed for {} / mid {:?}: {}", r.from.to_string(), r.mi, e);
let msg = "\x01ACTION uploaded media (couldn't download)\x01";
self.store_message(&r.from, msg, r.group, r.ts)?;
}
}
self.store.store_wa_msgid(r.mi.0.clone())?;
if self.mark_read {
if let Some(p) = r.peer {
self.outbox.push_back(WaRequest::MessageRead {
mid: r.mi,
peer: p
});
}
}
Ok(())
}
/// Connects using stored session data, if the store has any.
///
/// When no persistence data is stored, this only logs that WhatsApp is not
/// configured.
///
/// # Errors
///
/// Returns an error if reading the persistence data from the store fails.
fn logon_if_saved(&mut self) -> Result<()> {
    match self.store.get_wa_persistence_opt()? {
        Some(wap) => {
            info!("Logging on to WhatsApp Web using stored persistence data");
            self.conn.connect_persistent(wap);
        },
        None => {
            info!("WhatsApp is not configured");
        }
    }
    Ok(())
}
/// Starts a brand-new WhatsApp Web session on the connection wrapper.
fn start_registration(&mut self) -> Result<()> {
    info!("Creating a new WhatsApp Web session");
    self.conn.connect_new();

    Ok(())
}
/// Sends `s` to the control bot as a command response.
///
/// Panics if the control bot channel rejects the message.
fn cb_respond<T: Into<String>>(&mut self, s: T) {
    let response = ControlBotCommand::CommandResponse(s.into());
    self.cb_tx.unbounded_send(response).unwrap();
}
/// Renders the registration QR code to `self.qr_path` and tells the control
/// bot where to find it.
fn on_qr(&mut self, qr: QrCode) -> Result<()> {
    info!("Processing registration QR code...");
    let image = qr.render::<Luma<u8>>()
        .module_dimensions(10, 10)
        .build();
    image.save(&self.qr_path)?;

    let notice = format!("Scan the QR code saved at {} to log in!", self.qr_path);
    self.cb_respond(notice);
    self.cb_respond("NB: The code is only valid for a few seconds, so scan quickly!");
    Ok(())
}
fn queue_message(&mut self, content: ChatMessageContent, jid: Jid) {
let mid = MessageId::generate();
debug!("Queued send to {}: message ID {}", jid, mid.0);
self.ackp.register_send(jid, content, mid.0, true);
if self.conn.is_disabled() {
let err = "Warning: WhatsApp Web is currently not set up, but you've tried to send something.";
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(err.into())).unwrap();
}
}
/// Sends a message right away: records its ID, registers it with the ack
/// tracker and queues it in the outbox.
///
/// For 1-to-1 chats with presence tracking enabled, this also (re)subscribes
/// to the recipient's presence updates when the last subscription is stale.
fn send_message(&mut self, content: ChatMessageContent, jid: Jid) -> Result<()> {
    let outgoing = WaMessage::new(jid.clone(), content.clone());
    let msg_id = outgoing.id.0.clone();
    debug!("Send to {}: message ID {}", jid, msg_id);
    self.store.store_wa_msgid(msg_id.clone())?;
    self.ackp.register_send(jid.clone(), content, msg_id, false);
    self.outbox.push_back(WaRequest::SendMessage(outgoing));

    if jid.is_group || !self.track_presence {
        return Ok(());
    }

    let now = Instant::now();
    // WhatsApp stops sending you presence updates after about 10 minutes.
    // To avoid this, we resubscribe about every 5.
    let stale = self.presence_requests
        .get(&jid)
        .map_or(true, |last| now.duration_since(*last) >= Duration::new(300, 0));
    if stale {
        debug!("Requesting presence updates for {}", jid);
        self.presence_requests.insert(jid.clone(), now);
        self.outbox.push_back(WaRequest::SubscribePresence(jid));
    }
    Ok(())
}
/// Sends (or queues, when not connected) a text message to the WhatsApp
/// contact with phone number `addr`.
///
/// If the address cannot be turned into a jid, the failure is logged and
/// reported to the control bot; no error is returned in that case.
fn send_direct_message(&mut self, addr: PduAddress, content: String) -> Result<()> {
    debug!("Sending direct message to {}...", addr);
    trace!("Message contents: {}", content);

    let jid = match Jid::from_phonenumber(addr.to_string()) {
        Ok(j) => j,
        Err(e) => {
            warn!("Couldn't send WA message to {}: {}", addr, e);
            let report = format!("Failed to send to WA contact {}: {}", addr, e);
            self.cb_tx
                .unbounded_send(ControlBotCommand::ReportFailure(report))
                .unwrap();
            return Ok(());
        }
    };

    let content = ChatMessageContent::Text(content);
    if self.connected && self.conn.is_connected() {
        self.send_message(content, jid)?;
    }
    else {
        self.queue_message(content, jid);
    }
    Ok(())
}
/// Sends (or queues, when not connected) a text message to the WhatsApp
/// group bridged to channel `chan`.
///
/// An unknown channel is logged and otherwise ignored. Panics if the jid
/// stored for the group cannot be parsed.
fn send_group_message(&mut self, chan: String, content: String) -> Result<()> {
    debug!("Sending message to group with chan {}...", chan);
    trace!("Message contents: {}", content);

    let grp = match self.store.get_group_by_chan_opt(&chan)? {
        Some(g) => g,
        None => {
            error!("Tried to send WA message to nonexistent group {}", chan);
            return Ok(());
        }
    };

    let jid = grp.jid.parse().expect("bad jid in DB");
    let content = ChatMessageContent::Text(content);
    if self.connected && self.conn.is_connected() {
        self.send_message(content, jid)?;
    }
    else {
        self.queue_message(content, jid);
    }
    Ok(())
}
/// Handles one WhatsApp message, either freshly received (`is_new`) or
/// loaded from backlog.
///
/// Steps, in order:
/// 1. Reject messages whose ID is already stored, and backlog messages older
///    than `backlog_start`.
/// 2. Work out the sender and (optional) group from the message direction.
///    Our own messages in 1-to-1 chats are recorded and ignored.
/// 3. Resolve the group to its database ID, autocreating a bridge if enabled;
///    messages for the `status` pseudo-group and for unbridged groups are
///    ignored.
/// 4. Run the message through the message processor and store the results.
/// 5. Record the message ID (except for media, and for CIPHERTEXT stubs) and
///    queue a read receipt where appropriate.
///
/// # Errors
///
/// Returns an error if `our_jid` is unset for a message we sent, or if a
/// store / processor operation fails.
fn on_message(&mut self, msg: WaMessage, is_new: bool) -> Result<()> {
    use whatsappweb::message::{Direction};
    trace!("processing WA message (new {}): {:?}", is_new, msg);
    let WaMessage { direction, content, id, quoted, stub_type, .. } = msg;
    debug!("got message from dir {:?}", direction);
    // If we don't mark things as read, we have to check every 'new' message,
    // because they might not actually be new.
    if (!self.mark_read || !is_new) && self.store.is_wa_msgid_stored(&id.0)? {
        debug!("Rejecting backlog message: already in database");
        return Ok(());
    }
    if !is_new {
        debug!("message timestamp: {}", msg.time);
        if let Some(ref bsf) = self.backlog_start {
            if *bsf > msg.time {
                debug!("Rejecting backlog message: before backlog start time");
                return Ok(());
            }
        }
    }
    let mut peer = None;
    let mut is_ours = false;
    let (from, group) = match direction {
        Direction::Sending(jid) => {
            let ojid = self.our_jid.clone()
                .ok_or(format_err!("our_jid empty"))?;
            is_ours = true;
            let group = if jid.is_group {
                Some(jid)
            }
            else {
                debug!("Received self-message in a 1-to-1 chat, ignoring...");
                self.store.store_wa_msgid(id.0.clone())?;
                return Ok(());
            };
            (ojid, group)
        },
        Direction::Receiving(p) => {
            peer = Some(p.clone());
            match p {
                Peer::Individual(j) => (j, None),
                Peer::Group { group, participant } => (participant, Some(group))
            }
        }
    };
    // Translate the group jid (if any) into the database ID of its bridge.
    let group = match group {
        Some(gid) => {
            if gid.id == "status" {
                return Ok(());
            }
            if let Some(grp) = self.store.get_group_by_jid_opt(&gid)? {
                Some(grp.id)
            }
            else if self.autocreate.is_some() {
                info!("Attempting to autocreate channel for unbridged group {}...", gid);
                match self.group_autocreate_from_unbridged(gid.clone()) {
                    Ok(id) => Some(id),
                    Err(e) => {
                        warn!("Autocreation failed: {} - requesting metadata instead", e);
                        self.request_update_group(gid)?;
                        return Ok(());
                    }
                }
            }
            else {
                info!("Received message for unbridged group {}, ignoring...", gid);
                return Ok(());
            }
        },
        None => None
    };
    let inc = IncomingMessage {
        id: id.clone(),
        peer: peer.clone(),
        ts: msg.time,
        stub_type: stub_type.clone(),
        from, group, content, quoted
    };
    let (msgs, is_media) = self.msgproc.process_wa_incoming(inc)?;
    let num_msgs = msgs.len();
    for out in msgs {
        // The flag (and the warning logged at startup) is about *non-group*
        // messages, i.e. those without a group. The previous check tested
        // `is_some()` and therefore dropped group messages instead.
        let dropped = self.drop_non_group_messages_on_the_floor
            && out.group.is_none()
            && !is_media;
        if !dropped {
            self.store_message(&out.from, &out.text, out.group, out.ts)?;
        }
        // else: *thud*
    }
    if is_media {
        /* none of the other fancy stuff below applies */
    }
    // If no messages are generated from the processor, say something about it.
    else if num_msgs == 0 {
        if let Some(st) = stub_type {
            warn!("Message {} has stub type {:?}", id.0, st);
            // CIPHERTEXT stubs mean "I'm about to send you the real message, but I can't
            // just now because rekeying or something". The real message contents get
            // sent with the same message ID, so storing it in the database now is a
            // Bad Idea.
            //
            // All other stubs *should* (!) be fine though, and we store them here (after
            // generating a loud warning in case it's _not_ fine), to avoid generating
            // said loud warning every time we reconnect and load in backlog.
            if st != MessageStubType::CIPHERTEXT {
                self.store.store_wa_msgid(id.0.clone())?;
            }
        }
        else {
            // This is an interesting case, and shouldn't really happen...
            warn!("Message {} is empty, and isn't even a stub!", id.0);
        }
    }
    else {
        self.store.store_wa_msgid(id.0.clone())?;
    }
    // Read receipts are only sent for non-media messages from other people.
    if let Some(p) = peer {
        if !is_media && !is_ours && self.mark_read {
            self.outbox.push_back(WaRequest::MessageRead {
                mid: id,
                peer: p
            });
        }
    }
    Ok(())
}
/// Stores a message from `from` and asks the contact factory to process
/// messages.
///
/// A recipient is looked up (or created) for the sender first. A jid that
/// cannot be converted to an address is logged and otherwise ignored.
fn store_message(&mut self, from: &Jid, text: &str, group: Option<i32>, ts: NaiveDateTime) -> Result<()> {
    let addr = match util::jid_to_address(from) {
        Some(a) => a,
        None => {
            warn!("couldn't make address for jid {}", from.to_string());
            return Ok(());
        }
    };
    // Only the side effect (creating the recipient if needed) matters here.
    self.get_wa_recipient(from)?;
    self.store.store_wa_message(&addr, text, group, ts)?;
    self.cf_tx
        .unbounded_send(ContactFactoryCommand::ProcessMessages)
        .unwrap();
    Ok(())
}
/// Reports every known WhatsApp chat to the control bot, together with its
/// bridging status (bridged group, unbridged group, or 1-to-1 chat).
fn group_list(&mut self) -> Result<()> {
    let mut lines = Vec::new();
    for (jid, chat) in self.chats.iter() {
        let status = match self.store.get_group_by_jid_opt(jid)? {
            Some(grp) => format!("\x02\x0309group bridged to {}\x0f", grp.channel),
            None if jid.is_group => "\x02\x0304unbridged group\x0f".to_string(),
            None => "\x021-to-1 chat\x0f".to_string()
        };
        let name = chat.name.as_ref().map(|x| x as &str).unwrap_or("<unnamed>");
        lines.push(format!("- '{}' (jid {}) - {}", name, jid, status));
    }

    let header = if lines.is_empty() {
        "no WhatsApp chats (yet?)"
    }
    else {
        "WhatsApp chats:"
    };
    self.cb_respond(header);
    for line in lines {
        self.cb_respond(line);
    }
    Ok(())
}
/// Picks an IRC nick for `jid`, returning it with the nick source constant.
///
/// Preference order: the contact's name, then the contact's notify name,
/// then a nick generated from the address.
///
/// # Errors
///
/// Returns an error if no contact name is available and the jid cannot be
/// translated to an address.
fn get_nick_for_jid(&mut self, jid: &Jid) -> Result<(String, i32)> {
    if let Some(contact) = self.contacts.get(jid) {
        match (contact.name.as_ref(), contact.notify.as_ref()) {
            (Some(name), _) => {
                return Ok((util::string_to_irc_nick(name), Recipient::NICKSRC_WA_CONTACT));
            },
            (None, Some(notify)) => {
                return Ok((util::string_to_irc_nick(notify), Recipient::NICKSRC_WA_NOTIFY));
            },
            (None, None) => {}
        }
    }
    let addr = util::jid_to_address(jid)
        .ok_or_else(|| format_err!("couldn't translate jid {} to address", jid))?;
    Ok((util::make_nick_for_address(&addr), Recipient::NICKSRC_AUTO))
}
/// Returns the stored recipient for `jid`, creating one if none exists yet.
///
/// When a recipient is created, the contact factory is asked to set up a
/// contact for its address.
///
/// # Errors
///
/// Returns an error if the jid cannot be translated to an address, or if a
/// store operation fails.
fn get_wa_recipient(&mut self, jid: &Jid) -> Result<Recipient> {
    let addr = util::jid_to_address(jid)
        .ok_or_else(|| format_err!("couldn't translate jid {} to address", jid))?;

    if let Some(existing) = self.store.get_recipient_by_addr_opt(&addr)? {
        return Ok(existing);
    }

    let (nick, nicksrc) = self.get_nick_for_jid(jid)?;
    info!("Creating new WA recipient for {} (nick {}, src {})", addr, nick, nicksrc);
    let notify = self.contacts
        .get(jid)
        .and_then(|c| c.notify.as_ref().map(|n| n as &str));
    let created = self.store.store_wa_recipient(&addr, &nick, notify, nicksrc)?;
    self.cf_tx
        .unbounded_send(ContactFactoryCommand::SetupContact(addr))
        .unwrap();
    Ok(created)
}
/// Applies freshly received group metadata.
///
/// If the group is not bridged yet and autocreation is enabled, a bridge is
/// created first (the outcome is reported to the control bot). Afterwards the
/// participant and admin lists and the subject are written to the store.
fn on_got_group_metadata(&mut self, grp: GroupMetadata) -> Result<()> {
    if let Some(g) = self.store.get_group_by_jid_opt(&grp.id)? {
        info!("Got metadata for group '{}' (jid {}, id {})", grp.subject, grp.id, g.id);
    }
    else {
        warn!("Got metadata for unbridged group '{}' (jid {})", grp.subject, grp.id);
        if self.autocreate.is_some() {
            match self.group_autocreate(grp.clone()) {
                Ok((id, chan)) => {
                    self.cb_respond(format!("Automatically bridged new group '{}' to channel {} (id {})", grp.subject, chan, id));
                },
                Err(e) => {
                    warn!("Autocreation failed for group {}: {}", grp.id, e);
                    self.cb_respond(format!("Failed to autocreate new group {} - check logs for details.", grp.id));
                }
            }
        }
    }

    // Resolve every participant to a recipient ID, noting which are admins.
    let mut participants = Vec::new();
    let mut admins = Vec::new();
    for (jid, is_admin) in grp.participants.iter() {
        let recip = self.get_wa_recipient(jid)?;
        participants.push(recip.id);
        if *is_admin {
            admins.push(recip.id);
        }
    }
    self.store.update_group(&grp.id, participants, admins, &grp.subject)?;
    self.on_groups_changed();
    Ok(())
}
/// Requests fresh metadata for every group in the store.
///
/// Groups whose stored jid cannot be parsed are silently skipped.
fn group_update_all(&mut self) -> Result<()> {
    info!("Updating metadata for ALL groups");
    let groups = self.store.get_all_groups()?;
    for group in groups {
        match group.jid.parse() {
            Ok(jid) => self.request_update_group(jid)?,
            Err(_) => {}
        }
    }
    Ok(())
}
/// Queues a metadata request for the group `jid` in the outbox.
fn request_update_group(&mut self, jid: Jid) -> Result<()> {
    info!("Getting metadata for jid {}", jid);
    let request = WaRequest::GetGroupMetadata(jid);
    self.outbox.push_back(request);
    Ok(())
}
/// Auto-create a group after a GroupIntroduction message.
///
/// This is the preferred route for autocreation, since the introduction
/// already carries all of the group's metadata. Success or failure is
/// reported to the control bot; a failure is not returned as an error.
fn group_autocreate_from_intro(&mut self, meta: GroupMetadata) -> Result<()> {
    let jid = meta.id.to_string();
    info!("Attempting to autocreate channel for new group {}...", jid);
    let subj = meta.subject.clone();

    let reply = match self.group_autocreate(meta) {
        Ok((id, chan)) => {
            format!("Automatically bridged new group '{}' to channel {} (id {})", subj, chan, id)
        },
        Err(e) => {
            warn!("Autocreation failed for group {}: {}", jid, e);
            format!("Failed to autocreate new group {} - check logs for details.", jid)
        }
    };
    self.cb_respond(reply);
    Ok(())
}
/// Auto-create a group that we've received an unbridged message for.
///
/// This relies on the group being present (and named) in our chat list,
/// because that is the only place the subject can come from here. Returns the
/// ID of the newly associated group.
///
/// Panics if autocreation is not configured (`self.autocreate` is `None`).
fn group_autocreate_from_unbridged(&mut self, jid: Jid) -> Result<i32> {
    let name = match self.chats.get(&jid).cloned() {
        None => bail!("chat not in chat list"),
        Some(chat) => match chat.name {
            Some(n) => n,
            None => bail!("chat unnamed")
        }
    };
    let irc_subject = util::string_to_irc_chan(&name);
    let chan = format!("{}-{}", self.autocreate.as_ref().unwrap(), irc_subject);
    // Ask for a metadata update as part of the association.
    self.group_associate(jid, chan, true)
}
/// Bridges the group described by `meta` to a channel named
/// `<autocreate prefix>-<subject>`, then applies the metadata.
///
/// Returns the new group's ID and the channel name. Panics if autocreation
/// is not configured (`self.autocreate` is `None`).
fn group_autocreate(&mut self, meta: GroupMetadata) -> Result<(i32, String)> {
    let irc_subject = util::string_to_irc_chan(&meta.subject);
    let prefix = self.autocreate.as_ref().unwrap();
    let chan = format!("{}-{}", prefix, irc_subject);
    // No metadata request needed: we already hold the metadata.
    let group_id = self.group_associate(meta.id.clone(), chan.clone(), false)?;
    self.on_got_group_metadata(meta)?;
    Ok((group_id, chan))
}
fn group_associate(&mut self, jid: Jid, chan: String, request_update: bool) -> Result<i32> {
if let Some(grp) = self.store.get_group_by_jid_opt(&jid)? {
bail!("that group already exists (channel {})!", grp.channel);
}
if let Some(grp) = self.store.get_group_by_chan_opt(&chan)? {
bail!("that channel is already used for a group (jid {})!", grp.jid);
}
if !self.connected || !self.conn.is_connected() {
bail!("we aren't connected to WhatsApp!");
}
if !jid.is_group {
bail!("that jid isn't a group!");
}