Browse Source

Display WA backlog messages received while bridge was offline

- We now store each WhatsApp message ID we've received in the database,
  so we can determine whether an incoming message has already been
  delivered or not.
- 'Old' messages (those without the 'new' flag) are now delivered,
  instead of merely being thrown away, with a timestamp prepended to
  indicate that they're part of the backlog.
- A new config option `backlog_start` under the [whatsapp] section
  enables users to hide backlog messages before a given timestamp, specified
  in ISO 8601 format. This is useful if you're migrating to this version
  and don't want a million copies of all your messages to be sent.
- The `ignore_other_libraries` config flag was fixed to actually work.
master
eta 3 years ago
parent
commit
e62a533c27
  1. 1
      Cargo.lock
  2. 5
      Cargo.toml
  3. 2
      config.example.toml
  4. 1
      migrations/2019-07-03-140351_wa_msgids/down.sql
  5. 3
      migrations/2019-07-03-140351_wa_msgids/up.sql
  6. 4
      src/config.rs
  7. 3
      src/logging.rs
  8. 7
      src/models.rs
  9. 16
      src/schema.rs
  10. 19
      src/store.rs
  11. 61
      src/whatsapp.rs

1
Cargo.lock generated

@ -186,6 +186,7 @@ dependencies = [ @@ -186,6 +186,7 @@ dependencies = [
"libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)",
"num-integer 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)",
"num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.94 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
]

5
Cargo.toml

@ -5,7 +5,6 @@ edition = "2018" @@ -5,7 +5,6 @@ edition = "2018"
version = "0.1.0"
[dependencies]
chrono = "0.4"
derive_builder = "0.7"
diesel_migrations = "1.4"
failure = "0.1"
@ -31,6 +30,10 @@ tokio-signal = "0.2.7" @@ -31,6 +30,10 @@ tokio-signal = "0.2.7"
tokio-timer = "0.2"
toml = "0.5"
[dependencies.chrono]
features = ["serde"]
version = "0.4"
[dependencies.diesel]
features = ["postgres", "serde_json"]
version = "1.0"

2
config.example.toml

@ -77,7 +77,7 @@ chan_loglevel = "info" @@ -77,7 +77,7 @@ chan_loglevel = "info"
## is shown. To disable this behaviour and only log sms-irc messages, change
## the below value to 'false'.
ignore_other_libraries = "false"
ignore_other_libraries = false
## MODEM SETTINGS

1
migrations/2019-07-03-140351_wa_msgids/down.sql

@ -0,0 +1 @@ @@ -0,0 +1 @@
DROP TABLE wa_msgids;

3
migrations/2019-07-03-140351_wa_msgids/up.sql

@ -0,0 +1,3 @@ @@ -0,0 +1,3 @@
CREATE TABLE wa_msgids (
mid VARCHAR PRIMARY KEY
);

4
src/config.rs

@ -48,7 +48,9 @@ pub struct WhatsappConfig { @@ -48,7 +48,9 @@ pub struct WhatsappConfig {
#[serde(default)]
pub ack_expiry_ms: Option<u64>,
#[serde(default)]
pub ack_resend_ms: Option<u64>
pub ack_resend_ms: Option<u64>,
#[serde(default)]
pub backlog_start: Option<chrono::NaiveDateTime>
}
#[derive(Deserialize, Debug, Clone)]
pub struct IrcClientConfig {

3
src/logging.rs

@ -37,6 +37,9 @@ impl log::Log for Logger { @@ -37,6 +37,9 @@ impl log::Log for Logger {
}
fn log(&self, rec: &Record) {
use log::Level::*;
if self.ignore_other_libraries && !rec.target().starts_with("sms_irc") {
return;
}
if rec.level() <= self.stdout_loglevel {
println!("[{}] {} -- {}", rec.target(), rec.level(), rec.args());
}

7
src/models.rs

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
use crate::schema::{recipients, messages, groups, wa_persistence};
use crate::schema::{recipients, messages, groups, wa_persistence, wa_msgids};
use serde_json::Value;
#[derive(Queryable)]
@ -41,6 +41,11 @@ pub struct PersistenceData { @@ -41,6 +41,11 @@ pub struct PersistenceData {
pub rev: i32,
pub data: Value
}
#[derive(Insertable, Queryable, Debug)]
#[table_name="wa_msgids"]
pub struct WaMessageId {
pub mid: String
}
#[derive(Insertable)]
#[table_name="groups"]
pub struct NewGroup<'a> {

16
src/schema.rs

@ -30,9 +30,25 @@ table! { @@ -30,9 +30,25 @@ table! {
}
}
table! {
wa_msgids (mid) {
mid -> Varchar,
}
}
table! {
wa_persistence (rev) {
rev -> Int4,
data -> Json,
}
}
joinable!(messages -> groups (group_target));
allow_tables_to_appear_in_same_query!(
groups,
messages,
recipients,
wa_msgids,
wa_persistence,
);

19
src/store.rs

@ -119,6 +119,25 @@ impl Store { @@ -119,6 +119,25 @@ impl Store {
};
Ok(res)
}
pub fn is_wa_msgid_stored(&mut self, id: &str) -> Result<bool> {
use crate::schema::wa_msgids::dsl::*;
let conn = self.inner.get()?;
let res: Option<WaMessageId> = wa_msgids.filter(mid.eq(id))
.first(&*conn)
.optional()?;
Ok(res.is_some())
}
pub fn store_wa_msgid(&mut self, id: String) -> Result<()> {
use crate::schema::wa_msgids;
let new = WaMessageId { mid: id };
let conn = self.inner.get()?;
::diesel::insert_into(wa_msgids::table)
.values(&new)
.execute(&*conn)?;
Ok(())
}
pub fn store_recipient(&mut self, addr: &PduAddress, nick: &str, wa: bool) -> Result<Recipient> {
use crate::schema::recipients;

61
src/whatsapp.rs

@ -74,6 +74,7 @@ pub struct WhatsappManager { @@ -74,6 +74,7 @@ pub struct WhatsappManager {
ack_warn: u64,
ack_expiry: u64,
ack_resend: Option<u64>,
backlog_start: Option<chrono::NaiveDateTime>,
connected: bool,
store: Store,
qr_path: String,
@ -109,6 +110,7 @@ impl WhatsappManager { @@ -109,6 +110,7 @@ impl WhatsappManager {
let ack_warn_ms = p.cfg.whatsapp.ack_warn_ms.unwrap_or(5000);
let ack_expiry_ms = p.cfg.whatsapp.ack_warn_ms.unwrap_or(60000);
let ack_resend_ms = p.cfg.whatsapp.ack_resend_ms.clone();
let backlog_start = p.cfg.whatsapp.backlog_start.clone();
wa_tx.unbounded_send(WhatsappCommand::LogonIfSaved)
.unwrap();
let wa_tx = Arc::new(wa_tx);
@ -133,7 +135,7 @@ impl WhatsappManager { @@ -133,7 +135,7 @@ impl WhatsappManager {
ack_warn: ack_warn_ms,
ack_expiry: ack_expiry_ms,
ack_resend: ack_resend_ms,
wa_tx,
wa_tx, backlog_start,
rx, cf_tx, cb_tx, qr_path, store, media_path, dl_path, autocreate
}
}
@ -156,9 +158,7 @@ impl WhatsappManager { @@ -156,9 +158,7 @@ impl WhatsappManager {
Disconnect(war) => self.on_disconnect(war),
GotGroupMetadata(meta) => self.on_got_group_metadata(meta)?,
Message(new, msg) => {
if new {
self.on_message(msg)?;
}
self.on_message(msg, new)?;
},
MediaFinished(r) => self.media_finished(r)?,
CheckAcks => self.check_acks()?,
@ -216,7 +216,7 @@ impl WhatsappManager { @@ -216,7 +216,7 @@ impl WhatsappManager {
warn!("Resending un-acked message {} (!)", mid);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Warning: Resending message with ID {}", mid)))
.unwrap();
self.send_message(mss.content, mss.destination);
self.send_message(mss.content, mss.destination)?;
}
let ack_expiry = self.ack_expiry;
self.outgoing_messages.retain(|_, m| {
@ -290,7 +290,7 @@ impl WhatsappManager { @@ -290,7 +290,7 @@ impl WhatsappManager {
self.cb_respond(format!("NB: The code is only valid for a few seconds, so scan quickly!"));
Ok(())
}
fn send_message(&mut self, content: ChatMessageContent, jid: Jid) {
fn send_message(&mut self, content: ChatMessageContent, jid: Jid) -> Result<()> {
let (c, j) = (content.clone(), jid.clone());
let mss = MessageSendStatus {
ack_level: None,
@ -302,7 +302,9 @@ impl WhatsappManager { @@ -302,7 +302,9 @@ impl WhatsappManager {
let mid = self.conn.as_mut().unwrap()
.send_message(c, j);
debug!("Send to {}: message ID {}", mss.destination.to_string(), mid.0);
self.store.store_wa_msgid(mid.0.clone())?;
self.outgoing_messages.insert(mid.0, mss);
Ok(())
}
fn send_direct_message(&mut self, addr: PduAddress, content: String) -> Result<()> {
debug!("Sending direct message to {}...", addr);
@ -316,7 +318,7 @@ impl WhatsappManager { @@ -316,7 +318,7 @@ impl WhatsappManager {
match Jid::from_phonenumber(format!("{}", addr)) {
Ok(jid) => {
let content = ChatMessageContent::Text(content);
self.send_message(content, jid);
self.send_message(content, jid)?;
},
Err(e) => {
warn!("Couldn't send WA message to {}: {}", addr, e);
@ -338,7 +340,7 @@ impl WhatsappManager { @@ -338,7 +340,7 @@ impl WhatsappManager {
if let Some(grp) = self.store.get_group_by_chan_opt(&chan)? {
let jid = grp.jid.parse().expect("bad jid in DB");
let content = ChatMessageContent::Text(content);
self.send_message(content, jid);
self.send_message(content, jid)?;
}
else {
error!("Tried to send WA message to nonexistent group {}", chan);
@ -376,13 +378,36 @@ impl WhatsappManager { @@ -376,13 +378,36 @@ impl WhatsappManager {
}
Ok(None)
}
fn on_message(&mut self, msg: Box<WaMessage>) -> Result<()> {
fn on_message(&mut self, msg: Box<WaMessage>, is_new: bool) -> Result<()> {
use whatsappweb::message::{Direction};
use std::fmt::Write;
trace!("processing WA message: {:?}", msg);
let now = chrono::Utc::now().naive_utc();
trace!("processing WA message (new {}): {:?}", is_new, msg);
let msg = *msg; // otherwise stupid borrowck gets angry, because Box
let WaMessage { direction, content, id, quoted, .. } = msg;
debug!("got message from dir {:?}", direction);
let mut ts_text = String::new();
if !is_new {
debug!("message timestamp: {}", msg.time);
if let Some(ref bsf) = self.backlog_start {
if *bsf > msg.time {
debug!("Rejecting backlog message: before backlog start time");
return Ok(());
}
}
if self.store.is_wa_msgid_stored(&id.0)? {
debug!("Rejecting backlog message: already in database");
return Ok(());
}
let local = Local.from_utc_datetime(&msg.time).naive_local();
write!(&mut ts_text, "\x0315\x02[\x02")?;
if now.date() != msg.time.date() {
write!(&mut ts_text, "{} ", local.date())?;
}
write!(&mut ts_text, "{}", local.time())?;
write!(&mut ts_text, "\x02]\x02\x0f ")?;
}
let mut peer = None;
let mut is_ours = false;
let (from, group) = match direction {
@ -395,6 +420,7 @@ impl WhatsappManager { @@ -395,6 +420,7 @@ impl WhatsappManager {
}
else {
info!("Received self-message in a 1-to-1 chat, ignoring...");
self.store.store_wa_msgid(id.0.clone())?;
return Ok(());
};
(ojid, group)
@ -433,14 +459,14 @@ impl WhatsappManager { @@ -433,14 +459,14 @@ impl WhatsappManager {
};
let mut is_media = false;
let text = match content {
ChatMessageContent::Text(s) => self.process_message(&s),
ChatMessageContent::Text(s) => format!("{}{}", ts_text, self.process_message(&s)),
ChatMessageContent::Unimplemented(mut det) => {
if det.trim() == "" {
debug!("Discarding empty unimplemented message.");
return Ok(());
}
det.truncate(128);
format!("[\x02\x0304unimplemented\x0f] {}", det)
format!("{}[\x02\x0304unimplemented\x0f] {}", ts_text, det)
},
ChatMessageContent::LiveLocation { lat, long, speed, .. } => {
// FIXME: use write!() maybe
@ -450,7 +476,7 @@ impl WhatsappManager { @@ -450,7 +476,7 @@ impl WhatsappManager {
else {
format!("broadcasting live location - https://google.com/maps?q={},{}", lat, long)
};
format!("\x01ACTION is {}\x01", spd)
format!("\x01ACTION is {}{}\x01", ts_text, spd)
},
ChatMessageContent::Location { lat, long, name, .. } => {
let place = if let Some(n) = name {
@ -459,20 +485,20 @@ impl WhatsappManager { @@ -459,20 +485,20 @@ impl WhatsappManager {
else {
"somewhere".into()
};
format!("\x01ACTION is {} - https://google.com/maps?q={},{}\x01", place, lat, long)
format!("\x01ACTION is {}{} - https://google.com/maps?q={},{}\x01", ts_text, place, lat, long)
},
ChatMessageContent::Redaction { mid } => {
// TODO: make this more useful
format!("\x01ACTION redacted message ID \x11{}\x11\x01", mid.0)
format!("\x01ACTION {}redacted message ID \x11{}\x11\x01", ts_text, mid.0)
},
ChatMessageContent::Contact { display_name, vcard } => {
match crate::whatsapp_media::store_contact(&self.media_path, &self.dl_path, vcard) {
Ok(link) => {
format!("\x01ACTION uploaded a contact for '{}' - {}\x01", display_name, link)
format!("\x01ACTION {}uploaded a contact for '{}' - {}\x01", ts_text, display_name, link)
},
Err(e) => {
warn!("Failed to save contact card: {}", e);
format!("\x01ACTION uploaded a contact for '{}' (couldn't download)\x01", display_name)
format!("\x01ACTION {}uploaded a contact for '{}' (couldn't download)\x01", ts_text, display_name)
}
}
},
@ -515,6 +541,7 @@ impl WhatsappManager { @@ -515,6 +541,7 @@ impl WhatsappManager {
self.store_message(&from, &quote, group)?;
}
self.store_message(&from, &text, group)?;
self.store.store_wa_msgid(id.0.clone())?;
if let Some(p) = peer {
if let Some(ref mut conn) = self.conn {
if !is_media && !is_ours {

Loading…
Cancel
Save