Browse Source

initial commit - stuff works!

master
eeeeeta 5 years ago
commit
53f7823e75
  1. 2
      .gitignore
  2. 1492
      Cargo.lock
  3. 25
      Cargo.toml
  4. 7
      config.toml
  5. 0
      migrations/.gitkeep
  6. 2
      migrations/20180621140749_initial/down.sql
  7. 11
      migrations/20180621140749_initial/up.sql
  8. 62
      src/comm.rs
  9. 23
      src/config.rs
  10. 284
      src/contact.rs
  11. 159
      src/contact_factory.rs
  12. 171
      src/control.rs
  13. 122
      src/main.rs
  14. 28
      src/models.rs
  15. 222
      src/modem.rs
  16. 19
      src/schema.rs
  17. 123
      src/store.rs
  18. 47
      src/util.rs

2
.gitignore vendored

@ -0,0 +1,2 @@ @@ -0,0 +1,2 @@
/target
**/*.rs.bk

1492
Cargo.lock generated

File diff suppressed because it is too large Load Diff

25
Cargo.toml

@ -0,0 +1,25 @@ @@ -0,0 +1,25 @@
[package]
authors = ["eeeeeta <eeeeeta@users.noreply.github.com>"]
name = "sms-irc"
version = "0.1.0"
[dependencies]
failure = "0.1"
log4rs = "0.8"
futures = "0.1"
irc = "0.13"
log = "0.4"
r2d2 = "0.8"
r2d2-diesel = "1.0"
serde = "1.0"
serde_derive = "1.0"
tokio-core = "0.1"
tokio-timer = "0.2"
toml = "0.4"
[dependencies.diesel]
features = ["postgres"]
version = "1.0"
[dependencies.huawei-modem]
git = "git://github.com/eeeeeta/huawei-modem"

7
config.toml

@ -0,0 +1,7 @@ @@ -0,0 +1,7 @@
database_url = "postgresql://eeeeeta@localhost/smsirc"
irc_hostname = "192.168.42.15"
modem_path = "/dev/ttyUSB2"
admin_nick = "eeeeeta"
irc_channel = "#smsirc"
cmgl_secs = 30
logging = "sms_irc=debug"

0
migrations/.gitkeep

2
migrations/20180621140749_initial/down.sql

@ -0,0 +1,2 @@ @@ -0,0 +1,2 @@
DROP TABLE IF EXISTS messages;
DROP TABLE IF EXISTS recipients;

11
migrations/20180621140749_initial/up.sql

@ -0,0 +1,11 @@ @@ -0,0 +1,11 @@
CREATE TABLE recipients (
id SERIAL PRIMARY KEY,
phone_number VARCHAR UNIQUE NOT NULL,
nick VARCHAR NOT NULL
);
CREATE TABLE messages (
id SERIAL PRIMARY KEY,
phone_number VARCHAR NOT NULL,
pdu bytea NOT NULL,
csms_data INT
);

62
src/comm.rs

@ -0,0 +1,62 @@ @@ -0,0 +1,62 @@
//! Communication between different things.
use futures::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use huawei_modem::cmd::sms::SmsMessage;
use huawei_modem::errors::HuaweiError;
use huawei_modem::pdu::PduAddress;
use huawei_modem::cmd::network::{SignalQuality, RegistrationState};
use config::Config;
use store::Store;
use tokio_core::reactor::Handle;
pub enum ModemCommand {
DoCmgl,
CmglComplete(Vec<SmsMessage>),
CmglFailed(HuaweiError),
SendMessage(PduAddress, String),
RequestCsq,
RequestReg
}
pub enum ContactFactoryCommand {
ProcessMessages,
MakeContact(PduAddress),
LoadRecipients
}
pub enum ContactManagerCommand {
ProcessMessages
}
pub enum ControlBotCommand {
Log(String),
CsqResult(SignalQuality),
RegResult(RegistrationState)
}
pub struct InitParameters<'a> {
pub cfg: &'a Config,
pub store: Store,
pub cm: &'a mut ChannelMaker,
pub hdl: &'a Handle
}
pub struct ChannelMaker {
pub modem_rx: Option<UnboundedReceiver<ModemCommand>>,
pub modem_tx: UnboundedSender<ModemCommand>,
pub cf_rx: Option<UnboundedReceiver<ContactFactoryCommand>>,
pub cf_tx: UnboundedSender<ContactFactoryCommand>,
pub cb_rx: Option<UnboundedReceiver<ControlBotCommand>>,
pub cb_tx: UnboundedSender<ControlBotCommand>
}
impl ChannelMaker {
pub fn new() -> Self {
let (modem_tx, modem_rx) = mpsc::unbounded();
let (cf_tx, cf_rx) = mpsc::unbounded();
let (cb_tx, cb_rx) = mpsc::unbounded();
Self {
modem_rx: Some(modem_rx),
modem_tx,
cf_rx: Some(cf_rx),
cf_tx,
cb_rx: Some(cb_rx),
cb_tx
}
}
}

23
src/config.rs

@ -0,0 +1,23 @@ @@ -0,0 +1,23 @@
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
pub database_url: String,
pub irc_hostname: String,
pub modem_path: String,
pub admin_nick: String,
pub irc_channel: String,
#[serde(default)]
pub control_bot_nick: Option<String>,
#[serde(default)]
pub cmgl_secs: Option<u32>,
#[serde(default)]
pub failure_interval: Option<u64>,
#[serde(default)]
pub chan_loglevel: Option<String>,
#[serde(default)]
pub stdout_loglevel: Option<String>,
#[serde(default)]
pub irc_port: Option<u16>,
#[serde(default)]
pub irc_password: Option<String>,
}

284
src/contact.rs

@ -0,0 +1,284 @@ @@ -0,0 +1,284 @@
//! Dealing with individual IRC virtual users.
use irc::client::PackedIrcClient;
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver, self};
use comm::{ModemCommand, ContactManagerCommand, InitParameters};
use huawei_modem::pdu::{PduAddress, DeliverPdu};
use store::Store;
use failure::Error;
use futures::{self, Future, Async, Poll, Stream};
use futures::future::Either;
use std::default::Default;
use irc::client::{IrcClient, ClientStream, Client};
use irc::client::ext::ClientExt;
use irc::proto::command::Command;
use irc::proto::message::Message;
use models::Message as OurMessage;
use models::Recipient;
use irc::client::data::config::Config as IrcConfig;
use util::{self, Result};
/// The maximum message size sent over IRC.
static MESSAGE_MAX_LEN: usize = 350;
pub struct ContactManager {
irc: PackedIrcClient,
irc_stream: ClientStream,
admin: String,
nick: String,
addr: PduAddress,
store: Store,
id: bool,
modem_tx: UnboundedSender<ModemCommand>,
pub tx: UnboundedSender<ContactManagerCommand>,
rx: UnboundedReceiver<ContactManagerCommand>
}
impl Future for ContactManager {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
if !self.id {
self.irc.0.identify()?;
self.id = true;
}
while let Async::Ready(_) = self.irc.1.poll()? {}
while let Async::Ready(res) = self.irc_stream.poll()? {
let msg = res.ok_or(format_err!("irc_stream stopped"))?;
self.handle_irc_message(msg)?;
}
while let Async::Ready(cmc) = self.rx.poll().unwrap() {
let cmc = cmc.ok_or(format_err!("contactmanager rx died"))?;
self.handle_int_rx(cmc)?;
}
Ok(Async::NotReady)
}
}
impl ContactManager {
pub fn add_command(&self, cmd: ContactManagerCommand) {
self.tx.unbounded_send(cmd)
.unwrap()
}
fn send_raw_message(&mut self, msg: &str) -> Result<()> {
// We need to split messages that are too long to send on IRC up
// into fragments, as well as splitting them at newlines.
//
// Shoutout to sebk from #rust on moznet for providing
// this nifty implementation!
for line in msg.lines() {
let mut last = 0;
let mut iter = line.char_indices().filter_map(|(i, _)| {
if i >= last + MESSAGE_MAX_LEN {
let part = &line[last..i];
last = i;
Some(part)
}
else if last + MESSAGE_MAX_LEN >= line.len() {
let part = &line[last..];
last = line.len();
Some(part)
}
else {
None
}
});
for chunk in iter {
self.irc.0.send_privmsg(&self.admin, chunk)?;
}
}
Ok(())
}
fn process_msg_pdu(&mut self, msg: OurMessage, pdu: DeliverPdu) -> Result<()> {
use huawei_modem::convert::TryFrom;
// sanity check
if pdu.originating_address != self.addr {
return Err(format_err!("PDU for {} sent to ContactManager for {}", pdu.originating_address, self.addr));
}
match pdu.get_message_data().decode_message() {
Ok(m) => {
if let Some(cd) = m.udh.and_then(|x| x.get_concatenated_sms_data()) {
debug!("Message is concatenated: {:?}", cd);
let msgs = self.store.get_all_concatenated(&msg.phone_number, cd.reference as _)?;
if msgs.len() != (cd.parts as usize) {
debug!("Not enough messages: have {}, need {}", msgs.len(), cd.parts);
return Ok(());
}
let mut concatenated = String::new();
let mut pdus = vec![];
for msg in msgs.iter() {
let dec = DeliverPdu::try_from(&msg.pdu)?
.get_message_data()
.decode_message()?;
pdus.push(dec);
}
pdus.sort_by_key(|p| p.udh.as_ref().unwrap().get_concatenated_sms_data().unwrap().sequence);
for pdu in pdus {
concatenated.push_str(&pdu.text);
}
self.send_raw_message(&concatenated)?;
for msg in msgs.iter() {
self.store.delete_message(msg.id)?;
}
}
else {
self.send_raw_message(&m.text)?;
self.store.delete_message(msg.id)?;
}
},
Err(e) => {
warn!("Error decoding message from {}: {:?}", self.addr, e);
self.irc.0.send_notice(&self.admin, &format!("Indecipherable message: {}", e))?;
}
}
Ok(())
}
fn process_messages(&mut self) -> Result<()> {
use huawei_modem::convert::TryFrom;
let msgs = self.store.get_messages_for_recipient(&self.addr)?;
for msg in msgs {
debug!("Processing message #{}", msg.id);
let pdu = DeliverPdu::try_from(&msg.pdu)?;
self.process_msg_pdu(msg, pdu)?;
}
Ok(())
}
fn handle_int_rx(&mut self, cmc: ContactManagerCommand) -> Result<()> {
use self::ContactManagerCommand::*;
match cmc {
ProcessMessages => self.process_messages()?,
}
Ok(())
}
fn change_nick(&mut self, nick: String) -> Result<()> {
info!("Contact {} changing nick to {}", self.nick, nick);
self.store.update_recipient_nick(&self.addr, &nick)?;
self.irc.0.send(Command::NICK(nick))?;
Ok(())
}
fn process_admin_command(&mut self, mesg: String) -> Result<()> {
if mesg.len() < 1 || mesg.chars().nth(0) != Some('!') {
return Ok(());
}
let msg = mesg.split(" ").collect::<Vec<_>>();
match msg[0] {
"!nick" => {
if msg.get(1).is_none() {
self.irc.0.send_notice(&self.admin, "!nick takes an argument.")?;
return Ok(());
}
self.change_nick(msg[1].into())?;
self.irc.0.send_notice(&self.admin, "Done.")?;
},
unrec => {
self.irc.0.send_notice(&self.admin, &format!("Unknown command: {}", unrec))?;
}
}
Ok(())
}
fn handle_irc_message(&mut self, im: Message) -> Result<()> {
match im.command {
Command::NICK(nick) => {
if let Some(from) = im.prefix {
let from = from.split("!").collect::<Vec<_>>();
if let Some(&from) = from.get(0) {
if from == self.nick {
self.nick = nick;
}
}
}
},
Command::NOTICE(target, mesg) => {
if let Some(from) = im.prefix {
let from = from.split("!").collect::<Vec<_>>();
trace!("{} got NOTICE from {:?} to {}: {}", self.addr, from, target, mesg);
if from.len() < 1 {
return Ok(());
}
if from[0] != self.admin {
self.irc.0.send_notice(from[0], "You aren't the SMS bridge aministrator, so sending me NOTICEs will do nothing!")?;
return Ok(());
}
if target == self.nick {
debug!("Received contact command: {}", mesg);
self.process_admin_command(mesg)?;
}
}
},
Command::PRIVMSG(target, mesg) => {
if let Some(from) = im.prefix {
let from = from.split("!").collect::<Vec<_>>();
trace!("{} got PRIVMSG from {:?} to {}: {}", self.addr, from, target, mesg);
if from.len() < 1 {
return Ok(());
}
if from[0] != self.admin {
self.irc.0.send_notice(from[0], "Message not delivered; you aren't the SMS bridge administrator!")?;
return Ok(());
}
if target == self.nick {
debug!("{} -> {}: {}", from[0], self.addr, mesg);
self.modem_tx.unbounded_send(ModemCommand::SendMessage(self.addr.clone(), mesg)).unwrap();
}
}
},
Command::ERROR(msg) => {
return Err(format_err!("Error from server: {}", msg));
},
_ => {}
}
Ok(())
}
pub fn new(recip: Recipient, p: InitParameters) -> impl Future<Item = Self, Error = Error> {
let store = p.store;
let addr = match util::un_normalize_address(&recip.phone_number)
.ok_or(format_err!("invalid num {} in db", recip.phone_number)) {
Ok(r) => r,
Err(e) => return Either::B(futures::future::err(e.into()))
};
let (tx, rx) = mpsc::unbounded();
let modem_tx = p.cm.modem_tx.clone();
let admin = p.cfg.admin_nick.clone();
let cfg = Box::into_raw(Box::new(IrcConfig {
nickname: Some(recip.nick),
realname: Some(addr.to_string()),
server: Some(p.cfg.irc_hostname.clone()),
password: p.cfg.irc_password.clone(),
port: p.cfg.irc_port,
channels: Some(vec![p.cfg.irc_channel.clone()]),
..Default::default()
}));
// DODGY UNSAFE STUFF: The way IrcClient::new_future works is stupid.
// However, when the future it returns has resolved, it no longer
// holds a reference to the IrcConfig. Therefore, we fudge a 'static
// reference here (to satisfy the stupid method), and deallocate
// it later, when the future has resolved.
let cfgb: &'static IrcConfig = unsafe { &*cfg };
let fut = match IrcClient::new_future(p.hdl.clone(), cfgb) {
Ok(r) => r,
Err(e) => return Either::B(futures::future::err(e.into()))
};
let fut = fut
.then(move |res| {
let _ = unsafe { Box::from_raw(cfg) };
match res {
Ok(cli) => {
let irc_stream = cli.0.stream();
let nick = cli.0.current_nickname().into();
tx.unbounded_send(ContactManagerCommand::ProcessMessages)
.unwrap();
Ok(ContactManager {
irc: cli,
irc_stream,
id: false,
addr, store, modem_tx, tx, rx, admin, nick
})
},
Err(e) => {
Err(e.into())
}
}
});
Either::A(fut)
}
}

159
src/contact_factory.rs

@ -0,0 +1,159 @@ @@ -0,0 +1,159 @@
//! Manages the creation and maintenance of ContactManagers.
use config::Config;
use store::Store;
use comm::{ContactFactoryCommand, ContactManagerCommand, ChannelMaker, InitParameters};
use futures::{Future, Async, Poll, Stream};
use futures::sync::mpsc::UnboundedReceiver;
use std::collections::{HashMap, HashSet};
use tokio_core::reactor::Handle;
use huawei_modem::pdu::PduAddress;
use contact::ContactManager;
use util::{self, Result};
use models::Recipient;
use tokio_timer::Interval;
use failure::Error;
pub struct ContactFactory {
rx: UnboundedReceiver<ContactFactoryCommand>,
contacts_starting: HashMap<PduAddress, Box<Future<Item = ContactManager, Error = Error>>>,
contacts: HashMap<PduAddress, ContactManager>,
failed_contacts: HashSet<PduAddress>,
failure_int: Interval,
messages_processed: HashSet<i32>,
cfg: Config,
store: Store,
cm: ChannelMaker,
hdl: Handle
}
impl Future for ContactFactory {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
while let Async::Ready(res) = self.rx.poll().unwrap() {
use self::ContactFactoryCommand::*;
let msg = res.expect("contactfactory rx died");
match msg {
ProcessMessages => self.process_messages()?,
LoadRecipients => self.load_recipients()?,
MakeContact(addr) => self.make_contact(addr)?,
}
}
let mut to_remove = vec![];
for (addr, fut) in self.contacts_starting.iter_mut() {
match fut.poll() {
Ok(Async::Ready(c)) => {
self.contacts.insert(addr.clone(), c);
to_remove.push(addr.clone())
},
Ok(Async::NotReady) => {},
Err(e) => {
warn!("Making contact for {} failed: {}", addr, e);
self.failed_contacts.insert(addr.clone());
to_remove.push(addr.clone())
}
}
}
for tr in to_remove {
self.contacts_starting.remove(&tr);
}
let mut to_remove = vec![];
for (addr, fut) in self.contacts.iter_mut() {
match fut.poll() {
Ok(Async::Ready(_)) => unreachable!(),
Ok(Async::NotReady) => {},
Err(e) => {
warn!("Contact for {} failed: {}", addr, e);
self.failed_contacts.insert(addr.clone());
to_remove.push(addr.clone())
}
}
}
for tr in to_remove {
self.contacts.remove(&tr);
}
while let Async::Ready(_) = self.failure_int.poll()? {
self.process_failures()?;
}
Ok(Async::NotReady)
}
}
impl ContactFactory {
pub fn new(cfg: Config, store: Store, mut cm: ChannelMaker, hdl: Handle) -> Self {
use std::time::{Instant, Duration};
let rx = cm.cf_rx.take().unwrap();
cm.cf_tx.unbounded_send(ContactFactoryCommand::LoadRecipients).unwrap();
let failure_int = Interval::new(Instant::now(), Duration::from_millis(cfg.failure_interval.unwrap_or(30000)));
Self {
rx, failure_int,
contacts_starting: HashMap::new(),
contacts: HashMap::new(),
failed_contacts: HashSet::new(),
messages_processed: HashSet::new(),
cfg, store, cm, hdl
}
}
fn process_failures(&mut self) -> Result<()> {
for addr in ::std::mem::replace(&mut self.failed_contacts, HashSet::new()) {
self.make_contact(addr)?;
}
Ok(())
}
fn get_init_parameters(&mut self) -> InitParameters {
InitParameters {
cfg: &self.cfg,
store: self.store.clone(),
cm: &mut self.cm,
hdl: &self.hdl
}
}
fn setup_recipient(&mut self, recip: Recipient) -> Result<()> {
let addr = util::un_normalize_address(&recip.phone_number)
.ok_or(format_err!("invalid num {} in db", recip.phone_number))?;
debug!("Setting up recipient for {} (nick {})", addr, recip.nick);
let cfut = {
let ip = self.get_init_parameters();
ContactManager::new(recip, ip)
};
self.contacts_starting.insert(addr, Box::new(cfut));
Ok(())
}
fn make_contact(&mut self, addr: PduAddress) -> Result<()> {
if let Some(recip) = self.store.get_recipient_by_addr_opt(&addr)? {
self.setup_recipient(recip)?;
}
else {
let nick = util::make_nick_for_address(&addr);
info!("Creating new recipient for {} (nick {})", addr, nick);
let recip = self.store.store_recipient(&addr, &nick)?;
self.setup_recipient(recip)?;
}
Ok(())
}
fn load_recipients(&mut self) -> Result<()> {
for recip in self.store.get_all_recipients()? {
self.setup_recipient(recip)?;
}
Ok(())
}
fn process_messages(&mut self) -> Result<()> {
for msg in self.store.get_all_messages()? {
if self.messages_processed.insert(msg.id) {
let addr = util::un_normalize_address(&msg.phone_number)
.ok_or(format_err!("invalid address {} in db", msg.phone_number))?;
if self.contacts_starting.get(&addr).is_some() {
continue;
}
if let Some(c) = self.contacts.get_mut(&addr) {
c.add_command(ContactManagerCommand::ProcessMessages);
continue;
}
self.make_contact(addr)?;
}
}
Ok(())
}
}

171
src/control.rs

@ -0,0 +1,171 @@ @@ -0,0 +1,171 @@
//! IRC bot that allows users to control the bridge.
//!
//! A lot of this module is a copypasta from src/contact.rs and I don't like it :(
use irc::client::PackedIrcClient;
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver};
use comm::{ControlBotCommand, ModemCommand, ContactFactoryCommand, InitParameters};
use failure::Error;
use futures::{self, Future, Async, Poll, Stream};
use futures::future::Either;
use irc::client::{IrcClient, ClientStream, Client};
use irc::proto::command::Command;
use irc::client::data::config::Config as IrcConfig;
use irc::proto::message::Message;
use irc::client::ext::ClientExt;
use util::Result;
static HELPTEXT: &str = r#"sms-irc help:
[in this admin room]
- !csq: check modem signal quality
- !reg: check modem registration status
- !sms <num>: start a conversation with a given phone number
[in a /NOTICE to one of the ghosts]
- !nick <nick>: change nickname"#;
pub struct ControlBot {
irc: PackedIrcClient,
irc_stream: ClientStream,
chan: String,
admin: String,
id: bool,
rx: UnboundedReceiver<ControlBotCommand>,
cf_tx: UnboundedSender<ContactFactoryCommand>,
m_tx: UnboundedSender<ModemCommand>
}
impl Future for ControlBot {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
if !self.id {
self.irc.0.identify()?;
self.id = true;
}
while let Async::Ready(_) = self.irc.1.poll()? {}
while let Async::Ready(res) = self.irc_stream.poll()? {
let msg = res.ok_or(format_err!("irc_stream stopped"))?;
self.handle_irc_message(msg)?;
}
while let Async::Ready(cbc) = self.rx.poll().unwrap() {
let cbc = cbc.ok_or(format_err!("controlbot rx died"))?;
self.handle_int_rx(cbc)?;
}
Ok(Async::NotReady)
}
}
impl ControlBot {
fn process_admin_command(&mut self, mesg: String) -> Result<()> {
if mesg.len() < 1 || mesg.chars().nth(0) != Some('!') {
return Ok(());
}
let msg = mesg.split(" ").collect::<Vec<_>>();
match msg[0] {
"!csq" => {
self.m_tx.unbounded_send(ModemCommand::RequestCsq).unwrap();
},
"!reg" => {
self.m_tx.unbounded_send(ModemCommand::RequestReg).unwrap();
},
"!sms" => {
if msg.get(1).is_none() {
self.irc.0.send_privmsg(&self.chan, "!sms takes an argument.")?;
return Ok(());
}
let addr = msg[1].parse().unwrap();
self.cf_tx.unbounded_send(ContactFactoryCommand::MakeContact(addr)).unwrap();
},
"!help" => {
for line in HELPTEXT.lines() {
self.irc.0.send_privmsg(&self.chan, line)?;
}
},
unrec => {
self.irc.0.send_privmsg(&self.chan, &format!("Unknown command: {}", unrec))?;
}
}
Ok(())
}
fn handle_irc_message(&mut self, im: Message) -> Result<()> {
match im.command {
Command::PRIVMSG(target, mesg) => {
if let Some(from) = im.prefix {
let from = from.split("!").collect::<Vec<_>>();
trace!("ctl got PRIVMSG from {:?} to {}: {}", from, target, mesg);
if from.len() < 1 {
return Ok(());
}
if from[0] != self.admin {
return Ok(());
}
if target == self.chan {
debug!("Received control command: {}", mesg);
self.process_admin_command(mesg)?;
}
}
},
Command::ERROR(msg) => {
return Err(format_err!("Error from server: {}", msg));
},
_ => {}
}
Ok(())
}
fn handle_int_rx(&mut self, m: ControlBotCommand) -> Result<()> {
use self::ControlBotCommand::*;
match m {
Log(log) => {
self.irc.0.send_notice(&self.chan, &log)?;
},
CsqResult(sq) => {
self.irc.0.send_privmsg(&self.chan, &format!("RSSI: {} | BER: {}", sq.rssi, sq.ber))?;
},
RegResult(st) => {
self.irc.0.send_privmsg(&self.chan, &format!("Registration state: {}", st))?;
},
}
Ok(())
}
pub fn new(p: InitParameters) -> impl Future<Item = Self, Error = Error> {
let cf_tx = p.cm.cf_tx.clone();
let m_tx = p.cm.modem_tx.clone();
let rx = p.cm.cb_rx.take().unwrap();
let admin = p.cfg.admin_nick.clone();
let chan = p.cfg.irc_channel.clone();
let cfg = Box::into_raw(Box::new(IrcConfig {
nickname: Some(p.cfg.control_bot_nick.clone().unwrap_or("smsirc".into())),
realname: Some("smsirc control bot".into()),
server: Some(p.cfg.irc_hostname.clone()),
password: p.cfg.irc_password.clone(),
port: p.cfg.irc_port,
channels: Some(vec![p.cfg.irc_channel.clone()]),
..Default::default()
}));
// DODGY UNSAFE STUFF: see src/contact.rs
let cfgb: &'static IrcConfig = unsafe { &*cfg };
let fut = match IrcClient::new_future(p.hdl.clone(), cfgb) {
Ok(r) => r,
Err(e) => return Either::B(futures::future::err(e.into()))
};
let fut = fut
.then(move |res| {
let _ = unsafe { Box::from_raw(cfg) };
match res {
Ok(cli) => {
let irc_stream = cli.0.stream();
Ok(ControlBot {
irc: cli,
irc_stream,
id: false,
cf_tx, m_tx, rx, admin, chan
})
},
Err(e) => {
Err(e.into())
}
}
});
Either::A(fut)
}
}

122
src/main.rs

@ -0,0 +1,122 @@ @@ -0,0 +1,122 @@
extern crate irc;
extern crate futures;
extern crate tokio_core;
extern crate huawei_modem;
#[macro_use] extern crate diesel;
extern crate r2d2;
extern crate r2d2_diesel;
extern crate serde;
#[macro_use] extern crate serde_derive;
extern crate toml;
#[macro_use] extern crate failure;
#[macro_use] extern crate log;
extern crate log4rs;
extern crate tokio_timer;
mod config;
mod store;
mod modem;
mod comm;
mod util;
mod schema;
mod models;
mod contact;
mod contact_factory;
mod control;
use config::Config;
use store::Store;
use modem::ModemManager;
use control::ControlBot;
use comm::{ChannelMaker, InitParameters};
use futures::Future;
use contact_factory::ContactFactory;
use futures::sync::mpsc::UnboundedSender;
use comm::ControlBotCommand;
use tokio_core::reactor::Core;
use log4rs::config::{Appender, Logger, Root};
use log4rs::config::Config as LogConfig;
use log4rs::append::Append;
use log4rs::append::console::ConsoleAppender;
use log::Record;
use std::fmt;
pub struct IrcLogWriter {
sender: UnboundedSender<ControlBotCommand>
}
impl fmt::Debug for IrcLogWriter {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "IrcLogWriter {{ /* fields hidden */ }}")
}
}
impl Append for IrcLogWriter {
fn append(&self, rec: &Record) -> Result<(), Box<::std::error::Error + Sync + Send>> {
self.sender.unbounded_send(
ControlBotCommand::Log(format!("{}:{} -- {}", rec.target(), rec.level(), rec.args())))
.unwrap();
Ok(())
}
fn flush(&self) {
}
}
fn main() -> Result<(), failure::Error> {
eprintln!("[+] smsirc starting -- reading config file");
let config_path = ::std::env::var("SMSIRC_CONFIG")
.unwrap_or("config.toml".to_string());
eprintln!("[+] config path: {} (set SMSIRC_CONFIG to change)", config_path);
let config: Config = toml::from_str(&::std::fs::read_to_string(config_path)?)?;
let stdout = ConsoleAppender::builder().build();
let mut cm = ChannelMaker::new();
let ilw = IrcLogWriter { sender: cm.cb_tx.clone() };
eprintln!("[+] initialising better logging system");
let cll = config.chan_loglevel.as_ref().map(|x| x as &str).unwrap_or("info").parse()?;
let pll = config.stdout_loglevel.as_ref().map(|x| x as &str).unwrap_or("info").parse()?;
let log_config = LogConfig::builder()
.appender(Appender::builder().build("stdout", Box::new(stdout)))
.appender(Appender::builder().build("irc_chan", Box::new(ilw)))
.logger(Logger::builder()
.appender("irc_chan")
.build("sms_irc", cll))
.build(Root::builder().appender("stdout").build(pll))?;
log4rs::init_config(log_config)?;
info!("Logging initialized");
info!("Connecting to database");
let store = Store::new(&config)?;
info!("Initializing tokio");
let mut core = Core::new()?;
let hdl = core.handle();
info!("Initializing modem");
let mm = core.run(ModemManager::new(InitParameters {
cfg: &config,
store: store.clone(),
cm: &mut cm,
hdl: &hdl
}))?;
hdl.spawn(mm.map_err(|e| {
// FIXME: restartability
error!("ModemManager failed: {}", e);
panic!("modemmanager failed");
}));
info!("Initializing control bot");
let cb = core.run(ControlBot::new(InitParameters {
cfg: &config,
store: store.clone(),
cm: &mut cm,
hdl: &hdl
}))?;
hdl.spawn(cb.map_err(|e| {
// FIXME: restartability
error!("ControlBot failed: {}", e);
panic!("controlbot failed");
}));
info!("Initializing contact factory");
let cf = ContactFactory::new(config, store, cm, hdl);
let _ = core.run(cf.map_err(|e| {
error!("ContactFactory failed: {}", e);
panic!("contactfactory failed");
}));
Ok(())
}

