Files
beStream/src-tauri/src/transcoder.rs
2025-12-19 13:48:48 +01:00

258 lines
8.6 KiB
Rust

use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use tokio::process::Command;
use tokio::sync::RwLock;
use serde_json::{Value, json};
use std::collections::HashMap;
pub struct Transcoder {
hls_path: PathBuf,
active_transcodes: Arc<RwLock<HashMap<String, tokio::process::Child>>>,
}
impl Transcoder {
pub fn new(base_path: PathBuf) -> Self {
let hls_path = base_path.join("hls");
std::fs::create_dir_all(&hls_path).ok();
Self {
hls_path,
active_transcodes: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn get_hls_path(&self, session_id: &str) -> PathBuf {
self.hls_path.join(session_id)
}
pub fn get_playlist_path(&self, session_id: &str) -> PathBuf {
self.get_hls_path(session_id).join("playlist.m3u8")
}
pub fn get_segment_path(&self, session_id: &str, segment: &str) -> PathBuf {
self.get_hls_path(session_id).join(segment)
}
pub fn is_hls_ready(&self, session_id: &str) -> bool {
self.get_playlist_path(session_id).exists()
}
/// Get video metadata using ffprobe
pub async fn get_video_info(&self, input_path: &Path) -> Result<Value, String> {
let output = Command::new("ffprobe")
.args(&[
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
input_path.to_str().ok_or("Invalid path")?,
])
.output()
.await
.map_err(|e| format!("Failed to run ffprobe: {}", e))?;
if !output.status.success() {
return Err(format!("ffprobe failed: {}", String::from_utf8_lossy(&output.stderr)));
}
let json: Value = serde_json::from_slice(&output.stdout)
.map_err(|e| format!("Failed to parse ffprobe output: {}", e))?;
let format = json.get("format").ok_or("No format in ffprobe output")?;
let streams = json.get("streams")
.and_then(|s| s.as_array())
.ok_or("No streams in ffprobe output")?;
let video_stream = streams.iter()
.find(|s| s.get("codec_type").and_then(|v| v.as_str()) == Some("video"));
let audio_stream = streams.iter()
.find(|s| s.get("codec_type").and_then(|v| v.as_str()) == Some("audio"));
let duration = format.get("duration")
.and_then(|d| d.as_str())
.and_then(|d| d.parse::<f64>().ok())
.unwrap_or(0.0);
let size = format.get("size")
.and_then(|s| s.as_str())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let bitrate = format.get("bit_rate")
.and_then(|b| b.as_str())
.and_then(|b| b.parse::<u64>().ok())
.unwrap_or(0);
let mut result = json!({
"duration": duration,
"size": size,
"bitrate": bitrate,
});
if let Some(video) = video_stream {
let codec = video.get("codec_name").and_then(|c| c.as_str()).unwrap_or("unknown");
let width = video.get("width").and_then(|w| w.as_u64()).unwrap_or(0);
let height = video.get("height").and_then(|h| h.as_u64()).unwrap_or(0);
let fps = video.get("r_frame_rate")
.and_then(|f| f.as_str())
.and_then(|f| {
let parts: Vec<&str> = f.split('/').collect();
if parts.len() == 2 {
let num = parts[0].parse::<f64>().ok()?;
let den = parts[1].parse::<f64>().ok()?;
if den != 0.0 {
Some(num / den)
} else {
None
}
} else {
None
}
})
.unwrap_or(0.0);
result["video"] = json!({
"codec": codec,
"width": width,
"height": height,
"fps": fps,
});
} else {
result["video"] = Value::Null;
}
if let Some(audio) = audio_stream {
let codec = audio.get("codec_name").and_then(|c| c.as_str()).unwrap_or("unknown");
let channels = audio.get("channels").and_then(|c| c.as_u64()).unwrap_or(0);
let sample_rate = audio.get("sample_rate")
.and_then(|s| s.as_str())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
result["audio"] = json!({
"codec": codec,
"channels": channels,
"sampleRate": sample_rate,
});
} else {
result["audio"] = Value::Null;
}
Ok(result)
}
/// Start HLS transcoding
pub async fn start_hls_transcode(
&self,
session_id: String,
input_path: PathBuf,
) -> Result<(), String> {
let output_dir = self.get_hls_path(&session_id);
let playlist_path = self.get_playlist_path(&session_id);
// Create output directory
std::fs::create_dir_all(&output_dir)
.map_err(|e| format!("Failed to create output directory: {}", e))?;
// Check if already transcoded
if playlist_path.exists() {
log::info!("Using cached HLS for session {}", session_id);
return Ok(());
}
log::info!("Starting HLS transcode for session {}", session_id);
let segment_pattern = output_dir.join("segment_%03d.ts").to_string_lossy().to_string();
let mut cmd = Command::new("ffmpeg");
cmd.args(&[
"-i", input_path.to_str().ok_or("Invalid input path")?,
// HLS options
"-hls_time", "4",
"-hls_list_size", "0",
"-hls_flags", "delete_segments+append_list+split_by_time",
"-hls_segment_type", "mpegts",
"-hls_segment_filename", &segment_pattern,
// Video encoding
"-c:v", "libx264",
"-preset", "ultrafast",
"-tune", "zerolatency",
"-crf", "23",
"-maxrate", "4M",
"-bufsize", "8M",
"-g", "48",
// Audio encoding
"-c:a", "aac",
"-b:a", "128k",
"-ac", "2",
// General
"-movflags", "+faststart",
"-f", "hls",
playlist_path.to_str().ok_or("Invalid playlist path")?,
])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(Stdio::null());
let child = cmd.spawn()
.map_err(|e| format!("Failed to start ffmpeg: {}", e))?;
{
let mut active = self.active_transcodes.write().await;
active.insert(session_id.clone(), child);
}
// Monitor transcoding in background
let transcodes = self.active_transcodes.clone();
let session_id_clone = session_id.clone();
tokio::spawn(async move {
let mut child_opt = {
let mut active = transcodes.write().await;
active.remove(&session_id_clone)
};
if let Some(mut child) = child_opt {
match child.wait().await {
Ok(status) => {
if status.success() {
log::info!("HLS transcode complete for session {}", session_id_clone);
} else {
log::error!("HLS transcode failed for session {}", session_id_clone);
}
}
Err(e) => {
log::error!("Error waiting for ffmpeg: {}", e);
}
}
}
});
Ok(())
}
/// Stop transcoding for a session
pub async fn stop_transcode(&self, session_id: &str) {
let mut active = self.active_transcodes.write().await;
if let Some(mut child) = active.remove(session_id) {
let _ = child.kill().await;
log::info!("Stopped transcode for session {}", session_id);
}
}
/// Clean up HLS files for a session
pub async fn cleanup_session(&self, session_id: &str) {
self.stop_transcode(session_id).await;
let output_dir = self.get_hls_path(session_id);
if output_dir.exists() {
if let Err(e) = std::fs::remove_dir_all(&output_dir) {
log::error!("Failed to cleanup HLS directory for {}: {}", session_id, e);
} else {
log::info!("Cleaned up HLS directory for session {}", session_id);
}
}
}
}