This commit is contained in:
5rahim
2026-04-07 17:23:26 +02:00
parent 0457d84677
commit bfbbc15b19
4 changed files with 424 additions and 110 deletions

View File

@@ -11,6 +11,7 @@ import (
"seanime/internal/events"
"seanime/internal/util"
"strconv"
"sync"
"time"
"github.com/goccy/go-json"
@@ -259,7 +260,231 @@ func (ac *AnilistClientImpl) AnimeAiringScheduleRaw(ctx context.Context, ids []*
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
var sentRateLimitWarningTime = time.Now().Add(-10 * time.Second)
type requestRateBlocker interface {
Wait(ctx context.Context, sleep requestSleepFunc) error
BlockUntil(until time.Time) bool
}
type requestSleepFunc func(ctx context.Context, delay time.Duration) error
type aniListRateBlocker struct {
mu sync.Mutex
blockedUntil time.Time
now func() time.Time
}
func newAniListRateBlocker() *aniListRateBlocker {
return &aniListRateBlocker{now: time.Now}
}
func (b *aniListRateBlocker) Wait(ctx context.Context, sleep requestSleepFunc) error {
if sleep == nil {
sleep = sleepWithContext
}
for {
b.mu.Lock()
blockedUntil := b.blockedUntil
now := b.currentTime()
b.mu.Unlock()
if blockedUntil.IsZero() || !now.Before(blockedUntil) {
return nil
}
if err := sleep(ctx, blockedUntil.Sub(now)); err != nil {
return err
}
}
}
func (b *aniListRateBlocker) BlockUntil(until time.Time) bool {
if until.IsZero() {
return false
}
b.mu.Lock()
defer b.mu.Unlock()
now := b.currentTime()
if !until.After(now) || !until.After(b.blockedUntil) {
return false
}
b.blockedUntil = until
return true
}
func (b *aniListRateBlocker) currentTime() time.Time {
if b.now != nil {
return b.now()
}
return time.Now()
}
func parseResponseDate(headers http.Header) (time.Time, bool) {
raw := headers.Get("Date")
if raw == "" {
return time.Time{}, false
}
parsed, err := http.ParseTime(raw)
if err != nil {
return time.Time{}, false
}
return parsed, true
}
func parseAniListRateLimitResetTime(headers http.Header, now time.Time) (time.Time, bool) {
if resetAt, ok := parseRetryAfterTime(headers, now); ok {
return resetAt, true
}
raw := headers.Get("X-RateLimit-Reset")
if raw == "" {
return time.Time{}, false
}
if unixSeconds, err := strconv.ParseInt(raw, 10, 64); err == nil && unixSeconds > 0 {
return time.Unix(unixSeconds, 0), true
}
parsed, err := http.ParseTime(raw)
if err != nil {
return time.Time{}, false
}
return parsed, true
}
func parseRetryAfterTime(headers http.Header, now time.Time) (time.Time, bool) {
raw := headers.Get("Retry-After")
if raw == "" {
return time.Time{}, false
}
if retryAfterSeconds, err := strconv.Atoi(raw); err == nil {
return now.Truncate(time.Second).Add(time.Duration(retryAfterSeconds+1) * time.Second), true
}
parsed, err := http.ParseTime(raw)
if err != nil {
return time.Time{}, false
}
return parsed, true
}
var (
sentRateLimitWarningTime = time.Now().Add(-10 * time.Second)
sharedAniListRateBlocker requestRateBlocker = newAniListRateBlocker()
)
func doAniListRequestWithRetries(
client *http.Client,
req *http.Request,
rateBlocker requestRateBlocker,
sleep requestSleepFunc,
onRateLimited func(waitSeconds int),
) (resp *http.Response, rlRemainingStr string, err error) {
if client == nil {
client = http.DefaultClient
}
if sleep == nil {
sleep = sleepWithContext
}
const retryCount = 2
for i := 0; i < retryCount; i++ {
if err := req.Context().Err(); err != nil {
return nil, rlRemainingStr, err
}
if rateBlocker != nil {
if err := rateBlocker.Wait(req.Context(), sleep); err != nil {
return nil, rlRemainingStr, err
}
}
if i > 0 && req.Body != nil {
if req.GetBody == nil {
return nil, rlRemainingStr, errors.New("failed to retry request: request body is not replayable")
}
newBody, err := req.GetBody()
if err != nil {
return nil, rlRemainingStr, fmt.Errorf("failed to get request body: %w", err)
}
req.Body = newBody
}
resp, err = client.Do(req)
if err != nil {
return nil, rlRemainingStr, fmt.Errorf("request failed: %w", err)
}
rlRemainingStr = resp.Header.Get("X-Ratelimit-Remaining")
responseTime := time.Now()
if responseDate, ok := parseResponseDate(resp.Header); ok {
responseTime = responseDate
}
if resetAt, ok := parseAniListRateLimitResetTime(resp.Header, responseTime); ok {
if rateBlocker == nil || rateBlocker.BlockUntil(resetAt) {
if onRateLimited != nil {
waitSeconds := int(resetAt.Sub(responseTime).Round(time.Second) / time.Second)
if waitSeconds < 1 {
waitSeconds = 1
}
onRateLimited(waitSeconds)
}
}
closeAniListResponseBody(resp)
continue
}
return resp, rlRemainingStr, nil
}
return resp, rlRemainingStr, nil
}
func closeAniListResponseBody(resp *http.Response) {
if resp == nil || resp.Body == nil {
return
}
_ = resp.Body.Close()
resp.Body = nil
}
func sleepWithContext(ctx context.Context, delay time.Duration) error {
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}
func notifyAniListRateLimit(logger *zerolog.Logger, waitSeconds int) {
if logger != nil {
logger.Warn().Msgf("anilist: Rate limited, retrying in %d seconds", waitSeconds)
}
if time.Since(sentRateLimitWarningTime) <= 10*time.Second {
return
}
if events.GlobalWSEventManager != nil {
events.GlobalWSEventManager.SendEvent(events.WarningToast, "anilist: Rate limited, retrying in "+strconv.Itoa(waitSeconds)+" seconds")
}
sentRateLimitWarningTime = time.Now()
}
// customDoFunc is a custom request interceptor function that handles rate limiting and retries.
func (ac *AnilistClientImpl) customDoFunc(ctx context.Context, req *http.Request, gqlInfo *clientv2.GQLRequestInfo, res interface{}) (err error) {
@@ -280,60 +505,18 @@ func (ac *AnilistClientImpl) customDoFunc(ctx context.Context, req *http.Request
}
}()
client := http.DefaultClient
var resp *http.Response
retryCount := 2
for i := 0; i < retryCount; i++ {
// Reset response body for retry
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
// Recreate the request body if it was read in a previous attempt
if req.GetBody != nil {
newBody, err := req.GetBody()
if err != nil {
return fmt.Errorf("failed to get request body: %w", err)
}
req.Body = newBody
}
resp, err = client.Do(req)
if err != nil {
return fmt.Errorf("request failed: %w", err)
}
rlRemainingStr = resp.Header.Get("X-Ratelimit-Remaining")
rlRetryAfterStr := resp.Header.Get("Retry-After")
//println("Remaining:", rlRemainingStr, " | RetryAfter:", rlRetryAfterStr)
// If we have a rate limit, sleep for the time
rlRetryAfter, err := strconv.Atoi(rlRetryAfterStr)
if err == nil {
ac.logger.Warn().Msgf("anilist: Rate limited, retrying in %d seconds", rlRetryAfter+1)
if time.Since(sentRateLimitWarningTime) > 10*time.Second {
if events.GlobalWSEventManager != nil {
events.GlobalWSEventManager.SendEvent(events.WarningToast, "anilist: Rate limited, retrying in "+strconv.Itoa(rlRetryAfter+1)+" seconds")
}
sentRateLimitWarningTime = time.Now()
}
select {
case <-time.After(time.Duration(rlRetryAfter+1) * time.Second):
continue
}
}
if rlRemainingStr == "" {
select {
case <-time.After(5 * time.Second):
continue
}
}
break
resp, rlRemainingStr, err = doAniListRequestWithRetries(
http.DefaultClient,
req,
sharedAniListRateBlocker,
sleepWithContext,
func(waitSeconds int) {
notifyAniListRateLimit(ac.logger, waitSeconds)
},
)
if err != nil {
return err
}
defer resp.Body.Close()

View File

@@ -1,14 +1,50 @@
package anilist
import (
"bytes"
"context"
"io"
"net/http"
"seanime/internal/util"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type roundTripFunc func(req *http.Request) (*http.Response, error)
func (f roundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f(req)
}
type testClock struct {
now time.Time
}
func (c *testClock) Now() time.Time {
return c.now
}
func (c *testClock) Advance(delay time.Duration) {
c.now = c.now.Add(delay)
}
func newAniListTestResponse(statusCode int, body string, headers map[string]string) *http.Response {
respHeaders := make(http.Header)
for key, value := range headers {
respHeaders.Set(key, value)
}
return &http.Response{
StatusCode: statusCode,
Header: respHeaders,
Body: io.NopCloser(bytes.NewBufferString(body)),
}
}
func TestGetAnimeById(t *testing.T) {
anilistClient := NewTestAnilistClient()
@@ -120,3 +156,122 @@ func TestListAnime(t *testing.T) {
})
}
}
func TestDoAniListRequestWithRetriesWaitsBetweenRateLimitedAttempts(t *testing.T) {
clock := &testClock{now: time.Date(2026, time.April, 7, 12, 0, 0, 0, time.UTC)}
rateBlocker := newAniListRateBlocker()
rateBlocker.now = clock.Now
requestBody := `{"query":"test"}`
requestBodies := make([]string, 0, 2)
sleepDurations := make([]time.Duration, 0, 1)
rateLimitWarnings := make([]int, 0, 1)
attempt := 0
client := &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) {
attempt++
body, err := io.ReadAll(req.Body)
require.NoError(t, err)
requestBodies = append(requestBodies, string(body))
if attempt == 1 {
return newAniListTestResponse(http.StatusTooManyRequests, `{"errors":[{"message":"rate limited"}]}`, map[string]string{
"Date": clock.Now().Format(http.TimeFormat),
"Retry-After": "0",
}), nil
}
return newAniListTestResponse(http.StatusOK, `{"data":{"ok":true}}`, map[string]string{
"X-Ratelimit-Remaining": "9",
}), nil
})}
req, err := http.NewRequest(http.MethodPost, "https://anilist.test/graphql", bytes.NewBufferString(requestBody))
require.NoError(t, err)
resp, rlRemainingStr, err := doAniListRequestWithRetries(
client,
req,
rateBlocker,
func(ctx context.Context, delay time.Duration) error {
sleepDurations = append(sleepDurations, delay)
clock.Advance(delay)
return nil
},
func(waitSeconds int) {
rateLimitWarnings = append(rateLimitWarnings, waitSeconds)
},
)
require.NoError(t, err)
require.NotNil(t, resp)
defer resp.Body.Close()
assert.Equal(t, 2, attempt)
assert.Equal(t, []time.Duration{time.Second}, sleepDurations)
assert.Equal(t, []int{1}, rateLimitWarnings)
assert.Equal(t, []string{requestBody, requestBody}, requestBodies)
assert.Equal(t, "9", rlRemainingStr)
}
func TestDoAniListRequestWithRetriesDoesNotRetryWhenRateLimitHeadersAreMissing(t *testing.T) {
// without explicit rate-limit headers, the response should be returned as-is.
sleepDurations := make([]time.Duration, 0, 1)
attempt := 0
client := &http.Client{Transport: roundTripFunc(func(req *http.Request) (*http.Response, error) {
attempt++
return newAniListTestResponse(http.StatusOK, `{"data":{"ok":true}}`, nil), nil
})}
req, err := http.NewRequest(http.MethodPost, "https://anilist.test/graphql", bytes.NewBufferString(`{"query":"test"}`))
require.NoError(t, err)
resp, rlRemainingStr, err := doAniListRequestWithRetries(
client,
req,
nil,
func(ctx context.Context, delay time.Duration) error {
sleepDurations = append(sleepDurations, delay)
return nil
},
nil,
)
require.NoError(t, err)
require.NotNil(t, resp)
defer resp.Body.Close()
assert.Equal(t, 1, attempt)
assert.Empty(t, sleepDurations)
assert.Equal(t, "", rlRemainingStr)
}
func TestAniListRateBlockerWaitsUntilBlockExpires(t *testing.T) {
// once blocked, later requests should wait until the shared block expires.
clock := &testClock{now: time.Date(2026, time.April, 7, 12, 0, 10, 0, time.UTC)}
rateBlocker := newAniListRateBlocker()
rateBlocker.now = clock.Now
require.True(t, rateBlocker.BlockUntil(clock.Now().Add(18*time.Second)))
sleepDurations := make([]time.Duration, 0, 1)
err := rateBlocker.Wait(context.Background(), func(ctx context.Context, delay time.Duration) error {
sleepDurations = append(sleepDurations, delay)
clock.Advance(delay)
return nil
})
require.NoError(t, err)
assert.Equal(t, []time.Duration{18 * time.Second}, sleepDurations)
}
func TestAniListRateBlockerIgnoresDuplicateOrShorterBlocks(t *testing.T) {
// concurrent 429s with the same reset should not re-announce the same block repeatedly.
clock := &testClock{now: time.Date(2026, time.April, 7, 12, 0, 20, 0, time.UTC)}
rateBlocker := newAniListRateBlocker()
rateBlocker.now = clock.Now
blockedUntil := clock.Now().Add(18 * time.Second)
assert.True(t, rateBlocker.BlockUntil(blockedUntil))
assert.False(t, rateBlocker.BlockUntil(blockedUntil))
assert.False(t, rateBlocker.BlockUntil(clock.Now().Add(5*time.Second)))
assert.True(t, rateBlocker.BlockUntil(clock.Now().Add(25*time.Second)))
}

