feat(cassette): refactored transcoder

This commit is contained in:
5rahim
2026-03-06 16:12:52 +01:00
parent 29f7947789
commit af208f47b4
21 changed files with 3068 additions and 80 deletions

View File

@@ -140,20 +140,6 @@ func (a *App) initModulesOnce() {
a.MangaDownloader.Start()
// +---------------------+
// | Media Stream |
// +---------------------+
a.MediastreamRepository = mediastream.NewRepository(&mediastream.NewRepositoryOptions{
Logger: a.Logger,
WSEventManager: a.WSEventManager,
FileCacher: a.FileCacher,
})
a.AddCleanupFunction(func() {
a.MediastreamRepository.OnCleanup()
})
// +---------------------+
// | Video Core |
// +---------------------+
@@ -171,6 +157,21 @@ func (a *App) initModulesOnce() {
IsOfflineRef: a.IsOfflineRef(),
})
// +---------------------+
// | Media Stream |
// +---------------------+
a.MediastreamRepository = mediastream.NewRepository(&mediastream.NewRepositoryOptions{
Logger: a.Logger,
WSEventManager: a.WSEventManager,
FileCacher: a.FileCacher,
VideoCore: a.VideoCore,
})
a.AddCleanupFunction(func() {
a.MediastreamRepository.OnCleanup()
})
// +---------------------+
// | Native Player |
// +---------------------+

View File

