Browse Source

whatsapp: add support for ack / read receipt tracking

- We now track the MessageAcks received for each WA message we send.
- If we don't receive acks for a given message, the user gets warned
  about this and can configure the bridge to auto-resend the message
  after a given delay.
- ReportFailure now generates a less ignorable PRIVMSG that mentions
  the admin, instead of a NOTICE.
- Log levels for things like contacts being changed or chat actions
  have been toned down somewhat, to avoid spamming the logs.
master
eta 3 years ago
parent
commit
cd1f3bd9e3
  1. 8
      Cargo.lock
  2. 4
      src/admin.rs
  3. 4
      src/comm.rs
  4. 10
      src/config.rs
  5. 2
      src/control.rs
  6. 3
      src/control_common.rs
  7. 2
      src/insp_s2s.rs
  8. 130
      src/whatsapp.rs

8
Cargo.lock generated

@ -1986,7 +1986,7 @@ dependencies = [ @@ -1986,7 +1986,7 @@ dependencies = [
"tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"toml 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"uuid 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
"whatsappweb 0.2.2 (git+https://git.theta.eu.org/whatsappweb-rs.git/)",
"whatsappweb 0.3.0 (git+https://git.theta.eu.org/whatsappweb-rs.git/)",
]
[[package]]
@ -2513,8 +2513,8 @@ dependencies = [ @@ -2513,8 +2513,8 @@ dependencies = [
[[package]]
name = "whatsappweb"
version = "0.2.2"
source = "git+https://git.theta.eu.org/whatsappweb-rs.git/#5f2be2ca6ea04dab425e8e596e723f96f8f76a0a"
version = "0.3.0"
source = "git+https://git.theta.eu.org/whatsappweb-rs.git/#97a940cafd94a5259c621deacd5dc120791e1640"
dependencies = [
"base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bincode 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2872,7 +2872,7 @@ dependencies = [ @@ -2872,7 +2872,7 @@ dependencies = [
"checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d"
"checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd"
"checksum want 0.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "797464475f30ddb8830cc529aaaae648d581f99e2036a928877dfde027ddf6b3"
"checksum whatsappweb 0.2.2 (git+https://git.theta.eu.org/whatsappweb-rs.git/)" = "<none>"
"checksum whatsappweb 0.3.0 (git+https://git.theta.eu.org/whatsappweb-rs.git/)" = "<none>"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
"checksum winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "f10e386af2b13e47c89e7236a7a14a086791a2b88ebad6df9bf42040195cf770"
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"

4
src/admin.rs

@ -126,6 +126,7 @@ pub enum WhatsappCommand { @@ -126,6 +126,7 @@ pub enum WhatsappCommand {
Logon,
ChatList,
UpdateAll,
PrintAcks
}
impl WhatsappCommand {
pub fn help() -> &'static str {
@ -139,6 +140,8 @@ The following commands are available: @@ -139,6 +140,8 @@ The following commands are available:
This command will usually not be required, but is helpful if the bridge appears stuck.
\x02CHATS\x0f
List all WhatsApp chats you're currently part of.
\x02RECEIPTS\x0f \x0307(alias \x02ACKS\x02)\x0f
Print delivery reports for recently sent messages.
\x02REBUILD\x0f
Refresh group metadata for all WhatsApp groups you're currently in.
This command will usually not be required, and is mainly useful for debugging.
@ -151,6 +154,7 @@ The following commands are available: @@ -151,6 +154,7 @@ The following commands are available:
("logon", _) => Some(WhatsappCommand::Logon),
("chats", _) => Some(WhatsappCommand::ChatList),
("rebuild", _) => Some(WhatsappCommand::UpdateAll),
("receipts", _) | ("acks", _) => Some(WhatsappCommand::PrintAcks),
_ => None
}
}

4
src/comm.rs

@ -43,7 +43,9 @@ pub enum WhatsappCommand { @@ -43,7 +43,9 @@ pub enum WhatsappCommand {
PersistentChanged(WaPersistentSession),
Disconnect(WaDisconnectReason),
Message(bool, Box<WaMessage>),
MediaFinished(MediaResult)
MediaFinished(MediaResult),
CheckAcks,
PrintAcks
}
#[allow(dead_code)]
pub enum ContactFactoryCommand {

10
src/config.rs

@ -40,7 +40,15 @@ pub struct WhatsappConfig { @@ -40,7 +40,15 @@ pub struct WhatsappConfig {
#[serde(default)]
pub media_path: Option<String>,
#[serde(default)]
pub autocreate_prefix: Option<String>
pub autocreate_prefix: Option<String>,
#[serde(default)]
pub ack_check_interval: Option<u64>,
#[serde(default)]
pub ack_warn_ms: Option<u64>,
#[serde(default)]
pub ack_expiry_ms: Option<u64>,
#[serde(default)]
pub ack_resend_ms: Option<u64>
}
#[derive(Deserialize, Debug, Clone)]
pub struct IrcClientConfig {

2
src/control.rs

@ -149,7 +149,7 @@ impl ControlBot { @@ -149,7 +149,7 @@ impl ControlBot {
}
},
ReportFailure(err) => {
self.irc.0.send_notice(&self.admin, &format!("\x02\x0304{}\x0f", err))?;
self.irc.0.send_privmsg(&self.admin, &format!("{}: \x02\x0304{}\x0f", self.admin, err))?;
},
CommandResponse(resp) => {
self.control_response(&resp)?;

3
src/control_common.rs

@ -80,7 +80,8 @@ pub trait ControlCommon { @@ -80,7 +80,8 @@ pub trait ControlCommon {
Setup => WhatsappCommand::StartRegistration,
Logon => WhatsappCommand::LogonIfSaved,
ChatList => WhatsappCommand::GroupList,
UpdateAll => WhatsappCommand::GroupUpdateAll
UpdateAll => WhatsappCommand::GroupUpdateAll,
PrintAcks => WhatsappCommand::PrintAcks
};
self.wa_tx().unbounded_send(cts)
.unwrap();

2
src/insp_s2s.rs

@ -626,7 +626,7 @@ impl InspLink { @@ -626,7 +626,7 @@ impl InspLink {
},
ReportFailure(fail) => {
if let Some(admu) = self.admin_uuid() {
let line = Message::new(Some(&self.cfg.sid), "NOTICE", vec![&admu], Some(&format!("\x02\x0304{}\x0f", fail)))?;
let line = Message::new(Some(&self.cfg.sid), "PRIVMSG", vec![&admu], Some(&format!("{}: \x02\x0304{}\x0f", self.cfg.admin_nick, fail)))?;
self.send(line);
}
else {

130
src/whatsapp.rs

@ -10,7 +10,7 @@ use whatsappweb::connection::UserData as WaUserData; @@ -10,7 +10,7 @@ use whatsappweb::connection::UserData as WaUserData;
use whatsappweb::connection::PersistentSession as WaPersistentSession;
use whatsappweb::connection::DisconnectReason as WaDisconnectReason;
use whatsappweb::message::ChatMessage as WaMessage;
use whatsappweb::message::{ChatMessageContent, MessageId, Peer};
use whatsappweb::message::{ChatMessageContent, MessageId, Peer, MessageAckLevel};
use huawei_modem::pdu::PduAddress;
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver};
use std::collections::HashMap;
@ -25,6 +25,9 @@ use failure::Error; @@ -25,6 +25,9 @@ use failure::Error;
use crate::models::Recipient;
use crate::whatsapp_media::{MediaInfo, MediaResult};
use regex::{Regex, Captures};
use chrono::prelude::*;
use tokio_timer::Interval;
use std::time::{Instant, Duration};
struct WhatsappHandler {
tx: Arc<UnboundedSender<WhatsappCommand>>
@ -51,6 +54,13 @@ impl WhatsappWebHandler for WhatsappHandler { @@ -51,6 +54,13 @@ impl WhatsappWebHandler for WhatsappHandler {
.unwrap();
}
}
struct MessageSendStatus {
ack_level: Option<MessageAckLevel>,
sent_ts: DateTime<Utc>,
content: ChatMessageContent,
destination: Jid,
alerted: bool
}
pub struct WhatsappManager {
conn: Option<WhatsappWebConnection<WhatsappHandler>>,
rx: UnboundedReceiver<WhatsappCommand>,
@ -59,7 +69,11 @@ pub struct WhatsappManager { @@ -59,7 +69,11 @@ pub struct WhatsappManager {
cb_tx: UnboundedSender<ControlBotCommand>,
contacts: HashMap<Jid, WaContact>,
chats: HashMap<Jid, WaChat>,
outgoing_messages: HashMap<String, MessageSendStatus>,
state: WaState,
ack_warn: u64,
ack_expiry: u64,
ack_resend: Option<u64>,
connected: bool,
store: Store,
qr_path: String,
@ -91,16 +105,35 @@ impl WhatsappManager { @@ -91,16 +105,35 @@ impl WhatsappManager {
let qr_path = format!("{}/qr.png", media_path);
let dl_path = p.cfg.whatsapp.dl_path.clone().unwrap_or("file:///tmp/wa_media".into());
let autocreate = p.cfg.whatsapp.autocreate_prefix.clone();
let ack_ivl = p.cfg.whatsapp.ack_check_interval.unwrap_or(10000);
let ack_warn_ms = p.cfg.whatsapp.ack_warn_ms.unwrap_or(5000);
let ack_expiry_ms = p.cfg.whatsapp.ack_warn_ms.unwrap_or(60000);
let ack_resend_ms = p.cfg.whatsapp.ack_resend_ms.clone();
wa_tx.unbounded_send(WhatsappCommand::LogonIfSaved)
.unwrap();
let wa_tx = Arc::new(wa_tx);
let ivl_tx = wa_tx.clone();
let timer = Interval::new(Instant::now(), Duration::new(ack_ivl, 0))
.map_err(|e| {
error!("Message ack timer failed: {}", e);
panic!("timer failed!");
}).for_each(move |_| {
ivl_tx.unbounded_send(WhatsappCommand::CheckAcks).unwrap();
Ok(())
});
p.hdl.spawn(timer);
Self {
conn: None,
contacts: HashMap::new(),
chats: HashMap::new(),
outgoing_messages: HashMap::new(),
state: WaState::Uninitialized,
connected: false,
our_jid: None,
wa_tx: Arc::new(wa_tx),
ack_warn: ack_warn_ms,
ack_expiry: ack_expiry_ms,
ack_resend: ack_resend_ms,
wa_tx,
rx, cf_tx, cb_tx, qr_path, store, media_path, dl_path, autocreate
}
}
@ -128,9 +161,63 @@ impl WhatsappManager { @@ -128,9 +161,63 @@ impl WhatsappManager {
}
},
MediaFinished(r) => self.media_finished(r)?,
CheckAcks => self.check_acks()?,
PrintAcks => self.print_acks()?
}
Ok(())
}
fn print_acks(&mut self) -> Result<()> {
self.cb_respond("Message receipts:".into());
let now = Utc::now();
let mut lines = vec![];
for (mid, mss) in self.outgoing_messages.iter_mut() {
let delta = now - mss.sent_ts;
let mut summary = mss.content.quoted_description();
if summary.len() > 10 {
summary.truncate(10);
summary.push_str("…");
}
lines.push(format!("- \"\x1d{}\x1d\" to \x02{}\x02 ({}s ago) is \x02{:?}\x02",
summary, mss.destination.to_string(), delta.num_seconds(), mss.ack_level));
lines.push(format!(" (message ID \x11{}\x0f)", mid));
}
for line in lines {
self.cb_respond(line);
}
Ok(())
}
fn check_acks(&mut self) -> Result<()> {
trace!("Checking acks");
let now = Utc::now();
let mut to_resend = vec![];
for (mid, mss) in self.outgoing_messages.iter_mut() {
let delta = now - mss.sent_ts;
let delta_ms = delta.num_milliseconds() as u64;
if mss.ack_level.is_none() {
if delta_ms >= self.ack_warn && !mss.alerted {
warn!("Message {} has been un-acked for {} seconds!", mid, delta.num_seconds());
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Warning: Message ID {} apparently hasn't been sent yet!", mid)))
.unwrap();
mss.alerted = true;
}
if let Some(rs) = self.ack_resend {
if delta_ms >= rs {
to_resend.push(mid.clone());
}
}
}
}
for mid in to_resend {
let mss = self.outgoing_messages.remove(&mid).unwrap();
warn!("Resending un-acked message {} (!)", mid);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Warning: Resending message with ID {}", mid)))
.unwrap();
self.send_message(mss.content, mss.destination);
}
let ack_expiry = self.ack_expiry;
self.outgoing_messages.retain(|_, m| (now - m.sent_ts).num_milliseconds() as u64 > ack_expiry);
Ok(())
}
fn media_finished(&mut self, r: MediaResult) -> Result<()> {
match r.result {
Ok(ret) => {
@ -196,6 +283,20 @@ impl WhatsappManager { @@ -196,6 +283,20 @@ impl WhatsappManager {
self.cb_respond(format!("NB: The code is only valid for a few seconds, so scan quickly!"));
Ok(())
}
fn send_message(&mut self, content: ChatMessageContent, jid: Jid) {
let (c, j) = (content.clone(), jid.clone());
let mss = MessageSendStatus {
ack_level: None,
sent_ts: Utc::now(),
content,
destination: jid,
alerted: false
};
let mid = self.conn.as_mut().unwrap()
.send_message(c, j);
debug!("Send to {}: message ID {}", mss.destination.to_string(), mid.0);
self.outgoing_messages.insert(mid.0, mss);
}
fn send_direct_message(&mut self, addr: PduAddress, content: String) -> Result<()> {
debug!("Sending direct message to {}...", addr);
trace!("Message contents: {}", content);
@ -208,9 +309,7 @@ impl WhatsappManager { @@ -208,9 +309,7 @@ impl WhatsappManager {
match Jid::from_phonenumber(format!("{}", addr)) {
Ok(jid) => {
let content = ChatMessageContent::Text(content);
self.conn.as_mut().unwrap()
.send_message(content, jid);
debug!("WA direct message sent (probably)");
self.send_message(content, jid);
},
Err(e) => {
warn!("Couldn't send WA message to {}: {}", addr, e);
@ -232,9 +331,7 @@ impl WhatsappManager { @@ -232,9 +331,7 @@ impl WhatsappManager {
if let Some(grp) = self.store.get_group_by_chan_opt(&chan)? {
let jid = grp.jid.parse().expect("bad jid in DB");
let content = ChatMessageContent::Text(content);
self.conn.as_mut().unwrap()
.send_message(content, jid);
debug!("WA group message sent (probably)");
self.send_message(content, jid);
}
else {
error!("Tried to send WA message to nonexistent group {}", chan);
@ -668,11 +765,11 @@ impl WhatsappManager { @@ -668,11 +765,11 @@ impl WhatsappManager {
}
},
ContactAddChange(ct) => {
info!("Contact {} added or modified", ct.jid.to_string());
debug!("Contact {} added or modified", ct.jid.to_string());
self.contacts.insert(ct.jid.clone(), ct);
},
ContactDelete(jid) => {
info!("Contact {} deleted", jid.to_string());
debug!("Contact {} deleted", jid.to_string());
self.contacts.remove(&jid);
},
Chats(cts) => {
@ -685,10 +782,10 @@ impl WhatsappManager { @@ -685,10 +782,10 @@ impl WhatsappManager {
use whatsappweb::ChatAction::*;
match act {
Remove => {
info!("Chat {} removed", jid.to_string());
debug!("Chat {} removed", jid.to_string());
self.chats.remove(&jid);
},
act => info!("Chat {} action: {:?}", jid.to_string(), act)
act => debug!("Chat {} action: {:?}", jid.to_string(), act)
}
},
UserJid(jid) => {
@ -721,8 +818,13 @@ impl WhatsappManager { @@ -721,8 +818,13 @@ impl WhatsappManager {
}
},
MessageAck(ack) => {
// TODO: make something more of this
debug!("Message ack: {:?}", ack);
if let Some(mss) = self.outgoing_messages.get_mut(&ack.id.0) {
debug!("Ack known message {} at level: {:?}", ack.id.0, ack.level);
mss.ack_level = Some(ack.level);
}
else {
debug!("Ack unknown message {} at level: {:?}", ack.id.0, ack.level);
}
},
GroupIntroduce { newly_created, meta, .. } => {
let is_new = if newly_created { " newly created" } else { "" };

Loading…
Cancel
Save