Browse Source

Drastically reduce connection state logging, queue offline messages

- We now only log WA connection state information at the DEBUG level,
  to try and avoid spamming the logs every time the websocket drops
  for whatever reason (which has become more frequent as of late)
- A new mechanism enables messages sent while offline to be queued
  and sent once the connection is reestablished, making small drops
  less of a pain.
  - This in fact re-uses the ack tracker code, so the user also gets
    alerted if the message doesn't send for a while.
master
eta 4 years ago
parent
commit
7fac20c58b
  1. 65
      src/whatsapp.rs
  2. 27
      src/whatsapp_ack.rs
  3. 14
      src/whatsapp_conn.rs

65
src/whatsapp.rs

@ -5,7 +5,7 @@ use whatsappweb::Contact as WaContact; @@ -5,7 +5,7 @@ use whatsappweb::Contact as WaContact;
use whatsappweb::Chat as WaChat;
use whatsappweb::GroupMetadata;
use whatsappweb::message::ChatMessage as WaMessage;
use whatsappweb::message::{ChatMessageContent, Peer};
use whatsappweb::message::{ChatMessageContent, Peer, MessageId};
use whatsappweb::session::PersistentSession as WaPersistentSession;
use whatsappweb::event::WaEvent;
use whatsappweb::req::WaRequest;
@ -51,6 +51,7 @@ pub struct WhatsappManager { @@ -51,6 +51,7 @@ pub struct WhatsappManager {
mark_read: bool,
track_presence: bool,
our_jid: Option<Jid>,
prev_jid: Option<Jid>,
outbox: VecDeque<WaRequest>
}
impl Future for WhatsappManager {
@ -131,6 +132,7 @@ impl WhatsappManager { @@ -131,6 +132,7 @@ impl WhatsappManager {
chats: HashMap::new(),
connected: false,
our_jid: None,
prev_jid: None,
presence_requests: HashMap::new(),
outbox: VecDeque::new(),
backlog_start,
@ -248,12 +250,21 @@ impl WhatsappManager { @@ -248,12 +250,21 @@ impl WhatsappManager {
self.cb_respond("NB: The code is only valid for a few seconds, so scan quickly!");
Ok(())
}
fn queue_message(&mut self, content: ChatMessageContent, jid: Jid) {
let mid = MessageId::generate();
debug!("Queued send to {}: message ID {}", jid, mid.0);
self.ackp.register_send(jid, content, mid.0, true);
if self.conn.is_disabled() {
let err = "Warning: WhatsApp Web is currently not set up, but you've tried to send something.";
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(err.into())).unwrap();
}
}
fn send_message(&mut self, content: ChatMessageContent, jid: Jid) -> Result<()> {
let (c, j) = (content.clone(), jid.clone());
let m = WaMessage::new(jid, content);
debug!("Send to {}: message ID {}", j, m.id.0);
self.store.store_wa_msgid(m.id.0.clone())?;
self.ackp.register_send(j.clone(), c, m.id.0.clone());
self.ackp.register_send(j.clone(), c, m.id.0.clone(), false);
self.outbox.push_back(WaRequest::SendMessage(m));
if !j.is_group && self.track_presence {
let mut update = true;
@ -276,16 +287,15 @@ impl WhatsappManager { @@ -276,16 +287,15 @@ impl WhatsappManager {
fn send_direct_message(&mut self, addr: PduAddress, content: String) -> Result<()> {
debug!("Sending direct message to {}...", addr);
trace!("Message contents: {}", content);
if !self.connected || !self.conn.is_connected() {
warn!("Tried to send WA message to {} while disconnected!", addr);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Failed to send to WA contact {}: disconnected from server", addr)))
.unwrap();
return Ok(());
}
match Jid::from_phonenumber(format!("{}", addr)) {
Ok(jid) => {
let content = ChatMessageContent::Text(content);
self.send_message(content, jid)?;
if !self.connected || !self.conn.is_connected() {
self.queue_message(content, jid);
}
else {
self.send_message(content, jid)?;
}
},
Err(e) => {
warn!("Couldn't send WA message to {}: {}", addr, e);
@ -298,16 +308,15 @@ impl WhatsappManager { @@ -298,16 +308,15 @@ impl WhatsappManager {
fn send_group_message(&mut self, chan: String, content: String) -> Result<()> {
debug!("Sending message to group with chan {}...", chan);
trace!("Message contents: {}", content);
if !self.connected || !self.conn.is_connected() {
warn!("Tried to send WA message to group {} while disconnected!", chan);
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(format!("Failed to send to group {}: disconnected from server", chan)))
.unwrap();
return Ok(());
}
if let Some(grp) = self.store.get_group_by_chan_opt(&chan)? {
let jid = grp.jid.parse().expect("bad jid in DB");
let content = ChatMessageContent::Text(content);
self.send_message(content, jid)?;
if !self.connected || !self.conn.is_connected() {
self.queue_message(content, jid);
}
else {
self.send_message(content, jid)?;
}
}
else {
error!("Tried to send WA message to nonexistent group {}", chan);
@ -628,15 +637,33 @@ impl WhatsappManager { @@ -628,15 +637,33 @@ impl WhatsappManager {
Ok(())
}
fn on_established(&mut self, jid: Jid, ps: WaPersistentSession) -> Result<()> {
info!("Logged in as {}.", jid);
if self.our_jid != self.prev_jid {
info!("Logged in as {}.", jid);
if self.prev_jid.is_some() {
warn!("Logged in as a different phone number (prev was {})!", self.prev_jid.as_ref().unwrap());
let err = "Warning: You've logged in with a different phone number.";
self.cb_tx.unbounded_send(ControlBotCommand::ReportFailure(err.into())).unwrap();
}
}
else {
debug!("Logged in again after connection loss.");
}
let unsent = self.ackp.extract_unsent();
if unsent.len() > 0 {
info!("Sending {} messages sent while offline", unsent.len());
}
for mss in unsent {
self.send_message(mss.content, mss.destination)?;
}
self.store.store_wa_persistence(ps.clone())?;
self.conn.set_persistent(Some(ps));
self.our_jid = Some(jid);
self.our_jid = Some(jid.clone());
self.prev_jid = Some(jid);
self.connected = true;
Ok(())
}
fn on_wa_error(&mut self, err: WaError) {
warn!("WA connection failed: {}", err);
debug!("WA connection failed: {}", err);
if let WaError::Disconnected(reason) = err {
use self::WaDisconnectReason::*;
let reason_text = match reason {

27
src/whatsapp_ack.rs

@ -13,11 +13,13 @@ use failure::Error; @@ -13,11 +13,13 @@ use failure::Error;
use crate::comm::{ControlBotCommand, InitParameters};
struct MessageSendStatus {
#[derive(Clone)]
pub struct MessageSendStatus {
ack_level: Option<MessageAckLevel>,
sent_ts: DateTime<Utc>,
content: ChatMessageContent,
destination: Jid,
pub content: ChatMessageContent,
pub destination: Jid,
unsent: bool,
alerted: bool,
alerted_pending: bool,
}
@ -56,17 +58,26 @@ impl WaAckTracker { @@ -56,17 +58,26 @@ impl WaAckTracker {
cb_tx, timer
}
}
pub fn register_send(&mut self, to: Jid, content: ChatMessageContent, mid: String) {
pub fn register_send(&mut self, to: Jid, content: ChatMessageContent, mid: String, unsent: bool) {
let mss = MessageSendStatus {
ack_level: None,
sent_ts: Utc::now(),
content,
destination: to,
unsent: unsent,
alerted: false,
alerted_pending: false
};
self.outgoing_messages.insert(mid, mss);
}
pub fn extract_unsent(&mut self) -> Vec<MessageSendStatus> {
let ret = self.outgoing_messages.iter()
.filter(|(_, mss)| mss.unsent)
.map(|(_, x)| x.clone())
.collect();
self.outgoing_messages.retain(|_, mss| !mss.unsent);
ret
}
pub fn print_acks(&mut self) -> Vec<String> {
let now = Utc::now();
let mut lines = vec![];
@ -118,7 +129,13 @@ impl WaAckTracker { @@ -118,7 +129,13 @@ impl WaAckTracker {
if mss.ack_level.is_none() {
if delta_ms >= self.ack_warn && !mss.alerted {
warn!("Message {} has been un-acked for {} seconds!", mid, delta.num_seconds());
Self::send_fail(&mut self.cb_tx, format!("Warning: Sending message ID {} has failed, or is taking longer than usual!", mid));
if mss.unsent {
warn!("(still disconnected)");
Self::send_fail(&mut self.cb_tx, format!("Warning: Message ID {} is still unsent, because we aren't connected to WhatsApp Web.", mid));
}
else {
Self::send_fail(&mut self.cb_tx, format!("Warning: Sending message ID {} has failed, or is taking longer than usual!", mid));
}
mss.alerted = true;
}
}

14
src/whatsapp_conn.rs

@ -37,6 +37,14 @@ impl WebConnectionWrapper { @@ -37,6 +37,14 @@ impl WebConnectionWrapper {
cfg
}
}
pub fn is_disabled(&self) -> bool {
if let WrapperState::Disabled = self.inner {
true
}
else {
false
}
}
pub fn disable(&mut self) {
self.inner = WrapperState::Disabled;
}
@ -60,7 +68,7 @@ impl WebConnectionWrapper { @@ -60,7 +68,7 @@ impl WebConnectionWrapper {
}
}
fn initialize(&mut self) {
info!("Connecting to WhatsApp Web...");
debug!("Connecting to WhatsApp Web...");
let fut: Box<dyn Future<Item = WebConnection, Error = WaError>> = match self.persist.clone() {
Some(p) => Box::new(WebConnection::connect_persistent(p)),
None => Box::new(WebConnection::connect_new())
@ -93,7 +101,7 @@ impl Stream for WebConnectionWrapper { @@ -93,7 +101,7 @@ impl Stream for WebConnectionWrapper {
Initializing(mut fut) => {
match fut.poll() {
Ok(Async::Ready(c)) => {
info!("Connected to WhatsApp Web.");
debug!("Connected to WhatsApp Web.");
self.inner = Running(c);
},
Ok(Async::NotReady) => {
@ -101,7 +109,7 @@ impl Stream for WebConnectionWrapper { @@ -101,7 +109,7 @@ impl Stream for WebConnectionWrapper {
return Ok(Async::NotReady);
},
Err(e) => {
warn!("Failed to connect to WA: {}", e);
warn!("Failed to connect to WhatsApp: {}", e);
self.backoff();
}
}

Loading…
Cancel
Save