Files
docs/src/backend/core/admin.py
charles 9d7f876705 (backend) add command admin
We want to run the indexing from the admin.
In `admin/core/runindexing/` is a form to do so.

Signed-off-by: charles <charles.englebert@protonmail.com>

(backend) add async_mode flag

The command must be killable.
This adds an async_mode flag to preserve the async
feature and allow running synchronously.

Signed-off-by: charles <charles.englebert@protonmail.com>
2026-04-24 12:02:34 +02:00

305 lines
7.9 KiB
Python

"""Admin classes and registrations for core app."""
from django.contrib import admin, messages
from django.contrib.admin.views.decorators import staff_member_required
from django.contrib.auth import admin as auth_admin
from django.core.management import call_command
from django.core.management.base import CommandError
from django.http import HttpRequest
from django.shortcuts import redirect, render
from django.utils.translation import gettext_lazy as _

from treebeard.admin import TreeAdmin

from core import models
from core.forms import RunIndexingForm
from core.tasks.user_reconciliation import user_reconciliation_csv_import_job
# Customize the default admin site's get_app_list method.
# Keep a reference to the stock implementation so the override can delegate to it.
_original_get_app_list = admin.site.get_app_list


def custom_get_app_list(_self, request, app_label=None):
    """
    Return the admin app list, extended with a synthetic "Commands" app.

    The list is first built by the original ``get_app_list`` of the default
    admin site. A "Commands" entry holding the "Run indexing" link is then
    appended, but only when the caller asked for the full list
    (``app_label`` is ``None``) or explicitly for the ``"commands"`` label.

    Args:
        _self: unused; kept so the function has a method-like signature.
        request: the current request, forwarded to the original implementation.
        app_label: optional label restricting the list to a single app.

    Returns:
        The list of app dictionaries produced by the original implementation,
        with the "Commands" app appended when it matches the requested scope.
    """
    app_list = _original_get_app_list(request, app_label)

    # Honour the ``app_label`` filter: when the list is requested for another
    # app (e.g. its index page), the "Commands" app must not leak into it.
    if app_label not in (None, "commands"):
        return app_list

    # Add Commands app with Run Indexing command
    commands_app = {
        "name": _("Commands"),
        "app_label": "commands",
        "app_url": "#",
        "has_module_perms": True,
        "models": [
            {
                "name": _("Run indexing"),
                "object_name": "RunIndexing",
                "admin_url": "/admin/run-indexing/",
                "view_only": False,
                "add_url": None,
                "change_url": None,
            }
        ],
    }
    app_list.append(commands_app)
    return app_list
# Install the override on the default admin site. The attribute is set on the
# instance as a plain function, so the site is handed over explicitly.
admin.site.get_app_list = (
    lambda request, app_label=None: custom_get_app_list(
        admin.site, request, app_label=app_label
    )
)
@admin.register(models.User)
class UserAdmin(auth_admin.UserAdmin):
    """User administration, built on top of Django's own ``UserAdmin``."""

    # --- Change list -------------------------------------------------------
    list_display = (
        "id",
        "sub",
        "full_name",
        "admin_email",
        "email",
        "is_active",
        "is_staff",
        "is_superuser",
        "is_device",
        "created_at",
        "updated_at",
    )
    list_filter = ("is_staff", "is_superuser", "is_device", "is_active")
    search_fields = ("id", "sub", "admin_email", "email", "full_name")
    # Default sort of the change list; a leading "-" means descending.
    ordering = (
        "is_active",
        "-is_superuser",
        "-is_staff",
        "-is_device",
        "-updated_at",
        "full_name",
    )

    # --- Change form -------------------------------------------------------
    # Displayed on the form but not editable from the admin.
    readonly_fields = (
        "id",
        "sub",
        "email",
        "full_name",
        "short_name",
        "created_at",
        "updated_at",
    )
    fieldsets = (
        (None, {"fields": ("id", "admin_email", "password")}),
        (
            _("Personal info"),
            {
                "fields": (
                    "sub",
                    "email",
                    "full_name",
                    "short_name",
                    "language",
                    "timezone",
                )
            },
        ),
        (
            _("Permissions"),
            {
                "fields": (
                    "is_active",
                    "is_device",
                    "is_staff",
                    "is_superuser",
                    "groups",
                    "user_permissions",
                ),
            },
        ),
        (_("Important dates"), {"fields": ("created_at", "updated_at")}),
    )

    # --- Add form ----------------------------------------------------------
    add_fieldsets = (
        (
            None,
            {
                "classes": ("wide",),
                "fields": ("email", "password1", "password2"),
            },
        ),
    )
@admin.register(models.UserReconciliationCsvImport)
class UserReconciliationCsvImportAdmin(admin.ModelAdmin):
    """Admin class for UserReconciliationCsvImport model."""

    # Columns shown in the change list.
    list_display = ("id", "__str__", "created_at", "status")

    def save_model(self, request, obj, form, change):
        """
        Override save_model to trigger the import task on creation.

        The object is saved through the parent implementation first. When it
        is a creation (``change`` is falsy), the import job is queued with the
        object's primary key and a success message is added to the request.
        """
        super().save_model(request, obj, form, change)
        if not change:
            # Hand the processing over to the task via ``.delay``.
            user_reconciliation_csv_import_job.delay(obj.pk)
            messages.success(request, _("Import job created and queued."))
        # NOTE(review): Django's ModelAdmin appears to discard the value
        # returned by save_model, so this redirect presumably has no effect on
        # the admin response - confirm before relying on it.
        return redirect("..")
