Browse Source

Initial (non-working) IRC server/s2c code

- A new macro, sink_outbox!, was added to deduplicate the same
  "send messages into this futures sink from this outbox array"
  behaviour used in multiple places now.
- We now have the beginnings of an IRC server / s2c protocol impl!
  It handles client registration, and pretty much nothing else
  after that at this stage - but it's getting along!
master
eta 3 years ago
parent
commit
1d0f479f52
  1. 6
      src/config.rs
  2. 13
      src/insp_s2s.rs
  3. 185
      src/irc_s2c.rs
  4. 178
      src/irc_s2c_registration.rs
  5. 28
      src/irc_s2c_v3.rs
  6. 31
      src/main.rs
  7. 18
      src/util.rs

6
src/config.rs

@ -6,6 +6,8 @@ pub struct Config { @@ -6,6 +6,8 @@ pub struct Config {
#[serde(default)]
pub insp_s2s: Option<InspConfig>,
#[serde(default)]
pub irc_server: Option<IrcServerConfig>,
#[serde(default)]
pub modem: ModemConfig,
#[serde(default)]
pub whatsapp: WhatsappConfig,
@ -71,6 +73,10 @@ pub struct IrcClientConfig { @@ -71,6 +73,10 @@ pub struct IrcClientConfig {
pub webirc_password: Option<String>
}
#[derive(Deserialize, Debug, Clone)]
pub struct IrcServerConfig {
pub listen: String
}
#[derive(Deserialize, Debug, Clone)]
pub struct InspConfig {
pub sid: String,
pub control_nick: String,

13
src/insp_s2s.rs

@ -4,7 +4,7 @@ use tokio_codec::Framed; @@ -4,7 +4,7 @@ use tokio_codec::Framed;
use irc::proto::IrcCodec;
use irc::proto::message::Message;
use irc::proto::command::Command;
use futures::{Future, Async, Poll, Stream, Sink, AsyncSink, self};
use futures::{Future, Async, Poll, Stream, Sink, self};
use futures::future::Either;
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use crate::comm::{ControlBotCommand, ContactFactoryCommand, InitParameters, WhatsappCommand, ModemCommand, ContactManagerCommand};
@ -84,16 +84,7 @@ impl Future for InspLink { @@ -84,16 +84,7 @@ impl Future for InspLink {
self.handle_cf_command(cfc)?;
}
}
for msg in ::std::mem::replace(&mut self.outbox, vec![]) {
trace!("--> {:?}", msg);
match self.conn.start_send(msg)? {
AsyncSink::Ready => {},
AsyncSink::NotReady(val) => {
self.outbox.push(val);
}
}
}
self.conn.poll_complete()?;
sink_outbox!(self, outbox, conn, "");
Ok(Async::NotReady)
}
}

185
src/irc_s2c.rs

@ -0,0 +1,185 @@ @@ -0,0 +1,185 @@
//! Acting as an IRC server (IRC server-to-client protocol)
//!
//! Based heavily off https://modern.ircdocs.horse/
use tokio_core::net::{TcpListener, Incoming, TcpStream};
use tokio_codec::Framed;
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use irc::proto::IrcCodec;
use irc::proto::message::Message;
use irc::proto::command::Command;
use futures::{Future, Async, Poll, Stream, Sink, AsyncSink, self};
use failure::Error;
use std::net::{SocketAddr, ToSocketAddrs};
use crate::util::{self, Result};
use crate::irc_s2c_registration::{PendingIrcConnectionWrapper, RegistrationInformation};
use crate::config::IrcServerConfig;
use crate::comm::InitParameters;
use crate::store::Store;
pub static SERVER_NAME: &str = "sms-irc.";
pub static USER_MODES: &str = "i";
pub static CHANNEL_MODES: &str = "nt";
pub static MOTD: &str = r#"Welcome to sms-irc!
This is the experimental IRC server backend.
Please refer to https://git.theta.eu.org/sms-irc.git/about/
for more information about sms-irc.
Alternatively, come and chat to us in #sms-irc on chat.freenode.net
if you have comments or want help using the software!"#;
pub struct IrcConnection {
sock: Framed<TcpStream, IrcCodec>,
addr: SocketAddr,
reginfo: RegistrationInformation,
outbox: Vec<Message>,
new: bool
}
pub struct IrcServer {
cfg: IrcServerConfig,
store: Store,
incoming: Incoming,
connections: Vec<IrcConnection>,
pending: Vec<PendingIrcConnectionWrapper>
}
impl Future for IrcConnection {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
if self.new {
self.new = false;
self.on_new()?;
}
while let Async::Ready(msg) = self.sock.poll()? {
let msg = msg.ok_or(format_err!("Socket disconnected"))?;
trace!("<-- [{}] {}", self.addr, msg);
self.handle_remote_message(msg)?;
}
sink_outbox!(self, outbox, sock, self.addr);
Ok(Async::NotReady)
}
}
impl Future for IrcServer {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<(), Error> {
while let Async::Ready(inc) = self.incoming.poll()? {
let (ts, sa) = inc.ok_or(format_err!("TCP listener stopped"))?;
info!("New connection from {}", sa);
let pending = PendingIrcConnectionWrapper::from_incoming(ts, sa)?;
self.pending.push(pending);
}
let mut to_remove = vec![];
for (i, p) in self.pending.iter_mut().enumerate() {
match p.poll() {
Ok(Async::Ready(c)) => {
info!("Connection on {} completed registration", c.addr);
self.connections.push(c);
to_remove.push(i);
},
Ok(Async::NotReady) => {},
Err(e) => {
to_remove.push(i);
info!("Pending connection closed: {}", e);
}
}
}
while let Some(i) = to_remove.pop() {
self.pending.remove(i);
}
for (i, c) in self.connections.iter_mut().enumerate() {
if let Err(e) = c.poll() {
info!("Connection on {} closed: {}", c.addr, e);
to_remove.push(i);
}
}
while let Some(i) = to_remove.pop() {
self.connections.remove(i);
}
Ok(Async::NotReady)
}
}
impl IrcServer {
pub fn new(p: InitParameters<IrcServerConfig>) -> Result<Self> {
let store = p.store;
let cfg = p.cfg2.clone();
let addr = cfg.listen.to_socket_addrs()?
.nth(0)
.ok_or(format_err!("no listen addresses found"))?;
let listener = TcpListener::bind(&addr, &p.hdl)?;
info!("Listening on {} for connections", addr);
let incoming = listener.incoming();
Ok(Self {
store, cfg, incoming,
connections: vec![],
pending: vec![]
})
}
}
impl IrcConnection {
pub fn from_pending(
sock: Framed<TcpStream, IrcCodec>,
addr: SocketAddr,
reginfo: RegistrationInformation
) -> Self {
Self {
sock, addr, reginfo,
outbox: vec![],
new: true
}
}
fn reply_s2c<'a, T: Into<Option<&'a str>>>(&mut self, cmd: &str, args: Vec<&str>, suffix: T) -> Result<()> {
let mut new_args = vec![&self.reginfo.nick as &str];
new_args.extend(args.into_iter());
self.outbox.push(Message::new(Some(&SERVER_NAME), cmd, new_args, suffix.into())?);
Ok(())
}
fn on_new(&mut self) -> Result<()> {
self.reply_s2c("001", vec![], "Welcome to sms-irc, a SMS/WhatsApp to IRC bridge!")?;
self.reply_s2c("002", vec![], &format!("This is sms-irc version {}, running in IRC server mode.", env!("CARGO_PKG_VERSION")) as &str)?;
self.reply_s2c("003", vec![], "(This server doesn't keep creation timestamp information at present.)")?;
let server_version = format!("sms-irc-{}", env!("CARGO_PKG_VERSION"));
self.reply_s2c("004", vec![SERVER_NAME, &server_version, USER_MODES, CHANNEL_MODES], None)?;
self.reply_s2c("005",
vec!["AWAYLEN=200", "CASEMAPPING=ascii", "NETWORK=sms-irc", "NICKLEN=100"],
"are supported by this server")?;
self.send_motd()?;
Ok(())
}
fn send_motd(&mut self) -> Result<()> {
self.reply_s2c("375", vec![], "- Message of the day -")?;
for line in MOTD.lines() {
self.reply_s2c("372", vec![], Some(line))?;
}
self.reply_s2c("376", vec![], "End of /MOTD command.")?;
Ok(())
}
fn handle_remote_message(&mut self, msg: Message) -> Result<()> {
match msg.command {
Command::PING(tok, _) => {
self.reply_s2c("PONG", vec![], Some(&tok as &str))?;
},
Command::QUIT(_) => {
Err(format_err!("Client quit"))?;
},
u => {
// FIXME: the irc crate is hacky, and requires hacky workarounds
let st: String = (&u).into();
warn!("Got unknown command: {}", st.trim());
let verb = st.split(" ").next().unwrap();
self.reply_s2c("421", vec![verb], "Unknown or unimplemented command.")?;
}
}
Ok(())
}
}

178
src/irc_s2c_registration.rs

@ -0,0 +1,178 @@ @@ -0,0 +1,178 @@
//! Handling IRC s2c connection registration.
use futures::{Future, Async, Poll, Stream, Sink, self};
use irc::proto::IrcCodec;
use tokio_core::net::TcpStream;
use tokio_codec::Framed;
use irc::proto::message::Message;
use irc::proto::command::Command;
use failure::Error;
use std::net::SocketAddr;
use crate::util::Result;
use crate::irc_s2c_v3::{IrcCap, SUPPORTED_CAPS};
use crate::irc_s2c::{IrcConnection, SERVER_NAME};
pub struct RegistrationInformation {
pub nick: String,
pub user: String,
pub realname: String,
pub caps: Vec<IrcCap>
}
struct PendingIrcConnection {
sock: Framed<TcpStream, IrcCodec>,
addr: SocketAddr,
cap_ended: bool,
nick: Option<String>,
user: Option<String>,
realname: Option<String>,
caps: Vec<IrcCap>,
outbox: Vec<Message>,
new: bool
}
pub struct PendingIrcConnectionWrapper {
inner: Option<PendingIrcConnection>
}
impl PendingIrcConnectionWrapper {
pub fn from_incoming(ts: TcpStream, sa: SocketAddr) -> Result<Self> {
let codec = IrcCodec::new("utf8")?;
let ic = PendingIrcConnection {
sock: Framed::new(ts, codec),
addr: sa,
cap_ended: true,
nick: None,
user: None,
realname: None,
caps: vec![],
outbox: vec![],
new: true
};
Ok(Self { inner: Some(ic) })
}
}
impl Future for PendingIrcConnectionWrapper {
type Item = IrcConnection;
type Error = Error;
fn poll(&mut self) -> Poll<IrcConnection, Error> {
let conn = self.inner.as_mut()
.expect("PendingIrcConnectionWrapper polled after completion");
if conn.new {
conn.new = false;
conn.on_new()?;
}
while let Async::Ready(msg) = conn.sock.poll()? {
let msg = msg.ok_or(format_err!("Socket disconnected"))?;
trace!("<-- [{}] {}", conn.addr, msg);
conn.handle_remote_message(msg)?;
}
sink_outbox!(conn, outbox, sock, conn.addr);
// we can do this sorta stuff now, because NLL
if conn.is_done()? {
let conn = self.inner.take().unwrap();
let reginfo = RegistrationInformation {
nick: conn.nick.unwrap(),
user: conn.user.unwrap(),
realname: conn.realname.unwrap(),
caps: conn.caps
};
let ret = IrcConnection::from_pending(conn.sock, conn.addr, reginfo);
Ok(Async::Ready(ret))
}
else {
Ok(Async::NotReady)
}
}
}
impl PendingIrcConnection {
fn reply(&mut self, cmd: &str, mut args: Vec<&str>, suffix: Option<&str>) -> Result<()> {
args.insert(0, "*");
self.outbox.push(Message::new(Some(&SERVER_NAME), cmd, args, suffix)?);
Ok(())
}
fn on_new(&mut self) -> Result<()> {
self.reply("NOTICE", vec![], Some("sms-irc initialized, please go on"))?;
Ok(())
}
fn handle_remote_message(&mut self, msg: Message) -> Result<()> {
use irc::proto::command::CapSubCommand;
match msg.command {
Command::NICK(name) => {
// FIXME: add some sort of validation?
self.nick = Some(name);
},
Command::USER(user, _, realname) => {
self.user = Some(user);
self.realname = Some(realname);
},
Command::CAP(_, CapSubCommand::LS, _, _) => {
// Setting cap_ended here means is_done() won't
// return true until the user sends CAP END.
self.cap_ended = false;
self.reply("CAP", vec!["LS"], Some(SUPPORTED_CAPS))?;
},
Command::CAP(_, CapSubCommand::REQ, Some(caps), None) | Command::CAP(_, CapSubCommand::REQ, None, Some(caps)) => {
for cap in caps.split(" ") {
if let Some(c) = IrcCap::from_cap_name(cap) {
debug!("Negotiated capability: {:?}", c);
self.caps.push(c);
self.reply("CAP", vec!["ACK"], Some(cap))?;
}
else {
self.reply("CAP", vec!["NAK"], Some(cap))?;
}
}
},
Command::CAP(_, CapSubCommand::LIST, _, _) => {
// FIXME: this iterator chain may not be optimal
let caps = self.caps
.iter()
.map(|x| x.cap_name())
.collect::<Vec<_>>()
.join(" ");
self.reply("CAP", vec!["LIST"], Some(&caps))?;
},
Command::CAP(_, CapSubCommand::END, _, _) => {
self.cap_ended = true;
},
Command::PASS(_) => {
// FIXME: maybe add support in the future?
},
Command::QUIT(_) => {
Err(format_err!("Client quit"))?
},
Command::Raw(cmd, args, trailing) => {
if cmd == "USER" && args.len() > 1 {
if trailing.is_none() {
self.user = Some(args[0].to_owned());
self.realname = Some(args.last().unwrap().to_owned());
}
else {
self.user = Some(args[0].to_owned());
self.realname = trailing;
}
}
else {
warn!("Unexpected raw registration command: {} {:?} {:?}", cmd, args, trailing);
self.reply("451", vec![], Some("You have not registered"))?;
}
},
c => {
let st: String = (&c).into();
warn!("Unexpected registration command: {}", st.trim());
self.reply("451", vec![], Some("You have not registered"))?;
}
}
Ok(())
}
fn is_done(&mut self) -> Result<bool> {
let ret = self.cap_ended
&& self.nick.is_some()
&& self.user.is_some()
&& self.realname.is_some()
&& self.sock.poll_complete()? == Async::Ready(());
Ok(ret)
}
}

28
src/irc_s2c_v3.rs

@ -0,0 +1,28 @@ @@ -0,0 +1,28 @@
//! IRCv3 support for IRC s2c.
pub static SUPPORTED_CAPS: &str = "away-notify";
#[derive(Clone, Copy, Debug)]
pub enum IrcCap {
/// `away-notify` extension
///
/// https://ircv3.net/specs/extensions/away-notify-3.1
AwayNotify
}
impl IrcCap {
pub fn cap_name(&self) -> &'static str {
use self::IrcCap::*;
match *self {
AwayNotify => "away-notify"
}
}
pub fn from_cap_name(cn: &str) -> Option<Self> {
use self::IrcCap::*;
match cn {
"away-notify" => Some(AwayNotify),
_ => None
}
}
}

