From efdba35b77d1dea3266fee8aa8b0808b3f436a20 Mon Sep 17 00:00:00 2001 From: Gregor Klevze Date: Tue, 13 Jan 2026 17:15:59 +0100 Subject: [PATCH] chore: refactor async device discovery, tracing, and player Arc state --- src-tauri/Cargo.lock | 74 +++++++++++++++++++++++++++++ src-tauri/Cargo.toml | 2 + src-tauri/src/lib.rs | 103 +++++++++++++++++++++++++++++----------- src-tauri/src/player.rs | 60 ++++++++++++----------- 4 files changed, 181 insertions(+), 58 deletions(-) diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index e236f64..12cb251 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -2660,6 +2660,15 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.10" @@ -2891,6 +2900,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -3775,6 +3793,8 @@ dependencies = [ "tauri-plugin-opener", "tauri-plugin-shell", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] @@ -4566,6 +4586,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shared_child" version = "1.1.1" @@ -5316,6 +5345,15 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "time" version = "0.3.44" @@ -5615,6 +5653,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -5791,6 +5859,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "value-bag" version = "1.12.0" diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 4415825..43e36a0 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -33,4 +33,6 @@ reqwest = { version = "0.11", features = ["json", "rustls-tls"] } base64 = "0.22" cpal = "0.15" ringbuf = "0.3" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 216059c..0f3b273 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -2,13 +2,16 @@ use std::collections::HashMap; use std::io::{BufRead, BufReader}; use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream, UdpSocket}; use std::process::{Child, Command, Stdio}; -use std::sync::Mutex; +use std::sync::{Mutex, Arc, RwLock}; use std::thread; use std::time::Duration; use mdns_sd::{ServiceDaemon, ServiceEvent}; use serde_json::json; use tauri::{AppHandle, Manager, State}; +use tauri::Emitter; +use tracing::{info, warn, error}; +use tracing_subscriber; use tauri_plugin_shell::process::{CommandChild, CommandEvent}; use tauri_plugin_shell::ShellExt; use reqwest; @@ -22,7 +25,12 @@ struct SidecarState { } struct AppState { - known_devices: Mutex>, + known_devices: Arc>>, +} + +struct DeviceInfo { + ip: String, + last_seen: std::time::Instant, } struct CastProxy { @@ -43,7 +51,7 @@ struct CastProxyStartResult { // Native (non-WebView) audio player state. // Step 1: state machine + command interface only (no decoding/output yet). struct PlayerRuntime { - shared: &'static PlayerShared, + shared: Arc, controller: PlayerController, } @@ -94,7 +102,7 @@ fn stop_cast_proxy_locked(lock: &mut Option) { if let Some(mut proxy) = lock.take() { let _ = proxy.child.kill(); let _ = proxy.child.wait(); - println!("Cast proxy stopped"); + info!("Cast proxy stopped"); } } @@ -144,7 +152,7 @@ fn spawn_standalone_cast_proxy(url: String, port: u16) -> Result std::thread::sleep(Duration::from_millis(150)); if let Ok(Some(status)) = child.try_wait() { if !status.success() { - eprintln!("Standalone cast proxy exited early; retrying with -c:a mp3"); + warn!("Standalone cast proxy exited early; retrying with -c:a mp3"); child = spawn("mp3")?; } } @@ -164,10 +172,10 @@ async fn cast_proxy_start( player::preflight_ffmpeg_only()?; let device_ip_str = { - let devices = state.known_devices.lock().unwrap(); + let devices = state.known_devices.read().unwrap(); devices .get(&device_name) - .cloned() + .map(|d| d.ip.clone()) .ok_or("Device not found")? }; let device_ip: IpAddr = device_ip_str @@ -221,13 +229,13 @@ async fn cast_proxy_start( }) } Ok(Err(e)) => { - eprintln!("Cast tap start failed; falling back to standalone proxy: {e}"); + warn!("Cast tap start failed; falling back to standalone proxy: {e}"); let mut child = spawn_standalone_cast_proxy(url, port)?; - if let Some(stderr) = child.stderr.take() { + if let Some(stderr) = child.stderr.take() { std::thread::spawn(move || { let reader = BufReader::new(stderr); for line in reader.lines().flatten() { - eprintln!("[cast-proxy ffmpeg] {line}"); + warn!("[cast-proxy ffmpeg] {line}"); } }); } @@ -240,13 +248,13 @@ async fn cast_proxy_start( }) } Err(_) => { - eprintln!("Cast tap start timed out; falling back to standalone proxy"); + warn!("Cast tap start timed out; falling back to standalone proxy"); let mut child = spawn_standalone_cast_proxy(url, port)?; if let Some(stderr) = child.stderr.take() { std::thread::spawn(move || { let reader = BufReader::new(stderr); for line in reader.lines().flatten() { - eprintln!("[cast-proxy ffmpeg] {line}"); + warn!("[cast-proxy ffmpeg] {line}"); } }); } @@ -338,7 +346,7 @@ async fn player_stop(player: State<'_, PlayerRuntime>) -> Result<(), String> { #[tauri::command] async fn list_cast_devices(state: State<'_, AppState>) -> Result, String> { - let devices = state.known_devices.lock().unwrap(); + let devices = state.known_devices.read().unwrap(); let mut list: Vec = devices.keys().cloned().collect(); list.sort(); Ok(list) @@ -353,10 +361,10 @@ async fn cast_play( url: String, ) -> Result<(), String> { let ip = { - let devices = state.known_devices.lock().unwrap(); + let devices = state.known_devices.read().unwrap(); devices .get(&device_name) - .cloned() + .map(|d| d.ip.clone()) .ok_or("Device not found")? }; @@ -366,7 +374,7 @@ async fn cast_play( let child = if let Some(ref mut child) = *lock { child } else { - println!("Spawning new sidecar..."); + info!("Spawning new sidecar..."); let sidecar_command = app .shell() .sidecar("radiocast-sidecar") @@ -377,10 +385,10 @@ async fn cast_play( while let Some(event) = rx.recv().await { match event { CommandEvent::Stdout(line) => { - println!("Sidecar: {}", String::from_utf8_lossy(&line)) + info!("Sidecar: {}", String::from_utf8_lossy(&line)) } CommandEvent::Stderr(line) => { - eprintln!("Sidecar Error: {}", String::from_utf8_lossy(&line)) + error!("Sidecar Error: {}", String::from_utf8_lossy(&line)) } _ => {} } @@ -531,7 +539,7 @@ pub fn run() { }) .setup(|app| { app.manage(AppState { - known_devices: Mutex::new(HashMap::new()), + known_devices: Arc::new(RwLock::new(HashMap::new())), }); app.manage(SidecarState { child: Mutex::new(None), @@ -540,16 +548,19 @@ pub fn run() { inner: Mutex::new(None), }); - // Player scaffolding: leak shared state to get a 'static reference for the - // long-running thread without complex lifetime plumbing. - // Later refactors can move this to Arc<...> when the engine grows. - let shared: &'static PlayerShared = Box::leak(Box::new(PlayerShared { + // Initialize tracing subscriber for structured logging. Honor RUST_LOG if set. + tracing_subscriber::fmt::init(); + + // Player scaffolding: create shared state behind an Arc and spawn the + // player thread with a cloned Arc (avoids leaking memory). + let shared = Arc::new(PlayerShared { state: Mutex::new(PlayerState::default()), - })); - let controller = player::spawn_player_thread(shared); + }); + let controller = player::spawn_player_thread(Arc::clone(&shared)); app.manage(PlayerRuntime { shared, controller }); let handle = app.handle().clone(); + let mdns_handle = handle.clone(); thread::spawn(move || { let mdns = ServiceDaemon::new().expect("Failed to create daemon"); let receiver = mdns @@ -570,12 +581,21 @@ pub fn run() { .or_else(|| addresses.iter().next()); if let Some(ip) = ip { - let state = handle.state::(); - let mut devices = state.known_devices.lock().unwrap(); + let state = mdns_handle.state::(); + let mut devices = state.known_devices.write().unwrap(); let ip_str = ip.to_string(); + let now = std::time::Instant::now(); if !devices.contains_key(&name) { - //println!("Discovered Cast Device: {} at {}", name, ip_str); - devices.insert(name, ip_str); + // new device discovered + let info = DeviceInfo { ip: ip_str.clone(), last_seen: now }; + devices.insert(name.clone(), info); + let _ = mdns_handle.emit("cast-device-discovered", json!({"name": name, "ip": ip_str})); + } else { + // update last_seen and possibly IP + if let Some(d) = devices.get_mut(&name) { + d.last_seen = now; + d.ip = ip_str; + } } } } @@ -583,6 +603,31 @@ pub fn run() { } } }); + + // Spawn a GC thread to drop stale devices and notify frontend + let gc_handle = handle.clone(); + thread::spawn(move || { + let stale_after = Duration::from_secs(30); + loop { + std::thread::sleep(Duration::from_secs(10)); + let state = gc_handle.state::(); + let mut devices = state.known_devices.write().unwrap(); + let now = std::time::Instant::now(); + let mut removed: Vec = Vec::new(); + devices.retain(|name, info| { + if now.duration_since(info.last_seen) > stale_after { + removed.push(name.clone()); + false + } else { + true + } + }); + drop(devices); + for name in removed { + let _ = gc_handle.emit("cast-device-removed", json!({"name": name})); + } + } + }); Ok(()) }) .invoke_handler(tauri::generate_handler![ diff --git a/src-tauri/src/player.rs b/src-tauri/src/player.rs index b85e2f6..ab1a511 100644 --- a/src-tauri/src/player.rs +++ b/src-tauri/src/player.rs @@ -86,10 +86,11 @@ pub struct PlayerController { pub tx: mpsc::Sender, } -pub fn spawn_player_thread(shared: &'static PlayerShared) -> PlayerController { +pub fn spawn_player_thread(shared: std::sync::Arc) -> PlayerController { let (tx, rx) = mpsc::channel::(); - std::thread::spawn(move || player_thread(shared, rx)); + let shared_for_thread = std::sync::Arc::clone(&shared); + std::thread::spawn(move || player_thread(shared_for_thread, rx)); PlayerController { tx } } @@ -113,14 +114,14 @@ fn volume_from_bits(bits: u32) -> f32 { f32::from_bits(bits) } -fn set_status(shared: &'static PlayerShared, status: PlayerStatus) { +fn set_status(shared: &std::sync::Arc, status: PlayerStatus) { let mut s = shared.state.lock().unwrap(); if s.status != status { s.status = status; } } -fn set_error(shared: &'static PlayerShared, message: String) { +fn set_error(shared: &std::sync::Arc, message: String) { let mut s = shared.state.lock().unwrap(); s.status = PlayerStatus::Error; s.error = Some(message); @@ -219,7 +220,7 @@ struct Pipeline { } impl Pipeline { - fn start(shared: &'static PlayerShared, url: String, mode: PipelineMode) -> Result { + fn start(shared: std::sync::Arc, url: String, mode: PipelineMode) -> Result { let (device, sample_format, cfg, sample_rate, channels) = match mode { PipelineMode::WithOutput => { let host = cpal::default_host(); @@ -265,7 +266,7 @@ impl Pipeline { // Decoder thread: spawns ffmpeg, reads PCM, writes into ring buffer. let stop_for_decoder = Arc::clone(&stop_flag); - let shared_for_decoder = shared; + let shared_for_decoder = std::sync::Arc::clone(&shared); let decoder_url = url.clone(); let cast_tx_for_decoder = Arc::clone(&cast_tx); let decoder_join = std::thread::spawn(move || { @@ -280,7 +281,7 @@ impl Pipeline { break; } - set_status(shared_for_decoder, PlayerStatus::Buffering); + set_status(&shared_for_decoder, PlayerStatus::Buffering); let ffmpeg = ffmpeg_command(); let ffmpeg_disp = ffmpeg.to_string_lossy(); @@ -314,7 +315,7 @@ impl Pipeline { Err(e) => { // If ffmpeg isn't available, this is a hard failure. set_error( - shared_for_decoder, + &shared_for_decoder, format!( "Failed to start ffmpeg ({ffmpeg_disp}): {e}. Set RADIOPLAYER_FFMPEG, bundle ffmpeg next to the app, or install ffmpeg on PATH." ), @@ -326,7 +327,7 @@ impl Pipeline { let mut stdout = match child.stdout.take() { Some(s) => s, None => { - set_error(shared_for_decoder, "ffmpeg stdout not available".to_string()); + set_error(&shared_for_decoder, "ffmpeg stdout not available".to_string()); let _ = child.kill(); break; } @@ -355,7 +356,7 @@ impl Pipeline { if stop_for_decoder.load(Ordering::SeqCst) { break 'outer; } - set_status(shared_for_decoder, PlayerStatus::Buffering); + set_status(&shared_for_decoder, PlayerStatus::Buffering); std::thread::sleep(Duration::from_millis(backoff_ms)); backoff_ms = (backoff_ms * 2).min(5000); continue 'outer; @@ -400,7 +401,7 @@ impl Pipeline { // Move to Playing once we've decoded a small buffer. if pushed_since_start >= playing_threshold_samples { - set_status(shared_for_decoder, PlayerStatus::Playing); + set_status(&shared_for_decoder, PlayerStatus::Playing); } } } @@ -413,7 +414,8 @@ impl Pipeline { let mut cons = cons_opt.take().expect("cons must exist for WithOutput"); // Audio callback: drain ring buffer and write to output. - let shared_for_cb = shared; + let shared_for_cb = std::sync::Arc::clone(&shared); + let shared_for_cb_err = std::sync::Arc::clone(&shared_for_cb); let stop_for_cb = Arc::clone(&stop_flag); let volume_for_cb = Arc::clone(&volume_bits); @@ -421,7 +423,7 @@ impl Pipeline { let err_fn = move |err| { let msg = format!("Audio output error: {err}"); - set_error(shared_for_cb, msg); + set_error(&shared_for_cb_err, msg); }; let built = match sample_format { @@ -448,7 +450,7 @@ impl Pipeline { if underrun != last_was_underrun { last_was_underrun = underrun; set_status( - shared_for_cb, + &shared_for_cb, if underrun { PlayerStatus::Buffering } else { @@ -485,7 +487,7 @@ impl Pipeline { if underrun != last_was_underrun { last_was_underrun = underrun; set_status( - shared_for_cb, + &shared_for_cb, if underrun { PlayerStatus::Buffering } else { @@ -523,7 +525,7 @@ impl Pipeline { if underrun != last_was_underrun { last_was_underrun = underrun; set_status( - shared_for_cb, + &shared_for_cb, if underrun { PlayerStatus::Buffering } else { @@ -653,14 +655,14 @@ impl Pipeline { } } - fn stop(mut self, shared: &'static PlayerShared) { + fn stop(mut self, shared: &std::sync::Arc) { self.stop_flag.store(true, Ordering::SeqCst); self.stop_cast_tap(); // dropping stream stops audio if let Some(j) = self.decoder_join.take() { let _ = j.join(); } - set_status(shared, PlayerStatus::Stopped); + set_status(&shared, PlayerStatus::Stopped); } fn set_volume(&self, volume: f32) { @@ -668,7 +670,7 @@ impl Pipeline { } } -fn player_thread(shared: &'static PlayerShared, rx: mpsc::Receiver) { +fn player_thread(shared: std::sync::Arc, rx: mpsc::Receiver) { // Step 2: FFmpeg decode + CPAL playback. let mut pipeline: Option = None; let mut pipeline_cast_owned = false; @@ -676,7 +678,7 @@ fn player_thread(shared: &'static PlayerShared, rx: mpsc::Receiver { if let Some(p) = pipeline.take() { - p.stop(shared); + p.stop(&shared); } pipeline_cast_owned = false; @@ -688,7 +690,7 @@ fn player_thread(shared: &'static PlayerShared, rx: mpsc::Receiver { // Apply current volume to pipeline atomics. let vol = { shared.state.lock().unwrap().volume }; @@ -696,14 +698,14 @@ fn player_thread(shared: &'static PlayerShared, rx: mpsc::Receiver { - set_error(shared, e); + set_error(&shared, e); pipeline = None; } } } PlayerCommand::PlayCast { url } => { if let Some(p) = pipeline.take() { - p.stop(shared); + p.stop(&shared); } pipeline_cast_owned = true; @@ -715,21 +717,21 @@ fn player_thread(shared: &'static PlayerShared, rx: mpsc::Receiver { let vol = { shared.state.lock().unwrap().volume }; p.set_volume(vol); pipeline = Some(p); } Err(e) => { - set_error(shared, e); + set_error(&shared, e); pipeline = None; } } } PlayerCommand::Stop => { if let Some(p) = pipeline.take() { - p.stop(shared); + p.stop(&shared); } else { let mut s = shared.state.lock().unwrap(); s.status = PlayerStatus::Stopped; @@ -762,7 +764,7 @@ fn player_thread(shared: &'static PlayerShared, rx: mpsc::Receiver