Compare commits

...

11 Commits

Author SHA1 Message Date
hysyeah
256922e6d0 fix: check k8s request before into installing state (#2454) 2026-01-28 20:09:00 +08:00
hys
669de95449 fix: add spec ports 2026-01-28 19:49:22 +08:00
hys
8f703c5d23 fix: check k8s request before into installing state 2026-01-28 17:10:08 +08:00
hysyeah
94ae4e88fa fix: v2 app stop (#2452) 2026-01-28 16:14:43 +08:00
hys
71f2f2152c fix: v2 app stop 2026-01-28 16:04:56 +08:00
hysyeah
bf08441dbd feat: add icon filed to nats event (#2443) 2026-01-23 20:40:15 +08:00
hys
ae837c6fd2 feat: add icon filed to nats event 2026-01-23 14:42:57 +08:00
hys
137dd9f505 appservice image tag to 0.4.76 2026-01-22 20:19:49 +08:00
hys
e643c01917 feat: add clickhouse support 2026-01-22 20:19:49 +08:00
hys
66b65b8e44 fix: helm upgrade do not use atomic param and allow upgrade failed release 2026-01-22 20:19:48 +08:00
hys
aa31e4b388 fix: failed release upgrade 2026-01-22 20:19:48 +08:00
26 changed files with 539 additions and 134 deletions

View File

@@ -170,7 +170,7 @@ spec:
priorityClassName: "system-cluster-critical"
containers:
- name: app-service
image: beclab/app-service:0.4.75
image: beclab/app-service:0.4.76
imagePullPolicy: IfNotPresent
ports:
- containerPort: 6755

View File

@@ -193,6 +193,7 @@ func (r *ApplicationManagerController) publishStateChangeEvent(am *appv1alpha1.A
RawAppName: am.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: apputils.AppTitle(am.Spec.Config),
Icon: apputils.AppIcon(am.Spec.Config),
Reason: am.Status.Reason,
Message: am.Status.Message,
})

View File

@@ -252,6 +252,7 @@ func (r *EntranceStatusManagerController) updateEntranceStatus(ctx context.Conte
RawAppName: appCopy.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: app.AppTitle(am.Spec.Config),
Icon: app.AppIcon(am.Spec.Config),
SharedEntrances: appCopy.Spec.SharedEntrances,
})
}

View File

@@ -4,9 +4,11 @@ import (
"context"
"os"
"strconv"
"strings"
"time"
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
@@ -120,7 +122,7 @@ func (r *PodAbnormalSuspendAppController) Reconcile(ctx context.Context, req ctr
if pod.Status.Reason == "Evicted" {
klog.Infof("pod evicted name=%s namespace=%s, attempting to suspend app=%s owner=%s", pod.Name, pod.Namespace, appName, owner)
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppStopDueToEvicted, "evicted pod: "+pod.Namespace+"/"+pod.Name)
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppStopDueToEvicted, "evicted pod: "+pod.Namespace+"/"+pod.Name, pod.Namespace)
if err != nil {
klog.Errorf("suspend attempt failed for app=%s owner=%s: %v", appName, owner, err)
return ctrl.Result{}, err
@@ -147,7 +149,7 @@ func (r *PodAbnormalSuspendAppController) Reconcile(ctx context.Context, req ctr
}
klog.Infof("attempting to suspend app=%s owner=%s due to pending unschedulable timeout", appName, owner)
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppUnschedulable, "pending unschedulable timeout on pod: "+pod.Namespace+"/"+pod.Name)
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppUnschedulable, "pending unschedulable timeout on pod: "+pod.Namespace+"/"+pod.Name, pod.Namespace)
if err != nil {
klog.Errorf("suspend attempt failed for app=%s owner=%s: %v", appName, owner, err)
return ctrl.Result{}, err
@@ -191,7 +193,7 @@ func pendingUnschedulableSince(pod *corev1.Pod) (time.Time, bool) {
// trySuspendApp attempts to suspend the app and returns (true, nil) if a suspend request was issued.
// If the app is not suspendable yet, returns (false, nil) to trigger a short requeue.
func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, owner, appName, reason, message string) (bool, error) {
func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, owner, appName, reason, message, podNamespace string) (bool, error) {
name, err := apputils.FmtAppMgrName(appName, owner, "")
if err != nil {
klog.Errorf("failed to format app manager name app=%s owner=%s: %v", appName, owner, err)
@@ -215,6 +217,11 @@ func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, own
return false, nil
}
isServerPod := strings.HasSuffix(podNamespace, "-shared")
if isServerPod {
am.Annotations[api.AppStopAllKey] = "true"
}
am.Spec.OpType = appv1alpha1.StopOp
if err := r.Update(ctx, &am); err != nil {
klog.Errorf("failed to update applicationmanager spec to StopOp name=%s app=%s owner=%s: %v", am.Name, appName, owner, err)

View File

@@ -426,6 +426,11 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
api.HandleError(resp, req, err)
return
}
for i := range appconfig.Entrances {
if appconfig.Entrances[i].AuthLevel == "" {
appconfig.Entrances[i].AuthLevel = "private"
}
}
now := metav1.Now()
name, _ := apputils.FmtAppMgrName(am.Spec.AppName, owner, appconfig.Namespace)
app := &v1alpha1.Application{
@@ -443,6 +448,7 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
Owner: owner,
Entrances: appconfig.Entrances,
SharedEntrances: appconfig.SharedEntrances,
Ports: appconfig.Ports,
Icon: appconfig.Icon,
Settings: map[string]string{
"title": am.Annotations[constants.ApplicationTitleLabel],
@@ -477,6 +483,8 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
}
if v, ok := appsMap[a.Name]; ok {
v.Spec.Settings = a.Spec.Settings
v.Spec.Entrances = a.Spec.Entrances
v.Spec.Ports = a.Spec.Ports
}
}
}
@@ -738,6 +746,11 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
api.HandleError(resp, req, err)
return
}
for i := range appconfig.Entrances {
if appconfig.Entrances[i].AuthLevel == "" {
appconfig.Entrances[i].AuthLevel = "private"
}
}
now := metav1.Now()
app := v1alpha1.Application{
@@ -754,6 +767,7 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
Namespace: am.Spec.AppNamespace,
Owner: am.Spec.AppOwner,
Entrances: appconfig.Entrances,
Ports: appconfig.Ports,
SharedEntrances: appconfig.SharedEntrances,
Icon: appconfig.Icon,
Settings: map[string]string{
@@ -788,6 +802,8 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
}
if v, ok := appsMap[a.Name]; ok {
v.Spec.Settings = a.Spec.Settings
v.Spec.Entrances = a.Spec.Entrances
v.Spec.Ports = a.Spec.Ports
}
}
@@ -984,6 +1000,9 @@ func (h *Handler) oamValues(req *restful.Request, resp *restful.Response) {
values["mongodb"] = map[string]interface{}{
"databases": map[string]interface{}{},
}
values["clickhouse"] = map[string]interface{}{
"databases": map[string]interface{}{},
}
values["svcs"] = map[string]interface{}{}
values["nats"] = map[string]interface{}{
"subjects": map[string]interface{}{},

View File

@@ -187,6 +187,11 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
api.HandleError(resp, req, err)
return
}
for i := range appconfig.Entrances {
if appconfig.Entrances[i].AuthLevel == "" {
appconfig.Entrances[i].AuthLevel = "private"
}
}
appconfig.SharedEntrances, err = appconfig.GenSharedEntranceURL(req.Request.Context())
if err != nil {
@@ -214,6 +219,7 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
Namespace: am.Spec.AppNamespace,
Owner: am.Spec.AppOwner,
Entrances: appconfig.Entrances,
Ports: appconfig.Ports,
SharedEntrances: appconfig.SharedEntrances,
Icon: appconfig.Icon,
Settings: map[string]string{
@@ -274,6 +280,8 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
}
if v, ok := appsMap[a.Name]; ok {
v.Spec.Settings = a.Spec.Settings
v.Spec.Entrances = a.Spec.Entrances
v.Spec.Ports = a.Spec.Ports
}
}
}