28
src/models.rs

@ -0,0 +1,28 @@ @@ -0,0 +1,28 @@
use schema::{recipients, messages};
#[derive(Queryable)]
pub struct Recipient {
pub id: i32,
pub phone_number: String,
pub nick: String
}
#[derive(Insertable)]
#[table_name="recipients"]
pub struct NewRecipient<'a> {
pub phone_number: &'a str,
pub nick: &'a str
}
#[derive(Queryable, Debug)]
pub struct Message {
pub id: i32,
pub phone_number: String,
pub pdu: Vec<u8>,
pub csms_data: Option<i32>
}
#[derive(Insertable)]
#[table_name="messages"]
pub struct NewMessage<'a> {
pub phone_number: &'a str,
pub pdu: &'a [u8],
pub csms_data: Option<i32>
}

222
src/modem.rs

@ -0,0 +1,222 @@ @@ -0,0 +1,222 @@
//! Modem management.
use huawei_modem::{HuaweiModem, cmd};
use futures::{self, Future, Stream, Poll, Async};
use futures::future::Either;
use tokio_core::reactor::Handle;
use futures::sync::mpsc::{UnboundedSender, UnboundedReceiver};
use huawei_modem::at::AtResponse;
use comm::{ModemCommand, ContactFactoryCommand, ControlBotCommand, InitParameters};
use tokio_timer::Interval;
use std::time::{Instant, Duration};
use store::Store;
use huawei_modem::cmd::sms::SmsMessage;
use huawei_modem::pdu::{Pdu, PduAddress};
use huawei_modem::gsm_encoding::GsmMessageData;
use huawei_modem::errors::HuaweiError;
use failure::Error;
use util::Result;
pub struct ModemManager {
modem: HuaweiModem,
store: Store,
handle: Handle,
rx: UnboundedReceiver<ModemCommand>,
urc_rx: UnboundedReceiver<AtResponse>,
cf_tx: UnboundedSender<ContactFactoryCommand>,
int_tx: UnboundedSender<ModemCommand>,
cb_tx: UnboundedSender<ControlBotCommand>,
}
impl Future for ModemManager {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
while let Async::Ready(urc) = self.urc_rx.poll().unwrap() {
let urc = urc.expect("urc_rx stopped producing");
trace!("received URC: {:?}", urc);
if let AtResponse::InformationResponse { param, .. } = urc {
if param == "+CMTI" {
debug!("received CMTI indication");
self.cmgl();
}
}
}
while let Async::Ready(msg) = self.rx.poll().unwrap() {
use self::ModemCommand::*;
let msg = msg.expect("rx stopped producing");
match msg {
DoCmgl => self.cmgl(),
CmglComplete(msgs) => self.cmgl_complete(msgs)?,
CmglFailed(e) => self.cmgl_failed(e),
SendMessage(addr, msg) => self.send_message(addr, msg),
RequestCsq => self.request_csq(),
RequestReg => self.request_reg()
}
}
Ok(Async::NotReady)
}
}
impl ModemManager {
pub fn new(p: InitParameters) -> impl Future<Item = Self, Error = Error> {
let mut modem = match HuaweiModem::new_from_path(&p.cfg.modem_path, p.hdl) {
Ok(m) => m,
Err(e) => return Either::B(futures::future::err(e.into()))
};
let handle = p.hdl.clone();
let urc_rx = modem.take_urc_rx().unwrap();
let cs = p.cfg.cmgl_secs;
let rx = p.cm.modem_rx.take().unwrap();
let int_tx = p.cm.modem_tx.clone();
let cf_tx = p.cm.cf_tx.clone();
let cb_tx = p.cm.cb_tx.clone();
int_tx.unbounded_send(ModemCommand::DoCmgl).unwrap();
if let Some(cs) = cs {
let int_tx_timer = p.cm.modem_tx.clone();
let timer = Interval::new(Instant::now(), Duration::new(cs as _, 0))
.map_err(|e| {
error!("CMGL timer failed: {}", e);
panic!("timer failed!");
}).for_each(move |_| {
trace!("CMGL timer triggered.");
int_tx_timer.unbounded_send(ModemCommand::DoCmgl).unwrap();
Ok(())
});
p.hdl.spawn(timer);
}
let store = p.store;
let fut = cmd::sms::set_sms_textmode(&mut modem, false)
.map_err(|e| Error::from(e))
.join(cmd::sms::set_new_message_indications(&mut modem,
cmd::sms::NewMessageNotification::SendDirectlyOrBuffer,
cmd::sms::NewMessageStorage::StoreAndNotify)
.map_err(|e| Error::from(e)))
.then(move |res| {
if let Err(e) = res {
warn!("Failed to set +CNMI: {}", e);
if cs.is_none() {
error!("+CNMI support is not available, and cmgl_secs is not provided in the config");
return Err(format_err!("no CNMI support, and no cmgl_secs"));
}
}
Ok(Self {
modem, rx, store, cf_tx, urc_rx, handle, int_tx, cb_tx
})
});
Either::A(fut)
}
fn request_reg(&mut self) {
let tx = self.cb_tx.clone();
let fut = cmd::network::get_registration(&mut self.modem)
.then(move |res| {
match res {
Ok(res) => tx.unbounded_send(ControlBotCommand::RegResult(res)).unwrap(),
Err(e) => warn!("Error getting registration: {}", e)
}
Ok(())
});
self.handle.spawn(fut);
}
fn request_csq(&mut self) {
let tx = self.cb_tx.clone();
let fut = cmd::network::get_signal_quality(&mut self.modem)
.then(move |res| {
match res {
Ok(res) => tx.unbounded_send(ControlBotCommand::CsqResult(res)).unwrap(),
Err(e) => warn!("Error getting signal quality: {}", e)
}
Ok(())
});
self.handle.spawn(fut);
}
fn cmgl_complete(&mut self, msgs: Vec<SmsMessage>) -> Result<()> {
use huawei_modem::cmd::sms::{MessageStatus, DeletionOptions};
debug!("+CMGL complete");
trace!("messages received from CMGL: {:?}", msgs);
for msg in msgs {
trace!("processing message: {:?}", msg);
if msg.status != MessageStatus::ReceivedUnread {
continue;
}
let data = msg.pdu.get_message_data();
let csms_data = match data.decode_message() {
Ok(m) => {
m.udh
.and_then(|x| x.get_concatenated_sms_data())
.map(|x| x.reference as i32)
},
Err(e) => {
// The error is reported when the message is sent
// via the ContactManager, not here.
debug!("Error decoding message - but it'll be reported later: {:?}", e);
None
}
};
if let Some(d) = csms_data {
trace!("Message is concatenated: {:?}", d);
}
let addr = msg.pdu.originating_address;
self.store.store_message(&addr, &msg.raw_pdu, csms_data)?;
}
self.cf_tx.unbounded_send(ContactFactoryCommand::ProcessMessages).unwrap();
let fut = cmd::sms::del_sms_pdu(&mut self.modem, DeletionOptions::DeleteReadAndOutgoing)
.map_err(|e| {
warn!("Failed to delete messages: {}", e);
});
self.handle.spawn(fut);
Ok(())
}
fn cmgl_failed(&mut self, e: HuaweiError) {
// FIXME: retry perhaps?
error!("+CMGL failed: {}", e);
}
fn send_message(&mut self, addr: PduAddress, msg: String) {
let data = GsmMessageData::encode_message(&msg);
let parts = data.len();
debug!("Sending {}-part message to {}...", parts, addr);
trace!("Message content: {}", msg);
let mut futs = vec![];
for (i, part) in data.into_iter().enumerate() {
debug!("Sending part {}/{} of message to {}...", i+1, parts, addr);
let pdu = Pdu::make_simple_message(addr.clone(), part);
trace!("PDU: {:?}", pdu);
futs.push(cmd::sms::send_sms_pdu(&mut self.modem, &pdu)
.map_err(|e| e.into()));
}
let a1 = addr.clone();
let fut = futures::future::join_all(futs)
.map(move |res| {
info!("Message to {} sent!", a1);
debug!("Message ids: {:?}", res);
}).map_err(move |e: ::failure::Error| {
// FIXME: retrying?
warn!("Failed to send message to {}: {}", addr, e);
});
self.handle.spawn(fut);
}
fn cmgl(&mut self) {
use huawei_modem::cmd::sms::MessageStatus;
let tx = self.int_tx.clone();
let fut = cmd::sms::list_sms_pdu(&mut self.modem, MessageStatus::All)
.then(move |results| {
match results {
Ok(results) => {
tx.unbounded_send(
ModemCommand::CmglComplete(results)).unwrap();
},
Err(e) => {
tx.unbounded_send(
ModemCommand::CmglFailed(e)).unwrap();
}
}
let res: ::std::result::Result<(), ()> = Ok(());
res
});
self.handle.spawn(fut);
}
}