View File

@@ -7,9 +7,7 @@ import (
"fmt"
"net/http"
"seanime/internal/constants"
"seanime/internal/events"
"seanime/internal/util"
"strconv"
"time"
"github.com/goccy/go-json"
@@ -47,8 +45,6 @@ func customQuery(body []byte, logger *zerolog.Logger, token ...string) (data int
err = errors.New("panic in customQuery")
})
client := http.DefaultClient
var req *http.Request
req, err = http.NewRequest("POST", constants.AnilistApiUrl, bytes.NewBuffer(body))
if err != nil {
@@ -61,54 +57,18 @@ func customQuery(body []byte, logger *zerolog.Logger, token ...string) (data int
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token[0]))
}
// Send request
retryCount := 2
var resp *http.Response
for i := 0; i < retryCount; i++ {
// Reset response body for retry
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
// Recreate the request body if it was read in a previous attempt
if req.GetBody != nil {
newBody, err := req.GetBody()
if err != nil {
return nil, fmt.Errorf("failed to get request body: %w", err)
}
req.Body = newBody
}
resp, err = client.Do(req)
if err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}
rlRemainingStr = resp.Header.Get("X-Ratelimit-Remaining")
rlRetryAfterStr := resp.Header.Get("Retry-After")
rlRetryAfter, err := strconv.Atoi(rlRetryAfterStr)
if err == nil {
logger.Warn().Msgf("anilist: Rate limited, retrying in %d seconds", rlRetryAfter+1)
if time.Since(sentRateLimitWarningTime) > 10*time.Second {
events.GlobalWSEventManager.SendEvent(events.WarningToast, "anilist: Rate limited, retrying in "+strconv.Itoa(rlRetryAfter+1)+" seconds")
sentRateLimitWarningTime = time.Now()
}
select {
case <-time.After(time.Duration(rlRetryAfter+1) * time.Second):
continue
}
}
if rlRemainingStr == "" {
select {
case <-time.After(5 * time.Second):
continue
}
}
break
resp, rlRemainingStr, err = doAniListRequestWithRetries(
http.DefaultClient,
req,
sharedAniListRateBlocker,
sleepWithContext,
func(waitSeconds int) {
notifyAniListRateLimit(logger, waitSeconds)
},
)
if err != nil {
return nil, err
}
defer resp.Body.Close()

