Browse Source

Vastly improved contact creation, WA notify tracking

- Instead of contacts/recipients potentially being created in like
  three disparate places, there is now one and only one place
  where this happens: the object responsible for the protocol,
  i.e. the WhatsApp or modem manager.
  - This makes everything far simpler, and means we can actually
    keep track of things like contact names properly, instead of
    wondering which part of the bridge will end up making the
    contact in the end.
- All of the WhatsApp contacts we get on the initial burst, and
  any added thereafter, are now translated into recipients.
  - In addition, we keep track of their 'notify' property (i.e.
    the ~tildename for each contact, or the contact name from the
    user address book), and store that in the database.
  - In future, the bridge will perhaps let you autoname people,
    or display their notify in some meaningful way to allow you
    to differentiate people amongst the sea of random numbers...
master
eta 4 years ago
parent
commit
d82ec9e1a7
  1. 2
      migrations/2019-07-13-090924_notify/down.sql
  2. 2
      migrations/2019-07-13-090924_notify/up.sql
  3. 9
      src/comm.rs
  4. 51
      src/contact_common.rs
  5. 22
      src/contact_factory.rs
  6. 11
      src/control_common.rs
  7. 32
      src/insp_s2s.rs
  8. 5
      src/irc_s2c.rs
  9. 13
      src/models.rs
  10. 13
      src/modem.rs
  11. 2
      src/schema.rs
  12. 46
      src/store.rs
  13. 13
      src/util.rs
  14. 163
      src/whatsapp.rs
  15. 11
      src/whatsapp_media.rs

2
migrations/2019-07-13-090924_notify/down.sql

@ -0,0 +1,2 @@ @@ -0,0 +1,2 @@
ALTER TABLE recipients DROP COLUMN notify;
ALTER TABLE messages DROP COLUMN source;

2
migrations/2019-07-13-090924_notify/up.sql

@ -0,0 +1,2 @@ @@ -0,0 +1,2 @@
ALTER TABLE recipients ADD COLUMN notify VARCHAR;
ALTER TABLE messages ADD COLUMN source INT NOT NULL DEFAULT 0;

9
src/comm.rs