31
src/main.rs

@ -13,6 +13,7 @@ mod logging; @@ -13,6 +13,7 @@ mod logging;
mod store;
mod modem;
mod comm;
#[macro_use]
mod util;
mod schema;
mod models;
@ -27,6 +28,9 @@ mod whatsapp; @@ -27,6 +28,9 @@ mod whatsapp;
mod whatsapp_media;
mod insp_s2s;
mod insp_user;
mod irc_s2c;
mod irc_s2c_registration;
mod irc_s2c_v3;
use crate::config::Config;
use crate::store::Store;
@ -37,6 +41,7 @@ use futures::{Future, Stream}; @@ -37,6 +41,7 @@ use futures::{Future, Stream};
use crate::contact_factory::ContactFactory;
use tokio_core::reactor::Core;
use crate::insp_s2s::InspLink;
use crate::irc_s2c::IrcServer;
use crate::whatsapp::WhatsappManager;
use tokio_signal::unix::{Signal, SIGHUP};
use std::path::Path;
@ -83,8 +88,12 @@ fn main() -> Result<(), failure::Error> { @@ -83,8 +88,12 @@ fn main() -> Result<(), failure::Error> {
logger.register();
info!("sms-irc version {}", env!("CARGO_PKG_VERSION"));
if config.client.is_none() == config.insp_s2s.is_none() {
error!("Config must contain either a [client] or an [insp_s2s] section (and not both)!");
let configs = (config.client.is_some() as u32)
+ (config.insp_s2s.is_some() as u32)
+ (config.irc_server.is_some() as u32);
if configs != 1 {
error!("Config must have EXACTLY ONE [client], [insp_s2s] or [irc_server] section!");
panic!("invalid configuration");
}
@ -151,7 +160,7 @@ fn main() -> Result<(), failure::Error> { @@ -151,7 +160,7 @@ fn main() -> Result<(), failure::Error> {
panic!("contactfactory failed");
}));
}
else {
else if config.insp_s2s.is_some() {
info!("Running in InspIRCd s2s mode");
let fut = core.run(InspLink::new(InitParameters {
cfg: &config,
@ -167,5 +176,21 @@ fn main() -> Result<(), failure::Error> { @@ -167,5 +176,21 @@ fn main() -> Result<(), failure::Error> {
panic!("link failed");
}));
}
else if config.irc_server.is_some() {
info!("Running in IRC server mode");
let fut = IrcServer::new(InitParameters {
cfg: &config,
cfg2: config.irc_server.as_ref().unwrap(),
store: store.clone(),
cm: &mut cm,
hdl: &hdl
})?;
let _ = core.run(fut.map_err(|e| {
// FIXME: restartability
error!("IrcServer failed: {}", e);
panic!("server failed");
}));
}
Ok(())
}

18
src/util.rs

@ -5,6 +5,24 @@ use whatsappweb::Jid; @@ -5,6 +5,24 @@ use whatsappweb::Jid;
pub type Result<T> = ::std::result::Result<T, ::failure::Error>;
#[macro_export]
macro_rules! sink_outbox {
($self:ident, $outbox_field:ident, $conn_field:ident, $trace_info:expr) => {
for msg in ::std::mem::replace(&mut $self.$outbox_field, vec![]) {
// FIXME: this logs `msg` twice if we get a `NotReady` for whatever reason.
trace!("-->{} {:?}", $trace_info, msg);
match $self.$conn_field.start_send(msg)? {
::futures::AsyncSink::Ready => {
},
::futures::AsyncSink::NotReady(v) => {
$self.$outbox_field.push(v);
}
}
}
$self.$conn_field.poll_complete()?;
}
}
pub fn jid_to_address(jid: &Jid) -> Option<PduAddress> {
if let Some(pn) = jid.phonenumber() {
Some(pn.parse().unwrap())

Loading…
Cancel
Save