View File

@@ -51,6 +51,7 @@ var (
tapr.TypeElasticsearch.String(),
tapr.TypeMariaDB.String(),
tapr.TypeMySQL.String(),
tapr.TypeClickHouse.String(),
}
)
@@ -563,7 +564,7 @@ func (h *HelmOps) WaitForStartUp() (bool, error) {
}
return true, nil
}
if errors.Is(err, errcode.ErrPodPending) {
if errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending) {
return false, err
}
@@ -575,11 +576,41 @@ func (h *HelmOps) WaitForStartUp() (bool, error) {
}
func (h *HelmOps) isStartUp() (bool, error) {
pods, err := h.findAppSelectedPods()
if h.app.IsV2() && h.app.IsMultiCharts() {
serverPods, err := h.findServerPods()
if err != nil {
return false, err
}
podNames := make([]string, 0)
for _, p := range serverPods {
podNames = append(podNames, p.Name)
}
klog.Infof("podSErvers: %v", podNames)
serverStarted, err := checkIfStartup(serverPods, true)
if err != nil {
klog.Errorf("v2 app %s server pods not ready: %v", h.app.AppName, err)
return false, err
}
if !serverStarted {
klog.Infof("v2 app %s server pods not started yet, waiting...", h.app.AppName)
return false, nil
}
klog.Infof("v2 app %s server pods started, checking client pods", h.app.AppName)
}
clientPods, err := h.findV1OrClientPods()
if err != nil {
return false, err
}
return checkIfStartup(pods)
clientStarted, err := checkIfStartup(clientPods, false)
if err != nil {
return false, err
}
return clientStarted, nil
}
func (h *HelmOps) findAppSelectedPods() (*corev1.PodList, error) {
@@ -609,15 +640,49 @@ func (h *HelmOps) findAppSelectedPods() (*corev1.PodList, error) {
return pods, nil
}
func checkIfStartup(pods *corev1.PodList) (bool, error) {
if len(pods.Items) == 0 {
func (h *HelmOps) findV1OrClientPods() ([]corev1.Pod, error) {
podList, err := h.client.KubeClient.Kubernetes().CoreV1().Pods(h.app.Namespace).List(h.ctx, metav1.ListOptions{})
if err != nil {
klog.Errorf("app %s get pods err %v", h.app.AppName, err)
return nil, err
}
return podList.Items, nil
}
func (h *HelmOps) findServerPods() ([]corev1.Pod, error) {
pods := make([]corev1.Pod, 0)
for _, c := range h.app.SubCharts {
if !c.Shared {
continue
}
ns := c.Namespace(h.app.OwnerName)
podList, err := h.client.KubeClient.Kubernetes().CoreV1().Pods(ns).List(h.ctx, metav1.ListOptions{})
if err != nil {
klog.Errorf("app %s get pods err %v", h.app.AppName, err)
return nil, err
}
pods = append(pods, podList.Items...)
}
return pods, nil
}
func checkIfStartup(pods []corev1.Pod, isServerSide bool) (bool, error) {
if len(pods) == 0 {
return false, errors.New("no pod found")
}
for _, pod := range pods.Items {
startedPods := 0
totalPods := len(pods)
for _, pod := range pods {
creationTime := pod.GetCreationTimestamp()
pendingDuration := time.Since(creationTime.Time)
if pod.Status.Phase == corev1.PodPending && pendingDuration > time.Minute*10 {
if isServerSide {
return false, errcode.ErrServerSidePodPending
}
return false, errcode.ErrPodPending
}
totalContainers := len(pod.Spec.Containers)
@@ -629,9 +694,12 @@ func checkIfStartup(pods *corev1.PodList) (bool, error) {
}
}
if startedContainers == totalContainers {
return true, nil
startedPods++
}
}
if totalPods == startedPods {
return true, nil
}
return false, nil
}
@@ -795,7 +863,7 @@ func (h *HelmOps) Install() error {
return nil
}
ok, err := h.WaitForStartUp()
if err != nil && errors.Is(err, errcode.ErrPodPending) {
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
return err
}
if !ok {

View File

@@ -51,7 +51,7 @@ func (h *HelmOpsV2) ApplyEnv() error {
}
ok, err := h.WaitForStartUp()
if err != nil && errors.Is(err, errcode.ErrPodPending) {
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
return err
}

View File

@@ -119,7 +119,7 @@ func (h *HelmOpsV2) Install() error {
return nil
}
ok, err := h.WaitForStartUp()
if err != nil && errors.Is(err, errcode.ErrPodPending) {
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
klog.Errorf("App %s is pending, err=%v", h.App().AppName, err)
return err
}

View File

@@ -91,7 +91,7 @@ func (h *HelmOpsV2) Upgrade() error {
}
ok, err := h.WaitForStartUp()
if err != nil && errors.Is(err, errcode.ErrPodPending) {
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
return err
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/images"
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
@@ -47,6 +48,29 @@ func (r *downloadingInProgressApp) WaitAsync(ctx context.Context) {
return
}
// Check Kubernetes request resources before transitioning to Installing
var appConfig *appcfg.ApplicationConfig
if err := json.Unmarshal([]byte(r.manager.Spec.Config), &appConfig); err != nil {
klog.Errorf("failed to unmarshal app config for %s: %v", r.manager.Spec.AppName, err)
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.InstallFailed, nil, fmt.Sprintf("invalid app config: %v", err), "")
if updateErr != nil {
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.InstallFailed.String(), updateErr)
}
return
}
_, conditionType, checkErr := apputils.CheckAppK8sRequestResource(appConfig, r.manager.Spec.OpType)
if checkErr != nil {
klog.Errorf("k8s request resource check failed for app %s: %v", r.manager.Spec.AppName, checkErr)
opRecord := makeRecord(r.manager, appsv1.InstallFailed, checkErr.Error())
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.InstallFailed, opRecord, checkErr.Error(), string(conditionType))
if updateErr != nil {
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.InstallFailed.String(), updateErr)
}
return
}
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.Installing, nil, appsv1.Installing.String(), "")
if updateErr != nil {
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.Installing.String(), updateErr)

