Add throttling and cross-rate for twelve data (#1396)

* Add throttling and cross-rate for twelve data

* FIX yahoo precision also

* FIXES

* Update importer.rb

* Fixes

* Revert job

* Fixes
This commit is contained in:
soky srm
2026-04-07 20:46:05 +02:00
committed by GitHub
parent ec1562782b
commit be42988adf
10 changed files with 379 additions and 61 deletions

View File

@@ -24,18 +24,12 @@ class Account::MarketDataImporter
.each do |source_currency, date|
key = [ source_currency, account.currency ]
pair_dates[key] = [ pair_dates[key], date ].compact.min
inverse_key = [ account.currency, source_currency ]
pair_dates[inverse_key] = [ pair_dates[inverse_key], date ].compact.min
end
# 2. ACCOUNT-BASED PAIR convert the account currency to the family currency (if different)
if foreign_account?
key = [ account.currency, account.family.currency ]
pair_dates[key] = [ pair_dates[key], account.start_date ].compact.min
inverse_key = [ account.family.currency, account.currency ]
pair_dates[inverse_key] = [ pair_dates[inverse_key], account.start_date ].compact.min
end
pair_dates.each do |(source, target), start_date|

View File

@@ -15,6 +15,7 @@ class ExchangeRate::Importer
def import_provider_rates
if !clear_cache && all_rates_exist?
Rails.logger.info("No new rates to sync for #{from} to #{to} between #{start_date} and #{end_date}, skipping")
backfill_inverse_rates_if_needed
return
end
@@ -59,6 +60,25 @@ class ExchangeRate::Importer
end
upsert_rows(gapfilled_rates)
# Compute and upsert inverse rates (e.g., EUR→USD from USD→EUR) to avoid
# separate API calls for the reverse direction.
inverse_rates = gapfilled_rates.filter_map do |row|
next if row[:rate].to_f <= 0
{
from_currency: row[:to_currency],
to_currency: row[:from_currency],
date: row[:date],
rate: (BigDecimal("1") / BigDecimal(row[:rate].to_s)).round(12)
}
end
upsert_rows(inverse_rates)
# Also backfill inverse rows for any forward rates that existed in the DB
# before effective_start_date (i.e. dates not covered by gapfilled_rates).
backfill_inverse_rates_if_needed
end
private
@@ -84,7 +104,7 @@ class ExchangeRate::Importer
# Since provider may not return values on weekends and holidays, we grab the first rate from the provider that is on or before the start date
def start_rate_value
provider_rate_value = provider_rates.select { |date, _| date <= start_date }.max_by { |date, _| date }&.last
provider_rate_value = provider_rates.select { |date, _| date <= start_date }.max_by { |date, _| date }&.last&.rate
db_rate_value = db_rates[start_date]&.rate
provider_rate_value || db_rate_value
end
@@ -128,6 +148,28 @@ class ExchangeRate::Importer
end
end
# When forward rates already exist but inverse rates are missing (e.g. from a
# deployment before inverse computation was added), backfill them from the DB
# without making any provider API calls.
def backfill_inverse_rates_if_needed
existing_inverse_dates = ExchangeRate.where(from_currency: to, to_currency: from, date: start_date..end_date).pluck(:date).to_set
return if existing_inverse_dates.size >= expected_count
inverse_rows = db_rates.filter_map do |_date, rate|
next if existing_inverse_dates.include?(rate.date)
next if rate.rate.to_f <= 0
{
from_currency: to,
to_currency: from,
date: rate.date,
rate: (BigDecimal("1") / BigDecimal(rate.rate.to_s)).round(12)
}
end
upsert_rows(inverse_rows) if inverse_rows.any?
end
def all_rates_exist?
db_count == expected_count
end

View File

@@ -69,14 +69,37 @@ module ExchangeRate::Provided
return 0
end
ExchangeRate::Importer.new(
exchange_rate_provider: provider,
from: from,
to: to,
start_date: start_date,
end_date: end_date,
clear_cache: clear_cache
).import_provider_rates
# Prevent concurrent syncs from fetching the same currency pair for overlapping
# date ranges. The lock is scoped to (pair + start_date) so that a broader range
# (e.g. daily job needing older history) is not blocked by a narrower account sync.
#
# Uses an owner-token pattern: the lock value is a unique token so the ensure
# block only deletes its own lock, not one acquired by a different worker after
# expiry. TTL is 5 minutes to cover worst-case throttle + rate-limit retry waits
# (~3 minutes with TwelveData).
lock_key = "exchange_rate_import:#{from}:#{to}:#{start_date}"
lock_token = SecureRandom.uuid
acquired = Rails.cache.write(lock_key, lock_token, expires_in: 5.minutes, unless_exist: true)
unless acquired
Rails.logger.info("Skipping exchange rate import for #{from}/#{to} from #{start_date} — already in progress")
return 0
end
begin
ExchangeRate::Importer.new(
exchange_rate_provider: provider,
from: from,
to: to,
start_date: start_date,
end_date: end_date,
clear_cache: clear_cache
).import_provider_rates
ensure
# Only delete the lock if we still own it (it hasn't expired and been
# re-acquired by another worker).
Rails.cache.delete(lock_key) if Rails.cache.read(lock_key) == lock_token
end
end
end
end

View File

@@ -76,9 +76,6 @@ class MarketDataImporter
.each do |(source, target), date|
key = [ source, target ]
pair_dates[key] = [ pair_dates[key], date ].compact.min
inverse_key = [ target, source ]
pair_dates[inverse_key] = [ pair_dates[inverse_key], date ].compact.min
end
# 2. ACCOUNT-BASED PAIRS use the account's oldest entry date
@@ -94,9 +91,6 @@ class MarketDataImporter
key = [ account.source, account.target ]
pair_dates[key] = [ pair_dates[key], chosen_date ].compact.min
inverse_key = [ account.target, account.source ]
pair_dates[inverse_key] = [ pair_dates[inverse_key], chosen_date ].compact.min
end
# Convert to array of hashes for ease of use

View File

@@ -6,6 +6,10 @@ class Provider::TwelveData < Provider
Error = Class.new(Provider::Error)
InvalidExchangeRateError = Class.new(Error)
InvalidSecurityPriceError = Class.new(Error)
RateLimitError = Class.new(Error)
# Minimum delay between requests to avoid rate limiting (in seconds)
MIN_REQUEST_INTERVAL = 1.0
# Pattern to detect plan upgrade errors in API responses
PLAN_UPGRADE_PATTERN = /available starting with (\w+)/i
@@ -59,20 +63,23 @@ class Provider::TwelveData < Provider
def fetch_exchange_rate(from:, to:, date:)
with_provider_response do
throttle_request
response = client.get("#{base_url}/exchange_rate") do |req|
req.params["symbol"] = "#{from}/#{to}"
req.params["date"] = date.to_s
end
rate = JSON.parse(response.body).dig("rate")
parsed = JSON.parse(response.body)
check_api_error!(parsed)
Rate.new(date: date.to_date, from:, to:, rate: rate)
Rate.new(date: date.to_date, from:, to:, rate: parsed.dig("rate"))
end
end
def fetch_exchange_rates(from:, to:, start_date:, end_date:)
with_provider_response do
# Try to fetch the currency pair via the time_series API (consumes 1 credit) - this might not return anything as the API does not provide time series data for all possible currency pairs
throttle_request
response = client.get("#{base_url}/time_series") do |req|
req.params["symbol"] = "#{from}/#{to}"
req.params["start_date"] = start_date.to_s
@@ -81,11 +88,13 @@ class Provider::TwelveData < Provider
end
parsed = JSON.parse(response.body)
check_api_error!(parsed)
data = parsed.dig("values")
# If currency pair is not available, try to fetch via the time_series/cross API (consumes 5 credits)
if data.nil?
Rails.logger.info("#{self.class.name}: Currency pair #{from}/#{to} not available, fetching via time_series/cross API")
throttle_request(credits: 5)
response = client.get("#{base_url}/time_series/cross") do |req|
req.params["base"] = from
req.params["quote"] = to
@@ -95,6 +104,7 @@ class Provider::TwelveData < Provider
end
parsed = JSON.parse(response.body)
check_api_error!(parsed)
data = parsed.dig("values")
end
@@ -123,12 +133,14 @@ class Provider::TwelveData < Provider
def search_securities(symbol, country_code: nil, exchange_operating_mic: nil)
with_provider_response do
throttle_request
response = client.get("#{base_url}/symbol_search") do |req|
req.params["symbol"] = symbol
req.params["outputsize"] = 25
end
parsed = JSON.parse(response.body)
check_api_error!(parsed)
data = parsed.dig("data")
if data.nil?
@@ -153,19 +165,23 @@ class Provider::TwelveData < Provider
def fetch_security_info(symbol:, exchange_operating_mic:)
with_provider_response do
throttle_request
response = client.get("#{base_url}/profile") do |req|
req.params["symbol"] = symbol
req.params["mic_code"] = exchange_operating_mic
end
profile = JSON.parse(response.body)
check_api_error!(profile)
throttle_request
response = client.get("#{base_url}/logo") do |req|
req.params["symbol"] = symbol
req.params["mic_code"] = exchange_operating_mic
end
logo = JSON.parse(response.body)
check_api_error!(logo)
SecurityInfo.new(
symbol: symbol,
@@ -191,6 +207,7 @@ class Provider::TwelveData < Provider
def fetch_security_prices(symbol:, exchange_operating_mic: nil, start_date:, end_date:)
with_provider_response do
throttle_request
response = client.get("#{base_url}/time_series") do |req|
req.params["symbol"] = symbol
req.params["mic_code"] = exchange_operating_mic
@@ -200,6 +217,7 @@ class Provider::TwelveData < Provider
end
parsed = JSON.parse(response.body)
check_api_error!(parsed)
values = parsed.dig("values")
if values.nil?
@@ -237,10 +255,11 @@ class Provider::TwelveData < Provider
def client
@client ||= Faraday.new(url: base_url, ssl: self.class.faraday_ssl_options) do |faraday|
faraday.request(:retry, {
max: 2,
interval: 0.05,
max: 3,
interval: 1.0,
interval_randomness: 0.5,
backoff_factor: 2
backoff_factor: 2,
exceptions: Faraday::Retry::Middleware::DEFAULT_EXCEPTIONS + [ Faraday::ConnectionFailed ]
})
faraday.request :json
@@ -248,4 +267,69 @@ class Provider::TwelveData < Provider
faraday.headers["Authorization"] = "apikey #{api_key}"
end
end
# Paces API requests to stay within TwelveData's rate limits. Sleeps inline
# because the API physically cannot be called faster — this is unavoidable
# with a rate-limited provider. The 5-minute cache lock TTL in
# ExchangeRate::Provided accounts for worst-case throttle waits.
def throttle_request(credits: 1)
# Layer 1: Per-instance minimum interval between calls
@last_request_time ||= Time.at(0)
elapsed = Time.current - @last_request_time
sleep_time = min_request_interval - elapsed
sleep(sleep_time) if sleep_time > 0
# Layer 2: Global per-minute credit counter via cache (Redis in prod).
# Read current usage first — if adding these credits would exceed the limit,
# wait for the next minute BEFORE incrementing. This ensures credits are
# charged to the minute the request actually fires in, not a stale minute
# we slept through (which would undercount the new minute's usage).
minute_key = "twelve_data:credits:#{Time.current.to_i / 60}"
current_count = Rails.cache.read(minute_key).to_i
if current_count + credits > max_requests_per_minute
wait_seconds = 60 - (Time.current.to_i % 60) + 1
Rails.logger.info("TwelveData: #{current_count + credits}/#{max_requests_per_minute} credits this minute, waiting #{wait_seconds}s")
sleep(wait_seconds)
end
# Charge credits to the minute the request actually fires in
active_minute_key = "twelve_data:credits:#{Time.current.to_i / 60}"
Rails.cache.increment(active_minute_key, credits, expires_in: 120.seconds)
# Set timestamp after all waits so the next call's 1s pacing is measured
# from when this request actually fires, not from before the minute wait.
@last_request_time = Time.current
end
def min_request_interval
ENV.fetch("TWELVE_DATA_MIN_REQUEST_INTERVAL", MIN_REQUEST_INTERVAL).to_f
end
def max_requests_per_minute
ENV.fetch("TWELVE_DATA_MAX_REQUESTS_PER_MINUTE", 7).to_i
end
def check_api_error!(parsed)
return unless parsed.is_a?(Hash) && parsed["code"].present?
if parsed["code"] == 429
raise RateLimitError, parsed["message"] || "Rate limit exceeded"
end
raise Error, "API error (code: #{parsed["code"]}): #{parsed["message"] || "Unknown error"}"
end
def default_error_transformer(error)
case error
when RateLimitError
error
when Faraday::TooManyRequestsError
RateLimitError.new("TwelveData rate limit exceeded", details: error.response&.dig(:body))
when Faraday::Error
self.class::Error.new(error.message, details: error.response&.dig(:body))
else
self.class::Error.new(error.message)
end
end
end

View File

@@ -438,7 +438,7 @@ class Provider::YahooFinance < Provider
date: Time.at(timestamp).utc.to_date,
from: from,
to: to,
rate: (1.0 / close_rate.to_f).round(8)
rate: (BigDecimal("1") / BigDecimal(close_rate.to_s)).round(12)
)
end

View File

@@ -41,6 +41,7 @@ class Account::MarketDataImporterTest < ActiveSupport::TestCase
expected_start_date = (existing_date + 1.day) - EXCHANGE_RATE_BUFFER
end_date = Date.current.in_time_zone("America/New_York").to_date
# Only the forward pair (CAD→USD) should be fetched; inverse (USD→CAD) is computed automatically
@provider.expects(:fetch_exchange_rates)
.with(from: "CAD",
to: "USD",
@@ -50,20 +51,15 @@ class Account::MarketDataImporterTest < ActiveSupport::TestCase
OpenStruct.new(from: "CAD", to: "USD", date: existing_date, rate: 1.5)
]))
@provider.expects(:fetch_exchange_rates)
.with(from: "USD",
to: "CAD",
start_date: expected_start_date,
end_date: end_date)
.returns(provider_success_response([
OpenStruct.new(from: "USD", to: "CAD", date: existing_date, rate: 0.67)
]))
before = ExchangeRate.count
Account::MarketDataImporter.new(account).import_all
after = ExchangeRate.count
assert_operator after, :>, before + 1, "Should insert at least two new exchange-rate rows"
assert_operator after, :>, before + 1, "Should insert at least two new exchange-rate rows (forward + computed inverse)"
# Verify inverse rates were computed from the forward rates
assert ExchangeRate.where(from_currency: "USD", to_currency: "CAD").where("date > ?", existing_date).exists?,
"Inverse rates should be computed automatically"
end
test "syncs security prices for securities traded by the account" do
@@ -237,7 +233,7 @@ class Account::MarketDataImporterTest < ActiveSupport::TestCase
expected_start_date = (existing_date + 1.day) - EXCHANGE_RATE_BUFFER
end_date = Date.current.in_time_zone("America/New_York").to_date
# Simulate provider returning an error response
# Only the forward pair (CAD→USD) should be fetched; inverse is computed automatically
@provider.expects(:fetch_exchange_rates)
.with(from: "CAD",
to: "USD",
@@ -247,15 +243,6 @@ class Account::MarketDataImporterTest < ActiveSupport::TestCase
Provider::TwelveData::Error.new("Rate limit exceeded", details: { code: 429, message: "Rate limit exceeded" })
))
@provider.expects(:fetch_exchange_rates)
.with(from: "USD",
to: "CAD",
start_date: expected_start_date,
end_date: end_date)
.returns(provider_error_response(
Provider::TwelveData::Error.new("Rate limit exceeded", details: { code: 429, message: "Rate limit exceeded" })
))
before = ExchangeRate.count
# Should not raise an error, just log and continue