@@ -18,8 +18,7 @@ func (h *Handler) OptionalAuthMiddleware(next echo.HandlerFunc) echo.HandlerFunc
// Allow the following paths to be accessed by anyone
if path == "/api/v1/status" || // public but restricted
path == "/events" || // for server events (auth handled by websocket handler)
strings.HasPrefix(path, "/api/v1/mediastream/transcode/") { // HLS segments (TODO: secure later)
path == "/events" { // for server events (auth handled by websocket handler)
if path == "/api/v1/status" {
// allow status requests by all clients but mark as unauthenticated

View File

@@ -0,0 +1,280 @@
package cassette
import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"seanime/internal/mediastream/videofile"
"sync"
"time"
"github.com/rs/zerolog"
)
// NewCassetteOptions configuration for the transcoder.
type NewCassetteOptions struct {
	Logger *zerolog.Logger
	// HwAccelKind selects the hardware backend passed to BuildHwAccelProfile
	// (e.g. "auto", "nvidia", "qsv", "vaapi", "videotoolbox", "custom";
	// ""/"cpu"/"none" mean software encoding).
	HwAccelKind string
	// Preset is the encoder speed/quality preset; empty defaults to "fast".
	Preset string
	// TempOutDir is the base directory; segments go under "<TempOutDir>/streams".
	TempOutDir string
	FfmpegPath  string
	FfprobePath string
	// HwAccelCustomSettings is a JSON-encoded HwAccelProfile, used when
	// HwAccelKind is "custom".
	HwAccelCustomSettings string
	// MaxConcurrency limits simultaneous ffmpeg processes. 0 = NumCPU
	MaxConcurrency int
}
// Cassette is the top-level transcoding orchestrator.
// It manages sessions, client tracking, and resource allocation.
type Cassette struct {
	sessions   sync.Map   // map[string]*Session keyed by file path.
	sessionsMu sync.Mutex // serializes session creation (sessions itself is concurrent-safe).
	clientChan chan ClientInfo // buffered feed of client activity, drained by the tracker.
	tracker    *ClientTracker
	governor   *Governor // throttles concurrent ffmpeg processes.
	logger     *zerolog.Logger
	settings   Settings // immutable after New; shared with sessions by pointer.
}
// New creates and returns a cassette instance.
// It prepares the stream directory (removing stale segment directories left
// over from previous runs), resolves the hardware-acceleration profile, and
// initializes the governor and client tracker.
func New(opts *NewCassetteOptions) (*Cassette, error) {
	streamDir := filepath.Join(opts.TempOutDir, "streams")
	if err := os.MkdirAll(streamDir, 0755); err != nil {
		return nil, fmt.Errorf("cassette: failed to create stream dir: %w", err)
	}
	// Clear stale segment dirs from previous runs.
	entries, err := os.ReadDir(streamDir)
	if err != nil {
		return nil, fmt.Errorf("cassette: failed to read stream dir: %w", err)
	}
	for _, e := range entries {
		// Best-effort removal; note path.Join (slash-separated) is used on
		// an OS path here — os.RemoveAll tolerates mixed separators on
		// Windows, but filepath.Join would be the stricter choice.
		_ = os.RemoveAll(path.Join(streamDir, e.Name()))
	}
	hwAccel := BuildHwAccelProfile(HwAccelOptions{
		Kind:           opts.HwAccelKind,
		Preset:         opts.Preset,
		CustomSettings: opts.HwAccelCustomSettings,
	}, opts.FfmpegPath, opts.Logger)
	c := &Cassette{
		clientChan: make(chan ClientInfo, 1024),
		governor:   NewGovernor(opts.MaxConcurrency, hwAccel.Name != "disabled", opts.Logger),
		logger:     opts.Logger,
		settings: Settings{
			StreamDir:   streamDir,
			HwAccel:     hwAccel,
			FfmpegPath:  opts.FfmpegPath,
			FfprobePath: opts.FfprobePath,
		},
	}
	// The tracker consumes c.clientChan, so it is created last.
	c.tracker = NewClientTracker(c)
	c.logger.Info().
		Str("hwaccel", hwAccel.Name).
		Int("maxConcurrency", c.governor.maxSlot).
		Msg("cassette: initialised")
	return c, nil
}
// GetSettings returns the current settings.
// NOTE(review): this hands out a pointer to internal state, so callers can
// mutate the live settings — confirm that is intended.
func (c *Cassette) GetSettings() *Settings {
	return &c.settings
}
// GovernorStats returns a snapshot of the resource governor metrics.
func (c *Cassette) GovernorStats() GovernorStats {
	return c.governor.Stats()
}
// Destroy stops all sessions and clears the output directory.
// Panics during teardown are recovered and logged so shutdown always
// completes.
func (c *Cassette) Destroy() {
	defer func() {
		if r := recover(); r != nil {
			c.logger.Warn().Interface("recover", r).Msg("cassette: recovered during destroy")
		}
	}()
	// Stop the client tracker first so no new activity is processed while
	// sessions are torn down.
	c.tracker.Stop()
	c.logger.Debug().Msg("cassette: destroying all sessions")
	c.sessions.Range(func(key, value any) bool {
		if s, ok := value.(*Session); ok {
			s.Destroy()
		}
		c.sessions.Delete(key)
		return true
	})
	// Clear keyframe cache so a later instance re-extracts from scratch.
	ClearKeyframeCache()
	c.logger.Debug().Msg("cassette: destroyed")
}
// session management

// getSession returns the transcoding session for filePath, creating it on
// first use. It blocks until the session is ready; a session whose
// initialisation failed is evicted so a later call can retry.
func (c *Cassette) getSession(
	filePath, hash string,
	mediaInfo *videofile.MediaInfo,
) (*Session, error) {
	// await blocks until s is ready and evicts it from the map on failure.
	// NOTE(review): s.err is read without explicit locking — assumes
	// WaitReady establishes a happens-before edge with the writer; confirm.
	await := func(s *Session) (*Session, error) {
		s.WaitReady()
		if s.err != nil {
			c.sessions.Delete(filePath)
			return nil, s.err
		}
		return s, nil
	}

	// Fast path: session already exists.
	if v, ok := c.sessions.Load(filePath); ok {
		return await(v.(*Session))
	}

	// Slow path: create under the lock, re-checking for a concurrent creator.
	c.sessionsMu.Lock()
	if v, ok := c.sessions.Load(filePath); ok {
		c.sessionsMu.Unlock()
		return await(v.(*Session))
	}
	s := NewSession(filePath, hash, mediaInfo, &c.settings, c.governor, c.logger)
	c.sessions.Store(filePath, s)
	c.sessionsMu.Unlock()
	return await(s)
}
// getSessionByPath returns the live session for a path, or nil when no
// session exists for that file.
func (c *Cassette) getSessionByPath(filePath string) *Session {
	if v, ok := c.sessions.Load(filePath); ok {
		return v.(*Session)
	}
	return nil
}
// destroySession atomically removes the session for filePath from the map
// and tears it down. No-op when no session exists for the path.
func (c *Cassette) destroySession(filePath string) {
	if v, ok := c.sessions.LoadAndDelete(filePath); ok {
		v.(*Session).Destroy()
	}
}
// public api

// sendClientInfo forwards a client activity update to the tracker without
// blocking the request path; updates are dropped when the channel is full.
func (c *Cassette) sendClientInfo(info ClientInfo) {
	select {
	case c.clientChan <- info:
	default:
		// Channel saturated — drop the update rather than block a request.
		c.logger.Warn().Msg("cassette: client channel full, dropping update")
	}
}
// GetMaster returns the HLS master playlist for the given file, creating the
// session on first use, and registers the client as watching the file.
func (c *Cassette) GetMaster(
	filePath, hash string,
	mediaInfo *videofile.MediaInfo,
	client string,
	token string,
) (string, error) {
	begin := time.Now()
	sess, err := c.getSession(filePath, hash, mediaInfo)
	if err != nil {
		return "", err
	}
	// No quality/audio/head yet — the client only fetched the master playlist.
	c.sendClientInfo(ClientInfo{
		Client:  client,
		Path:    filePath,
		Quality: nil,
		Audio:   -1,
		Head:    -1,
	})
	c.logger.Trace().Dur("elapsed", time.Since(begin)).Msg("cassette: GetMaster")
	return sess.GetMaster(token), nil
}
// GetVideoIndex returns the HLS variant playlist for a video quality.
// Fetching a variant is just a probe, so no client state is recorded.
func (c *Cassette) GetVideoIndex(
	filePath, hash string,
	mediaInfo *videofile.MediaInfo,
	quality Quality,
	client string,
	token string,
) (string, error) {
	sess, err := c.getSession(filePath, hash, mediaInfo)
	if err != nil {
		return "", err
	}
	return sess.GetVideoIndex(quality, token)
}
// GetAudioIndex returns the HLS variant playlist for an audio track.
// Like GetVideoIndex, this is treated as a probe and records no client state.
func (c *Cassette) GetAudioIndex(
	filePath, hash string,
	mediaInfo *videofile.MediaInfo,
	audio int32,
	client string,
	token string,
) (string, error) {
	sess, err := c.getSession(filePath, hash, mediaInfo)
	if err != nil {
		return "", err
	}
	return sess.GetAudioIndex(audio, token)
}
// GetVideoSegment returns the path to a transcoded video segment file,
// blocking until it is available, and updates the client's playback head.
func (c *Cassette) GetVideoSegment(
	ctx context.Context,
	filePath, hash string,
	mediaInfo *videofile.MediaInfo,
	quality Quality,
	segment int32,
	client string,
) (string, error) {
	sess, err := c.getSession(filePath, hash, mediaInfo)
	if err != nil {
		return "", err
	}
	c.sendClientInfo(ClientInfo{
		Client:  client,
		Path:    filePath,
		Quality: &quality,
		Audio:   -1,
		Head:    segment,
	})
	return sess.GetVideoSegment(ctx, quality, segment)
}
// GetAudioSegment returns the path to a transcoded audio segment file,
// blocking until it is available, and updates the client's playback head.
func (c *Cassette) GetAudioSegment(
	ctx context.Context,
	filePath, hash string,
	mediaInfo *videofile.MediaInfo,
	audio, segment int32,
	client string,
) (string, error) {
	sess, err := c.getSession(filePath, hash, mediaInfo)
	if err != nil {
		return "", err
	}
	c.sendClientInfo(ClientInfo{
		Client:  client,
		Path:    filePath,
		Quality: nil,
		Audio:   audio,
		Head:    segment,
	})
	return sess.GetAudioSegment(ctx, audio, segment)
}

View File

@@ -0,0 +1,91 @@
package cassette
import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog"
)
// Governor throttles concurrent ffmpeg processes.
type Governor struct {
	sem     chan struct{} // counting semaphore; capacity == maxSlot.
	active  atomic.Int32  // number of currently held slots.
	maxSlot int           // maximum simultaneous slots.
	logger  *zerolog.Logger
	mu      sync.Mutex    // guards stats.
	stats   GovernorStats // cumulative metrics, snapshotted by Stats().
}
// GovernorStats contains runtime metrics.
type GovernorStats struct {
	ActiveProcesses int32         `json:"activeProcesses"` // slots currently held.
	MaxConcurrency  int           `json:"maxConcurrency"`  // configured slot cap.
	TotalLaunched   int64         `json:"totalLaunched"`   // lifetime Acquire successes.
	TotalCompleted  int64         `json:"totalCompleted"`  // lifetime releases.
	TotalWaitTime   time.Duration `json:"totalWaitTime"`   // cumulative time spent waiting for slots.
}
// NewGovernor creates a governor limiting simultaneous ffmpeg processes to
// maxConcurrency. When maxConcurrency <= 0 a default is derived from the CPU
// count: doubled with a floor of 10 when hardware acceleration is enabled,
// otherwise one slot per CPU.
func NewGovernor(maxConcurrency int, hwAccelEnabled bool, logger *zerolog.Logger) *Governor {
	slots := maxConcurrency
	if slots <= 0 {
		if hwAccelEnabled {
			// Hardware encoders offload work from the CPU, so allow more slots.
			slots = max(runtime.NumCPU()*2, 10)
		} else {
			slots = max(runtime.NumCPU(), 1)
		}
	}
	return &Governor{
		sem:     make(chan struct{}, slots),
		maxSlot: slots,
		logger:  logger,
	}
}
// Acquire blocks until a slot is available or ctx is cancelled.
// On success it returns a release function that MUST be called exactly once
// when the work is done; on cancellation it returns ctx's error.
func (g *Governor) Acquire(ctx context.Context) (release func(), err error) {
	start := time.Now()
	select {
	case g.sem <- struct{}{}:
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	waited := time.Since(start)
	n := g.active.Add(1)
	// Record metrics under the stats lock.
	g.mu.Lock()
	g.stats.TotalLaunched++
	g.stats.TotalWaitTime += waited
	g.stats.ActiveProcesses = n
	g.stats.MaxConcurrency = g.maxSlot
	g.mu.Unlock()
	// Only log waits long enough to be meaningful, to avoid noise.
	if waited > 50*time.Millisecond {
		g.logger.Debug().
			Dur("waited", waited).
			Int32("active", n).
			Msg("cassette/governor: slot acquired after wait")
	}
	return func() {
		remaining := g.active.Add(-1)
		g.mu.Lock()
		g.stats.TotalCompleted++
		g.stats.ActiveProcesses = remaining
		g.mu.Unlock()
		// Free the semaphore slot last so the counters never under-report.
		<-g.sem
	}, nil
}
// Stats returns a snapshot copy of the governor metrics.
func (g *Governor) Stats() GovernorStats {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.stats
}

View File

@@ -0,0 +1,319 @@
package cassette
import (
"context"
"fmt"
"os/exec"
"runtime"
"seanime/internal/mediastream/videofile"
"strings"
"time"
"github.com/goccy/go-json"
"github.com/rs/zerolog"
)
// HwAccelOptions are configuration knobs for hardware acceleration.
type HwAccelOptions struct {
	Kind   string // backend: "auto", "nvidia", "qsv"/"intel", "vaapi", "videotoolbox", "custom"; ""/"cpu"/"none" = software.
	Preset string // encoder speed/quality preset; empty defaults to "fast".
	CustomSettings string // JSON-encoded HwAccelProfile for "custom" kind.
}
// BuildHwAccelProfile returns a profile for the requested hardware backend.
// "auto" probes the available encoders; "custom" parses a JSON profile from
// opts.CustomSettings; unknown or unavailable backends fall back to CPU.
func BuildHwAccelProfile(opts HwAccelOptions, ffmpegPath string, logger *zerolog.Logger) HwAccelProfile {
	name := opts.Kind
	// Normalize all "no hardware" spellings to the canonical "disabled".
	if name == "" || name == "cpu" || name == "none" {
		name = "disabled"
	}
	// Handle custom JSON profile.
	var custom HwAccelProfile
	if name == "custom" {
		if opts.CustomSettings == "" {
			logger.Warn().Msg("cassette: custom hwaccel selected but no settings provided, falling back to CPU")
			name = "disabled"
		} else if err := json.Unmarshal([]byte(opts.CustomSettings), &custom); err != nil {
			logger.Error().Err(err).Msg("cassette: failed to parse custom hwaccel settings, falling back to CPU")
			name = "disabled"
		} else {
			custom.Name = "custom"
		}
	}
	// Probe for the best encoder.
	if name == "auto" {
		name = probeHardwareEncoder(ffmpegPath, logger)
	}
	logger.Debug().Str("backend", name).Msg("cassette: hardware acceleration resolved")
	// Default DRM render node on Linux; Windows drivers pick the device
	// themselves via "auto".
	defaultDevice := "/dev/dri/renderD128"
	if runtime.GOOS == "windows" {
		defaultDevice = "auto"
	}
	preset := opts.Preset
	if preset == "" {
		preset = "fast"
	}
	switch name {
	case "disabled":
		return cpuProfile(preset)
	case "vaapi":
		return vaApiProfile(defaultDevice)
	case "qsv", "intel":
		return qsvProfile(defaultDevice, preset)
	case "nvidia":
		return nvidiaProfile(preset)
	case "videotoolbox":
		return videotoolboxProfile()
	case "custom":
		return custom
	default:
		logger.Warn().Str("name", name).Msg("cassette: unknown hwaccel, falling back to CPU")
		return cpuProfile(preset)
	}
}
// hardware probing

// probeHardwareEncoder tests encoders in order of preference and returns the
// first backend whose H.264 encoder works, or "disabled" when none do.
func probeHardwareEncoder(ffmpegPath string, logger *zerolog.Logger) string {
	if ffmpegPath == "" {
		ffmpegPath = "ffmpeg"
	}
	type candidate struct {
		name    string
		encoder string
	}
	// Ordered by preference; the first working encoder wins.
	candidates := []candidate{
		{"nvidia", "h264_nvenc"},
		{"qsv", "h264_qsv"},
		{"vaapi", "h264_vaapi"},
	}
	// VideoToolbox only exists on macOS.
	if runtime.GOOS == "darwin" {
		candidates = append(candidates, candidate{"videotoolbox", "h264_videotoolbox"})
	}
	for _, c := range candidates {
		if testEncoder(ffmpegPath, c.encoder) {
			logger.Info().
				Str("encoder", c.encoder).
				Str("backend", c.name).
				Msg("cassette: hardware encoder probe succeeded")
			return c.name
		}
		logger.Trace().Str("encoder", c.encoder).Msg("cassette: hardware encoder probe failed")
	}
	logger.Info().Msg("cassette: no hardware encoder available, using CPU")
	return "disabled"
}
// testEncoder verifies that the named encoder is usable by asking ffmpeg to
// encode a single generated black frame with it. A clean exit means the
// encoder (and any hardware it requires) is functional.
func testEncoder(ffmpegPath, encoder string) bool {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	args := []string{
		"-f", "lavfi",
		"-i", "color=black:s=64x64:d=0.04",
		"-c:v", encoder,
		"-frames:v", "1",
		"-f", "null", "-",
	}
	cmd := exec.CommandContext(ctx, ffmpegPath, args...)
	cmd.Stdout, cmd.Stderr = nil, nil
	return cmd.Run() == nil
}
// profile constructors

// cpuProfile returns the software (libx264) encoding profile.
func cpuProfile(preset string) HwAccelProfile {
	return HwAccelProfile{
		Name:        "disabled",
		DecodeFlags: []string{},
		EncodeFlags: []string{
			"-c:v", "libx264",
			"-preset", preset,
			"-profile:v", "high", // NOTE(review): chosen empirically — confirm target device compatibility.
			"-tune", "animation", // NOTE(review): tuned for anime content — confirm.
			// "-tune", "fastdecode,zerolatency", // ?
			"-sc_threshold", "0", // disable scene-cut keyframes so segment boundaries stay predictable.
			"-pix_fmt", "yuv420p",
		},
		ScaleFilter:   "scale=%d:%d",
		NoScaleFilter: "format=yuv420p",
		ForcedIDR:     true,
	}
}
// vaApiProfile returns the VA-API (Linux) encoding profile. The render
// device can be overridden via SEANIME_TRANSCODER_VAAPI_RENDERER.
func vaApiProfile(device string) HwAccelProfile {
	return HwAccelProfile{
		Name: "vaapi",
		DecodeFlags: []string{
			"-hwaccel", "vaapi",
			"-hwaccel_device", GetEnvOr("SEANIME_TRANSCODER_VAAPI_RENDERER", device),
			"-hwaccel_output_format", "vaapi",
		},
		EncodeFlags: []string{
			"-c:v", "h264_vaapi",
			"-profile:v", "high", // NOTE(review): confirm profile choice.
		},
		// hwupload keeps frames on the GPU for scaling.
		ScaleFilter:   "format=nv12|vaapi,hwupload,scale_vaapi=%d:%d:format=nv12",
		NoScaleFilter: "format=nv12|vaapi,hwupload",
		ForcedIDR:     true,
	}
}
// qsvProfile returns the Intel QuickSync encoding profile. The device can be
// overridden via SEANIME_TRANSCODER_QSV_RENDERER.
func qsvProfile(device, preset string) HwAccelProfile {
	return HwAccelProfile{
		Name: "qsv",
		DecodeFlags: []string{
			"-hwaccel", "qsv",
			"-qsv_device", GetEnvOr("SEANIME_TRANSCODER_QSV_RENDERER", device),
			"-hwaccel_output_format", "qsv",
		},
		EncodeFlags: []string{
			"-c:v", "h264_qsv",
			"-preset", preset,
			"-profile:v", "high", // NOTE(review): confirm profile choice.
			"-async_depth", "1", // NOTE(review): presumably to reduce latency — confirm.
			"-look_ahead", "0", // NOTE(review): confirm.
			"-bf", "3", // NOTE(review): confirm B-frame count.
		},
		ScaleFilter:   "format=nv12|qsv,hwupload,scale_qsv=%d:%d:format=nv12",
		NoScaleFilter: "format=nv12|qsv,hwupload",
		ForcedIDR:     true,
	}
}
// nvidiaProfile returns the NVENC (CUDA) encoding profile.
func nvidiaProfile(preset string) HwAccelProfile {
	// NVENC uses p1..p7 preset names; map the generic "ultrafast" to p1.
	if preset == "ultrafast" {
		preset = "p1"
	}
	return HwAccelProfile{
		Name: "nvidia",
		DecodeFlags: []string{
			"-hwaccel", "cuda",
			"-hwaccel_output_format", "cuda",
		},
		EncodeFlags: []string{
			"-c:v", "h264_nvenc",
			"-preset", preset,
			"-profile:v", "high", // NOTE(review): confirm profile choice.
			"-rc:v", "vbr", // NOTE(review): confirm rate-control mode.
			"-bf", "0", // NOTE(review): confirm no B-frames is intended.
			"-spatial-aq", "1", // NOTE(review): confirm.
			"-temporal-aq", "1", // NOTE(review): confirm.
			"-rc-lookahead", "0", // NOTE(review): confirm.
			"-delay", "0",
			"-no-scenecut", "1",
		},
		ScaleFilter:   "format=nv12|cuda,hwupload,scale_cuda=%d:%d:format=nv12",
		NoScaleFilter: "format=nv12|cuda,hwupload",
		ForcedIDR:     true,
	}
}
// videotoolboxProfile returns the macOS VideoToolbox encoding profile.
func videotoolboxProfile() HwAccelProfile {
	return HwAccelProfile{
		Name: "videotoolbox",
		DecodeFlags: []string{
			"-hwaccel", "videotoolbox",
		},
		EncodeFlags: []string{
			"-c:v", "h264_videotoolbox",
			// "-realtime", "true",
			// "-prio_speed", "true",
			"-profile:v", "main",
		},
		// VideoToolbox scales in software via the plain scale filter.
		ScaleFilter:   "scale=%d:%d",
		NoScaleFilter: "format=yuv420p",
		ForcedIDR:     true,
	}
}
// runtime fallback and adjustments

// BuildVideoFilter generates the ffmpeg filter string that scales the video
// to width x height through the profile's hardware path.
//
// When the source already matches the target dimensions, the profile's
// NoScaleFilter is used (if set). For non-custom profiles, 10/12-bit sources
// have the nv12 upload format swapped for p010 to preserve bit depth.
func BuildVideoFilter(hw *HwAccelProfile, video *videofile.Video, width, height int32) string {
	noScale := video.Width == uint32(width) && video.Height == uint32(height)
	// Custom profiles are used verbatim, with no bit-depth adjustments.
	if hw.Name == "custom" {
		if noScale && hw.NoScaleFilter != "" {
			return hw.NoScaleFilter
		}
		return fmt.Sprintf(hw.ScaleFilter, width, height)
	}
	var filter string
	if noScale && hw.NoScaleFilter != "" {
		filter = hw.NoScaleFilter
	} else {
		filter = fmt.Sprintf(hw.ScaleFilter, width, height)
	}
	// Enable p010 hwupload if the source is 10-bit or 12-bit so hardware
	// pipelines keep the higher bit depth through hwupload.
	pixFmt := strings.ToLower(video.PixFmt)
	is10Bit := strings.Contains(pixFmt, "10le") || strings.Contains(pixFmt, "12le") || strings.Contains(pixFmt, "p010")
	if is10Bit && strings.HasPrefix(filter, "format=nv12|") {
		filter = strings.Replace(filter, "format=nv12|", "format=p010|", 1)
	}
	// (The original ended with an if/else that returned filter on both
	// branches; the dead branch has been removed — behavior is unchanged.)
	return filter
}
// FallbackToCPU returns a software-encoding profile, intended for use when a
// hardware pipeline fails at runtime (see DetectHwAccelFailure).
func FallbackToCPU(preset string) HwAccelProfile {
	return cpuProfile(preset)
}
// DetectHwAccelFailure reports whether ffmpeg stderr output looks like a
// hardware-acceleration failure: it must contain a generic failure marker
// ("failed"/"error") AND mention a hardware backend or a known failure
// message.
func DetectHwAccelFailure(stderr string) bool {
	out := strings.ToLower(stderr)
	// Without a generic failure marker this cannot be a failure report.
	if !strings.Contains(out, "failed") && !strings.Contains(out, "error") {
		return false
	}
	for _, marker := range []string{
		"hwaccel", "vaapi", "cuvid", "vdpau", "qsv",
		"cuda", "nvenc", "videotoolbox",
		"no capable devices found",
		"device creation failed",
		"initialization failed",
	} {
		if strings.Contains(out, marker) {
			return true
		}
	}
	return false
}
// FormatHwAccelSummary returns a short human-readable description of the
// active profile, e.g. "NVIDIA (h264_nvenc)" or "CPU (software encoding)".
func FormatHwAccelSummary(p HwAccelProfile) string {
	if p.Name == "disabled" {
		return "CPU (software encoding)"
	}
	// Locate the encoder name that follows the "-c:v" flag.
	encoder := "unknown"
	for i := 0; i+1 < len(p.EncodeFlags); i++ {
		if p.EncodeFlags[i] == "-c:v" {
			encoder = p.EncodeFlags[i+1]
			break
		}
	}
	return fmt.Sprintf("%s (%s)", strings.ToUpper(p.Name), encoder)
}

View File

@@ -0,0 +1,239 @@
package cassette
import (
"bufio"
"path/filepath"
"seanime/internal/mediastream/videofile"
"seanime/internal/util"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/rs/zerolog"
)
// KeyframeIndex holds extracted keyframe timestamps for one file.
// Keyframes grows as extraction streams in; readers take mu, and listeners
// are notified on every batch appended.
type KeyframeIndex struct {
	Sha       string    `json:"sha"`
	Keyframes []float64 `json:"keyframes"`
	IsDone    bool      `json:"isDone"`
	mu        sync.RWMutex
	ready     sync.WaitGroup
	listeners []func(keyframes []float64)
}

// Get returns the timestamp of keyframe idx.
func (ki *KeyframeIndex) Get(idx int32) float64 {
	ki.mu.RLock()
	defer ki.mu.RUnlock()
	return ki.Keyframes[idx]
}

// Slice returns a copy of the timestamps in [start, end); nil when the
// range is empty.
func (ki *KeyframeIndex) Slice(start, end int32) []float64 {
	if end <= start {
		return nil
	}
	ki.mu.RLock()
	defer ki.mu.RUnlock()
	return append([]float64(nil), ki.Keyframes[start:end]...)
}

// Length returns the current number of keyframes and whether extraction has
// finished.
func (ki *KeyframeIndex) Length() (int32, bool) {
	ki.mu.RLock()
	defer ki.mu.RUnlock()
	return int32(len(ki.Keyframes)), ki.IsDone
}

// AddListener registers a callback invoked (with the full slice) whenever
// new keyframes are appended.
func (ki *KeyframeIndex) AddListener(fn func([]float64)) {
	ki.mu.Lock()
	defer ki.mu.Unlock()
	ki.listeners = append(ki.listeners, fn)
}

// append publishes a batch of timestamps and notifies every listener while
// holding the write lock.
func (ki *KeyframeIndex) append(values []float64) {
	ki.mu.Lock()
	defer ki.mu.Unlock()
	ki.Keyframes = append(ki.Keyframes, values...)
	for _, notify := range ki.listeners {
		notify(ki.Keyframes)
	}
}
// global keyframe cache

var (
	kfCache   sync.Map   // map[string]*KeyframeIndex keyed by file hash.
	kfCacheMu sync.Mutex // serializes index creation.
)

// ClearKeyframeCache drops every cached keyframe index.
func ClearKeyframeCache() {
	kfCache.Range(func(k, _ any) bool {
		kfCache.Delete(k)
		return true
	})
}
// getOrExtractKeyframes returns the keyframe index for the file, loading it
// from the on-disk cache or extracting it with ffprobe on first use. It
// blocks until at least the first batch of keyframes is available.
//
// Extraction failures are logged and the entry is evicted from the cache so
// a later request can retry instead of reusing a dead index forever.
func getOrExtractKeyframes(
	path string,
	hash string,
	settings *Settings,
	logger *zerolog.Logger,
) *KeyframeIndex {
	// Fast path: already cached (possibly still being filled).
	if v, ok := kfCache.Load(hash); ok {
		ki := v.(*KeyframeIndex)
		ki.ready.Wait()
		return ki
	}
	// Slow path: create under the lock, re-checking for a concurrent creator.
	kfCacheMu.Lock()
	if v, ok := kfCache.Load(hash); ok {
		kfCacheMu.Unlock()
		ki := v.(*KeyframeIndex)
		ki.ready.Wait()
		return ki
	}
	ki := &KeyframeIndex{Sha: hash}
	ki.ready.Add(1)
	kfCache.Store(hash, ki)
	kfCacheMu.Unlock()
	go func() {
		diskPath := filepath.Join(settings.StreamDir, hash, "keyframes.json")
		// Try disk cache first.
		if err := getSavedInfo(diskPath, ki); err == nil {
			logger.Trace().Msg("cassette: keyframes disk cache HIT")
			ki.ready.Done()
			return
		}
		// Extract from the file. extractKeyframes is responsible for
		// releasing ki.ready (it must do so even on error).
		if err := extractKeyframes(settings.FfprobePath, path, ki, hash, logger); err != nil {
			// Evict the failed entry so a later request can retry.
			logger.Error().Err(err).Str("path", path).Msg("cassette: keyframe extraction failed")
			kfCache.Delete(hash)
			return
		}
		saveInfo(diskPath, ki)
	}()
	ki.ready.Wait()
	return ki
}
// extractKeyframes probes the file with ffprobe and streams keyframe
// timestamps into ki in batches, so waiters can start consuming before the
// whole file has been scanned.
//
// ki.ready.Done() is guaranteed to be called exactly once before this
// function returns — even on an error path — so waiters never deadlock.
// The original code skipped Done on early error returns, leaving
// ki.ready.Wait() blocked forever; it also never reaped the child process.
func extractKeyframes(
	ffprobePath string,
	path string,
	ki *KeyframeIndex,
	hash string,
	logger *zerolog.Logger,
) error {
	defer printExecTime(logger, "ffprobe keyframe analysis for %s", path)()

	// Release waiters exactly once, no matter how we exit.
	var readyDone atomic.Bool
	markReady := func() {
		if readyDone.CompareAndSwap(false, true) {
			ki.ready.Done()
		}
	}
	defer markReady()

	probeBin := ffprobePath
	if probeBin == "" {
		probeBin = "ffprobe"
	}
	// Dump pts + flags of every packet of the first video stream as CSV.
	cmd := util.NewCmd(
		probeBin,
		"-loglevel", "error",
		"-select_streams", "v:0",
		"-show_entries", "packet=pts_time,flags",
		"-of", "csv=print_section=0",
		path,
	)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return err
	}
	if err := cmd.Start(); err != nil {
		return err
	}
	// Always reap the child so it does not linger as a zombie and its pipe
	// resources are released. Best-effort: a non-zero exit after we got the
	// data we need is not fatal.
	defer func() { _ = cmd.Wait() }()

	scanner := bufio.NewScanner(stdout)
	buf := make([]float64, 0, 1000)
	batchSize := 100
	flushed := int32(0)
	// flush publishes the buffered timestamps to ki and unblocks waiters on
	// the first batch.
	flush := func(final bool) {
		if len(buf) == 0 && !final {
			return
		}
		ki.append(buf)
		flushed += int32(len(buf))
		markReady()
		buf = buf[:0]
		// After the first 500 keyframes increase batch size to reduce
		// listener overhead on long files.
		if flushed >= 500 {
			batchSize = 500
		}
	}
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}
		parts := strings.SplitN(line, ",", 2)
		if len(parts) < 2 {
			continue
		}
		pts, flags := parts[0], parts[1]
		if pts == "N/A" {
			break
		}
		// Only keyframe packets (flags starting with 'K') are relevant.
		if len(flags) == 0 || flags[0] != 'K' {
			continue
		}
		fpts, err := strconv.ParseFloat(pts, 64)
		if err != nil {
			return err
		}
		buf = append(buf, fpts)
		if len(buf) >= batchSize {
			flush(false)
		}
	}
	if err := scanner.Err(); err != nil {
		return err
	}
	// Handle files with <=1 keyframe by synthesizing evenly spaced ones.
	if flushed == 0 && len(buf) < 2 {
		dummy, err := makeDummyKeyframes(ffprobePath, path, hash)
		if err != nil {
			return err
		}
		buf = dummy
	}
	flush(true)
	// NOTE(review): IsDone is written without holding ki.mu, mirroring the
	// original code; readers via Length() do take the lock — confirm this
	// ordering is acceptable.
	ki.IsDone = true
	return nil
}
// makeDummyKeyframes fabricates keyframe timestamps at fixed 2-second
// intervals, used for files that expose fewer than two real keyframes.
func makeDummyKeyframes(ffprobePath, path, hash string) ([]float64, error) {
	const interval = 2.0
	info, err := videofile.FfprobeGetInfo(ffprobePath, path, hash)
	if err != nil {
		return nil, err
	}
	count := int(float64(info.Duration)/interval) + 1
	frames := make([]float64, count)
	for i := 0; i < count; i++ {
		frames[i] = interval * float64(i)
	}
	return frames, nil
}

