Compare commits
11 Commits
module-l4-
...
module-app
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
256922e6d0 | ||
|
|
669de95449 | ||
|
|
8f703c5d23 | ||
|
|
94ae4e88fa | ||
|
|
71f2f2152c | ||
|
|
bf08441dbd | ||
|
|
ae837c6fd2 | ||
|
|
137dd9f505 | ||
|
|
e643c01917 | ||
|
|
66b65b8e44 | ||
|
|
aa31e4b388 |
@@ -170,7 +170,7 @@ spec:
|
||||
priorityClassName: "system-cluster-critical"
|
||||
containers:
|
||||
- name: app-service
|
||||
image: beclab/app-service:0.4.75
|
||||
image: beclab/app-service:0.4.76
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- containerPort: 6755
|
||||
|
||||
@@ -193,6 +193,7 @@ func (r *ApplicationManagerController) publishStateChangeEvent(am *appv1alpha1.A
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
Icon: apputils.AppIcon(am.Spec.Config),
|
||||
Reason: am.Status.Reason,
|
||||
Message: am.Status.Message,
|
||||
})
|
||||
|
||||
@@ -252,6 +252,7 @@ func (r *EntranceStatusManagerController) updateEntranceStatus(ctx context.Conte
|
||||
RawAppName: appCopy.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: app.AppTitle(am.Spec.Config),
|
||||
Icon: app.AppIcon(am.Spec.Config),
|
||||
SharedEntrances: appCopy.Spec.SharedEntrances,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,9 +4,11 @@ import (
|
||||
"context"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
@@ -120,7 +122,7 @@ func (r *PodAbnormalSuspendAppController) Reconcile(ctx context.Context, req ctr
|
||||
|
||||
if pod.Status.Reason == "Evicted" {
|
||||
klog.Infof("pod evicted name=%s namespace=%s, attempting to suspend app=%s owner=%s", pod.Name, pod.Namespace, appName, owner)
|
||||
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppStopDueToEvicted, "evicted pod: "+pod.Namespace+"/"+pod.Name)
|
||||
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppStopDueToEvicted, "evicted pod: "+pod.Namespace+"/"+pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend attempt failed for app=%s owner=%s: %v", appName, owner, err)
|
||||
return ctrl.Result{}, err
|
||||
@@ -147,7 +149,7 @@ func (r *PodAbnormalSuspendAppController) Reconcile(ctx context.Context, req ctr
|
||||
}
|
||||
|
||||
klog.Infof("attempting to suspend app=%s owner=%s due to pending unschedulable timeout", appName, owner)
|
||||
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppUnschedulable, "pending unschedulable timeout on pod: "+pod.Namespace+"/"+pod.Name)
|
||||
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppUnschedulable, "pending unschedulable timeout on pod: "+pod.Namespace+"/"+pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend attempt failed for app=%s owner=%s: %v", appName, owner, err)
|
||||
return ctrl.Result{}, err
|
||||
@@ -191,7 +193,7 @@ func pendingUnschedulableSince(pod *corev1.Pod) (time.Time, bool) {
|
||||
|
||||
// trySuspendApp attempts to suspend the app and returns (true, nil) if a suspend request was issued.
|
||||
// If the app is not suspendable yet, returns (false, nil) to trigger a short requeue.
|
||||
func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, owner, appName, reason, message string) (bool, error) {
|
||||
func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, owner, appName, reason, message, podNamespace string) (bool, error) {
|
||||
name, err := apputils.FmtAppMgrName(appName, owner, "")
|
||||
if err != nil {
|
||||
klog.Errorf("failed to format app manager name app=%s owner=%s: %v", appName, owner, err)
|
||||
@@ -215,6 +217,11 @@ func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, own
|
||||
return false, nil
|
||||
}
|
||||
|
||||
isServerPod := strings.HasSuffix(podNamespace, "-shared")
|
||||
if isServerPod {
|
||||
am.Annotations[api.AppStopAllKey] = "true"
|
||||
}
|
||||
|
||||
am.Spec.OpType = appv1alpha1.StopOp
|
||||
if err := r.Update(ctx, &am); err != nil {
|
||||
klog.Errorf("failed to update applicationmanager spec to StopOp name=%s app=%s owner=%s: %v", am.Name, appName, owner, err)
|
||||
|
||||
@@ -426,6 +426,11 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
for i := range appconfig.Entrances {
|
||||
if appconfig.Entrances[i].AuthLevel == "" {
|
||||
appconfig.Entrances[i].AuthLevel = "private"
|
||||
}
|
||||
}
|
||||
now := metav1.Now()
|
||||
name, _ := apputils.FmtAppMgrName(am.Spec.AppName, owner, appconfig.Namespace)
|
||||
app := &v1alpha1.Application{
|
||||
@@ -443,6 +448,7 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
|
||||
Owner: owner,
|
||||
Entrances: appconfig.Entrances,
|
||||
SharedEntrances: appconfig.SharedEntrances,
|
||||
Ports: appconfig.Ports,
|
||||
Icon: appconfig.Icon,
|
||||
Settings: map[string]string{
|
||||
"title": am.Annotations[constants.ApplicationTitleLabel],
|
||||
@@ -477,6 +483,8 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
|
||||
}
|
||||
if v, ok := appsMap[a.Name]; ok {
|
||||
v.Spec.Settings = a.Spec.Settings
|
||||
v.Spec.Entrances = a.Spec.Entrances
|
||||
v.Spec.Ports = a.Spec.Ports
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -738,6 +746,11 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
for i := range appconfig.Entrances {
|
||||
if appconfig.Entrances[i].AuthLevel == "" {
|
||||
appconfig.Entrances[i].AuthLevel = "private"
|
||||
}
|
||||
}
|
||||
|
||||
now := metav1.Now()
|
||||
app := v1alpha1.Application{
|
||||
@@ -754,6 +767,7 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
|
||||
Namespace: am.Spec.AppNamespace,
|
||||
Owner: am.Spec.AppOwner,
|
||||
Entrances: appconfig.Entrances,
|
||||
Ports: appconfig.Ports,
|
||||
SharedEntrances: appconfig.SharedEntrances,
|
||||
Icon: appconfig.Icon,
|
||||
Settings: map[string]string{
|
||||
@@ -788,6 +802,8 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
|
||||
}
|
||||
if v, ok := appsMap[a.Name]; ok {
|
||||
v.Spec.Settings = a.Spec.Settings
|
||||
v.Spec.Entrances = a.Spec.Entrances
|
||||
v.Spec.Ports = a.Spec.Ports
|
||||
}
|
||||
}
|
||||
|
||||
@@ -984,6 +1000,9 @@ func (h *Handler) oamValues(req *restful.Request, resp *restful.Response) {
|
||||
values["mongodb"] = map[string]interface{}{
|
||||
"databases": map[string]interface{}{},
|
||||
}
|
||||
values["clickhouse"] = map[string]interface{}{
|
||||
"databases": map[string]interface{}{},
|
||||
}
|
||||
values["svcs"] = map[string]interface{}{}
|
||||
values["nats"] = map[string]interface{}{
|
||||
"subjects": map[string]interface{}{},
|
||||
|
||||
@@ -187,6 +187,11 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
for i := range appconfig.Entrances {
|
||||
if appconfig.Entrances[i].AuthLevel == "" {
|
||||
appconfig.Entrances[i].AuthLevel = "private"
|
||||
}
|
||||
}
|
||||
|
||||
appconfig.SharedEntrances, err = appconfig.GenSharedEntranceURL(req.Request.Context())
|
||||
if err != nil {
|
||||
@@ -214,6 +219,7 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
|
||||
Namespace: am.Spec.AppNamespace,
|
||||
Owner: am.Spec.AppOwner,
|
||||
Entrances: appconfig.Entrances,
|
||||
Ports: appconfig.Ports,
|
||||
SharedEntrances: appconfig.SharedEntrances,
|
||||
Icon: appconfig.Icon,
|
||||
Settings: map[string]string{
|
||||
@@ -274,6 +280,8 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
|
||||
}
|
||||
if v, ok := appsMap[a.Name]; ok {
|
||||
v.Spec.Settings = a.Spec.Settings
|
||||
v.Spec.Entrances = a.Spec.Entrances
|
||||
v.Spec.Ports = a.Spec.Ports
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,6 +51,7 @@ var (
|
||||
tapr.TypeElasticsearch.String(),
|
||||
tapr.TypeMariaDB.String(),
|
||||
tapr.TypeMySQL.String(),
|
||||
tapr.TypeClickHouse.String(),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -563,7 +564,7 @@ func (h *HelmOps) WaitForStartUp() (bool, error) {
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
if errors.Is(err, errcode.ErrPodPending) {
|
||||
if errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending) {
|
||||
return false, err
|
||||
}
|
||||
|
||||
@@ -575,11 +576,41 @@ func (h *HelmOps) WaitForStartUp() (bool, error) {
|
||||
}
|
||||
|
||||
func (h *HelmOps) isStartUp() (bool, error) {
|
||||
pods, err := h.findAppSelectedPods()
|
||||
if h.app.IsV2() && h.app.IsMultiCharts() {
|
||||
serverPods, err := h.findServerPods()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
podNames := make([]string, 0)
|
||||
for _, p := range serverPods {
|
||||
podNames = append(podNames, p.Name)
|
||||
}
|
||||
klog.Infof("podSErvers: %v", podNames)
|
||||
|
||||
serverStarted, err := checkIfStartup(serverPods, true)
|
||||
if err != nil {
|
||||
klog.Errorf("v2 app %s server pods not ready: %v", h.app.AppName, err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
if !serverStarted {
|
||||
klog.Infof("v2 app %s server pods not started yet, waiting...", h.app.AppName)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
klog.Infof("v2 app %s server pods started, checking client pods", h.app.AppName)
|
||||
}
|
||||
|
||||
clientPods, err := h.findV1OrClientPods()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return checkIfStartup(pods)
|
||||
|
||||
clientStarted, err := checkIfStartup(clientPods, false)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return clientStarted, nil
|
||||
}
|
||||
|
||||
func (h *HelmOps) findAppSelectedPods() (*corev1.PodList, error) {
|
||||
@@ -609,15 +640,49 @@ func (h *HelmOps) findAppSelectedPods() (*corev1.PodList, error) {
|
||||
return pods, nil
|
||||
}
|
||||
|
||||
func checkIfStartup(pods *corev1.PodList) (bool, error) {
|
||||
if len(pods.Items) == 0 {
|
||||
func (h *HelmOps) findV1OrClientPods() ([]corev1.Pod, error) {
|
||||
podList, err := h.client.KubeClient.Kubernetes().CoreV1().Pods(h.app.Namespace).List(h.ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("app %s get pods err %v", h.app.AppName, err)
|
||||
return nil, err
|
||||
}
|
||||
return podList.Items, nil
|
||||
|
||||
}
|
||||
|
||||
func (h *HelmOps) findServerPods() ([]corev1.Pod, error) {
|
||||
pods := make([]corev1.Pod, 0)
|
||||
|
||||
for _, c := range h.app.SubCharts {
|
||||
if !c.Shared {
|
||||
continue
|
||||
}
|
||||
ns := c.Namespace(h.app.OwnerName)
|
||||
podList, err := h.client.KubeClient.Kubernetes().CoreV1().Pods(ns).List(h.ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("app %s get pods err %v", h.app.AppName, err)
|
||||
return nil, err
|
||||
}
|
||||
pods = append(pods, podList.Items...)
|
||||
}
|
||||
|
||||
return pods, nil
|
||||
}
|
||||
|
||||
func checkIfStartup(pods []corev1.Pod, isServerSide bool) (bool, error) {
|
||||
if len(pods) == 0 {
|
||||
return false, errors.New("no pod found")
|
||||
}
|
||||
for _, pod := range pods.Items {
|
||||
startedPods := 0
|
||||
totalPods := len(pods)
|
||||
for _, pod := range pods {
|
||||
creationTime := pod.GetCreationTimestamp()
|
||||
pendingDuration := time.Since(creationTime.Time)
|
||||
|
||||
if pod.Status.Phase == corev1.PodPending && pendingDuration > time.Minute*10 {
|
||||
if isServerSide {
|
||||
return false, errcode.ErrServerSidePodPending
|
||||
}
|
||||
return false, errcode.ErrPodPending
|
||||
}
|
||||
totalContainers := len(pod.Spec.Containers)
|
||||
@@ -629,9 +694,12 @@ func checkIfStartup(pods *corev1.PodList) (bool, error) {
|
||||
}
|
||||
}
|
||||
if startedContainers == totalContainers {
|
||||
return true, nil
|
||||
startedPods++
|
||||
}
|
||||
}
|
||||
if totalPods == startedPods {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -795,7 +863,7 @@ func (h *HelmOps) Install() error {
|
||||
return nil
|
||||
}
|
||||
ok, err := h.WaitForStartUp()
|
||||
if err != nil && errors.Is(err, errcode.ErrPodPending) {
|
||||
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
|
||||
@@ -51,7 +51,7 @@ func (h *HelmOpsV2) ApplyEnv() error {
|
||||
}
|
||||
|
||||
ok, err := h.WaitForStartUp()
|
||||
if err != nil && errors.Is(err, errcode.ErrPodPending) {
|
||||
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -119,7 +119,7 @@ func (h *HelmOpsV2) Install() error {
|
||||
return nil
|
||||
}
|
||||
ok, err := h.WaitForStartUp()
|
||||
if err != nil && errors.Is(err, errcode.ErrPodPending) {
|
||||
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
|
||||
klog.Errorf("App %s is pending, err=%v", h.App().AppName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -91,7 +91,7 @@ func (h *HelmOpsV2) Upgrade() error {
|
||||
}
|
||||
|
||||
ok, err := h.WaitForStartUp()
|
||||
if err != nil && errors.Is(err, errcode.ErrPodPending) {
|
||||
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/images"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@@ -47,6 +48,29 @@ func (r *downloadingInProgressApp) WaitAsync(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Check Kubernetes request resources before transitioning to Installing
|
||||
var appConfig *appcfg.ApplicationConfig
|
||||
if err := json.Unmarshal([]byte(r.manager.Spec.Config), &appConfig); err != nil {
|
||||
klog.Errorf("failed to unmarshal app config for %s: %v", r.manager.Spec.AppName, err)
|
||||
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.InstallFailed, nil, fmt.Sprintf("invalid app config: %v", err), "")
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.InstallFailed.String(), updateErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
_, conditionType, checkErr := apputils.CheckAppK8sRequestResource(appConfig, r.manager.Spec.OpType)
|
||||
if checkErr != nil {
|
||||
klog.Errorf("k8s request resource check failed for app %s: %v", r.manager.Spec.AppName, checkErr)
|
||||
opRecord := makeRecord(r.manager, appsv1.InstallFailed, checkErr.Error())
|
||||
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.InstallFailed, opRecord, checkErr.Error(), string(conditionType))
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.InstallFailed.String(), updateErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.Installing, nil, appsv1.Installing.String(), "")
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.Installing.String(), updateErr)
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/klog/v2"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
@@ -104,8 +105,37 @@ func (p *InstallingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
|
||||
err = ops.Install()
|
||||
if err != nil {
|
||||
klog.Errorf("install app %s failed %v", p.manager.Spec.AppName, err)
|
||||
if errors.Is(err, errcode.ErrPodPending) {
|
||||
if errors.Is(err, errcode.ErrServerSidePodPending) {
|
||||
p.finally = func() {
|
||||
klog.Infof("app %s server side pods is pending, set stop-all annotation and update app state to stopping", p.manager.Spec.AppName)
|
||||
|
||||
var am appsv1.ApplicationManager
|
||||
if err := p.client.Get(context.TODO(), types.NamespacedName{Name: p.manager.Name}, &am); err != nil {
|
||||
klog.Errorf("failed to get application manager: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if am.Annotations == nil {
|
||||
am.Annotations = make(map[string]string)
|
||||
}
|
||||
am.Annotations[api.AppStopAllKey] = "true"
|
||||
|
||||
if err := p.client.Update(ctx, &am); err != nil {
|
||||
klog.Errorf("failed to set stop-all annotation: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
updateErr := p.updateStatus(ctx, &am, appsv1.Stopping, nil, err.Error(), constants.AppUnschedulable)
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update status failed %v", updateErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if errors.Is(err, errcode.ErrPodPending) {
|
||||
p.finally = func() {
|
||||
klog.Infof("app %s pods is still pending, update app state to stopping", p.manager.Spec.AppName)
|
||||
updateErr := p.updateStatus(context.TODO(), p.manager, appsv1.Stopping, nil, err.Error(), constants.AppUnschedulable)
|
||||
|
||||
@@ -2,13 +2,11 @@ package appstate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
@@ -60,41 +58,24 @@ func (p *ResumingApp) Exec(ctx context.Context) (StatefulInProgressApp, error) {
|
||||
}
|
||||
|
||||
func (p *ResumingApp) exec(ctx context.Context) error {
|
||||
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(1))
|
||||
if err != nil {
|
||||
klog.Errorf("resume %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("resume app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
|
||||
// If resume-all is requested, also resume v2 server-side shared charts by scaling them up
|
||||
if p.manager.Annotations[api.AppResumeAllKey] == "true" {
|
||||
var appCfg *appcfg.ApplicationConfig
|
||||
if err := json.Unmarshal([]byte(p.manager.Spec.Config), &appCfg); err != nil {
|
||||
klog.Errorf("unmarshal to appConfig failed %v", err)
|
||||
return err
|
||||
// Check if resume-all is requested for V2 apps to also resume server-side shared charts
|
||||
resumeServer := p.manager.Annotations[api.AppResumeAllKey] == "true"
|
||||
if resumeServer {
|
||||
err := resumeV2AppAll(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("resume v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("resume v2 app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
|
||||
for _, chart := range appCfg.SubCharts {
|
||||
if !chart.Shared {
|
||||
continue
|
||||
}
|
||||
ns := chart.Namespace(appCfg.OwnerName)
|
||||
// create a shallow copy with target namespace/name for scaling logic
|
||||
amCopy := p.manager.DeepCopy()
|
||||
amCopy.Spec.AppNamespace = ns
|
||||
amCopy.Spec.AppName = chart.Name
|
||||
klog.Infof("resume-amCopy.Spec.AppNamespace: %s", ns)
|
||||
klog.Infof("resume-amCopy.Spec.AppName: %s", chart.Name)
|
||||
if err := suspendOrResumeApp(ctx, p.client, amCopy, int32(1)); err != nil {
|
||||
klog.Errorf("failed to resume shared chart %s in namespace %s: %v", chart.Name, ns, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
err := resumeV1AppOrV2AppClient(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("resume v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("resume v2 app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
}
|
||||
|
||||
if p.manager.Spec.Type == "middleware" && userspace.IsKbMiddlewares(p.manager.Spec.AppName) {
|
||||
err = p.execMiddleware(ctx)
|
||||
err := p.execMiddleware(ctx)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to resume middleware %s,err=%v", p.manager.Spec.AppName, err)
|
||||
return err
|
||||
|
||||
@@ -2,10 +2,13 @@ package appstate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
|
||||
kbopv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
|
||||
"k8s.io/klog/v2"
|
||||
@@ -44,20 +47,30 @@ func (p *SuspendFailedApp) Exec(ctx context.Context) (StatefulInProgressApp, err
|
||||
}
|
||||
|
||||
func (p *SuspendFailedApp) StateReconcile(ctx context.Context) error {
|
||||
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(0))
|
||||
if err != nil {
|
||||
klog.Errorf("stop-failed-app %s state reconcile failed %v", p.manager.Spec.AppName, err)
|
||||
return err
|
||||
stopServer := p.manager.Annotations[api.AppStopAllKey] == "true"
|
||||
if stopServer {
|
||||
err := suspendV2AppAll(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("suspend v2 app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
} else {
|
||||
err := suspendV1AppOrV2Client(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
}
|
||||
if p.manager.Spec.Type == "middleware" {
|
||||
|
||||
if p.manager.Spec.Type == "middleware" && userspace.IsKbMiddlewares(p.manager.Spec.AppName) {
|
||||
op := kubeblocks.NewOperation(ctx, kbopv1alpha1.StopType, p.manager, p.client)
|
||||
err = op.Stop()
|
||||
err := op.Stop()
|
||||
if err != nil {
|
||||
klog.Errorf("stop-failed-middleware %s state reconcile failed %v", p.manager.Spec.AppName, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SuspendFailedApp) Cancel(ctx context.Context) error {
|
||||
|
||||
@@ -2,17 +2,17 @@ package appstate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
kbopv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -78,35 +78,68 @@ func (p *SuspendingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
|
||||
}
|
||||
|
||||
func (p *SuspendingApp) exec(ctx context.Context) error {
|
||||
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(0))
|
||||
if err != nil {
|
||||
klog.Errorf("suspend %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
// If stop-all is requested, also stop v2 server-side shared charts by scaling them down
|
||||
if p.manager.Annotations[api.AppStopAllKey] == "true" {
|
||||
var appCfg *appcfg.ApplicationConfig
|
||||
if err := json.Unmarshal([]byte(p.manager.Spec.Config), &appCfg); err != nil {
|
||||
klog.Errorf("unmarshal to appConfig failed %v", err)
|
||||
return err
|
||||
// Check if stop-all is requested for V2 apps to also stop server-side shared charts
|
||||
stopServer := p.manager.Annotations[api.AppStopAllKey] == "true"
|
||||
if stopServer {
|
||||
err := suspendV2AppAll(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("suspend v2 app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
|
||||
for _, chart := range appCfg.SubCharts {
|
||||
if !chart.Shared {
|
||||
} else {
|
||||
err := suspendV1AppOrV2Client(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
}
|
||||
|
||||
if stopServer {
|
||||
// For V2 cluster-scoped apps, when server is down, stop all other users' clients
|
||||
// because they share the same server and cannot function without it
|
||||
klog.Infof("stopping other users' clients for v2 app %s", p.manager.Spec.AppName)
|
||||
|
||||
var appManagerList appsv1.ApplicationManagerList
|
||||
if err := p.client.List(ctx, &appManagerList); err != nil {
|
||||
klog.Errorf("failed to list application managers: %v", err)
|
||||
} else {
|
||||
// find all ApplicationManagers with same AppName but different AppOwner
|
||||
for _, am := range appManagerList.Items {
|
||||
// Skip if same owner (already handled) or different app
|
||||
if am.Spec.AppName != p.manager.Spec.AppName || am.Spec.AppOwner == p.manager.Spec.AppOwner {
|
||||
continue
|
||||
}
|
||||
ns := chart.Namespace(appCfg.OwnerName)
|
||||
// create a shallow copy with target namespace/name for scaling logic
|
||||
amCopy := p.manager.DeepCopy()
|
||||
amCopy.Spec.AppNamespace = ns
|
||||
amCopy.Spec.AppName = chart.Name
|
||||
klog.Infof("amCopy.Spec.AppNamespace: %s", ns)
|
||||
klog.Infof("amCopy.Spec.AppName: %s", chart.Name)
|
||||
|
||||
if err := suspendOrResumeApp(ctx, p.client, amCopy, int32(0)); err != nil {
|
||||
klog.Errorf("failed to stop shared chart %s in namespace %s: %v", chart.Name, ns, err)
|
||||
if am.Spec.Type != appsv1.App && am.Spec.Type != appsv1.Middleware {
|
||||
continue
|
||||
}
|
||||
|
||||
if am.Status.State == appsv1.Stopped || am.Status.State == appsv1.Stopping {
|
||||
klog.Infof("app %s owner %s already in stopped/stopping state, skip", am.Spec.AppName, am.Spec.AppOwner)
|
||||
continue
|
||||
}
|
||||
|
||||
if !IsOperationAllowed(am.Status.State, appsv1.StopOp) {
|
||||
klog.Infof("app %s owner %s not allowed do stop operation, skip", am.Spec.AppName, am.Spec.AppOwner)
|
||||
continue
|
||||
}
|
||||
opID := strconv.FormatInt(time.Now().Unix(), 10)
|
||||
now := metav1.Now()
|
||||
status := appsv1.ApplicationManagerStatus{
|
||||
OpType: appsv1.StopOp,
|
||||
OpID: opID,
|
||||
State: appsv1.Stopping,
|
||||
StatusTime: &now,
|
||||
UpdateTime: &now,
|
||||
Reason: p.manager.Status.Reason,
|
||||
Message: p.manager.Status.Message,
|
||||
}
|
||||
if _, err := apputils.UpdateAppMgrStatus(am.Name, status); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
klog.Infof("stopping client for user %s, app %s", am.Spec.AppOwner, am.Spec.AppName)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,10 +2,13 @@ package appstate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@@ -21,25 +24,25 @@ import (
|
||||
const suspendAnnotation = "bytetrade.io/suspend-by"
|
||||
const suspendCauseAnnotation = "bytetrade.io/suspend-cause"
|
||||
|
||||
func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager, replicas int32) error {
|
||||
suspend := func(list client.ObjectList) error {
|
||||
namespace := am.Spec.AppNamespace
|
||||
err := cli.List(ctx, list, client.InNamespace(namespace))
|
||||
if err != nil && !apierrors.IsNotFound(err) {
|
||||
klog.Errorf("Failed to get workload namespace=%s err=%v", namespace, err)
|
||||
// suspendOrResumeApp suspends or resumes an application.
|
||||
func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager, replicas int32, stopOrResumeServer bool) error {
|
||||
suspendOrResume := func(list client.ObjectList, targetNamespace, targetAppName string) error {
|
||||
err := cli.List(ctx, list, client.InNamespace(targetNamespace))
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get workload namespace=%s err=%v", targetNamespace, err)
|
||||
return err
|
||||
}
|
||||
|
||||
listObjects, err := apimeta.ExtractList(list)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to extract list namespace=%s err=%v", namespace, err)
|
||||
klog.Errorf("Failed to extract list namespace=%s err=%v", targetNamespace, err)
|
||||
return err
|
||||
}
|
||||
check := func(appName, deployName string) bool {
|
||||
if namespace == fmt.Sprintf("user-space-%s", am.Spec.AppOwner) ||
|
||||
namespace == fmt.Sprintf("user-system-%s", am.Spec.AppOwner) ||
|
||||
namespace == "os-platform" ||
|
||||
namespace == "os-framework" {
|
||||
if targetNamespace == fmt.Sprintf("user-space-%s", am.Spec.AppOwner) ||
|
||||
targetNamespace == fmt.Sprintf("user-system-%s", am.Spec.AppOwner) ||
|
||||
targetNamespace == "os-platform" ||
|
||||
targetNamespace == "os-framework" {
|
||||
if appName == deployName {
|
||||
return true
|
||||
}
|
||||
@@ -54,7 +57,7 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
|
||||
workloadName := ""
|
||||
switch workload := w.(type) {
|
||||
case *appsv1.Deployment:
|
||||
if check(am.Spec.AppName, workload.Name) {
|
||||
if check(targetAppName, workload.Name) {
|
||||
if workload.Annotations == nil {
|
||||
workload.Annotations = make(map[string]string)
|
||||
}
|
||||
@@ -64,7 +67,7 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
|
||||
workloadName = workload.Namespace + "/" + workload.Name
|
||||
}
|
||||
case *appsv1.StatefulSet:
|
||||
if check(am.Spec.AppName, workload.Name) {
|
||||
if check(targetAppName, workload.Name) {
|
||||
if workload.Annotations == nil {
|
||||
workload.Annotations = make(map[string]string)
|
||||
}
|
||||
@@ -92,15 +95,79 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
|
||||
} // end of suspend func
|
||||
|
||||
var deploymentList appsv1.DeploymentList
|
||||
err := suspend(&deploymentList)
|
||||
err := suspendOrResume(&deploymentList, am.Spec.AppNamespace, am.Spec.AppName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var stsList appsv1.StatefulSetList
|
||||
err = suspend(&stsList)
|
||||
err = suspendOrResume(&stsList, am.Spec.AppNamespace, am.Spec.AppName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
// If stopOrResumeServer is true, also suspend/resume shared server charts for V2 apps
|
||||
if stopOrResumeServer {
|
||||
var appCfg *appcfg.ApplicationConfig
|
||||
if err := json.Unmarshal([]byte(am.Spec.Config), &appCfg); err != nil {
|
||||
klog.Warningf("failed to unmarshal app config for stopServer check: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
|
||||
for _, chart := range appCfg.SubCharts {
|
||||
if !chart.Shared {
|
||||
continue
|
||||
}
|
||||
ns := chart.Namespace(am.Spec.AppOwner)
|
||||
if replicas == 0 {
|
||||
klog.Infof("suspending shared chart %s in namespace %s", chart.Name, ns)
|
||||
} else {
|
||||
klog.Infof("resuming shared chart %s in namespace %s", chart.Name, ns)
|
||||
}
|
||||
|
||||
var sharedDeploymentList appsv1.DeploymentList
|
||||
if err := suspendOrResume(&sharedDeploymentList, ns, chart.Name); err != nil {
|
||||
klog.Errorf("failed to scale deployments in shared chart %s namespace %s: %v", chart.Name, ns, err)
|
||||
return err
|
||||
}
|
||||
|
||||
var sharedStsList appsv1.StatefulSetList
|
||||
if err := suspendOrResume(&sharedStsList, ns, chart.Name); err != nil {
|
||||
klog.Errorf("failed to scale statefulsets in shared chart %s namespace %s: %v", chart.Name, ns, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reset the stop-all/resume-all annotation after processing
|
||||
if am.Annotations != nil {
|
||||
delete(am.Annotations, api.AppStopAllKey)
|
||||
delete(am.Annotations, api.AppResumeAllKey)
|
||||
if err := cli.Update(ctx, am); err != nil {
|
||||
klog.Warningf("failed to reset stop-all/resume-all annotations for app=%s owner=%s: %v", am.Spec.AppName, am.Spec.AppOwner, err)
|
||||
// Don't return error, operation already succeeded
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func suspendV1AppOrV2Client(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
|
||||
return suspendOrResumeApp(ctx, cli, am, 0, false)
|
||||
}
|
||||
|
||||
func suspendV2AppAll(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
|
||||
return suspendOrResumeApp(ctx, cli, am, 0, true)
|
||||
}
|
||||
|
||||
func resumeV1AppOrV2AppClient(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
|
||||
return suspendOrResumeApp(ctx, cli, am, 1, false)
|
||||
}
|
||||
|
||||
func resumeV2AppAll(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
|
||||
return suspendOrResumeApp(ctx, cli, am, 1, true)
|
||||
}
|
||||
|
||||
func isStartUp(am *appv1alpha1.ApplicationManager, cli client.Client) (bool, error) {
|
||||
|
||||
@@ -3,5 +3,6 @@ package errcode
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
ErrPodPending = errors.New("pod is pending")
|
||||
ErrServerSidePodPending = errors.New("server side pod is pending")
|
||||
ErrPodPending = errors.New("pod is pending")
|
||||
)
|
||||
|
||||
@@ -160,6 +160,7 @@ func PublishAppEventToQueue(p utils.EventParams) {
|
||||
return p.RawAppName
|
||||
}(),
|
||||
Title: p.Title,
|
||||
Icon: p.Icon,
|
||||
Reason: p.Reason,
|
||||
Message: p.Message,
|
||||
SharedEntrances: p.SharedEntrances,
|
||||
|
||||
@@ -233,6 +233,7 @@ func (imc *ImageManagerClient) updateProgress(ctx context.Context, am *appv1alph
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
Icon: apputils.AppIcon(am.Spec.Config),
|
||||
})
|
||||
}
|
||||
klog.Infof("app %s download progress.... %v", am.Spec.AppName, progressStr)
|
||||
|
||||
@@ -42,6 +42,9 @@ const (
|
||||
|
||||
// TypeMySQL indicates the middleware is mysql
|
||||
TypeMySQL MiddlewareType = "mysql"
|
||||
|
||||
// TypeClickHouse indicates the middleware is ClickHouse
|
||||
TypeClickHouse MiddlewareType = "clickhouse"
|
||||
)
|
||||
|
||||
func (mr MiddlewareType) String() string {
|
||||
@@ -323,6 +326,27 @@ func Apply(middleware *Middleware, kubeConfig *rest.Config, appName, appNamespac
|
||||
}
|
||||
klog.Infof("values.mysql: %v", vals["mysql"])
|
||||
}
|
||||
if middleware.ClickHouse != nil {
|
||||
username := fmt.Sprintf("%s-%s-%s", middleware.ClickHouse.Username, ownerName, appName)
|
||||
err := process(kubeConfig, appName, appNamespace, namespace, username, TypeClickHouse, ownerName, middleware)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := getMiddlewareRequest(TypeClickHouse)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to get clickHouse middleware request info %v", err)
|
||||
return err
|
||||
}
|
||||
vals["clickhouse"] = map[string]interface{}{
|
||||
"host": resp.Host,
|
||||
"port": resp.Port,
|
||||
"username": resp.UserName,
|
||||
"password": resp.Password,
|
||||
"databases": resp.Databases,
|
||||
}
|
||||
klog.Infof("values.clickhouse: %v", vals["clickhouse"])
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -383,6 +407,8 @@ func getPassword(middleware *Middleware, middlewareType MiddlewareType) (string,
|
||||
return middleware.MariaDB.Password, nil
|
||||
case TypeMySQL:
|
||||
return middleware.MySQL.Password, nil
|
||||
case TypeClickHouse:
|
||||
return middleware.ClickHouse.Password, nil
|
||||
}
|
||||
return "", fmt.Errorf("unsupported middleware type %v", middlewareType)
|
||||
}
|
||||
|
||||
@@ -287,6 +287,32 @@ spec:
|
||||
user: {{ .Middleware.Username }}
|
||||
`
|
||||
|
||||
const clickHouseRequest = `apiVersion: apr.bytetrade.io/v1alpha1
|
||||
kind: MiddlewareRequest
|
||||
metadata:
|
||||
name: {{ .AppName }}-clickhouse
|
||||
namespace: {{ .Namespace }}
|
||||
spec:
|
||||
app: {{ .AppName }}
|
||||
appNamespace: {{ .AppNamespace }}
|
||||
middleware: clickhouse
|
||||
clickhouse:
|
||||
databases:
|
||||
{{- range $k, $v := .Middleware.Databases }}
|
||||
- name: {{ $v.Name }}
|
||||
{{- end }}
|
||||
password:
|
||||
{{- if not (eq .Middleware.Password "") }}
|
||||
value: {{ .Middleware.Password }}
|
||||
{{- else }}
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: {{ .AppName }}-{{ .Namespace }}-clickhouse-password
|
||||
key: "password"
|
||||
{{- end }}
|
||||
user: {{ .Middleware.Username }}
|
||||
`
|
||||
|
||||
type RequestParams struct {
|
||||
MiddlewareType MiddlewareType
|
||||
AppName string
|
||||
@@ -318,6 +344,8 @@ func GenMiddleRequest(p RequestParams) ([]byte, error) {
|
||||
return genMariadbRequest(p)
|
||||
case TypeMySQL:
|
||||
return genMysqlRequest(p)
|
||||
case TypeClickHouse:
|
||||
return genClickHouseRequest(p)
|
||||
default:
|
||||
return []byte{}, fmt.Errorf("unsupported middleware type: %s", p.MiddlewareType)
|
||||
}
|
||||
@@ -512,3 +540,22 @@ func genElasticsearchRequest(p RequestParams) ([]byte, error) {
|
||||
}
|
||||
return renderTemplate(elasticsearchRequest, data)
|
||||
}
|
||||
|
||||
func genClickHouseRequest(p RequestParams) ([]byte, error) {
|
||||
data := struct {
|
||||
AppName string
|
||||
AppNamespace string
|
||||
Namespace string
|
||||
Middleware *ClickHouseConfig
|
||||
}{
|
||||
AppName: p.AppName,
|
||||
AppNamespace: p.AppNamespace,
|
||||
Namespace: p.Namespace,
|
||||
Middleware: &ClickHouseConfig{
|
||||
Username: p.Username,
|
||||
Password: p.Password,
|
||||
Databases: p.Middleware.ClickHouse.Databases,
|
||||
},
|
||||
}
|
||||
return renderTemplate(clickHouseRequest, data)
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ type Middleware struct {
|
||||
MariaDB *MariaDBConfig `yaml:"mariadb,omitempty"`
|
||||
MySQL *MySQLConfig `yaml:"mysql,omitempty"`
|
||||
Argo *ArgoConfig `yaml:"argo,omitempty"`
|
||||
ClickHouse *ClickHouseConfig `yaml:"clickHouse,omitempty"`
|
||||
}
|
||||
|
||||
// Database specify database name and if distributed.
|
||||
@@ -92,6 +93,13 @@ type MySQLConfig struct {
|
||||
Databases []Database `yaml:"databases" json:"databases"`
|
||||
}
|
||||
|
||||
// ClickHouseConfig contains fields for clickhouse config.
|
||||
type ClickHouseConfig struct {
|
||||
Username string `yaml:"username" json:"username"`
|
||||
Password string `yaml:"password,omitempty" json:"password"`
|
||||
Databases []Database `yaml:"databases" json:"databases"`
|
||||
}
|
||||
|
||||
type NatsConfig struct {
|
||||
Username string `yaml:"username" json:"username"`
|
||||
Password string `yaml:"password,omitempty" json:"password,omitempty"`
|
||||
|
||||
@@ -1080,13 +1080,28 @@ func IsClonedApp(appName, rawAppName string) bool {
|
||||
}
|
||||
|
||||
func AppTitle(config string) string {
|
||||
var cfg appcfg.ApplicationConfig
|
||||
err := json.Unmarshal([]byte(config), &cfg)
|
||||
if err != nil {
|
||||
cfg := unmarshalApplicationConfig(config)
|
||||
if cfg == nil {
|
||||
return ""
|
||||
}
|
||||
return cfg.Title
|
||||
}
|
||||
func AppIcon(config string) string {
|
||||
cfg := unmarshalApplicationConfig(config)
|
||||
if cfg == nil {
|
||||
return ""
|
||||
}
|
||||
return cfg.Icon
|
||||
}
|
||||
|
||||
func unmarshalApplicationConfig(config string) *appcfg.ApplicationConfig {
|
||||
var cfg appcfg.ApplicationConfig
|
||||
err := json.Unmarshal([]byte(config), &cfg)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return &cfg
|
||||
}
|
||||
|
||||
func GetRawAppName(AppName, rawAppName string) string {
|
||||
if rawAppName == "" {
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -260,42 +261,10 @@ func CheckAppRequirement(token string, appConfig *appcfg.ApplicationConfig, op v
|
||||
}
|
||||
}
|
||||
|
||||
allocatedResources, err := getRequestResources()
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
if len(allocatedResources) == 1 {
|
||||
sufficientCPU, sufficientMemory := false, false
|
||||
if appConfig.Requirement.CPU == nil {
|
||||
sufficientCPU = true
|
||||
}
|
||||
if appConfig.Requirement.Memory == nil {
|
||||
sufficientMemory = true
|
||||
}
|
||||
for _, v := range allocatedResources {
|
||||
if appConfig.Requirement.CPU != nil {
|
||||
if v.cpu.allocatable.Cmp(*appConfig.Requirement.CPU) > 0 {
|
||||
sufficientCPU = true
|
||||
}
|
||||
}
|
||||
if appConfig.Requirement.Memory != nil {
|
||||
if v.memory.allocatable.Cmp(*appConfig.Requirement.Memory) > 0 {
|
||||
sufficientMemory = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !sufficientCPU {
|
||||
return constants.CPU, constants.K8sRequestCPUPressure, fmt.Errorf(constants.K8sRequestCPUPressureMessage, op)
|
||||
}
|
||||
if !sufficientMemory {
|
||||
return constants.Memory, constants.K8sRequestMemoryPressure, fmt.Errorf(constants.K8sRequestMemoryPressureMessage, op)
|
||||
}
|
||||
}
|
||||
|
||||
return "", "", nil
|
||||
return CheckAppK8sRequestResource(appConfig, op)
|
||||
}
|
||||
|
||||
func getRequestResources() (map[string]resources, error) {
|
||||
func GetRequestResources() (map[string]resources, error) {
|
||||
config, err := ctrl.GetConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -960,3 +929,83 @@ func CheckCloneEntrances(ctrlClient client.Client, appConfig *appcfg.Application
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func GetClusterAvailableResource() (*resources, error) {
|
||||
config, err := ctrl.GetConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client, err := kubernetes.NewForConfig(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
initAllocatable := resource.MustParse("0")
|
||||
availableResources := resources{
|
||||
cpu: usage{allocatable: &initAllocatable},
|
||||
memory: usage{allocatable: &initAllocatable},
|
||||
}
|
||||
nodeList := make([]corev1.Node, 0)
|
||||
for _, node := range nodes.Items {
|
||||
if !utils.IsNodeReady(&node) || node.Spec.Unschedulable {
|
||||
continue
|
||||
}
|
||||
nodeList = append(nodeList, node)
|
||||
}
|
||||
if len(nodeList) == 0 {
|
||||
return nil, errors.New("cluster has no suitable node to schedule")
|
||||
}
|
||||
for _, node := range nodeList {
|
||||
availableResources.cpu.allocatable.Add(*node.Status.Allocatable.Cpu())
|
||||
availableResources.memory.allocatable.Add(*node.Status.Allocatable.Memory())
|
||||
fieldSelector := fmt.Sprintf("spec.nodeName=%s,status.phase!=Failed,status.phase!=Succeeded", node.Name)
|
||||
pods, err := client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
|
||||
FieldSelector: fieldSelector,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, pod := range pods.Items {
|
||||
for _, container := range pod.Spec.Containers {
|
||||
availableResources.cpu.allocatable.Sub(*container.Resources.Requests.Cpu())
|
||||
availableResources.memory.allocatable.Sub(*container.Resources.Requests.Memory())
|
||||
}
|
||||
}
|
||||
}
|
||||
return &availableResources, nil
|
||||
}
|
||||
|
||||
func CheckAppK8sRequestResource(appConfig *appcfg.ApplicationConfig, op v1alpha1.OpType) (constants.ResourceType, constants.ResourceConditionType, error) {
|
||||
availableResources, err := GetClusterAvailableResource()
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
if appConfig == nil {
|
||||
return "", "", errors.New("nil appConfig")
|
||||
}
|
||||
|
||||
sufficientCPU, sufficientMemory := false, false
|
||||
|
||||
if appConfig.Requirement.CPU == nil {
|
||||
sufficientCPU = true
|
||||
}
|
||||
if appConfig.Requirement.Memory == nil {
|
||||
sufficientMemory = true
|
||||
}
|
||||
if appConfig.Requirement.CPU != nil && availableResources.cpu.allocatable.Cmp(*appConfig.Requirement.CPU) > 0 {
|
||||
sufficientCPU = true
|
||||
}
|
||||
if appConfig.Requirement.Memory != nil && availableResources.memory.allocatable.Cmp(*appConfig.Requirement.Memory) > 0 {
|
||||
sufficientMemory = true
|
||||
}
|
||||
if !sufficientCPU {
|
||||
return constants.CPU, constants.K8sRequestCPUPressure, fmt.Errorf(constants.K8sRequestCPUPressureMessage, op)
|
||||
}
|
||||
if !sufficientMemory {
|
||||
return constants.Memory, constants.K8sRequestMemoryPressure, fmt.Errorf(constants.K8sRequestMemoryPressureMessage, op)
|
||||
}
|
||||
return "", "", nil
|
||||
}
|
||||
|
||||
@@ -162,6 +162,9 @@ func GetResourceListFromChart(chartPath string, values map[string]interface{}) (
|
||||
values["elasticsearch"] = map[string]interface{}{
|
||||
"indexes": map[string]interface{}{},
|
||||
}
|
||||
values["clickhouse"] = map[string]interface{}{
|
||||
"databases": map[string]interface{}{},
|
||||
}
|
||||
values["svcs"] = map[string]interface{}{}
|
||||
values["nats"] = map[string]interface{}{
|
||||
"subjects": map[string]interface{}{},
|
||||
|
||||
@@ -25,6 +25,7 @@ type Event struct {
|
||||
User string `json:"user"`
|
||||
EntranceStatuses []v1alpha1.EntranceStatus `json:"entranceStatuses,omitempty"`
|
||||
Title string `json:"title,omitempty"`
|
||||
Icon string `json:"icon,omitempty"`
|
||||
Reason string `json:"reason,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
SharedEntrances []v1alpha1.Entrance `json:"sharedEntrances,omitempty"`
|
||||
@@ -45,6 +46,7 @@ type EventParams struct {
|
||||
Reason string
|
||||
Message string
|
||||
SharedEntrances []v1alpha1.Entrance
|
||||
Icon string
|
||||
}
|
||||
|
||||
func PublishEvent(nc *nats.Conn, subject string, data interface{}) error {
|
||||
|
||||
Reference in New Issue
Block a user