19
src/schema.rs

@ -0,0 +1,19 @@ @@ -0,0 +1,19 @@
//! Diesel DSL schema (generated by `diesel print-schema`)
table! {
messages (id) {
id -> Int4,
phone_number -> Varchar,
pdu -> Bytea,
csms_data -> Nullable<Int4>,
}
}
table! {
recipients (id) {
id -> Int4,
phone_number -> Varchar,
nick -> Varchar,
}
}

123
src/store.rs

@ -0,0 +1,123 @@ @@ -0,0 +1,123 @@
//! Handles database stuff.
use diesel::PgConnection;
use r2d2_diesel::ConnectionManager;
use r2d2::Pool;
use config::Config;
use std::sync::Arc;
use huawei_modem::pdu::PduAddress;
use diesel::prelude::*;
use util::{self, Result};
use models::*;
#[derive(Clone)]
pub struct Store {
inner: Arc<Pool<ConnectionManager<PgConnection>>>
}
impl Store {
pub fn new(cfg: &Config) -> Result<Self> {
let manager = ConnectionManager::new(cfg.database_url.clone());
let pool = Pool::builder()
.build(manager)?;
Ok(Self {
inner: Arc::new(pool)
})
}
pub fn store_message(&mut self, addr: &PduAddress, pdu: &[u8], csms_data: Option<i32>) -> Result<Message> {
use schema::messages;
let num = util::normalize_address(addr);
let nm = NewMessage {
phone_number: &num,
pdu,
csms_data
};
let conn = self.inner.get()?;
let res = ::diesel::insert_into(messages::table)
.values(&nm)
.get_result(&*conn)?;
Ok(res)
}
pub fn store_recipient(&mut self, addr: &PduAddress, nick: &str) -> Result<Recipient> {
use schema::recipients;
let num = util::normalize_address(addr);
let nr = NewRecipient {
phone_number: &num,
nick
};
let conn = self.inner.get()?;
let res = ::diesel::insert_into(recipients::table)
.values(&nr)
.get_result(&*conn)?;
Ok(res)
}
pub fn update_recipient_nick(&mut self, addr: &PduAddress, n: &str) -> Result<()> {
use schema::recipients::dsl::*;
let conn = self.inner.get()?;
let num = util::normalize_address(addr);
::diesel::update(recipients)
.filter(phone_number.eq(num))
.set(nick.eq(n))
.execute(&*conn)?;
Ok(())
}
pub fn get_recipient_by_addr_opt(&mut self, addr: &PduAddress) -> Result<Option<Recipient>> {
use schema::recipients::dsl::*;
let conn = self.inner.get()?;
let num = util::normalize_address(addr);
let res = recipients.filter(phone_number.eq(num))
.first(&*conn)
.optional()?;
Ok(res)
}
pub fn get_all_recipients(&mut self) -> Result<Vec<Recipient>> {
use schema::recipients::dsl::*;
let conn = self.inner.get()?;
let res = recipients
.load(&*conn)?;
Ok(res)
}
pub fn get_all_messages(&mut self) -> Result<Vec<Message>> {
use schema::messages::dsl::*;
let conn = self.inner.get()?;
let res = messages
.load(&*conn)?;
Ok(res)
}
pub fn get_messages_for_recipient(&mut self, addr: &PduAddress) -> Result<Vec<Message>> {
use schema::messages::dsl::*;
let conn = self.inner.get()?;
let num = util::normalize_address(addr);
let res = messages.filter(phone_number.eq(num))
.load(&*conn)?;
Ok(res)
}
pub fn get_all_concatenated(&mut self, num: &str, rf: i32) -> Result<Vec<Message>> {
use schema::messages::dsl::*;
let conn = self.inner.get()?;
let res = messages.filter(csms_data.eq(rf)
.and(phone_number.eq(num)))
.load(&*conn)?;
Ok(res)
}
pub fn delete_message(&mut self, mid: i32) -> Result<()> {
use schema::messages::dsl::*;
let conn = self.inner.get()?;
let rows_affected = ::diesel::delete(messages.filter(id.eq(mid)))
.execute(&*conn)?;
if rows_affected == 0 {
return Err(format_err!("no rows affected deleting mid #{}", mid));
}
Ok(())
}
}