View File

@@ -61,9 +61,9 @@ class ExchangeRate::ImporterTest < ActiveSupport::TestCase
end_date: Date.current
).import_provider_rates
db_rates = ExchangeRate.order(:date)
assert_equal 4, db_rates.count
assert_equal [ 1.2, 1.25, 1.3, 1.3 ], db_rates.map(&:rate)
forward_rates = ExchangeRate.where(from_currency: "USD", to_currency: "EUR").order(:date)
assert_equal 4, forward_rates.count
assert_equal [ 1.2, 1.25, 1.3, 1.3 ], forward_rates.map(&:rate)
end
test "no provider calls when all rates exist" do
@@ -137,7 +137,57 @@ class ExchangeRate::ImporterTest < ActiveSupport::TestCase
end_date: future_date
).import_provider_rates
assert_equal 1, ExchangeRate.count
# 1 forward rate + 1 inverse rate
assert_equal 2, ExchangeRate.count
end
test "upserts inverse rates alongside forward rates" do
ExchangeRate.delete_all
provider_response = provider_success_response([
OpenStruct.new(from: "USD", to: "EUR", date: Date.current, rate: 0.85)
])
@provider.expects(:fetch_exchange_rates)
.with(from: "USD", to: "EUR", start_date: get_provider_fetch_start_date(Date.current), end_date: Date.current)
.returns(provider_response)
ExchangeRate::Importer.new(
exchange_rate_provider: @provider,
from: "USD",
to: "EUR",
start_date: Date.current,
end_date: Date.current
).import_provider_rates
forward = ExchangeRate.find_by(from_currency: "USD", to_currency: "EUR", date: Date.current)
inverse = ExchangeRate.find_by(from_currency: "EUR", to_currency: "USD", date: Date.current)
assert_not_nil forward, "Forward rate should be stored"
assert_not_nil inverse, "Inverse rate should be computed and stored"
assert_in_delta 0.85, forward.rate.to_f, 0.0001
assert_in_delta (1.0 / 0.85), inverse.rate.to_f, 0.0001
end
test "handles rate limit error gracefully" do
ExchangeRate.delete_all
rate_limit_error = Provider::TwelveData::RateLimitError.new("Rate limit exceeded")
@provider.expects(:fetch_exchange_rates).once.returns(
provider_error_response(rate_limit_error)
)
# Should not raise — logs warning and returns without importing
ExchangeRate::Importer.new(
exchange_rate_provider: @provider,
from: "USD",
to: "EUR",
start_date: Date.current,
end_date: Date.current
).import_provider_rates
assert_equal 0, ExchangeRate.count, "No rates should be imported on rate limit error"
end
private

