(summary) add multi-tenant support and v2 tasks / API

Add multitenancy support to Summary sub-app. The V1 routes / tasks
behave like before, with the default tenant being "meet".

V2 routes / tasks support being called from any tenant, and don't have
meet related logic.
V2 tasks are created in separate queues to avoid mix / match.
This commit is contained in:
Florent Chehab
2026-03-17 14:23:51 +01:00
parent 7e422e5846
commit 5a70604f01
24 changed files with 1310 additions and 99 deletions

View File

@@ -304,7 +304,8 @@ jobs:
working-directory: src/summary
env:
APP_API_TOKEN: "test-api-token"
V1_TENANT_ID: 'test-tenant'
AUTHORIZED_TENANTS: '[{"id": "test-tenant", "api_key": "test-api-token", "webhook_url": "https://example.com/webhook", "webhook_api_key": "test-webhook-api-key"}]'
AWS_STORAGE_BUCKET_NAME: "http://meet-media-storage"
AWS_S3_ENDPOINT_URL: "minio:9000"
AWS_S3_ACCESS_KEY_ID: "meet"
@@ -316,8 +317,6 @@ jobs:
LLM_BASE_URL: "https://configure-your-url.com"
LLM_API_KEY: "test-llm-secret"
LLM_MODEL: "test-llm-model"
WEBHOOK_API_TOKEN: "test-webhook-secret"
WEBHOOK_URL: "https://configure-your-url.com"
steps:
- name: Checkout repository

View File

@@ -8,6 +8,10 @@ and this project adheres to
## [Unreleased]
### Added
- ✨(summary) add routes v2 for async STT and summary tasks #1171
## [1.13.0] - 2026-03-31
### Changed

View File

@@ -107,6 +107,7 @@ k8s_resource('minio-bucket', resource_deps=['minio'])
k8s_resource('meet-backend', resource_deps=['postgresql', 'minio', 'redis', 'livekit-livekit-server'])
k8s_resource('meet-celery-backend', resource_deps=['redis'])
k8s_resource('meet-celery-summarize', resource_deps=['redis'])
k8s_resource('meet-celery-summary-backend', resource_deps=['redis'])
k8s_resource('meet-celery-transcribe', resource_deps=['redis'])
k8s_resource('meet-backend-migrate', resource_deps=['meet-backend'])
k8s_resource('livekit-livekit-server', resource_deps=['redis'])

View File

@@ -24,7 +24,7 @@ _summaryEnvVars: &summaryEnvVars
secretKeyRef:
name: secret-dev
key: LLM_API_KEY
LLM_MODEL: meta-llama/Llama-3.1-8B-Instruct
LLM_MODEL: Qwen/Qwen3-Coder-30B-A3B-Instruct
WEBHOOK_API_TOKEN: password
WEBHOOK_URL: https://www.mock-impress.com/webhook/
CELERY_BROKER_URL: redis://default:pass@redis-master:6379/1

View File

@@ -6,6 +6,7 @@ _summaryEnvVars: &summaryEnvVars
AWS_S3_ACCESS_KEY_ID: meet
AWS_S3_SECRET_ACCESS_KEY: password
AWS_S3_SECURE_ACCESS: False
AUTHORIZED_TENANTS: '[{"id": "dictaphone", "api_key": "dictaphone_token", "webhook_url": "http://dictaphone-backend.dictaphone.svc.cluster.local/api/v1.0/files/ai-webhook/", "webhook_api_key": "token_summary"}]'
WHISPERX_API_KEY:
secretKeyRef:
name: secret-dev
@@ -24,13 +25,14 @@ _summaryEnvVars: &summaryEnvVars
secretKeyRef:
name: secret-dev
key: LLM_API_KEY
LLM_MODEL: meta-llama/Llama-3.1-8B-Instruct
LLM_MODEL: Qwen/Qwen3-Coder-30B-A3B-Instruct
WEBHOOK_API_TOKEN: password
WEBHOOK_URL: https://www.mock-impress.com/webhook/
CELERY_BROKER_URL: redis://default:pass@redis-master:6379/1
CELERY_RESULT_BACKEND: redis://default:pass@redis-master:6379/1
TASK_TRACKER_REDIS_URL: redis://default:pass@redis-master:6379/1
_summaryImage: &summaryImage
repository: localhost:5001/meet-summary
pullPolicy: Always
@@ -215,7 +217,7 @@ celeryTranscribe:
- "--pool=solo"
- "--loglevel=info"
- "-Q"
- "transcribe-queue"
- "transcribe-queue,transcribe-queue-v2"
celerySummarize:
replicas: 1
@@ -230,7 +232,27 @@ celerySummarize:
- "--pool=solo"
- "--loglevel=info"
- "-Q"
- "summarize-queue"
- "summarize-queue,summarize-queue-v2"
celerySummaryBackend:
replicas: 1
envVars:
<<: *summaryEnvVars
image:
repository: localhost:5001/meet-summary
pullPolicy: Always
tag: "latest"
command:
- "celery"
- "-A"
- "summary.core.celery_worker"
- "worker"
- "--pool=solo"
- "--loglevel=info"
- "-Q"
- "call-webhook-queue-v2"
agents:
replicas: 1

View File

