258 lines
8.6 KiB
Rust
258 lines
8.6 KiB
Rust
use std::path::{Path, PathBuf};
|
|
use std::process::Stdio;
|
|
use std::sync::Arc;
|
|
use tokio::process::Command;
|
|
use tokio::sync::RwLock;
|
|
use serde_json::{Value, json};
|
|
use std::collections::HashMap;
|
|
|
|
pub struct Transcoder {
|
|
hls_path: PathBuf,
|
|
active_transcodes: Arc<RwLock<HashMap<String, tokio::process::Child>>>,
|
|
}
|
|
|
|
impl Transcoder {
|
|
pub fn new(base_path: PathBuf) -> Self {
|
|
let hls_path = base_path.join("hls");
|
|
std::fs::create_dir_all(&hls_path).ok();
|
|
Self {
|
|
hls_path,
|
|
active_transcodes: Arc::new(RwLock::new(HashMap::new())),
|
|
}
|
|
}
|
|
|
|
pub fn get_hls_path(&self, session_id: &str) -> PathBuf {
|
|
self.hls_path.join(session_id)
|
|
}
|
|
|
|
pub fn get_playlist_path(&self, session_id: &str) -> PathBuf {
|
|
self.get_hls_path(session_id).join("playlist.m3u8")
|
|
}
|
|
|
|
pub fn get_segment_path(&self, session_id: &str, segment: &str) -> PathBuf {
|
|
self.get_hls_path(session_id).join(segment)
|
|
}
|
|
|
|
pub fn is_hls_ready(&self, session_id: &str) -> bool {
|
|
self.get_playlist_path(session_id).exists()
|
|
}
|
|
|
|
/// Get video metadata using ffprobe
|
|
pub async fn get_video_info(&self, input_path: &Path) -> Result<Value, String> {
|
|
let output = Command::new("ffprobe")
|
|
.args(&[
|
|
"-v", "quiet",
|
|
"-print_format", "json",
|
|
"-show_format",
|
|
"-show_streams",
|
|
input_path.to_str().ok_or("Invalid path")?,
|
|
])
|
|
.output()
|
|
.await
|
|
.map_err(|e| format!("Failed to run ffprobe: {}", e))?;
|
|
|
|
if !output.status.success() {
|
|
return Err(format!("ffprobe failed: {}", String::from_utf8_lossy(&output.stderr)));
|
|
}
|
|
|
|
let json: Value = serde_json::from_slice(&output.stdout)
|
|
.map_err(|e| format!("Failed to parse ffprobe output: {}", e))?;
|
|
|
|
let format = json.get("format").ok_or("No format in ffprobe output")?;
|
|
let streams = json.get("streams")
|
|
.and_then(|s| s.as_array())
|
|
.ok_or("No streams in ffprobe output")?;
|
|
|
|
let video_stream = streams.iter()
|
|
.find(|s| s.get("codec_type").and_then(|v| v.as_str()) == Some("video"));
|
|
|
|
let audio_stream = streams.iter()
|
|
.find(|s| s.get("codec_type").and_then(|v| v.as_str()) == Some("audio"));
|
|
|
|
let duration = format.get("duration")
|
|
.and_then(|d| d.as_str())
|
|
.and_then(|d| d.parse::<f64>().ok())
|
|
.unwrap_or(0.0);
|
|
|
|
let size = format.get("size")
|
|
.and_then(|s| s.as_str())
|
|
.and_then(|s| s.parse::<u64>().ok())
|
|
.unwrap_or(0);
|
|
|
|
let bitrate = format.get("bit_rate")
|
|
.and_then(|b| b.as_str())
|
|
.and_then(|b| b.parse::<u64>().ok())
|
|
.unwrap_or(0);
|
|
|
|
let mut result = json!({
|
|
"duration": duration,
|
|
"size": size,
|
|
"bitrate": bitrate,
|
|
});
|
|
|
|
if let Some(video) = video_stream {
|
|
let codec = video.get("codec_name").and_then(|c| c.as_str()).unwrap_or("unknown");
|
|
let width = video.get("width").and_then(|w| w.as_u64()).unwrap_or(0);
|
|
let height = video.get("height").and_then(|h| h.as_u64()).unwrap_or(0);
|
|
|
|
let fps = video.get("r_frame_rate")
|
|
.and_then(|f| f.as_str())
|
|
.and_then(|f| {
|
|
let parts: Vec<&str> = f.split('/').collect();
|
|
if parts.len() == 2 {
|
|
let num = parts[0].parse::<f64>().ok()?;
|
|
let den = parts[1].parse::<f64>().ok()?;
|
|
if den != 0.0 {
|
|
Some(num / den)
|
|
} else {
|
|
None
|
|
}
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.unwrap_or(0.0);
|
|
|
|
result["video"] = json!({
|
|
"codec": codec,
|
|
"width": width,
|
|
"height": height,
|
|
"fps": fps,
|
|
});
|
|
} else {
|
|
result["video"] = Value::Null;
|
|
}
|
|
|
|
if let Some(audio) = audio_stream {
|
|
let codec = audio.get("codec_name").and_then(|c| c.as_str()).unwrap_or("unknown");
|
|
let channels = audio.get("channels").and_then(|c| c.as_u64()).unwrap_or(0);
|
|
let sample_rate = audio.get("sample_rate")
|
|
.and_then(|s| s.as_str())
|
|
.and_then(|s| s.parse::<u64>().ok())
|
|
.unwrap_or(0);
|
|
|
|
result["audio"] = json!({
|
|
"codec": codec,
|
|
"channels": channels,
|
|
"sampleRate": sample_rate,
|
|
});
|
|
} else {
|
|
result["audio"] = Value::Null;
|
|
}
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
/// Start HLS transcoding
|
|
pub async fn start_hls_transcode(
|
|
&self,
|
|
session_id: String,
|
|
input_path: PathBuf,
|
|
) -> Result<(), String> {
|
|
let output_dir = self.get_hls_path(&session_id);
|
|
let playlist_path = self.get_playlist_path(&session_id);
|
|
|
|
// Create output directory
|
|
std::fs::create_dir_all(&output_dir)
|
|
.map_err(|e| format!("Failed to create output directory: {}", e))?;
|
|
|
|
// Check if already transcoded
|
|
if playlist_path.exists() {
|
|
log::info!("Using cached HLS for session {}", session_id);
|
|
return Ok(());
|
|
}
|
|
|
|
log::info!("Starting HLS transcode for session {}", session_id);
|
|
|
|
let segment_pattern = output_dir.join("segment_%03d.ts").to_string_lossy().to_string();
|
|
|
|
let mut cmd = Command::new("ffmpeg");
|
|
cmd.args(&[
|
|
"-i", input_path.to_str().ok_or("Invalid input path")?,
|
|
// HLS options
|
|
"-hls_time", "4",
|
|
"-hls_list_size", "0",
|
|
"-hls_flags", "delete_segments+append_list+split_by_time",
|
|
"-hls_segment_type", "mpegts",
|
|
"-hls_segment_filename", &segment_pattern,
|
|
// Video encoding
|
|
"-c:v", "libx264",
|
|
"-preset", "ultrafast",
|
|
"-tune", "zerolatency",
|
|
"-crf", "23",
|
|
"-maxrate", "4M",
|
|
"-bufsize", "8M",
|
|
"-g", "48",
|
|
// Audio encoding
|
|
"-c:a", "aac",
|
|
"-b:a", "128k",
|
|
"-ac", "2",
|
|
// General
|
|
"-movflags", "+faststart",
|
|
"-f", "hls",
|
|
playlist_path.to_str().ok_or("Invalid playlist path")?,
|
|
])
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::piped())
|
|
.stdin(Stdio::null());
|
|
|
|
let child = cmd.spawn()
|
|
.map_err(|e| format!("Failed to start ffmpeg: {}", e))?;
|
|
|
|
{
|
|
let mut active = self.active_transcodes.write().await;
|
|
active.insert(session_id.clone(), child);
|
|
}
|
|
|
|
// Monitor transcoding in background
|
|
let transcodes = self.active_transcodes.clone();
|
|
let session_id_clone = session_id.clone();
|
|
tokio::spawn(async move {
|
|
let mut child_opt = {
|
|
let mut active = transcodes.write().await;
|
|
active.remove(&session_id_clone)
|
|
};
|
|
|
|
if let Some(mut child) = child_opt {
|
|
match child.wait().await {
|
|
Ok(status) => {
|
|
if status.success() {
|
|
log::info!("HLS transcode complete for session {}", session_id_clone);
|
|
} else {
|
|
log::error!("HLS transcode failed for session {}", session_id_clone);
|
|
}
|
|
}
|
|
Err(e) => {
|
|
log::error!("Error waiting for ffmpeg: {}", e);
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Stop transcoding for a session
|
|
pub async fn stop_transcode(&self, session_id: &str) {
|
|
let mut active = self.active_transcodes.write().await;
|
|
if let Some(mut child) = active.remove(session_id) {
|
|
let _ = child.kill().await;
|
|
log::info!("Stopped transcode for session {}", session_id);
|
|
}
|
|
}
|
|
|
|
/// Clean up HLS files for a session
|
|
pub async fn cleanup_session(&self, session_id: &str) {
|
|
self.stop_transcode(session_id).await;
|
|
let output_dir = self.get_hls_path(session_id);
|
|
if output_dir.exists() {
|
|
if let Err(e) = std::fs::remove_dir_all(&output_dir) {
|
|
log::error!("Failed to cleanup HLS directory for {}: {}", session_id, e);
|
|
} else {
|
|
log::info!("Cleaned up HLS directory for session {}", session_id);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|