Compare commits
6 Commits
cli/fix/ig
...
module-app
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db484b3f30 | ||
|
|
9f155bc79a | ||
|
|
05a1661d50 | ||
|
|
d5c4fc4c0e | ||
|
|
d71b1838f8 | ||
|
|
1de2b6ea2f |
@@ -170,8 +170,16 @@ spec:
|
||||
priorityClassName: "system-cluster-critical"
|
||||
containers:
|
||||
- name: app-service
|
||||
image: beclab/app-service:0.4.74
|
||||
image: beclab/app-service:0.4.75
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- containerPort: 6755
|
||||
readinessProbe:
|
||||
failureThreshold: 5
|
||||
initialDelaySeconds: 10
|
||||
periodSeconds: 5
|
||||
tcpSocket:
|
||||
port: 6755
|
||||
securityContext:
|
||||
runAsUser: 0
|
||||
env:
|
||||
|
||||
@@ -18,7 +18,6 @@ import (
|
||||
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/generated/clientset/versioned"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/images"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
|
||||
kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
|
||||
kbopv1alphav1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
|
||||
@@ -139,16 +138,16 @@ func main() {
|
||||
setupLog.Error(err, "Unable to create controller", "controller", "Security")
|
||||
os.Exit(1)
|
||||
}
|
||||
natsConn, err := utils.NewNatsConn()
|
||||
if err != nil {
|
||||
setupLog.Error(err, "Failed to connect to NATS")
|
||||
os.Exit(1)
|
||||
}
|
||||
defer natsConn.Drain()
|
||||
appEventQueue := appevent.NewAppEventQueue(ictx, natsConn)
|
||||
appEventQueue := appevent.NewAppEventQueue(ictx, nil)
|
||||
appevent.SetAppEventQueue(appEventQueue)
|
||||
go appEventQueue.Run()
|
||||
|
||||
defer func() {
|
||||
if nc := appEventQueue.GetNatsConn(); nc != nil {
|
||||
nc.Drain()
|
||||
}
|
||||
}()
|
||||
|
||||
if err = (&controllers.ApplicationManagerController{
|
||||
Client: mgr.GetClient(),
|
||||
KubeConfig: config,
|
||||
@@ -205,7 +204,7 @@ func main() {
|
||||
if err = (&controllers.NodeAlertController{
|
||||
Client: mgr.GetClient(),
|
||||
KubeConfig: config,
|
||||
NatsConn: natsConn,
|
||||
NatsConn: nil,
|
||||
}).SetupWithManager(mgr); err != nil {
|
||||
setupLog.Error(err, "Unable to create controller", "controller", "NodeAlert")
|
||||
os.Exit(1)
|
||||
|
||||
@@ -56,6 +56,7 @@ type NodeAlertController struct {
|
||||
lastPressureState map[string]bool
|
||||
mutex sync.RWMutex
|
||||
NatsConn *nats.Conn
|
||||
natsConnMux sync.Mutex
|
||||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
@@ -247,5 +248,31 @@ func (r *NodeAlertController) sendNodeAlert(nodeName string, pressureType NodePr
|
||||
|
||||
// publishToNats publishes a message to the specified NATS subject
|
||||
func (r *NodeAlertController) publishToNats(subject string, data interface{}) error {
|
||||
if err := r.ensureNatsConnected(); err != nil {
|
||||
return fmt.Errorf("failed to ensure NATS connection: %w", err)
|
||||
}
|
||||
return utils.PublishEvent(r.NatsConn, subject, data)
|
||||
}
|
||||
|
||||
func (r *NodeAlertController) ensureNatsConnected() error {
|
||||
r.natsConnMux.Lock()
|
||||
defer r.natsConnMux.Unlock()
|
||||
|
||||
if r.NatsConn != nil && r.NatsConn.IsConnected() {
|
||||
return nil
|
||||
}
|
||||
if r.NatsConn != nil {
|
||||
r.NatsConn.Close()
|
||||
}
|
||||
|
||||
klog.Info("NATS connection not established in NodeAlertController, attempting to connect...")
|
||||
nc, err := utils.NewNatsConn()
|
||||
if err != nil {
|
||||
klog.Errorf("NodeAlertController failed to connect to NATS: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
r.NatsConn = nc
|
||||
klog.Info("NodeAlertController successfully connected to NATS")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -984,6 +984,9 @@ func (h *Handler) oamValues(req *restful.Request, resp *restful.Response) {
|
||||
values["mongodb"] = map[string]interface{}{
|
||||
"databases": map[string]interface{}{},
|
||||
}
|
||||
values["clickhouse"] = map[string]interface{}{
|
||||
"databases": map[string]interface{}{},
|
||||
}
|
||||
values["svcs"] = map[string]interface{}{}
|
||||
values["nats"] = map[string]interface{}{
|
||||
"subjects": map[string]interface{}{},
|
||||
|
||||
@@ -51,6 +51,7 @@ var (
|
||||
tapr.TypeElasticsearch.String(),
|
||||
tapr.TypeMariaDB.String(),
|
||||
tapr.TypeMySQL.String(),
|
||||
tapr.TypeClickHouse.String(),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package event
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
@@ -17,9 +18,10 @@ import (
|
||||
var AppEventQueue *QueuedEventController
|
||||
|
||||
type QueuedEventController struct {
|
||||
wq workqueue.RateLimitingInterface
|
||||
ctx context.Context
|
||||
nc *nats.Conn
|
||||
wq workqueue.RateLimitingInterface
|
||||
ctx context.Context
|
||||
nc *nats.Conn
|
||||
ncMux sync.Mutex
|
||||
}
|
||||
|
||||
type QueueEvent struct {
|
||||
@@ -90,9 +92,41 @@ func (qe *QueuedEventController) enqueue(obj interface{}) {
|
||||
}
|
||||
|
||||
func (qe *QueuedEventController) publish(subject string, data interface{}) error {
|
||||
if err := qe.ensureNatsConnected(); err != nil {
|
||||
return fmt.Errorf("failed to ensure NATS connection: %w", err)
|
||||
}
|
||||
return utils.PublishEvent(qe.nc, subject, data)
|
||||
}
|
||||
|
||||
func (qe *QueuedEventController) ensureNatsConnected() error {
|
||||
qe.ncMux.Lock()
|
||||
defer qe.ncMux.Unlock()
|
||||
|
||||
if qe.nc != nil && qe.nc.IsConnected() {
|
||||
return nil
|
||||
}
|
||||
if qe.nc != nil {
|
||||
qe.nc.Close()
|
||||
}
|
||||
|
||||
klog.Info("NATS connection not established, attempting to connect...")
|
||||
nc, err := utils.NewNatsConn()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to connect to NATS: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
qe.nc = nc
|
||||
klog.Info("Successfully connected to NATS")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (qe *QueuedEventController) GetNatsConn() *nats.Conn {
|
||||
qe.ncMux.Lock()
|
||||
defer qe.ncMux.Unlock()
|
||||
return qe.nc
|
||||
}
|
||||
|
||||
func NewAppEventQueue(ctx context.Context, nc *nats.Conn) *QueuedEventController {
|
||||
return &QueuedEventController{
|
||||
ctx: ctx,
|
||||
|
||||
@@ -42,6 +42,9 @@ const (
|
||||
|
||||
// TypeMySQL indicates the middleware is mysql
|
||||
TypeMySQL MiddlewareType = "mysql"
|
||||
|
||||
// TypeClickHouse indicates the middleware is ClickHouse
|
||||
TypeClickHouse MiddlewareType = "clickhouse"
|
||||
)
|
||||
|
||||
func (mr MiddlewareType) String() string {
|
||||
@@ -323,6 +326,27 @@ func Apply(middleware *Middleware, kubeConfig *rest.Config, appName, appNamespac
|
||||
}
|
||||
klog.Infof("values.mysql: %v", vals["mysql"])
|
||||
}
|
||||
if middleware.ClickHouse != nil {
|
||||
username := fmt.Sprintf("%s-%s-%s", middleware.ClickHouse.Username, ownerName, appName)
|
||||
err := process(kubeConfig, appName, appNamespace, namespace, username, TypeClickHouse, ownerName, middleware)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := getMiddlewareRequest(TypeClickHouse)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get clickHouse middleware request info %v", err)
|
||||
return err
|
||||
}
|
||||
vals["clickhouse"] = map[string]interface{}{
|
||||
"host": resp.Host,
|
||||
"port": resp.Port,
|
||||
"username": resp.UserName,
|
||||
"password": resp.Password,
|
||||
"databases": resp.Databases,
|
||||
}
|
||||
klog.Infof("values.clickhouse: %v", vals["clickhouse"])
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -383,6 +407,8 @@ func getPassword(middleware *Middleware, middlewareType MiddlewareType) (string,
|
||||
return middleware.MariaDB.Password, nil
|
||||
case TypeMySQL:
|
||||
return middleware.MySQL.Password, nil
|
||||
case TypeClickHouse:
|
||||
return middleware.ClickHouse.Password, nil
|
||||
}
|
||||
return "", fmt.Errorf("unsupported middleware type %v", middlewareType)
|
||||
}
|
||||
|
||||
@@ -287,6 +287,32 @@ spec:
|
||||
user: {{ .Middleware.Username }}
|
||||
`
|
||||
|
||||
const clickHouseRequest = `apiVersion: apr.bytetrade.io/v1alpha1
|
||||
kind: MiddlewareRequest
|
||||
metadata:
|
||||
name: {{ .AppName }}-clickhouse
|
||||
namespace: {{ .Namespace }}
|
||||
spec:
|
||||
app: {{ .AppName }}
|
||||
appNamespace: {{ .AppNamespace }}
|
||||
middleware: clickhouse
|
||||
clickhouse:
|
||||
databases:
|
||||
{{- range $k, $v := .Middleware.Databases }}
|
||||
- name: {{ $v.Name }}
|
||||
{{- end }}
|
||||
password:
|
||||
{{- if not (eq .Middleware.Password "") }}
|
||||
value: {{ .Middleware.Password }}
|
||||
{{- else }}
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: {{ .AppName }}-{{ .Namespace }}-clickhouse-password
|
||||
key: "password"
|
||||
{{- end }}
|
||||
user: {{ .Middleware.Username }}
|
||||
`
|
||||
|
||||
type RequestParams struct {
|
||||
MiddlewareType MiddlewareType
|
||||
AppName string
|
||||
@@ -318,6 +344,8 @@ func GenMiddleRequest(p RequestParams) ([]byte, error) {
|
||||
return genMariadbRequest(p)
|
||||
case TypeMySQL:
|
||||
return genMysqlRequest(p)
|
||||
case TypeClickHouse:
|
||||
return genClickHouseRequest(p)
|
||||
default:
|
||||
return []byte{}, fmt.Errorf("unsupported middleware type: %s", p.MiddlewareType)
|
||||
}
|
||||
@@ -512,3 +540,22 @@ func genElasticsearchRequest(p RequestParams) ([]byte, error) {
|
||||
}
|
||||
return renderTemplate(elasticsearchRequest, data)
|
||||
}
|
||||
|
||||
func genClickHouseRequest(p RequestParams) ([]byte, error) {
|
||||
data := struct {
|
||||
AppName string
|
||||
AppNamespace string
|
||||
Namespace string
|
||||
Middleware *ClickHouseConfig
|
||||
}{
|
||||
AppName: p.AppName,
|
||||
AppNamespace: p.AppNamespace,
|
||||
Namespace: p.Namespace,
|
||||
Middleware: &ClickHouseConfig{
|
||||
Username: p.Username,
|
||||
Password: p.Password,
|
||||
Databases: p.Middleware.ClickHouse.Databases,
|
||||
},
|
||||
}
|
||||
return renderTemplate(clickHouseRequest, data)
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ type Middleware struct {
|
||||
MariaDB *MariaDBConfig `yaml:"mariadb,omitempty"`
|
||||
MySQL *MySQLConfig `yaml:"mysql,omitempty"`
|
||||
Argo *ArgoConfig `yaml:"argo,omitempty"`
|
||||
ClickHouse *ClickHouseConfig `yaml:"clickHouse,omitempty"`
|
||||
}
|
||||
|
||||
// Database specify database name and if distributed.
|
||||
@@ -92,6 +93,13 @@ type MySQLConfig struct {
|
||||
Databases []Database `yaml:"databases" json:"databases"`
|
||||
}
|
||||
|
||||
// ClickHouseConfig contains fields for clickhouse config.
|
||||
type ClickHouseConfig struct {
|
||||
Username string `yaml:"username" json:"username"`
|
||||
Password string `yaml:"password,omitempty" json:"password"`
|
||||
Databases []Database `yaml:"databases" json:"databases"`
|
||||
}
|
||||
|
||||
type NatsConfig struct {
|
||||
Username string `yaml:"username" json:"username"`
|
||||
Password string `yaml:"password,omitempty" json:"password,omitempty"`
|
||||
|
||||
@@ -162,6 +162,9 @@ func GetResourceListFromChart(chartPath string, values map[string]interface{}) (
|
||||
values["elasticsearch"] = map[string]interface{}{
|
||||
"indexes": map[string]interface{}{},
|
||||
}
|
||||
values["clickhouse"] = map[string]interface{}{
|
||||
"databases": map[string]interface{}{},
|
||||
}
|
||||
values["svcs"] = map[string]interface{}{}
|
||||
values["nats"] = map[string]interface{}{
|
||||
"subjects": map[string]interface{}{},
|
||||
|
||||
Reference in New Issue
Block a user