View File

@@ -0,0 +1,608 @@
package cassette
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"math"
"os"
"os/exec"
"path/filepath"
"seanime/internal/util"
"strings"
"sync"
"time"
"github.com/rs/zerolog"
"github.com/samber/lo"
)
// PipelineKind distinguishes video from audio pipelines.
type PipelineKind int

const (
	VideoKind PipelineKind = iota
	AudioKind
)

// String returns a human-readable name for the pipeline kind.
func (k PipelineKind) String() string {
	switch k {
	case VideoKind:
		return "video"
	default:
		return "audio"
	}
}
// head represents an ffmpeg process encoding a contiguous range of segments.
type head struct {
	segment int32 // Current segment (updated as ffmpeg writes segments).
	end int32 // First segment NOT included in this head's work.
	cmd *exec.Cmd // The ffmpeg process.
	stdin io.WriteCloser // Used to gracefully quit ffmpeg via "q".
	cancel context.CancelFunc // Cancels the head's soft-close goroutine.
}

// deletedHead is the tombstone stored in Pipeline.heads once a head has been
// killed; segment == -1 marks it inactive.
var deletedHead = head{segment: -1, end: -1}
// Pipeline manages a single encode stream (one video quality, or one audio
// track) of a session: its segment table, encoder heads, and seek heuristics.
type Pipeline struct {
	kind PipelineKind
	label string // e.g. "video (720p)" or "audio 0"
	session *Session
	segments *SegmentTable
	velocity *VelocityEstimator // playback-speed heuristics for prefetch/seek detection.
	// heads tracks all in-flight encoder processes.
	headsMu sync.RWMutex
	heads []head
	activeHeadsWg sync.WaitGroup // counts reaper goroutines; Kill waits on it.
	// killCh is recreated each time a segment is requested. Closing it
	// aborts any WaitFor in progress.
	killCh chan struct{}
	settings *Settings
	governor *Governor
	logger *zerolog.Logger
	ctx context.Context // pipeline-wide context; cancelled by Kill.
	cancel context.CancelFunc
	// buildArgs is the strategy function that produces the quality-specific
	// part of the ffmpeg command line.
	buildArgs func(segmentTimes string) []string
	// outPathFmt returns the output path pattern for a given encoder ID.
	outPathFmt func(encoderID int) string
}
// PipelineConfig configures a new pipeline; all fields are required.
type PipelineConfig struct {
	Kind PipelineKind
	Label string // human-readable name used in logs.
	Session *Session
	Settings *Settings
	Governor *Governor
	Logger *zerolog.Logger
	// BuildArgs produces the quality-specific ffmpeg arguments.
	BuildArgs func(segmentTimes string) []string
	// OutPathFmt returns the output path pattern for a given encoder ID.
	OutPathFmt func(encoderID int) string
}
// NewPipeline creates a pipeline and initializes its segment table to the
// current keyframe count. If keyframe extraction is still running, the table
// grows as new keyframes arrive; existing segments on disk are reclaimed
// asynchronously.
func NewPipeline(cfg PipelineConfig) *Pipeline {
	ctx, cancel := context.WithCancel(context.Background())
	length, isDone := cfg.Session.Keyframes.Length()
	segments := NewSegmentTable(length)
	p := &Pipeline{
		kind: cfg.Kind,
		label: cfg.Label,
		session: cfg.Session,
		segments: segments,
		velocity: NewVelocityEstimator(30 * time.Second),
		heads: make([]head, 0, 4),
		killCh: make(chan struct{}),
		settings: cfg.Settings,
		governor: cfg.Governor,
		logger: cfg.Logger,
		ctx: ctx,
		cancel: cancel,
		buildArgs: cfg.BuildArgs,
		outPathFmt: cfg.OutPathFmt,
	}
	// Keep the segment table in sync with the still-growing keyframe index.
	if !isDone {
		cfg.Session.Keyframes.AddListener(func(keyframes []float64) {
			segments.Grow(len(keyframes))
		})
	}
	// Scan for existing segments on disk (segment reuse).
	go p.reclaimExistingSegments()
	return p
}
// reclaimExistingSegments scans the output directory for segment files left
// by a previous run and marks matching segments as ready so they are not
// re-encoded.
func (p *Pipeline) reclaimExistingSegments() {
	dir := filepath.Dir(p.outPathFmt(0))
	entries, err := os.ReadDir(dir)
	if err != nil {
		return // Not an error — dir may not exist yet.
	}
	// The base of the output pattern contains a %d placeholder, so Sscanf
	// parses the segment number back out of each file name.
	pattern := filepath.Base(p.outPathFmt(0))
	reclaimed := 0
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		var seg int32
		if _, err := fmt.Sscanf(entry.Name(), pattern, &seg); err != nil {
			continue
		}
		// Ignore out-of-range numbers (stale files from a different cut).
		if seg < 0 || seg >= int32(p.segments.Len()) {
			continue
		}
		// File exists on disk — mark as ready with encoder ID 0.
		if !p.segments.IsReady(seg) {
			p.segments.MarkReady(seg, 0)
			reclaimed++
		}
	}
	if reclaimed > 0 {
		p.logger.Debug().
			Int("count", reclaimed).
			Str("pipeline", p.label).
			Msg("cassette: reclaimed existing segments from disk")
	}
}
// GetIndex generates an HLS variant playlist covering this pipeline's
// segments, derived from the session's keyframe index and media duration.
func (p *Pipeline) GetIndex(token string) (string, error) {
	playlist := GenerateVariantPlaylist(
		p.session.Keyframes,
		float64(p.session.Info.Duration),
		token,
	)
	return playlist, nil
}
// GetSegment blocks until the requested segment is ready and returns the path
// to the .ts file on disk. ctx is the client's request context; the wait is
// aborted when the client disconnects (capped at 30s either way).
func (p *Pipeline) GetSegment(ctx context.Context, seg int32) (string, error) {
	// Recreate the kill channel so that a previously-killed pipeline can
	// service new requests. The swap happens under headsMu because
	// killHeadLocked closes p.killCh while holding that lock — an
	// unsynchronized write here would race with the close.
	p.headsMu.Lock()
	killCh := make(chan struct{})
	p.killCh = killCh
	p.headsMu.Unlock()
	// Record for velocity tracking.
	p.velocity.Record(seg)
	// If the user jumped far, kill all distant heads immediately so we
	// don't waste resources.
	if p.velocity.DetectSeek(50) {
		p.killDistantHeads(seg)
	}
	if p.segments.IsReady(seg) {
		p.prefetch(seg)
		return p.segmentPath(seg), nil
	}
	// Decide whether to spawn a new encoder.
	p.headsMu.RLock()
	distance := p.minHeadDistance(seg)
	scheduled := p.isScheduled(seg)
	p.headsMu.RUnlock()
	// todo: improve
	if distance > 60 || !scheduled {
		if err := p.runHead(seg); err != nil {
			return "", err
		}
	}
	// Wait for the segment, allowing the client's request ctx to abort the
	// wait early if they disconnect. Wait on the locally captured channel so
	// a concurrent recreation cannot swap it out from under us.
	waitCtx, waitCancel := context.WithTimeout(ctx, 30*time.Second)
	defer waitCancel()
	if err := p.segments.WaitFor(waitCtx, seg, killCh); err != nil {
		return "", fmt.Errorf("cassette: %s segment %d not ready: %w", p.label, seg, err)
	}
	p.prefetch(seg)
	return p.segmentPath(seg), nil
}
// segmentPath returns the on-disk path for a ready segment: the output
// pattern (which contains a %d placeholder) is resolved with the encoder ID
// that produced the segment, then formatted with the segment number.
func (p *Pipeline) segmentPath(seg int32) string {
	return fmt.Sprintf(filepath.ToSlash(p.outPathFmt(p.segments.EncoderID(seg))), seg)
}
// Kill signals all ffmpeg heads to stop and closes the pipeline context,
// waiting for all processes to fully exit to ensure no files are locked.
func (p *Pipeline) Kill() {
	p.cancel() // Cancel the global pipeline context.
	p.headsMu.Lock()
	for i := range p.heads {
		p.killHeadLocked(i)
	}
	p.headsMu.Unlock()
	// Block until all reaper goroutines have finished, ensuring no ffmpeg
	// process is still lingering and locking files.
	p.activeHeadsWg.Wait()
}
// killHeadLocked terminates the encoder head at index id and replaces it with
// the deletedHead tombstone. Caller must hold p.headsMu (write).
func (p *Pipeline) killHeadLocked(id int) {
	defer func() { recover() }() // Guard against double-close of killCh.
	// Close killCh (unless already closed) to abort any WaitFor in progress.
	select {
	case <-p.killCh:
	default:
		close(p.killCh)
	}
	h := p.heads[id]
	if h.cancel != nil {
		h.cancel()
	}
	// Tombstoned or never-started heads have no process to kill.
	if h.segment == -1 || h.cmd == nil {
		return
	}
	// use Kill to guarantee termination across platforms (os.Interrupt is
	// unsupported on Windows for os.Process.Signal)
	_ = h.cmd.Process.Kill()
	p.heads[id] = deletedHead
}
// killDistantHeads kills heads that are more than 50 segments away from the
// seek target, freeing resources for the new playback position. Locks
// p.headsMu itself — caller must not hold it.
func (p *Pipeline) killDistantHeads(target int32) {
	p.headsMu.Lock()
	defer p.headsMu.Unlock()
	for i, h := range p.heads {
		// Skip tombstoned heads.
		if h.segment == -1 {
			continue
		}
		if abs32(h.segment-target) > 50 {
			p.logger.Trace().Int("eid", i).Int32("at", h.segment).Int32("target", target).
				Msg("cassette: killing distant head after seek")
			p.killHeadLocked(i)
		}
	}
	// devnote: don't recreate the pipeline context here because we only cancelled
	// individual heads, not the whole pipeline. p.killCh will be recreated
	// at the top of GetSegment
}
// encoder head management