View File

@@ -43,6 +43,7 @@ class MarketDataImporterTest < ActiveSupport::TestCase
expected_start_date = (SNAPSHOT_START_DATE + 1.day) - EXCHANGE_RATE_BUFFER
end_date = Date.current.in_time_zone("America/New_York").to_date
# Only the forward pair (CAD→USD) should be fetched; inverse (USD→CAD) is computed automatically
@provider.expects(:fetch_exchange_rates)
.with(from: "CAD",
to: "USD",
@@ -52,20 +53,11 @@ class MarketDataImporterTest < ActiveSupport::TestCase
OpenStruct.new(from: "CAD", to: "USD", date: SNAPSHOT_START_DATE, rate: 1.5)
]))
@provider.expects(:fetch_exchange_rates)
.with(from: "USD",
to: "CAD",
start_date: expected_start_date,
end_date: end_date)
.returns(provider_success_response([
OpenStruct.new(from: "USD", to: "CAD", date: SNAPSHOT_START_DATE, rate: 0.67)
]))
before = ExchangeRate.count
MarketDataImporter.new(mode: :snapshot).import_exchange_rates
after = ExchangeRate.count
assert_operator after, :>, before + 1, "Should insert at least two new exchange-rate rows"
assert_operator after, :>, before + 1, "Should insert at least two new exchange-rate rows (forward + computed inverse)"
end
test "syncs security prices" do