View File

@@ -16,6 +16,7 @@ import (
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -104,8 +105,37 @@ func (p *InstallingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
err = ops.Install()
if err != nil {
klog.Errorf("install app %s failed %v", p.manager.Spec.AppName, err)
if errors.Is(err, errcode.ErrPodPending) {
if errors.Is(err, errcode.ErrServerSidePodPending) {
p.finally = func() {
klog.Infof("app %s server side pods is pending, set stop-all annotation and update app state to stopping", p.manager.Spec.AppName)
var am appsv1.ApplicationManager
if err := p.client.Get(context.TODO(), types.NamespacedName{Name: p.manager.Name}, &am); err != nil {
klog.Errorf("failed to get application manager: %v", err)
return
}
if am.Annotations == nil {
am.Annotations = make(map[string]string)
}
am.Annotations[api.AppStopAllKey] = "true"
if err := p.client.Update(ctx, &am); err != nil {
klog.Errorf("failed to set stop-all annotation: %v", err)
return
}
updateErr := p.updateStatus(ctx, &am, appsv1.Stopping, nil, err.Error(), constants.AppUnschedulable)
if updateErr != nil {
klog.Errorf("update status failed %v", updateErr)
return
}
}
return
}
if errors.Is(err, errcode.ErrPodPending) {
p.finally = func() {
klog.Infof("app %s pods is still pending, update app state to stopping", p.manager.Spec.AppName)
updateErr := p.updateStatus(context.TODO(), p.manager, appsv1.Stopping, nil, err.Error(), constants.AppUnschedulable)

View File

@@ -2,13 +2,11 @@ package appstate
import (
"context"
"encoding/json"
"fmt"
"time"
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
@@ -60,41 +58,24 @@ func (p *ResumingApp) Exec(ctx context.Context) (StatefulInProgressApp, error) {
}
func (p *ResumingApp) exec(ctx context.Context) error {
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(1))
if err != nil {
klog.Errorf("resume %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("resume app %s failed %w", p.manager.Spec.AppName, err)
}
// If resume-all is requested, also resume v2 server-side shared charts by scaling them up
if p.manager.Annotations[api.AppResumeAllKey] == "true" {
var appCfg *appcfg.ApplicationConfig
if err := json.Unmarshal([]byte(p.manager.Spec.Config), &appCfg); err != nil {
klog.Errorf("unmarshal to appConfig failed %v", err)
return err
// Check if resume-all is requested for V2 apps to also resume server-side shared charts
resumeServer := p.manager.Annotations[api.AppResumeAllKey] == "true"
if resumeServer {
err := resumeV2AppAll(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("resume v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("resume v2 app %s failed %w", p.manager.Spec.AppName, err)
}
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
for _, chart := range appCfg.SubCharts {
if !chart.Shared {
continue
}
ns := chart.Namespace(appCfg.OwnerName)
// create a shallow copy with target namespace/name for scaling logic
amCopy := p.manager.DeepCopy()
amCopy.Spec.AppNamespace = ns
amCopy.Spec.AppName = chart.Name
klog.Infof("resume-amCopy.Spec.AppNamespace: %s", ns)
klog.Infof("resume-amCopy.Spec.AppName: %s", chart.Name)
if err := suspendOrResumeApp(ctx, p.client, amCopy, int32(1)); err != nil {
klog.Errorf("failed to resume shared chart %s in namespace %s: %v", chart.Name, ns, err)
return err
}
}
} else {
err := resumeV1AppOrV2AppClient(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("resume v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("resume v2 app %s failed %w", p.manager.Spec.AppName, err)
}
}
if p.manager.Spec.Type == "middleware" && userspace.IsKbMiddlewares(p.manager.Spec.AppName) {
err = p.execMiddleware(ctx)
err := p.execMiddleware(ctx)
if err != nil {
klog.Errorf("failed to resume middleware %s,err=%v", p.manager.Spec.AppName, err)
return err

View File

@@ -2,10 +2,13 @@ package appstate
import (
"context"
"fmt"
"time"
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
kbopv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
"k8s.io/klog/v2"
@@ -44,20 +47,30 @@ func (p *SuspendFailedApp) Exec(ctx context.Context) (StatefulInProgressApp, err
}
func (p *SuspendFailedApp) StateReconcile(ctx context.Context) error {
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(0))
if err != nil {
klog.Errorf("stop-failed-app %s state reconcile failed %v", p.manager.Spec.AppName, err)
return err
stopServer := p.manager.Annotations[api.AppStopAllKey] == "true"
if stopServer {
err := suspendV2AppAll(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("suspend v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("suspend v2 app %s failed %w", p.manager.Spec.AppName, err)
}
} else {
err := suspendV1AppOrV2Client(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("suspend app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
}
}
if p.manager.Spec.Type == "middleware" {
if p.manager.Spec.Type == "middleware" && userspace.IsKbMiddlewares(p.manager.Spec.AppName) {
op := kubeblocks.NewOperation(ctx, kbopv1alpha1.StopType, p.manager, p.client)
err = op.Stop()
err := op.Stop()
if err != nil {
klog.Errorf("stop-failed-middleware %s state reconcile failed %v", p.manager.Spec.AppName, err)
return err
}
}
return err
return nil
}
func (p *SuspendFailedApp) Cancel(ctx context.Context) error {

View File

@@ -2,17 +2,17 @@ package appstate
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
kbopv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -78,35 +78,68 @@ func (p *SuspendingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
}
func (p *SuspendingApp) exec(ctx context.Context) error {
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(0))
if err != nil {
klog.Errorf("suspend %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
}
// If stop-all is requested, also stop v2 server-side shared charts by scaling them down
if p.manager.Annotations[api.AppStopAllKey] == "true" {
var appCfg *appcfg.ApplicationConfig
if err := json.Unmarshal([]byte(p.manager.Spec.Config), &appCfg); err != nil {
klog.Errorf("unmarshal to appConfig failed %v", err)
return err
// Check if stop-all is requested for V2 apps to also stop server-side shared charts
stopServer := p.manager.Annotations[api.AppStopAllKey] == "true"
if stopServer {
err := suspendV2AppAll(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("suspend v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("suspend v2 app %s failed %w", p.manager.Spec.AppName, err)
}
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
for _, chart := range appCfg.SubCharts {
if !chart.Shared {
} else {
err := suspendV1AppOrV2Client(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("suspend app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
}
}
if stopServer {
// For V2 cluster-scoped apps, when server is down, stop all other users' clients
// because they share the same server and cannot function without it
klog.Infof("stopping other users' clients for v2 app %s", p.manager.Spec.AppName)
var appManagerList appsv1.ApplicationManagerList
if err := p.client.List(ctx, &appManagerList); err != nil {
klog.Errorf("failed to list application managers: %v", err)
} else {
// find all ApplicationManagers with same AppName but different AppOwner
for _, am := range appManagerList.Items {
// Skip if same owner (already handled) or different app
if am.Spec.AppName != p.manager.Spec.AppName || am.Spec.AppOwner == p.manager.Spec.AppOwner {
continue
}
ns := chart.Namespace(appCfg.OwnerName)
// create a shallow copy with target namespace/name for scaling logic
amCopy := p.manager.DeepCopy()
amCopy.Spec.AppNamespace = ns
amCopy.Spec.AppName = chart.Name
klog.Infof("amCopy.Spec.AppNamespace: %s", ns)
klog.Infof("amCopy.Spec.AppName: %s", chart.Name)
if err := suspendOrResumeApp(ctx, p.client, amCopy, int32(0)); err != nil {
klog.Errorf("failed to stop shared chart %s in namespace %s: %v", chart.Name, ns, err)
if am.Spec.Type != appsv1.App && am.Spec.Type != appsv1.Middleware {
continue
}
if am.Status.State == appsv1.Stopped || am.Status.State == appsv1.Stopping {
klog.Infof("app %s owner %s already in stopped/stopping state, skip", am.Spec.AppName, am.Spec.AppOwner)
continue
}
if !IsOperationAllowed(am.Status.State, appsv1.StopOp) {
klog.Infof("app %s owner %s not allowed do stop operation, skip", am.Spec.AppName, am.Spec.AppOwner)
continue
}
opID := strconv.FormatInt(time.Now().Unix(), 10)
now := metav1.Now()
status := appsv1.ApplicationManagerStatus{
OpType: appsv1.StopOp,
OpID: opID,
State: appsv1.Stopping,
StatusTime: &now,
UpdateTime: &now,
Reason: p.manager.Status.Reason,
Message: p.manager.Status.Message,
}
if _, err := apputils.UpdateAppMgrStatus(am.Name, status); err != nil {
return err
}
klog.Infof("stopping client for user %s, app %s", am.Spec.AppOwner, am.Spec.AppName)
}
}
}

View File

@@ -2,10 +2,13 @@ package appstate
import (
"context"
"encoding/json"
"fmt"
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -21,25 +24,25 @@ import (
const suspendAnnotation = "bytetrade.io/suspend-by"
const suspendCauseAnnotation = "bytetrade.io/suspend-cause"
func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager, replicas int32) error {
suspend := func(list client.ObjectList) error {
namespace := am.Spec.AppNamespace
err := cli.List(ctx, list, client.InNamespace(namespace))
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get workload namespace=%s err=%v", namespace, err)
// suspendOrResumeApp suspends or resumes an application.
func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager, replicas int32, stopOrResumeServer bool) error {
suspendOrResume := func(list client.ObjectList, targetNamespace, targetAppName string) error {
err := cli.List(ctx, list, client.InNamespace(targetNamespace))
if err != nil {
klog.Errorf("Failed to get workload namespace=%s err=%v", targetNamespace, err)
return err
}
listObjects, err := apimeta.ExtractList(list)
if err != nil {
klog.Errorf("Failed to extract list namespace=%s err=%v", namespace, err)
klog.Errorf("Failed to extract list namespace=%s err=%v", targetNamespace, err)
return err
}
check := func(appName, deployName string) bool {
if namespace == fmt.Sprintf("user-space-%s", am.Spec.AppOwner) ||
namespace == fmt.Sprintf("user-system-%s", am.Spec.AppOwner) ||
namespace == "os-platform" ||
namespace == "os-framework" {
if targetNamespace == fmt.Sprintf("user-space-%s", am.Spec.AppOwner) ||
targetNamespace == fmt.Sprintf("user-system-%s", am.Spec.AppOwner) ||
targetNamespace == "os-platform" ||
targetNamespace == "os-framework" {
if appName == deployName {
return true
}
@@ -54,7 +57,7 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
workloadName := ""
switch workload := w.(type) {
case *appsv1.Deployment:
if check(am.Spec.AppName, workload.Name) {
if check(targetAppName, workload.Name) {
if workload.Annotations == nil {
workload.Annotations = make(map[string]string)
}
@@ -64,7 +67,7 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
workloadName = workload.Namespace + "/" + workload.Name
}
case *appsv1.StatefulSet:
if check(am.Spec.AppName, workload.Name) {
if check(targetAppName, workload.Name) {
if workload.Annotations == nil {
workload.Annotations = make(map[string]string)
}
@@ -92,15 +95,79 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
} // end of suspend func
var deploymentList appsv1.DeploymentList
err := suspend(&deploymentList)
err := suspendOrResume(&deploymentList, am.Spec.AppNamespace, am.Spec.AppName)
if err != nil {
return err
}
var stsList appsv1.StatefulSetList
err = suspend(&stsList)
err = suspendOrResume(&stsList, am.Spec.AppNamespace, am.Spec.AppName)
if err != nil {
return err
}
return err
// If stopOrResumeServer is true, also suspend/resume shared server charts for V2 apps
if stopOrResumeServer {
var appCfg *appcfg.ApplicationConfig
if err := json.Unmarshal([]byte(am.Spec.Config), &appCfg); err != nil {
klog.Warningf("failed to unmarshal app config for stopServer check: %v", err)
return err
}
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
for _, chart := range appCfg.SubCharts {
if !chart.Shared {
continue
}
ns := chart.Namespace(am.Spec.AppOwner)
if replicas == 0 {
klog.Infof("suspending shared chart %s in namespace %s", chart.Name, ns)
} else {
klog.Infof("resuming shared chart %s in namespace %s", chart.Name, ns)
}
var sharedDeploymentList appsv1.DeploymentList
if err := suspendOrResume(&sharedDeploymentList, ns, chart.Name); err != nil {
klog.Errorf("failed to scale deployments in shared chart %s namespace %s: %v", chart.Name, ns, err)
return err
}
var sharedStsList appsv1.StatefulSetList
if err := suspendOrResume(&sharedStsList, ns, chart.Name); err != nil {
klog.Errorf("failed to scale statefulsets in shared chart %s namespace %s: %v", chart.Name, ns, err)
return err
}
}
}
// Reset the stop-all/resume-all annotation after processing
if am.Annotations != nil {
delete(am.Annotations, api.AppStopAllKey)
delete(am.Annotations, api.AppResumeAllKey)
if err := cli.Update(ctx, am); err != nil {
klog.Warningf("failed to reset stop-all/resume-all annotations for app=%s owner=%s: %v", am.Spec.AppName, am.Spec.AppOwner, err)
// Don't return error, operation already succeeded
}
}
}
return nil
}
func suspendV1AppOrV2Client(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
return suspendOrResumeApp(ctx, cli, am, 0, false)
}
func suspendV2AppAll(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
return suspendOrResumeApp(ctx, cli, am, 0, true)
}
func resumeV1AppOrV2AppClient(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
return suspendOrResumeApp(ctx, cli, am, 1, false)
}
func resumeV2AppAll(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
return suspendOrResumeApp(ctx, cli, am, 1, true)
}
func isStartUp(am *appv1alpha1.ApplicationManager, cli client.Client) (bool, error) {

View File

@@ -3,5 +3,6 @@ package errcode
import "errors"
var (
ErrPodPending = errors.New("pod is pending")
ErrServerSidePodPending = errors.New("server side pod is pending")
ErrPodPending = errors.New("pod is pending")
)

View File

@@ -160,6 +160,7 @@ func PublishAppEventToQueue(p utils.EventParams) {
return p.RawAppName
}(),
Title: p.Title,
Icon: p.Icon,
Reason: p.Reason,
Message: p.Message,
SharedEntrances: p.SharedEntrances,

View File

@@ -233,6 +233,7 @@ func (imc *ImageManagerClient) updateProgress(ctx context.Context, am *appv1alph
RawAppName: am.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: apputils.AppTitle(am.Spec.Config),
Icon: apputils.AppIcon(am.Spec.Config),
})
}
klog.Infof("app %s download progress.... %v", am.Spec.AppName, progressStr)

View File

@@ -42,6 +42,9 @@ const (
// TypeMySQL indicates the middleware is mysql
TypeMySQL MiddlewareType = "mysql"
// TypeClickHouse indicates the middleware is ClickHouse
TypeClickHouse MiddlewareType = "clickhouse"
)
func (mr MiddlewareType) String() string {
@@ -323,6 +326,27 @@ func Apply(middleware *Middleware, kubeConfig *rest.Config, appName, appNamespac
}
klog.Infof("values.mysql: %v", vals["mysql"])
}
if middleware.ClickHouse != nil {
username := fmt.Sprintf("%s-%s-%s", middleware.ClickHouse.Username, ownerName, appName)
err := process(kubeConfig, appName, appNamespace, namespace, username, TypeClickHouse, ownerName, middleware)
if err != nil {
return err
}
resp, err := getMiddlewareRequest(TypeClickHouse)
if err != nil {
klog.Errorf("failed to get clickHouse middleware request info %v", err)
return err
}
vals["clickhouse"] = map[string]interface{}{
"host": resp.Host,
"port": resp.Port,
"username": resp.UserName,
"password": resp.Password,
"databases": resp.Databases,
}
klog.Infof("values.clickhouse: %v", vals["clickhouse"])
}
return nil
}
@@ -383,6 +407,8 @@ func getPassword(middleware *Middleware, middlewareType MiddlewareType) (string,
return middleware.MariaDB.Password, nil
case TypeMySQL:
return middleware.MySQL.Password, nil
case TypeClickHouse:
return middleware.ClickHouse.Password, nil
}
return "", fmt.Errorf("unsupported middleware type %v", middlewareType)
}

View File

@@ -287,6 +287,32 @@ spec:
user: {{ .Middleware.Username }}
`
// clickHouseRequest is the Go text/template for rendering a ClickHouse
// MiddlewareRequest custom resource. The password is inlined when supplied,
// otherwise referenced from the per-app generated secret.
// NOTE(review): indentation below reconstructed to conventional manifest
// layout (the diff render stripped leading whitespace) — verify against the
// sibling request templates in this file.
const clickHouseRequest = `apiVersion: apr.bytetrade.io/v1alpha1
kind: MiddlewareRequest
metadata:
  name: {{ .AppName }}-clickhouse
  namespace: {{ .Namespace }}
spec:
  app: {{ .AppName }}
  appNamespace: {{ .AppNamespace }}
  middleware: clickhouse
  clickhouse:
    databases:
    {{- range $k, $v := .Middleware.Databases }}
    - name: {{ $v.Name }}
    {{- end }}
    password:
    {{- if not (eq .Middleware.Password "") }}
      value: {{ .Middleware.Password }}
    {{- else }}
      valueFrom:
        secretKeyRef:
          name: {{ .AppName }}-{{ .Namespace }}-clickhouse-password
          key: "password"
    {{- end }}
    user: {{ .Middleware.Username }}
`
type RequestParams struct {
MiddlewareType MiddlewareType
AppName string
@@ -318,6 +344,8 @@ func GenMiddleRequest(p RequestParams) ([]byte, error) {
return genMariadbRequest(p)
case TypeMySQL:
return genMysqlRequest(p)
case TypeClickHouse:
return genClickHouseRequest(p)
default:
return []byte{}, fmt.Errorf("unsupported middleware type: %s", p.MiddlewareType)
}
@@ -512,3 +540,22 @@ func genElasticsearchRequest(p RequestParams) ([]byte, error) {
}
return renderTemplate(elasticsearchRequest, data)
}
// genClickHouseRequest renders a ClickHouse MiddlewareRequest manifest for the
// given request parameters using the clickHouseRequest template.
func genClickHouseRequest(p RequestParams) ([]byte, error) {
	mw := &ClickHouseConfig{
		Username:  p.Username,
		Password:  p.Password,
		Databases: p.Middleware.ClickHouse.Databases,
	}
	data := struct {
		AppName      string
		AppNamespace string
		Namespace    string
		Middleware   *ClickHouseConfig
	}{
		AppName:      p.AppName,
		AppNamespace: p.AppNamespace,
		Namespace:    p.Namespace,
		Middleware:   mw,
	}
	return renderTemplate(clickHouseRequest, data)
}

View File

@@ -12,6 +12,7 @@ type Middleware struct {
MariaDB *MariaDBConfig `yaml:"mariadb,omitempty"`
MySQL *MySQLConfig `yaml:"mysql,omitempty"`
Argo *ArgoConfig `yaml:"argo,omitempty"`
ClickHouse *ClickHouseConfig `yaml:"clickHouse,omitempty"`
}
// Database specify database name and if distributed.
@@ -92,6 +93,13 @@ type MySQLConfig struct {
Databases []Database `yaml:"databases" json:"databases"`
}
// ClickHouseConfig contains fields for clickhouse config.
type ClickHouseConfig struct {
Username string `yaml:"username" json:"username"`
Password string `yaml:"password,omitempty" json:"password"`
Databases []Database `yaml:"databases" json:"databases"`
}
type NatsConfig struct {
Username string `yaml:"username" json:"username"`
Password string `yaml:"password,omitempty" json:"password,omitempty"`

View File

@@ -1080,13 +1080,28 @@ func IsClonedApp(appName, rawAppName string) bool {
}
func AppTitle(config string) string {
var cfg appcfg.ApplicationConfig
err := json.Unmarshal([]byte(config), &cfg)
if err != nil {
cfg := unmarshalApplicationConfig(config)
if cfg == nil {
return ""
}
return cfg.Title
}
// AppIcon returns the icon field of the serialized ApplicationConfig, or the
// empty string when the config cannot be parsed.
func AppIcon(config string) string {
	if cfg := unmarshalApplicationConfig(config); cfg != nil {
		return cfg.Icon
	}
	return ""
}
// unmarshalApplicationConfig decodes a JSON-serialized ApplicationConfig and
// returns nil when the payload is not valid JSON for that type.
func unmarshalApplicationConfig(config string) *appcfg.ApplicationConfig {
	cfg := new(appcfg.ApplicationConfig)
	if err := json.Unmarshal([]byte(config), cfg); err != nil {
		return nil
	}
	return cfg
}
func GetRawAppName(AppName, rawAppName string) string {
if rawAppName == "" {

View File

@@ -24,6 +24,7 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/utils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -260,42 +261,10 @@ func CheckAppRequirement(token string, appConfig *appcfg.ApplicationConfig, op v
}
}
allocatedResources, err := getRequestResources()
if err != nil {
return "", "", err
}
if len(allocatedResources) == 1 {
sufficientCPU, sufficientMemory := false, false
if appConfig.Requirement.CPU == nil {
sufficientCPU = true
}
if appConfig.Requirement.Memory == nil {
sufficientMemory = true
}
for _, v := range allocatedResources {
if appConfig.Requirement.CPU != nil {
if v.cpu.allocatable.Cmp(*appConfig.Requirement.CPU) > 0 {
sufficientCPU = true
}
}
if appConfig.Requirement.Memory != nil {
if v.memory.allocatable.Cmp(*appConfig.Requirement.Memory) > 0 {
sufficientMemory = true
}
}
}
if !sufficientCPU {
return constants.CPU, constants.K8sRequestCPUPressure, fmt.Errorf(constants.K8sRequestCPUPressureMessage, op)
}
if !sufficientMemory {
return constants.Memory, constants.K8sRequestMemoryPressure, fmt.Errorf(constants.K8sRequestMemoryPressureMessage, op)
}
}
return "", "", nil
return CheckAppK8sRequestResource(appConfig, op)
}
func getRequestResources() (map[string]resources, error) {
func GetRequestResources() (map[string]resources, error) {
config, err := ctrl.GetConfig()
if err != nil {
return nil, err
@@ -960,3 +929,83 @@ func CheckCloneEntrances(ctrlClient client.Client, appConfig *appcfg.Application
return nil, nil
}
// GetClusterAvailableResource sums the allocatable CPU and memory of every
// ready, schedulable node and subtracts the container resource requests of all
// non-terminated pods, yielding an estimate of the cluster's free capacity.
// It returns an error when no suitable node exists or any API call fails.
//
// NOTE(review): init-container requests are not subtracted — confirm whether
// that is intentional.
func GetClusterAvailableResource() (*resources, error) {
	config, err := ctrl.GetConfig()
	if err != nil {
		return nil, err
	}
	// Renamed from `client` to avoid shadowing the controller-runtime client
	// package imported by this file.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	// BUG FIX: the previous code pointed both cpu and memory at the SAME
	// resource.Quantity (&initAllocatable), so every Add/Sub mutated one
	// shared accumulator and CPU and memory totals were mixed together.
	// Each dimension now gets its own Quantity.
	cpuAllocatable := resource.MustParse("0")
	memAllocatable := resource.MustParse("0")
	availableResources := resources{
		cpu:    usage{allocatable: &cpuAllocatable},
		memory: usage{allocatable: &memAllocatable},
	}
	nodeList := make([]corev1.Node, 0, len(nodes.Items))
	for i := range nodes.Items {
		node := &nodes.Items[i]
		// Only ready, schedulable nodes contribute capacity.
		if !utils.IsNodeReady(node) || node.Spec.Unschedulable {
			continue
		}
		nodeList = append(nodeList, *node)
	}
	if len(nodeList) == 0 {
		return nil, errors.New("cluster has no suitable node to schedule")
	}
	for _, node := range nodeList {
		availableResources.cpu.allocatable.Add(*node.Status.Allocatable.Cpu())
		availableResources.memory.allocatable.Add(*node.Status.Allocatable.Memory())
		// Exclude terminated pods: they no longer hold their requests.
		fieldSelector := fmt.Sprintf("spec.nodeName=%s,status.phase!=Failed,status.phase!=Succeeded", node.Name)
		pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
			FieldSelector: fieldSelector,
		})
		if err != nil {
			return nil, err
		}
		for _, pod := range pods.Items {
			for _, container := range pod.Spec.Containers {
				availableResources.cpu.allocatable.Sub(*container.Resources.Requests.Cpu())
				availableResources.memory.allocatable.Sub(*container.Resources.Requests.Memory())
			}
		}
	}
	return &availableResources, nil
}
// CheckAppK8sRequestResource verifies that the cluster has enough free CPU and
// memory to satisfy appConfig.Requirement for operation op. A nil requirement
// field is treated as "no requirement". On insufficiency it returns the
// constrained resource type, the matching pressure condition, and an error
// formatted with op; on success all three are zero values.
func CheckAppK8sRequestResource(appConfig *appcfg.ApplicationConfig, op v1alpha1.OpType) (constants.ResourceType, constants.ResourceConditionType, error) {
	// Validate input before the comparatively expensive cluster query
	// (previously the nil check ran after listing nodes and pods).
	if appConfig == nil {
		return "", "", errors.New("nil appConfig")
	}
	availableResources, err := GetClusterAvailableResource()
	if err != nil {
		return "", "", err
	}
	// A requirement is satisfied when unset, or when available capacity
	// strictly exceeds it (Cmp > 0 preserves the original semantics).
	sufficientCPU := appConfig.Requirement.CPU == nil ||
		availableResources.cpu.allocatable.Cmp(*appConfig.Requirement.CPU) > 0
	sufficientMemory := appConfig.Requirement.Memory == nil ||
		availableResources.memory.allocatable.Cmp(*appConfig.Requirement.Memory) > 0
	if !sufficientCPU {
		return constants.CPU, constants.K8sRequestCPUPressure, fmt.Errorf(constants.K8sRequestCPUPressureMessage, op)
	}
	if !sufficientMemory {
		return constants.Memory, constants.K8sRequestMemoryPressure, fmt.Errorf(constants.K8sRequestMemoryPressureMessage, op)
	}
	return "", "", nil
}

View File

@@ -162,6 +162,9 @@ func GetResourceListFromChart(chartPath string, values map[string]interface{}) (
values["elasticsearch"] = map[string]interface{}{
"indexes": map[string]interface{}{},
}
values["clickhouse"] = map[string]interface{}{
"databases": map[string]interface{}{},
}
values["svcs"] = map[string]interface{}{}
values["nats"] = map[string]interface{}{
"subjects": map[string]interface{}{},

View File

@@ -25,6 +25,7 @@ type Event struct {
User string `json:"user"`
EntranceStatuses []v1alpha1.EntranceStatus `json:"entranceStatuses,omitempty"`
Title string `json:"title,omitempty"`
Icon string `json:"icon,omitempty"`
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`
SharedEntrances []v1alpha1.Entrance `json:"sharedEntrances,omitempty"`
@@ -45,6 +46,7 @@ type EventParams struct {
Reason string
Message string
SharedEntrances []v1alpha1.Entrance
Icon string
}
func PublishEvent(nc *nats.Conn, subject string, data interface{}) error {