Compare commits

...

6 Commits

Author SHA1 Message Date
hysyeah
db484b3f30 feat: add clickhouse support (#2438) 2026-01-22 19:57:30 +08:00
hys
9f155bc79a feat: add clickhouse support 2026-01-22 19:47:14 +08:00
hys
05a1661d50 set appservice image tag and add readiness probe 2026-01-09 15:31:45 +08:00
hysyeah
d5c4fc4c0e fix: delay create nats conn (#2391) 2026-01-09 15:31:45 +08:00
hys
d71b1838f8 fix: helm upgrade do not use atomic param and allow upgrade failed release 2026-01-09 15:31:45 +08:00
hys
1de2b6ea2f fix: failed release upgrade 2026-01-09 15:31:45 +08:00
10 changed files with 169 additions and 13 deletions

View File

@@ -170,8 +170,16 @@ spec:
priorityClassName: "system-cluster-critical"
containers:
- name: app-service
image: beclab/app-service:0.4.74
image: beclab/app-service:0.4.75
imagePullPolicy: IfNotPresent
ports:
- containerPort: 6755
readinessProbe:
failureThreshold: 5
initialDelaySeconds: 10
periodSeconds: 5
tcpSocket:
port: 6755
securityContext:
runAsUser: 0
env:

View File

@@ -18,7 +18,6 @@ import (
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
"github.com/beclab/Olares/framework/app-service/pkg/generated/clientset/versioned"
"github.com/beclab/Olares/framework/app-service/pkg/images"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
kbopv1alphav1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
@@ -139,16 +138,16 @@ func main() {
setupLog.Error(err, "Unable to create controller", "controller", "Security")
os.Exit(1)
}
natsConn, err := utils.NewNatsConn()
if err != nil {
setupLog.Error(err, "Failed to connect to NATS")
os.Exit(1)
}
defer natsConn.Drain()
appEventQueue := appevent.NewAppEventQueue(ictx, natsConn)
appEventQueue := appevent.NewAppEventQueue(ictx, nil)
appevent.SetAppEventQueue(appEventQueue)
go appEventQueue.Run()
defer func() {
if nc := appEventQueue.GetNatsConn(); nc != nil {
nc.Drain()
}
}()
if err = (&controllers.ApplicationManagerController{
Client: mgr.GetClient(),
KubeConfig: config,
@@ -205,7 +204,7 @@ func main() {
if err = (&controllers.NodeAlertController{
Client: mgr.GetClient(),
KubeConfig: config,
NatsConn: natsConn,
NatsConn: nil,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "NodeAlert")
os.Exit(1)

View File

@@ -56,6 +56,7 @@ type NodeAlertController struct {
lastPressureState map[string]bool
mutex sync.RWMutex
NatsConn *nats.Conn
natsConnMux sync.Mutex
}
// SetupWithManager sets up the controller with the Manager.
@@ -247,5 +248,31 @@ func (r *NodeAlertController) sendNodeAlert(nodeName string, pressureType NodePr
// publishToNats publishes a message to the specified NATS subject
func (r *NodeAlertController) publishToNats(subject string, data interface{}) error {
if err := r.ensureNatsConnected(); err != nil {
return fmt.Errorf("failed to ensure NATS connection: %w", err)
}
return utils.PublishEvent(r.NatsConn, subject, data)
}
func (r *NodeAlertController) ensureNatsConnected() error {
r.natsConnMux.Lock()
defer r.natsConnMux.Unlock()
if r.NatsConn != nil && r.NatsConn.IsConnected() {
return nil
}
if r.NatsConn != nil {
r.NatsConn.Close()
}
klog.Info("NATS connection not established in NodeAlertController, attempting to connect...")
nc, err := utils.NewNatsConn()
if err != nil {
klog.Errorf("NodeAlertController failed to connect to NATS: %v", err)
return err
}
r.NatsConn = nc
klog.Info("NodeAlertController successfully connected to NATS")
return nil
}

View File

@@ -984,6 +984,9 @@ func (h *Handler) oamValues(req *restful.Request, resp *restful.Response) {
values["mongodb"] = map[string]interface{}{
"databases": map[string]interface{}{},
}
values["clickhouse"] = map[string]interface{}{
"databases": map[string]interface{}{},
}
values["svcs"] = map[string]interface{}{}
values["nats"] = map[string]interface{}{
"subjects": map[string]interface{}{},

View File

@@ -51,6 +51,7 @@ var (
tapr.TypeElasticsearch.String(),
tapr.TypeMariaDB.String(),
tapr.TypeMySQL.String(),
tapr.TypeClickHouse.String(),
}
)

View File

@@ -3,6 +3,7 @@ package event
import (
"context"
"fmt"
"sync"
"time"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
@@ -17,9 +18,10 @@ import (
var AppEventQueue *QueuedEventController
type QueuedEventController struct {
wq workqueue.RateLimitingInterface
ctx context.Context
nc *nats.Conn
wq workqueue.RateLimitingInterface
ctx context.Context
nc *nats.Conn
ncMux sync.Mutex
}
type QueueEvent struct {
@@ -90,9 +92,41 @@ func (qe *QueuedEventController) enqueue(obj interface{}) {
}
func (qe *QueuedEventController) publish(subject string, data interface{}) error {
if err := qe.ensureNatsConnected(); err != nil {
return fmt.Errorf("failed to ensure NATS connection: %w", err)
}
return utils.PublishEvent(qe.nc, subject, data)
}
func (qe *QueuedEventController) ensureNatsConnected() error {
qe.ncMux.Lock()
defer qe.ncMux.Unlock()
if qe.nc != nil && qe.nc.IsConnected() {
return nil
}
if qe.nc != nil {
qe.nc.Close()
}
klog.Info("NATS connection not established, attempting to connect...")
nc, err := utils.NewNatsConn()
if err != nil {
klog.Errorf("Failed to connect to NATS: %v", err)
return err
}
qe.nc = nc
klog.Info("Successfully connected to NATS")
return nil
}
func (qe *QueuedEventController) GetNatsConn() *nats.Conn {
qe.ncMux.Lock()
defer qe.ncMux.Unlock()
return qe.nc
}
func NewAppEventQueue(ctx context.Context, nc *nats.Conn) *QueuedEventController {
return &QueuedEventController{
ctx: ctx,

View File

@@ -42,6 +42,9 @@ const (
// TypeMySQL indicates the middleware is mysql
TypeMySQL MiddlewareType = "mysql"
// TypeClickHouse indicates the middleware is ClickHouse
TypeClickHouse MiddlewareType = "clickhouse"
)
func (mr MiddlewareType) String() string {
@@ -323,6 +326,27 @@ func Apply(middleware *Middleware, kubeConfig *rest.Config, appName, appNamespac
}
klog.Infof("values.mysql: %v", vals["mysql"])
}
if middleware.ClickHouse != nil {
username := fmt.Sprintf("%s-%s-%s", middleware.ClickHouse.Username, ownerName, appName)
err := process(kubeConfig, appName, appNamespace, namespace, username, TypeClickHouse, ownerName, middleware)
if err != nil {
return err
}
resp, err := getMiddlewareRequest(TypeClickHouse)
if err != nil {
klog.Errorf("failed to get clickHouse middleware request info %v", err)
return err
}
vals["clickhouse"] = map[string]interface{}{
"host": resp.Host,
"port": resp.Port,
"username": resp.UserName,
"password": resp.Password,
"databases": resp.Databases,
}
klog.Infof("values.clickhouse: %v", vals["clickhouse"])
}
return nil
}
@@ -383,6 +407,8 @@ func getPassword(middleware *Middleware, middlewareType MiddlewareType) (string,
return middleware.MariaDB.Password, nil
case TypeMySQL:
return middleware.MySQL.Password, nil
case TypeClickHouse:
return middleware.ClickHouse.Password, nil
}
return "", fmt.Errorf("unsupported middleware type %v", middlewareType)
}

View File

@@ -287,6 +287,32 @@ spec:
user: {{ .Middleware.Username }}
`
const clickHouseRequest = `apiVersion: apr.bytetrade.io/v1alpha1
kind: MiddlewareRequest
metadata:
name: {{ .AppName }}-clickhouse
namespace: {{ .Namespace }}
spec:
app: {{ .AppName }}
appNamespace: {{ .AppNamespace }}
middleware: clickhouse
clickhouse:
databases:
{{- range $k, $v := .Middleware.Databases }}
- name: {{ $v.Name }}
{{- end }}
password:
{{- if not (eq .Middleware.Password "") }}
value: {{ .Middleware.Password }}
{{- else }}
valueFrom:
secretKeyRef:
name: {{ .AppName }}-{{ .Namespace }}-clickhouse-password
key: "password"
{{- end }}
user: {{ .Middleware.Username }}
`
type RequestParams struct {
MiddlewareType MiddlewareType
AppName string
@@ -318,6 +344,8 @@ func GenMiddleRequest(p RequestParams) ([]byte, error) {
return genMariadbRequest(p)
case TypeMySQL:
return genMysqlRequest(p)
case TypeClickHouse:
return genClickHouseRequest(p)
default:
return []byte{}, fmt.Errorf("unsupported middleware type: %s", p.MiddlewareType)
}
@@ -512,3 +540,22 @@ func genElasticsearchRequest(p RequestParams) ([]byte, error) {
}
return renderTemplate(elasticsearchRequest, data)
}
func genClickHouseRequest(p RequestParams) ([]byte, error) {
data := struct {
AppName string
AppNamespace string
Namespace string
Middleware *ClickHouseConfig
}{
AppName: p.AppName,
AppNamespace: p.AppNamespace,
Namespace: p.Namespace,
Middleware: &ClickHouseConfig{
Username: p.Username,
Password: p.Password,
Databases: p.Middleware.ClickHouse.Databases,
},
}
return renderTemplate(clickHouseRequest, data)
}

View File

@@ -12,6 +12,7 @@ type Middleware struct {
MariaDB *MariaDBConfig `yaml:"mariadb,omitempty"`
MySQL *MySQLConfig `yaml:"mysql,omitempty"`
Argo *ArgoConfig `yaml:"argo,omitempty"`
ClickHouse *ClickHouseConfig `yaml:"clickHouse,omitempty"`
}
// Database specify database name and if distributed.
@@ -92,6 +93,13 @@ type MySQLConfig struct {
Databases []Database `yaml:"databases" json:"databases"`
}
// ClickHouseConfig contains fields for clickhouse config.
type ClickHouseConfig struct {
Username string `yaml:"username" json:"username"`
Password string `yaml:"password,omitempty" json:"password"`
Databases []Database `yaml:"databases" json:"databases"`
}
type NatsConfig struct {
Username string `yaml:"username" json:"username"`
Password string `yaml:"password,omitempty" json:"password,omitempty"`

View File

@@ -162,6 +162,9 @@ func GetResourceListFromChart(chartPath string, values map[string]interface{}) (
values["elasticsearch"] = map[string]interface{}{
"indexes": map[string]interface{}{},
}
values["clickhouse"] = map[string]interface{}{
"databases": map[string]interface{}{},
}
values["svcs"] = map[string]interface{}{}
values["nats"] = map[string]interface{}{
"subjects": map[string]interface{}{},