View File

@@ -39,9 +39,11 @@ type SimulatedPlatform struct {
collectionMu sync.RWMutex // used to protect access to collections
lastAnimeCollectionRefetchTime time.Time // used to prevent refetching too many times
lastMangaCollectionRefetchTime time.Time // used to prevent refetching too many times
refreshRateLimit *limiter.Limiter
helper *shared_platform.PlatformHelper
db *db.Database
refreshRateLimit *limiter.Limiter
refreshAnimeMetadataCancelFunc context.CancelFunc
refreshMangaMetadataCancelFunc context.CancelFunc
}
func NewSimulatedPlatform(localManager local.Manager, client *util.Ref[anilist.AnilistClient], extensionBankRef *util.Ref[*extension.UnifiedBank], logger *zerolog.Logger, db *db.Database) (platform.Platform, error) {
@@ -49,7 +51,7 @@ func NewSimulatedPlatform(localManager local.Manager, client *util.Ref[anilist.A
logger: logger,
localManager: localManager,
client: shared_platform.NewCacheLayer(client),
refreshRateLimit: limiter.NewLimiter(time.Second, 1),
refreshRateLimit: limiter.NewLimiter(2*time.Second, 1),
helper: shared_platform.NewPlatformHelper(extensionBankRef, db, logger),
db: db,
}
@@ -746,6 +748,13 @@ func (sp *SimulatedPlatform) GetAnimeAiringSchedule(ctx context.Context) (*anili
}
func (sp *SimulatedPlatform) refreshAnimeCollectionMetadata(ctx context.Context, collection *anilist.AnimeCollection) error {
if sp.refreshAnimeMetadataCancelFunc != nil {
sp.refreshAnimeMetadataCancelFunc()
}
var fctx context.Context
fctx, sp.refreshAnimeMetadataCancelFunc = context.WithCancel(ctx)
defer sp.refreshAnimeMetadataCancelFunc()
mediaIDs := collectRefreshableAnimeIDs(collection)
if len(mediaIDs) == 0 {
return nil
@@ -755,7 +764,7 @@ func (sp *SimulatedPlatform) refreshAnimeCollectionMetadata(ctx context.Context,
wrapper := sp.GetAnimeCollectionWrapper()
for _, mediaID := range mediaIDs {
if err := ctx.Err(); err != nil {
if err := fctx.Err(); err != nil {
return err
}
@@ -790,6 +799,13 @@ func (sp *SimulatedPlatform) refreshAnimeCollectionMetadata(ctx context.Context,
}
func (sp *SimulatedPlatform) refreshMangaCollectionMetadata(ctx context.Context, collection *anilist.MangaCollection) error {
if sp.refreshMangaMetadataCancelFunc != nil {
sp.refreshMangaMetadataCancelFunc()
}
var fctx context.Context
fctx, sp.refreshMangaMetadataCancelFunc = context.WithCancel(ctx)
defer sp.refreshMangaMetadataCancelFunc()
mediaIDs := collectRefreshableMangaIDs(collection)
if len(mediaIDs) == 0 {
return nil
@@ -799,7 +815,7 @@ func (sp *SimulatedPlatform) refreshMangaCollectionMetadata(ctx context.Context,
wrapper := sp.GetMangaCollectionWrapper()
for _, mediaID := range mediaIDs {
if err := ctx.Err(); err != nil {
if err := fctx.Err(); err != nil {
return err
}
@@ -891,7 +907,7 @@ func shouldRefreshSimulatedMedia(entryStatus *anilist.MediaListStatus, mediaStat
}
switch *entryStatus {
case anilist.MediaListStatusCurrent, anilist.MediaListStatusPaused, anilist.MediaListStatusPlanning:
case anilist.MediaListStatusCurrent, anilist.MediaListStatusPlanning:
default:
return false
}