// isScheduled reports whether any in-flight head's range [segment, end)
// covers seg. Caller must hold p.headsMu (read or write).
func (p *Pipeline) isScheduled(seg int32) bool {
	for _, h := range p.heads {
		// Tombstoned heads (segment == -1) never match.
		if h.segment >= 0 && h.segment <= seg && seg < h.end {
			return true
		}
	}
	return false
}
// minHeadDistance returns the smallest distance (in seconds of media time)
// between seg and any head positioned at or before it whose range still
// covers seg; +Inf when no head qualifies. Caller must hold p.headsMu.
func (p *Pipeline) minHeadDistance(seg int32) float64 {
	t := p.session.Keyframes.Get(seg)
	best := math.Inf(1)
	for _, h := range p.heads {
		// Skip tombstones and heads whose range ends before seg.
		if h.segment < 0 || seg >= h.end {
			continue
		}
		ht := p.session.Keyframes.Get(h.segment)
		// Only heads at or before seg count — a head past seg cannot reach it.
		if ht > t {
			continue
		}
		if d := t - ht; d < best {
			best = d
		}
	}
	return best
}
// prefetch speculatively spawns at most one encoder head ahead of the
// current playback position, sized by the VelocityEstimator's look-ahead.
func (p *Pipeline) prefetch(current int32) {
	// Audio is cheap to encode on demand, skip prefetch.
	if p.kind == AudioKind {
		return
	}
	lookAhead := p.velocity.LookAhead(5) // base 5 segments, scales up
	length := int32(p.segments.Len())
	p.headsMu.RLock()
	defer p.headsMu.RUnlock()
	for i := current + 1; i <= min(current+lookAhead, length-1); i++ {
		if p.segments.IsReady(i) {
			continue
		}
		// Skip segments an existing head will reach soon; the tolerated
		// distance grows the further ahead we look.
		if d := p.minHeadDistance(i); d < 60+5*float64(i-current) {
			continue
		}
		// Spawn asynchronously so the request path is not blocked on the
		// governor; the error is intentionally discarded (best-effort).
		go func(s int32) { _ = p.runHead(s) }(i)
		return // only one speculative head per request
	}
}
// runHead launches an ffmpeg process from [start, end).
// it acquires a slot from the governor.
//
// The spawned process encodes a bounded range of segments beginning at start;
// its produced-segment list is consumed by readSegments and its exit is
// handled by reapProcess, which releases the governor slot.
func (p *Pipeline) runHead(start int32) error {
	length, isDone := p.session.Keyframes.Length()
	// Cap each head at 100 segments so a single job never runs unbounded.
	end := min(start+100, length)
	// keep a 2-segment padding when keyframes are still arriving so we
	// never reference a keyframe that hasn't been extracted yet.
	if !isDone {
		end -= 2
	}
	// shrink range to stop at the first already-ready segment.
	for i := start; i < end; i++ {
		if p.segments.IsReady(i) {
			end = i
			break
		}
	}
	if start >= end {
		// Nothing to do: everything in range is already encoded.
		return nil
	}
	// acquire a slot from the governor
	release, err := p.governor.Acquire(p.ctx)
	if err != nil {
		return fmt.Errorf("cassette: governor denied slot: %w", err)
	}
	// guard against the select race: when both the semaphore and ctx.Done()
	// are immediately ready, Go picks randomly. If the semaphore won but the
	// context was already cancelled (e.g. pipeline was Kill()ed), bail now.
	if p.ctx.Err() != nil {
		release()
		return fmt.Errorf("cassette: pipeline cancelled")
	}
	headCtx, headCancel := context.WithCancel(p.ctx)
	// Register the head before spawning so other goroutines can observe and
	// cancel it; encoderID is its stable index in p.heads.
	p.headsMu.Lock()
	encoderID := len(p.heads)
	p.heads = append(p.heads, head{segment: start, end: end, cancel: headCancel})
	p.headsMu.Unlock()
	// build ffmpeg arguments
	//
	// startSeg/startRef form the seek reference: for start > 0 we seek into
	// the previous segment and later discard the resulting pre-segment.
	startSeg := start
	startRef := float64(0)
	if start != 0 {
		startSeg = start - 1
		if p.kind == AudioKind {
			// audio needs pre-context to avoid ~100ms of silence at segment
			// boundaries
			startRef = p.session.Keyframes.Get(startSeg)
		} else {
			// video: nudge seek point past the keyframe to prevent ffmpeg from
			// accidentally landing on the prior keyframe
			if startSeg+1 == length {
				startRef = (p.session.Keyframes.Get(startSeg) + float64(p.session.Info.Duration)) / 2
			} else {
				startRef = (p.session.Keyframes.Get(startSeg) + p.session.Keyframes.Get(startSeg+1)) / 2
			}
		}
	}
	// The final segment has no trailing boundary keyframe to include.
	endPad := int32(1)
	if end == length {
		endPad = 0
	}
	// We must include the "start" keyframe as a boundary so ffmpeg spits out
	// the pre-segment (which we discard) as a separate file
	firstBoundary := start + 1
	if start != 0 {
		firstBoundary = start
	}
	segmentTimes := p.session.Keyframes.Slice(firstBoundary, end+endPad)
	if len(segmentTimes) == 0 {
		// Sentinel far beyond any real timestamp: one open-ended segment.
		segmentTimes = []float64{9_999_999}
	}
	outPath := p.outPathFmt(encoderID)
	if err := os.MkdirAll(filepath.Dir(outPath), 0755); err != nil {
		release()
		return err
	}
	args := []string{"-nostats", "-hide_banner", "-loglevel", "warning"}
	args = append(args, p.settings.HwAccel.DecodeFlags...)
	if startRef != 0 {
		if p.kind == VideoKind {
			// -noaccurate_seek gives faster seeks for video and is required
			// for correct segment boundary behaviour in transmux mode
			args = append(args, "-noaccurate_seek")
		}
		args = append(args, "-ss", fmt.Sprintf("%.6f", startRef))
	}
	if end+1 < length {
		endRef := p.session.Keyframes.Get(end + 1)
		// compensate for the offset between the requested -ss and the actual
		// keyframe that ffmpeg landed on
		endRef += startRef - p.session.Keyframes.Get(startSeg)
		args = append(args, "-to", fmt.Sprintf("%.6f", endRef))
	}
	args = append(args,
		"-sn", "-dn",
		"-i", p.session.Path,
		"-map_metadata", "-1", // drop source container metadata from the output
		"-map_chapters", "-1", // drop source chapter markers from the output
		"-start_at_zero",
		"-copyts",
		"-muxdelay", "0",
	)
	segStr := toSegmentStr(segmentTimes)
	args = append(args, p.buildArgs(segStr)...)
	// Compute segment_times relative to -ss start.
	relTimes := lo.Map(segmentTimes, func(t float64, _ int) float64 {
		return t - p.session.Keyframes.Get(startSeg)
	})
	args = append(args,
		"-f", "segment",
		"-segment_time_delta", "0.05",
		"-segment_format", "mpegts",
		"-segment_times", toSegmentStr(relTimes),
		"-segment_list_type", "flat",
		"-segment_list", "pipe:1", // produced filenames stream to stdout
		"-segment_start_number", fmt.Sprint(startSeg),
		outPath,
	)
	p.logger.Trace().Str("pipeline", p.label).Int("eid", encoderID).
		Int32("start", start).Int32("end", end).
		Msgf("cassette: spawning ffmpeg")
	// Deliberately NOT headCtx: shutdown is a soft "q" written to stdin
	// (below), letting ffmpeg flush its last segment instead of being killed.
	cmd := util.NewCmdCtx(context.Background(), p.settings.FfmpegPath, args...)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		release()
		return err
	}
	stdin, err := cmd.StdinPipe()
	if err != nil {
		release()
		return err
	}
	var stderr strings.Builder
	cmd.Stderr = &stderr
	if err := cmd.Start(); err != nil {
		release()
		return err
	}
	p.headsMu.Lock()
	p.heads[encoderID].cmd = cmd
	p.heads[encoderID].stdin = stdin
	p.headsMu.Unlock()
	p.activeHeadsWg.Add(1)
	// read segment list from stdout
	go p.readSegments(encoderID, start, end, length, stdout, stdin)
	// cancel listener, propagates context cancellation to ffmpeg
	go func(ctx context.Context) {
		<-ctx.Done()
		_, _ = stdin.Write([]byte("q"))
		_ = stdin.Close()
	}(headCtx)
	// Goroutine: reap process and release governor slot.
	go p.reapProcess(encoderID, cmd, &stderr, release, headCancel)
	return nil
}
// readSegments consumes ffmpeg's segment list on stdout and marks each
// produced segment as ready. It soft-stops ffmpeg (a "q" on stdin) as soon
// as another encoder head has already produced the work this head would do
// next, and returns once the assigned range [start, end) is complete.
//
// length is currently unused in the body; it is kept for symmetry with the
// runHead call site.
func (p *Pipeline) readSegments(
	encoderID int,
	start, end, length int32,
	stdout io.ReadCloser,
	stdin io.WriteCloser,
) {
	scanner := bufio.NewScanner(stdout)
	// ffmpeg prints one produced filename per line; parse the segment number
	// back out of the outPathFmt pattern.
	format := filepath.Base(p.outPathFmt(encoderID))
	for scanner.Scan() {
		var seg int32
		if _, err := fmt.Sscanf(scanner.Text(), format, &seg); err != nil {
			continue
		}
		if seg < start {
			continue // pre-segment produced by -ss padding, discard
		}
		p.headsMu.Lock()
		p.heads[encoderID].segment = seg
		p.headsMu.Unlock()
		if p.segments.IsReady(seg) {
			// another encoder beat us, quit to avoid duplicate work
			_, _ = stdin.Write([]byte("q"))
			_ = stdin.Close()
			return
		}
		p.segments.MarkReady(seg, encoderID)
		if seg == end-1 {
			return // range complete, ffmpeg will finish naturally
		}
		if p.segments.IsReady(seg + 1) {
			// next segment already done by another head, no point continuing
			_, _ = stdin.Write([]byte("q"))
			_ = stdin.Close()
			return
		}
	}
	// bufio.Scanner stops silently on read errors; surface them so a
	// truncated segment list is not mistaken for a clean end of stream.
	if err := scanner.Err(); err != nil {
		p.logger.Warn().Int("eid", encoderID).Err(err).
			Msg("cassette: error reading ffmpeg segment list")
	}
}
// reapProcess waits for the ffmpeg process to exit, marks its head as deleted,
// and releases the governor slot. If a hardware acceleration failure is
// detected, it logs actionable guidance.
//
// The deferred cleanups run in LIFO order: headCancel first (unblocks the
// soft-quit goroutine watching the head context), then release (frees the
// governor slot), then activeHeadsWg.Done (unblocks waiters on all heads).
func (p *Pipeline) reapProcess(encoderID int, cmd *exec.Cmd, stderr *strings.Builder, release func(), headCancel context.CancelFunc) {
	defer p.activeHeadsWg.Done() // Signal that this head has completely exited
	defer release()              // Always release the governor slot
	defer headCancel()           // Cancel the head context to free the soft-close goroutine
	err := cmd.Wait()
	// Check for hardware acceleration failures in stderr
	if len(p.settings.HwAccel.DecodeFlags) > 0 && DetectHwAccelFailure(stderr.String()) {
		p.logger.Warn().Int("eid", encoderID).
			Str("hwaccel", FormatHwAccelSummary(p.settings.HwAccel)).
			Msg("cassette: hardware acceleration failed, consider switching to CPU or a different backend")
	}
	// Classify the exit: 255 is treated as an intentional/soft termination
	// (e.g. after a "q" quit), "killed" as a deliberate hard kill; anything
	// else is a genuine failure and gets stderr attached.
	var exitErr *exec.ExitError
	switch {
	case errors.As(err, &exitErr) && exitErr.ExitCode() == 255:
		p.logger.Trace().Int("eid", encoderID).Msg("cassette: ffmpeg process terminated")
	case err != nil && strings.Contains(err.Error(), "killed"):
		p.logger.Trace().Int("eid", encoderID).Msg("cassette: ffmpeg process killed intentionally")
	case err != nil:
		p.logger.Error().Int("eid", encoderID).
			Err(fmt.Errorf("%s: %s", err, stderr.String())).
			Msg("cassette: ffmpeg process failed")
	default:
		p.logger.Trace().Int("eid", encoderID).Str("pipeline", p.label).
			Msg("cassette: ffmpeg process exited cleanly")
	}
	// Replace the head with a tombstone so encoderID indices stay stable.
	p.headsMu.Lock()
	defer p.headsMu.Unlock()
	p.heads[encoderID] = deletedHead
}
// helpers

// toSegmentStr renders keyframe timestamps as the comma-separated,
// microsecond-precision list expected by ffmpeg's -segment_times flag.
func toSegmentStr(times []float64) string {
	var b strings.Builder
	for i, t := range times {
		if i > 0 {
			b.WriteByte(',')
		}
		fmt.Fprintf(&b, "%.6f", t)
	}
	return b.String()
}

View File

@@ -0,0 +1,125 @@
package cassette
import (
"fmt"
"seanime/internal/mediastream/videofile"
"strings"
)
// playlist generation

