|
|
|
@ -2,27 +2,77 @@ use anyhow::{anyhow, Context};
@@ -2,27 +2,77 @@ use anyhow::{anyhow, Context};
|
|
|
|
|
use byteorder::{BigEndian, ReadBytesExt}; |
|
|
|
|
use futures_util::StreamExt; |
|
|
|
|
use image::{ImageBuffer, Rgb}; |
|
|
|
|
use log::{debug, error, info, trace, warn}; |
|
|
|
|
use openh264::decoder::{Decoder, DecoderConfig}; |
|
|
|
|
use retina::client::{PlayOptions, Session, SessionOptions, SetupOptions}; |
|
|
|
|
use retina::codec::{CodecItem, ParametersRef}; |
|
|
|
|
use std::io::{Cursor, Read, Seek, SeekFrom, Write}; |
|
|
|
|
use std::ops::Deref; |
|
|
|
|
use std::fs::File; |
|
|
|
|
use std::io::{Cursor, Read, Seek, SeekFrom}; |
|
|
|
|
use std::time::{Duration, Instant}; |
|
|
|
|
|
|
|
|
|
mod config; |
|
|
|
|
|
|
|
|
|
use crate::config::{Configuration, Probe}; |
|
|
|
|
|
|
|
|
|
fn print_usage() { |
|
|
|
|
let our_name = std::env::args() |
|
|
|
|
.next() |
|
|
|
|
.unwrap_or_else(|| "boiler-stats".to_owned()); |
|
|
|
|
eprintln!("usage: {} [configuration file path]", our_name); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn load_config(path: &str) -> anyhow::Result<Configuration> { |
|
|
|
|
let mut file = File::open(path).context(format!( |
|
|
|
|
"failed to open config file at {}; check -h for usage", |
|
|
|
|
path |
|
|
|
|
))?; |
|
|
|
|
let mut buffer = Vec::new(); |
|
|
|
|
|
|
|
|
|
file.read_to_end(&mut buffer) |
|
|
|
|
.context("failed to read config")?; |
|
|
|
|
|
|
|
|
|
let ret = toml::from_slice(&buffer).context("failed to deserialize config")?; |
|
|
|
|
|
|
|
|
|
Ok(ret) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[tokio::main] |
|
|
|
|
async fn main() -> anyhow::Result<()> { |
|
|
|
|
eprintln!("[+] setting up RTSP stream"); |
|
|
|
|
let mut config_path = "./config.toml".to_owned(); |
|
|
|
|
match std::env::args().nth(1).as_ref().map(|x| x as &str) { |
|
|
|
|
Some("-h") => { |
|
|
|
|
print_usage(); |
|
|
|
|
return Ok(()); |
|
|
|
|
} |
|
|
|
|
Some(path) => { |
|
|
|
|
config_path = path.to_owned(); // eww
|
|
|
|
|
} |
|
|
|
|
None => {} |
|
|
|
|
} |
|
|
|
|
let config = load_config(&config_path)?; |
|
|
|
|
|
|
|
|
|
pretty_env_logger::formatted_builder() |
|
|
|
|
.filter_level(config.loglevel) |
|
|
|
|
.init(); |
|
|
|
|
|
|
|
|
|
info!("boiler-stats v{}", env!("CARGO_PKG_VERSION")); |
|
|
|
|
info!("connecting to RTSP stream at {}", config.rtsp_uri); |
|
|
|
|
|
|
|
|
|
let uri = config.rtsp_uri.parse().context("parsing RTSP URI")?; |
|
|
|
|
debug!("doing DESCRIBE"); |
|
|
|
|
|
|
|
|
|
let mut session = Session::describe( |
|
|
|
|
"rtsp://hammersmith.i.eta.st:8554/boiler".parse()?, |
|
|
|
|
uri, |
|
|
|
|
SessionOptions::default().user_agent("boiler-stats/0.1".to_owned()), |
|
|
|
|
) |
|
|
|
|
.await |
|
|
|
|
.context("rtsp describe failed")?; |
|
|
|
|
|
|
|
|
|
let mut stream_idx = None; |
|
|
|
|
let mut video_params = None; |
|
|
|
|
for (i, stream) in session.streams().iter().enumerate() { |
|
|
|
|
eprintln!( |
|
|
|
|
"[+] stream {}: media {} with encoding {} (parameters {})", |
|
|
|
|
debug!( |
|
|
|
|
"stream {}: media {} with encoding {} (parameters {})", |
|
|
|
|
i, |
|
|
|
|
stream.media(), |
|
|
|
|
stream.encoding_name(), |
|
|
|
@ -49,13 +99,14 @@ async fn main() -> anyhow::Result<()> {
@@ -49,13 +99,14 @@ async fn main() -> anyhow::Result<()> {
|
|
|
|
|
.await |
|
|
|
|
.context("rtsp play failed")?; |
|
|
|
|
let mut demuxed = playing.demuxed().context("rtsp demux failed")?; |
|
|
|
|
eprintln!("[+] streaming"); |
|
|
|
|
debug!("rtsp succeeded; starting openh264"); |
|
|
|
|
let mut decoder = |
|
|
|
|
Decoder::with_config(DecoderConfig::default()).context("init openh264 decoder")?; |
|
|
|
|
let mut provided_headers = false; |
|
|
|
|
let mut imaged = false; |
|
|
|
|
let mut last_success = Instant::now(); |
|
|
|
|
|
|
|
|
|
info!("streaming frames"); |
|
|
|
|
while let Some(ret) = demuxed.next().await { |
|
|
|
|
let ret = match ret { |
|
|
|
|
Ok(v) => v, |
|
|
|
@ -79,7 +130,7 @@ async fn main() -> anyhow::Result<()> {
@@ -79,7 +130,7 @@ async fn main() -> anyhow::Result<()> {
|
|
|
|
|
let nal_type = buf[0] & 0x1f; |
|
|
|
|
// 7 = SPS, 8 = PPS
|
|
|
|
|
if nal_type != 7 && nal_type != 8 && !provided_headers { |
|
|
|
|
eprintln!("[+] detected no in-band headers, passing them manually"); |
|
|
|
|
warn!("detected no in-band SPS/PPS NALs, passing them manually"); |
|
|
|
|
provided_headers = true; |
|
|
|
|
let video_params = video_params |
|
|
|
|
.take() |
|
|
|
@ -108,9 +159,7 @@ async fn main() -> anyhow::Result<()> {
@@ -108,9 +159,7 @@ async fn main() -> anyhow::Result<()> {
|
|
|
|
|
extra_data_cursor |
|
|
|
|
.read_exact(&mut pps_nal[3..]) |
|
|
|
|
.context("failed to read PPS NAL")?; |
|
|
|
|
assert_eq!(sps_nal[3] & 0x1f, 7); |
|
|
|
|
assert_eq!(pps_nal[3] & 0x1f, 8); |
|
|
|
|
eprintln!( |
|
|
|
|
debug!( |
|
|
|
|
"[+] SPS NAL length {}, PPS NAL length {}", |
|
|
|
|
sps_nal.len(), |
|
|
|
|
pps_nal.len() |
|
|
|
@ -122,7 +171,7 @@ async fn main() -> anyhow::Result<()> {
@@ -122,7 +171,7 @@ async fn main() -> anyhow::Result<()> {
|
|
|
|
|
decoder |
|
|
|
|
.decode(&pps_nal) |
|
|
|
|
.context("failed to decode PPS NAL")?; |
|
|
|
|
eprintln!("[+] passed SPS and PPS NALs, continuing"); |
|
|
|
|
debug!("passed SPS and PPS NALs, continuing"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let decode_result = decoder.decode(&buf); |
|
|
|
@ -135,10 +184,14 @@ async fn main() -> anyhow::Result<()> {
@@ -135,10 +184,14 @@ async fn main() -> anyhow::Result<()> {
|
|
|
|
|
let ibuf = |
|
|
|
|
ImageBuffer::<Rgb<u8>, _>::from_vec(width as u32, height as u32, buf) |
|
|
|
|
.unwrap(); |
|
|
|
|
eprintln!("[+] frame at T+{:.02}s", vf.timestamp().elapsed_secs()); |
|
|
|
|
trace!("frame at T+{:.02}s", vf.timestamp().elapsed_secs()); |
|
|
|
|
if !imaged { |
|
|
|
|
ibuf.save("./decoded.png")?; |
|
|
|
|
eprintln!("[+] saved image to ./decoded.png"); |
|
|
|
|
if let Some(ip) = config.image_path.as_ref() { |
|
|
|
|
ibuf.save(ip)?; |
|
|
|
|
info!("saved first decoded frame to {}", ip); |
|
|
|
|
} else { |
|
|
|
|
info!("got first decoded frame"); |
|
|
|
|
} |
|
|
|
|
imaged = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -146,7 +199,9 @@ async fn main() -> anyhow::Result<()> {
@@ -146,7 +199,9 @@ async fn main() -> anyhow::Result<()> {
|
|
|
|
|
last_success = Instant::now(); |
|
|
|
|
} |
|
|
|
|
Err(e) => { |
|
|
|
|
trace!("decode error: {}", e); |
|
|
|
|
if Instant::now() - last_success > Duration::from_secs(10) { |
|
|
|
|
error!("giving up on decoding"); |
|
|
|
|
return Err(anyhow!( |
|
|
|
|
"failed to decode after 10 seconds; last error: {}", |
|
|
|
|
e |
|
|
|
|