Skip to content
GitLab
Explore
Sign in
Commits on Source (2)
fix static glitching bug, rust-toolchain, rustfmt
· 9606f9e8
eta
authored
Aug 20, 2021
I'm too lazy to make this a Proper Change and it also only fixes it for the tcp feature flag
9606f9e8
make socket binding conditional
· c1e90271
eta
authored
Nov 06, 2021
c1e90271
Hide whitespace changes
Inline
Side-by-side
.gitignore
View file @
c1e90271
/target
.idea/
ifup.sh
0 → 100644
View file @
c1e90271
#!/bin/bash
set
-x
sudo
ip
link
add name showtime0
type
dummy
sudo
ip
link set
dev showtime0 address 00:0b:78:00:60:01
sudo
ip addr add 192.168.168.55/32 dev showtime0
sudo
ip
link set
dev showtime0 up
rust-toolchain
0 → 100644
View file @
c1e90271
nightly
src/main.rs
View file @
c1e90271
#![allow(deprecated)]
// want to use sleep_ms, don't care
use
std
::
net
::{
UdpSocket
,
Ipv4Addr
,
SocketAddr
};
use
bounded_spsc_queue
::{
make
,
Consumer
,
Producer
};
use
byteorder
::{
BigEndian
,
ByteOrder
,
WriteBytesExt
};
#[cfg(feature
=
"jack"
)]
use
samplerate
::{
ConverterType
,
Samplerate
};
#[cfg(feature
=
"jack"
)]
use
sqa_jack
::{
JackCallbackContext
,
JackConnection
,
JackControl
,
JackHandler
,
JackNFrames
,
JackPort
,
PORT_IS_INPUT
,
};
#[cfg(any(feature
=
"stdin"
,
feature
=
"tcp"
))]
use
std
::
io
::
Read
;
use
std
::
io
::
Write
;
#[cfg(feature
=
"stdin"
)]
use
std
::
io
::{
self
,
BufReader
};
#[cfg(feature
=
"tcp"
)]
use
std
::
net
::
TcpListener
;
use
byteorder
::{
BigEndian
,
WriteBytesExt
};
#[cfg(any(feature
=
"jack"
,
test))]
use
byteorder
::
ByteOrder
;
use
std
::
sync
::
Arc
;
use
std
::
net
::{
Ipv4Addr
,
SocketAddr
,
UdpSocket
};
use
std
::
sync
::
atomic
::{
AtomicBool
,
AtomicU8
,
Ordering
};
use
std
::
time
::
Instant
;
use
std
::
sync
::
mpsc
;
use
std
::
io
::{
Write
};
#[cfg(feature
=
"stdin"
)]
use
std
::
io
::{
self
,
BufReader
};
#[cfg(any(feature
=
"stdin"
,
feature
=
"tcp"
))]
use
std
::
io
::
Read
;
use
bounded_spsc_queue
::{
Consumer
,
Producer
,
make
};
use
std
::
sync
::
Arc
;
use
std
::
thread
::
Builder
;
#[cfg(feature
=
"jack"
)]
use
sqa_jack
::{
JackHandler
,
JackPort
,
JackCallbackContext
,
JackControl
,
JackConnection
,
PORT_IS_INPUT
,
JackNFrames
};
#[cfg(feature
=
"jack"
)]
use
samplerate
::{
Samplerate
,
ConverterType
};
use
std
::
time
::
Instant
;
const
BUFFER_SIZE
:
usize
=
992
*
512
;
const
BUFFER_SIZE
:
usize
=
992
*
512
;
#[cfg(feature
=
"jack"
)]
struct
ShowtimeJackHandler
{
left
:
JackPort
,
right
:
JackPort
,
tx
:
Producer
<
f32
>
tx
:
Producer
<
f32
>
,
}
#[cfg(feature
=
"jack"
)]
...
...
@@ -118,7 +119,7 @@ fn jack_feeder_loop(from_jack: mpsc::Receiver<Vec<f32>>, out: mpsc::SyncSender<V
struct
FrameChunker
<
'a
>
{
inner
:
&
'a
[
u8
],
frame_no
:
u16
,
seq_no
:
u16
seq_no
:
u16
,
}
impl
<
'a
>
FrameChunker
<
'a
>
{
...
...
@@ -126,15 +127,14 @@ impl<'a> FrameChunker<'a> {
Self
{
inner
,
frame_no
,
seq_no
:
0
seq_no
:
0
,
}
}
fn
next_chunk
(
&
mut
self
)
->
Vec
<
u8
>
{
let
mut
ret
=
Vec
::
with_capacity
(
1024
);
let
(
remaining
,
last
)
=
if
self
.inner
.len
()
>
1020
{
(
1020
,
false
)
}
else
{
}
else
{
(
self
.inner
.len
(),
true
)
};
ret
.write_u16
::
<
BigEndian
>
(
self
.frame_no
)
.unwrap
();
...
...
@@ -164,17 +164,20 @@ fn make_vsync(frame_no: u16) -> Vec<u8> {
ret
}
fn
audio_thread
(
udp
:
UdpSocket
,
dest
:
SocketAddr
,
queue
:
Consumer
<
u8
>
,
fc
:
Arc
<
AtomicU8
>
)
{
fn
audio_thread
(
udp
:
UdpSocket
,
dest
:
SocketAddr
,
queue
:
Consumer
<
i32
>
,
fc
:
Arc
<
AtomicU8
>
)
{
let
mut
pkt
=
[
0u8
;
1008
];
// Pre-write the header on to the packet.
for
(
i
,
b
)
in
[
0
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0
,
0
,
0
]
.iter
()
.enumerate
()
{
for
(
i
,
b
)
in
[
0
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0x55
,
0
,
0
,
0
,
]
.iter
()
.enumerate
()
{
pkt
[
i
]
=
*
b
;
}
let
now
=
Instant
::
now
();
// Monotonic nanosecond counter.
let
cur_nanos
=
||
{
Instant
::
now
()
.duration_since
(
now
)
.as_micros
()
};
let
cur_nanos
=
||
Instant
::
now
()
.duration_since
(
now
)
.as_micros
();
const
INTERVAL
:
u128
=
2811
;
// Next time (on the counter) we need to send a packet.
let
mut
next
=
INTERVAL
;
...
...
@@ -193,11 +196,11 @@ fn audio_thread(udp: UdpSocket, dest: SocketAddr, queue: Consumer<u8>, fc: Arc<A
if
ptr
!=
1008
{
num_xruns
+=
1
;
fc
.store
(
0
,
Ordering
::
Relaxed
);
if
ptr
!=
16
{
// reduce spam
if
ptr
!=
16
{
// reduce spam
eprintln!
(
"/!
\\
XRUN ptr={}smpls, t={}ns"
,
ptr
,
cur
);
}
}
else
{
}
else
{
// Reset the pointer.
ptr
=
16
;
num_xruns
=
0
;
...
...
@@ -219,17 +222,15 @@ fn audio_thread(udp: UdpSocket, dest: SocketAddr, queue: Consumer<u8>, fc: Arc<A
if
num_xruns
>
5
{
// Let's just fill it up with silence instead.
// (otherwise you get a buzzing effect that's highly irritating)
pkt
[
ptr
]
=
0
;
ptr
+=
1
;
}
else
{
BigEndian
::
write_i32
(
&
mut
pkt
[
ptr
..
],
0
);
ptr
+=
4
;
}
else
{
if
let
Some
(
smpl
)
=
queue
.try_pop
()
{
pkt
[
ptr
]
=
smpl
;
ptr
+=
1
;
BigEndian
::
write_i32
(
&
mut
pkt
[
ptr
..
],
smpl
)
;
ptr
+=
4
;
}
}
}
else
{
}
else
{
if
ldiff
>
300
{
eprintln!
(
"! wakeup delayed {}ns"
,
ldiff
);
ldiff
=
0
;
...
...
@@ -242,16 +243,17 @@ fn audio_thread(udp: UdpSocket, dest: SocketAddr, queue: Consumer<u8>, fc: Arc<A
fn
make_heartbeat
(
seq_no
:
u16
,
cur_ts
:
u16
,
init
:
bool
)
->
Vec
<
u8
>
{
let
mut
ret
=
Vec
::
with_capacity
(
512
);
ret
.write_all
(
&
[
0x54
,
0x46
,
0x36
,
0x7a
,
0x63
,
0x01
,
0x00
])
.unwrap
();
ret
.write_all
(
&
[
0x54
,
0x46
,
0x36
,
0x7a
,
0x63
,
0x01
,
0x00
])
.unwrap
();
ret
.write_u16
::
<
BigEndian
>
(
seq_no
)
.unwrap
();
ret
.write_all
(
&
[
0x00
,
0x00
,
0x03
,
0x03
,
0x03
,
0x00
,
0x24
,
0x00
,
0x00
])
.unwrap
();
ret
.write_all
(
&
[
0x00
,
0x00
,
0x03
,
0x03
,
0x03
,
0x00
,
0x24
,
0x00
,
0x00
])
.unwrap
();
for
_
in
0
..
8
{
ret
.write_all
(
&
[
0
])
.unwrap
();
}
let
(
mode
,
a
,
b
,
c
,
d
,
e
,
f
,
other
)
=
if
init
{
(
3
,
1920
,
1080
,
599
,
1920
,
1080
,
120
,
3
)
}
else
{
}
else
{
(
16
,
0
,
0
,
0
,
0
,
0
,
120
,
0
)
};
for
thing
in
&
[
mode
,
a
,
b
,
c
,
d
,
e
,
f
]
{
...
...
@@ -283,7 +285,7 @@ fn wakeup_loop(sock: UdpSocket, dest: SocketAddr, init: Arc<AtomicBool>) {
#[test]
fn
test_frame_chunker
()
{
let
test_frame
=
include_bytes!
(
"../
test
.mjpeg"
);
let
test_frame
=
include_bytes!
(
"../
fuck
.mjpeg"
);
let
mut
chonker
=
FrameChunker
::
new
(
0
,
test_frame
as
&
[
u8
]);
let
mut
test
=
Vec
::
new
();
let
mut
expected_seq
=
0
;
...
...
@@ -319,7 +321,7 @@ fn test_frame_chunker_white() {
assert_eq!
(
&
total_framed
as
&
[
u8
],
&
test
as
&
[
u8
]);
}
fn
spooler_loop
(
bufs
:
mpsc
::
Receiver
<
Vec
<
u8
>>
,
tx
:
Producer
<
u8
>
)
{
fn
spooler_loop
(
bufs
:
mpsc
::
Receiver
<
Vec
<
i32
>>
,
tx
:
Producer
<
i32
>
)
{
while
let
Ok
(
buf
)
=
bufs
.recv
()
{
for
byte
in
buf
{
tx
.push
(
byte
);
...
...
@@ -328,7 +330,7 @@ fn spooler_loop(bufs: mpsc::Receiver<Vec<u8>>, tx: Producer<u8>) {
}
#[cfg(any(feature
=
"tcp"
,
feature
=
"stdin"
))]
fn
feeder_loop
<
R
:
Read
>
(
mut
inp
:
R
,
out
:
mpsc
::
SyncSender
<
Vec
<
u8
>>
)
{
fn
feeder_loop
<
R
:
Read
>
(
mut
inp
:
R
,
out
:
mpsc
::
SyncSender
<
Vec
<
i32
>>
)
{
'outer
:
loop
{
let
mut
buf
=
Vec
::
with_capacity
(
BUFFER_SIZE
/
2
);
for
_
in
0
..
(
BUFFER_SIZE
/
2
)
{
...
...
@@ -343,7 +345,11 @@ fn feeder_loop<R: Read>(mut inp: R, out: mpsc::SyncSender<Vec<u8>>) {
}
read_bytes
+=
num
;
if
read_bytes
==
(
BUFFER_SIZE
/
2
)
{
out
.send
(
buf
.clone
())
.unwrap
();
let
buf_be
:
Vec
<
i32
>
=
buf
.chunks
(
4
)
.map
(|
chunk
|
BigEndian
::
read_i32
(
chunk
))
.collect
();
out
.send
(
buf_be
)
.unwrap
();
read_bytes
=
0
;
}
}
...
...
@@ -360,7 +366,7 @@ fn stdin_source(bufs_tx: mpsc::SyncSender<Vec<u8>>) {
}
#[cfg(feature
=
"tcp"
)]
fn
tcp_source
(
bufs_tx
:
mpsc
::
SyncSender
<
Vec
<
u8
>>
)
{
fn
tcp_source
(
bufs_tx
:
mpsc
::
SyncSender
<
Vec
<
i32
>>
)
{
let
listener
=
TcpListener
::
bind
(
"0.0.0.0:1337"
)
.unwrap
();
eprintln!
(
"listening on 0.0.0.0:1337; ffmpeg format: -f s32be -ar 44100 -ac 2"
);
for
stream
in
listener
.incoming
()
{
...
...
@@ -370,8 +376,8 @@ fn tcp_source(bufs_tx: mpsc::SyncSender<Vec<u8>>) {
std
::
thread
::
spawn
(
move
||
{
feeder_loop
(
s
,
btx
);
});
}
,
Err
(
e
)
=>
eprintln!
(
"accept() failed: {}"
,
e
)
}
Err
(
e
)
=>
eprintln!
(
"accept() failed: {}"
,
e
)
,
}
}
}
...
...
@@ -385,8 +391,9 @@ fn jack_source(bufs_tx: mpsc::SyncSender<Vec<u8>>) {
let
(
jack_tx
,
jack_rx
)
=
make
(
BUFFER_SIZE
*
4
);
let
(
jack_rs_tx
,
jack_rs_rx
)
=
mpsc
::
sync_channel
(
BUFFER_SIZE
);
let
handler
=
ShowtimeJackHandler
{
left
,
right
,
tx
:
jack_tx
left
,
right
,
tx
:
jack_tx
,
};
conn
.set_handler
(
handler
)
.unwrap
();
println!
(
"starting JACK feeder"
);
...
...
@@ -407,7 +414,7 @@ fn jack_source(bufs_tx: mpsc::SyncSender<Vec<u8>>) {
// NOTE: making the conn `_` drops it
let
_conn
=
match
conn
.activate
()
{
Ok
(
c
)
=>
c
,
Err
((
_
,
e
))
=>
Err
(
e
)
.unwrap
()
Err
((
_
,
e
))
=>
Err
(
e
)
.unwrap
()
,
};
println!
(
"sleeping"
);
loop
{
...
...
@@ -420,46 +427,49 @@ fn main() {
let
fuck
=
include_bytes!
(
"../behind.mjpeg"
);
let
bork
=
include_bytes!
(
"../bork.mjpeg"
);
println!
(
"*** showtime sender / by eta <https://eta.st/> ***"
);
println!
(
"[+] binding UDP sockets (ensure the correct interface has address 192.168.168.55)"
);
let
vsync
=
UdpSocket
::
bind
(
"192.168.168.55:2067"
)
.unwrap
();
let
vsync_dest
:
SocketAddr
=
"226.2.2.2:2067"
.parse
()
.unwrap
();
let
mjpeg
=
UdpSocket
::
bind
(
"192.168.168.55:2068"
)
.unwrap
();
let
mjpeg_dest
:
SocketAddr
=
"226.2.2.2:2068"
.parse
()
.unwrap
();
let
sound
=
UdpSocket
::
bind
(
"192.168.168.55:2065"
)
.unwrap
();
let
sound_dest
:
SocketAddr
=
"226.2.2.2:2066"
.parse
()
.unwrap
();
let
wakeup
=
UdpSocket
::
bind
(
"192.168.168.55:48689"
)
.unwrap
();
let
wakeup_dest
:
SocketAddr
=
"255.255.255.255:48689"
.parse
()
.unwrap
();
println!
(
"[+] joining multicast groups"
);
vsync
.join_multicast_v4
(
&
MAGIC_ADDRESS
,
&
"192.168.168.55"
.parse
()
.unwrap
())
.unwrap
();
mjpeg
.join_multicast_v4
(
&
MAGIC_ADDRESS
,
&
"192.168.168.55"
.parse
()
.unwrap
())
.unwrap
();
sound
.join_multicast_v4
(
&
MAGIC_ADDRESS
,
&
"192.168.168.55"
.parse
()
.unwrap
())
.unwrap
();
println!
(
"[+] setting broadcast option"
);
wakeup
.set_broadcast
(
true
)
.unwrap
();
sound
.join_multicast_v4
(
&
MAGIC_ADDRESS
,
&
"192.168.168.55"
.parse
()
.unwrap
())
.unwrap
();
let
init
=
Arc
::
new
(
AtomicBool
::
new
(
false
));
let
fucked
=
Arc
::
new
(
AtomicU8
::
new
(
0
));
let
ic
=
init
.clone
();
if
std
::
env
::
var_os
(
"NO_HEARTBEAT"
)
.is_none
()
{
println!
(
"[+] starting heartbeater"
);
let
wakeup
=
UdpSocket
::
bind
(
"192.168.168.55:48689"
)
.unwrap
();
let
wakeup_dest
:
SocketAddr
=
"255.255.255.255:48689"
.parse
()
.unwrap
();
wakeup
.set_broadcast
(
true
)
.unwrap
();
Builder
::
new
()
.name
(
"heartbeater"
.into
())
.spawn
(
move
||
{
wakeup_loop
(
wakeup
,
wakeup_dest
,
ic
);
})
.unwrap
();
.unwrap
();
std
::
thread
::
sleep_ms
(
100
);
}
init
.store
(
true
,
Ordering
::
Relaxed
);
println!
(
"[+] starting audio sender"
);
let
(
tx
,
rx
)
=
make
::
<
u8
>
(
BUFFER_SIZE
);
let
(
tx
,
rx
)
=
make
::
<
i32
>
(
BUFFER_SIZE
);
let
fc
=
fucked
.clone
();
Builder
::
new
()
.name
(
"audiosender"
.into
())
.spawn
(
move
||
{
audio_thread
(
sound
,
sound_dest
,
rx
,
fc
);
})
.unwrap
();
.unwrap
();
println!
(
"[+] starting video sender"
);
if
std
::
env
::
var_os
(
"NO_VIDEO"
)
.is_none
()
{
let
vsync
=
UdpSocket
::
bind
(
"192.168.168.55:2067"
)
.unwrap
();
let
vsync_dest
:
SocketAddr
=
"226.2.2.2:2067"
.parse
()
.unwrap
();
let
mjpeg
=
UdpSocket
::
bind
(
"192.168.168.55:2068"
)
.unwrap
();
let
mjpeg_dest
:
SocketAddr
=
"226.2.2.2:2068"
.parse
()
.unwrap
();
vsync
.join_multicast_v4
(
&
MAGIC_ADDRESS
,
&
"192.168.168.55"
.parse
()
.unwrap
())
.unwrap
();
mjpeg
.join_multicast_v4
(
&
MAGIC_ADDRESS
,
&
"192.168.168.55"
.parse
()
.unwrap
())
.unwrap
();
Builder
::
new
()
.name
(
"videosender"
.into
())
.spawn
(
move
||
{
...
...
@@ -471,7 +481,7 @@ fn main() {
// "AUDIO THREAD BEHIND" image
2
=>
fuck
as
&
[
u8
],
// normal showtime image
_
=>
showtime
as
&
[
u8
]
_
=>
showtime
as
&
[
u8
]
,
};
let
mut
chonker
=
FrameChunker
::
new
(
frame_no
,
image
);
let
vsync_pkt
=
make_vsync
(
frame_no
);
...
...
@@ -484,7 +494,7 @@ fn main() {
::
std
::
thread
::
sleep_ms
(
33
);
}
})
.unwrap
();
.unwrap
();
}
println!
(
"[+] starting audio spooler"
);
let
(
bufs_tx
,
bufs_rx
)
=
mpsc
::
sync_channel
(
0
);
...
...