// GenerateMasterPlaylist builds the hls master playlist from the quality
// ladder: one #EXT-X-STREAM-INF entry per rung plus the audio renditions.
func GenerateMasterPlaylist(info *videofile.MediaInfo, ladder []QualityLadderEntry, token string) string {
	var sb strings.Builder
	sb.WriteString("#EXTM3U\n")
	if info.Video == nil {
		// Audio-only source: the master playlist just lists audio renditions.
		writeAudioTracks(&sb, info, token)
		return sb.String()
	}
	// Codec advertised for transcoded variants; the Original rung uses the
	// source's actual codec when known. h264 High L4.0 is a safe default.
	const transcodedCodec = "avc1.640028"
	tokenSuffix := ""
	if token != "" {
		tokenSuffix = "?token=" + token
	}
	for _, entry := range ladder {
		// Resolve the per-rung properties up front, then emit one entry.
		avg, peak := EffectiveBitrate(entry.Quality, info.Video.Bitrate)
		codec := transcodedCodec
		width, height := int64(entry.Width), int64(entry.Height)
		if entry.Quality == Original {
			width, height = int64(info.Video.Width), int64(info.Video.Height)
			if entry.OriginalCanTransmux && info.Video.MimeCodec != nil {
				codec = *info.Video.MimeCodec
			}
		}
		sb.WriteString("#EXT-X-STREAM-INF:")
		fmt.Fprintf(&sb, "AVERAGE-BANDWIDTH=%d,", avg)
		fmt.Fprintf(&sb, "BANDWIDTH=%d,", peak)
		fmt.Fprintf(&sb, "RESOLUTION=%dx%d,", width, height)
		fmt.Fprintf(&sb, "CODECS=\"%s,mp4a.40.2\",", codec)
		sb.WriteString("AUDIO=\"audio\",")
		sb.WriteString("CLOSED-CAPTIONS=NONE\n")
		fmt.Fprintf(&sb, "./%s/index.m3u8%s\n", entry.Quality, tokenSuffix)
	}
	writeAudioTracks(&sb, info, token)
	return sb.String()
}
// writeAudioTracks appends one #EXT-X-MEDIA entry per audio track to the
// playlist. Display names prefer the track title, then its language, then a
// positional "Audio N" fallback.
func writeAudioTracks(b *strings.Builder, info *videofile.MediaInfo, token string) {
	tokenSuffix := ""
	if token != "" {
		tokenSuffix = "?token=" + token
	}
	for _, audio := range info.Audios {
		// Resolve the display name (title > language > positional fallback).
		name := fmt.Sprintf("Audio %d", audio.Index)
		if audio.Language != nil {
			name = *audio.Language
		}
		if audio.Title != nil {
			name = *audio.Title
		}
		b.WriteString("#EXT-X-MEDIA:TYPE=AUDIO,")
		b.WriteString("GROUP-ID=\"audio\",")
		if audio.Language != nil {
			fmt.Fprintf(b, "LANGUAGE=\"%s\",", *audio.Language)
		}
		fmt.Fprintf(b, "NAME=\"%s\",", name)
		if audio.IsDefault {
			b.WriteString("DEFAULT=YES,")
		}
		// An unknown channel count is reported as stereo.
		channels := audio.Channels
		if channels == 0 {
			channels = 2
		}
		fmt.Fprintf(b, "CHANNELS=\"%d\",", channels)
		fmt.Fprintf(b, "URI=\"./audio/%d/index.m3u8%s\"\n", audio.Index, tokenSuffix)
	}
}
// GenerateVariantPlaylist builds a variant (media) playlist listing every
// segment known so far. While keyframe extraction is still running the
// playlist is left open (EVENT type, no ENDLIST) so players keep refreshing.
func GenerateVariantPlaylist(ki *KeyframeIndex, duration float64, token string) string {
	tokenSuffix := ""
	if token != "" {
		tokenSuffix = "?token=" + token
	}
	var sb strings.Builder
	for _, header := range []string{
		"#EXTM3U",
		"#EXT-X-VERSION:6",
		"#EXT-X-PLAYLIST-TYPE:EVENT",
		"#EXT-X-START:TIME-OFFSET=0",
		// NOTE(review): the hardcoded 4 assumes no segment ever exceeds 4s;
		// confirm keyframe spacing guarantees that, otherwise TARGETDURATION
		// violates the HLS spec (RFC 8216 §4.3.3.1).
		"#EXT-X-TARGETDURATION:4",
		"#EXT-X-MEDIA-SEQUENCE:0",
		"#EXT-X-INDEPENDENT-SEGMENTS",
	} {
		sb.WriteString(header)
		sb.WriteByte('\n')
	}
	length, isDone := ki.Length()
	// Each segment's duration is the gap between consecutive keyframes.
	for seg := int32(0); seg < length-1; seg++ {
		fmt.Fprintf(&sb, "#EXTINF:%.6f\n", ki.Get(seg+1)-ki.Get(seg))
		fmt.Fprintf(&sb, "segment-%d.ts%s\n", seg, tokenSuffix)
	}
	// Final segment, include only when extraction is complete: it runs from
	// the last keyframe to the end of the file, and closes the playlist.
	if isDone && length > 0 {
		fmt.Fprintf(&sb, "#EXTINF:%.6f\n", duration-ki.Get(length-1))
		fmt.Fprintf(&sb, "segment-%d.ts%s\n", length-1, tokenSuffix)
		sb.WriteString("#EXT-X-ENDLIST")
	}
	return sb.String()
}

View File

@@ -0,0 +1,275 @@
package cassette
import (
"errors"
"fmt"
"math"
"seanime/internal/mediastream/videofile"
"strings"
)
// Quality represents a named video resolution tier.
// [Original] indicates a transmux (copy) of the source video without re-encoding.
type Quality string

const (
	P240     Quality = "240p"
	P360     Quality = "360p"
	P480     Quality = "480p"
	P720     Quality = "720p"
	P1080    Quality = "1080p"
	P1440    Quality = "1440p"
	P4k      Quality = "4k"
	P8k      Quality = "8k"
	Original Quality = "original"
)

// Qualities lists every fixed tier, smallest to largest. Original is excluded
// because it has no fixed height or bitrate of its own.
var Qualities = []Quality{P240, P360, P480, P720, P1080, P1440, P4k, P8k}

// QualityFromString parses a quality name. Unknown values return Original
// together with an error.
func QualityFromString(s string) (Quality, error) {
	candidate := Quality(s)
	if candidate == Original {
		return Original, nil
	}
	for _, tier := range Qualities {
		if tier == candidate {
			return tier, nil
		}
	}
	return Original, errors.New("cassette: invalid quality string")
}

// QualityFromHeight returns the smallest quality tier whose height is >= given value.
func QualityFromHeight(height uint32) Quality {
	for _, tier := range Qualities {
		if tier.Height() >= height {
			return tier
		}
	}
	return P240
}

// tierLimits returns the height, target average bitrate and peak (VBV/HRD)
// bitrate in bits/s for a fixed tier. It panics for Original, which has no
// fixed numbers, and for unknown values.
func (q Quality) tierLimits() (height, avgBitrate, peakBitrate uint32) {
	switch q {
	case P240:
		return 240, 400_000, 700_000
	case P360:
		return 360, 800_000, 1_400_000
	case P480:
		return 480, 1_200_000, 2_100_000
	case P720:
		return 720, 2_400_000, 4_000_000
	case P1080:
		return 1080, 4_800_000, 8_000_000
	case P1440:
		return 1440, 9_600_000, 12_000_000
	case P4k:
		return 2160, 16_000_000, 28_000_000
	case P8k:
		return 4320, 28_000_000, 40_000_000
	case Original:
		panic("cassette: Original quality must be handled specially")
	}
	panic("cassette: invalid quality value")
}

// Height returns the vertical resolution in pixels.
func (q Quality) Height() uint32 {
	h, _, _ := q.tierLimits()
	return h
}

// AverageBitrate returns the target average bitrate in bits/s for this tier.
func (q Quality) AverageBitrate() uint32 {
	_, avg, _ := q.tierLimits()
	return avg
}

// MaxBitrate returns the peak bitrate used for VBV/HRD in bits/s.
func (q Quality) MaxBitrate() uint32 {
	_, _, peak := q.tierLimits()
	return peak
}
// dynamic quality ladder

// QualityLadderEntry is one rung in a dynamic quality ladder built from
// the source file's actual properties
type QualityLadderEntry struct {
	// Quality is the tier name; Original for the source-resolution rung.
	Quality Quality
	// Width and Height are the output dimensions (aspect ratio preserved,
	// see BuildQualityLadder).
	Width  int32
	Height int32
	// Whether this quality tier requires transcoding (vs transmux/copy).
	NeedsTranscode bool
	// Whether the source's codec already matches the output codec so the
	// original tier can transmux (copy) the video stream.
	OriginalCanTransmux bool
}
// BuildQualityLadder generates a quality ladder for the source file.
//   - Never offers tiers above the source resolution (useless upscale)
//   - Marks the original tier as transmux-capable when the source is h264
//     at a compatible profile (avoids an entire re-encode)
//   - Computes actual output dimensions preserving aspect ratio
//   - Skips tiers whose bitrate would exceed the source's native bitrate
//     (wasteful to transcode to a "lower quality" at a higher bitrate)
//
// Returns nil for audio-only sources.
func BuildQualityLadder(info *videofile.MediaInfo) []QualityLadderEntry {
	if info.Video == nil {
		return nil
	}
	src := info.Video
	ratio := float64(src.Width) / float64(src.Height)
	// Whether the "original" rung can be copied rather than re-encoded.
	transmuxable := isTransmuxableVideo(src)
	// The "original" rung always comes first.
	ladder := []QualityLadderEntry{{
		Quality:             Original,
		Width:               int32(src.Width),
		Height:              int32(src.Height),
		NeedsTranscode:      !transmuxable,
		OriginalCanTransmux: transmuxable,
	}}
	// Append every downscale rung that makes sense for this source.
	for _, tier := range Qualities {
		tierHeight := tier.Height()
		if tierHeight > src.Height {
			continue // never upscale
		}
		if src.Bitrate > 0 && tier.AverageBitrate() >= src.Bitrate {
			continue // the rung would cost more bits than the source itself
		}
		ladder = append(ladder, QualityLadderEntry{
			Quality:        tier,
			Width:          closestEven(int32(float64(tierHeight) * ratio)),
			Height:         int32(tierHeight),
			NeedsTranscode: true,
		})
	}
	return ladder
}
// isTransmuxableVideo returns true if the codec can be directly copied
// into an HLS mpegts container without re-encoding and be playable in browsers.
func isTransmuxableVideo(video *videofile.Video) bool {
	if video.MimeCodec == nil {
		return false
	}
	// h.264 baseline (42), main (4d) and high (64) 8-bit profiles are the
	// ones Chrome/Safari decode natively.
	for _, prefix := range []string{"avc1.42", "avc1.4d", "avc1.64"} {
		if strings.HasPrefix(*video.MimeCodec, prefix) {
			return true
		}
	}
	return false
}
// audio codec awareness

// AudioTranscodeDecision describes how an audio track should be handled.
// Produced by DecideAudioTranscode and consumed when building ffmpeg args.
type AudioTranscodeDecision struct {
	// Copy is true when the source codec is HLS-compatible and can be
	// transmitted without re-encoding.
	Copy bool
	// Codec is the output codec flag (e.g. "aac", "copy").
	Codec string
	// Bitrate is the output bitrate flag (e.g. "128k", "384k"). Empty when
	// Copy is true.
	Bitrate string
	// Channels is the output channel count as a string.
	Channels string
}
// DecideAudioTranscode picks the handling for one audio track:
//   - If the source is already AAC: copy it
//   - If the source has ≤ 2 channels: encode to AAC stereo @ 128k
//   - If the source has > 2 channels: encode to AAC preserving layout @ 384k
func DecideAudioTranscode(audio *videofile.Audio) AudioTranscodeDecision {
	// The channel-count string is the same whether we copy or re-encode.
	channels := "2"
	if audio.Channels > 2 {
		channels = fmt.Sprintf("%d", audio.Channels)
	}
	// AAC is already HLS-compatible: pass the stream through untouched.
	if audio.Codec == "aac" {
		return AudioTranscodeDecision{
			Copy:     true,
			Codec:    "copy",
			Channels: channels,
		}
	}
	// Everything else is re-encoded to AAC; surround layouts get headroom.
	bitrate := "128k"
	if audio.Channels > 2 {
		bitrate = "384k"
	}
	return AudioTranscodeDecision{
		Copy:     false,
		Codec:    "aac",
		Bitrate:  bitrate,
		Channels: channels,
	}
}
// EffectiveBitrate returns the bitrates to advertise in the master playlist
// for a tier, capped at the source's own bitrate when that is known.
func EffectiveBitrate(q Quality, srcBitrate uint32) (avg uint32, peak uint32) {
	// A transmuxed Original streams at whatever the source itself uses.
	if q == Original {
		return srcBitrate, srcBitrate
	}
	avg, peak = q.AverageBitrate(), q.MaxBitrate()
	if srcBitrate == 0 {
		// Source bitrate unknown: advertise the tier's nominal numbers.
		return avg, peak
	}
	// Cap at the source; the average additionally keeps 20% headroom.
	avg = uint32(math.Min(float64(avg), float64(srcBitrate)*0.8))
	peak = uint32(math.Min(float64(peak), float64(srcBitrate)))
	return avg, peak
}

View File

@@ -0,0 +1,108 @@
package cassette
import (
"context"
"sync"
)
// SegmentTable tracks which segments are ready
type SegmentTable struct {
mu sync.RWMutex
segments []segmentEntry
}
type segmentEntry struct {
// ch is closed when the segment is ready on disk.
ch chan struct{}
// encoderID is the head that produced the segment.
encoderID int
}
// NewSegmentTable creates a table with initialLen segments
func NewSegmentTable(initialLen int32) *SegmentTable {
st := &SegmentTable{
segments: make([]segmentEntry, initialLen, max(initialLen, 2048)),
}
for i := range st.segments {
st.segments[i].ch = make(chan struct{})
}
return st
}
// Grow extends the table to at least newLen segments
func (st *SegmentTable) Grow(newLen int) {
st.mu.Lock()
defer st.mu.Unlock()
if newLen <= len(st.segments) {
return
}
for i := len(st.segments); i < newLen; i++ {
st.segments = append(st.segments, segmentEntry{ch: make(chan struct{})})
}
}
// Len returns the current number of tracked segments
func (st *SegmentTable) Len() int {
st.mu.RLock()
defer st.mu.RUnlock()
return len(st.segments)
}
// IsReady returns true if segment is ready
func (st *SegmentTable) IsReady(seg int32) bool {
st.mu.RLock()
ch := st.segments[seg].ch
st.mu.RUnlock()
select {
case <-ch:
return true
default:
return false
}
}
// isReadyLocked is like IsReady but expects at least an RLock to be held.
func (st *SegmentTable) isReadyLocked(seg int32) bool {
select {
case <-st.segments[seg].ch:
return true
default:
return false
}
}
// MarkReady marks a segment as ready
func (st *SegmentTable) MarkReady(seg int32, encoderID int) {
st.mu.Lock()
defer st.mu.Unlock()
select {
case <-st.segments[seg].ch:
// Already closed — idempotent.
default:
st.segments[seg].encoderID = encoderID
close(st.segments[seg].ch)
}
}
// EncoderID returns the encoder that produced the given segment
func (st *SegmentTable) EncoderID(seg int32) int {
st.mu.RLock()
defer st.mu.RUnlock()
return st.segments[seg].encoderID
}
// WaitFor blocks until segment is ready or context is cancelled
func (st *SegmentTable) WaitFor(ctx context.Context, seg int32, kill <-chan struct{}) error {
st.mu.RLock()
ch := st.segments[seg].ch
st.mu.RUnlock()
select {
case <-ch:
return nil
case <-ctx.Done():
return ctx.Err()
case <-kill:
return context.Canceled
}
}

View File

@@ -0,0 +1,333 @@
package cassette
import (
"context"
"fmt"
"os"
"path/filepath"
"seanime/internal/mediastream/videofile"
"sync"
"github.com/rs/zerolog"
)
// Session represents a per-file transcode session
//
// A Session owns one output directory, a shared keyframe index, and lazily
// created video/audio pipelines (one per quality tier / audio track).
type Session struct {
	// ready is decremented once keyframes load
	ready sync.WaitGroup
	// err is set if initialization fails
	// NOTE(review): nothing in this file ever assigns err, so WaitReady
	// always returns nil — confirm intended error propagation.
	err error
	// Path is the source media file this session transcodes.
	Path string
	// Out is the output directory (settings.StreamDir joined with the hash).
	Out string
	// Keyframes is the shared keyframe index; valid only after WaitReady.
	Keyframes *KeyframeIndex
	// Info holds the probed media metadata for the source file.
	Info *videofile.MediaInfo
	// Ladder is the quality ladder derived from Info at construction time.
	Ladder []QualityLadderEntry
	// videos and audios are created lazily
	videosMu sync.Mutex
	videos   map[Quality]*Pipeline
	audiosMu sync.Mutex
	audios   map[int32]*Pipeline
	settings *Settings
	governor *Governor
	logger   *zerolog.Logger
}
// NewSession creates a transcode session and kicks off keyframe extraction
// in the background; use WaitReady to synchronize on it.
func NewSession(
	path, hash string,
	info *videofile.MediaInfo,
	settings *Settings,
	governor *Governor,
	logger *zerolog.Logger,
) *Session {
	sess := &Session{
		Path:     path,
		Out:      filepath.Join(settings.StreamDir, hash),
		Info:     info,
		Ladder:   BuildQualityLadder(info),
		videos:   map[Quality]*Pipeline{},
		audios:   map[int32]*Pipeline{},
		settings: settings,
		governor: governor,
		logger:   logger,
	}
	// Keyframe extraction runs asynchronously; WaitReady gates on this.
	sess.ready.Add(1)
	go func() {
		defer sess.ready.Done()
		sess.Keyframes = getOrExtractKeyframes(path, hash, settings, logger)
	}()
	if len(sess.Ladder) != 0 {
		logger.Debug().
			Int("tiers", len(sess.Ladder)).
			Bool("canTransmux", sess.Ladder[0].OriginalCanTransmux).
			Msg("cassette: quality ladder built")
	}
	return sess
}
// WaitReady blocks until the keyframe index is ready
//
// It returns s.err; note that nothing in this file currently assigns it, so
// the call effectively only synchronizes on keyframe extraction.
func (s *Session) WaitReady() error {
	s.ready.Wait()
	return s.err
}
// master / index / segment accessors

