Skip to content
Commits on Source (2)
#!/bin/bash
set -x
sudo ip link add name showtime0 type dummy
sudo ip link set dev showtime0 address 00:0b:78:00:60:01
sudo ip addr add 192.168.168.55/32 dev showtime0
sudo ip link set dev showtime0 up
#![allow(deprecated)] // want to use sleep_ms, don't care
use std::net::{UdpSocket, Ipv4Addr, SocketAddr};
use bounded_spsc_queue::{make, Consumer, Producer};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
#[cfg(feature = "jack")]
use samplerate::{ConverterType, Samplerate};
#[cfg(feature = "jack")]
use sqa_jack::{
JackCallbackContext, JackConnection, JackControl, JackHandler, JackNFrames, JackPort,
PORT_IS_INPUT,
};
#[cfg(any(feature = "stdin", feature = "tcp"))]
use std::io::Read;
use std::io::Write;
#[cfg(feature = "stdin")]
use std::io::{self, BufReader};
#[cfg(feature = "tcp")]
use std::net::TcpListener;
use byteorder::{BigEndian, WriteBytesExt};
#[cfg(any(feature = "jack", test))]
use byteorder::ByteOrder;
use std::sync::Arc;
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::time::Instant;
use std::sync::mpsc;
use std::io::{Write};
#[cfg(feature = "stdin")]
use std::io::{self, BufReader};
#[cfg(any(feature = "stdin", feature = "tcp"))]
use std::io::Read;
use bounded_spsc_queue::{Consumer, Producer, make};
use std::sync::Arc;
use std::thread::Builder;
#[cfg(feature = "jack")]
use sqa_jack::{JackHandler, JackPort, JackCallbackContext, JackControl, JackConnection, PORT_IS_INPUT, JackNFrames};
#[cfg(feature = "jack")]
use samplerate::{Samplerate, ConverterType};
use std::time::Instant;
const BUFFER_SIZE: usize = 992*512;
const BUFFER_SIZE: usize = 992 * 512;
#[cfg(feature = "jack")]
struct ShowtimeJackHandler {
left: JackPort,
right: JackPort,
tx: Producer<f32>
tx: Producer<f32>,
}
#[cfg(feature = "jack")]
......@@ -118,7 +119,7 @@ fn jack_feeder_loop(from_jack: mpsc::Receiver<Vec<f32>>, out: mpsc::SyncSender<V
struct FrameChunker<'a> {
inner: &'a [u8],
frame_no: u16,
seq_no: u16
seq_no: u16,
}
impl<'a> FrameChunker<'a> {
......@@ -126,15 +127,14 @@ impl<'a> FrameChunker<'a> {
Self {
inner,
frame_no,
seq_no: 0
seq_no: 0,
}
}
fn next_chunk(&mut self) -> Vec<u8> {
let mut ret = Vec::with_capacity(1024);
let (remaining, last) = if self.inner.len() > 1020 {
(1020, false)
}
else {
} else {
(self.inner.len(), true)
};
ret.write_u16::<BigEndian>(self.frame_no).unwrap();
......@@ -164,17 +164,20 @@ fn make_vsync(frame_no: u16) -> Vec<u8> {
ret
}
fn audio_thread(udp: UdpSocket, dest: SocketAddr, queue: Consumer<u8>, fc: Arc<AtomicU8>) {
fn audio_thread(udp: UdpSocket, dest: SocketAddr, queue: Consumer<i32>, fc: Arc<AtomicU8>) {
let mut pkt = [0u8; 1008];
// Pre-write the header on to the packet.
for (i, b) in [0, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0, 0, 0].iter().enumerate() {
for (i, b) in [
0, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0, 0, 0,
]
.iter()
.enumerate()
{
pkt[i] = *b;
}
let now = Instant::now();
// Monotonic nanosecond counter.
let cur_nanos = || {
Instant::now().duration_since(now).as_micros()
};
let cur_nanos = || Instant::now().duration_since(now).as_micros();
const INTERVAL: u128 = 2811;
// Next time (on the counter) we need to send a packet.
let mut next = INTERVAL;
......@@ -193,11 +196,11 @@ fn audio_thread(udp: UdpSocket, dest: SocketAddr, queue: Consumer<u8>, fc: Arc<A
if ptr != 1008 {
num_xruns += 1;
fc.store(0, Ordering::Relaxed);
if ptr != 16 { // reduce spam
if ptr != 16 {
// reduce spam
eprintln!("/!\\ XRUN ptr={}smpls, t={}ns", ptr, cur);
}
}
else {
} else {
// Reset the pointer.
ptr = 16;
num_xruns = 0;
......@@ -219,17 +222,15 @@ fn audio_thread(udp: UdpSocket, dest: SocketAddr, queue: Consumer<u8>, fc: Arc<A
if num_xruns > 5 {
// Let's just fill it up with silence instead.
// (otherwise you get a buzzing effect that's highly irritating)
pkt[ptr] = 0;
ptr += 1;
}
else {
BigEndian::write_i32(&mut pkt[ptr..], 0);
ptr += 4;
} else {
if let Some(smpl) = queue.try_pop() {
pkt[ptr] = smpl;
ptr += 1;
BigEndian::write_i32(&mut pkt[ptr..], smpl);
ptr += 4;
}
}
}
else {
} else {
if ldiff > 300 {
eprintln!("! wakeup delayed {}ns", ldiff);
ldiff = 0;
......@@ -242,16 +243,17 @@ fn audio_thread(udp: UdpSocket, dest: SocketAddr, queue: Consumer<u8>, fc: Arc<A
fn make_heartbeat(seq_no: u16, cur_ts: u16, init: bool) -> Vec<u8> {
let mut ret = Vec::with_capacity(512);
ret.write_all(&[0x54, 0x46, 0x36, 0x7a, 0x63, 0x01, 0x00]).unwrap();
ret.write_all(&[0x54, 0x46, 0x36, 0x7a, 0x63, 0x01, 0x00])
.unwrap();
ret.write_u16::<BigEndian>(seq_no).unwrap();
ret.write_all(&[0x00, 0x00, 0x03, 0x03, 0x03, 0x00, 0x24, 0x00, 0x00]).unwrap();
ret.write_all(&[0x00, 0x00, 0x03, 0x03, 0x03, 0x00, 0x24, 0x00, 0x00])
.unwrap();
for _ in 0..8 {
ret.write_all(&[0]).unwrap();
}
let (mode, a, b, c, d, e, f, other) = if init {
(3, 1920, 1080, 599, 1920, 1080, 120, 3)
}
else {
} else {
(16, 0, 0, 0, 0, 0, 120, 0)
};
for thing in &[mode, a, b, c, d, e, f] {
......@@ -283,7 +285,7 @@ fn wakeup_loop(sock: UdpSocket, dest: SocketAddr, init: Arc<AtomicBool>) {
#[test]
fn test_frame_chunker() {
let test_frame = include_bytes!("../test.mjpeg");
let test_frame = include_bytes!("../fuck.mjpeg");
let mut chonker = FrameChunker::new(0, test_frame as &[u8]);
let mut test = Vec::new();
let mut expected_seq = 0;
......@@ -319,7 +321,7 @@ fn test_frame_chunker_white() {
assert_eq!(&total_framed as &[u8], &test as &[u8]);
}
fn spooler_loop(bufs: mpsc::Receiver<Vec<u8>>, tx: Producer<u8>) {
fn spooler_loop(bufs: mpsc::Receiver<Vec<i32>>, tx: Producer<i32>) {
while let Ok(buf) = bufs.recv() {
for byte in buf {
tx.push(byte);
......@@ -328,7 +330,7 @@ fn spooler_loop(bufs: mpsc::Receiver<Vec<u8>>, tx: Producer<u8>) {
}
#[cfg(any(feature = "tcp", feature = "stdin"))]
fn feeder_loop<R: Read>(mut inp: R, out: mpsc::SyncSender<Vec<u8>>) {
fn feeder_loop<R: Read>(mut inp: R, out: mpsc::SyncSender<Vec<i32>>) {
'outer: loop {
let mut buf = Vec::with_capacity(BUFFER_SIZE / 2);
for _ in 0..(BUFFER_SIZE / 2) {
......@@ -343,7 +345,11 @@ fn feeder_loop<R: Read>(mut inp: R, out: mpsc::SyncSender<Vec<u8>>) {
}
read_bytes += num;
if read_bytes == (BUFFER_SIZE / 2) {
out.send(buf.clone()).unwrap();
let buf_be: Vec<i32> = buf
.chunks(4)
.map(|chunk| BigEndian::read_i32(chunk))
.collect();
out.send(buf_be).unwrap();
read_bytes = 0;
}
}
......@@ -360,7 +366,7 @@ fn stdin_source(bufs_tx: mpsc::SyncSender<Vec<u8>>) {
}
#[cfg(feature = "tcp")]
fn tcp_source(bufs_tx: mpsc::SyncSender<Vec<u8>>) {
fn tcp_source(bufs_tx: mpsc::SyncSender<Vec<i32>>) {
let listener = TcpListener::bind("0.0.0.0:1337").unwrap();
eprintln!("listening on 0.0.0.0:1337; ffmpeg format: -f s32be -ar 44100 -ac 2");
for stream in listener.incoming() {
......@@ -370,8 +376,8 @@ fn tcp_source(bufs_tx: mpsc::SyncSender<Vec<u8>>) {
std::thread::spawn(move || {
feeder_loop(s, btx);
});
},
Err(e) => eprintln!("accept() failed: {}", e)
}
Err(e) => eprintln!("accept() failed: {}", e),
}
}
}
......@@ -385,8 +391,9 @@ fn jack_source(bufs_tx: mpsc::SyncSender<Vec<u8>>) {
let (jack_tx, jack_rx) = make(BUFFER_SIZE * 4);
let (jack_rs_tx, jack_rs_rx) = mpsc::sync_channel(BUFFER_SIZE);
let handler = ShowtimeJackHandler {
left, right,
tx: jack_tx
left,
right,
tx: jack_tx,
};
conn.set_handler(handler).unwrap();
println!("starting JACK feeder");
......@@ -407,7 +414,7 @@ fn jack_source(bufs_tx: mpsc::SyncSender<Vec<u8>>) {
// NOTE: making the conn `_` drops it
let _conn = match conn.activate() {
Ok(c) => c,
Err((_, e)) => Err(e).unwrap()
Err((_, e)) => Err(e).unwrap(),
};
println!("sleeping");
loop {
......@@ -420,46 +427,49 @@ fn main() {
let fuck = include_bytes!("../behind.mjpeg");
let bork = include_bytes!("../bork.mjpeg");
println!("*** showtime sender / by eta <https://eta.st/> ***");
println!("[+] binding UDP sockets (ensure the correct interface has address 192.168.168.55)");
let vsync = UdpSocket::bind("192.168.168.55:2067").unwrap();
let vsync_dest: SocketAddr = "226.2.2.2:2067".parse().unwrap();
let mjpeg = UdpSocket::bind("192.168.168.55:2068").unwrap();
let mjpeg_dest: SocketAddr = "226.2.2.2:2068".parse().unwrap();
let sound = UdpSocket::bind("192.168.168.55:2065").unwrap();
let sound_dest: SocketAddr = "226.2.2.2:2066".parse().unwrap();
let wakeup = UdpSocket::bind("192.168.168.55:48689").unwrap();
let wakeup_dest: SocketAddr = "255.255.255.255:48689".parse().unwrap();
println!("[+] joining multicast groups");
vsync.join_multicast_v4(&MAGIC_ADDRESS, &"192.168.168.55".parse().unwrap()).unwrap();
mjpeg.join_multicast_v4(&MAGIC_ADDRESS, &"192.168.168.55".parse().unwrap()).unwrap();
sound.join_multicast_v4(&MAGIC_ADDRESS, &"192.168.168.55".parse().unwrap()).unwrap();
println!("[+] setting broadcast option");
wakeup.set_broadcast(true).unwrap();
sound
.join_multicast_v4(&MAGIC_ADDRESS, &"192.168.168.55".parse().unwrap())
.unwrap();
let init = Arc::new(AtomicBool::new(false));
let fucked = Arc::new(AtomicU8::new(0));
let ic = init.clone();
if std::env::var_os("NO_HEARTBEAT").is_none() {
println!("[+] starting heartbeater");
let wakeup = UdpSocket::bind("192.168.168.55:48689").unwrap();
let wakeup_dest: SocketAddr = "255.255.255.255:48689".parse().unwrap();
wakeup.set_broadcast(true).unwrap();
Builder::new()
.name("heartbeater".into())
.spawn(move || {
wakeup_loop(wakeup, wakeup_dest, ic);
})
.unwrap();
.unwrap();
std::thread::sleep_ms(100);
}
init.store(true, Ordering::Relaxed);
println!("[+] starting audio sender");
let (tx, rx) = make::<u8>(BUFFER_SIZE);
let (tx, rx) = make::<i32>(BUFFER_SIZE);
let fc = fucked.clone();
Builder::new()
.name("audiosender".into())
.spawn(move || {
audio_thread(sound, sound_dest, rx, fc);
})
.unwrap();
.unwrap();
println!("[+] starting video sender");
if std::env::var_os("NO_VIDEO").is_none() {
let vsync = UdpSocket::bind("192.168.168.55:2067").unwrap();
let vsync_dest: SocketAddr = "226.2.2.2:2067".parse().unwrap();
let mjpeg = UdpSocket::bind("192.168.168.55:2068").unwrap();
let mjpeg_dest: SocketAddr = "226.2.2.2:2068".parse().unwrap();
vsync
.join_multicast_v4(&MAGIC_ADDRESS, &"192.168.168.55".parse().unwrap())
.unwrap();
mjpeg
.join_multicast_v4(&MAGIC_ADDRESS, &"192.168.168.55".parse().unwrap())
.unwrap();
Builder::new()
.name("videosender".into())
.spawn(move || {
......@@ -471,7 +481,7 @@ fn main() {
// "AUDIO THREAD BEHIND" image
2 => fuck as &[u8],
// normal showtime image
_ => showtime as &[u8]
_ => showtime as &[u8],
};
let mut chonker = FrameChunker::new(frame_no, image);
let vsync_pkt = make_vsync(frame_no);
......@@ -484,7 +494,7 @@ fn main() {
::std::thread::sleep_ms(33);
}
})
.unwrap();
.unwrap();
}
println!("[+] starting audio spooler");
let (bufs_tx, bufs_rx) = mpsc::sync_channel(0);
......