Browse Source

Reorganize ack and message processing out into their own submodules

master
eta 4 years ago
parent
commit
9d402b064e
  1. 74
      Cargo.lock
  2. 2
      Cargo.toml
  3. 1
      src/comm.rs
  4. 2
      src/main.rs
  5. 2
      src/store.rs
  6. 340
      src/whatsapp.rs
  7. 139
      src/whatsapp_ack.rs
  8. 191
      src/whatsapp_msg.rs

74
Cargo.lock generated

@ -298,38 +298,6 @@ dependencies = [ @@ -298,38 +298,6 @@ dependencies = [
"generic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "darling"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"darling_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"darling_macro 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "darling_core"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"ident_case 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)",
"strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "darling_macro"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"darling_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "deflate"
version = "0.7.20"
@ -339,29 +307,6 @@ dependencies = [ @@ -339,29 +307,6 @@ dependencies = [
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "derive_builder"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"darling 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_builder_core 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "derive_builder_core"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"darling 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "derive_is_enum_variant"
version = "0.1.1"
@ -779,11 +724,6 @@ dependencies = [ @@ -779,11 +724,6 @@ dependencies = [
"tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "idna"
version = "0.1.5"
@ -2015,8 +1955,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2015,8 +1955,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "sms-irc"
version = "0.2.0-pre"
dependencies = [
"base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_builder 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"diesel_migrations 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2085,11 +2025,6 @@ dependencies = [ @@ -2085,11 +2025,6 @@ dependencies = [
"unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "strsim"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "syn"
version = "0.11.11"
@ -2714,12 +2649,7 @@ dependencies = [ @@ -2714,12 +2649,7 @@ dependencies = [
"checksum crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"
"checksum crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6"
"checksum crypto-mac 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0999b4ff4d3446d4ddb19a63e9e00c1876e75cd7000d20e57a693b4b3f08d958"
"checksum darling 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fcfbcb0c5961907597a7d1148e3af036268f2b773886b8bb3eeb1e1281d3d3d6"
"checksum darling_core 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6afc018370c3bff3eb51f89256a6bdb18b4fdcda72d577982a14954a7a0b402c"
"checksum darling_macro 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c6d8dac1c6f1d29a41c4712b4400f878cb4fcc4c7628f298dd75038e024998d1"
"checksum deflate 0.7.20 (registry+https://github.com/rust-lang/crates.io-index)" = "707b6a7b384888a70c8d2e8650b3e60170dfc6a67bb4aa67b6dfca57af4bedb4"
"checksum derive_builder 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3ac53fa6a3cda160df823a9346442525dcaf1e171999a1cf23e67067e4fd64d4"
"checksum derive_builder_core 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0288a23da9333c246bb18c143426074a6ae96747995c5819d2947b64cd942b37"
"checksum derive_is_enum_variant 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d0ac8859845146979953797f03cc5b282fb4396891807cdb3d04929a88418197"
"checksum diesel 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8d24935ba50c4a8dc375a0fd1f8a2ba6bdbdc4125713126a74b965d6a01a06d7"
"checksum diesel_derives 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "62a27666098617d52c487a41f70de23d44a1dc1f3aa5877ceba2790fb1f1cab4"
@ -2766,7 +2696,6 @@ dependencies = [ @@ -2766,7 +2696,6 @@ dependencies = [
"checksum hyper 0.10.16 (registry+https://github.com/rust-lang/crates.io-index)" = "0a0652d9a2609a968c14be1a9ea00bf4b1d64e2e1f53a1b51b6fff3a6e829273"
"checksum hyper 0.12.33 (registry+https://github.com/rust-lang/crates.io-index)" = "7cb44cbce9d8ee4fb36e4c0ad7b794ac44ebaad924b9c8291a63215bb44c2c8f"
"checksum hyper-tls 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3a800d6aa50af4b5850b2b0f659625ce9504df908e9733b635720483be26174f"
"checksum ident_case 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
"checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e"
"checksum image 0.21.2 (registry+https://github.com/rust-lang/crates.io-index)" = "99198e595d012efccf12abf4abc08da2d97be0b0355a2b08d101347527476ba4"
"checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d"
@ -2909,7 +2838,6 @@ dependencies = [ @@ -2909,7 +2838,6 @@ dependencies = [
"checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8"
"checksum string 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d24114bfcceb867ca7f71a0d3fe45d45619ec47a6fbfa98cb14e14250bfa5d6d"
"checksum stringprep 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
"checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550"
"checksum syn 0.11.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d3b891b9015c88c576343b9b3e41c2c11a51c219ef067b264bd9c8aa9b441dad"
"checksum syn 0.12.15 (registry+https://github.com/rust-lang/crates.io-index)" = "c97c05b8ebc34ddd6b967994d5c6e9852fa92f8b82b3858c39451f97346dcce5"
"checksum syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)" = "eadc09306ca51a40555dd6fc2b415538e9e18bc9f870e47b1a524a79fe2dcf5e"

2
Cargo.toml

@ -7,7 +7,7 @@ description = "A WhatsApp/SMS to IRC bridge" @@ -7,7 +7,7 @@ description = "A WhatsApp/SMS to IRC bridge"
license = "AGPL-3.0"
[dependencies]
derive_builder = "0.7"
base64 = "0.10"
diesel_migrations = "1.4"
failure = "0.1"
futures = "0.1"

1
src/comm.rs

@ -31,7 +31,6 @@ pub enum WhatsappCommand { @@ -31,7 +31,6 @@ pub enum WhatsappCommand {
GroupRemove(String),
GroupUpdateAll,
MediaFinished(MediaResult),
CheckAcks,
PrintAcks,
MakeContact(PduAddress),
SubscribePresence(PduAddress)

2
src/main.rs

@ -27,6 +27,8 @@ mod sender_common; @@ -27,6 +27,8 @@ mod sender_common;
mod whatsapp;
mod whatsapp_media;
mod whatsapp_conn;
mod whatsapp_msg;
mod whatsapp_ack;
mod insp_s2s;
mod insp_user;
mod irc_s2c;

2
src/store.rs

@ -139,6 +139,8 @@ impl Store { @@ -139,6 +139,8 @@ impl Store {
::diesel::insert_into(wa_msgids::table)
.values(&new)
.on_conflict(wa_msgids::dsl::mid)
.do_nothing()
.execute(&*conn)?;
Ok(())
}

340
src/whatsapp.rs

@ -5,7 +5,7 @@ use whatsappweb::Contact as WaContact; @@ -5,7 +5,7 @@ use whatsappweb::Contact as WaContact;
use whatsappweb::Chat as WaChat;
use whatsappweb::GroupMetadata;
use whatsappweb::message::ChatMessage as WaMessage;
use whatsappweb::message::{ChatMessageContent, MessageId, Peer, MessageAckLevel};
use whatsappweb::message::{ChatMessageContent, Peer};
use whatsappweb::session::PersistentSession as WaPersistentSession;
use whatsappweb::event::WaEvent;
use whatsappweb::req::WaRequest;
@ -19,48 +19,33 @@ use image::Luma; @@ -19,48 +19,33 @@ use image::Luma;
use qrcode::QrCode;
use futures::{Future, Async, Poll, Stream, Sink};
use failure::Error;
use regex::{Regex, Captures};
use chrono::prelude::*;
use tokio_timer::Interval;
use std::time::{Instant, Duration};
use unicode_segmentation::UnicodeSegmentation;
use std::collections::VecDeque;
use crate::comm::{WhatsappCommand, ContactFactoryCommand, ControlBotCommand, InitParameters};
use crate::util::{self, Result};
use crate::models::Recipient;
use crate::whatsapp_media::{MediaInfo, MediaResult};
use crate::whatsapp_media::MediaResult;
use crate::store::Store;
use crate::whatsapp_conn::{WebConnectionWrapper, WebConnectionWrapperConfig};
use crate::whatsapp_msg::{IncomingMessage, WaMessageProcessor};
use crate::whatsapp_ack::WaAckTracker;
struct MessageSendStatus {
ack_level: Option<MessageAckLevel>,
sent_ts: DateTime<Utc>,
content: ChatMessageContent,
destination: Jid,
alerted: bool,
alerted_pending: bool,
}
pub struct WhatsappManager {
conn: WebConnectionWrapper,
rx: UnboundedReceiver<WhatsappCommand>,
wa_tx: Arc<UnboundedSender<WhatsappCommand>>,
cf_tx: UnboundedSender<ContactFactoryCommand>,
cb_tx: UnboundedSender<ControlBotCommand>,
contacts: HashMap<Jid, WaContact>,
chats: HashMap<Jid, WaChat>,
presence_requests: HashMap<Jid, Instant>,
outgoing_messages: HashMap<String, MessageSendStatus>,
ack_warn: u64,
ack_warn_pending: u64,
ack_expiry: u64,
ack_resend: Option<u64>,
msgproc: WaMessageProcessor,
ackp: WaAckTracker,
backlog_start: Option<chrono::NaiveDateTime>,
connected: bool,
store: Store,
qr_path: String,
media_path: String,
dl_path: String,
autocreate: Option<String>,
autoupdate_nicks: bool,
mark_read: bool,
@ -108,12 +93,14 @@ impl Future for WhatsappManager { @@ -108,12 +93,14 @@ impl Future for WhatsappManager {
self.conn.poll_complete()?;
}
}
self.ackp.poll()?;
Ok(Async::NotReady)
}
}
impl WhatsappManager {
pub fn new<T>(p: InitParameters<T>) -> Self {
let store = p.store;
let ackp = WaAckTracker::new(&p);
let store = p.store.clone();
let wa_tx = p.cm.wa_tx.clone();
let rx = p.cm.wa_rx.take().unwrap();
let cf_tx = p.cm.cf_tx.clone();
@ -122,48 +109,33 @@ impl WhatsappManager { @@ -122,48 +109,33 @@ impl WhatsappManager {
let qr_path = format!("{}/qr.png", media_path);
let dl_path = p.cfg.whatsapp.dl_path.clone().unwrap_or("file:///tmp/wa_media".into());
let autocreate = p.cfg.whatsapp.autocreate_prefix.clone();
let ack_ivl = p.cfg.whatsapp.ack_check_interval.unwrap_or(10);
let ack_warn_ms = p.cfg.whatsapp.ack_warn_ms.unwrap_or(5000);
let ack_warn_pending_ms = p.cfg.whatsapp.ack_warn_pending_ms.unwrap_or(ack_warn_ms * 2);
let ack_expiry_ms = p.cfg.whatsapp.ack_expiry_ms.unwrap_or(60000);
let ack_resend_ms = p.cfg.whatsapp.ack_resend_ms.clone();
let backlog_start = p.cfg.whatsapp.backlog_start.clone();
let mark_read = p.cfg.whatsapp.mark_read;
let autoupdate_nicks = p.cfg.whatsapp.autoupdate_nicks;
let backoff_time_ms = p.cfg.whatsapp.backoff_time_ms.unwrap_or(10000);
let track_presence = p.cfg.whatsapp.track_presence;
wa_tx.unbounded_send(WhatsappCommand::LogonIfSaved)
.unwrap();
let wa_tx = Arc::new(wa_tx);
let ivl_tx = wa_tx.clone();
let timer = Interval::new(Instant::now(), Duration::new(ack_ivl, 0))
.map_err(|e| {
error!("Message ack timer failed: {}", e);
panic!("timer failed!");
}).for_each(move |_| {
ivl_tx.unbounded_send(WhatsappCommand::CheckAcks).unwrap();
Ok(())
});
p.hdl.spawn(timer);
let msgproc = WaMessageProcessor { store: store.clone(), media_path, dl_path, wa_tx };
let conn = WebConnectionWrapper::new(WebConnectionWrapperConfig {
backoff_time_ms
});
Self {
conn,
contacts: HashMap::new(),
chats: HashMap::new(),
outgoing_messages: HashMap::new(),
connected: false,
our_jid: None,
presence_requests: HashMap::new(),
ack_warn: ack_warn_ms,
ack_warn_pending: ack_warn_pending_ms,
ack_expiry: ack_expiry_ms,
ack_resend: ack_resend_ms,
outbox: VecDeque::new(),
wa_tx, backlog_start,
rx, cf_tx, cb_tx, qr_path, store, media_path, dl_path, autocreate,
mark_read, autoupdate_nicks, track_presence
backlog_start,
rx, cf_tx, cb_tx, qr_path, store, msgproc, autocreate,
mark_read, autoupdate_nicks, track_presence, ackp
}
}
fn handle_int_rx(&mut self, c: WhatsappCommand) -> Result<()> {
@ -179,7 +151,6 @@ impl WhatsappManager { @@ -179,7 +151,6 @@ impl WhatsappManager {
GroupUpdateAll => self.group_update_all()?,
GroupRemove(grp) => self.group_remove(grp)?,
MediaFinished(r) => self.media_finished(r)?,
CheckAcks => self.check_acks()?,
PrintAcks => self.print_acks()?,
MakeContact(a) => self.make_contact(a)?,
SubscribePresence(a) => self.subscribe_presence(a)?
@ -218,84 +189,11 @@ impl WhatsappManager { @@ -218,84 +189,11 @@ impl WhatsappManager {
Ok(())
}
fn print_acks(&mut self) -> Result<()> {
let now = Utc::now();
let mut lines = vec![];
for (mid, mss) in self.outgoing_messages.iter_mut() {
let delta = now - mss.sent_ts;
let mut summary = mss.content.quoted_description();
if summary.len() > 15 {
summary = summary.graphemes(true)
.take(10)
.chain(std::iter::once("…"))
.collect();
}
let al: std::borrow::Cow<str> = match mss.ack_level {
Some(al) => format!("{:?}", al).into(),
None => "undelivered".into()
};
lines.push(format!("- \"\x1d{}\x1d\" to \x02{}\x02 ({}s ago) is \x02{:?}\x02",
summary, mss.destination.to_string(), delta.num_seconds(), al));
lines.push(format!(" (message ID \x11{}\x0f)", mid));
}
if lines.len() == 0 {
self.cb_respond("No outgoing messages.");
return Ok(());
}
self.cb_respond("Message receipts:");
for line in lines {
for line in self.ackp.print_acks() {
self.cb_respond(line);
}
Ok(())
}
fn check_acks(&mut self) -> Result<()> {
trace!("Checking acks");
let now = Utc::now();
let mut to_resend = vec![];
for (mid, mss) in self.outgoing_messages.iter_mut() {
let delta = now - mss.sent_ts;
let delta_ms = delta.num_milliseconds() as u64;
if mss.ack_level.is_none() {
if delta_ms >= self.ack_warn && !mss.alerted {
warn!("Message {} has been un-acked for {} seconds!", mid, delta.num_seconds());
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Warning: Sending message ID {} seems to have failed!", mid)))
.unwrap();
mss.alerted = true;
}
if let Some(rs) = self.ack_resend {
if delta_ms >= rs {
to_resend.push(mid.clone());
}
}
}
if let Some(MessageAckLevel::PendingSend) = mss.ack_level {
if delta_ms >= self.ack_warn_pending && !mss.alerted_pending {
warn!("Message {} has been pending for {} seconds!", mid, delta.num_seconds());
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Warning: Sending message ID {} is still pending. Is WhatsApp running and connected?", mid)))
.unwrap();
self.cb_tx.unbounded_send(ControlBotCommand::CommandResponse("For more information on pending outgoing messages, try the \x02WHATSAPP RECEIPTS\x02 command.".into()))
.unwrap();
mss.alerted_pending = true;
}
}
}
for mid in to_resend {
let mss = self.outgoing_messages.remove(&mid).unwrap();
warn!("Resending un-acked message {} (!)", mid);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Warning: Resending message with ID {}", mid)))
.unwrap();
self.send_message(mss.content, mss.destination)?;
}
let ack_expiry = self.ack_expiry;
self.outgoing_messages.retain(|_, m| {
if let Some(MessageAckLevel::PendingSend) | None = m.ack_level {
// Always retain the failures, so the user knows what happened with them (!)
return true;
}
let diff_ms = (now - m.sent_ts).num_milliseconds() as u64;
diff_ms < ack_expiry
});
Ok(())
}
}
fn media_finished(&mut self, r: MediaResult) -> Result<()> {
match r.result {
Ok(ret) => {
@ -309,9 +207,7 @@ impl WhatsappManager { @@ -309,9 +207,7 @@ impl WhatsappManager {
self.store_message(&r.from, msg, r.group, r.ts)?;
}
}
if let Err(e) = self.store.store_wa_msgid(r.mi.0.clone()) {
warn!("Failed to store WA msgid {} after media download: {}", r.mi.0, e);
}
self.store.store_wa_msgid(r.mi.0.clone())?;
if self.mark_read {
if let Some(p) = r.peer {
self.outbox.push_back(WaRequest::MessageRead {
@ -354,26 +250,15 @@ impl WhatsappManager { @@ -354,26 +250,15 @@ impl WhatsappManager {
}
fn send_message(&mut self, content: ChatMessageContent, jid: Jid) -> Result<()> {
let (c, j) = (content.clone(), jid.clone());
let mss = MessageSendStatus {
ack_level: None,
sent_ts: Utc::now(),
content,
destination: jid.clone(),
alerted: false,
alerted_pending: false
};
let m = WaMessage::new(j, c);
let mid = m.id.clone();
let m = WaMessage::new(jid, content);
debug!("Send to {}: message ID {}", j, m.id.0);
self.store.store_wa_msgid(m.id.0.clone())?;
self.ackp.register_send(j.clone(), c, m.id.0.clone());
self.outbox.push_back(WaRequest::SendMessage(m));
debug!("Send to {}: message ID {}", mss.destination.to_string(), mid.0);
if let Err(e) = self.store.store_wa_msgid(mid.0.clone()) {
warn!("Failed to store outgoing msgid {}: {}", mid.0, e);
}
self.outgoing_messages.insert(mid.0, mss);
if !jid.is_group && self.track_presence {
if !j.is_group && self.track_presence {
let mut update = true;
let now = Instant::now();
if let Some(inst) = self.presence_requests.get(&jid) {
if let Some(inst) = self.presence_requests.get(&j) {
// WhatsApp stops sending you presence updates after about 10 minutes.
// To avoid this, we resubscribe about every 5.
if now.duration_since(*inst) < Duration::new(300, 0) {
@ -381,9 +266,9 @@ impl WhatsappManager { @@ -381,9 +266,9 @@ impl WhatsappManager {
}
}
if update {
debug!("Requesting presence updates for {}", jid);
self.presence_requests.insert(jid.clone(), now);
self.outbox.push_back(WaRequest::SubscribePresence(jid));
debug!("Requesting presence updates for {}", j);
self.presence_requests.insert(j.clone(), now);
self.outbox.push_back(WaRequest::SubscribePresence(j));
}
}
Ok(())
@ -428,38 +313,7 @@ impl WhatsappManager { @@ -428,38 +313,7 @@ impl WhatsappManager {
error!("Tried to send WA message to nonexistent group {}", chan);
}
Ok(())
}
fn process_message<'a>(&mut self, msg: &'a str) -> String {
lazy_static! {
static ref BOLD_RE: Regex = Regex::new(r#"\*([^\*]+)\*"#).unwrap();
static ref ITALICS_RE: Regex = Regex::new(r#"_([^_]+)_"#).unwrap();
static ref MENTIONS_RE: Regex = Regex::new(r#"@(\d+)"#).unwrap();
}
let emboldened = BOLD_RE.replace_all(msg, "\x02$1\x02");
let italicised = ITALICS_RE.replace_all(&emboldened, "\x1D$1\x1D");
let store = &mut self.store;
let ret = MENTIONS_RE.replace_all(&italicised, |caps: &Captures| {
let pdua: PduAddress = caps[0].replace("@", "+").parse().unwrap();
match store.get_recipient_by_addr_opt(&pdua) {
Ok(Some(recip)) => recip.nick,
Ok(None) => format!("<+{}>", &caps[1]),
Err(e) => {
warn!("Error searching for mention recipient: {}", e);
format!("@{}", &caps[1])
}
}
});
ret.to_string()
}
fn jid_to_nick(&mut self, jid: &Jid) -> Result<Option<String>> {
if let Some(num) = jid.phonenumber() {
if let Ok(pdua) = num.parse() {
return Ok(self.store.get_recipient_by_addr_opt(&pdua)?
.map(|x| x.nick));
}
}
Ok(None)
}
}
fn on_message(&mut self, msg: WaMessage, is_new: bool) -> Result<()> {
use whatsappweb::message::{Direction};
@ -495,9 +349,7 @@ impl WhatsappManager { @@ -495,9 +349,7 @@ impl WhatsappManager {
}
else {
debug!("Received self-message in a 1-to-1 chat, ignoring...");
if let Err(e) = self.store.store_wa_msgid(id.0.clone()) {
warn!("Failed to store 1-to-1 msgid {}: {}", id.0, e);
}
self.store.store_wa_msgid(id.0.clone())?;
return Ok(());
};
(ojid, group)
@ -535,95 +387,18 @@ impl WhatsappManager { @@ -535,95 +387,18 @@ impl WhatsappManager {
},
None => None
};
let mut is_media = false;
let text = match content {
ChatMessageContent::Text(s) => self.process_message(&s),
ChatMessageContent::Unimplemented(mut det) => {
if det.trim() == "" {
debug!("Discarding empty unimplemented message.");
return Ok(());
}
if det.len() > 128 {
det = det.graphemes(true)
.take(128)
.chain(std::iter::once("…"))
.collect();
}
format!("[\x02\x0304unimplemented\x0f] {}", det)
},
ChatMessageContent::LiveLocation { lat, long, speed, .. } => {
// FIXME: use write!() maybe
let spd = if let Some(s) = speed {
format!("travelling at {:.02} m/s - https://google.com/maps?q={},{}", s, lat, long)
}
else {
format!("broadcasting live location - https://google.com/maps?q={},{}", lat, long)
};
format!("\x01ACTION is {}\x01", spd)
},
ChatMessageContent::Location { lat, long, name, .. } => {
let place = if let Some(n) = name {
format!("at '{}'", n)
}
else {
"somewhere".into()
};
format!("\x01ACTION is {} - https://google.com/maps?q={},{}\x01", place, lat, long)
},
ChatMessageContent::Redaction { mid } => {
// TODO: make this more useful
format!("\x01ACTION redacted message ID \x11{}\x11\x01", mid.0)
},
ChatMessageContent::Contact { display_name, vcard } => {
match crate::whatsapp_media::store_contact(&self.media_path, &self.dl_path, vcard) {
Ok(link) => {
format!("\x01ACTION uploaded a contact for '{}' - {}\x01", display_name, link)
},
Err(e) => {
warn!("Failed to save contact card: {}", e);
format!("\x01ACTION uploaded a contact for '{}' (couldn't download)\x01", display_name)
}
}
},
mut x @ ChatMessageContent::Image { .. } |
mut x @ ChatMessageContent::Video { .. } |
mut x @ ChatMessageContent::Audio { .. } |
mut x @ ChatMessageContent::Document { .. } => {
let capt = x.take_caption();
self.process_media(id.clone(), peer.clone(), from.clone(), group, x, msg.time)?;
is_media = true;
if let Some(c) = capt {
c
}
else {
return Ok(());
}
}
let inc = IncomingMessage {
id: id.clone(),
peer: peer.clone(),
ts: msg.time,
from, group, content, quoted
};
if let Some(qm) = quoted {
let nick = if group.is_some() {
let nick = self.jid_to_nick(&qm.participant)?
.unwrap_or(qm.participant.to_string());
format!("<{}> ", nick)
}
else {
String::new()
};
let mut message = qm.content.quoted_description();
if message.len() > 128 {
message = message.graphemes(true)
.take(128)
.chain(std::iter::once("…"))
.collect();
}
let quote = format!("\x0315> \x1d{}{}", nick, message);
self.store_message(&from, &quote, group, msg.time)?;
let (msgs, is_media) = self.msgproc.process_wa_incoming(inc)?;
for msg in msgs {
self.store_message(&msg.from, &msg.text, msg.group, msg.ts)?;
}
self.store_message(&from, &text, group, msg.time)?;
if !is_media {
if let Err(e) = self.store.store_wa_msgid(id.0.clone()) {
warn!("Failed to store received msgid {}: {}", id.0, e);
}
self.store.store_wa_msgid(id.0.clone())?;
}
if let Some(p) = peer {
if !is_media && !is_ours && self.mark_read {
@ -646,28 +421,7 @@ impl WhatsappManager { @@ -646,28 +421,7 @@ impl WhatsappManager {
warn!("couldn't make address for jid {}", from.to_string());
}
Ok(())
}
fn process_media(&mut self, id: MessageId, peer: Option<Peer>, from: Jid, group: Option<i32>, ct: ChatMessageContent, ts: NaiveDateTime) -> Result<()> {
use whatsappweb::MediaType;
let (ty, fi, name) = match ct {
ChatMessageContent::Image { info, .. } => (MediaType::Image, info, None),
ChatMessageContent::Video { info, .. } => (MediaType::Video, info, None),
ChatMessageContent::Audio { info, .. } => (MediaType::Audio, info, None),
ChatMessageContent::Document { info, filename } => (MediaType::Document, info, Some(filename)),
_ => unreachable!()
};
let mi = MediaInfo {
ty, fi, name, peer, ts,
mi: id,
from, group,
path: self.media_path.clone(),
dl_path: self.dl_path.clone(),
tx: self.wa_tx.clone()
};
mi.start();
Ok(())
}
}
fn group_list(&mut self) -> Result<()> {
let mut list = vec![];
for (jid, gmeta) in self.chats.iter() {
@ -928,7 +682,7 @@ impl WhatsappManager { @@ -928,7 +682,7 @@ impl WhatsappManager {
SessionEstablished { jid, persistent } => self.on_established(jid, persistent)?,
Message { is_new, msg } => self.on_message(msg, is_new)?,
InitialContacts(cts) => {
info!("Received initial contact list");
debug!("Received initial contact list");
for ct in cts {
self.on_contact_change(ct)?;
}
@ -948,7 +702,7 @@ impl WhatsappManager { @@ -948,7 +702,7 @@ impl WhatsappManager {
//self.contacts.remove(&jid);
},
InitialChats(cts) => {
info!("Received initial chat list");
debug!("Received initial chat list");
for ct in cts {
self.chats.insert(ct.jid.clone(), ct);
}
@ -989,13 +743,7 @@ impl WhatsappManager { @@ -989,13 +743,7 @@ impl WhatsappManager {
}
},
MessageAck(ack) => {
if let Some(mss) = self.outgoing_messages.get_mut(&ack.id.0) {
debug!("Ack known message {} at level: {:?}", ack.id.0, ack.level);
mss.ack_level = Some(ack.level);
}
else {
debug!("Ack unknown message {} at level: {:?}", ack.id.0, ack.level);
}
self.ackp.on_message_ack(ack);
},
GroupIntroduce { newly_created, meta, .. } => {
let is_new = if newly_created { " newly created" } else { "" };

139
src/whatsapp_ack.rs

@ -0,0 +1,139 @@ @@ -0,0 +1,139 @@
//! Tracks WhatsApp message acknowledgements, and alerts if we don't get any.
use tokio_timer::Interval;
use whatsappweb::Jid;
use whatsappweb::message::{ChatMessageContent, MessageAckLevel, MessageAck};
use chrono::prelude::*;
use std::collections::HashMap;
use futures::sync::mpsc::UnboundedSender;
use unicode_segmentation::UnicodeSegmentation;
use std::time::{Instant, Duration};
use futures::{Future, Async, Poll, Stream};
use failure::Error;
use crate::comm::{ControlBotCommand, InitParameters};
struct MessageSendStatus {
ack_level: Option<MessageAckLevel>,
sent_ts: DateTime<Utc>,
content: ChatMessageContent,
destination: Jid,
alerted: bool,
alerted_pending: bool,
}
pub struct WaAckTracker {
cb_tx: UnboundedSender<ControlBotCommand>,
outgoing_messages: HashMap<String, MessageSendStatus>,
ack_warn: u64,
ack_warn_pending: u64,
ack_expiry: u64,
timer: Interval,
}
impl Future for WaAckTracker {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
while let Async::Ready(_) = self.timer.poll()? {
self.check_acks();
}
Ok(Async::NotReady)
}
}
impl WaAckTracker {
pub fn new<T>(p: &InitParameters<T>) -> Self {
let cb_tx = p.cm.cb_tx.clone();
let ack_ivl = p.cfg.whatsapp.ack_check_interval.unwrap_or(3);
let ack_warn_ms = p.cfg.whatsapp.ack_warn_ms.unwrap_or(5000);
let ack_warn_pending_ms = p.cfg.whatsapp.ack_warn_pending_ms.unwrap_or(ack_warn_ms * 2);
let ack_expiry_ms = p.cfg.whatsapp.ack_expiry_ms.unwrap_or(60000);
let timer = Interval::new(Instant::now(), Duration::new(ack_ivl, 0));
Self {
ack_warn: ack_warn_ms,
ack_warn_pending: ack_warn_pending_ms,
ack_expiry: ack_expiry_ms,
outgoing_messages: HashMap::new(),
cb_tx, timer
}
}
pub fn register_send(&mut self, to: Jid, content: ChatMessageContent, mid: String) {
let mss = MessageSendStatus {
ack_level: None,
sent_ts: Utc::now(),
content,
destination: to,
alerted: false,
alerted_pending: false
};
self.outgoing_messages.insert(mid, mss);
}
pub fn print_acks(&mut self) -> Vec<String> {
let now = Utc::now();
let mut lines = vec![];
for (mid, mss) in self.outgoing_messages.iter_mut() {
let delta = now - mss.sent_ts;
let mut summary = mss.content.quoted_description();
if summary.len() > 15 {
summary = summary.graphemes(true)
.take(10)
.chain(std::iter::once("…"))
.collect();
}
let al: std::borrow::Cow<str> = match mss.ack_level {
Some(al) => format!("{:?}", al).into(),
None => "undelivered".into()
};
lines.push(format!("- \"\x1d{}\x1d\" to \x02{}\x02 ({}s ago) is \x02{}\x02",
summary, mss.destination.to_string(), delta.num_seconds(), al));
lines.push(format!(" (message ID \x11{}\x0f)", mid));
}
if lines.len() == 0 {
lines.push("No outgoing messages".into());
}
lines
}
pub fn on_message_ack(&mut self, ack: MessageAck) {
if let Some(mss) = self.outgoing_messages.get_mut(&ack.id.0) {
debug!("Ack known message {} at level: {:?}", ack.id.0, ack.level);
mss.ack_level = Some(ack.level);
}
else {
debug!("Ack unknown message {} at level: {:?}", ack.id.0, ack.level);
}
}
fn send_fail<T: Into<String>>(cb_tx: &mut UnboundedSender<ControlBotCommand>, msg: T) {
cb_tx.unbounded_send(ControlBotCommand::ReportFailure(msg.into()))
.unwrap();
}
fn check_acks(&mut self) {
trace!("Checking acks");
let now = Utc::now();
for (mid, mss) in self.outgoing_messages.iter_mut() {
let delta = now - mss.sent_ts;
let delta_ms = delta.num_milliseconds() as u64;
if mss.ack_level.is_none() {
if delta_ms >= self.ack_warn && !mss.alerted {
warn!("Message {} has been un-acked for {} seconds!", mid, delta.num_seconds());
Self::send_fail(&mut self.cb_tx, format!("Warning: Sending message ID {} has failed, or is taking longer than usual!", mid));
mss.alerted = true;
}
}
if let Some(MessageAckLevel::PendingSend) = mss.ack_level {
if delta_ms >= self.ack_warn_pending && !mss.alerted_pending {
warn!("Message {} has been pending for {} seconds!", mid, delta.num_seconds());
Self::send_fail(&mut self.cb_tx, format!("Warning: Sending message ID {} is still pending. Is WhatsApp running and connected?", mid));
mss.alerted_pending = true;
}
}
}
let ack_expiry = self.ack_expiry;
self.outgoing_messages.retain(|_, m| {
if let Some(MessageAckLevel::PendingSend) | None = m.ack_level {
// Always retain the failures, so the user knows what happened with them (!)
return true;
}
let diff_ms = (now - m.sent_ts).num_milliseconds() as u64;
diff_ms < ack_expiry
});
}
}

191
src/whatsapp_msg.rs

@ -0,0 +1,191 @@ @@ -0,0 +1,191 @@
//! WhatsApp message processing tools.
use whatsappweb::{Jid, MediaType};
use chrono::prelude::*;
use futures::sync::mpsc::UnboundedSender;
use whatsappweb::message::{ChatMessageContent, QuotedChatMessage, MessageId, Peer};
use regex::{Regex, Captures};
use huawei_modem::pdu::PduAddress;
use std::sync::Arc;
use unicode_segmentation::UnicodeSegmentation;
use crate::comm::WhatsappCommand;
use crate::store::Store;
use crate::whatsapp_media::{MediaInfo, self};
use crate::util::Result;
pub struct IncomingMessage {
pub id: MessageId,
pub peer: Option<Peer>,
pub from: Jid,
pub group: Option<i32>,
pub content: ChatMessageContent,
pub quoted: Option<QuotedChatMessage>,
pub ts: NaiveDateTime
}
pub struct ProcessedIncomingMessage {
pub from: Jid,
pub text: String,
pub group: Option<i32>,
pub ts: NaiveDateTime
}
pub struct WaMessageProcessor {
pub(crate) store: Store,
pub(crate) media_path: String,
pub(crate) dl_path: String,
pub(crate) wa_tx: Arc<UnboundedSender<WhatsappCommand>>
}
impl WaMessageProcessor {
fn process_incoming_media(&mut self, id: MessageId, peer: Option<Peer>, from: Jid, group: Option<i32>, ct: ChatMessageContent, ts: NaiveDateTime) -> Result<()> {
let (ty, fi, name) = match ct {
ChatMessageContent::Image { info, .. } => (MediaType::Image, info, None),
ChatMessageContent::Video { info, .. } => (MediaType::Video, info, None),
ChatMessageContent::Audio { info, .. } => (MediaType::Audio, info, None),
ChatMessageContent::Document { info, filename } => (MediaType::Document, info, Some(filename)),
_ => unreachable!()
};
let mi = MediaInfo {
ty, fi, name, peer, ts,
mi: id,
from, group,
path: self.media_path.clone(),
dl_path: self.dl_path.clone(),
tx: self.wa_tx.clone()
};
mi.start();
Ok(())
}
fn process_wa_text_message<'a>(&mut self, msg: &'a str) -> String {
lazy_static! {
static ref BOLD_RE: Regex = Regex::new(r#"\*([^\*]+)\*"#).unwrap();
static ref ITALICS_RE: Regex = Regex::new(r#"_([^_]+)_"#).unwrap();
static ref MENTIONS_RE: Regex = Regex::new(r#"@(\d+)"#).unwrap();
}
let emboldened = BOLD_RE.replace_all(msg, "\x02$1\x02");
let italicised = ITALICS_RE.replace_all(&emboldened, "\x1D$1\x1D");
let store = &mut self.store;
let ret = MENTIONS_RE.replace_all(&italicised, |caps: &Captures| {
let pdua: PduAddress = caps[0].replace("@", "+").parse().unwrap();
match store.get_recipient_by_addr_opt(&pdua) {
Ok(Some(recip)) => recip.nick,
Ok(None) => format!("<+{}>", &caps[1]),
Err(e) => {
warn!("Error searching for mention recipient: {}", e);
format!("@{}", &caps[1])
}
}
});
ret.to_string()
}
fn jid_to_nick(&mut self, jid: &Jid) -> Result<Option<String>> {
if let Some(num) = jid.phonenumber() {
if let Ok(pdua) = num.parse() {
return Ok(self.store.get_recipient_by_addr_opt(&pdua)?
.map(|x| x.nick));
}
}
Ok(None)
}
pub fn process_wa_incoming(&mut self, inc: IncomingMessage) -> Result<(Vec<ProcessedIncomingMessage>, bool)> {
let IncomingMessage { id, peer, from, group, content, ts, quoted } = inc;
let mut ret = Vec::with_capacity(2);
let mut is_media = false;
let text = match content {
ChatMessageContent::Text(s) => self.process_wa_text_message(&s),
ChatMessageContent::Unimplemented(mut det) => {
if det.trim() == "" {
debug!("Discarding empty unimplemented message.");
return Ok((ret, is_media));
}
if det.len() > 128 {
det = det.graphemes(true)
.take(128)
.chain(std::iter::once("…"))
.collect();
}
format!("[\x02\x0304unimplemented\x0f] {}", det)
},
ChatMessageContent::LiveLocation { lat, long, speed, .. } => {
// FIXME: use write!() maybe
let spd = if let Some(s) = speed {
format!("travelling at {:.02} m/s - https://google.com/maps?q={},{}", s, lat, long)
}
else {
format!("broadcasting live location - https://google.com/maps?q={},{}", lat, long)
};
format!("\x01ACTION is {}\x01", spd)
},
ChatMessageContent::Location { lat, long, name, .. } => {
let place = if let Some(n) = name {
format!("at '{}'", n)
}
else {
"somewhere".into()
};
format!("\x01ACTION is {} - https://google.com/maps?q={},{}\x01", place, lat, long)
},
ChatMessageContent::Redaction { mid } => {
// TODO: make this more useful
format!("\x01ACTION redacted message ID \x11{}\x11\x01", mid.0)
},
ChatMessageContent::Contact { display_name, vcard } => {
match whatsapp_media::store_contact(&self.media_path, &self.dl_path, vcard) {
Ok(link) => {
format!("\x01ACTION uploaded a contact for '{}' - {}\x01", display_name, link)
},
Err(e) => {
warn!("Failed to save contact card: {}", e);
format!("\x01ACTION uploaded a contact for '{}' (couldn't download)\x01", display_name)
}
}
},
mut x @ ChatMessageContent::Image { .. } |
mut x @ ChatMessageContent::Video { .. } |
mut x @ ChatMessageContent::Audio { .. } |
mut x @ ChatMessageContent::Document { .. } => {
let capt = x.take_caption();
self.process_incoming_media(id.clone(), peer.clone(), from.clone(), group, x, ts)?;
is_media = true;
if let Some(c) = capt {
c
}
else {
return Ok((ret, is_media));
}
}
};
if let Some(qm) = quoted {
let nick = if group.is_some() {
let nick = self.jid_to_nick(&qm.participant)?
.unwrap_or(qm.participant.to_string());
format!("<{}> ", nick)
}
else {
String::new()
};
let mut message = qm.content.quoted_description();
if message.len() > 128 {
message = message.graphemes(true)
.take(128)
.chain(std::iter::once("…"))
.collect();
}
let quote = format!("\x0315> \x1d{}{}", nick, message);
ret.push(ProcessedIncomingMessage {
from: from.clone(),
text: quote,
group,
ts
});
}
ret.push(ProcessedIncomingMessage {
from,
text,
group,
ts
});
Ok((ret, is_media))
}
}
Loading…
Cancel
Save