Files
beStream/server/src/routes/stream.js
kharonsec 766cbbce89
Some checks failed
CI / Lint & Type Check (push) Failing after 43s
CI / Tests (push) Successful in 1m0s
CI / Build Web (push) Has been skipped
CI / Security Scan (push) Successful in 46s
CI / Build Electron (Linux) (push) Has been skipped
CI / Build Tauri (ubuntu-latest) (push) Has been skipped
CI / Build Electron (Windows) (push) Has been cancelled
CI / Build Tauri (windows-latest) (push) Has been cancelled
Add local profiles, smart features, and Google Cast support
- Local Profiles: Profile selector, manager, avatar system, profile-aware stores
- Smart Features: Continue Watching, personalized recommendations, auto-quality, smart downloads
- Google Cast: Cast service with web SDK and Capacitor Android plugin interface
- Settings: New toggles for auto-quality and smart downloads

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 20:44:13 +01:00

487 lines
16 KiB
JavaScript

import express from 'express';
import { v4 as uuidv4 } from 'uuid';
import path from 'path';
import fs from 'fs';
import mime from 'mime-types';
import { torrentManager } from '../services/torrentManager.js';
import { transcoder } from '../services/transcoder.js';
import { broadcastUpdate } from '../utils/websocket.js';
import { validate, schemas, isValidUUID, isValidFilename } from '../utils/validation.js';
export const streamRouter = express.Router();
/**
* Start a new streaming session
* POST /api/stream/start
*
* Body params:
* - hash: torrent hash (required)
* - name: display name (required)
* - quality: quality string (optional)
* - isSeasonPack: boolean (optional) - if true, this is a season pack
* - episodeFilePattern: regex pattern (optional) - pattern to match episode file in season pack
*/
streamRouter.post('/start', validate(schemas.streamStart, 'body'), async (req, res) => {
try {
const { hash, name, quality, isSeasonPack, episodeFilePattern } = req.body;
if (!hash || !name) {
return res.status(400).json({ error: 'Missing hash or name' });
}
const sessionId = uuidv4();
console.log(`🎬 Starting stream session ${sessionId} for "${name}"${isSeasonPack ? ' (Season Pack)' : ''}`);
// Track the actual session ID (may change if deduplicated)
let actualSessionId = sessionId;
// Start torrent download with optional episode file selection
const session = await torrentManager.startSession(
sessionId,
hash,
name,
(progress) => {
// Use the tracked session ID for broadcasts
broadcastUpdate(actualSessionId, {
type: 'progress',
...torrentManager.getSessionStatus(actualSessionId),
});
},
isSeasonPack ? { episodeFilePattern } : undefined
);
// If deduplicated, update the tracked session ID
if (session.deduplicated) {
actualSessionId = session.originalSessionId;
console.log(`↩️ Returning deduplicated session ${actualSessionId} instead of ${sessionId}`);
}
res.json({
sessionId: actualSessionId,
status: session.status,
deduplicated: session.deduplicated || false,
videoFile: session.videoFile ? {
name: session.videoFile.name,
size: session.videoFile.length,
} : null,
});
} catch (error) {
console.error('Error starting stream:', error);
// Provide more helpful error messages
let errorMessage = error.message;
if (error.message.includes('No video file found')) {
errorMessage = 'This torrent does not contain any video files. Please try a different torrent or source.';
} else if (error.message.includes('Connection timeout')) {
errorMessage = 'Unable to connect to torrent peers. The torrent may be dead or have no active seeders.';
} else if (error.message.includes('duplicate')) {
errorMessage = 'This torrent is already being processed. Please wait a moment and try again.';
} else if (error.message.includes('Episode not found in season pack')) {
errorMessage = 'The requested episode was not found in the season pack. Please try a different torrent.';
}
res.status(500).json({ error: errorMessage });
}
});
/**
* Get session status
* GET /api/stream/:sessionId/status
*/
streamRouter.get('/:sessionId/status', validate(schemas.sessionId, 'params'), (req, res) => {
const { sessionId } = req.params;
const status = torrentManager.getSessionStatus(sessionId);
if (!status) {
return res.status(404).json({ error: 'Session not found' });
}
res.json(status);
});
/**
* Direct video stream (for browsers that support the format)
* GET /api/stream/:sessionId/video
*/
streamRouter.get('/:sessionId/video', validate(schemas.sessionId, 'params'), async (req, res) => {
try {
const { sessionId } = req.params;
const session = torrentManager.getSession(sessionId);
if (!session) {
return res.status(404).json({ error: 'Session not found' });
}
// Use session's selected video file - this already respects episodeFilePattern for season packs
// The createReadStream method in torrentManager handles getting fresh references
let videoFile = session.videoFile;
// If session doesn't have a video file yet, try to get one from torrent using episode pattern
if (!videoFile && session.torrent && session.torrent.files) {
videoFile = torrentManager.findVideoFile(session.torrent, session.episodeFilePattern);
if (videoFile) {
session.videoFile = videoFile;
}
}
if (!videoFile) {
return res.status(404).json({ error: 'Video file not ready' });
}
// Validate that videoFile has required properties
const fileSize = videoFile.length;
if (typeof fileSize !== 'number' || fileSize <= 0) {
console.error(`[Stream] Invalid video file length: ${fileSize} for session ${sessionId}`);
return res.status(500).json({ error: 'Video file metadata not available' });
}
const mimeType = mime.lookup(videoFile.name) || 'video/mp4';
// Handle range requests for seeking
const range = req.headers.range;
if (range) {
const parts = range.replace(/bytes=/, '').split('-');
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
const chunkSize = end - start + 1;
res.writeHead(206, {
'Content-Range': `bytes ${start}-${end}/${fileSize}`,
'Accept-Ranges': 'bytes',
'Content-Length': chunkSize,
'Content-Type': mimeType,
});
const stream = torrentManager.createReadStream(sessionId, { start, end });
// Handle stream errors gracefully - suppress common client disconnect and WebTorrent errors
stream.on('error', (err) => {
const msg = err.message || '';
// Suppress common errors: client disconnects, WebTorrent file state issues
const isCommonError = msg.includes('closed prematurely') ||
msg.includes('ECONNRESET') ||
msg.includes('missing required') ||
msg.includes('destroyed');
if (!isCommonError) {
console.error('Stream error:', msg);
}
if (!res.headersSent) {
res.status(500).end();
}
});
// Handle client disconnect
req.on('close', () => {
stream.destroy();
});
stream.pipe(res);
} else {
res.writeHead(200, {
'Content-Length': fileSize,
'Content-Type': mimeType,
'Accept-Ranges': 'bytes',
});
const stream = torrentManager.createReadStream(sessionId);
// Handle stream errors gracefully - suppress common client disconnect and WebTorrent errors
stream.on('error', (err) => {
const msg = err.message || '';
// Suppress common errors: client disconnects, WebTorrent file state issues
const isCommonError = msg.includes('closed prematurely') ||
msg.includes('ECONNRESET') ||
msg.includes('missing required') ||
msg.includes('destroyed');
if (!isCommonError) {
console.error('Stream error:', msg);
}
if (!res.headersSent) {
res.status(500).end();
}
});
// Handle client disconnect
req.on('close', () => {
stream.destroy();
});
stream.pipe(res);
}
} catch (error) {
console.error('Error streaming video:', error);
if (!res.headersSent) {
res.status(500).json({ error: error.message });
}
}
});
/**
* Start HLS transcoding
* POST /api/stream/:sessionId/hls
*/
streamRouter.post('/:sessionId/hls', validate(schemas.sessionId, 'params'), async (req, res) => {
try {
const { sessionId } = req.params;
const session = torrentManager.getSession(sessionId);
if (!session) {
return res.status(404).json({ error: 'Session not found' });
}
if (session.status !== 'ready' && session.progress < 0.05) {
return res.status(400).json({
error: 'Video not ready yet',
progress: session.progress
});
}
const videoPath = torrentManager.getVideoPath(sessionId);
if (!videoPath) {
console.log(`[HLS] No video path for session ${sessionId}`);
return res.status(400).json({ error: 'Video file path not available' });
}
// Check if file exists on disk
const fileExists = fs.existsSync(videoPath);
let useStreamTranscode = false;
if (!fileExists) {
console.log(`[HLS] Video file does not exist on disk: ${videoPath}`);
console.log(`[HLS] Session status: ${session.status}, Progress: ${session.progress}`);
// If file doesn't exist, we can transcode directly from the WebTorrent stream
// This is how Popcorn Time handles it - stream transcoding
if (session.torrent && session.torrent.files && session.videoFile) {
// Use the session's selected video file (respects episodeFilePattern for season packs)
const videoFile = session.videoFile;
const downloaded = videoFile.downloaded || 0;
const downloadPercent = videoFile.length > 0 ? (downloaded / videoFile.length) * 100 : 0;
console.log(`[HLS] Video file in torrent: ${videoFile.name}`);
console.log(`[HLS] File length: ${videoFile.length}, downloaded: ${downloaded} (${downloadPercent.toFixed(1)}%)`);
// Check if torrent is done - if so, file should be on disk
if (session.torrent.done) {
// Torrent is complete but file not found - path might be wrong
console.log(`[HLS] Torrent is complete but file not found. Checking alternative paths...`);
const downloadDir = path.join(torrentManager.downloadPath, session.torrent.name);
if (fs.existsSync(downloadDir)) {
const files = fs.readdirSync(downloadDir, { recursive: true });
console.log(`[HLS] Files in download directory:`, files);
}
// Log internal path for debugging but don't expose to client
console.error(`[HLS] Expected path not found: ${videoPath}`);
return res.status(400).json({
error: 'Video file not found on disk even though torrent is complete'
});
}
// Check if enough is downloaded (at least 5% for stream transcoding to start)
const minDownloadPercent = 5;
if (downloadPercent < minDownloadPercent) {
return res.status(400).json({
error: 'Video file not downloaded enough yet for transcoding',
downloaded: downloaded,
total: videoFile.length,
progress: downloadPercent,
required: `${minDownloadPercent}%`
});
}
// Use stream-based transcoding (like Popcorn Time)
console.log(`[HLS] Using stream-based transcoding (file not on disk yet)`);
useStreamTranscode = true;
} else {
return res.status(400).json({
error: 'Video file not available',
status: session.status,
progress: session.progress
});
}
} else {
// Check file size - ensure it's not empty
const stats = fs.statSync(videoPath);
if (stats.size === 0) {
console.log(`[HLS] Video file is empty: ${videoPath}`);
return res.status(400).json({ error: 'Video file is empty' });
}
console.log(`[HLS] Video file ready on disk: ${videoPath}, size: ${stats.size} bytes`);
}
// Check if already transcoding or ready
if (transcoder.isHlsReady(sessionId)) {
return res.json({
status: 'ready',
playlistUrl: `/api/stream/${sessionId}/hls/playlist.m3u8`,
});
}
// Check if transcoding is already in progress (prevents duplicate FFmpeg processes)
if (transcoder.isTranscoding(sessionId)) {
console.log(`[HLS] Transcoding already in progress for session ${sessionId}`);
return res.json({
status: 'transcoding',
message: 'Transcoding already in progress',
});
}
// Get quality from session for transcoding settings
const quality = session.quality || null;
// Start transcoding - either from file or stream
if (useStreamTranscode) {
// Stream-based transcoding (like Popcorn Time)
// Transcode directly from WebTorrent stream
try {
const videoStream = torrentManager.createReadStream(sessionId);
transcoder.startHlsStreamTranscode(sessionId, videoStream, (progress) => {
broadcastUpdate(sessionId, {
type: 'transcode',
progress: progress.percent,
time: progress.time,
});
}, quality).then(() => {
broadcastUpdate(sessionId, {
type: 'transcode',
status: 'ready',
playlistUrl: `/api/stream/${sessionId}/hls/playlist.m3u8`,
});
}).catch((err) => {
console.error('Stream transcode error:', err);
broadcastUpdate(sessionId, {
type: 'error',
message: 'Stream transcoding failed',
});
});
} catch (err) {
console.error('Error creating stream for transcoding:', err);
return res.status(500).json({ error: 'Failed to start stream transcoding' });
}
} else {
// File-based transcoding (normal path)
transcoder.startHlsTranscode(sessionId, videoPath, (progress) => {
broadcastUpdate(sessionId, {
type: 'transcode',
progress: progress.percent,
time: progress.time,
});
}, quality).then(() => {
broadcastUpdate(sessionId, {
type: 'transcode',
status: 'ready',
playlistUrl: `/api/stream/${sessionId}/hls/playlist.m3u8`,
});
}).catch((err) => {
console.error('Transcode error:', err);
broadcastUpdate(sessionId, {
type: 'error',
message: 'Transcoding failed',
});
});
}
res.json({
status: 'transcoding',
message: 'HLS transcoding started',
});
} catch (error) {
console.error('Error starting HLS:', error);
res.status(500).json({ error: error.message });
}
});
/**
* Serve HLS playlist
* GET /api/stream/:sessionId/hls/playlist.m3u8
*/
streamRouter.get('/:sessionId/hls/playlist.m3u8', validate(schemas.sessionId, 'params'), (req, res) => {
const { sessionId } = req.params;
const playlistPath = transcoder.getPlaylistPath(sessionId);
if (!fs.existsSync(playlistPath)) {
return res.status(404).json({ error: 'Playlist not ready' });
}
res.setHeader('Content-Type', 'application/vnd.apple.mpegurl');
res.setHeader('Cache-Control', 'no-cache');
res.sendFile(playlistPath);
});
/**
* Serve HLS segment
* GET /api/stream/:sessionId/hls/:segment
*/
streamRouter.get('/:sessionId/hls/:segment', validate(schemas.hlsSegment, 'params'), (req, res) => {
const { sessionId, segment } = req.params;
// Additional security check - prevent path traversal
if (!isValidFilename(segment)) {
return res.status(400).json({ error: 'Invalid segment name' });
}
const segmentPath = transcoder.getSegmentPath(sessionId, segment);
if (!fs.existsSync(segmentPath)) {
return res.status(404).json({ error: 'Segment not found' });
}
res.setHeader('Content-Type', 'video/mp2t');
res.setHeader('Cache-Control', 'max-age=3600');
res.sendFile(segmentPath);
});
/**
* Get video info
* GET /api/stream/:sessionId/info
*/
streamRouter.get('/:sessionId/info', validate(schemas.sessionId, 'params'), async (req, res) => {
try {
const { sessionId } = req.params;
const videoPath = torrentManager.getVideoPath(sessionId);
if (!videoPath || !fs.existsSync(videoPath)) {
return res.status(404).json({ error: 'Video not available' });
}
const info = await transcoder.getVideoInfo(videoPath);
res.json(info);
} catch (error) {
console.error('Error getting video info:', error);
res.status(500).json({ error: error.message });
}
});
/**
* Stop a streaming session
* DELETE /api/stream/:sessionId
* Query params:
* - cleanup: If 'true', delete downloaded files
*/
streamRouter.delete('/:sessionId', validate(schemas.sessionId, 'params'), async (req, res) => {
const { sessionId } = req.params;
const cleanupFiles = req.query.cleanup === 'true';
await torrentManager.stopSession(sessionId, { cleanupFiles });
transcoder.cleanupSession(sessionId);
res.json({ status: 'stopped', filesDeleted: cleanupFiles });
});
/**
* List active sessions
* GET /api/stream/sessions
*/
streamRouter.get('/sessions', (req, res) => {
const sessions = [];
// This would need access to the sessions map
res.json({ count: torrentManager.getActiveCount() });
});