// GetMaster returns the hls master playlist
// rendered from the probed media info and the session's quality ladder.
func (s *Session) GetMaster(token string) string {
	return GenerateMasterPlaylist(s.Info, s.Ladder, token)
}
// GetVideoIndex returns the hls variant playlist for a quality,
// creating the pipeline for that quality on first use.
func (s *Session) GetVideoIndex(q Quality, token string) (string, error) {
	p := s.getVideoPipeline(q)
	return p.GetIndex(token)
}
// GetVideoSegment returns the path to a video segment, blocking until ready
func (s *Session) GetVideoSegment(ctx context.Context, q Quality, seg int32) (string, error) {
	// The timeout is bounded by the pipeline constraints, but the request controls early cancellation.
	type result struct {
		path string
		err  error
	}
	// Buffered so the worker goroutine can always deliver its result and
	// exit even after this function returned on ctx.Done().
	ch := make(chan result, 1)
	go func() {
		p := s.getVideoPipeline(q)
		path, err := p.GetSegment(ctx, seg)
		ch <- result{path, err}
	}()
	select {
	case r := <-ch:
		return r.path, r.err
	case <-ctx.Done():
		// NOTE(review): the underlying GetSegment error is discarded on this
		// path, and the goroutine lingers until GetSegment observes ctx.
		return "", fmt.Errorf("cassette: context canceled waiting for video segment %d (%s)", seg, q)
	}
}
// GetAudioIndex returns the hls variant playlist for an audio track,
// creating the pipeline for that track on first use.
func (s *Session) GetAudioIndex(audio int32, token string) (string, error) {
	p := s.getAudioPipeline(audio)
	return p.GetIndex(token)
}
// GetAudioSegment returns the path to an audio segment
//
// NOTE(review): unlike GetVideoSegment there is no extra select on ctx here;
// cancellation is handled entirely inside Pipeline.GetSegment — confirm the
// asymmetry is intentional.
func (s *Session) GetAudioSegment(ctx context.Context, audio, seg int32) (string, error) {
	p := s.getAudioPipeline(audio)
	return p.GetSegment(ctx, seg)
}
// video pipeline factory

// getVideoPipeline lazily creates (or returns) the encode pipeline for a
// quality tier. The pipeline's ffmpeg argument builder is decided here:
// stream copy when the Original tier can transmux, otherwise a transcode
// whose dimensions and bitrates depend on the tier.
func (s *Session) getVideoPipeline(q Quality) *Pipeline {
	s.videosMu.Lock()
	defer s.videosMu.Unlock()
	if p, ok := s.videos[q]; ok {
		return p
	}
	s.logger.Trace().Str("file", filepath.Base(s.Path)).Str("quality", string(q)).
		Msg("cassette: creating video pipeline")

	// Check if this quality can transmux (only ever true for Original).
	canTransmux := false
	if q == Original {
		for _, entry := range s.Ladder {
			if entry.Quality == Original {
				canTransmux = entry.OriginalCanTransmux
				break
			}
		}
	}

	// encodeTarget resolves the output dimensions and bitrates for a
	// transcode; shared by the Original-transcode and downscale paths so the
	// argument construction below exists only once.
	encodeTarget := func() (width, height int32, avgBitrate, maxBitrate uint32) {
		if q == Original {
			// Needs transcode even for original quality (e.g. HEVC source).
			avgBitrate, maxBitrate = EffectiveBitrate(Original, s.Info.Video.Bitrate)
			if avgBitrate == 0 {
				// Unknown source bitrate: fall back to sane defaults.
				avgBitrate = 5_000_000
				maxBitrate = 8_000_000
			}
			return closestEven(int32(s.Info.Video.Width)), int32(s.Info.Video.Height), avgBitrate, maxBitrate
		}
		// Downscale: derive width from the tier height, preserving aspect.
		width = closestEven(int32(
			float64(q.Height()) / float64(s.Info.Video.Height) * float64(s.Info.Video.Width),
		))
		return width, int32(q.Height()), q.AverageBitrate(), q.MaxBitrate()
	}

	buildArgs := func(segmentTimes string) []string {
		args := []string{"-map", "0:V:0"}
		if canTransmux {
			// no encode, just copy.
			return append(args, "-c:v", "copy")
		}
		width, height, avgBitrate, maxBitrate := encodeTarget()
		args = append(args, s.settings.HwAccel.EncodeFlags...)
		args = append(args,
			"-vf", BuildVideoFilter(&s.settings.HwAccel, s.Info.Video, width, height),
			"-bufsize", fmt.Sprint(maxBitrate*5),
			"-b:v", fmt.Sprint(avgBitrate),
			"-maxrate", fmt.Sprint(maxBitrate),
		)
		if s.settings.HwAccel.ForcedIDR {
			// Ensure segment boundaries land on IDR frames.
			args = append(args, "-forced-idr", "1")
		}
		return append(args,
			"-force_key_frames", segmentTimes,
			"-strict", "-2",
		)
	}
	outFmt := func(eid int) string {
		return filepath.Join(s.Out, fmt.Sprintf("segment-%s-%d-%%d.ts", q, eid))
	}
	label := fmt.Sprintf("video (%s)", q)
	if canTransmux {
		label = "video (original/transmux)"
	}
	p := NewPipeline(PipelineConfig{
		Kind:       VideoKind,
		Label:      label,
		Session:    s,
		Settings:   s.settings,
		Governor:   s.governor,
		Logger:     s.logger,
		BuildArgs:  buildArgs,
		OutPathFmt: outFmt,
	})
	s.videos[q] = p
	return p
}
// audio pipeline factory

// getAudioPipeline creates or retrieves the pipeline for one audio track.
func (s *Session) getAudioPipeline(idx int32) *Pipeline {
	s.audiosMu.Lock()
	defer s.audiosMu.Unlock()
	if existing, ok := s.audios[idx]; ok {
		return existing
	}
	s.logger.Trace().Str("file", filepath.Base(s.Path)).Int32("audio", idx).
		Msg("cassette: creating audio pipeline")
	// Locate the source track so we can decide between copy and re-encode.
	var source *videofile.Audio
	for i := range s.Info.Audios {
		if int32(s.Info.Audios[i].Index) == idx {
			source = &s.Info.Audios[i]
			break
		}
	}
	// Default when track metadata is missing: stereo AAC at 128k.
	plan := AudioTranscodeDecision{Codec: "aac", Bitrate: "128k", Channels: "2"}
	if source != nil {
		plan = DecideAudioTranscode(source)
	}
	if plan.Copy {
		s.logger.Debug().Int32("audio", idx).Str("codec", "copy").
			Msg("cassette: audio is HLS-compatible, transmuxing (no re-encode)")
	} else {
		s.logger.Debug().Int32("audio", idx).
			Str("codec", plan.Codec).
			Str("bitrate", plan.Bitrate).
			Str("channels", plan.Channels).
			Msg("cassette: audio needs re-encode")
	}
	buildArgs := func(_ string) []string {
		args := []string{
			"-map", fmt.Sprintf("0:a:%d", idx),
			"-c:a", plan.Codec,
		}
		if plan.Copy {
			return args
		}
		args = append(args, "-ac", plan.Channels)
		if plan.Bitrate != "" {
			args = append(args, "-b:a", plan.Bitrate)
		}
		return args
	}
	outFmt := func(eid int) string {
		return filepath.Join(s.Out, fmt.Sprintf("segment-a%d-%d-%%d.ts", idx, eid))
	}
	p := NewPipeline(PipelineConfig{
		Kind:       AudioKind,
		Label:      fmt.Sprintf("audio %d", idx),
		Session:    s,
		Settings:   s.settings,
		Governor:   s.governor,
		Logger:     s.logger,
		BuildArgs:  buildArgs,
		OutPathFmt: outFmt,
	})
	s.audios[idx] = p
	return p
}
// lifecycle

// Kill stops every running encode pipeline (video first, then audio)
// without touching the on-disk output.
func (s *Session) Kill() {
	s.videosMu.Lock()
	for _, pipeline := range s.videos {
		pipeline.Kill()
	}
	s.videosMu.Unlock()

	s.audiosMu.Lock()
	for _, pipeline := range s.audios {
		pipeline.Kill()
	}
	s.audiosMu.Unlock()
}
// Destroy stops everything and removes output directory
// (segments for this session can no longer be served afterwards).
func (s *Session) Destroy() {
	s.logger.Debug().Str("path", s.Path).Msg("cassette: destroying session")
	s.Kill()
	// Best-effort cleanup: removal errors are deliberately ignored.
	_ = os.RemoveAll(s.Out)
}

View File

@@ -0,0 +1,29 @@
package cassette
// HwAccelProfile holds ffmpeg flags for a hardware backend
type HwAccelProfile struct {
	// Name is the identifier for logging
	Name string `json:"name"`
	// DecodeFlags are placed before -i
	DecodeFlags []string `json:"decodeFlags"`
	// EncodeFlags are placed after -i
	EncodeFlags []string `json:"encodeFlags"`
	// ScaleFilter is a format string for width/height
	// (presumably two numeric verbs — confirm against the profiles that set it).
	ScaleFilter string `json:"scaleFilter"`
	// NoScaleFilter used when scaling is not needed
	NoScaleFilter string `json:"noScaleFilter"`
	// ForcedIDR ensures segment boundaries align with idr frames
	// (emits "-forced-idr 1" in the video encode args).
	ForcedIDR bool `json:"forcedIdr"`
}
// Settings holds the runtime configuration for a cassette instance
type Settings struct {
	// StreamDir is the directory where segments are written
	// (each session uses the subdirectory StreamDir/<hash>).
	StreamDir string
	// HwAccel is the active hardware acceleration profile
	HwAccel HwAccelProfile
	// FfmpegPath is the path to the ffmpeg binary
	FfmpegPath string
	// FfprobePath is the path to the ffprobe binary
	FfprobePath string
}

View File

@@ -0,0 +1,307 @@
package cassette
import (
"time"
"github.com/rs/zerolog"
)
// ClientInfo represents a snapshot of client consumption
type ClientInfo struct {
	// Client uniquely identifies the consumer.
	Client string
	// Path is the media file the client is streaming.
	Path string
	// Quality is the video tier in use; nil means "unchanged / not set".
	Quality *Quality
	Audio   int32 // -1 means "unchanged / not set".
	Head    int32 // -1 means "unchanged / not set".
}
// ClientTracker monitors client activity and cleans up idle resources
//
// The maps appear to be mutated only from the run goroutine (updates arrive
// over cassette.clientChan), which is why they are unguarded — confirm no
// other goroutine touches them.
type ClientTracker struct {
	// clients holds the latest snapshot per client id.
	clients map[string]ClientInfo
	// visitDate is the last time each client was seen.
	visitDate map[string]time.Time
	// lastUsage is the last time each path was requested by any client.
	lastUsage map[string]time.Time
	// qualityActive and audioActive track the last time a segment was requested
	qualityActive map[string]map[Quality]time.Time // path -> quality -> last used
	audioActive   map[string]map[int32]time.Time   // path -> audio -> last used
	cassette      *Cassette
	// deletedStream receives paths whose destruction cooldown has elapsed.
	deletedStream chan string
	logger        *zerolog.Logger
	// killCh is closed by Stop to shut the event loop down.
	killCh chan struct{}
}
// NewClientTracker creates a client tracker and starts its event loop.
func NewClientTracker(c *Cassette) *ClientTracker {
	tracker := &ClientTracker{
		cassette:      c,
		logger:        c.logger,
		clients:       map[string]ClientInfo{},
		visitDate:     map[string]time.Time{},
		lastUsage:     map[string]time.Time{},
		qualityActive: map[string]map[Quality]time.Time{},
		audioActive:   map[string]map[int32]time.Time{},
		deletedStream: make(chan string, 256),
		killCh:        make(chan struct{}),
	}
	go tracker.run()
	return tracker
}
// Stop shuts down the tracker
//
// NOTE(review): the check-then-close is not safe against two concurrent Stop
// calls (both could pass the default case and the second close would panic);
// a sync.Once would make it safe — confirm Stop is only ever invoked from a
// single goroutine.
func (t *ClientTracker) Stop() {
	select {
	case <-t.killCh:
	default:
		close(t.killCh)
	}
}
// ---------------------------------------------------------------------------
// Event loop
// ---------------------------------------------------------------------------