@ -25,7 +25,8 @@ pub enum ModemCommand { @@ -25,7 +25,8 @@ pub enum ModemCommand {
RequestReg,
ForceReinit,
UpdatePath(Option<String>),
CommandTimeout
CommandTimeout,
MakeContact(PduAddress),
}
pub enum WhatsappCommand {
StartRegistration,
@ -45,15 +46,17 @@ pub enum WhatsappCommand { @@ -45,15 +46,17 @@ pub enum WhatsappCommand {
Message(bool, Box<WaMessage>),
MediaFinished(MediaResult),
CheckAcks,
PrintAcks
PrintAcks,
MakeContact(PduAddress),
}
#[allow(dead_code)]
pub enum ContactFactoryCommand {
ProcessMessages,
ProcessGroups,
ProcessAvatars,
MakeContact(PduAddress, bool),
SetupContact(PduAddress),
DropContact(PduAddress),
QueryContact(PduAddress, i32),
// FIXME: these `ByNick` variants are dumb and only exist to serve the control bot
DropContactByNick(String),
LoadRecipients,

51
src/contact_common.rs

@ -1,12 +1,17 @@ @@ -1,12 +1,17 @@
//! Shared behaviour for contact factories/stores.
use huawei_modem::pdu::PduAddress;
use crate::models::Recipient;
use crate::models::{Recipient, Message};
use crate::store::Store;
use crate::comm::ContactManagerCommand;
use crate::util::{self, Result};
use futures::sync::mpsc::UnboundedSender;
use crate::comm::{WhatsappCommand, ModemCommand, ControlBotCommand};
pub trait ContactManagerManager {
fn wa_tx(&mut self) -> &mut UnboundedSender<WhatsappCommand>;
fn m_tx(&mut self) -> &mut UnboundedSender<ModemCommand>;
fn cb_tx(&mut self) -> &mut UnboundedSender<ControlBotCommand>;
fn setup_contact_for(&mut self, _: Recipient, _: PduAddress) -> Result<()>;
fn remove_contact_for(&mut self, _: &PduAddress) -> Result<()>;
fn has_contact(&mut self, _: &PduAddress) -> bool;
@ -24,17 +29,49 @@ pub trait ContactManagerManager { @@ -24,17 +29,49 @@ pub trait ContactManagerManager {
self.setup_contact_for(recip, addr)?;
Ok(())
}
fn make_contact(&mut self, addr: PduAddress, is_wa: bool) -> Result<()> {
if let Some(recip) = self.store().get_recipient_by_addr_opt(&addr)? {
fn query_contact(&mut self, a: PduAddress, src: i32) -> Result<()> {
if let Some(recip) = self.store().get_recipient_by_addr_opt(&a)? {
self.cb_tx().unbounded_send(
ControlBotCommand::CommandResponse(
format!("Ghost exists already; nickname is `{}`.", recip.nick)
))
.unwrap();
self.setup_recipient(recip)?;
}
else {
let nick = util::make_nick_for_address(&addr);
let watext = if is_wa { "WA recipient"} else { "recipient" };
info!("Creating new {} for {} (nick {})", watext, addr, nick);
let recip = self.store().store_recipient(&addr, &nick, is_wa)?;
self.cb_tx().unbounded_send(
ControlBotCommand::CommandResponse(
format!("Setting up a ghost for number `{}`...", a)
))
.unwrap();
self.request_contact(a, src)?;
}
Ok(())
}
fn setup_contact(&mut self, a: PduAddress) -> Result<()> {
if let Some(recip) = self.store().get_recipient_by_addr_opt(&a)? {
self.setup_recipient(recip)?;
}
else {
error!("Attempted to setup non-existent recipient {}", a);
}
Ok(())
}
fn request_contact(&mut self, a: PduAddress, src: i32) -> Result<()> {
info!("No contact exists yet for {}; asking for its creation", a);
match src {
Message::SOURCE_SMS => {
self.m_tx().unbounded_send(ModemCommand::MakeContact(a))
.unwrap();
},
Message::SOURCE_WA => {
self.wa_tx().unbounded_send(WhatsappCommand::MakeContact(a))
.unwrap();
},
_ => {
error!("Contact requested for unknown message source {}", src);
}
}
Ok(())
}
fn drop_contact(&mut self, addr: PduAddress) -> Result<()> {

22
src/contact_factory.rs

@ -2,9 +2,9 @@ @@ -2,9 +2,9 @@
use crate::config::Config;
use crate::store::Store;
use crate::comm::{ContactFactoryCommand, ContactManagerCommand, ChannelMaker, InitParameters};
use crate::comm::{ContactFactoryCommand, ContactManagerCommand, ChannelMaker, InitParameters, ModemCommand, WhatsappCommand, ControlBotCommand};
use futures::{Future, Async, Poll, Stream};
use futures::sync::mpsc::UnboundedReceiver;
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use std::collections::{HashMap, HashSet};
use tokio_core::reactor::Handle;
use huawei_modem::pdu::PduAddress;
@ -18,6 +18,9 @@ use crate::config::IrcClientConfig; @@ -18,6 +18,9 @@ use crate::config::IrcClientConfig;
pub struct ContactFactory {
rx: UnboundedReceiver<ContactFactoryCommand>,
cb_tx: UnboundedSender<ControlBotCommand>,
wa_tx: UnboundedSender<WhatsappCommand>,
m_tx: UnboundedSender<ModemCommand>,
contacts_starting: HashMap<PduAddress, Box<Future<Item = ContactManager, Error = Error>>>,
contacts: HashMap<PduAddress, ContactManager>,
contacts_presence: HashMap<PduAddress, Option<String>>,
@ -42,7 +45,8 @@ impl Future for ContactFactory { @@ -42,7 +45,8 @@ impl Future for ContactFactory {
ProcessMessages => self.process_messages()?,
ProcessGroups => self.process_groups()?,
LoadRecipients => self.load_recipients()?,
MakeContact(addr, wa) => self.make_contact(addr, wa)?,
SetupContact(addr) => self.setup_contact(addr)?,
QueryContact(addr, src) => self.query_contact(addr, src)?,
DropContact(addr) => self.drop_contact(addr)?,
DropContactByNick(nick) => self.drop_contact_by_nick(nick)?,
ForwardCommand(addr, cmd) => self.forward_cmd(&addr, cmd)?,
@ -95,6 +99,9 @@ impl Future for ContactFactory { @@ -95,6 +99,9 @@ impl Future for ContactFactory {
}
}
impl ContactManagerManager for ContactFactory {
fn wa_tx(&mut self) -> &mut UnboundedSender<WhatsappCommand> { &mut self.wa_tx }
fn cb_tx(&mut self) -> &mut UnboundedSender<ControlBotCommand> { &mut self.cb_tx }
fn m_tx(&mut self) -> &mut UnboundedSender<ModemCommand> { &mut self.m_tx }
fn setup_contact_for(&mut self, recip: Recipient, addr: PduAddress) -> Result<()> {
let cfut = {
let ip = self.get_init_parameters();
@ -140,6 +147,9 @@ impl ContactFactory { @@ -140,6 +147,9 @@ impl ContactFactory {
use std::time::{Instant, Duration};
let rx = cm.cf_rx.take().unwrap();
let wa_tx = cm.wa_tx.clone();
let cb_tx = cm.cb_tx.clone();
let m_tx = cm.modem_tx.clone();
cm.cf_tx.unbounded_send(ContactFactoryCommand::LoadRecipients).unwrap();
let failure_int = Interval::new(Instant::now(), Duration::from_millis(cfg.client.as_ref().unwrap().failure_interval.unwrap_or(30000)));
Self {
@ -149,12 +159,12 @@ impl ContactFactory { @@ -149,12 +159,12 @@ impl ContactFactory {
contacts: HashMap::new(),
failed_contacts: HashSet::new(),
messages_processed: HashSet::new(),
cfg, store, cm, hdl
cfg, store, cm, hdl, wa_tx, m_tx, cb_tx
}
}
fn process_failures(&mut self) -> Result<()> {
for addr in ::std::mem::replace(&mut self.failed_contacts, HashSet::new()) {
self.make_contact(addr, false)?;
self.setup_contact(addr)?;
}
Ok(())
}
@ -199,7 +209,7 @@ impl ContactFactory { @@ -199,7 +209,7 @@ impl ContactFactory {
c.add_command(ContactManagerCommand::ProcessMessages);
continue;
}
self.make_contact(addr, msg.text.is_some())?;
self.request_contact(addr, msg.source)?;
}
}
Ok(())

11
src/control_common.rs

@ -6,6 +6,7 @@ use crate::util::Result; @@ -6,6 +6,7 @@ use crate::util::Result;
use crate::admin::{InspCommand, AdminCommand, GhostCommand, GroupCommand, ContactCommand};
use crate::admin::ModemCommand as AdminModemCommand;
use crate::admin::WhatsappCommand as AdminWhatsappCommand;
use crate::models::Message;
pub trait ControlCommon {
fn wa_tx(&mut self) -> &mut UnboundedSender<WhatsappCommand>;
@ -98,14 +99,12 @@ pub trait ControlCommon { @@ -98,14 +99,12 @@ pub trait ControlCommon {
},
AdminCommand::Contact(cc) => {
use self::ContactCommand::*;
let (addr, is_wa) = match cc {
NewSms(a) => (a, false),
NewWhatsapp(a) => (a, true)
let (addr, src) = match cc {
NewSms(a) => (a, Message::SOURCE_SMS),
NewWhatsapp(a) => (a, Message::SOURCE_WA)
};
self.cf_tx().unbounded_send(ContactFactoryCommand::MakeContact(addr, is_wa))
self.cf_tx().unbounded_send(ContactFactoryCommand::QueryContact(addr, src))
.unwrap();
self.control_response("Contact command executed.")?;
},
AdminCommand::Insp(ic) => {
if !self.process_insp(ic)? {

32
src/insp_s2s.rs

@ -47,6 +47,8 @@ pub struct InspLink { @@ -47,6 +47,8 @@ pub struct InspLink {
cf_rx: UnboundedReceiver<ContactFactoryCommand>,
cf_tx: UnboundedSender<ContactFactoryCommand>,
cb_rx: UnboundedReceiver<ControlBotCommand>,
// Okay, this is a bit silly, but hey, standardization...
cb_tx: UnboundedSender<ControlBotCommand>,
wa_tx: UnboundedSender<WhatsappCommand>,
m_tx: UnboundedSender<ModemCommand>,
// map of UID -> InspUser
@ -89,6 +91,9 @@ impl Future for InspLink { @@ -89,6 +91,9 @@ impl Future for InspLink {
}
}
impl ContactManagerManager for InspLink {
fn wa_tx(&mut self) -> &mut UnboundedSender<WhatsappCommand> { &mut self.wa_tx }
fn m_tx(&mut self) -> &mut UnboundedSender<ModemCommand> { &mut self.m_tx }
fn cb_tx(&mut self) -> &mut UnboundedSender<ControlBotCommand> { &mut self.cb_tx }
fn setup_contact_for(&mut self, recip: Recipient, addr: PduAddress) -> Result<()> {
trace!("setting up contact for recip #{}: {}", recip.id, addr);
let host = self.host_for_wa(recip.whatsapp);
@ -158,15 +163,9 @@ impl ContactManagerManager for InspLink { @@ -158,15 +163,9 @@ impl ContactManagerManager for InspLink {
}
}
impl ControlCommon for InspLink {
fn wa_tx(&mut self) -> &mut UnboundedSender<WhatsappCommand> {
&mut self.wa_tx
}
fn cf_tx(&mut self) -> &mut UnboundedSender<ContactFactoryCommand> {
&mut self.cf_tx
}
fn m_tx(&mut self) -> &mut UnboundedSender<ModemCommand> {
&mut self.m_tx
}
fn cf_tx(&mut self) -> &mut UnboundedSender<ContactFactoryCommand> { &mut self.cf_tx }
fn wa_tx(&mut self) -> &mut UnboundedSender<WhatsappCommand> { &mut self.wa_tx }
fn m_tx(&mut self) -> &mut UnboundedSender<ModemCommand> { &mut self.m_tx }
fn control_response(&mut self, msg: &str) -> Result<()> {
if let Some(admu) = self.admin_uuid() {
let line = Message::new(Some(&self.control_uuid), "NOTICE", vec![&admu], Some(msg))?;
@ -264,6 +263,7 @@ impl InspLink { @@ -264,6 +263,7 @@ impl InspLink {
let cf_rx = p.cm.cf_rx.take().unwrap();
let cb_rx = p.cm.cb_rx.take().unwrap();
let cf_tx = p.cm.cf_tx.clone();
let cb_tx = p.cm.cb_tx.clone();
let wa_tx = p.cm.wa_tx.clone();
let m_tx = p.cm.modem_tx.clone();
let (addr, codec) = match Self::_make_addr_and_codec(&cfg) {
@ -279,7 +279,7 @@ impl InspLink { @@ -279,7 +279,7 @@ impl InspLink {
cfg,
control_uuid,
next_user_id: 1,
cf_rx, cf_tx, cb_rx, wa_tx, m_tx,
cf_rx, cf_tx, cb_rx, cb_tx, wa_tx, m_tx,
users: HashMap::new(),
contacts: HashMap::new(),
contacts_uuid_pdua: HashMap::new(),
@ -322,7 +322,7 @@ impl InspLink { @@ -322,7 +322,7 @@ impl InspLink {
if let Some(a) = addr {
if recreate {
info!("Contact for {} removed; recreating", a);
self.make_contact(a, false)?;
self.setup_contact(a)?;
}
}
Ok(())
@ -590,7 +590,8 @@ impl InspLink { @@ -590,7 +590,8 @@ impl InspLink {
match cfc {
ProcessMessages => self.process_messages()?,
ProcessGroups => self.process_groups()?,
MakeContact(a, wa) => self.make_contact(a, wa)?,
SetupContact(a) => self.setup_contact(a)?,
QueryContact(a, src) => self.query_contact(a, src)?,
DropContact(a) => self.drop_contact(a)?,
DropContactByNick(a) => self.drop_contact_by_nick(a)?,
LoadRecipients => {
@ -681,7 +682,8 @@ impl InspLink { @@ -681,7 +682,8 @@ impl InspLink {
let addr = util::un_normalize_address(&msg.phone_number)
.ok_or(format_err!("invalid address {} in db", msg.phone_number))?;
if !self.has_contact(&addr) {
self.make_contact(addr.clone(), msg.text.is_some())?;
self.request_contact(addr, msg.source)?;
continue;
}
let (uuid, is_wa) = {
let ct = self.contacts.get(&addr).unwrap();
@ -735,9 +737,7 @@ impl InspLink { @@ -735,9 +737,7 @@ impl InspLink {
let line = self.make_uid_line(&uuid)?;
self.send(line);
for recip in self.store.get_all_recipients()? {
let addr = util::un_normalize_address(&recip.phone_number)
.ok_or(format_err!("invalid phone number in db"))?;
self.make_contact(addr, recip.whatsapp)?;
self.setup_recipient(recip)?;
}
self.send_sid_line("ENDBURST", vec![], None)?;
Ok(())

5
src/irc_s2c.rs

@ -4,15 +4,14 @@ @@ -4,15 +4,14 @@
use tokio_core::net::{TcpListener, Incoming, TcpStream};
use tokio_codec::Framed;
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use irc::proto::IrcCodec;
use irc::proto::message::Message;
use irc::proto::command::Command;
use futures::{Future, Async, Poll, Stream, Sink, AsyncSink, self};
use futures::{Future, Async, Poll, Stream, Sink, self};
use failure::Error;
use std::net::{SocketAddr, ToSocketAddrs};
use crate::util::{self, Result};
use crate::util::Result;
use crate::irc_s2c_registration::{PendingIrcConnectionWrapper, RegistrationInformation};
use crate::config::IrcServerConfig;
use crate::comm::InitParameters;

13
src/models.rs

@ -7,7 +7,8 @@ pub struct Recipient { @@ -7,7 +7,8 @@ pub struct Recipient {
pub phone_number: String,
pub nick: String,
pub whatsapp: bool,
pub avatar_url: Option<String>
pub avatar_url: Option<String>,
pub notify: Option<String>,
}
#[derive(Insertable)]
#[table_name="recipients"]
@ -15,7 +16,8 @@ pub struct NewRecipient<'a> { @@ -15,7 +16,8 @@ pub struct NewRecipient<'a> {
pub phone_number: &'a str,
pub nick: &'a str,
pub whatsapp: bool,
pub avatar_url: Option<&'a str>
pub avatar_url: Option<&'a str>,
pub notify: Option<&'a str>
}
#[derive(Queryable, Debug)]
pub struct Message {
@ -25,6 +27,11 @@ pub struct Message { @@ -25,6 +27,11 @@ pub struct Message {
pub csms_data: Option<i32>,
pub group_target: Option<i32>,
pub text: Option<String>,
pub source: i32,
}
impl Message {
pub const SOURCE_SMS: i32 = 0;
pub const SOURCE_WA: i32 = 1;
}
#[derive(Queryable, Debug)]
pub struct Group {
@ -61,6 +68,7 @@ pub struct NewMessage<'a> { @@ -61,6 +68,7 @@ pub struct NewMessage<'a> {
pub phone_number: &'a str,
pub pdu: &'a [u8],
pub csms_data: Option<i32>,
pub source: i32
}
#[derive(Insertable)]
#[table_name="messages"]
@ -68,4 +76,5 @@ pub struct NewPlainMessage<'a> { @@ -68,4 +76,5 @@ pub struct NewPlainMessage<'a> {
pub phone_number: &'a str,
pub group_target: Option<i32>,
pub text: &'a str,
pub source: i32
}

13
src/modem.rs

@ -13,7 +13,7 @@ use huawei_modem::cmd::sms::SmsMessage; @@ -13,7 +13,7 @@ use huawei_modem::cmd::sms::SmsMessage;
use huawei_modem::pdu::{Pdu, PduAddress};
use huawei_modem::gsm_encoding::GsmMessageData;
use failure::Error;
use crate::util::Result;
use crate::util::{self, Result};
use std::mem;
macro_rules! command_timeout {
@ -197,7 +197,8 @@ impl Future for ModemManager { @@ -197,7 +197,8 @@ impl Future for ModemManager {
RequestReg => self.request_reg(),
ForceReinit => self.reinit_modem(),
UpdatePath(p) => self.update_path(p),
CommandTimeout => self.command_timeout()
CommandTimeout => self.command_timeout(),
MakeContact(a) => self.make_contact(a)?
}
}
Ok(Async::NotReady)
@ -212,6 +213,12 @@ impl ModemManager { @@ -212,6 +213,12 @@ impl ModemManager {
self.report_modem_error(e);
}
}
fn make_contact(&mut self, addr: PduAddress) -> Result<()> {
let nick = util::make_nick_for_address(&addr);
self.store.store_recipient(&addr, &nick)?;
self.cf_tx.unbounded_send(ContactFactoryCommand::ProcessMessages).unwrap();
Ok(())
}
fn update_path(&mut self, path: Option<String>) {
info!("Updating modem path to {:?}", path);
self.modem_path = path;
@ -339,7 +346,7 @@ impl ModemManager { @@ -339,7 +346,7 @@ impl ModemManager {
trace!("Message is concatenated: {:?}", d);
}
let addr = msg.pdu.originating_address;
self.store.store_message(&addr, &msg.raw_pdu, csms_data)?;
self.store.store_sms_message(&addr, &msg.raw_pdu, csms_data)?;
}
self.cf_tx.unbounded_send(ContactFactoryCommand::ProcessMessages).unwrap();
let mut modem = match self.inner.get_modem() {

2
src/schema.rs

@ -17,6 +17,7 @@ table! { @@ -17,6 +17,7 @@ table! {
csms_data -> Nullable<Int4>,
group_target -> Nullable<Int4>,
text -> Nullable<Varchar>,
source -> Int4,
}
}
@ -27,6 +28,7 @@ table! { @@ -27,6 +28,7 @@ table! {
nick -> Varchar,
whatsapp -> Bool,
avatar_url -> Nullable<Varchar>,
notify -> Nullable<Varchar>,
}
}

46
src/store.rs

@ -29,14 +29,15 @@ impl Store { @@ -29,14 +29,15 @@ impl Store {
inner: Arc::new(pool)
})
}
pub fn store_message(&mut self, addr: &PduAddress, pdu: &[u8], csms_data: Option<i32>) -> Result<Message> {
pub fn store_sms_message(&mut self, addr: &PduAddress, pdu: &[u8], csms_data: Option<i32>) -> Result<Message> {
use crate::schema::messages;
let num = util::normalize_address(addr);
let nm = NewMessage {
phone_number: &num,
pdu,
csms_data
csms_data,
source: Message::SOURCE_SMS
};
let conn = self.inner.get()?;
@ -45,14 +46,15 @@ impl Store { @@ -45,14 +46,15 @@ impl Store {
.get_result(&*conn)?;
Ok(res)
}
pub fn store_plain_message(&mut self, addr: &PduAddress, text: &str, group_target: Option<i32>) -> Result<Message> {
pub fn store_wa_message(&mut self, addr: &PduAddress, text: &str, group_target: Option<i32>) -> Result<Message> {
use crate::schema::messages;
let num = util::normalize_address(addr);
let nm = NewPlainMessage {
phone_number: &num,
text,
group_target
group_target,
source: Message::SOURCE_WA
};
let conn = self.inner.get()?;
@ -138,15 +140,16 @@ impl Store { @@ -138,15 +140,16 @@ impl Store {
.execute(&*conn)?;
Ok(())
}
pub fn store_recipient(&mut self, addr: &PduAddress, nick: &str, wa: bool) -> Result<Recipient> {
pub fn store_recipient(&mut self, addr: &PduAddress, nick: &str) -> Result<Recipient> {
use crate::schema::recipients;
let num = util::normalize_address(addr);
let nr = NewRecipient {
phone_number: &num,
nick,
whatsapp: wa,
avatar_url: None
whatsapp: false,
avatar_url: None,
notify: None
};
let conn = self.inner.get()?;
@ -155,6 +158,35 @@ impl Store { @@ -155,6 +158,35 @@ impl Store {
.get_result(&*conn)?;
Ok(res)
}
pub fn store_wa_recipient(&mut self, addr: &PduAddress, nick: &str, notify: Option<&str>) -> Result<Recipient> {
use crate::schema::recipients;
let num = util::normalize_address(addr);
let nr = NewRecipient {
phone_number: &num,
nick,
whatsapp: true,
avatar_url: None,
notify: notify
};
let conn = self.inner.get()?;
let res = ::diesel::insert_into(recipients::table)
.values(&nr)
.get_result(&*conn)?;
Ok(res)
}
pub fn update_recipient_notify(&mut self, addr: &PduAddress, n: Option<&str>) -> Result<()> {
use crate::schema::recipients::dsl::*;
let conn = self.inner.get()?;
let num = util::normalize_address(addr);
::diesel::update(recipients)
.filter(phone_number.eq(num))
.set(notify.eq(n))
.execute(&*conn)?;
Ok(())
}
pub fn update_recipient_nick(&mut self, addr: &PduAddress, n: &str) -> Result<()> {
use crate::schema::recipients::dsl::*;
let conn = self.inner.get()?;

13
src/util.rs

@ -25,12 +25,23 @@ macro_rules! sink_outbox { @@ -25,12 +25,23 @@ macro_rules! sink_outbox {
pub fn jid_to_address(jid: &Jid) -> Option<PduAddress> {
if let Some(pn) = jid.phonenumber() {
Some(pn.parse().unwrap())
let ret: PduAddress = pn.parse().unwrap();
if ret.number.0.len() > 0 {
Some(ret)
}
else {
warn!("jid {} converted as 0-length PduAddress", jid.to_string());
None
}
}
else {
None
}
}
pub fn address_to_jid(addr: &PduAddress) -> Result<Jid> {
let ret = Jid::from_phonenumber(addr.to_string())?;
Ok(ret)
}
pub fn normalize_address(addr: &PduAddress) -> String {
let ton: u8 = addr.type_addr.into();
let mut ret = format!("{:02X}", ton);

163
src/whatsapp.rs

@ -166,7 +166,21 @@ impl WhatsappManager { @@ -166,7 +166,21 @@ impl WhatsappManager {
},
MediaFinished(r) => self.media_finished(r)?,
CheckAcks => self.check_acks()?,
PrintAcks => self.print_acks()?
PrintAcks => self.print_acks()?,
MakeContact(a) => self.make_contact(a)?,
}
Ok(())
}
fn make_contact(&mut self, addr: PduAddress) -> Result<()> {
match util::address_to_jid(&addr) {
Ok(from) => {
let _ = self.get_wa_recipient(&from)?;
self.cf_tx.unbounded_send(ContactFactoryCommand::ProcessMessages)
.unwrap();
},
Err(e) => {
error!("Couldn't make contact for {}: {}", addr, e);
}
}
Ok(())
}
@ -246,25 +260,22 @@ impl WhatsappManager { @@ -246,25 +260,22 @@ impl WhatsappManager {
fn media_finished(&mut self, r: MediaResult) -> Result<()> {
match r.result {
Ok(ret) => {
debug!("Media download/decryption job for {} / mid {:?} complete.", r.addr, r.mi);
self.store.store_plain_message(&r.addr, &ret, r.group)?;
self.cf_tx.unbounded_send(ContactFactoryCommand::ProcessMessages)
.unwrap();
if let Err(e) = self.store.store_wa_msgid(r.mi.0.clone()) {
warn!("Failed to store WA msgid {} after media download: {}", r.mi.0, e);
}
if let Some(ref mut conn) = self.conn {
if let Some(p) = r.peer {
conn.send_message_read(r.mi, p);
}
}
debug!("Media download/decryption job for {} / mid {:?} complete.", r.from.to_string(), r.mi);
self.store_message(&r.from, &ret, r.group)?;
},
Err(e) => {
warn!("Decryption job failed for {} / mid {:?}: {}", r.addr, r.mi, e);
// FIXME: We could possibly retry the download somehow.
warn!("Decryption job failed for {} / mid {:?}: {}", r.from.to_string(), r.mi, e);
let msg = "\x01ACTION uploaded media (couldn't download)\x01";
self.store.store_plain_message(&r.addr, &msg, r.group)?;
self.cf_tx.unbounded_send(ContactFactoryCommand::ProcessMessages)
.unwrap();
self.store_message(&r.from, msg, r.group)?;
}
}
if let Err(e) = self.store.store_wa_msgid(r.mi.0.clone()) {
warn!("Failed to store WA msgid {} after media download: {}", r.mi.0, e);
}
if let Some(ref mut conn) = self.conn {
if let Some(p) = r.peer {
conn.send_message_read(r.mi, p);
}
}
Ok(())
@ -533,14 +544,8 @@ impl WhatsappManager { @@ -533,14 +544,8 @@ impl WhatsappManager {
mut x @ ChatMessageContent::Audio { .. } |
mut x @ ChatMessageContent::Document { .. } => {
let capt = x.take_caption();
if let Some(addr) = util::jid_to_address(&from) {
self.process_media(id.clone(), peer.clone(), addr, group, x)?;
is_media = true;
}
else {
warn!("couldn't make address for jid {}", from.to_string());
return Ok(());
}
self.process_media(id.clone(), peer.clone(), from.clone(), group, x)?;
is_media = true;
if let Some(c) = capt {
c
}
@ -567,8 +572,10 @@ impl WhatsappManager { @@ -567,8 +572,10 @@ impl WhatsappManager {
self.store_message(&from, &quote, group)?;
}
self.store_message(&from, &text, group)?;
if let Err(e) = self.store.store_wa_msgid(id.0.clone()) {
warn!("Failed to store received msgid {}: {}", id.0, e);
if !is_media {
if let Err(e) = self.store.store_wa_msgid(id.0.clone()) {
warn!("Failed to store received msgid {}: {}", id.0, e);
}
}
if let Some(p) = peer {
if let Some(ref mut conn) = self.conn {
@ -581,8 +588,8 @@ impl WhatsappManager { @@ -581,8 +588,8 @@ impl WhatsappManager {
}
fn store_message(&mut self, from: &Jid, text: &str, group: Option<i32>) -> Result<()> {
if let Some(addr) = util::jid_to_address(from) {
let _ = self.get_wa_recipient(from, &addr)?;
self.store.store_plain_message(&addr, &text, group)?;
let _ = self.get_wa_recipient(from)?;
self.store.store_wa_message(&addr, &text, group)?;
self.cf_tx.unbounded_send(ContactFactoryCommand::ProcessMessages)
.unwrap();
}
@ -591,7 +598,7 @@ impl WhatsappManager { @@ -591,7 +598,7 @@ impl WhatsappManager {
}
Ok(())
}
fn process_media(&mut self, id: MessageId, peer: Option<Peer>, addr: PduAddress, group: Option<i32>, ct: ChatMessageContent) -> Result<()> {
fn process_media(&mut self, id: MessageId, peer: Option<Peer>, from: Jid, group: Option<i32>, ct: ChatMessageContent) -> Result<()> {
use whatsappweb::MediaType;
let (ty, fi, name) = match ct {
@ -604,7 +611,7 @@ impl WhatsappManager { @@ -604,7 +611,7 @@ impl WhatsappManager {
let mi = MediaInfo {
ty, fi, name, peer,
mi: id,
addr, group,
from, group,
path: self.media_path.clone(),
dl_path: self.dl_path.clone(),
tx: self.wa_tx.clone()
@ -639,22 +646,39 @@ impl WhatsappManager { @@ -639,22 +646,39 @@ impl WhatsappManager {
}
Ok(())
}
fn get_wa_recipient(&mut self, jid: &Jid, addr: &PduAddress) -> Result<Recipient> {
fn get_contact_notify_for_jid(&mut self, jid: &Jid) -> Option<&str> {
let mut notify: Option<&str> = None;
if let Some(ct) = self.contacts.get(jid) {
if let Some(ref name) = ct.name {
notify = Some(name);
}
else if let Some(ref name) = ct.notify {
notify = Some(name);
}
}
notify
}
fn get_wa_recipient(&mut self, jid: &Jid) -> Result<Recipient> {
let addr = match util::jid_to_address(jid) {
Some(a) => a,
None => {
return Err(format_err!("couldn't translate jid {} to address", jid.to_string()));
}
};
if let Some(recip) = self.store.get_recipient_by_addr_opt(&addr)? {
Ok(recip)
}
else {
let mut nick = util::make_nick_for_address(&addr);
if let Some(ct) = self.contacts.get(jid) {
if let Some(ref name) = ct.name {
nick = util::string_to_irc_nick(name);
}
else if let Some(ref name) = ct.notify {
nick = util::string_to_irc_nick(name);
}
}
let notify = self.get_contact_notify_for_jid(jid).map(|x| x.to_string());
let nick = match notify {
Some(ref n) => util::string_to_irc_nick(n),
None => util::make_nick_for_address(&addr)
};
info!("Creating new WA recipient for {} (nick {})", addr, nick);
Ok(self.store.store_recipient(&addr, &nick, true)?)
let ret = self.store.store_wa_recipient(&addr, &nick, notify.as_ref().map(|x| x as &str))?;
self.cf_tx.unbounded_send(ContactFactoryCommand::SetupContact(addr.clone()))
.unwrap();
Ok(ret)
}
}
fn on_got_group_metadata(&mut self, grp: GroupMetadata) -> Result<()> {
@ -669,14 +693,10 @@ impl WhatsappManager { @@ -669,14 +693,10 @@ impl WhatsappManager {
let mut participants = vec![];
let mut admins = vec![];
for &(ref jid, admin) in grp.participants.iter() {
if let Some(addr) = util::jid_to_address(jid) {
let recip = self.get_wa_recipient(&jid, &addr)?;
self.cf_tx.unbounded_send(ContactFactoryCommand::MakeContact(addr, true))
.unwrap();
participants.push(recip.id);
if admin {
admins.push(recip.id);
}
let recip = self.get_wa_recipient(jid)?;
participants.push(recip.id);
if admin {
admins.push(recip.id);
}
}
self.store.update_group(&grp.id, participants, admins, &grp.subject)?;
@ -816,6 +836,19 @@ impl WhatsappManager { @@ -816,6 +836,19 @@ impl WhatsappManager {
warn!("Disconnected from WhatsApp - reason: {:?}", reason);
self.connected = false;
}
fn on_contact_change(&mut self, ct: WaContact) -> Result<()> {
let jid = ct.jid.clone();
if let Some(addr) = util::jid_to_address(&jid) {
let recip = self.get_wa_recipient(&jid)?;
self.contacts.insert(ct.jid.clone(), ct);
let notify = self.get_contact_notify_for_jid(&jid).map(|x| x.to_string());
if notify != recip.notify {
debug!("Notify changed for recipient {}: it's now {:?}", recip.nick, notify);
self.store.update_recipient_notify(&addr, notify.as_ref().map(|x| x as &str))?;
}
}
Ok(())
}
fn on_user_data_changed(&mut self, ud: WaUserData) -> Result<()> {
trace!("user data changed: {:?}", ud);
use self::WaUserData::*;
@ -823,12 +856,12 @@ impl WhatsappManager { @@ -823,12 +856,12 @@ impl WhatsappManager {
ContactsInitial(cts) => {
info!("Received initial contact list");
for ct in cts {
self.contacts.insert(ct.jid.clone(), ct);
self.on_contact_change(ct)?;
}
},
ContactAddChange(ct) => {
debug!("Contact {} added or modified", ct.jid.to_string());
self.contacts.insert(ct.jid.clone(), ct);
self.on_contact_change(ct)?;
},
ContactDelete(jid) => {
// NOTE: I don't see any real reason to ever delete contacts.
@ -921,10 +954,8 @@ impl WhatsappManager { @@ -921,10 +954,8 @@ impl WhatsappManager {
else {
let mut nicks = vec![];
for participant in participants {
if let Some(addr) = util::jid_to_address(&participant) {
let recip = self.get_wa_recipient(&participant, &addr)?;
nicks.push(recip.nick);
}
let recip = self.get_wa_recipient(&participant)?;
nicks.push(recip.nick);
}
let action = match change {
GroupParticipantsChange::Add => "added",
@ -939,20 +970,16 @@ impl WhatsappManager { @@ -939,20 +970,16 @@ impl WhatsappManager {
}
},
StatusChange(user, status) => {
if let Some(addr) = util::jid_to_address(&user) {
let recip = self.get_wa_recipient(&user, &addr)?;
info!("{} changed their status to: {}", recip.nick, status);
}
let recip = self.get_wa_recipient(&user)?;
info!("{} changed their status to: {}", recip.nick, status);
},
PictureChange { jid, removed } => {
if let Some(addr) = util::jid_to_address(&jid) {
let recip = self.get_wa_recipient(&jid, &addr)?;
if !removed {
info!("{} changed their profile photo.", recip.nick);
}
else {
info!("{} removed their profile photo.", recip.nick);
}
let recip = self.get_wa_recipient(&jid)?;
if !removed {
info!("{} changed their profile photo.", recip.nick);
}
else {
info!("{} removed their profile photo.", recip.nick);
}
},
Battery(level) => {

11
src/whatsapp_media.rs

@ -1,11 +1,10 @@ @@ -1,11 +1,10 @@
//! Decrypting/downloading WA media.
use whatsappweb::message::{MessageId, Peer, FileInfo};
use whatsappweb::{MediaType, crypto};
use whatsappweb::{MediaType, crypto, Jid};
use std::thread;
use crate::comm::WhatsappCommand;
use futures::sync::mpsc::UnboundedSender;
use huawei_modem::pdu::PduAddress;
use humansize::{FileSize, file_size_opts};
use reqwest;
use std::io::prelude::*;
@ -32,7 +31,7 @@ pub struct MediaInfo { @@ -32,7 +31,7 @@ pub struct MediaInfo {
pub fi: FileInfo,
pub mi: MessageId,
pub peer: Option<Peer>,
pub addr: PduAddress,
pub from: Jid,
pub group: Option<i32>,
pub path: String,
pub dl_path: String,
@ -40,7 +39,7 @@ pub struct MediaInfo { @@ -40,7 +39,7 @@ pub struct MediaInfo {
pub name: Option<String>
}
pub struct MediaResult {
pub addr: PduAddress,
pub from: Jid,
pub group: Option<i32>,
pub mi: MessageId,
pub peer: Option<Peer>,
@ -96,13 +95,13 @@ impl MediaInfo { @@ -96,13 +95,13 @@ impl MediaInfo {
Ok(ret)
}
pub fn start(mut self) {
debug!("Starting media download/decryption job for {} / mid {:?}", self.addr, self.mi);
debug!("Starting media download/decryption job for {} / mid {:?}", self.from.to_string(), self.mi);
thread::spawn(move || {
let ret = self.run();
let ret = MediaResult {
group: self.group,
mi: self.mi,
addr: self.addr,
from: self.from,
peer: self.peer,
result: ret
};

Loading…
Cancel
Save