mirror of
https://github.com/owncloud/ocis
synced 2026-04-25 17:25:21 +02:00
* feat(search): introduce search query package With the increasing complexity of how we organize our resources, the search must also be able to find them using entity properties. The query package provides the necessary functionality to do this. This makes it possible to search for resources via KQL, the microsoft spec is largely covered and can be used for this. In the current state, the legacy query language is still used, in a future update this will be deprecated and KQL will become the standard
196 lines
5.8 KiB
Go
196 lines
5.8 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
|
|
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
|
revactx "github.com/cs3org/reva/v2/pkg/ctx"
|
|
"github.com/cs3org/reva/v2/pkg/errtypes"
|
|
"github.com/cs3org/reva/v2/pkg/events/stream"
|
|
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
|
"github.com/cs3org/reva/v2/pkg/token"
|
|
"github.com/cs3org/reva/v2/pkg/token/manager/jwt"
|
|
"github.com/jellydator/ttlcache/v2"
|
|
merrors "go-micro.dev/v4/errors"
|
|
"go-micro.dev/v4/metadata"
|
|
grpcmetadata "google.golang.org/grpc/metadata"
|
|
|
|
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
|
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
|
|
v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0"
|
|
searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0"
|
|
"github.com/owncloud/ocis/v2/services/search/pkg/content"
|
|
"github.com/owncloud/ocis/v2/services/search/pkg/engine"
|
|
"github.com/owncloud/ocis/v2/services/search/pkg/query/bleve"
|
|
"github.com/owncloud/ocis/v2/services/search/pkg/search"
|
|
)
|
|
|
|
// NewHandler returns a service implementation for Service.
|
|
func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) {
|
|
teardown := func() {}
|
|
options := newOptions(opts...)
|
|
logger := options.Logger
|
|
cfg := options.Config
|
|
|
|
// initialize search engine
|
|
var eng engine.Engine
|
|
switch cfg.Engine.Type {
|
|
case "bleve":
|
|
idx, err := engine.NewBleveIndex(cfg.Engine.Bleve.Datapath)
|
|
if err != nil {
|
|
return nil, teardown, err
|
|
}
|
|
|
|
teardown = func() {
|
|
_ = idx.Close()
|
|
}
|
|
|
|
eng = engine.NewBleveEngine(idx, bleve.LegacyCreator)
|
|
default:
|
|
return nil, teardown, fmt.Errorf("unknown search engine: %s", cfg.Engine.Type)
|
|
}
|
|
|
|
// initialize gateway
|
|
selector, err := pool.GatewaySelector(cfg.Reva.Address, pool.WithRegistry(registry.GetRegistry()), pool.WithTracerProvider(options.TracerProvider))
|
|
if err != nil {
|
|
logger.Fatal().Err(err).Msg("could not get reva gateway selector")
|
|
return nil, teardown, err
|
|
}
|
|
// initialize search content extractor
|
|
var extractor content.Extractor
|
|
switch cfg.Extractor.Type {
|
|
case "basic":
|
|
if extractor, err = content.NewBasicExtractor(logger); err != nil {
|
|
return nil, teardown, err
|
|
}
|
|
case "tika":
|
|
if extractor, err = content.NewTikaExtractor(selector, logger, cfg); err != nil {
|
|
return nil, teardown, err
|
|
}
|
|
default:
|
|
return nil, teardown, fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type)
|
|
}
|
|
|
|
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig{
|
|
Endpoint: cfg.Events.Endpoint,
|
|
Cluster: cfg.Events.Cluster,
|
|
EnableTLS: cfg.Events.EnableTLS,
|
|
TLSInsecure: cfg.Events.TLSInsecure,
|
|
TLSRootCACertificate: cfg.Events.TLSRootCACertificate,
|
|
})
|
|
if err != nil {
|
|
return nil, teardown, err
|
|
}
|
|
|
|
ss := search.NewService(selector, eng, extractor, logger, cfg)
|
|
|
|
// setup event handling
|
|
if err := search.HandleEvents(ss, bus, logger, cfg); err != nil {
|
|
return nil, teardown, err
|
|
}
|
|
|
|
cache := ttlcache.NewCache()
|
|
if err := cache.SetTTL(time.Second); err != nil {
|
|
return nil, teardown, err
|
|
}
|
|
|
|
tokenManager, err := jwt.New(map[string]interface{}{
|
|
"secret": options.JWTSecret,
|
|
"expires": int64(24 * 60 * 60),
|
|
})
|
|
if err != nil {
|
|
return nil, teardown, err
|
|
}
|
|
|
|
return &Service{
|
|
id: cfg.GRPC.Namespace + "." + cfg.Service.Name,
|
|
log: logger,
|
|
searcher: ss,
|
|
cache: cache,
|
|
tokenManager: tokenManager,
|
|
}, teardown, nil
|
|
}
|
|
|
|
// Service implements the searchServiceHandler interface
|
|
type Service struct {
|
|
id string
|
|
log log.Logger
|
|
searcher search.Searcher
|
|
cache *ttlcache.Cache
|
|
tokenManager token.Manager
|
|
}
|
|
|
|
// Search handles the search
|
|
func (s Service) Search(ctx context.Context, in *searchsvc.SearchRequest, out *searchsvc.SearchResponse) error {
|
|
// Get token from the context (go-micro) and make it known to the reva client too (grpc)
|
|
t, ok := metadata.Get(ctx, revactx.TokenHeader)
|
|
if !ok {
|
|
s.log.Error().Msg("Could not get token from context")
|
|
return errors.New("could not get token from context")
|
|
}
|
|
ctx = grpcmetadata.AppendToOutgoingContext(ctx, revactx.TokenHeader, t)
|
|
|
|
// unpack user
|
|
u, _, err := s.tokenManager.DismantleToken(ctx, t)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ctx = revactx.ContextSetUser(ctx, u)
|
|
|
|
key := cacheKey(in.Query, in.PageSize, in.Ref, u)
|
|
res, ok := s.FromCache(key)
|
|
if !ok {
|
|
var err error
|
|
res, err = s.searcher.Search(ctx, &searchsvc.SearchRequest{
|
|
Query: in.Query,
|
|
PageSize: in.PageSize,
|
|
Ref: in.Ref,
|
|
})
|
|
if err != nil {
|
|
switch err.(type) {
|
|
case errtypes.BadRequest:
|
|
return merrors.BadRequest(s.id, err.Error())
|
|
default:
|
|
return merrors.InternalServerError(s.id, err.Error())
|
|
}
|
|
}
|
|
|
|
s.Cache(key, res)
|
|
}
|
|
|
|
out.Matches = res.Matches
|
|
out.TotalMatches = res.TotalMatches
|
|
out.NextPageToken = res.NextPageToken
|
|
return nil
|
|
}
|
|
|
|
// IndexSpace (re)indexes all resources of a given space.
|
|
func (s Service) IndexSpace(ctx context.Context, in *searchsvc.IndexSpaceRequest, _ *searchsvc.IndexSpaceResponse) error {
|
|
return s.searcher.IndexSpace(&provider.StorageSpaceId{OpaqueId: in.SpaceId}, &user.UserId{OpaqueId: in.UserId})
|
|
}
|
|
|
|
// FromCache pulls a search result from cache
|
|
func (s Service) FromCache(key string) (*searchsvc.SearchResponse, bool) {
|
|
v, err := s.cache.Get(key)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
|
|
sr, ok := v.(*searchsvc.SearchResponse)
|
|
return sr, ok
|
|
}
|
|
|
|
// Cache caches the search result
|
|
func (s Service) Cache(key string, res *searchsvc.SearchResponse) {
|
|
// lets ignore the error
|
|
_ = s.cache.Set(key, res)
|
|
}
|
|
|
|
func cacheKey(query string, pagesize int32, ref *v0.Reference, user *user.User) string {
|
|
return fmt.Sprintf("%s|%d|%s$%s!%s/%s|%s", query, pagesize, ref.GetResourceId().GetStorageId(), ref.GetResourceId().GetSpaceId(), ref.GetResourceId().GetOpaqueId(), ref.GetPath(), user.GetId().GetOpaqueId())
|
|
}
|