47
src/util.rs

@ -0,0 +1,47 @@ @@ -0,0 +1,47 @@
//! Helpful utility functions.
use huawei_modem::pdu::{PduAddress, AddressType};
use huawei_modem::convert::TryFrom;
pub type Result<T> = ::std::result::Result<T, ::failure::Error>;
pub fn normalize_address(addr: &PduAddress) -> String {
let ton: u8 = addr.type_addr.into();
let mut ret = format!("{:02X}", ton);
for b in addr.number.0.iter() {
ret += &format!("{}", b);
}
ret
}
pub fn un_normalize_address(addr: &str) -> Option<PduAddress> {
if addr.len() < 3 {
return None;
}
let toa = u8::from_str_radix(&addr[0..2], 16).ok()?;
let toa = AddressType::try_from(toa).ok()?;
let mut addr: PduAddress = addr.parse().unwrap();
addr.number.0.remove(0);
addr.number.0.remove(0);
addr.type_addr = toa;
Some(addr)
}
pub fn make_nick_for_address(addr: &PduAddress) -> String {
let mut ret = "S".to_string();
for ch in addr.to_string().chars() {
match ch {
'+' => ret.push('I'),
'_' => ret.push('_'),
'-' => ret.push('-'),
'\\' => ret.push('\\'),
'[' => ret.push('['),