// run is the tracker's single event loop. It owns all tracker state and
// processes client updates, periodic sweeps, and deferred session
// destruction until killCh or the cassette's client channel closes.
func (t *ClientTracker) run() {
	const (
		inactiveTimeout      = 1 * time.Hour
		pipelineIdleTimeout  = 15 * time.Second
		pipelineTickInterval = 5 * time.Second
	)
	sessionTicker := time.NewTicker(inactiveTimeout)
	pipelineTicker := time.NewTicker(pipelineTickInterval)
	defer sessionTicker.Stop()
	defer pipelineTicker.Stop()
	for {
		select {
		case <-t.killCh:
			return
		case info, ok := <-t.cassette.clientChan:
			if !ok {
				// Channel closed by the cassette: shut the loop down.
				return
			}
			t.handleClientUpdate(info)
		case <-sessionTicker.C:
			// Hourly: evict clients idle for over an hour.
			t.purgeInactive(inactiveTimeout)
		case <-pipelineTicker.C:
			// Every 5s: reap pipelines idle for 15s or more.
			t.purgeIdlePipelines(pipelineIdleTimeout)
		case path := <-t.deletedStream:
			// A killed session's destruction cooldown elapsed.
			t.maybeDestroySession(path)
		}
	}
}
// handleClientUpdate merges a (possibly partial) client snapshot into the
// tracker state and reaps resources the client just moved away from.
//
// Partial fields use sentinels — Quality == nil, Audio == -1, Head == -1
// mean "unchanged" and are back-filled from the previous snapshot when the
// client stayed on the same file.
func (t *ClientTracker) handleClientUpdate(info ClientInfo) {
	old, exists := t.clients[info.Client]
	// merge partial updates
	if exists && old.Path == info.Path {
		if info.Quality == nil {
			info.Quality = old.Quality
		}
		if info.Audio == -1 {
			info.Audio = old.Audio
		}
		if info.Head == -1 {
			info.Head = old.Head
		}
	}
	t.clients[info.Client] = info
	t.visitDate[info.Client] = time.Now()
	t.lastUsage[info.Path] = time.Now()
	// record the last-active timestamp for the quality and audio track
	if info.Quality != nil {
		if t.qualityActive[info.Path] == nil {
			t.qualityActive[info.Path] = make(map[Quality]time.Time)
		}
		t.qualityActive[info.Path][*info.Quality] = time.Now()
	}
	if info.Audio >= 0 {
		if t.audioActive[info.Path] == nil {
			t.audioActive[info.Path] = make(map[int32]time.Time)
		}
		t.audioActive[info.Path][info.Audio] = time.Now()
	}
	if !exists {
		// First sighting of this client: nothing to reap yet.
		return
	}
	if old.Path != info.Path {
		// The client switched files; its previous session may now be unused.
		t.killSessionIfDead(old.Path)
		return
	}
	// kill orphaned heads when playhead jumps far
	// (a jump of >100 segments is a seek; heads near the old position are stale)
	if old.Head != -1 && abs32(info.Head-old.Head) > 100 {
		t.killOrphanedHeads(old.Path, old.Quality, old.Audio)
	}
}
// purgeInactive evicts clients that have been idle for longer than timeout
// and reaps the sessions/heads they were keeping alive.
func (t *ClientTracker) purgeInactive(timeout time.Duration) {
	for client, seen := range t.visitDate {
		if time.Since(seen) < timeout {
			continue
		}
		info := t.clients[client]
		// Remove the client FIRST: killSessionIfDead scans t.clients to
		// decide whether the session is still in use, and the expiring
		// client must not count as a user — otherwise its own entry keeps
		// the session alive and idle sessions are never reaped here.
		delete(t.clients, client)
		delete(t.visitDate, client)
		if !t.killSessionIfDead(info.Path) {
			t.killOrphanedHeads(info.Path, info.Quality, info.Audio)
		}
	}
}
// purgeIdlePipelines kills pipelines whose last segment request is older
// than timeout, then drops empty per-path tracking maps.
func (t *ClientTracker) purgeIdlePipelines(timeout time.Duration) {
	for path, perQuality := range t.qualityActive {
		for quality, seen := range perQuality {
			if time.Since(seen) < timeout {
				continue
			}
			if t.killQualityIfDead(path, quality) {
				delete(perQuality, quality)
			}
		}
		if len(perQuality) == 0 {
			delete(t.qualityActive, path)
		}
	}
	for path, perTrack := range t.audioActive {
		for track, seen := range perTrack {
			if time.Since(seen) < timeout {
				continue
			}
			if t.killAudioIfDead(path, track) {
				delete(perTrack, track)
			}
		}
		if len(perTrack) == 0 {
			delete(t.audioActive, path)
		}
	}
}
// session / pipeline reaping

// killSessionIfDead stops the session for path when no tracked client is
// still on it; returns true when a kill was issued.
//
// The session is only soft-killed here. Full destruction (removing segments
// on disk) is scheduled 4 hours later via deletedStream so a returning
// client can resume from the cached output in the meantime.
func (t *ClientTracker) killSessionIfDead(path string) bool {
	for _, c := range t.clients {
		if c.Path == path {
			return false
		}
	}
	t.logger.Trace().Str("path", path).Msg("cassette: reaping idle session")
	s := t.cassette.getSessionByPath(path)
	if s == nil {
		return false
	}
	s.Kill()
	// Schedule full destruction after a cooldown
	go func() {
		select {
		case <-t.killCh:
			return
		case <-time.After(4 * time.Hour):
			t.deletedStream <- path
		}
	}()
	return true
}
// maybeDestroySession destroys a session's on-disk output once its 4-hour
// cooldown has elapsed with no further use; any client update refreshes
// lastUsage and thereby aborts the destruction.
func (t *ClientTracker) maybeDestroySession(path string) {
	if time.Since(t.lastUsage[path]) < 4*time.Hour {
		return
	}
	t.cassette.destroySession(path)
}
// killAudioIfDead kills the audio pipeline for (path, audio) when no tracked
// client is still consuming it. Returns false when the pipeline is still in
// use or the session does not exist, true when a reap was performed.
func (t *ClientTracker) killAudioIfDead(path string, audio int32) bool {
	for _, client := range t.clients {
		if client.Path == path && client.Audio == audio {
			// Still in use by at least one client.
			return false
		}
	}
	t.logger.Trace().Int32("audio", audio).Str("path", path).Msg("cassette: reaping idle audio pipeline")
	session := t.cassette.getSessionByPath(path)
	if session == nil {
		return false
	}
	session.audiosMu.Lock()
	defer session.audiosMu.Unlock()
	if pipeline, ok := session.audios[audio]; ok {
		pipeline.Kill()
		delete(session.audios, audio)
	}
	return true
}
// killQualityIfDead kills the video pipeline for (path, q) when no tracked
// client is still consuming it. Returns false when the pipeline is still in
// use or the session does not exist, true when a reap was performed.
func (t *ClientTracker) killQualityIfDead(path string, q Quality) bool {
	for _, client := range t.clients {
		if client.Path == path && client.Quality != nil && *client.Quality == q {
			// Still in use by at least one client.
			return false
		}
	}
	t.logger.Trace().Str("quality", string(q)).Str("path", path).
		Msg("cassette: reaping idle video pipeline")
	session := t.cassette.getSessionByPath(path)
	if session == nil {
		return false
	}
	session.videosMu.Lock()
	defer session.videosMu.Unlock()
	if pipeline, ok := session.videos[q]; ok {
		pipeline.Kill()
		delete(session.videos, q)
	}
	return true
}
// killOrphanedHeads kills encoder heads that are no longer close to any
// client's playhead for the given path. Only the video pipeline matching
// quality (when non-nil) and the audio pipeline matching audio (when != -1)
// are inspected.
func (t *ClientTracker) killOrphanedHeads(path string, quality *Quality, audio int32) {
	s := t.cassette.getSessionByPath(path)
	if s == nil {
		return
	}
	// reapPipeline kills every head whose current segment is more than 20
	// segments away from the nearest client playhead.
	// NOTE(review): t.clients is read inside this closure while p.headsMu is
	// held — assumes the caller already holds whatever lock guards t.clients.
	reapPipeline := func(p *Pipeline) {
		p.headsMu.Lock()
		defer p.headsMu.Unlock()
		for eid, h := range p.heads {
			if h.segment == -1 {
				// Head has no segment position; skip it.
				continue
			}
			// Distance from this head to the nearest client playhead.
			// 99999 acts as "no client found".
			dist := int32(99999)
			for _, c := range t.clients {
				if c.Head == -1 {
					continue
				}
				if d := abs32(c.Head - h.segment); d < dist {
					dist = d
				}
			}
			if dist > 20 {
				t.logger.Trace().Int("eid", eid).Msg("cassette: reaping orphaned encoder head")
				p.killHeadLocked(eid)
			}
		}
	}
	// Inspect the video pipeline for the given quality, if any.
	if quality != nil {
		s.videosMu.Lock()
		if p, ok := s.videos[*quality]; ok {
			reapPipeline(p)
		}
		s.videosMu.Unlock()
	}
	// Inspect the audio pipeline for the given stream index, if any.
	if audio != -1 {
		s.audiosMu.Lock()
		if p, ok := s.audios[audio]; ok {
			reapPipeline(p)
		}
		s.audiosMu.Unlock()
	}
}

View File

@@ -0,0 +1,81 @@
package cassette
import (
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/goccy/go-json"
	"github.com/rs/zerolog"
)
// ParseSegment extracts the segment number from a filename like "segment-42.ts".
//
// Unlike the previous fmt.Sscanf implementation, this rejects trailing garbage
// (e.g. "segment-42.ts.bak" no longer parses as 42) and values that do not fit
// in an int32.
func ParseSegment(segment string) (int32, error) {
	num, ok := strings.CutPrefix(segment, "segment-")
	if ok {
		num, ok = strings.CutSuffix(num, ".ts")
	}
	if !ok {
		return 0, errors.New("cassette: could not parse segment name")
	}
	ret, err := strconv.ParseInt(num, 10, 32)
	if err != nil {
		return 0, errors.New("cassette: could not parse segment name")
	}
	return int32(ret), nil
}
// getSavedInfo deserializes a json file
func getSavedInfo[T any](savePath string, target *T) error {
f, err := os.Open(savePath)
if err != nil {
return err
}
defer f.Close()
data, err := io.ReadAll(f)
if err != nil {
return err
}
return json.Unmarshal(data, target)
}
// saveInfo serializes the value to json and stores it on disk
func saveInfo[T any](savePath string, value *T) error {
data, err := json.Marshal(*value)
if err != nil {
return err
}
_ = os.MkdirAll(filepath.Dir(savePath), 0755)
return os.WriteFile(savePath, data, 0666)
}
// printExecTime logs the start of an operation and returns a function that,
// when invoked, logs how long the operation took.
func printExecTime(logger *zerolog.Logger, message string, args ...any) func() {
	label := fmt.Sprintf(message, args...)
	startedAt := time.Now()
	logger.Trace().Msgf("cassette: Running %s", label)
	return func() {
		logger.Trace().Msgf("cassette: %s finished in %s", label, time.Since(startedAt))
	}
}
// GetEnvOr returns the value of the environment variable env, or def when the
// variable is unset or empty.
func GetEnvOr(env, def string) string {
	value := os.Getenv(env)
	if value == "" {
		return def
	}
	return value
}
// closestEven returns n unchanged when it is even, and n+1 when it is odd
// (note: for negative odd values this moves toward zero, e.g. -3 -> -2).
func closestEven(n int32) int32 {
	if n%2 == 0 {
		return n
	}
	return n + 1
}
// abs32 returns the absolute value of an int32.
func abs32(x int32) int32 {
	if x >= 0 {
		return x
	}
	return -x
}

View File

@@ -0,0 +1,98 @@
package cassette
import (
"sync"
"time"
)
// VelocityEstimator tracks playback speed to adjust prefetch
type VelocityEstimator struct {
mu sync.Mutex
samples []velocitySample
maxAge time.Duration
}
type velocitySample struct {
segment int32
at time.Time
}
// NewVelocityEstimator creates an estimator with a moving window
func NewVelocityEstimator(window time.Duration) *VelocityEstimator {
return &VelocityEstimator{
samples: make([]velocitySample, 0, 64),
maxAge: window,
}
}
// Record adds a new segment request observation
func (ve *VelocityEstimator) Record(seg int32) {
ve.mu.Lock()
defer ve.mu.Unlock()
now := time.Now()
ve.samples = append(ve.samples, velocitySample{segment: seg, at: now})
// Evict old samples.
cutoff := now.Add(-ve.maxAge)
i := 0
for i < len(ve.samples) && ve.samples[i].at.Before(cutoff) {
i++
}
if i > 0 {
ve.samples = ve.samples[i:]
}
}
// SegmentsPerSecond returns the consumption rate
func (ve *VelocityEstimator) SegmentsPerSecond() float64 {
ve.mu.Lock()
defer ve.mu.Unlock()
if len(ve.samples) < 2 {
return 0
}
first := ve.samples[0]
last := ve.samples[len(ve.samples)-1]
dt := last.at.Sub(first.at).Seconds()
if dt < 0.01 {
return 0
}
dSeg := float64(last.segment - first.segment)
if dSeg < 0 {
// seek backwards
return 0
}
return dSeg / dt
}
// LookAhead returns suggested prefetch count based on velocity
func (ve *VelocityEstimator) LookAhead(base int32) int32 {
v := ve.SegmentsPerSecond()
if v <= 0 {
return base
}
// At 1x playback, v ~= 0.25 seg/s.
extra := int32(v * 20) // scale up for faster network/playback
return min(base+extra, 30) // cap at 30 segments
}
// DetectSeek returns true if the request looks like a seek
func (ve *VelocityEstimator) DetectSeek(threshold int32) bool {
ve.mu.Lock()
defer ve.mu.Unlock()
n := len(ve.samples)
if n < 2 {
return false
}
delta := ve.samples[n-1].segment - ve.samples[n-2].segment
return abs32(delta) > threshold
}
// Reset clears all samples
func (ve *VelocityEstimator) Reset() {
ve.mu.Lock()
defer ve.mu.Unlock()
ve.samples = ve.samples[:0]
}

View File