@@ -202,6 +202,16 @@ Requires top level scope
{{ include "meet.fullname" . }}-celery-summarize
{{- end }}
{{/*
Full name for the Celery Summary Backend
Requires top level scope
*/}}
{{- define "meet.celerySummaryBackend.fullname" -}}
{{ include "meet.fullname" . }}-celery-summary-backend
{{- end }}
{{/*
Full name for the agents

View File

@@ -0,0 +1,153 @@
{{- $envVars := include "meet.common.env" (list . .Values.celerySummaryBackend) -}}
{{- $fullName := include "meet.celerySummaryBackend.fullname" . -}}
{{- $component := "celery-summary-backend" -}}
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ $fullName }}
annotations:
{{- with .Values.celerySummaryBackend.dpAnnotations }}
{{- toYaml . | nindent 4 }}
{{- end }}
namespace: {{ .Release.Namespace | quote }}
labels:
{{- include "meet.common.labels" (list . $component) | nindent 4 }}
spec:
replicas: {{ .Values.celerySummaryBackend.replicas }}
selector:
matchLabels:
{{- include "meet.common.selectorLabels" (list . $component) | nindent 6 }}
template:
metadata:
annotations:
{{- with .Values.celerySummaryBackend.podAnnotations }}
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "meet.common.selectorLabels" (list . $component) | nindent 8 }}
spec:
{{- if $.Values.image.credentials }}
imagePullSecrets:
- name: {{ include "meet.secret.dockerconfigjson.name" (dict "fullname" (include "meet.fullname" .) "imageCredentials" $.Values.image.credentials) }}
{{- end }}
shareProcessNamespace: {{ .Values.celerySummaryBackend.shareProcessNamespace }}
containers:
{{- with .Values.celerySummaryBackend.sidecars }}
{{- toYaml . | nindent 8 }}
{{- end }}
- name: {{ .Chart.Name }}
image: "{{ (.Values.celerySummaryBackend.image | default dict).repository | default .Values.image.repository }}:{{ (.Values.celerySummaryBackend.image | default dict).tag | default .Values.image.tag }}"
imagePullPolicy: {{ (.Values.celerySummaryBackend.image | default dict).pullPolicy | default .Values.image.pullPolicy }}
{{- with .Values.celerySummaryBackend.command }}
command:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.celerySummaryBackend.args }}
args:
{{- toYaml . | nindent 12 }}
{{- end }}
env:
{{- if $envVars }}
{{- $envVars | indent 12 }}
{{- end }}
{{- with .Values.celerySummaryBackend.securityContext }}
securityContext:
{{- toYaml . | nindent 12 }}
{{- end }}
ports:
- name: http
containerPort: {{ .Values.celerySummaryBackend.service.targetPort }}
protocol: TCP
{{- if .Values.celerySummaryBackend.probes.liveness }}
livenessProbe:
{{- include "meet.probes.abstract" (merge .Values.celerySummaryBackend.probes.liveness (dict "targetPort" .Values.celerySummaryBackend.service.targetPort )) | nindent 12 }}
{{- end }}
{{- if .Values.celerySummaryBackend.probes.readiness }}
readinessProbe:
{{- include "meet.probes.abstract" (merge .Values.celerySummaryBackend.probes.readiness (dict "targetPort" .Values.celerySummaryBackend.service.targetPort )) | nindent 12 }}
{{- end }}
{{- if .Values.celerySummaryBackend.probes.startup }}
startupProbe:
{{- include "meet.probes.abstract" (merge .Values.celerySummaryBackend.probes.startup (dict "targetPort" .Values.celerySummaryBackend.service.targetPort )) | nindent 12 }}
{{- end }}
{{- with .Values.celerySummaryBackend.resources }}
resources:
{{- toYaml . | nindent 12 }}
{{- end }}
volumeMounts:
{{- range $index, $value := .Values.mountFiles }}
- name: "files-{{ $index }}"
mountPath: {{ $value.path }}
subPath: content
{{- end }}
{{- range $name, $volume := .Values.celerySummaryBackend.persistence }}
- name: "{{ $name }}"
mountPath: "{{ $volume.mountPath }}"
{{- end }}
{{- range .Values.celerySummaryBackend.extraVolumeMounts }}
- name: {{ .name }}
mountPath: {{ .mountPath }}
subPath: {{ .subPath | default "" }}
readOnly: {{ .readOnly }}
{{- end }}
{{- with .Values.celerySummaryBackend.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.celerySummaryBackend.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.celerySummaryBackend.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
volumes:
{{- range $index, $value := .Values.mountFiles }}
- name: "files-{{ $index }}"
configMap:
name: "{{ include "meet.fullname" $ }}-files-{{ $index }}"
{{- end }}
{{- range $name, $volume := .Values.celerySummaryBackend.persistence }}
- name: "{{ $name }}"
{{- if eq $volume.type "emptyDir" }}
emptyDir: {}
{{- else }}
persistentVolumeClaim:
claimName: "{{ $fullName }}-{{ $name }}"
{{- end }}
{{- end }}
{{- range .Values.celerySummaryBackend.extraVolumes }}
- name: {{ .name }}
{{- if .existingClaim }}
persistentVolumeClaim:
claimName: {{ .existingClaim }}
{{- else if .hostPath }}
hostPath:
{{ toYaml .hostPath | nindent 12 }}
{{- else if .csi }}
csi:
{{- toYaml .csi | nindent 12 }}
{{- else if .configMap }}
configMap:
{{- toYaml .configMap | nindent 12 }}
{{- else if .emptyDir }}
emptyDir:
{{- toYaml .emptyDir | nindent 12 }}
{{- else }}
emptyDir: {}
{{- end }}
{{- end }}
---
{{ if .Values.celerySummaryBackend.pdb.enabled }}
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: {{ $fullName }}
namespace: {{ .Release.Namespace | quote }}
spec:
maxUnavailable: 1
selector:
matchLabels:
{{- include "meet.common.selectorLabels" (list . $component) | nindent 6 }}
{{ end }}

View File

@@ -738,7 +738,7 @@ celeryTranscribe:
- "--pool=solo"
- "--loglevel=info"
- "-Q"
- "transcribe-queue"
- "transcribe-queue,transcribe-queue-v2"
## @param celeryTranscribe.args Override the celeryTranscribe container args
args: []
@@ -847,7 +847,7 @@ celerySummarize:
- "--pool=solo"
- "--loglevel=info"
- "-Q"
- "summarize-queue"
- "summarize-queue,summarize-queue-v2"
## @param celerySummarize.args Override the celerySummarize container args
args: []
@@ -933,6 +933,116 @@ celerySummarize:
pdb:
enabled: false
## @section celerySummaryBackend
celerySummaryBackend:
## @param celerySummaryBackend.image.repository Repository to use to pull meet's celerySummaryBackend container image
## @param celerySummaryBackend.image.tag meet's celerySummaryBackend container tag
## @param celerySummaryBackend.image.pullPolicy celerySummaryBackend container image pull policy
image:
repository: lasuite/meet-summary
pullPolicy: IfNotPresent
tag: "latest"
## @param celerySummaryBackend.dpAnnotations Annotations to add to the celerySummaryBackend Deployment
dpAnnotations: {}
## @param celerySummaryBackend.command Override the celerySummaryBackend container command
command:
- "celery"
- "-A"
- "summary.core.celery_worker"
- "worker"
- "--pool=solo"
- "--loglevel=info"
- "-Q"
- "call-webhook-queue-v2"
## @param celerySummaryBackend.args Override the celerySummaryBackend container args
args: []
## @param celerySummaryBackend.replicas Amount of celerySummaryBackend replicas
replicas: 1
## @param celerySummaryBackend.shareProcessNamespace Enable share process namespace between containers
shareProcessNamespace: false
## @param celerySummaryBackend.sidecars Add sidecars containers to celerySummaryBackend deployment
sidecars: []
## @param celerySummaryBackend.migrateJobAnnotations Annotations for the migrate job
migrateJobAnnotations: {}
## @param celerySummaryBackend.securityContext Configure celerySummaryBackend Pod security context
securityContext: null
## @param celerySummaryBackend.envVars Configure celerySummaryBackend container environment variables
## @extra celerySummaryBackend.envVars.BY_VALUE Example environment variable by setting value directly
## @extra celerySummaryBackend.envVars.FROM_CONFIGMAP.configMapKeyRef.name Name of a ConfigMap when configuring env vars from a ConfigMap
## @extra celerySummaryBackend.envVars.FROM_CONFIGMAP.configMapKeyRef.key Key within a ConfigMap when configuring env vars from a ConfigMap
## @extra celerySummaryBackend.envVars.FROM_SECRET.secretKeyRef.name Name of a Secret when configuring env vars from a Secret
## @extra celerySummaryBackend.envVars.FROM_SECRET.secretKeyRef.key Key within a Secret when configuring env vars from a Secret
## @skip celerySummaryBackend.envVars
envVars:
<<: *commonEnvVars
## @param celerySummaryBackend.podAnnotations Annotations to add to the celerySummaryBackend Pod
podAnnotations: {}
## @param celerySummaryBackend.service.type celerySummaryBackend Service type
## @param celerySummaryBackend.service.port celerySummaryBackend Service listening port
## @param celerySummaryBackend.service.targetPort celerySummaryBackend container listening port
## @param celerySummaryBackend.service.annotations Annotations to add to the celerySummaryBackend Service
service:
type: ClusterIP
port: 80
targetPort: 8000
annotations: {}
## @param celerySummaryBackend.probes Configure celerySummaryBackend probes
## @param celerySummaryBackend.probes.liveness.path [nullable] Configure path for celerySummaryBackend HTTP liveness probe
## @param celerySummaryBackend.probes.liveness.targetPort [nullable] Configure port for celerySummaryBackend HTTP liveness probe
## @param celerySummaryBackend.probes.liveness.initialDelaySeconds [nullable] Configure initial delay for celerySummaryBackend liveness probe
## @param celerySummaryBackend.probes.liveness.timeoutSeconds [nullable] Configure timeout for celerySummaryBackend liveness probe
## @param celerySummaryBackend.probes.startup.path [nullable] Configure path for celerySummaryBackend HTTP startup probe
## @param celerySummaryBackend.probes.startup.targetPort [nullable] Configure port for celerySummaryBackend HTTP startup probe
## @param celerySummaryBackend.probes.startup.initialDelaySeconds [nullable] Configure initial delay for celerySummaryBackend startup probe
## @param celerySummaryBackend.probes.startup.timeoutSeconds [nullable] Configure timeout for celerySummaryBackend startup probe
## @param celerySummaryBackend.probes.readiness.path [nullable] Configure path for celerySummaryBackend HTTP readiness probe
## @param celerySummaryBackend.probes.readiness.targetPort [nullable] Configure port for celerySummaryBackend HTTP readiness probe
## @param celerySummaryBackend.probes.readiness.initialDelaySeconds [nullable] Configure initial delay for celerySummaryBackend readiness probe
## @param celerySummaryBackend.probes.readiness.timeoutSeconds [nullable] Configure timeout for celerySummaryBackend readiness probe
probes: {}
## @param celerySummaryBackend.resources Resource requirements for the celerySummaryBackend container
resources: {}
## @param celerySummaryBackend.nodeSelector Node selector for the celerySummaryBackend Pod
nodeSelector: {}
## @param celerySummaryBackend.tolerations Tolerations for the celerySummaryBackend Pod
tolerations: []
## @param celerySummaryBackend.affinity Affinity for the celerySummaryBackend Pod
affinity: {}
## @param celerySummaryBackend.persistence Additional volumes to create and mount on the celerySummaryBackend. Used for debugging purposes
## @extra celerySummaryBackend.persistence.volume-name.size Size of the additional volume
## @extra celerySummaryBackend.persistence.volume-name.type Type of the additional volume, persistentVolumeClaim or emptyDir
## @extra celerySummaryBackend.persistence.volume-name.mountPath Path where the volume should be mounted to
persistence: {}
## @param celerySummaryBackend.extraVolumeMounts Additional volumes to mount on the celerySummaryBackend.
extraVolumeMounts: []
## @param celerySummaryBackend.extraVolumes Additional volumes to mount on the celerySummaryBackend.
extraVolumes: []
## @param celerySummaryBackend.pdb.enabled Enable pdb on celerySummaryBackend
pdb:
enabled: false
## @section agents
agents:

View File

@@ -2,8 +2,11 @@
from fastapi import APIRouter, Depends
from summary.api.route import tasks
from summary.core.security import verify_token
from summary.api.route import tasks, tasks_v2
from summary.core.security import verify_tenant_api_key
api_router = APIRouter(dependencies=[Depends(verify_token)])
api_router.include_router(tasks.router, tags=["tasks"])
api_router_v1 = APIRouter(dependencies=[Depends(verify_tenant_api_key)])
api_router_v1.include_router(tasks.router_tasks_v1, tags=["tasks"])
api_router_v2 = APIRouter(dependencies=[Depends(verify_tenant_api_key)])
api_router_v2.include_router(tasks_v2.router_tasks_v2, tags=["tasks"])

View File

@@ -42,10 +42,10 @@ class TranscribeSummarizeTaskCreation(BaseModel):
return v
router = APIRouter(prefix="/tasks")
router_tasks_v1 = APIRouter(prefix="/tasks")
@router.post("/")
@router_tasks_v1.post("/")
async def create_transcribe_summarize_task(request: TranscribeSummarizeTaskCreation):
"""Create a transcription and summarization task."""
task = process_audio_transcribe_summarize_v2.apply_async(
@@ -68,7 +68,7 @@ async def create_transcribe_summarize_task(request: TranscribeSummarizeTaskCreat
return {"id": task.id, "message": "Task created"}
@router.get("/{task_id}")
@router_tasks_v1.get("/{task_id}")
async def get_task_status(task_id: str):
"""Check task status by ID."""
task = AsyncResult(task_id)

View File

@@ -0,0 +1,61 @@
"""API routes related to application tasks (V2 / tenant friendly)."""
from celery.result import AsyncResult
from fastapi import APIRouter, Depends, HTTPException
from summary.core.celery_worker import (
process_audio_transcribe_v2_task,
summarize_v2_task,
)
from summary.core.config import AuthorizedTenant
from summary.core.models import SummarizeTaskV2Request, TranscribeTaskV2Request
from summary.core.security import verify_tenant_api_key_v2
router_tasks_v2 = APIRouter()
@router_tasks_v2.post("/async-jobs/transcribe")
async def create_transcribe_task_v2(
request: TranscribeTaskV2Request,
request_tenant: AuthorizedTenant = Depends(verify_tenant_api_key_v2),
):
"""Create a transcription task."""
task = process_audio_transcribe_v2_task.apply_async(
args=[{**request.model_dump(), "tenant_id": request_tenant.id}]
)
return {"job_id": task.id, "message": "Transcribe job created"}
@router_tasks_v2.post("/async-jobs/summarize")
async def create_summarize_task_v2(
request: SummarizeTaskV2Request,
request_tenant: AuthorizedTenant = Depends(verify_tenant_api_key_v2),
):
"""Create a summarization task."""
task = summarize_v2_task.apply_async(
args=[{**request.model_dump(), "tenant_id": request_tenant.id}]
)
return {"job_id": task.id, "message": "Summarize job created"}
@router_tasks_v2.get("/async-jobs/transcribe/{job_id}")
@router_tasks_v2.get("/async-jobs/summarize/{job_id}")
async def get_task_status(
job_id: str,
request_tenant: AuthorizedTenant = Depends(verify_tenant_api_key_v2),
):
"""Check task status by ID."""
task = AsyncResult(job_id)
try:
if (
isinstance(task.args, (list, tuple))
and len(task.args) > 0
and isinstance(task.args[0], dict)
and task.args[0].get("tenant_id") == request_tenant.id
):
return {"job_id": job_id, "status": task.status}
except (TypeError, KeyError):
pass
raise HTTPException(status_code=404, detail="Not found")

View File

@@ -17,6 +17,10 @@ from summary.core.config import get_settings
from summary.core.file_service import FileService, FileServiceException
from summary.core.llm_service import LLMException, LLMObservability, LLMService
from summary.core.locales import get_locale
from summary.core.models import (
SummarizeTaskV2Payload,
TranscribeTaskV2Payload,
)
from summary.core.prompt import (
FORMAT_NEXT_STEPS,
FORMAT_PLAN,
@@ -27,8 +31,19 @@ from summary.core.prompt import (
PROMPT_SYSTEM_TLDR,
PROMPT_USER_PART,
)
from summary.core.shared_models import (
SummarizeWebhookFailurePayload,
SummarizeWebhookSuccessPayload,
TranscribeWebhookFailurePayload,
TranscribeWebhookSuccessPayload,
WhisperXResponse,
webhook_payload_adapter,
)
from summary.core.transcript_formatter import TranscriptFormatter
from summary.core.webhook_service import submit_content
from summary.core.webhook_service import (
call_webhook_v2,
submit_content,
)
settings = get_settings()
analytics = get_analytics()
@@ -58,14 +73,26 @@ if settings.sentry_dsn and settings.sentry_is_enabled:
file_service = FileService()
def transcribe_audio(task_id, filename, language):
def transcribe_audio(
*,
task_id: str,
filename: str | None = None,
language: str,
cloud_storage_url=None,
raises: bool = False,
):
"""Transcribe an audio file using WhisperX.
Downloads the audio from MinIO, sends it to WhisperX for transcription,
and tracks metadata throughout the process.
Downloads the audio from MinIO or a cloud storage URL, sends it to
WhisperX for transcription, and tracks metadata throughout the process.
Returns the transcription object, or None if the file could not be retrieved.
"""
if bool(filename) == bool(cloud_storage_url):
raise ValueError(
"Either filename or cloud_storage_url must be provided, but not both."
)
logger.info("Initiating WhisperX client")
whisperx_client = openai.OpenAI(
api_key=settings.whisperx_api_key.get_secret_value(),
@@ -75,7 +102,10 @@ def transcribe_audio(task_id, filename, language):
# Transcription
try:
with file_service.prepare_audio_file(filename) as (audio_file, metadata):
with file_service.prepare_audio_file(
remote_object_key=filename,
cloud_storage_url=cloud_storage_url,
) as (audio_file, metadata):
metadata_manager.track(task_id, {"audio_length": metadata["duration"]})
if language is None:
@@ -104,8 +134,21 @@ def transcribe_audio(task_id, filename, language):
logger.info("Transcription received in %.2f seconds.", transcription_time)
logger.debug("Transcription: \n %s", transcription)
except FileServiceException:
logger.exception("Unexpected error for filename: %s", filename)
except FileServiceException as e:
# For v2 pipeline we want failures not silent errors like this
if raises:
raise e
redacted_cloud_storage_url = (
cloud_storage_url.split("?", 1)[0] if cloud_storage_url else None
)
logger.exception(
(
"Unexpected error while preparing file | filename: %s "
"| cloud_storage_url: %s"
),
filename,
redacted_cloud_storage_url,
)
return None
metadata_manager.track_transcription_metadata(task_id, transcription)
@@ -114,13 +157,13 @@ def transcribe_audio(task_id, filename, language):
def format_transcript(
transcription,
context_language,
language,
room,
recording_date,
recording_time,
download_link,
):
context_language: str | None,
language: str,
room: str | None,
recording_date: str | None,
recording_time: str | None,
download_link: str | None,
) -> tuple[str, str]:
"""Format a transcription into readable content with a title.
Resolves the locale from context_language / language, then uses
@@ -208,7 +251,9 @@ def process_audio_transcribe_summarize_v2(
task_id = self.request.id
transcription = transcribe_audio(task_id, filename, language)
transcription = transcribe_audio(
task_id=task_id, filename=filename, language=language
)
if transcription is None:
return
@@ -258,24 +303,10 @@ def task_failure_handler(task_id, exception=None, **kwargs):
metadata_manager.capture(task_id, settings.posthog_event_failure)
@celery.task(
bind=True,
autoretry_for=[LLMException, Exception],
max_retries=settings.celery_max_retries,
queue=settings.summarize_queue,
)
def summarize_transcription(
self, owner_id: str, transcript: str, email: str, sub: str, title: str
):
"""Generate a summary from the provided transcription text.
This Celery task performs the following operations:
1. Uses an LLM to generate a TL;DR summary of the transcription.
2. Breaks the transcription into parts and summarizes each part.
3. Cleans up the combined summary
4. Generates next steps.
5. Sends the final summary via webhook.
"""
def summarize_transcription_internals(
*, owner_id: str, transcript: str, session_id: str
) -> str:
"""Generate a summary from the provided transcription text."""
logger.info(
"Starting summarization task | Owner: %s",
owner_id,
@@ -292,7 +323,7 @@ def summarize_transcription(
# privacy controls in observability traces.
llm_observability = LLMObservability(
user_has_tracing_consent=user_has_tracing_consent,
session_id=self.request.id,
session_id=session_id,
user_id=owner_id,
)
llm_service = LLMService(llm_observability=llm_observability)
@@ -338,9 +369,203 @@ def summarize_transcription(
logger.info("Summary cleaned")
summary = tldr + "\n\n" + cleaned_summary + "\n\n" + next_steps
llm_observability.flush()
logger.debug("LLM observability flushed")
return summary
@celery.task(
bind=True,
autoretry_for=[LLMException, Exception],
max_retries=settings.celery_max_retries,
queue=settings.summarize_queue,
)
def summarize_transcription(
self, owner_id: str, transcript: str, email: str, sub: str, title: str
):
"""Generate a summary from the provided transcription text.
This Celery task performs the following operations:
1. Uses an LLM to generate a TL;DR summary of the transcription.
2. Breaks the transcription into parts and summarizes each part.
3. Cleans up the combined summary
4. Generates next steps.
5. Sends the final summary via webhook.
"""
summary = summarize_transcription_internals(
owner_id=owner_id, transcript=transcript, session_id=self.request.id
)
summary_title = settings.summary_title_template.format(title=title)
submit_content(summary, summary_title, email, sub)
llm_observability.flush()
logger.debug("LLM observability flushed")
##################################################################################
# Tasks v2
##################################################################################
@celery.task(
    # NOTE(review): retries are hard-coded to 3 here while the other v2 tasks
    # use settings.celery_max_retries — confirm this difference is intended.
    max_retries=3,
    queue=settings.call_webhook_queue_v2,
    autoretry_for=[exceptions.HTTPError],
)
def call_webhook_v2_task(
    payload: dict,
    tenant_id: str,
):
    """Call a tenant webhook asynchronously (retries handled by Celery).

    Args:
        payload: Serialized webhook payload; re-validated into the proper
            payload model via ``webhook_payload_adapter``.
        tenant_id: Id of the tenant whose webhook URL / API key to use.
    """
    call_webhook_v2(
        payload=webhook_payload_adapter.validate_python(payload), tenant_id=tenant_id
    )
@celery.task(
    bind=True,
    # NOTE(review): only HTTP errors are retried; FileServiceException raised
    # below (raises=True) fails the task immediately — confirm storage
    # failures should not be retried.
    autoretry_for=[exceptions.HTTPError],
    max_retries=settings.celery_max_retries,
    queue=settings.transcribe_queue_v2,
)
def process_audio_transcribe_v2_task(
    self,
    payload: dict,
):
    """Process an audio file by transcribing it.

    This Celery task orchestrates:
    1. Audio transcription via WhisperX
    2. Store transcript result on S3
    3. Webhook submission

    Args:
        self: Celery task instance (passed on with bind=True)
        payload: Serialized dictionary validated as TranscribeTaskV2Payload
    """
    # Re-validate: Celery transports the payload as a plain dict.
    payload = TranscribeTaskV2Payload.model_validate(payload)
    logger.info(
        "Transcribing for object received | Owner: %s",
        payload.user_sub,
    )
    job_id = self.request.id
    # raises=True makes transcribe_audio propagate failures instead of
    # returning None, so the task_failure signal fires for the v2 pipeline.
    transcription_res = WhisperXResponse(
        **transcribe_audio(  # type: ignore
            task_id=job_id,
            cloud_storage_url=payload.cloud_storage_url,
            language=payload.language,
            raises=True,
        ).model_dump()
    )
    file_service.store_transcript(
        transcript=transcription_res,
        job_id=job_id,
    )
    # Deliver the result through the dedicated webhook queue; the tenant id
    # selects the webhook URL / API key to use.
    call_webhook_v2_task.apply_async(
        args=[
            TranscribeWebhookSuccessPayload(
                job_id=job_id,
                transcription_data_url=file_service.get_transcript_signed_url(job_id),
            ).model_dump(),
            payload.tenant_id,
        ]
    )
@signals.task_failure.connect(sender=process_audio_transcribe_v2_task)
def handle_transcribe_v2_failed(
    sender,
    task_id=None,
    exception=None,
    args=None,
    kwargs=None,
    traceback=None,
    einfo=None,
    **kw,
):
    """Handle the failure of transcribe_v2_task.

    This function is triggered when the transcribe_v2_task fails.
    It sends a webhook failure payload to notify the client of the failure.
    """
    task = sender
    # If retries are exhausted:
    # NOTE(review): exceptions outside autoretry_for fail with retries == 0,
    # so this guard skips the failure webhook for them when max_retries > 0 —
    # confirm that is the intended behavior.
    if task.request.retries >= task.max_retries:
        # NOTE(review): `args` may be None for some failure modes per the
        # Celery signal docs, and `task.id` vs the `task_id` argument should
        # be confirmed to carry the failed request's id.
        call_webhook_v2_task.apply_async(
            args=[
                TranscribeWebhookFailurePayload(
                    job_id=task.id,
                    error_code="unknown_error",
                ).model_dump(),
                args[0]["tenant_id"],
            ]
        )
@celery.task(
    bind=True,
    # NOTE(review): autoretry_for includes Exception, so *any* error is
    # retried up to celery_max_retries — confirm this broad retry is wanted.
    autoretry_for=[LLMException, Exception],
    max_retries=settings.celery_max_retries,
    queue=settings.summarize_queue_v2,
)
def summarize_v2_task(
    self,
    payload: dict,
):
    """Generate a summary from the provided content.

    This Celery task performs the following operations:
    1. Uses an LLM to generate a TL;DR summary of the content.
    2. Breaks the content into parts and summarizes each part.
    3. Cleans up the combined summary
    4. Generates next steps.
    5. Sends the final summary via webhook.

    Args:
        self: Celery task instance (passed on with bind=True)
        payload: Serialized dictionary validated as SummarizeTaskV2Payload
    """
    # Re-validate: Celery transports the payload as a plain dict.
    payload = SummarizeTaskV2Payload.model_validate(payload)
    summary = summarize_transcription_internals(
        owner_id=payload.user_sub,
        transcript=payload.content,
        session_id=self.request.id,
    )
    # Deliver the result through the dedicated webhook queue; the tenant id
    # selects the webhook URL / API key to use.
    call_webhook_v2_task.apply_async(
        args=[
            SummarizeWebhookSuccessPayload(
                job_id=self.request.id, summary=summary
            ).model_dump(),
            payload.tenant_id,
        ]
    )
@signals.task_failure.connect(sender=summarize_v2_task)
def handle_summarize_v2_failed(
    sender,
    task_id=None,
    exception=None,
    args=None,
    kwargs=None,
    traceback=None,
    einfo=None,
    **kw,
):
    """Handle the failure of summarize_v2_task.

    This function is triggered when the summarize_v2_task fails.
    It sends a webhook failure payload to notify the client of the failure.
    """
    task = sender
    # If retries are exhausted:
    if task.request.retries >= task.max_retries:
        # NOTE(review): `args` may be None for some failure modes per the
        # Celery signal docs, and `task.id` vs the `task_id` argument should
        # be confirmed to carry the failed request's id.
        call_webhook_v2_task.apply_async(
            args=[
                SummarizeWebhookFailurePayload(
                    job_id=task.id,
                    error_code="unknown_error",
                ).model_dump(),
                args[0]["tenant_id"],
            ]
        )

View File

@@ -1,21 +1,58 @@
"""Application configuration and settings."""
from functools import lru_cache
from typing import Annotated, List, Literal, Optional, Set
import logging
import os
from functools import cached_property, lru_cache
from typing import Annotated, Any, List, Literal, Mapping, Optional, Set
from fastapi import Depends
from pydantic import SecretStr
from pydantic import (
BaseModel,
ConfigDict,
Field,
SecretStr,
model_validator,
)
from pydantic_settings import BaseSettings, SettingsConfigDict
from summary.core.types import Url
logger = logging.getLogger(__name__)
class AuthorizedTenant(BaseModel):
    """Configuration for one tenant authorized to call the summary API.

    Tenants identify callers of the v1/v2 routes and carry the webhook
    endpoint and credentials used to deliver results back to them.
    """

    # frozen=True makes instances immutable (and hashable), matching the
    # tuple-valued, frozen Settings that holds them.
    model_config = ConfigDict(frozen=True)

    # Unique identifier; also stamped into v2 Celery task payloads for
    # per-tenant job attribution.
    id: str = Field(title="Tenant Id", description="A unique ID for the tenant.")
    # Secret the tenant presents when calling the summary API.
    api_key: SecretStr = Field(title="API key for using the summary API", min_length=8)
    webhook_url: Url = Field(
        title="Webhook URL", description="The URL to send the summary to."
    )
    webhook_api_key: SecretStr = Field(
        title="Webhook API Key",
        description="The api_key to authenticate the webhook request.",
    )
V1_DEFAULT_TENANT_ID = "__deprecated_meet_tenant__"
class Settings(BaseSettings):
"""Configuration settings loaded from environment variables and .env file."""
model_config = SettingsConfigDict(env_file=".env")
model_config = SettingsConfigDict(env_file=".env", frozen=True)
app_name: str = "app"
app_name: str = "summary"
app_api_v1_str: str = "/api/v1"
app_api_token: SecretStr
app_api_v2_str: str = "/api/v2"
# Authorized Tenants
# Using env variables to store authorized tenants for now
# to avoid any other external dependency (DB)
authorized_tenants: tuple[AuthorizedTenant, ...] = Field(default_factory=tuple)
v1_tenant_id: str = V1_DEFAULT_TENANT_ID
# Audio recordings
recording_max_duration: Optional[int] = None
@@ -29,6 +66,10 @@ class Settings(BaseSettings):
transcribe_queue: str = "transcribe-queue"
summarize_queue: str = "summarize-queue"
# v2 tasks
transcribe_queue_v2: str = "transcribe-queue-v2"
summarize_queue_v2: str = "summarize-queue-v2"
call_webhook_queue_v2: str = "call-webhook-queue-v2"
# Minio settings
aws_storage_bucket_name: str
@@ -36,6 +77,7 @@ class Settings(BaseSettings):
aws_s3_access_key_id: str
aws_s3_secret_access_key: SecretStr
aws_s3_secure_access: bool = True
aws_transcript_path: str = "transcripts"
# AI-related settings
whisperx_api_key: SecretStr
@@ -56,8 +98,6 @@ class Settings(BaseSettings):
webhook_max_retries: int = 2
webhook_status_forcelist: List[int] = [502, 503, 504]
webhook_backoff_factor: float = 0.1
webhook_api_token: SecretStr
webhook_url: str
# Locale
default_context_language: Literal["de", "en", "fr", "nl"] = "fr"
@@ -90,6 +130,92 @@ class Settings(BaseSettings):
task_tracker_redis_url: str = "redis://redis/0"
task_tracker_prefix: str = "task_metadata:"
@model_validator(mode="before")
@classmethod
def legacy_default_tenant_config(cls, data: Any) -> Any:
"""Migrate the legacy default tenant configuration."""
if isinstance(data, dict):
api_key = os.getenv("APP_API_TOKEN")
webhook_api_key = os.getenv("WEBHOOK_API_TOKEN")
webhook_url = os.getenv("WEBHOOK_URL")
if api_key and webhook_api_key and webhook_url:
logger.warning(
"Deprecated legacy app configuration detected, "
"please use only the new 'authorized_tenants' field instead."
)
authorized_tenants = list(data.get("authorized_tenants", []))
authorized_tenants.append(
AuthorizedTenant(
id=V1_DEFAULT_TENANT_ID,
api_key=SecretStr(api_key),
webhook_url=webhook_url,
webhook_api_key=SecretStr(webhook_api_key),
)
)
data["authorized_tenants"] = tuple(authorized_tenants)
return data
@model_validator(mode="after")
def validate_authorized_tenants(self):
    """Ensure at least one tenant exists and ids / API keys are unique."""
    tenants = self.authorized_tenants
    if not tenants:
        raise ValueError("No authorized tenants configured")
    # Uniqueness checks: a duplicate collapses the set below its length.
    if len({tenant.id for tenant in tenants}) != len(tenants):
        raise ValueError("Duplicate tenant ids are not allowed")
    distinct_keys = {tenant.api_key.get_secret_value() for tenant in tenants}
    if len(distinct_keys) != len(tenants):
        raise ValueError("Duplicate application API api_keys are not allowed")
    return self
@model_validator(mode="after")
def validate_default_v1_tenant(self):
    """Ensure the configured v1 tenant id maps to an authorized tenant."""
    known_ids = {tenant.id for tenant in self.authorized_tenants}
    if self.v1_tenant_id not in known_ids:
        raise ValueError("v1 tenant is not configured in authorized tenants")
    return self
@cached_property
def authorized_tenant_api_keys(self) -> frozenset[str]:
    """All configured tenant API keys, as a frozenset for O(1) lookups."""
    keys = (
        tenant.api_key.get_secret_value() for tenant in self.authorized_tenants
    )
    return frozenset(keys)
@cached_property
def authorized_tenant_by_id(self) -> "Mapping[str, AuthorizedTenant]":
    """Index the configured tenants by their unique id."""
    return {tenant.id: tenant for tenant in self.authorized_tenants}
@cached_property
def authorized_tenant_by_api_key(self) -> "Mapping[str, AuthorizedTenant]":
    """Index the configured tenants by their (secret) API key."""
    return {
        tenant.api_key.get_secret_value(): tenant
        for tenant in self.authorized_tenants
    }
def get_authorized_tenant(
self, *, tenant_id: str | None = None, api_key: str | None = None
) -> AuthorizedTenant:
"""Return the authorized tenant for a given API api_key or id."""
if tenant_id is None and api_key is None:
raise ValueError("Either tenant_id or api_key must be provided")
if tenant_id is not None and api_key is not None:
raise ValueError("Only one of tenant_id or api_key can be provided")
if tenant_id is not None:
return self.authorized_tenant_by_id[tenant_id]
else:
return self.authorized_tenant_by_api_key[api_key]
@lru_cache
def get_settings():

View File

@@ -1,21 +1,25 @@
"""File service to encapsulate files' manipulations."""
import io
import logging
import os
import subprocess
import tempfile
from contextlib import contextmanager
from datetime import timedelta
from pathlib import Path
from urllib.parse import urlparse
import mutagen
import requests
from minio import Minio
from minio.error import MinioException, S3Error
from summary.core.config import get_settings
from summary.core.shared_models import WhisperXResponse
settings = get_settings()
logger = logging.getLogger(__name__)
@@ -97,6 +101,58 @@ class FileService:
if response:
response.close()
def _download_from_cloud_storage_url(self, cloud_storage_url: str) -> Path:
    """Download a recording from a cloud storage URL to a local temp file.

    Validates the URL and its file extension against the allowed set,
    then streams the body to a ``NamedTemporaryFile`` (``delete=False`` —
    the caller is responsible for removing the returned path).

    Raises:
        ValueError: empty URL or disallowed file extension.
        FileServiceException: any network/HTTP failure during download.
    """
    logger.info(
        "Download recording from URL | cloud_storage_url: %s",
        cloud_storage_url,
    )
    if not cloud_storage_url:
        logger.warning("Invalid cloud_storage_url '%s'", cloud_storage_url)
        raise ValueError("Invalid cloud_storage_url")

    extension = Path(urlparse(cloud_storage_url).path).suffix.lower()
    if extension not in self._allowed_extensions:
        logger.warning(
            "Invalid file extension '%s' from cloud_storage_url", extension
        )
        raise ValueError(f"Invalid file extension '{extension}'")

    local_path = None
    try:
        with requests.get(
            cloud_storage_url,
            stream=True,
            timeout=(10, 300),
        ) as response:
            response.raise_for_status()
            with tempfile.NamedTemporaryFile(
                suffix=extension,
                delete=False,
                prefix="cloud_storage_download_",
            ) as tmp:
                local_path = Path(tmp.name)
                for chunk in response.iter_content(
                    chunk_size=self._stream_chunk_size
                ):
                    if chunk:
                        tmp.write(chunk)
                tmp.flush()
        logger.info(
            "Recording successfully downloaded from cloud_storage_url"
        )
        logger.debug("Recording local file path: %s", local_path)
        return local_path
    except requests.RequestException as e:
        # The temp file is created with delete=False: remove any partial
        # download so failed requests don't leak files on disk.
        if local_path is not None:
            local_path.unlink(missing_ok=True)
        raise FileServiceException(
            "Unexpected error while downloading object from cloud_storage_url."
        ) from e
def _validate_duration(self, local_path: Path) -> float:
"""Validate audio file duration against configured maximum."""
file_metadata = mutagen.File(local_path).info
@@ -157,19 +213,37 @@ class FileService:
raise RuntimeError("Failed to extract audio.") from e
@contextmanager
def prepare_audio_file(self, remote_object_key: str):
def prepare_audio_file(
self,
remote_object_key: str | None = None,
cloud_storage_url: str | None = None,
):
"""Download and prepare audio file for processing.
Downloads file from MinIO, validates duration, and yields an open
file handle with metadata. Automatically cleans up temporary files
when the context exits.
Downloads file from MinIO or an external cloud URL, validates duration,
and yields an open file handle with metadata. Automatically cleans up
temporary files when the context exits.
"""
downloaded_path = None
processed_path = None
file_handle = None
try:
if bool(remote_object_key) == bool(cloud_storage_url):
raise ValueError(
(
"Exactly one of 'remote_object_key' or "
"'cloud_storage_url' must be provided."
)
)
if cloud_storage_url:
downloaded_path = self._download_from_cloud_storage_url(
cloud_storage_url
)
else:
downloaded_path = self._download_from_minio(remote_object_key)
duration = self._validate_duration(downloaded_path)
extension = downloaded_path.suffix.lower()
@@ -199,3 +273,25 @@ class FileService:
logger.debug("Temporary file removed: %s", path)
except OSError as e:
logger.warning("Failed to remove temporary file %s: %s", path, e)
def store_transcript(self, *, transcript: WhisperXResponse, job_id: str) -> None:
    """Serialize the WhisperX transcript to JSON and upload it to MinIO.

    The object is stored under ``{settings.aws_transcript_path}/{job_id}.json``
    in the service bucket.
    """
    logger.info("Storing transcript for job id %s", job_id)
    transcript_path = f"{settings.aws_transcript_path}/{job_id}.json"
    logger.debug("Transcript path: %s", transcript_path)
    data = transcript.model_dump_json().encode()
    self._minio_client.put_object(
        self._bucket_name,
        transcript_path,
        io.BytesIO(data),
        length=len(data),
        # put_object defaults to application/octet-stream; label the JSON
        # payload properly so consumers of the signed URL get the right MIME.
        content_type="application/json",
    )
    logger.info("Transcript stored successfully for job id %s", job_id)
def get_transcript_signed_url(self, job_id: str) -> str:
    """Return a presigned GET URL (valid 1 hour) for the stored transcript."""
    transcript_path = f"{settings.aws_transcript_path}/{job_id}.json"
    logger.debug("Transcript path: %s", transcript_path)
    expiry = timedelta(hours=1)
    return self._minio_client.presigned_get_object(
        self._bucket_name, transcript_path, expires=expiry
    )

View File

@@ -0,0 +1,60 @@
"""Models for the API & Celery tasks creation."""
from pydantic import BaseModel, Field, field_validator
from summary.core.config import get_settings
from summary.core.types import Url
settings = get_settings()
class SharedV2TaskCreation(BaseModel):
    """Model that holds basic information for task creation.

    Base payload shared by all v2 task-creation requests.
    """

    # Subject identifier of the user the job is created for.
    user_sub: str = Field(title="User Sub", description="The user's sub.")
class TranscribeTaskV2Request(SharedV2TaskCreation):
    """Model for creating a transcribe and summarize task (used for API request)."""

    # URL of the audio file; the `Url` annotated type validates it as an
    # HTTP URL and normalizes it back to a plain string.
    cloud_storage_url: Url = Field(
        title="Cloud storage URL",
        description="The URL of the audio file to transcribe.",
    )
    # NOTE(review): unlike `language`, this value is not checked against
    # `settings.whisperx_allowed_languages` — confirm that is intentional.
    context_language: str | None = Field(
        default=None,
        title="Context Language",
        description="The language of the context text.",
    )
    language: str = Field(
        title="Language", description="The language of the content to summarize."
    )

    @field_validator("language")
    @classmethod
    def validate_language(cls, v):
        """Reject languages outside `settings.whisperx_allowed_languages`."""
        # `language` is a required str, so the None guard is defensive only.
        if v is not None and v not in settings.whisperx_allowed_languages:
            raise ValueError(
                f"Language '{v}' is not allowed. "
                f"Allowed languages: {', '.join(settings.whisperx_allowed_languages)}"
            )
        return v
class TranscribeTaskV2Payload(TranscribeTaskV2Request):
    """Model for creating a transcribe and summarize task (used for actual task creation)."""  # noqa: E501

    # Added by the API layer from the authenticated API key — presumably
    # never taken from the client request body; verify against the routes.
    tenant_id: str = Field(title="Tenant ID", description="The ID of the tenant.")
class SummarizeTaskV2Request(SharedV2TaskCreation):
    """Model for creating a summarize task (used for API request)."""

    # Raw text to summarize (e.g. a meeting transcript).
    content: str = Field(title="Content", description="The content to summarize.")
class SummarizeTaskV2Payload(SummarizeTaskV2Request):
    """Model for creating a summarize task (used for actual task creation)."""

    # Added by the API layer from the authenticated API key — presumably
    # never taken from the client request body; verify against the routes.
    tenant_id: str = Field(title="Tenant ID", description="The ID of the tenant.")

View File

@@ -8,12 +8,23 @@ from .config import SettingsDeps
security = HTTPBearer()
def verify_token(
def verify_tenant_api_key(
settings: SettingsDeps,
credentials: HTTPAuthorizationCredentials = Security(security), # noqa: B008
):
"""Verify the bearer token from the Authorization header."""
token = credentials.credentials
if token != settings.app_api_token.get_secret_value():
raise HTTPException(status_code=401, detail="Invalid token")
return token
"""Verify the bearer api_key from the Authorization header."""
api_key = credentials.credentials
if api_key not in settings.authorized_tenant_api_keys:
raise HTTPException(status_code=403, detail="Unauthorized")
return api_key
def verify_tenant_api_key_v2(
    settings: SettingsDeps,
    credentials: HTTPAuthorizationCredentials = Security(security),  # noqa: B008
):
    """Resolve the bearer API key to its tenant, or reject with 403.

    Unlike the v1 dependency (which returns the raw key), this returns the
    full tenant record so routes can use tenant-specific configuration.
    """
    presented_key = credentials.credentials
    if presented_key in settings.authorized_tenant_api_keys:
        return settings.get_authorized_tenant(api_key=presented_key)
    raise HTTPException(status_code=403, detail="Unauthorized")

View File

@@ -0,0 +1,124 @@
"""Transcribe / summary Shared / Webhook models."""
from typing import Annotated, Literal, Union
from pydantic import BaseModel, Field, TypeAdapter
class WordSegment(BaseModel):
    """Word segment model for transcription tasks.

    One word-level entry of the transcription output.
    """

    word: str = Field(title="Word")
    # Timestamps are offsets from the start of the recording.
    start: float = Field(title="Start Time", description="Start time in seconds.")
    end: float = Field(title="End Time", description="End time in seconds.")
    # NOTE(review): no defaults declared, so `score` and `speaker` are
    # required-but-nullable — the payload must carry the keys, possibly
    # null. Confirm this matches the WhisperX output schema.
    score: float | None = Field(
        title="Confidence Score", description="Confidence score for the word segment."
    )
    speaker: str | None = Field(
        title="Speaker", description="Speaker identifier for the word segment."
    )
class Segment(BaseModel):
    """Segment model for transcription tasks.

    A contiguous span of transcribed speech, optionally broken down into
    word-level entries.
    """

    # Timestamps are offsets from the start of the recording.
    start: float = Field(title="Start Time", description="Start time in seconds.")
    end: float = Field(title="End Time", description="End time in seconds.")
    text: str = Field(
        title="Segment Text", description="Transcribed text for the segment."
    )
    # Required-but-nullable (no default): key must be present, value may be null.
    words: tuple[WordSegment, ...] | None = Field(
        title="Word Segments", description="List of word segments within the segment."
    )
    speaker: str | None = Field(
        title="Speaker", description="Speaker identifier for the segment."
    )
class WhisperXResponse(BaseModel):
    """Model for WhisperX response.

    Top-level transcription result: segment-level and word-level views of
    the same audio.
    """

    segments: tuple[Segment, ...] = Field(
        title="Segments", description="List of transcribed segments."
    )
    word_segments: tuple[WordSegment, ...] = Field(
        title="Word Segments", description="List of word segments."
    )
class BaseWebhook(BaseModel):
    """Base webhook payload.

    Common fields shared by every webhook notification sent back to a tenant.
    """

    job_id: str = Field(
        title="Job ID",
        description="The ID of the job document in the receiver system.",
    )
class TranscribeWebhookSuccessPayload(BaseWebhook):
    """Payload for a successful transcription webhook."""

    # `type` and `status` are Literal discriminators used by the tagged
    # unions below (discriminated first by `type`, then by `status`).
    type: Literal["transcript"] = Field(default="transcript")
    status: Literal["success"] = Field(default="success")
    transcription_data_url: str = Field(
        title="Transcript", description="URL to the raw transcription data."
    )
class TranscribeWebhookFailurePayload(BaseWebhook):
    """Payload for a failed transcription webhook."""

    # Literal discriminators for the tagged unions below.
    type: Literal["transcript"] = Field(default="transcript")
    status: Literal["failure"] = Field(default="failure")
    # Only "unknown_error" is emitted for now; widen the Literal to add codes.
    error_code: Literal["unknown_error"] = Field(
        title="Error code", description="The error code."
    )
# Tagged union over `status` for transcription webhook payloads.
TranscribeWebhookPayloads = Annotated[
    Union[TranscribeWebhookSuccessPayload, TranscribeWebhookFailurePayload],
    Field(discriminator="status"),
]
class SummarizeWebhookSuccessPayload(BaseWebhook):
    """Payload for a successful summarization webhook."""

    # Literal discriminators for the tagged unions below.
    type: Literal["summary"] = Field(default="summary")
    status: Literal["success"] = Field(default="success")
    summary: str = Field(title="Summary", description="The summary of the text.")
class SummarizeWebhookFailurePayload(BaseWebhook):
    """Payload for a failed summarization webhook."""

    # Literal discriminators for the tagged unions below.
    type: Literal["summary"] = Field(default="summary")
    status: Literal["failure"] = Field(default="failure")
    # Only "unknown_error" is emitted for now; widen the Literal to add codes.
    error_code: Literal["unknown_error"] = Field(
        title="Error code", description="The error code."
    )
# Tagged union over `status` for summarization webhook payloads.
SummarizeWebhookPayloads = Annotated[
    Union[SummarizeWebhookSuccessPayload, SummarizeWebhookFailurePayload],
    Field(discriminator="status"),
]

# Union of every webhook payload, discriminated first by `type`
# ("transcript" vs "summary"), then by `status` within each type.
WebhookPayloads = Annotated[
    Union[TranscribeWebhookPayloads, SummarizeWebhookPayloads],
    Field(discriminator="type"),
]

# Adapter for parsing/serializing arbitrary webhook payloads.
webhook_payload_adapter = TypeAdapter(WebhookPayloads)

__all__ = [
    "TranscribeWebhookSuccessPayload",
    "TranscribeWebhookFailurePayload",
    "SummarizeWebhookSuccessPayload",
    "SummarizeWebhookFailurePayload",
    "TranscribeWebhookPayloads",
    "SummarizeWebhookPayloads",
    "WebhookPayloads",
    "WhisperXResponse",
]

View File

@@ -0,0 +1,10 @@
"""Basic types for the summary service."""
from typing import Annotated
from pydantic import BeforeValidator, HttpUrl, TypeAdapter
# Reusable pydantic validator for HTTP URLs.
http_url_adapter = TypeAdapter(HttpUrl)

# String type validated as an HTTP URL, then normalized back to a plain
# `str` so downstream consumers (task payloads, JSON) keep a simple string.
Url = Annotated[
    str, BeforeValidator(lambda value: str(http_url_adapter.validate_python(value)))
]

View File

@@ -3,18 +3,22 @@
import json
import logging
import requests
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from summary.core.config import get_settings
from summary.core.shared_models import (
WebhookPayloads,
)
settings = get_settings()
logger = logging.getLogger(__name__)
def _create_retry_session():
def _create_retry_session(api_key: str | None = None):
"""Create an HTTP session configured with retry logic."""
session = Session()
retries = Retry(
@@ -24,15 +28,16 @@ def _create_retry_session():
allowed_methods={"POST"},
)
session.mount("https://", HTTPAdapter(max_retries=retries))
if api_key:
session.headers.update({"Authorization": f"Bearer {api_key}"})
return session
def _post_with_retries(url, data):
def _post_with_retries(*, url, data, api_key: str | None = None):
"""Send POST request with automatic retries."""
session = _create_retry_session()
session.headers.update(
{"Authorization": f"Bearer {settings.webhook_api_token.get_secret_value()}"}
)
session = _create_retry_session(api_key=api_key)
try:
response = session.post(url, json=data)
response.raise_for_status()
@@ -41,22 +46,78 @@ def _post_with_retries(url, data):
session.close()
def submit_content(content, title, email, sub):
"""Submit content to the configured webhook destination.
def call_webhook_v1(*, tenant_id: str, payload: dict) -> None:
"""Call webhook with a payload and an optional token."""
tenant = settings.get_authorized_tenant(tenant_id=tenant_id)
Builds the payload, sends it with retries, and logs the outcome.
"""
data = {
"title": title,
"content": content,
"email": email,
"sub": sub,
}
logger.debug("Submitting to %s", tenant.webhook_url)
logger.debug("Request payload: %s", json.dumps(payload, indent=2))
logger.debug("Submitting to %s", settings.webhook_url)
logger.debug("Request payload: %s", json.dumps(data, indent=2))
response = _post_with_retries(settings.webhook_url, data)
response = _post_with_retries(
url=tenant.webhook_url,
api_key=tenant.webhook_api_key.get_secret_value(),
data=payload,
)
try:
response_data = response.json()
document_id = response_data.get("id", "N/A")
except (json.JSONDecodeError, AttributeError):
document_id = "Unable to parse response"
response_data = response.text
logger.info(
"Delivery success | Document %s submitted (HTTP %s)",
document_id,
response.status_code,
)
logger.debug("Full response: %s", response_data)
def submit_content(content: str, title: str, email: str, sub: str) -> None:
"""Submit content to the configured webhook destination.
Builds the payload, sends it with retries, and logs the outcome.
Notes:
Deprecated: Use call_webhook_v2 directly instead.
Deprecated:
This will route content to the v1 default tenant
"""
data = {
"title": title,
"content": content,
"email": email,
"sub": sub,
}
call_webhook_v1(payload=data, tenant_id=settings.v1_tenant_id)
def call_webhook_v2(
*,
tenant_id: str,
payload: WebhookPayloads,
) -> None:
"""Call webhook with a payload to a specific tenant.
Request is performed without retry; retries should be handled at the task level.
"""
tenant = settings.get_authorized_tenant(tenant_id=tenant_id)
logger.debug("Submitting to %s", tenant.webhook_url)
logger.debug("Request payload: %s", payload.model_dump_json(indent=2))
response = requests.post(
tenant.webhook_url,
json=payload.model_dump(),
headers={
"Authorization": f"Bearer {tenant.webhook_api_key.get_secret_value()}",
},
timeout=(10, 20),
)
response.raise_for_status()
try:
response_data = response.json()

View File

@@ -4,7 +4,7 @@ import sentry_sdk
from fastapi import FastAPI
from summary.api import health
from summary.api.main import api_router
from summary.api.main import api_router_v1, api_router_v2
from summary.core.config import get_settings
settings = get_settings()
@@ -17,5 +17,6 @@ app = FastAPI(
title=settings.app_name,
)
app.include_router(api_router, prefix=settings.app_api_v1_str)
app.include_router(api_router_v1, prefix=settings.app_api_v1_str)
app.include_router(api_router_v2, prefix=settings.app_api_v2_str)
app.include_router(health.router)

View File

@@ -5,7 +5,7 @@ class TestHeartbeat:
"""Tests for the /__heartbeat__ endpoint."""
def test_returns_200(self, client):
"""The heartbeat endpoint responds with 200 OK without a token."""
"""The heartbeat endpoint responds with 200 OK without an api_key."""
response = client.get("/__heartbeat__")
assert response.status_code == 200
@@ -15,7 +15,7 @@ class TestLBHeartbeat:
"""Tests for the /__lbheartbeat__ endpoint."""
def test_returns_200(self, client):
"""The load-balancer heartbeat endpoint responds with 200 OK without a token."""
"""The LB heartbeat endpoint responds with 200 OK without an api_key."""
response = client.get("/__lbheartbeat__")
assert response.status_code == 200

View File

@@ -5,7 +5,7 @@ from unittest.mock import MagicMock, patch
class TestTasks:
"""Tests for the /tasks endpoint."""
"""Tests for the /v1/tasks endpoint."""
@patch(
"summary.api.route.tasks.process_audio_transcribe_summarize_v2.apply_async",

View File

@@ -0,0 +1,126 @@
"""Integration tests for the V2 task API endpoints."""
from unittest.mock import MagicMock, patch
class TestTasksV2:
    """Tests for the /v2/async-jobs endpoints."""

    @patch(
        "summary.api.route.tasks_v2.process_audio_transcribe_v2_task.apply_async",
        return_value=MagicMock(id="transcribe-task-id-abc"),
    )
    def test_create_transcribe_task_v2_returns_task_id(self, mock_apply_async, client):
        """POST /async-jobs/transcribe creates a task and injects tenant_id."""
        response = client.post(
            "/api/v2/async-jobs/transcribe",
            headers={"Authorization": "Bearer test-api-token"},
            json={
                "user_sub": "remote-001",
                "cloud_storage_url": "https://example.com/audio.mp3",
                "language": "en",
                "context_language": "fr",
            },
        )

        assert response.status_code == 200
        assert response.json() == {
            "job_id": "transcribe-task-id-abc",
            "message": "Transcribe job created",
        }
        # tenant_id must be derived from the API key, not the request body.
        args = mock_apply_async.call_args.kwargs["args"]
        assert args == [
            {
                "user_sub": "remote-001",
                "cloud_storage_url": "https://example.com/audio.mp3",
                "language": "en",
                "context_language": "fr",
                "tenant_id": "test-tenant",
            }
        ]

    @patch(
        "summary.api.route.tasks_v2.summarize_v2_task.apply_async",
        return_value=MagicMock(id="summarize-task-id-abc"),
    )
    def test_create_summarize_task_v2_returns_task_id(self, mock_apply_async, client):
        """POST /async-jobs/summarize creates a task and injects tenant_id."""
        response = client.post(
            "/api/v2/async-jobs/summarize",
            headers={"Authorization": "Bearer test-api-token"},
            json={
                "user_sub": "remote-002",
                "content": "This is a long meeting transcript to summarize.",
            },
        )

        assert response.status_code == 200
        assert response.json() == {
            "job_id": "summarize-task-id-abc",
            "message": "Summarize job created",
        }
        # tenant_id must be derived from the API key, not the request body.
        args = mock_apply_async.call_args.kwargs["args"]
        assert args == [
            {
                "user_sub": "remote-002",
                "content": "This is a long meeting transcript to summarize.",
                "tenant_id": "test-tenant",
            }
        ]

    @patch("summary.api.route.tasks_v2.AsyncResult")
    def test_get_transcribe_task_status_returns_status_for_same_tenant(
        self, mock_async_result, client
    ):
        """GET /async-jobs/transcribe/{id} returns status when tenant matches."""
        # Task args carry the creating tenant; the caller's tenant matches.
        mock_async_result.return_value = MagicMock(
            status="PENDING",
            args=[{"tenant_id": "test-tenant"}],
        )

        response = client.get(
            "/api/v2/async-jobs/transcribe/task-id-abc",
            headers={"Authorization": "Bearer test-api-token"},
        )

        assert response.status_code == 200
        assert response.json() == {"job_id": "task-id-abc", "status": "PENDING"}
        mock_async_result.assert_called_once_with("task-id-abc")

    @patch("summary.api.route.tasks_v2.AsyncResult")
    def test_get_summarize_task_status_returns_status_for_same_tenant(
        self, mock_async_result, client
    ):
        """GET /async-jobs/summarize/{id} returns status when tenant matches."""
        mock_async_result.return_value = MagicMock(
            status="SUCCESS",
            args=[{"tenant_id": "test-tenant"}],
        )

        response = client.get(
            "/api/v2/async-jobs/summarize/task-id-abc",
            headers={"Authorization": "Bearer test-api-token"},
        )

        assert response.status_code == 200
        assert response.json() == {"job_id": "task-id-abc", "status": "SUCCESS"}
        mock_async_result.assert_called_once_with("task-id-abc")

    @patch("summary.api.route.tasks_v2.AsyncResult")
    def test_get_task_status_returns_404_when_args_are_missing(
        self, mock_async_result, client
    ):
        """GET /async-jobs/.../{id} returns 404 when task args are invalid."""
        # Without args the tenant can't be checked, so the API hides the task.
        mock_async_result.return_value = MagicMock(status="SUCCESS", args=None)

        response = client.get(
            "/api/v2/async-jobs/transcribe/task-id-abc",
            headers={"Authorization": "Bearer test-api-token"},
        )

        assert response.status_code == 404
        assert response.json() == {"detail": "Not found"}

View File

@@ -4,14 +4,22 @@ import pytest
from fastapi.testclient import TestClient
from pydantic import SecretStr
from summary.core.config import Settings, get_settings
from summary.core.config import AuthorizedTenant, Settings, get_settings
from summary.main import app
def get_settings_override():
"""Return settings for tests."""
return Settings(
app_api_token=SecretStr("test-api-token"),
v1_tenant_id="test-tenant",
authorized_tenants=(
AuthorizedTenant(
webhook_url="https://example.com/webhook",
id="test-tenant",
api_key=SecretStr("test-api-token"),
webhook_api_key=SecretStr("test-webhook-api-key"),
),
),
)