View File

@@ -0,0 +1,152 @@
require "test_helper"
class Provider::TwelveDataTest < ActiveSupport::TestCase
setup do
@provider = Provider::TwelveData.new("test_api_key")
end
# ================================
# Rate Limit Detection Tests
# ================================
test "detects rate limit from JSON body code 429" do
rate_limit_body = {
"code" => 429,
"message" => "You have run out of API credits for the current minute.",
"status" => "error"
}.to_json
mock_response = mock
mock_response.stubs(:body).returns(rate_limit_body)
@provider.stubs(:throttle_request)
@provider.stubs(:client).returns(mock_client = mock)
mock_client.stubs(:get).returns(mock_response)
result = @provider.fetch_exchange_rates(from: "USD", to: "EUR", start_date: Date.current, end_date: Date.current)
assert_not result.success?
assert_instance_of Provider::TwelveData::RateLimitError, result.error
end
test "detects rate limit on single exchange rate fetch" do
rate_limit_body = {
"code" => 429,
"message" => "Rate limit exceeded"
}.to_json
mock_response = mock
mock_response.stubs(:body).returns(rate_limit_body)
@provider.stubs(:throttle_request)
@provider.stubs(:client).returns(mock_client = mock)
mock_client.stubs(:get).returns(mock_response)
result = @provider.fetch_exchange_rate(from: "USD", to: "EUR", date: Date.current)
assert_not result.success?
assert_instance_of Provider::TwelveData::RateLimitError, result.error
end
test "does not fall through to cross API when rate limited" do
rate_limit_body = {
"code" => 429,
"message" => "Rate limit exceeded"
}.to_json
mock_response = mock
mock_response.stubs(:body).returns(rate_limit_body)
@provider.stubs(:throttle_request)
mock_client = mock
# Should only be called once (time_series), NOT a second time (time_series/cross)
mock_client.expects(:get).once.returns(mock_response)
@provider.stubs(:client).returns(mock_client)
result = @provider.fetch_exchange_rates(from: "USD", to: "EUR", start_date: Date.current, end_date: Date.current)
assert_not result.success?
assert_instance_of Provider::TwelveData::RateLimitError, result.error
end
# ================================
# Error Transformer Tests
# ================================
test "default_error_transformer preserves RateLimitError" do
error = Provider::TwelveData::RateLimitError.new("Rate limit exceeded")
result = @provider.send(:with_provider_response) { raise error }
assert_not result.success?
assert_instance_of Provider::TwelveData::RateLimitError, result.error
end
test "default_error_transformer converts Faraday 429 to RateLimitError" do
error = Faraday::TooManyRequestsError.new("Too Many Requests", { body: "Rate limited" })
result = @provider.send(:with_provider_response) { raise error }
assert_not result.success?
assert_instance_of Provider::TwelveData::RateLimitError, result.error
end
test "default_error_transformer wraps generic errors as Error" do
error = StandardError.new("Something went wrong")
result = @provider.send(:with_provider_response) { raise error }
assert_not result.success?
assert_instance_of Provider::TwelveData::Error, result.error
end
# ================================
# Throttle Tests
# ================================
test "throttle_request enforces minimum interval between calls" do
@provider.send(:instance_variable_set, :@last_request_time, Time.current)
# Stub sleep to capture the call without actually sleeping
sleep_called_with = nil
@provider.define_singleton_method(:sleep) { |duration| sleep_called_with = duration }
# Stub cache to return under limit (read returns current count, increment charges)
Rails.cache.stubs(:read).returns(0)
Rails.cache.stubs(:increment).returns(1)
@provider.send(:throttle_request)
assert_not_nil sleep_called_with, "Should have called sleep to enforce minimum interval"
assert_operator sleep_called_with, :>, 0
end
test "throttle_request waits when per-minute credit limit is exceeded" do
# Stub cache read to return count at limit (adding 1 more would exceed 7)
Rails.cache.stubs(:read).returns(7)
Rails.cache.stubs(:increment).returns(8)
sleep_called = false
@provider.define_singleton_method(:sleep) { |_duration| sleep_called = true }
@provider.send(:throttle_request)
assert sleep_called, "Should have called sleep when credit limit exceeded"
end
test "throttle_request does not wait when under credit limit" do
# Set last_request_time far in the past so per-instance throttle doesn't trigger
@provider.send(:instance_variable_set, :@last_request_time, Time.at(0))
# Stub cache to return under limit
Rails.cache.stubs(:read).returns(3)
Rails.cache.stubs(:increment).returns(4)
sleep_called = false
@provider.define_singleton_method(:sleep) { |_duration| sleep_called = true }
@provider.send(:throttle_request)
assert_not sleep_called, "Should not sleep when under credit limit"
end
end