@@ -6,10 +6,11 @@ import (
"path/filepath"
"seanime/internal/database/models"
"seanime/internal/events"
"seanime/internal/mediastream/cassette"
"seanime/internal/mediastream/optimizer"
"seanime/internal/mediastream/transcoder"
"seanime/internal/mediastream/videofile"
"seanime/internal/util/filecache"
"seanime/internal/videocore"
"sync"
"github.com/rs/zerolog"
@@ -18,13 +19,14 @@ import (
type (
Repository struct {
transcoder mo.Option[*transcoder.Transcoder]
transcoder mo.Option[*cassette.Cassette]
optimizer *optimizer.Optimizer
settings mo.Option[*models.MediastreamSettings]
playbackManager *PlaybackManager
mediaInfoExtractor *videofile.MediaInfoExtractor
logger *zerolog.Logger
wsEventManager events.WSEventManagerInterface
videoCore *videocore.VideoCore
fileCacher *filecache.Cacher
reqMu sync.Mutex
cacheDir string // where attachments are stored
@@ -34,6 +36,7 @@ type (
NewRepositoryOptions struct {
Logger *zerolog.Logger
WSEventManager events.WSEventManagerInterface
VideoCore *videocore.VideoCore
FileCacher *filecache.Cacher
}
)
@@ -46,13 +49,27 @@ func NewRepository(opts *NewRepositoryOptions) *Repository {
WSEventManager: opts.WSEventManager,
}),
settings: mo.None[*models.MediastreamSettings](),
transcoder: mo.None[*transcoder.Transcoder](),
transcoder: mo.None[*cassette.Cassette](),
wsEventManager: opts.WSEventManager,
videoCore: opts.VideoCore,
fileCacher: opts.FileCacher,
mediaInfoExtractor: videofile.NewMediaInfoExtractor(opts.FileCacher, opts.Logger),
}
ret.playbackManager = NewPlaybackManager(ret)
if opts.VideoCore != nil {
opts.VideoCore.RegisterEventCallback(func(event videocore.VideoEvent) bool {
switch e := event.(type) {
case *videocore.VideoTerminatedEvent:
if ret.TranscoderIsInitialized() {
opts.Logger.Debug().Str("clientId", e.GetClientId()).Msg("mediastream: Received VideoTerminatedEvent, killing transcoder")
ret.ShutdownTranscodeStream(e.GetClientId())
}
}
return true
})
}
return ret
}
@@ -249,7 +266,7 @@ func (r *Repository) initializeTranscoder(settings mo.Option[*models.Mediastream
tc.Destroy()
}
r.transcoder = mo.None[*transcoder.Transcoder]()
r.transcoder = mo.None[*cassette.Cassette]()
// If the transcoder is not enabled, don't initialize the transcoder
if !settings.MustGet().TranscodeEnabled {
@@ -262,7 +279,7 @@ func (r *Repository) initializeTranscoder(settings mo.Option[*models.Mediastream
return false
}
opts := &transcoder.NewTranscoderOptions{
opts := &cassette.NewCassetteOptions{
Logger: r.logger,
HwAccelKind: settings.MustGet().TranscodeHwAccel,
Preset: settings.MustGet().TranscodePreset,
@@ -272,16 +289,16 @@ func (r *Repository) initializeTranscoder(settings mo.Option[*models.Mediastream
TempOutDir: r.transcodeDir,
}
tc, err := transcoder.NewTranscoder(opts)
tc, err := cassette.New(opts)
if err != nil {
r.logger.Error().Err(err).Msg("mediastream: Failed to initialize transcoder")
r.logger.Error().Err(err).Msg("mediastream: Failed to initialize cassette")
return false
}
r.playbackManager.mediaContainers.Clear()
r.logger.Info().Msg("mediastream: Transcoder module initialized")
r.transcoder = mo.Some[*transcoder.Transcoder](tc)
r.logger.Info().Msg("mediastream: Cassette module initialized")
r.transcoder = mo.Some[*cassette.Cassette](tc)
return true
}

View File

@@ -3,13 +3,12 @@ package mediastream
import (
"errors"
"seanime/internal/events"
"seanime/internal/mediastream/transcoder"
"seanime/internal/mediastream/cassette"
"strconv"
"strings"
"github.com/samber/mo"
"github.com/labstack/echo/v4"
"github.com/samber/mo"
)
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -35,8 +34,10 @@ func (r *Repository) ServeEchoTranscodeStream(c echo.Context, clientId string) e
return errors.New("no file has been loaded")
}
token := c.QueryParam("token")
if path == "master.m3u8" {
ret, err := r.transcoder.MustGet().GetMaster(mediaContainer.Filepath, mediaContainer.Hash, mediaContainer.MediaInfo, clientId)
ret, err := r.transcoder.MustGet().GetMaster(mediaContainer.Filepath, mediaContainer.Hash, mediaContainer.MediaInfo, clientId, token)
if err != nil {
return err
}
@@ -52,12 +53,12 @@ func (r *Repository) ServeEchoTranscodeStream(c echo.Context, clientId string) e
return errors.New("invalid index.m3u8 path")
}
quality, err := transcoder.QualityFromString(split[0])
quality, err := cassette.QualityFromString(split[0])
if err != nil {
return err
}
ret, err := r.transcoder.MustGet().GetVideoIndex(mediaContainer.Filepath, mediaContainer.Hash, mediaContainer.MediaInfo, quality, clientId)
ret, err := r.transcoder.MustGet().GetVideoIndex(mediaContainer.Filepath, mediaContainer.Hash, mediaContainer.MediaInfo, quality, clientId, token)
if err != nil {
return err
}
@@ -78,7 +79,7 @@ func (r *Repository) ServeEchoTranscodeStream(c echo.Context, clientId string) e
return err
}
ret, err := r.transcoder.MustGet().GetAudioIndex(mediaContainer.Filepath, mediaContainer.Hash, mediaContainer.MediaInfo, int32(audio), clientId)
ret, err := r.transcoder.MustGet().GetAudioIndex(mediaContainer.Filepath, mediaContainer.Hash, mediaContainer.MediaInfo, int32(audio), clientId, token)
if err != nil {
return err
}
@@ -94,17 +95,19 @@ func (r *Repository) ServeEchoTranscodeStream(c echo.Context, clientId string) e
return errors.New("invalid segments-:chunk.ts path")
}
quality, err := transcoder.QualityFromString(split[0])
quality, err := cassette.QualityFromString(split[0])
if err != nil {
return err
}
segment, err := transcoder.ParseSegment(split[1])
segment, err := cassette.ParseSegment(split[1])
if err != nil {
return err
}
ret, err := r.transcoder.MustGet().GetVideoSegment(mediaContainer.Filepath, mediaContainer.Hash, mediaContainer.MediaInfo, quality, segment, clientId)
ret, err := r.transcoder.MustGet().GetVideoSegment(
c.Request().Context(),
mediaContainer.Filepath, mediaContainer.Hash, mediaContainer.MediaInfo, quality, segment, clientId)
if err != nil {
return err
}
@@ -125,12 +128,14 @@ func (r *Repository) ServeEchoTranscodeStream(c echo.Context, clientId string) e
return err
}
segment, err := transcoder.ParseSegment(split[2])
segment, err := cassette.ParseSegment(split[2])
if err != nil {
return err
}
ret, err := r.transcoder.MustGet().GetAudioSegment(mediaContainer.Filepath, mediaContainer.Hash, mediaContainer.MediaInfo, int32(audio), segment, clientId)
ret, err := r.transcoder.MustGet().GetAudioSegment(
c.Request().Context(),
mediaContainer.Filepath, mediaContainer.Hash, mediaContainer.MediaInfo, int32(audio), segment, clientId)
if err != nil {
return err
}
@@ -168,7 +173,7 @@ func (r *Repository) ShutdownTranscodeStream(clientId string) {
r.transcoder.MustGet().Destroy()
// Load a new transcoder
r.transcoder = mo.None[*transcoder.Transcoder]()
r.transcoder = mo.None[*cassette.Cassette]()
r.initializeTranscoder(r.settings)
// Send event

View File

@@ -7,18 +7,27 @@ import (
"path/filepath"
"seanime/internal/util"
"seanime/internal/util/crashlog"
"time"
"github.com/rs/zerolog"
)
func GetFileSubsCacheDir(outDir string, hash string) string {
return filepath.Join(outDir, "videofiles", hash, "/subs")
return filepath.Join(outDir, "videofiles", hash, "subs")
}
func GetFileAttCacheDir(outDir string, hash string) string {
return filepath.Join(outDir, "videofiles", hash, "/att")
return filepath.Join(outDir, "videofiles", hash, "att")
}
// ExtractAttachment extracts subtitles and font attachments from a media file
// using ffmpeg. It skips extraction if the output directory already contains
// the expected number of subtitle files.
//
// Improvements over the previous version:
// - 120-second timeout prevents hangs on corrupt/huge files.
// - Validates subtitle extensions before starting ffmpeg.
// - Uses context cancellation for clean cleanup on timeout.
func ExtractAttachment(ffmpegPath string, path string, hash string, mediaInfo *MediaInfo, cacheDir string, logger *zerolog.Logger) (err error) {
logger.Debug().Str("hash", hash).Msgf("videofile: Starting media attachment extraction")
@@ -27,20 +36,24 @@ func ExtractAttachment(ffmpegPath string, path string, hash string, mediaInfo *M
_ = os.MkdirAll(attachmentPath, 0755)
_ = os.MkdirAll(subsPath, 0755)
// Check if subtitles are already extracted.
subsDir, err := os.ReadDir(subsPath)
if err == nil {
if len(subsDir) == len(mediaInfo.Subtitles) {
logger.Debug().Str("hash", hash).Msgf("videofile: Attachments already extracted")
return
if err == nil && len(subsDir) >= len(mediaInfo.Subtitles) {
logger.Debug().Str("hash", hash).Msgf("videofile: Attachments already extracted")
return nil
}
// Validate all subtitles have supported extensions before starting ffmpeg.
for _, sub := range mediaInfo.Subtitles {
if sub.Extension == nil || *sub.Extension == "" {
logger.Error().Uint32("index", sub.Index).Msgf("videofile: Subtitle format is not supported, skipping")
continue
}
}
for _, sub := range mediaInfo.Subtitles {
if sub.Extension == nil || *sub.Extension == "" {
logger.Error().Msgf("videofile: Subtitle format is not supported")
return fmt.Errorf("videofile: Unsupported subtitle format")
}
}
// Use a timeout context to prevent hangs on corrupt files.
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
// Instantiate a new crash logger
crashLogger := crashlog.GlobalCrashLogger.InitArea("ffmpeg")
@@ -48,35 +61,46 @@ func ExtractAttachment(ffmpegPath string, path string, hash string, mediaInfo *M
crashLogger.LogInfof("Extracting attachments from %s", path)
// DEVNOTE: All paths fed into this command should be absolute
cmd := util.NewCmdCtx(
context.Background(),
ffmpegPath,
// Build ffmpeg command: dump font attachments and extract subtitles.
args := []string{
"-dump_attachment:t", "",
// override old attachments
"-y",
"-i", path,
)
// The working directory for the command is the attachment directory
cmd.Dir = attachmentPath
for _, sub := range mediaInfo.Subtitles {
if ext := sub.Extension; ext != nil {
cmd.Args = append(
cmd.Args,
"-map", fmt.Sprintf("0:s:%d", sub.Index),
"-c:s", "copy",
fmt.Sprintf("%s/%d.%s", subsPath, sub.Index, *ext),
)
}
}
extractedCount := 0
for _, sub := range mediaInfo.Subtitles {
if sub.Extension == nil || *sub.Extension == "" {
continue
}
args = append(args,
"-map", fmt.Sprintf("0:s:%d", sub.Index),
"-c:s", "copy",
fmt.Sprintf("%s/%d.%s", subsPath, sub.Index, *sub.Extension),
)
extractedCount++
}
if extractedCount == 0 {
logger.Debug().Str("hash", hash).Msg("videofile: No extractable subtitles found")
return nil
}
cmd := util.NewCmdCtx(ctx, ffmpegPath, args...)
cmd.Dir = attachmentPath
cmd.Stdout = crashLogger.Stdout()
cmd.Stderr = crashLogger.Stdout()
err = cmd.Run()
if err != nil {
logger.Error().Err(err).Msgf("videofile: Error starting FFmpeg")
if ctx.Err() != nil {
logger.Error().Str("hash", hash).Msg("videofile: FFmpeg attachment extraction timed out")
} else {
logger.Error().Err(err).Msgf("videofile: Error running FFmpeg")
}
crashlog.GlobalCrashLogger.WriteAreaLogToFile(crashLogger)
} else {
logger.Debug().Str("hash", hash).Int("subtitles", extractedCount).Msg("videofile: Attachment extraction complete")
}
return err

View File

@@ -9,6 +9,7 @@ import (
"seanime/internal/util/filecache"
"strconv"
"strings"
"sync"
"time"
"github.com/rs/zerolog"
@@ -18,8 +19,6 @@ import (
)
type MediaInfo struct {
// closed if the mediainfo is ready for read. open otherwise
ready <-chan struct{}
// The sha1 of the video file
Sha string `json:"sha"`
// The internal path of the video file
@@ -62,6 +61,14 @@ type Video struct {
Height uint32 `json:"height"`
// The average bitrate of the video in bytes/s
Bitrate uint32 `json:"bitrate"`
// The pixel format
PixFmt string `json:"pixFmt"`
// The color space
ColorSpace string `json:"colorSpace"`
// The color transfer
ColorTransfer string `json:"colorTransfer"`
// The color primaries
ColorPrimaries string `json:"colorPrimaries"`
}
type Audio struct {
@@ -165,10 +172,17 @@ func (e *MediaInfoExtractor) GetInfo(ffprobePath, path string) (mi *MediaInfo, e
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// ffprobeOnce ensures the binary path is set exactly once. The go-ffprobe
// library uses a package-global variable for the path, which is racy under
// concurrent use. Setting it via sync.Once eliminates the race.
var ffprobeOnce sync.Once
func FfprobeGetInfo(ffprobePath, path, hash string) (*MediaInfo, error) {
if ffprobePath != "" {
ffprobe.SetFFProbeBinPath(ffprobePath)
ffprobeOnce.Do(func() {
ffprobe.SetFFProbeBinPath(ffprobePath)
})
}
ffprobeCtx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
@@ -205,13 +219,37 @@ func FfprobeGetInfo(ffprobePath, path, hash string) (*MediaInfo, error) {
Height: uint32(stream.Height),
// ffmpeg does not report bitrate in mkv files, fallback to bitrate of the whole container
// (bigger than the result since it contains audio and other videos but better than nothing).
Bitrate: uint32(bitrate),
Bitrate: uint32(bitrate),
PixFmt: stream.PixFmt,
ColorSpace: stream.ColorSpace,
ColorTransfer: stream.ColorTransfer,
ColorPrimaries: stream.ColorPrimaries,
}
})
// Get the audio streams
mi.Audios = streamToMap(data.Streams, ffprobe.StreamAudio, func(stream *ffprobe.Stream, i uint32) Audio {
lang, _ := language.Parse(stream.Tags.Language)
// Parse channel count from the stream. This is critical for the
// cassette package's surround audio passthrough and for advertising
// the correct channel count in the HLS master playlist.
channels := uint32(stream.Channels)
if channels == 0 {
// Fallback: parse ChannelLayout for channel count estimation.
// Common layouts: "stereo" (2), "5.1" (6), "5.1(side)" (6), "7.1" (8)
switch {
case strings.Contains(stream.ChannelLayout, "7.1"):
channels = 8
case strings.Contains(stream.ChannelLayout, "5.1"):
channels = 6
case strings.Contains(stream.ChannelLayout, "stereo"):
channels = 2
case strings.Contains(stream.ChannelLayout, "mono"):
channels = 1
default:
channels = 2 // Safe default
}
}
return Audio{
Index: i,
Title: nullIfZero(stream.Tags.Title),
@@ -220,6 +258,7 @@ func FfprobeGetInfo(ffprobePath, path, hash string) (*MediaInfo, error) {
MimeCodec: streamToMimeCodec(stream),
IsDefault: stream.Disposition.Default != 0,
IsForced: stream.Disposition.Forced != 0,
Channels: channels,
}
})
@@ -419,7 +458,6 @@ func streamToMimeCodec(stream *ffprobe.Stream) *string {
case "alac":
ret := "alac"
return &ret
default:
return nil
}

View File

@@ -1,19 +1,26 @@
package videofile
import (
"crypto/sha1"
"crypto/sha256"
"encoding/hex"
"fmt"
"os"
)
// GetHashFromPath returns a deterministic hash derived from the file path and
// its last-modified timestamp. The hash changes when the file is re-written,
// ensuring stale cache entries are invalidated.
//
// Uses SHA-256 with a stable time format (UnixNano) to avoid locale-dependent
// string representations of ModTime.
func GetHashFromPath(path string) (string, error) {
info, err := os.Stat(path)
if err != nil {
return "", err
}
h := sha1.New()
h.Write([]byte(path))
h.Write([]byte(info.ModTime().String()))
sha := hex.EncodeToString(h.Sum(nil))
return sha, nil
h := sha256.New()
// Use UnixNano instead of ModTime().String() which can vary across
// locales and Go versions.
_, _ = fmt.Fprintf(h, "%s:%d", path, info.ModTime().UnixNano())
return hex.EncodeToString(h.Sum(nil))[:40], nil // 40-char hex prefix
}

View File

@@ -105,11 +105,13 @@ function MediastreamPage() {
const [subsToken, setSubsToken] = React.useState("")
const [attToken, setAttToken] = React.useState("")
const [directToken, setDirectToken] = React.useState("")
const [transcodeToken, setTranscodeToken] = React.useState("")
React.useEffect(() => {
(async () => {
setSubsToken(await getHMACTokenQueryParam("/api/v1/mediastream/subs", "?"))
setAttToken(await getHMACTokenQueryParam("/api/v1/mediastream/att", "?"))
setDirectToken(await getHMACTokenQueryParam("/api/v1/mediastream/direct", "?"))
setTranscodeToken(await getHMACTokenQueryParam("/api/v1/mediastream/transcode", "?"))
})()
}, [getHMACTokenQueryParam])
@@ -188,6 +190,8 @@ function MediastreamPage() {
// Append HMAC token for direct play URLs
if (mediaContainer.streamType === "direct" && directToken) {
_newUrl += directToken
} else if (mediaContainer.streamType === "transcode" && transcodeToken) {
_newUrl += transcodeToken
}
log.info("Setting stream URL", _newUrl)
changeUrl(_newUrl)