mirror of
https://github.com/owncloud/ocis
synced 2026-04-25 17:25:21 +02:00
enhancement(search): retry extraction with abort on persistent failure (#12111)
During IndexSpace bulk reindexing, file extraction is now retried up to 5 times with a 1-second delay between attempts. If 5 consecutive files fail (indicating the extraction service is down), the walk is aborted. This replaces the previous 30-minute exponential backoff approach per review feedback from jvillafanez: keep retries quick and short, return an error to the admin rather than sleeping for extended periods. No interface changes — retry logic is internal to the service. Signed-off-by: Paul Faure <paul@faure.ca> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
16
changelog/unreleased/search-indexer-backoff.md
Normal file
16
changelog/unreleased/search-indexer-backoff.md
Normal file
@@ -0,0 +1,16 @@
|
||||
Enhancement: Retry and abort on repeated extraction failures during indexing
|
||||
|
||||
During `ocis search index` bulk reindexing, if the content extractor (e.g.
|
||||
Tika) becomes unavailable, individual file extractions are now retried up to
|
||||
5 times with a 1-second delay between attempts. If a file still fails after
|
||||
all retries, the failure is logged and the walk continues.
|
||||
|
||||
If 5 consecutive files fail extraction (indicating the extraction service is
|
||||
down rather than a single file being problematic), the index walk is aborted
|
||||
with an error so the admin can investigate.
|
||||
|
||||
Previously, extraction failures were silently logged and the walk continued
|
||||
at full speed, accumulating thousands of wasted "connection refused" errors
|
||||
when Tika was down.
|
||||
|
||||
https://github.com/owncloud/ocis/pull/12111
|
||||
@@ -421,6 +421,13 @@ func (s *Service) searchIndex(ctx context.Context, req *searchsvc.SearchRequest,
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// Retry constants for IndexSpace extraction failure handling.
|
||||
const (
|
||||
_extractionRetries = 5 // max retry attempts per file
|
||||
_extractionRetryDelay = 1 * time.Second // delay between retries
|
||||
_consecutiveAbort = 5 // abort walk after this many consecutive file failures
|
||||
)
|
||||
|
||||
// IndexSpace (re)indexes all resources of a given space.
|
||||
func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error {
|
||||
ownerCtx, err := getAuthContext(s.serviceAccountID, s.gatewaySelector, s.serviceAccountSecret, s.logger)
|
||||
@@ -439,6 +446,8 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error {
|
||||
}
|
||||
rootID.OpaqueId = rootID.SpaceId
|
||||
|
||||
failures := 0
|
||||
|
||||
w := walker.NewWalker(s.gatewaySelector)
|
||||
err = w.Walk(ownerCtx, &rootID, func(wd string, info *provider.ResourceInfo, err error) error {
|
||||
if err != nil {
|
||||
@@ -480,8 +489,20 @@ func (s *Service) IndexSpace(spaceID *provider.StorageSpaceId) error {
|
||||
}
|
||||
}
|
||||
|
||||
s.UpsertItem(ref)
|
||||
if err := s.upsertItem(ref, _extractionRetries); err != nil {
|
||||
failures++
|
||||
s.logger.Warn().Err(err).
|
||||
Int("failures", failures).
|
||||
Str("path", ref.Path).
|
||||
Msg("extraction failed after retries")
|
||||
|
||||
if failures >= _consecutiveAbort {
|
||||
return fmt.Errorf("aborting index walk: %d consecutive extraction failures, last error: %w", failures, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
failures = 0
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -504,29 +525,50 @@ func (s *Service) TrashItem(rID *provider.ResourceId) {
|
||||
|
||||
// UpsertItem indexes or stores Resource data fields.
|
||||
func (s *Service) UpsertItem(ref *provider.Reference) {
|
||||
if err := s.upsertItem(ref, 0); err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to upsert resource")
|
||||
}
|
||||
}
|
||||
|
||||
// upsertItem is the core extraction-and-index method. When retries > 0,
|
||||
// a failed extraction is retried up to that many times with a fixed delay.
|
||||
func (s *Service) upsertItem(ref *provider.Reference, retries int) error {
|
||||
ctx, stat, path := s.resInfo(ref)
|
||||
if ctx == nil || stat == nil || path == "" {
|
||||
return
|
||||
return fmt.Errorf("could not resolve resource info for %s", ref.GetPath())
|
||||
}
|
||||
|
||||
if slices.Contains(_skipPathNames, path) || slices.Contains(_skipPathDirs, path) {
|
||||
s.logger.Info().Str("path", path).Msg("file won't be indexed")
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, skipPath := range _skipPathDirs {
|
||||
if strings.HasPrefix(path, skipPath+"/") {
|
||||
s.logger.Info().Str("path", path).Msg("file is in a directory that won't be indexed")
|
||||
return
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
resourceID := storagespace.FormatResourceID(stat.Info.Id)
|
||||
|
||||
doc, err := s.extractor.Extract(ctx, stat.Info)
|
||||
var doc content.Document
|
||||
var err error
|
||||
for attempt := range retries + 1 {
|
||||
doc, err = s.extractor.Extract(ctx, stat.Info)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if attempt < retries {
|
||||
s.logger.Debug().Err(err).
|
||||
Int("attempt", attempt+1).
|
||||
Str("path", ref.Path).
|
||||
Msg("extraction attempt failed, retrying")
|
||||
time.Sleep(_extractionRetryDelay)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
s.logger.Error().Err(err).Msg("failed to extract resource content")
|
||||
return
|
||||
return fmt.Errorf("extraction failed for %s: %w", ref.Path, err)
|
||||
}
|
||||
|
||||
r := engine.Resource{
|
||||
@@ -548,12 +590,12 @@ func (s *Service) UpsertItem(ref *provider.Reference) {
|
||||
}
|
||||
|
||||
if err = s.engine.Upsert(r.ID, r); err != nil {
|
||||
s.logger.Error().Err(err).Msg("error adding updating the resource in the index")
|
||||
} else {
|
||||
logDocCount(s.engine, s.logger)
|
||||
return fmt.Errorf("index upsert for %s: %w", ref.Path, err)
|
||||
}
|
||||
|
||||
logDocCount(s.engine, s.logger)
|
||||
s.storeExtractedMetadata(ctx, ref, doc)
|
||||
return nil
|
||||
}
|
||||
|
||||
// storeExtractedMetadata persists Tika-extracted audio, image, location, and
|
||||
|
||||
Reference in New Issue
Block a user