fix: directstream cancelation

This commit is contained in:
5rahim
2026-04-16 10:45:00 +02:00
parent 1a18a794b3
commit 47c204c633
16 changed files with 506 additions and 108 deletions

View File

@@ -2,6 +2,13 @@
All notable changes to this project will be documented in this file.
## v3.6.1
- 🦺 In-App Player: Fixed deadlock caused by stream cancellation
- Cancelling a stream no longer cause future stream requests to hang (regression)
- It is now possible to cancel a stream while metadata is being loaded
- 🏗️ VideoCore: Refactored stream cancellation logic
## v3.6.0
- ⚡️ Anime: Paste magnet link anywhere to stream or download

View File

@@ -6,7 +6,7 @@ import (
)
const (
Version = "3.6.0"
Version = "3.6.1"
VersionName = "Kagero"
GcTime = time.Minute * 30
ConfigFileName = "config.toml"

View File

@@ -330,9 +330,6 @@ type PlayDebridStreamOptions struct {
// PlayDebridStream is used by a module to load a new debrid stream.
func (m *Manager) PlayDebridStream(ctx context.Context, filepath string, opts PlayDebridStreamOptions) error {
m.playbackMu.Lock()
defer m.playbackMu.Unlock()
episodeCollection, err := anime.NewEpisodeCollection(anime.NewEpisodeCollectionOptions{
AnimeMetadata: nil,
Media: opts.Media,
@@ -367,8 +364,6 @@ func (m *Manager) PlayDebridStream(ctx context.Context, filepath string, opts Pl
}
go func() {
m.playbackMu.Lock()
defer m.playbackMu.Unlock()
m.loadStream(stream)
}()

View File

@@ -244,9 +244,6 @@ type PlayLocalFileOptions struct {
func (m *Manager) PlayLocalFile(ctx context.Context, opts PlayLocalFileOptions) error {
m.ResetOpenState(opts.ClientId)
m.playbackMu.Lock()
defer m.playbackMu.Unlock()
animeCollection, ok := m.animeCollection.Get()
if !ok {
return fmt.Errorf("cannot play local file, anime collection is not set")

View File

@@ -51,6 +51,8 @@ type (
// ---------- Playback State ---------- //
currentStream mo.Option[Stream] // The current stream being played
currentPlaybackID string
currentPlaybackClient string
preparingClientID string
preparationCanceled bool
preparationCancelFunc func()

View File

@@ -286,9 +286,6 @@ type PlayNakamaStreamOptions struct {
func (m *Manager) PlayNakamaStream(ctx context.Context, opts PlayNakamaStreamOptions) error {
m.ResetOpenState(opts.ClientId)
m.playbackMu.Lock()
defer m.playbackMu.Unlock()
episodeCollection, err := anime.NewEpisodeCollection(anime.NewEpisodeCollectionOptions{
AnimeMetadata: nil,
Media: opts.Media,
@@ -322,8 +319,6 @@ func (m *Manager) PlayNakamaStream(ctx context.Context, opts PlayNakamaStreamOpt
}
go func() {
m.playbackMu.Lock()
defer m.playbackMu.Unlock()
m.loadStream(stream)
}()

View File

@@ -86,15 +86,23 @@ func (m *Manager) getStreamHandler() http.Handler {
}
func (m *Manager) BeginOpen(clientId string, step string, onCancel func()) bool {
// if there's a current stream, stop it
m.playbackMu.Lock()
defer m.playbackMu.Unlock()
previousStream, cancelPlayback, _ := m.releaseCurrentStreamLocked(nil)
m.clearPreparationLocked()
m.clearCurrentPlaybackIdentityLocked()
m.playbackMu.Unlock()
m.prepareNewStream()
m.cancelAndTerminateStream(previousStream, cancelPlayback)
m.playbackMu.Lock()
m.preparingClientID = clientId
m.preparationCanceled = false
m.preparationCancelFunc = onCancel
ok := m.updateOpenStepLocked(clientId, step)
m.playbackMu.Unlock()
return m.updateOpenStepLocked(clientId, step)
return ok
}
func (m *Manager) UpdateOpenStep(clientId string, step string) bool {
@@ -167,18 +175,13 @@ func (m *Manager) ResetOpenState(clientId string) {
func (m *Manager) GetCurrentPlaybackIdentity() (playbackID string, clientID string, ok bool) {
m.playbackMu.Lock()
stream, hasStream := m.currentStream.Get()
m.playbackMu.Unlock()
if !hasStream {
defer m.playbackMu.Unlock()
if m.currentPlaybackID == "" || m.currentPlaybackClient == "" {
return "", "", false
}
playbackInfo, err := stream.LoadPlaybackInfo()
if err != nil || playbackInfo == nil {
return "", "", false
}
return playbackInfo.ID, stream.ClientId(), true
return m.currentPlaybackID, m.currentPlaybackClient, true
}
func (m *Manager) PrepareNewStream(clientId string, step string) {
@@ -193,28 +196,18 @@ func (m *Manager) StreamError(err error) {
}
}
// AbortOpen stops the stream preparation
func (m *Manager) AbortOpen(clientId string, err error) {
m.playbackMu.Lock()
defer m.playbackMu.Unlock()
m.abortPreparation(clientId, err)
}
func (m *Manager) prepareNewStream() {
// Cancel the previous playback
if m.playbackCtxCancelFunc != nil {
m.Logger.Trace().Msgf("directstream: Cancelling previous playback")
m.playbackCtxCancelFunc()
m.playbackCtxCancelFunc = nil
}
// Clear the current stream if it exists
if stream, ok := m.currentStream.Get(); ok {
m.Logger.Debug().Msgf("directstream: Terminating previous stream before preparing new stream")
stream.Terminate()
m.currentStream = mo.None[Stream]()
}
previousStream, cancelPlayback, _ := m.releaseCurrentStreamLocked(nil)
m.clearPreparationLocked()
m.clearCurrentPlaybackIdentityLocked()
m.playbackMu.Unlock()
m.cancelAndTerminateStream(previousStream, cancelPlayback)
m.Logger.Debug().Msgf("directstream: Signaling native player to abort stream preparation, reason: %s", err.Error())
m.nativePlayer.AbortOpen(clientId, err.Error())
}
func (m *Manager) updateOpenStepLocked(clientId string, step string) bool {
@@ -232,20 +225,83 @@ func (m *Manager) updateOpenStepLocked(clientId string, step string) bool {
return true
}
func (m *Manager) abortPreparation(clientId string, err error) {
m.prepareNewStream()
m.Logger.Debug().Msgf("directstream: Signaling native player to abort stream preparation, reason: %s", err.Error())
m.clearPreparationLocked()
m.nativePlayer.AbortOpen(clientId, err.Error())
}
func (m *Manager) clearPreparationLocked() {
m.preparingClientID = ""
m.preparationCanceled = false
m.preparationCancelFunc = nil
}
func (m *Manager) clearCurrentPlaybackIdentityLocked() {
m.currentPlaybackID = ""
m.currentPlaybackClient = ""
}
// releaseCurrentStreamLocked
func (m *Manager) releaseCurrentStreamLocked(target Stream) (stream Stream, cancel context.CancelFunc, ok bool) {
currentStream, hasCurrentStream := m.currentStream.Get()
if target != nil {
if !hasCurrentStream || currentStream != target {
return nil, nil, false
}
}
cancel = m.playbackCtxCancelFunc
m.playbackCtx = nil
m.playbackCtxCancelFunc = nil
if hasCurrentStream {
m.currentStream = mo.None[Stream]()
return currentStream, cancel, true
}
return nil, cancel, target == nil
}
func (m *Manager) cancelAndTerminateStream(stream Stream, cancel context.CancelFunc) {
if cancel != nil {
m.Logger.Trace().Msg("directstream: Cancelling playback context")
cancel()
}
if stream != nil {
m.Logger.Debug().Msg("directstream: Terminating current stream")
stream.Terminate()
}
}
func (m *Manager) isCurrentStreamLocked(stream Stream) bool {
currentStream, ok := m.currentStream.Get()
return ok && currentStream == stream
}
func (m *Manager) clearStreamLoadingState(stream Stream) {
m.playbackMu.Lock()
_, cancelPlayback, ok := m.releaseCurrentStreamLocked(stream)
m.playbackMu.Unlock()
if !ok {
return
}
if cancelPlayback != nil {
cancelPlayback()
}
}
func (m *Manager) shouldHandleTerminatedEventLocked(event *videocore.VideoTerminatedEvent, stream Stream) bool {
if event.GetClientId() != "" && event.GetClientId() != stream.ClientId() {
return false
}
if m.currentPlaybackID == "" {
return true
}
if event.GetPlaybackId() == "" {
return false
}
return event.GetPlaybackId() == m.currentPlaybackID
}
func (m *Manager) cancelPreparationLocked(clientId string, clearCancelFunc bool) (func(), bool) {
if clientId != "" && m.preparingClientID != "" && m.preparingClientID != clientId {
return nil, false
@@ -274,27 +330,35 @@ func (m *Manager) shouldStopOpeningLocked(clientId string) bool {
func (m *Manager) discardCurrentStreamLocked(stream Stream) {
if currentStream, ok := m.currentStream.Get(); ok && currentStream == stream {
m.currentStream = mo.None[Stream]()
m.playbackCtx = nil
m.playbackCtxCancelFunc = nil
}
}
// loadStream loads a new stream and cancels the previous one.
// Caller should use mutex to lock the manager.
// loadStream loads a new stream and keeps the control paths responsive while metadata is being prepared.
func (m *Manager) loadStream(stream Stream) {
if !m.updateOpenStepLocked(stream.ClientId(), "Loading stream...") {
if !m.UpdateOpenStep(stream.ClientId(), "Loading stream...") {
return
}
m.Logger.Debug().Msgf("directstream: Loading stream")
m.currentStream = mo.Some(stream)
// Create a new context
ctx, cancel := context.WithCancel(context.Background())
if setter, ok := stream.(interface{ setPlaybackCancelFunc(context.CancelFunc) }); ok {
setter.setPlaybackCancelFunc(cancel)
}
m.playbackMu.Lock()
m.currentStream = mo.Some(stream)
m.playbackCtx = ctx
m.playbackCtxCancelFunc = cancel
m.clearCurrentPlaybackIdentityLocked()
m.playbackMu.Unlock()
m.Logger.Debug().Msgf("directstream: Loading content type")
if !m.updateOpenStepLocked(stream.ClientId(), "Loading metadata...") {
m.discardCurrentStreamLocked(stream)
if !m.UpdateOpenStep(stream.ClientId(), "Loading metadata...") {
m.clearStreamLoadingState(stream)
return
}
// Load the content type
@@ -304,8 +368,11 @@ func (m *Manager) loadStream(stream Stream) {
m.preStreamError(stream, fmt.Errorf("failed to load content type"))
return
}
if ctx.Err() != nil || m.shouldStopOpeningLocked(stream.ClientId()) {
m.discardCurrentStreamLocked(stream)
m.playbackMu.Lock()
shouldStopOpening := ctx.Err() != nil || m.shouldStopOpeningLocked(stream.ClientId()) || !m.isCurrentStreamLocked(stream)
m.playbackMu.Unlock()
if shouldStopOpening {
m.clearStreamLoadingState(stream)
return
}
@@ -319,10 +386,17 @@ func (m *Manager) loadStream(stream Stream) {
m.preStreamError(stream, fmt.Errorf("failed to load playback info: %w", err))
return
}
if ctx.Err() != nil || m.shouldStopOpeningLocked(stream.ClientId()) {
m.discardCurrentStreamLocked(stream)
m.playbackMu.Lock()
shouldStopOpening = ctx.Err() != nil || m.shouldStopOpeningLocked(stream.ClientId()) || !m.isCurrentStreamLocked(stream)
if shouldStopOpening {
m.playbackMu.Unlock()
m.clearStreamLoadingState(stream)
return
}
m.currentPlaybackID = playbackInfo.ID
m.currentPlaybackClient = stream.ClientId()
m.clearPreparationLocked()
m.playbackMu.Unlock()
// Shut the mkv parser logger
//parser, ok := playbackInfo.MkvMetadataParser.Get()
@@ -331,7 +405,6 @@ func (m *Manager) loadStream(stream Stream) {
//}
m.Logger.Debug().Msgf("directstream: Signaling native player that stream is ready")
m.clearPreparationLocked()
m.nativePlayer.Watch(stream.ClientId(), playbackInfo)
}
@@ -364,18 +437,22 @@ func (m *Manager) listenToPlayerEvents() {
}
continue
}
m.playbackMu.Unlock()
if terminatedEvent, isTerminated := event.(*videocore.VideoTerminatedEvent); isTerminated {
if !m.shouldHandleTerminatedEventLocked(terminatedEvent, cs) {
m.playbackMu.Unlock()
continue
}
m.clearPreparationLocked()
m.playbackMu.Unlock()
if event.GetClientId() != cs.ClientId() {
m.Logger.Debug().Msgf("directstream: Video terminated")
m.unloadStream(cs)
continue
}
if _, isTerminated := event.(*videocore.VideoTerminatedEvent); isTerminated {
m.Logger.Debug().Msgf("directstream: Video terminated")
m.playbackMu.Lock()
_, _ = m.cancelPreparationLocked(cs.ClientId(), true)
m.playbackMu.Unlock()
cs.Terminate()
m.playbackMu.Unlock()
if event.GetClientId() != cs.ClientId() {
continue
}
@@ -429,23 +506,19 @@ func (m *Manager) listenToPlayerEvents() {
}()
}
func (m *Manager) unloadStream() {
func (m *Manager) unloadStream(targets ...Stream) {
m.Logger.Debug().Msg("directstream: Unloading current stream")
// Cancel any existing playback context first
if m.playbackCtxCancelFunc != nil {
m.Logger.Trace().Msg("directstream: Cancelling playback context")
m.playbackCtxCancelFunc()
m.playbackCtxCancelFunc = nil
var target Stream
if len(targets) > 0 {
target = targets[0]
}
// Clear the current stream
if stream, ok := m.currentStream.Get(); ok {
m.Logger.Debug().Msg("directstream: Terminating current stream")
stream.Terminate()
m.playbackMu.Lock()
stream, cancelPlayback, ok := m.releaseCurrentStreamLocked(target)
m.playbackMu.Unlock()
if ok {
m.cancelAndTerminateStream(stream, cancelPlayback)
}
m.currentStream = mo.None[Stream]()
m.Logger.Debug().Msg("directstream: Stream unloaded successfully")
}
@@ -463,6 +536,7 @@ type BaseStream struct {
playbackInfo *nativeplayer.PlaybackInfo
playbackInfoErr error
playbackInfoOnce sync.Once
playbackCancelFunc context.CancelFunc
subtitleEventCache *result.Map[string, *mkvparser.SubtitleEvent]
terminateOnce sync.Once
serveContentCancelFunc context.CancelFunc
@@ -517,12 +591,16 @@ func (s *BaseStream) ClientId() string {
return s.clientId
}
func (s *BaseStream) setPlaybackCancelFunc(cancel context.CancelFunc) {
s.playbackCancelFunc = cancel
}
func (s *BaseStream) Terminate() {
s.terminateOnce.Do(func() {
// Cancel the playback context
// This will snowball and cancel other stuff
if s.manager.playbackCtxCancelFunc != nil {
s.manager.playbackCtxCancelFunc()
if s.playbackCancelFunc != nil {
s.playbackCancelFunc()
}
// Cancel all active subtitle streams
@@ -538,11 +616,15 @@ func (s *BaseStream) Terminate() {
func (s *BaseStream) StreamError(err error) {
s.logger.Error().Err(err).Msg("directstream: Stream error occurred")
s.manager.nativePlayer.Error(s.clientId, err)
s.Terminate()
s.manager.playbackMu.Lock()
s.manager.unloadStream()
if !s.manager.isCurrentStreamLocked(s) {
s.manager.playbackMu.Unlock()
return
}
s.manager.playbackMu.Unlock()
s.manager.nativePlayer.Error(s.clientId, err)
s.manager.unloadStream(s)
}
func (s *BaseStream) GetSubtitleEventCache() *result.Map[string, *mkvparser.SubtitleEvent] {
@@ -588,10 +670,16 @@ func loadContentType(path string, reader ...io.ReadSeekCloser) string {
}
func (m *Manager) preStreamError(stream Stream, err error) {
stream.Terminate()
m.playbackMu.Lock()
if !m.isCurrentStreamLocked(stream) {
m.playbackMu.Unlock()
return
}
m.clearPreparationLocked()
m.playbackMu.Unlock()
m.nativePlayer.Error(stream.ClientId(), err)
m.unloadStream()
m.unloadStream(stream)
}
func (m *Manager) getContentTypeAndLength(url string) (string, int64, error) {

View File

@@ -2,21 +2,21 @@ package directstream
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"seanime/internal/api/anilist"
"seanime/internal/library/anime"
"testing"
"time"
"seanime/internal/events"
"seanime/internal/library/anime"
"seanime/internal/mkvparser"
"seanime/internal/nativeplayer"
"seanime/internal/util"
"seanime/internal/util/result"
"seanime/internal/videocore"
"sync"
"testing"
"time"
"github.com/samber/mo"
"github.com/stretchr/testify/require"
@@ -46,8 +46,10 @@ type trackingReadSeekCloser struct {
type blockingStream struct {
clientID string
loadPlaybackCh chan struct{}
loadStartedCh chan struct{}
terminatedCh chan struct{}
terminated bool
startOnce sync.Once
}
func (s *blockingStream) Type() nativeplayer.StreamType { return nativeplayer.StreamTypeTorrent }
@@ -58,6 +60,11 @@ func (s *blockingStream) Episode() *anime.Episode { return n
func (s *blockingStream) ListEntryData() *anime.EntryListData { return nil }
func (s *blockingStream) EpisodeCollection() *anime.EpisodeCollection { return nil }
func (s *blockingStream) LoadPlaybackInfo() (*nativeplayer.PlaybackInfo, error) {
s.startOnce.Do(func() {
if s.loadStartedCh != nil {
close(s.loadStartedCh)
}
})
<-s.loadPlaybackCh
return &nativeplayer.PlaybackInfo{ID: "blocked"}, nil
}
@@ -78,6 +85,82 @@ func (s *blockingStream) GetSubtitleEventCache() *result.Map[string, *mkvparser.
}
func (s *blockingStream) OnSubtitleFileUploaded(string, string) {}
type prevTerminateStream struct {
manager *Manager
clientID string
terminatedCh chan struct{}
terminateOnce sync.Once
}
func (s *prevTerminateStream) Type() nativeplayer.StreamType {
return nativeplayer.StreamTypeTorrent
}
func (s *prevTerminateStream) LoadContentType() string { return "video/webm" }
func (s *prevTerminateStream) ClientId() string { return s.clientID }
func (s *prevTerminateStream) Media() *anilist.BaseAnime { return nil }
func (s *prevTerminateStream) Episode() *anime.Episode { return nil }
func (s *prevTerminateStream) ListEntryData() *anime.EntryListData { return nil }
func (s *prevTerminateStream) EpisodeCollection() *anime.EpisodeCollection { return nil }
func (s *prevTerminateStream) LoadPlaybackInfo() (*nativeplayer.PlaybackInfo, error) {
return &nativeplayer.PlaybackInfo{ID: "previous-playback"}, nil
}
func (s *prevTerminateStream) GetAttachmentByName(string) (*mkvparser.AttachmentInfo, bool) {
return nil, false
}
func (s *prevTerminateStream) GetStreamHandler() http.Handler { return http.NewServeMux() }
func (s *prevTerminateStream) StreamError(error) {}
func (s *prevTerminateStream) Terminate() {
s.terminateOnce.Do(func() {
close(s.terminatedCh)
_ = s.manager.CloseOpen("")
})
}
func (s *prevTerminateStream) GetSubtitleEventCache() *result.Map[string, *mkvparser.SubtitleEvent] {
return result.NewMap[string, *mkvparser.SubtitleEvent]()
}
func (s *prevTerminateStream) OnSubtitleFileUploaded(string, string) {}
type eventStream struct {
clientID string
playbackInfo *nativeplayer.PlaybackInfo
terminatedCh chan struct{}
terminateOnce sync.Once
}
func (s *eventStream) Type() nativeplayer.StreamType { return nativeplayer.StreamTypeTorrent }
func (s *eventStream) LoadContentType() string { return "video/webm" }
func (s *eventStream) ClientId() string { return s.clientID }
func (s *eventStream) Media() *anilist.BaseAnime { return nil }
func (s *eventStream) Episode() *anime.Episode { return nil }
func (s *eventStream) ListEntryData() *anime.EntryListData { return nil }
func (s *eventStream) EpisodeCollection() *anime.EpisodeCollection { return nil }
func (s *eventStream) LoadPlaybackInfo() (*nativeplayer.PlaybackInfo, error) {
return s.playbackInfo, nil
}
func (s *eventStream) GetAttachmentByName(string) (*mkvparser.AttachmentInfo, bool) {
return nil, false
}
func (s *eventStream) GetStreamHandler() http.Handler { return http.NewServeMux() }
func (s *eventStream) StreamError(error) {}
func (s *eventStream) Terminate() {
s.terminateOnce.Do(func() {
close(s.terminatedCh)
})
}
func (s *eventStream) GetSubtitleEventCache() *result.Map[string, *mkvparser.SubtitleEvent] {
return result.NewMap[string, *mkvparser.SubtitleEvent]()
}
func (s *eventStream) OnSubtitleFileUploaded(string, string) {}
func mustMarshalRawMessage(t *testing.T, value interface{}) json.RawMessage {
t.Helper()
data, err := json.Marshal(value)
require.NoError(t, err)
return data
}
func (r *trackingReadSeekCloser) Read(_ []byte) (int, error) {
return 0, io.EOF
}
@@ -184,6 +267,7 @@ func TestListenToPlayerEventsTerminatesWithoutWaitingForPlaybackInfo(t *testing.
stream := &blockingStream{
clientID: "player-client",
loadPlaybackCh: make(chan struct{}),
loadStartedCh: make(chan struct{}),
terminatedCh: make(chan struct{}),
}
manager.currentStream = mo.Some[Stream](stream)
@@ -208,3 +292,183 @@ func TestListenToPlayerEventsTerminatesWithoutWaitingForPlaybackInfo(t *testing.
t.Fatal("expected terminate to bypass playback info loading")
}
}
// ensures that if a new stream is started while the previous stream is still loading playback info,
// the previous stream will be terminated without waiting for the playback info to finish loading
func TestStream_beginOpenTerminatesPreviousStream(t *testing.T) {
logger := util.NewLogger()
ws := events.NewMockWSEventManager(logger)
vc := videocore.New(videocore.NewVideoCoreOptions{
WsEventManager: ws,
Logger: logger,
})
np := nativeplayer.New(nativeplayer.NewNativePlayerOptions{
WsEventManager: ws,
Logger: logger,
VideoCore: vc,
})
manager := NewManager(NewManagerOptions{
Logger: logger,
WSEventManager: ws,
NativePlayer: np,
VideoCore: vc,
})
stream := &prevTerminateStream{
manager: manager,
clientID: "player-client",
terminatedCh: make(chan struct{}),
}
manager.currentStream = mo.Some[Stream](stream)
t.Cleanup(func() {
vc.Shutdown()
})
done := make(chan bool, 1)
go func() {
done <- manager.BeginOpen("player-client", "opening", nil)
}()
select {
case <-stream.terminatedCh:
case <-time.After(250 * time.Millisecond):
t.Fatal("expected previous stream to terminate")
}
select {
case ok := <-done:
require.True(t, ok)
case <-time.After(250 * time.Millisecond):
t.Fatal("expected BeginOpen to return without deadlocking")
}
}
func TestStream_closeOpenReturnsWhileLoadPlaybackInfoIsBlocked(t *testing.T) {
logger := util.NewLogger()
ws := events.NewMockWSEventManager(logger)
vc := videocore.New(videocore.NewVideoCoreOptions{
WsEventManager: ws,
Logger: logger,
})
np := nativeplayer.New(nativeplayer.NewNativePlayerOptions{
WsEventManager: ws,
Logger: logger,
VideoCore: vc,
})
manager := NewManager(NewManagerOptions{
Logger: logger,
WSEventManager: ws,
NativePlayer: np,
VideoCore: vc,
})
stream := &blockingStream{
clientID: "player-client",
loadPlaybackCh: make(chan struct{}),
loadStartedCh: make(chan struct{}),
terminatedCh: make(chan struct{}),
}
t.Cleanup(func() {
close(stream.loadPlaybackCh)
vc.Shutdown()
})
require.True(t, manager.BeginOpen("player-client", "opening", nil))
go manager.loadStream(stream)
select {
case <-stream.loadStartedCh:
case <-time.After(250 * time.Millisecond):
t.Fatal("expected stream loading to reach playback info")
}
done := make(chan bool, 1)
go func() {
done <- manager.CloseOpen("player-client")
}()
select {
case ok := <-done:
require.True(t, ok)
case <-time.After(250 * time.Millisecond):
t.Fatal("expected CloseOpen to return while metadata is still loading")
}
}
func TestStream_listenToPlayerEventsIgnoresStalePlaybackTermination(t *testing.T) {
logger := util.NewLogger()
ws := events.NewMockWSEventManager(logger)
vc := videocore.New(videocore.NewVideoCoreOptions{
WsEventManager: ws,
Logger: logger,
})
np := nativeplayer.New(nativeplayer.NewNativePlayerOptions{
WsEventManager: ws,
Logger: logger,
VideoCore: vc,
})
manager := NewManager(NewManagerOptions{
Logger: logger,
WSEventManager: ws,
NativePlayer: np,
VideoCore: vc,
})
stream := &eventStream{
clientID: "player-client",
playbackInfo: &nativeplayer.PlaybackInfo{ID: "current-playback-id"},
terminatedCh: make(chan struct{}),
}
manager.currentStream = mo.Some[Stream](stream)
manager.currentPlaybackID = "current-playback-id"
manager.currentPlaybackClient = "player-client"
t.Cleanup(func() {
vc.Shutdown()
})
ws.MockSendClientEvent(&events.WebsocketClientEvent{
ClientID: "socket-client",
Type: events.VideoCoreEventType,
Payload: videocore.ClientEvent{
ClientId: "player-client",
Type: videocore.PlayerEventVideoTerminated,
Payload: mustMarshalRawMessage(t, map[string]interface{}{
"id": "stale-playback-id",
"clientId": "player-client",
"playerType": "native",
"playbackType": "torrent",
}),
},
})
select {
case <-stream.terminatedCh:
t.Fatal("expected stale terminated event to be ignored")
case <-time.After(250 * time.Millisecond):
}
ws.MockSendClientEvent(&events.WebsocketClientEvent{
ClientID: "socket-client",
Type: events.VideoCoreEventType,
Payload: videocore.ClientEvent{
ClientId: "player-client",
Type: videocore.PlayerEventVideoTerminated,
Payload: mustMarshalRawMessage(t, map[string]interface{}{
"id": "current-playback-id",
"clientId": "player-client",
"playerType": "native",
"playbackType": "torrent",
}),
},
})
select {
case <-stream.terminatedCh:
case <-time.After(250 * time.Millisecond):
t.Fatal("expected matching terminated event to stop the stream")
}
}

View File

@@ -202,9 +202,6 @@ type PlayTorrentStreamOptions struct {
// PlayTorrentStream is used by a module to load a new torrent stream.
func (m *Manager) PlayTorrentStream(ctx context.Context, opts PlayTorrentStreamOptions) (chan struct{}, error) {
m.playbackMu.Lock()
defer m.playbackMu.Unlock()
episodeCollection, err := anime.NewEpisodeCollection(anime.NewEpisodeCollectionOptions{
AnimeMetadata: nil,
Media: opts.Media,
@@ -240,8 +237,6 @@ func (m *Manager) PlayTorrentStream(ctx context.Context, opts PlayTorrentStreamO
go func() {
<-stream.streamReadyCh
m.playbackMu.Lock()
defer m.playbackMu.Unlock()
m.loadStream(stream)
}()

View File

@@ -222,6 +222,12 @@ type (
clientVideoAnime4KPayload struct {
Option string `json:"option"`
}
clientVideoTerminatedPayload struct {
ID string `json:"id"`
ClientId string `json:"clientId"`
PlayerType PlayerType `json:"playerType"`
PlaybackType PlaybackType `json:"playbackType"`
}
clientVideoTextTracksPayload struct {
TextTracks []*VideoTextTrack `json:"textTracks"`
}

View File

@@ -944,11 +944,37 @@ func (vc *VideoCore) listenToClientEvents() {
})
}
case PlayerEventVideoTerminated:
payload := &clientVideoTerminatedPayload{}
_ = playerEvent.UnmarshalAs(payload)
if payload.ClientId != "" {
eventClientID = payload.ClientId
}
event := &VideoTerminatedEvent{}
if state, ok := vc.GetPlaybackState(); ok {
event.identify(state.PlaybackInfo.Id, state.ClientId, state.PlayerType, state.PlaybackInfo.PlaybackType)
playbackID := state.PlaybackInfo.Id
if payload.ID != "" {
playbackID = payload.ID
}
clientID := state.ClientId
if eventClientID != "" {
clientID = eventClientID
}
playerType := state.PlayerType
if payload.PlayerType != "" {
playerType = payload.PlayerType
}
playbackType := state.PlaybackInfo.PlaybackType
if payload.PlaybackType != "" {
playbackType = payload.PlaybackType
}
event.identify(playbackID, clientID, playerType, playbackType)
} else if eventClientID != "" {
event.identify("", eventClientID, NativePlayer, "")
playerType := payload.PlayerType
if playerType == "" {
playerType = NativePlayer
}
event.identify(payload.ID, eventClientID, playerType, payload.PlaybackType)
}
select {
case vc.eventBus <- event:

View File

@@ -1,12 +1,12 @@
{
"name": "seanime-denshi",
"version": "3.6.0-alpha.6",
"version": "3.6.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "seanime-denshi",
"version": "3.6.0-alpha.6",
"version": "3.6.0",
"dependencies": {
"castv2": "^0.1.10",
"electron-log": "^5.4.3",

View File

@@ -1,6 +1,6 @@
{
"name": "seanime-denshi",
"version": "3.6.0",
"version": "3.6.1",
"description": "Electron-based Desktop client for Seanime",
"main": "src/main.js",
"author": "5rahim <talkwithrahim@gmail.com>",

View File

@@ -179,6 +179,9 @@ export function NativePlayer() {
//
function handleTerminateStream() {
const playbackId = state.playbackInfo?.id || ""
const playbackType = state.playbackInfo?.streamType || ""
// Clean up player first
if (videoElement) {
log.info("Cleaning up media")
@@ -205,6 +208,12 @@ export function NativePlayer() {
payload: {
clientId: clientId,
type: "video-terminated",
payload: {
id: playbackId,
clientId: clientId,
playerType: "native",
playbackType: playbackType,
},
},
})
}

View File

@@ -307,7 +307,12 @@ export function useVideoCoreSetupEvents(id: string,
function dispatchTerminatedEvent() {
log.trace("Video terminated")
sendEvent("video-terminated")
sendEvent("video-terminated", {
id: state.playbackInfo?.id || "",
clientId: clientId,
playerType: id === "native-player" ? "native" : "web",
playbackType: state.playbackInfo?.playbackType || "",
})
}
function dispatchTranslateTextEvent(text: string) {

View File

@@ -59,6 +59,9 @@ export function TorrentStreamOverlay({ isNativePlayerComponent, show }: {
const handleStopStream = React.useCallback(() => {
if (nativePlayerState.active && clientId) {
const playbackId = nativePlayerState.playbackInfo?.id || ""
const playbackType = nativePlayerState.playbackInfo?.streamType || ""
if (videoElement) {
videoElement.pause()
}
@@ -83,6 +86,12 @@ export function TorrentStreamOverlay({ isNativePlayerComponent, show }: {
payload: {
clientId,
type: "video-terminated",
payload: {
id: playbackId,
clientId,
playerType: "native",
playbackType,
},
},
})