@admin.action(description=_("Process selected user reconciliations"))
def process_reconciliation(_modeladmin, _request, queryset):
    """
    Run the reconciliation for the eligible entries of the selection.

    An entry is eligible when its status is "ready" and both its active and
    inactive emails are flagged as checked; any other selected entry is left
    untouched.
    """
    eligibility = {
        "status": "ready",
        "active_email_checked": True,
        "inactive_email_checked": True,
    }
    for reconciliation in queryset.filter(**eligibility):
        reconciliation.process_reconciliation_request()
@admin.register(models.UserReconciliation)
class UserReconciliationAdmin(admin.ModelAdmin):
    """Admin for user reconciliations, exposing a bulk processing action."""

    # Bulk action made available on the change list.
    actions = [process_reconciliation]
    # Columns shown in the change list.
    list_display = ["id", "__str__", "created_at", "status"]
class DocumentAccessInline(admin.TabularInline):
    """Tabular inline for editing ``DocumentAccess`` rows."""

    model = models.DocumentAccess
    # Do not render any empty extra row.
    extra = 0
    # Select the user through an autocomplete widget.
    autocomplete_fields = ["user"]
@admin.register(models.Document)
class DocumentAdmin(TreeAdmin):
    """Admin declaration for documents, based on treebeard's ``TreeAdmin``."""

    # --- Change list -------------------------------------------------------
    list_display = (
        "id",
        "title",
        "link_reach",
        "link_role",
        "created_at",
        "updated_at",
    )
    search_fields = ("id", "title")

    # --- Change form -------------------------------------------------------
    # Document accesses are edited inline on the document page.
    inlines = (DocumentAccessInline,)
    # Displayed on the form but not editable from the admin.
    readonly_fields = (
        "attachments",
        "creator",
        "depth",
        "duplicated_from",
        "id",
        "numchild",
        "path",
    )
    fieldsets = (
        (None, {"fields": ("id", "title")}),
        (
            _("Permissions"),
            {"fields": ("creator", "link_reach", "link_role")},
        ),
        (
            _("Tree structure"),
            {
                "fields": (
                    "path",
                    "depth",
                    "numchild",
                    "duplicated_from",
                    "attachments",
                )
            },
        ),
    )
@admin.register(models.Invitation)
class InvitationAdmin(admin.ModelAdmin):
    """Invitation management in the admin site."""

    # Columns shown in the change list.
    list_display = ("email", "document", "created_at", "is_expired")

    # Fields laid out on the form, and the subset that cannot be edited.
    fields = ("email", "document", "role", "created_at", "issuer")
    readonly_fields = ("created_at", "is_expired", "issuer")

    def save_model(self, request, obj, form, change):
        """
        Record the requesting user as issuer, then save the invitation.

        ``change`` is not consulted: the issuer is set on creation and on
        edition alike.
        """
        obj.issuer = request.user
        obj.save()
@staff_member_required
def run_indexing_view(request: HttpRequest):
    """
    Custom admin view for running indexing commands.

    On GET, an empty ``RunIndexingForm`` is rendered. On POST, the form is
    validated and the ``index`` management command is called with the
    submitted batch size and optional time bounds (sent as ISO 8601 strings),
    with ``async_mode=True``.

    On success, a confirmation message is queued and the user is redirected
    to the ``run_indexing`` URL. If the form is invalid, or the command
    raises ``CommandError``, an error message is queued and the form is
    rendered again with the submitted data.

    Args:
        request: the current request; access is restricted to staff members
            by the ``staff_member_required`` decorator.

    Returns:
        A redirect response after a successful run, otherwise the rendered
        ``runindexing.html`` page.
    """
    if request.method == "POST":
        form = RunIndexingForm(request.POST)
        if form.is_valid():
            lower_time_bound = form.cleaned_data.get("lower_time_bound")
            upper_time_bound = form.cleaned_data.get("upper_time_bound")
            try:
                call_command(
                    "index",
                    batch_size=form.cleaned_data["batch_size"],
                    lower_time_bound=lower_time_bound.isoformat()
                    if lower_time_bound
                    else None,
                    upper_time_bound=upper_time_bound.isoformat()
                    if upper_time_bound
                    else None,
                    async_mode=True,
                )
            except CommandError as err:
                # Report command failures on the page instead of letting them
                # surface as a server error; the form keeps the user's input.
                messages.error(request, str(err))
            else:
                messages.success(request, _("Indexing triggered!"))
                return redirect("run_indexing")
        else:
            messages.error(request, _("Please correct the errors below."))
    else:
        form = RunIndexingForm()

    return render(
        request=request,
        template_name="runindexing.html",
        context={
            # Standard admin context so the page renders inside the admin
            # layout.
            **admin.site.each_context(request),
            # Marked for translation like the other user-facing strings here.
            "title": _("Run Indexing Command"),
            "form": form,
        },
    )