Compare commits

...

8 Commits

Author SHA1 Message Date
eball
82a5cbe08b feat: add support for selecting GPU types in application installation (#2458)
* fix: failed release upgrade

* fix: helm upgrade do not use atomic param and allow upgrade failed release

* feat: add clickhouse support

* appservice image tag to 0.4.76

* feat: add icon field to nats event

* chores: get all node gpu types

* feat: add support for selecting GPU types in application installation

* feat: enhance GPU type selection logic in application installation

* feat: replace hardcoded GPU type with constant for supported GPU selection

* feat: update app config methods to include selected GPU type and enhance validation for NVIDIA GPUs

* feat: update supported GPU handling to include default options and improve validation logic

* feat: update GPU resource handling to unset previous limits before setting new ones

* feat: refactor permission parsing to use exported function and update related calls

---------

Co-authored-by: hys <hysyeah@gmail.com>
2026-02-03 11:19:24 +08:00
hys
a88cedb0ce set appservice image tag to 0.4.77 2026-01-28 20:27:10 +08:00
hys
06d0d36042 fix: add spec ports 2026-01-28 20:27:10 +08:00
hys
67deaf16ea fix: check k8s request before into installing state 2026-01-28 20:27:10 +08:00
hys
b27854b863 fix: v2 app stop 2026-01-28 20:27:10 +08:00
hys
4fd22c4e20 feat: add icon field to nats event 2026-01-28 20:27:10 +08:00
hys
031d8164ff fix: helm upgrade do not use atomic param and allow upgrade failed release 2026-01-28 20:27:10 +08:00
hys
0c6def8f43 fix: failed release upgrade 2026-01-28 20:27:10 +08:00
39 changed files with 782 additions and 516 deletions

View File

@@ -170,7 +170,7 @@ spec:
priorityClassName: "system-cluster-critical"
containers:
- name: app-service
image: beclab/app-service:0.4.76
image: beclab/app-service:0.4.77
imagePullPolicy: IfNotPresent
ports:
- containerPort: 6755
@@ -196,7 +196,7 @@ spec:
- name: SYS_APPS
value: "market,auth,citus,desktop,did,docs,files,fsnotify,headscale,infisical,intentprovider,ksserver,message,mongo,monitoring,notifications,profile,redis,recommend,seafile,search,search-admin,settings,systemserver,tapr,vault,video,zinc,accounts,control-hub,dashboard,nitro,olares-app"
- name: KB_MIDDLEWARES
value: "mongodb,minio,mysql,mariadb,elasticsearch,rabbitmq"
value: "mongodb,minio,mysql,mariadb,elasticsearch,rabbitmq,clickhouse"
- name: GENERATED_APPS
value: "citus,mongo-cluster-cfg,mongo-cluster-mongos,mongo-cluster-rs0,frp-agent,l4-bfl-proxy,drc-redis-cluster,appdata-backend,argoworkflows,argoworkflow-workflow-controller,velero,kvrocks"
- name: WS_CONTAINER_IMAGE

View File

@@ -193,6 +193,7 @@ func (r *ApplicationManagerController) publishStateChangeEvent(am *appv1alpha1.A
RawAppName: am.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: apputils.AppTitle(am.Spec.Config),
Icon: apputils.AppIcon(am.Spec.Config),
Reason: am.Status.Reason,
Message: am.Status.Message,
})

View File

@@ -252,6 +252,7 @@ func (r *EntranceStatusManagerController) updateEntranceStatus(ctx context.Conte
RawAppName: appCopy.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: app.AppTitle(am.Spec.Config),
Icon: app.AppIcon(am.Spec.Config),
SharedEntrances: appCopy.Spec.SharedEntrances,
})
}

View File

@@ -4,9 +4,11 @@ import (
"context"
"os"
"strconv"
"strings"
"time"
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
@@ -120,7 +122,7 @@ func (r *PodAbnormalSuspendAppController) Reconcile(ctx context.Context, req ctr
if pod.Status.Reason == "Evicted" {
klog.Infof("pod evicted name=%s namespace=%s, attempting to suspend app=%s owner=%s", pod.Name, pod.Namespace, appName, owner)
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppStopDueToEvicted, "evicted pod: "+pod.Namespace+"/"+pod.Name)
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppStopDueToEvicted, "evicted pod: "+pod.Namespace+"/"+pod.Name, pod.Namespace)
if err != nil {
klog.Errorf("suspend attempt failed for app=%s owner=%s: %v", appName, owner, err)
return ctrl.Result{}, err
@@ -147,7 +149,7 @@ func (r *PodAbnormalSuspendAppController) Reconcile(ctx context.Context, req ctr
}
klog.Infof("attempting to suspend app=%s owner=%s due to pending unschedulable timeout", appName, owner)
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppUnschedulable, "pending unschedulable timeout on pod: "+pod.Namespace+"/"+pod.Name)
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppUnschedulable, "pending unschedulable timeout on pod: "+pod.Namespace+"/"+pod.Name, pod.Namespace)
if err != nil {
klog.Errorf("suspend attempt failed for app=%s owner=%s: %v", appName, owner, err)
return ctrl.Result{}, err
@@ -191,7 +193,7 @@ func pendingUnschedulableSince(pod *corev1.Pod) (time.Time, bool) {
// trySuspendApp attempts to suspend the app and returns (true, nil) if a suspend request was issued.
// If the app is not suspendable yet, returns (false, nil) to trigger a short requeue.
func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, owner, appName, reason, message string) (bool, error) {
func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, owner, appName, reason, message, podNamespace string) (bool, error) {
name, err := apputils.FmtAppMgrName(appName, owner, "")
if err != nil {
klog.Errorf("failed to format app manager name app=%s owner=%s: %v", appName, owner, err)
@@ -215,6 +217,11 @@ func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, own
return false, nil
}
isServerPod := strings.HasSuffix(podNamespace, "-shared")
if isServerPod {
am.Annotations[api.AppStopAllKey] = "true"
}
am.Spec.OpType = appv1alpha1.StopOp
if err := r.Update(ctx, &am); err != nil {
klog.Errorf("failed to update applicationmanager spec to StopOp name=%s app=%s owner=%s: %v", am.Name, appName, owner, err)

View File

@@ -126,15 +126,16 @@ type UpgradeRequest struct {
// InstallRequest represents a request to install an application.
type InstallRequest struct {
Dev bool `json:"devMode"`
RepoURL string `json:"repoUrl"`
CfgURL string `json:"cfgUrl"`
Source AppSource `json:"source"`
Images []Image `json:"images"`
Envs []sysv1alpha1.AppEnvVar `json:"envs"`
RawAppName string `json:"rawAppName"`
Title string `json:"title"`
Entrances []EntranceClone `json:"entrances"`
Dev bool `json:"devMode"`
RepoURL string `json:"repoUrl"`
CfgURL string `json:"cfgUrl"`
Source AppSource `json:"source"`
Images []Image `json:"images"`
Envs []sysv1alpha1.AppEnvVar `json:"envs"`
RawAppName string `json:"rawAppName"`
Title string `json:"title"`
Entrances []EntranceClone `json:"entrances"`
SelectedGpuType string `json:"selectedGpuType"`
}
type Image struct {

View File

@@ -3,11 +3,14 @@ package apiserver
import (
"context"
"encoding/json"
"fmt"
"os"
"sort"
"strconv"
"strings"
"golang.org/x/exp/maps"
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
@@ -426,6 +429,11 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
api.HandleError(resp, req, err)
return
}
for i := range appconfig.Entrances {
if appconfig.Entrances[i].AuthLevel == "" {
appconfig.Entrances[i].AuthLevel = "private"
}
}
now := metav1.Now()
name, _ := apputils.FmtAppMgrName(am.Spec.AppName, owner, appconfig.Namespace)
app := &v1alpha1.Application{
@@ -443,6 +451,7 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
Owner: owner,
Entrances: appconfig.Entrances,
SharedEntrances: appconfig.SharedEntrances,
Ports: appconfig.Ports,
Icon: appconfig.Icon,
Settings: map[string]string{
"title": am.Annotations[constants.ApplicationTitleLabel],
@@ -477,6 +486,8 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
}
if v, ok := appsMap[a.Name]; ok {
v.Spec.Settings = a.Spec.Settings
v.Spec.Entrances = a.Spec.Entrances
v.Spec.Ports = a.Spec.Ports
}
}
}
@@ -738,6 +749,11 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
api.HandleError(resp, req, err)
return
}
for i := range appconfig.Entrances {
if appconfig.Entrances[i].AuthLevel == "" {
appconfig.Entrances[i].AuthLevel = "private"
}
}
now := metav1.Now()
app := v1alpha1.Application{
@@ -754,6 +770,7 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
Namespace: am.Spec.AppNamespace,
Owner: am.Spec.AppOwner,
Entrances: appconfig.Entrances,
Ports: appconfig.Ports,
SharedEntrances: appconfig.SharedEntrances,
Icon: appconfig.Icon,
Settings: map[string]string{
@@ -788,6 +805,8 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
}
if v, ok := appsMap[a.Name]; ok {
v.Spec.Settings = a.Spec.Settings
v.Spec.Entrances = a.Spec.Entrances
v.Spec.Ports = a.Spec.Ports
}
}
@@ -930,12 +949,37 @@ func (h *Handler) oamValues(req *restful.Request, resp *restful.Response) {
api.HandleError(resp, req, err)
return
}
gpuType, err := utils.FindGpuTypeFromNodes(&nodes)
gpuTypes, err := utils.GetAllGpuTypesFromNodes(&nodes)
if err != nil {
klog.Errorf("get gpu type failed %v", gpuType)
klog.Errorf("get gpu type failed %v", err)
api.HandleError(resp, req, err)
return
}
gpuType := "none"
selectedGpuType := req.QueryParameter("gputype")
if len(gpuTypes) > 0 {
if selectedGpuType != "" {
if _, ok := gpuTypes[selectedGpuType]; ok {
gpuType = selectedGpuType
} else {
err := fmt.Errorf("selected gpu type %s not found in cluster", selectedGpuType)
klog.Error(err)
api.HandleError(resp, req, err)
return
}
} else {
if len(gpuTypes) == 1 {
gpuType = maps.Keys(gpuTypes)[0]
} else {
err := fmt.Errorf("multiple gpu types found in cluster, please specify one")
klog.Error(err)
api.HandleError(resp, req, err)
return
}
}
}
values["GPU"] = map[string]interface{}{
"Type": gpuType,
"Cuda": os.Getenv("OLARES_SYSTEM_CUDA_VERSION"),

View File

@@ -1,174 +1,33 @@
package apiserver
import (
"fmt"
"sync"
"time"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/client/clientset"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
"golang.org/x/exp/maps"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/emicklei/go-restful/v3"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
)
var running bool = false
var switchLock sync.Mutex
func (h *Handler) disableGpuManagedMemory(req *restful.Request, resp *restful.Response) {
if err := h.nvshareSwitch(req, false); err != nil {
api.HandleError(resp, req, &errors.StatusError{
ErrStatus: metav1.Status{Code: 400, Message: "operation failed, " + err.Error()},
})
func (h *Handler) getGpuTypes(req *restful.Request, resp *restful.Response) {
var nodes corev1.NodeList
err := h.ctrlClient.List(req.Request.Context(), &nodes, &client.ListOptions{})
if err != nil {
klog.Errorf("list node failed %v", err)
api.HandleError(resp, req, err)
return
}
resp.WriteAsJson(map[string]int{"code": 0})
}
func (h *Handler) enableGpuManagedMemory(req *restful.Request, resp *restful.Response) {
if err := h.nvshareSwitch(req, true); err != nil {
api.HandleError(resp, req, &errors.StatusError{
ErrStatus: metav1.Status{Code: 400, Message: "operation failed, " + err.Error()},
})
gpuTypes, err := utils.GetAllGpuTypesFromNodes(&nodes)
if err != nil {
klog.Errorf("get gpu type failed %v", err)
api.HandleError(resp, req, err)
return
}
resp.WriteAsJson(map[string]int{"code": 0})
}
func (h *Handler) nvshareSwitch(req *restful.Request, enable bool) error {
client := req.Attribute(constants.KubeSphereClientAttribute).(*clientset.ClientSet)
switchLock.Lock()
defer switchLock.Unlock()
if running {
return fmt.Errorf("last operation is still running")
}
deployments, err := client.KubeClient.Kubernetes().AppsV1().Deployments("").List(req.Request.Context(), metav1.ListOptions{})
if err != nil {
klog.Error("list deployment error, ", err)
return err
}
envValue := "0"
if enable {
envValue = "1"
}
for _, d := range deployments.Items {
shouldUpdate := false
for i, c := range d.Spec.Template.Spec.Containers {
found := false
for k := range c.Resources.Limits {
if k == constants.NvshareGPU {
found = true
break
}
}
if found {
// a gpu request container
addEnv := true
for n, env := range d.Spec.Template.Spec.Containers[i].Env {
if env.Name == constants.EnvNvshareManagedMemory {
addEnv = false
d.Spec.Template.Spec.Containers[i].Env[n].Value = envValue
break
}
}
if addEnv {
d.Spec.Template.Spec.Containers[i].Env =
append(d.Spec.Template.Spec.Containers[i].Env,
corev1.EnvVar{Name: constants.EnvNvshareManagedMemory, Value: envValue})
}
shouldUpdate = true
} // end found
} // end of container loop
if shouldUpdate {
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
deployment, err := client.KubeClient.Kubernetes().AppsV1().Deployments(d.Namespace).
Get(req.Request.Context(), d.Name, metav1.GetOptions{})
if err != nil {
return err
}
deployment.Spec.Template.Spec.Containers = d.Spec.Template.Spec.Containers
_, err = client.KubeClient.Kubernetes().AppsV1().Deployments(d.Namespace).
Update(req.Request.Context(), deployment, metav1.UpdateOptions{})
return err
})
if err != nil {
klog.Error("update deployment error, ", err, ", ", d.Name, ", ", d.Namespace)
return err
}
} // should update
} // end of deployment loop
// update terminus
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
terminus, err := utils.GetTerminus(req.Request.Context(), h.ctrlClient)
if err != nil {
return err
}
terminus.Spec.Settings[constants.EnvNvshareManagedMemory] = envValue
return h.ctrlClient.Update(req.Request.Context(), terminus)
})
if err != nil {
klog.Error("update terminus error, ", err)
return err
}
running = true
// delay 30s, assume the all pods will be reload in 30s.
delay := time.NewTimer(30 * time.Second)
go func() {
<-delay.C
switchLock.Lock()
defer switchLock.Unlock()
running = false
}()
return nil
}
func (h *Handler) getManagedMemoryValue(req *restful.Request, resp *restful.Response) {
terminus, err := utils.GetTerminus(req.Request.Context(), h.ctrlClient)
if err != nil {
klog.Error("get terminus value error, ", err)
api.HandleError(resp, req, &errors.StatusError{
ErrStatus: metav1.Status{Code: 400, Message: "get value error, " + err.Error()},
})
return
}
managed := true
if v, ok := terminus.Spec.Settings[constants.EnvNvshareManagedMemory]; ok && v == "0" {
managed = false
}
resp.WriteAsJson(&map[string]interface{}{
"managed_memory": managed,
"gpu_types": maps.Keys(gpuTypes),
},
)
}

View File

@@ -21,9 +21,12 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/beclab/Olares/framework/app-service/pkg/utils/config"
"golang.org/x/exp/maps"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/emicklei/go-restful/v3"
"helm.sh/helm/v3/pkg/time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -37,7 +40,7 @@ type depRequest struct {
type installHelperIntf interface {
getAdminUsers() (admin []string, isAdmin bool, err error)
getInstalledApps() (installed bool, app []*v1alpha1.Application, err error)
getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion string) (err error)
getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion, selectedGpuType string) (err error)
setAppConfig(req *api.InstallRequest, appName string)
validate(bool, []*v1alpha1.Application) error
setAppEnv(overrides []sysv1alpha1.AppEnvVar) error
@@ -105,6 +108,36 @@ func (h *Handler) install(req *restful.Request, resp *restful.Response) {
}
}
// check selected gpu type can be supported
// if selectedGpuType != "" , then check if the gpu type exists in cluster
// if selectedGpuType == "" , and only one gpu type exists in cluster, then use it
var nodes corev1.NodeList
err = h.ctrlClient.List(req.Request.Context(), &nodes, &client.ListOptions{})
if err != nil {
klog.Errorf("list node failed %v", err)
api.HandleError(resp, req, err)
return
}
gpuTypes, err := utils.GetAllGpuTypesFromNodes(&nodes)
if err != nil {
klog.Errorf("get gpu type failed %v", err)
api.HandleError(resp, req, err)
return
}
if insReq.SelectedGpuType != "" {
if _, ok := gpuTypes[insReq.SelectedGpuType]; !ok {
klog.Errorf("selected gpu type %s not found in cluster", insReq.SelectedGpuType)
api.HandleBadRequest(resp, req, fmt.Errorf("selected gpu type %s not found in cluster", insReq.SelectedGpuType))
return
}
} else {
if len(gpuTypes) == 1 {
insReq.SelectedGpuType = maps.Keys(gpuTypes)[0]
klog.Infof("only one gpu type %s found in cluster, use it as selected gpu type", insReq.SelectedGpuType)
}
}
apiVersion, appCfg, err := apputils.GetApiVersionFromAppConfig(req.Request.Context(), &apputils.ConfigOptions{
App: app,
RawAppName: rawAppName,
@@ -112,6 +145,7 @@ func (h *Handler) install(req *restful.Request, resp *restful.Response) {
RepoURL: insReq.RepoURL,
MarketSource: marketSource,
Version: chartVersion,
SelectedGpu: insReq.SelectedGpuType,
})
klog.Infof("chartVersion: %s", chartVersion)
if err != nil {
@@ -188,7 +222,7 @@ func (h *Handler) install(req *restful.Request, resp *restful.Response) {
return
}
err = helper.getAppConfig(adminUsers, marketSource, isAdmin, appInstalled, installedApps, chartVersion)
err = helper.getAppConfig(adminUsers, marketSource, isAdmin, appInstalled, installedApps, chartVersion, insReq.SelectedGpuType)
if err != nil {
klog.Errorf("Failed to get app config err=%v", err)
return
@@ -423,7 +457,7 @@ func (h *installHandlerHelper) getInstalledApps() (installed bool, app []*v1alph
return
}
func (h *installHandlerHelper) getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion string) (err error) {
func (h *installHandlerHelper) getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion, selectedGpuType string) (err error) {
var (
admin string
installAsAdmin bool
@@ -472,6 +506,7 @@ func (h *installHandlerHelper) getAppConfig(adminUsers []string, marketSource st
Admin: admin,
IsAdmin: installAsAdmin,
MarketSource: marketSource,
SelectedGpu: selectedGpuType,
})
if err != nil {
klog.Errorf("Failed to get appconfig err=%v", err)
@@ -685,7 +720,7 @@ func (h *installHandlerHelperV2) _validateClusterScope(isAdmin bool, installedAp
return nil
}
func (h *installHandlerHelperV2) getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion string) (err error) {
func (h *installHandlerHelperV2) getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion, selectedGpuType string) (err error) {
klog.Info("get app config for install handler v2")
var (
@@ -713,6 +748,7 @@ func (h *installHandlerHelperV2) getAppConfig(adminUsers []string, marketSource
Admin: admin,
MarketSource: marketSource,
IsAdmin: isAdmin,
SelectedGpu: selectedGpuType,
})
if err != nil {
klog.Errorf("Failed to get appconfig err=%v", err)

View File

@@ -187,6 +187,11 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
api.HandleError(resp, req, err)
return
}
for i := range appconfig.Entrances {
if appconfig.Entrances[i].AuthLevel == "" {
appconfig.Entrances[i].AuthLevel = "private"
}
}
appconfig.SharedEntrances, err = appconfig.GenSharedEntranceURL(req.Request.Context())
if err != nil {
@@ -214,6 +219,7 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
Namespace: am.Spec.AppNamespace,
Owner: am.Spec.AppOwner,
Entrances: appconfig.Entrances,
Ports: appconfig.Ports,
SharedEntrances: appconfig.SharedEntrances,
Icon: appconfig.Icon,
Settings: map[string]string{
@@ -274,6 +280,8 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
}
if v, ok := appsMap[a.Name]; ok {
v.Spec.Settings = a.Spec.Settings
v.Spec.Entrances = a.Spec.Entrances
v.Spec.Ports = a.Spec.Ports
}
}
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
"github.com/beclab/Olares/framework/app-service/pkg/appinstaller"
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
"github.com/beclab/Olares/framework/app-service/pkg/client/clientset"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
@@ -520,6 +521,7 @@ type applicationPermission struct {
Permissions []permission `json:"permissions"`
}
// Deprecated
func (h *Handler) applicationPermissionList(req *restful.Request, resp *restful.Response) {
owner := req.Attribute(constants.UserContextAttribute).(string)
//token := req.HeaderParameter(constants.AuthorizationTokenKey)
@@ -572,46 +574,39 @@ func (h *Handler) applicationPermissionList(req *restful.Request, resp *restful.
func (h *Handler) getApplicationPermission(req *restful.Request, resp *restful.Response) {
app := req.PathParameter(ParamAppName)
owner := req.Attribute(constants.UserContextAttribute).(string)
client, err := dynamic.NewForConfig(h.kubeConfig)
name, err := apputils.FmtAppMgrName(app, owner, "")
if err != nil {
api.HandleError(resp, req, err)
return
}
var am v1alpha1.ApplicationManager
err = h.ctrlClient.Get(req.Request.Context(), types.NamespacedName{Name: name}, &am)
if err != nil {
api.HandleError(resp, req, err)
return
}
var appConfig appcfg.ApplicationConfig
err = am.GetAppConfig(&appConfig)
if err != nil {
klog.Errorf("Failed to get app config err=%v", err)
api.HandleError(resp, req, err)
return
}
var ret *applicationPermission
apClient := provider.NewApplicationPermissionRequest(client)
namespace := fmt.Sprintf("user-system-%s", owner)
aps, err := apClient.List(req.Request.Context(), namespace, metav1.ListOptions{})
if err != nil {
api.HandleError(resp, req, err)
return
}
for _, ap := range aps.Items {
if ap.Object == nil {
continue
}
appName, _, _ := unstructured.NestedString(ap.Object, "spec", "app")
if appName == app {
perms, _, _ := unstructured.NestedSlice(ap.Object, "spec", "permissions")
permissions := appinstaller.ParseAppPermission(appConfig.Permission)
for _, ap := range permissions {
if perms, ok := ap.([]appcfg.ProviderPermission); ok {
permissions := make([]permission, 0)
for _, p := range perms {
if perm, ok := p.(map[string]interface{}); ok {
ops := make([]string, 0)
for _, op := range perm["ops"].([]interface{}) {
if opStr, ok := op.(string); ok {
ops = append(ops, opStr)
}
}
permissions = append(permissions, permission{
DataType: perm["dataType"].(string),
Group: perm["group"].(string),
Version: perm["version"].(string),
Ops: ops,
})
}
permissions = append(permissions, permission{
DataType: p.ProviderName,
Group: p.AppName,
})
}
ret = &applicationPermission{
App: appName,
App: am.Spec.AppName,
Owner: owner,
Permissions: permissions,
}
@@ -642,6 +637,7 @@ type opApi struct {
URI string `json:"uri"`
}
// Deprecated
func (h *Handler) getProviderRegistry(req *restful.Request, resp *restful.Response) {
dataTypeReq := req.PathParameter(ParamDataType)
groupReq := req.PathParameter(ParamGroup)
@@ -708,56 +704,44 @@ func (h *Handler) getProviderRegistry(req *restful.Request, resp *restful.Respon
func (h *Handler) getApplicationProviderList(req *restful.Request, resp *restful.Response) {
owner := req.Attribute(constants.UserContextAttribute).(string)
app := req.PathParameter(ParamAppName)
client, err := dynamic.NewForConfig(h.kubeConfig)
name, err := apputils.FmtAppMgrName(app, owner, "")
if err != nil {
api.HandleError(resp, req, err)
return
}
var am v1alpha1.ApplicationManager
err = h.ctrlClient.Get(req.Request.Context(), types.NamespacedName{Name: name}, &am)
if err != nil {
api.HandleError(resp, req, err)
return
}
var appConfig appcfg.ApplicationConfig
err = am.GetAppConfig(&appConfig)
if err != nil {
klog.Errorf("Failed to get app config err=%v", err)
api.HandleError(resp, req, err)
return
}
ret := make([]providerRegistry, 0)
rClient := provider.NewRegistryRequest(client)
namespace := fmt.Sprintf("user-system-%s", owner)
prs, err := rClient.List(req.Request.Context(), namespace, metav1.ListOptions{})
if err != nil {
api.HandleError(resp, req, err)
return
}
for _, ap := range prs.Items {
if ap.Object == nil {
continue
}
deployment, _, _ := unstructured.NestedString(ap.Object, "spec", "deployment")
kind, _, _ := unstructured.NestedString(ap.Object, "spec", "kind")
if app == deployment && kind == "provider" {
dataType, _, _ := unstructured.NestedString(ap.Object, "spec", "dataType")
group, _, _ := unstructured.NestedString(ap.Object, "spec", "group")
description, _, _ := unstructured.NestedString(ap.Object, "spec", "description")
endpoint, _, _ := unstructured.NestedString(ap.Object, "spec", "endpoint")
ns, _, _ := unstructured.NestedString(ap.Object, "spec", "namespace")
version, _, _ := unstructured.NestedString(ap.Object, "spec", "version")
opApis := make([]opApi, 0)
opApiList, _, _ := unstructured.NestedSlice(ap.Object, "spec", "opApis")
for _, op := range opApiList {
if aop, ok := op.(map[string]interface{}); ok {
opApis = append(opApis, opApi{
Name: aop["name"].(string),
URI: aop["uri"].(string),
})
}
}
ret = append(ret, providerRegistry{
DataType: dataType,
Deployment: deployment,
Description: description,
Endpoint: endpoint,
Kind: kind,
Group: group,
Namespace: ns,
OpApis: opApis,
Version: version,
ns := am.Spec.AppNamespace
for _, ap := range appConfig.Provider {
dataType := ap.Name
endpoint := ap.Entrance
opApis := make([]opApi, 0)
for _, op := range ap.Paths {
opApis = append(opApis, opApi{
URI: op,
})
}
ret = append(ret, providerRegistry{
DataType: dataType,
Endpoint: endpoint,
Namespace: ns,
OpApis: opApis,
})
}
resp.WriteAsJson(ret)
}

View File

@@ -37,7 +37,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
@@ -308,36 +307,21 @@ func (h *Handler) gpuLimitMutate(ctx context.Context, req *admissionv1.Admission
return resp
}
GPUType, err := h.findNvidiaGpuFromNodes(ctx)
if err != nil && !errors.Is(err, api.ErrGPUNodeNotFound) {
return h.sidecarWebhook.AdmissionError(req.UID, err)
}
GPUType := appcfg.GetSelectedGpuTypeValue()
// no gpu found, no need to inject env, just return.
if GPUType == "" {
if GPUType == "none" || GPUType == "" {
return resp
}
terminus, err := utils.GetTerminus(ctx, h.ctrlClient)
if err != nil {
return h.sidecarWebhook.AdmissionError(req.UID, err)
}
nvshareManagedMemory := ""
if terminus.Spec.Settings != nil {
nvshareManagedMemory = terminus.Spec.Settings[constants.EnvNvshareManagedMemory]
envs := []webhook.EnvKeyValue{
{
Key: constants.EnvGPUType,
Value: GPUType,
},
}
envs := []webhook.EnvKeyValue{}
if nvshareManagedMemory != "" {
envs = append(envs, webhook.EnvKeyValue{
Key: constants.EnvNvshareManagedMemory,
Value: nvshareManagedMemory,
})
}
envs = append(envs, webhook.EnvKeyValue{Key: "NVSHARE_DEBUG", Value: "1"})
patchBytes, err := webhook.CreatePatchForDeployment(tpl, req.Namespace, gpuRequired, GPUType, envs)
patchBytes, err := webhook.CreatePatchForDeployment(tpl, h.getGPUResourceTypeKey(GPUType), envs)
if err != nil {
klog.Errorf("create patch error %v", err)
return h.sidecarWebhook.AdmissionError(req.UID, err)
@@ -347,33 +331,17 @@ func (h *Handler) gpuLimitMutate(ctx context.Context, req *admissionv1.Admission
return resp
}
func (h *Handler) findNvidiaGpuFromNodes(ctx context.Context) (string, error) {
var nodes corev1.NodeList
err := h.ctrlClient.List(ctx, &nodes, &client.ListOptions{})
if err != nil {
return "", err
func (h *Handler) getGPUResourceTypeKey(gpuType string) string {
switch gpuType {
case utils.NvidiaCardType:
return constants.NvidiaGPU
case utils.GB10ChipType:
return constants.NvidiaGB10GPU
case utils.AmdApuCardType:
return constants.AMDAPU
default:
return ""
}
// return nvshare gpu or virtaitech gpu in priority
gtype := ""
for _, n := range nodes.Items {
if _, ok := n.Status.Capacity[constants.NvidiaGPU]; ok {
if _, ok = n.Status.Capacity[constants.NvshareGPU]; ok {
return constants.NvshareGPU, nil
}
gtype = constants.NvidiaGPU
}
if _, ok := n.Status.Capacity[constants.VirtAiTechVGPU]; ok {
return constants.VirtAiTechVGPU, nil
}
}
if gtype != "" {
return gtype, nil
}
return "", api.ErrGPUNodeNotFound
}
func (h *Handler) providerRegistryValidate(req *restful.Request, resp *restful.Response) {

View File

@@ -340,7 +340,9 @@ func GetClusterResource(kubeConfig *rest.Config, token string) (*prometheus.Clus
arches.Insert(n.Labels["kubernetes.io/arch"])
if quantity, ok := n.Status.Capacity[constants.NvidiaGPU]; ok {
total += quantity.AsApproximateFloat64()
} else if quantity, ok = n.Status.Capacity[constants.VirtAiTechVGPU]; ok {
} else if quantity, ok = n.Status.Capacity[constants.NvidiaGB10GPU]; ok {
total += quantity.AsApproximateFloat64()
} else if quantity, ok = n.Status.Capacity[constants.AMDAPU]; ok {
total += quantity.AsApproximateFloat64()
}
}

View File

@@ -254,21 +254,9 @@ func addServiceToContainer(c *restful.Container, handler *Handler) error {
Param(ws.PathParameter(ParamEntranceName, "the name of a application entrance")).
Returns(http.StatusOK, "Success to set the application entrance policy", nil))
ws.Route(ws.POST("/gpu/disable/managed-memory").
To(handler.disableGpuManagedMemory).
Doc("disable nvshare's managed memory ").
Metadata(restfulspec.KeyOpenAPITags, MODULE_TAGS).
Returns(http.StatusOK, "Success to disable", nil))
ws.Route(ws.POST("/gpu/enable/managed-memory").
To(handler.enableGpuManagedMemory).
Doc("enable nvshare's managed memory ").
Metadata(restfulspec.KeyOpenAPITags, MODULE_TAGS).
Returns(http.StatusOK, "Success to enable", nil))
ws.Route(ws.GET("/gpu/managed-memory").
To(handler.getManagedMemoryValue).
Doc("get nvshare's managed memory enabled or not").
ws.Route(ws.GET("/gpu/types").
To(handler.getGpuTypes).
Doc("get all gpu types in the cluster").
Metadata(restfulspec.KeyOpenAPITags, MODULE_TAGS).
Returns(http.StatusOK, "Success to get ", &ResultResponse{}))

View File

@@ -56,14 +56,19 @@ type AppSpec struct {
Developer string `yaml:"developer" json:"developer"`
RequiredMemory string `yaml:"requiredMemory" json:"requiredMemory"`
RequiredDisk string `yaml:"requiredDisk" json:"requiredDisk"`
SupportClient SupportClient `yaml:"supportClient" json:"supportClient"`
RequiredGPU string `yaml:"requiredGpu" json:"requiredGpu"`
RequiredCPU string `yaml:"requiredCpu" json:"requiredCpu"`
LimitedMemory string `yaml:"limitedMemory" json:"limitedMemory"`
LimitedDisk string `yaml:"limitedDisk" json:"limitedDisk"`
LimitedGPU string `yaml:"limitedGPU" json:"limitedGPU"`
LimitedCPU string `yaml:"limitedCPU" json:"limitedCPU"`
SupportClient SupportClient `yaml:"supportClient" json:"supportClient"`
RunAsUser bool `yaml:"runAsUser" json:"runAsUser"`
RunAsInternal bool `yaml:"runAsInternal" json:"runAsInternal"`
PodGPUConsumePolicy string `yaml:"podGpuConsumePolicy" json:"podGpuConsumePolicy"`
SubCharts []Chart `yaml:"subCharts" json:"subCharts"`
Hardware Hardware `yaml:"hardware" json:"hardware"`
SupportedGpu []any `yaml:"supportedGpu,omitempty" json:"supportedGpu,omitempty"`
}
type Hardware struct {
@@ -188,6 +193,17 @@ type Provider struct {
Verbs []string `yaml:"verbs" json:"verbs"`
}
type SpecialResource struct {
RequiredMemory *string `yaml:"requiredMemory,omitempty" json:"requiredMemory,omitempty"`
RequiredDisk *string `yaml:"requiredDisk,omitempty" json:"requiredDisk,omitempty"`
RequiredGPU *string `yaml:"requiredGpu,omitempty" json:"requiredGpu,omitempty"`
RequiredCPU *string `yaml:"requiredCpu,omitempty" json:"requiredCpu,omitempty"`
LimitedMemory *string `yaml:"limitedMemory,omitempty" json:"limitedMemory,omitempty"`
LimitedDisk *string `yaml:"limitedDisk,omitempty" json:"limitedDisk,omitempty"`
LimitedGPU *string `yaml:"limitedGPU,omitempty" json:"limitedGPU,omitempty"`
LimitedCPU *string `yaml:"limitedCPU,omitempty" json:"limitedCPU,omitempty"`
}
func (c *Chart) Namespace(owner string) string {
if c.Shared {
return fmt.Sprintf("%s-%s", c.Name, "shared")

View File

@@ -100,6 +100,7 @@ type ApplicationConfig struct {
PodsSelectors []metav1.LabelSelector
HardwareRequirement Hardware
SharedEntrances []v1alpha1.Entrance
SelectedGpuType string
}
func (c *ApplicationConfig) IsMiddleware() bool {
@@ -159,6 +160,13 @@ func (c *ApplicationConfig) GenSharedEntranceURL(ctx context.Context) ([]v1alpha
return app.GenSharedEntranceURL(ctx)
}
func (c *ApplicationConfig) GetSelectedGpuTypeValue() string {
if c.SelectedGpuType == "" {
return "none"
}
return c.SelectedGpuType
}
func (p *ProviderPermission) GetNamespace(ownerName string) string {
if p.Namespace != "" {
if p.Namespace == "user-space" || p.Namespace == "user-system" {

View File

@@ -564,7 +564,7 @@ func (h *HelmOps) WaitForStartUp() (bool, error) {
}
return true, nil
}
if errors.Is(err, errcode.ErrPodPending) {
if errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending) {
return false, err
}
@@ -576,11 +576,41 @@ func (h *HelmOps) WaitForStartUp() (bool, error) {
}
func (h *HelmOps) isStartUp() (bool, error) {
pods, err := h.findAppSelectedPods()
if h.app.IsV2() && h.app.IsMultiCharts() {
serverPods, err := h.findServerPods()
if err != nil {
return false, err
}
podNames := make([]string, 0)
for _, p := range serverPods {
podNames = append(podNames, p.Name)
}
klog.Infof("podSErvers: %v", podNames)
serverStarted, err := checkIfStartup(serverPods, true)
if err != nil {
klog.Errorf("v2 app %s server pods not ready: %v", h.app.AppName, err)
return false, err
}
if !serverStarted {
klog.Infof("v2 app %s server pods not started yet, waiting...", h.app.AppName)
return false, nil
}
klog.Infof("v2 app %s server pods started, checking client pods", h.app.AppName)
}
clientPods, err := h.findV1OrClientPods()
if err != nil {
return false, err
}
return checkIfStartup(pods)
clientStarted, err := checkIfStartup(clientPods, false)
if err != nil {
return false, err
}
return clientStarted, nil
}
func (h *HelmOps) findAppSelectedPods() (*corev1.PodList, error) {
@@ -610,15 +640,49 @@ func (h *HelmOps) findAppSelectedPods() (*corev1.PodList, error) {
return pods, nil
}
func checkIfStartup(pods *corev1.PodList) (bool, error) {
if len(pods.Items) == 0 {
func (h *HelmOps) findV1OrClientPods() ([]corev1.Pod, error) {
podList, err := h.client.KubeClient.Kubernetes().CoreV1().Pods(h.app.Namespace).List(h.ctx, metav1.ListOptions{})
if err != nil {
klog.Errorf("app %s get pods err %v", h.app.AppName, err)
return nil, err
}
return podList.Items, nil
}
func (h *HelmOps) findServerPods() ([]corev1.Pod, error) {
pods := make([]corev1.Pod, 0)
for _, c := range h.app.SubCharts {
if !c.Shared {
continue
}
ns := c.Namespace(h.app.OwnerName)
podList, err := h.client.KubeClient.Kubernetes().CoreV1().Pods(ns).List(h.ctx, metav1.ListOptions{})
if err != nil {
klog.Errorf("app %s get pods err %v", h.app.AppName, err)
return nil, err
}
pods = append(pods, podList.Items...)
}
return pods, nil
}
func checkIfStartup(pods []corev1.Pod, isServerSide bool) (bool, error) {
if len(pods) == 0 {
return false, errors.New("no pod found")
}
for _, pod := range pods.Items {
startedPods := 0
totalPods := len(pods)
for _, pod := range pods {
creationTime := pod.GetCreationTimestamp()
pendingDuration := time.Since(creationTime.Time)
if pod.Status.Phase == corev1.PodPending && pendingDuration > time.Minute*10 {
if isServerSide {
return false, errcode.ErrServerSidePodPending
}
return false, errcode.ErrPodPending
}
totalContainers := len(pod.Spec.Containers)
@@ -630,9 +694,12 @@ func checkIfStartup(pods *corev1.PodList) (bool, error) {
}
}
if startedContainers == totalContainers {
return true, nil
startedPods++
}
}
if totalPods == startedPods {
return true, nil
}
return false, nil
}
@@ -685,7 +752,7 @@ func getApplicationPolicy(policies []appcfg.AppPolicy, entrances []appv1alpha1.E
return string(policyStr), nil
}
func parseAppPermission(data []appcfg.AppPermission) []appcfg.AppPermission {
func ParseAppPermission(data []appcfg.AppPermission) []appcfg.AppPermission {
permissions := make([]appcfg.AppPermission, 0)
for _, p := range data {
switch perm := p.(type) {
@@ -796,7 +863,7 @@ func (h *HelmOps) Install() error {
return nil
}
ok, err := h.WaitForStartUp()
if err != nil && errors.Is(err, errcode.ErrPodPending) {
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
return err
}
if !ok {

View File

@@ -78,7 +78,7 @@ func (h *HelmOps) Uninstall_(client kubernetes.Interface, actionConfig *action.C
return err
}
h.app.Permission = parseAppPermission(h.app.Permission)
h.app.Permission = ParseAppPermission(h.app.Permission)
var perm []appcfg.ProviderPermission
for _, p := range h.app.Permission {
if t, ok := p.([]appcfg.ProviderPermission); ok {

View File

@@ -50,7 +50,7 @@ func (h *HelmOps) SetValues() (values map[string]interface{}, err error) {
values["domain"] = entries
userspace := make(map[string]interface{})
h.app.Permission = parseAppPermission(h.app.Permission)
h.app.Permission = ParseAppPermission(h.app.Permission)
for _, p := range h.app.Permission {
switch perm := p.(type) {
case appcfg.AppDataPermission, appcfg.AppCachePermission, appcfg.UserDataPermission:
@@ -170,17 +170,12 @@ func (h *HelmOps) SetValues() (values map[string]interface{}, err error) {
values["cluster"] = map[string]interface{}{
"arch": arch,
}
gpuType, err := utils.FindGpuTypeFromNodes(nodes)
if err != nil {
klog.Errorf("Failed to get gpuType err=%v", err)
return values, err
}
values["GPU"] = map[string]interface{}{
"Type": gpuType,
"Type": h.app.GetSelectedGpuTypeValue(),
"Cuda": os.Getenv("OLARES_SYSTEM_CUDA_VERSION"),
}
values["gpu"] = gpuType
values["gpu"] = h.app.GetSelectedGpuTypeValue()
if h.app.OIDC.Enabled {
err = h.createOIDCClient(values, zone, h.app.Namespace)

View File

@@ -51,7 +51,7 @@ func (h *HelmOpsV2) ApplyEnv() error {
}
ok, err := h.WaitForStartUp()
if err != nil && errors.Is(err, errcode.ErrPodPending) {
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
return err
}

View File

@@ -119,7 +119,7 @@ func (h *HelmOpsV2) Install() error {
return nil
}
ok, err := h.WaitForStartUp()
if err != nil && errors.Is(err, errcode.ErrPodPending) {
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
klog.Errorf("App %s is pending, err=%v", h.App().AppName, err)
return err
}

View File

@@ -91,7 +91,7 @@ func (h *HelmOpsV2) Upgrade() error {
}
ok, err := h.WaitForStartUp()
if err != nil && errors.Is(err, errcode.ErrPodPending) {
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
return err
}

View File

@@ -13,9 +13,9 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/images"
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -47,6 +47,29 @@ func (r *downloadingInProgressApp) WaitAsync(ctx context.Context) {
return
}
// Check Kubernetes request resources before transitioning to Installing
var appConfig *appcfg.ApplicationConfig
if err := json.Unmarshal([]byte(r.manager.Spec.Config), &appConfig); err != nil {
klog.Errorf("failed to unmarshal app config for %s: %v", r.manager.Spec.AppName, err)
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.InstallFailed, nil, fmt.Sprintf("invalid app config: %v", err), "")
if updateErr != nil {
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.InstallFailed.String(), updateErr)
}
return
}
_, conditionType, checkErr := apputils.CheckAppK8sRequestResource(appConfig, r.manager.Spec.OpType)
if checkErr != nil {
klog.Errorf("k8s request resource check failed for app %s: %v", r.manager.Spec.AppName, checkErr)
opRecord := makeRecord(r.manager, appsv1.InstallFailed, checkErr.Error())
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.InstallFailed, opRecord, checkErr.Error(), string(conditionType))
if updateErr != nil {
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.InstallFailed.String(), updateErr)
}
return
}
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.Installing, nil, appsv1.Installing.String(), "")
if updateErr != nil {
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.Installing.String(), updateErr)
@@ -152,19 +175,8 @@ func (p *DownloadingApp) exec(ctx context.Context) error {
},
}
var nodes corev1.NodeList
err = p.client.List(ctx, &nodes, &client.ListOptions{})
if err != nil {
klog.Errorf("list node failed %v", err)
return err
}
gpuType, err := utils.FindGpuTypeFromNodes(&nodes)
if err != nil {
klog.Errorf("get gpu type failed %v", gpuType)
return err
}
values["GPU"] = map[string]interface{}{
"Type": gpuType,
"Type": appConfig.GetSelectedGpuTypeValue(),
"Cuda": os.Getenv("OLARES_SYSTEM_CUDA_VERSION"),
}

View File

@@ -16,6 +16,7 @@ import (
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -104,8 +105,37 @@ func (p *InstallingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
err = ops.Install()
if err != nil {
klog.Errorf("install app %s failed %v", p.manager.Spec.AppName, err)
if errors.Is(err, errcode.ErrPodPending) {
if errors.Is(err, errcode.ErrServerSidePodPending) {
p.finally = func() {
klog.Infof("app %s server side pods is pending, set stop-all annotation and update app state to stopping", p.manager.Spec.AppName)
var am appsv1.ApplicationManager
if err := p.client.Get(context.TODO(), types.NamespacedName{Name: p.manager.Name}, &am); err != nil {
klog.Errorf("failed to get application manager: %v", err)
return
}
if am.Annotations == nil {
am.Annotations = make(map[string]string)
}
am.Annotations[api.AppStopAllKey] = "true"
if err := p.client.Update(ctx, &am); err != nil {
klog.Errorf("failed to set stop-all annotation: %v", err)
return
}
updateErr := p.updateStatus(ctx, &am, appsv1.Stopping, nil, err.Error(), constants.AppUnschedulable)
if updateErr != nil {
klog.Errorf("update status failed %v", updateErr)
return
}
}
return
}
if errors.Is(err, errcode.ErrPodPending) {
p.finally = func() {
klog.Infof("app %s pods is still pending, update app state to stopping", p.manager.Spec.AppName)
updateErr := p.updateStatus(context.TODO(), p.manager, appsv1.Stopping, nil, err.Error(), constants.AppUnschedulable)

View File

@@ -2,13 +2,11 @@ package appstate
import (
"context"
"encoding/json"
"fmt"
"time"
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
@@ -60,41 +58,24 @@ func (p *ResumingApp) Exec(ctx context.Context) (StatefulInProgressApp, error) {
}
func (p *ResumingApp) exec(ctx context.Context) error {
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(1))
if err != nil {
klog.Errorf("resume %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("resume app %s failed %w", p.manager.Spec.AppName, err)
}
// If resume-all is requested, also resume v2 server-side shared charts by scaling them up
if p.manager.Annotations[api.AppResumeAllKey] == "true" {
var appCfg *appcfg.ApplicationConfig
if err := json.Unmarshal([]byte(p.manager.Spec.Config), &appCfg); err != nil {
klog.Errorf("unmarshal to appConfig failed %v", err)
return err
// Check if resume-all is requested for V2 apps to also resume server-side shared charts
resumeServer := p.manager.Annotations[api.AppResumeAllKey] == "true"
if resumeServer {
err := resumeV2AppAll(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("resume v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("resume v2 app %s failed %w", p.manager.Spec.AppName, err)
}
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
for _, chart := range appCfg.SubCharts {
if !chart.Shared {
continue
}
ns := chart.Namespace(appCfg.OwnerName)
// create a shallow copy with target namespace/name for scaling logic
amCopy := p.manager.DeepCopy()
amCopy.Spec.AppNamespace = ns
amCopy.Spec.AppName = chart.Name
klog.Infof("resume-amCopy.Spec.AppNamespace: %s", ns)
klog.Infof("resume-amCopy.Spec.AppName: %s", chart.Name)
if err := suspendOrResumeApp(ctx, p.client, amCopy, int32(1)); err != nil {
klog.Errorf("failed to resume shared chart %s in namespace %s: %v", chart.Name, ns, err)
return err
}
}
} else {
err := resumeV1AppOrV2AppClient(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("resume v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("resume v2 app %s failed %w", p.manager.Spec.AppName, err)
}
}
if p.manager.Spec.Type == "middleware" && userspace.IsKbMiddlewares(p.manager.Spec.AppName) {
err = p.execMiddleware(ctx)
err := p.execMiddleware(ctx)
if err != nil {
klog.Errorf("failed to resume middleware %s,err=%v", p.manager.Spec.AppName, err)
return err

View File

@@ -2,10 +2,13 @@ package appstate
import (
"context"
"fmt"
"time"
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
kbopv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
"k8s.io/klog/v2"
@@ -44,20 +47,30 @@ func (p *SuspendFailedApp) Exec(ctx context.Context) (StatefulInProgressApp, err
}
func (p *SuspendFailedApp) StateReconcile(ctx context.Context) error {
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(0))
if err != nil {
klog.Errorf("stop-failed-app %s state reconcile failed %v", p.manager.Spec.AppName, err)
return err
stopServer := p.manager.Annotations[api.AppStopAllKey] == "true"
if stopServer {
err := suspendV2AppAll(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("suspend v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("suspend v2 app %s failed %w", p.manager.Spec.AppName, err)
}
} else {
err := suspendV1AppOrV2Client(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("suspend app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
}
}
if p.manager.Spec.Type == "middleware" {
if p.manager.Spec.Type == "middleware" && userspace.IsKbMiddlewares(p.manager.Spec.AppName) {
op := kubeblocks.NewOperation(ctx, kbopv1alpha1.StopType, p.manager, p.client)
err = op.Stop()
err := op.Stop()
if err != nil {
klog.Errorf("stop-failed-middleware %s state reconcile failed %v", p.manager.Spec.AppName, err)
return err
}
}
return err
return nil
}
func (p *SuspendFailedApp) Cancel(ctx context.Context) error {

View File

@@ -2,17 +2,17 @@ package appstate
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
kbopv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -78,35 +78,68 @@ func (p *SuspendingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
}
func (p *SuspendingApp) exec(ctx context.Context) error {
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(0))
if err != nil {
klog.Errorf("suspend %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
}
// If stop-all is requested, also stop v2 server-side shared charts by scaling them down
if p.manager.Annotations[api.AppStopAllKey] == "true" {
var appCfg *appcfg.ApplicationConfig
if err := json.Unmarshal([]byte(p.manager.Spec.Config), &appCfg); err != nil {
klog.Errorf("unmarshal to appConfig failed %v", err)
return err
// Check if stop-all is requested for V2 apps to also stop server-side shared charts
stopServer := p.manager.Annotations[api.AppStopAllKey] == "true"
if stopServer {
err := suspendV2AppAll(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("suspend v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("suspend v2 app %s failed %w", p.manager.Spec.AppName, err)
}
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
for _, chart := range appCfg.SubCharts {
if !chart.Shared {
} else {
err := suspendV1AppOrV2Client(ctx, p.client, p.manager)
if err != nil {
klog.Errorf("suspend app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
}
}
if stopServer {
// For V2 cluster-scoped apps, when server is down, stop all other users' clients
// because they share the same server and cannot function without it
klog.Infof("stopping other users' clients for v2 app %s", p.manager.Spec.AppName)
var appManagerList appsv1.ApplicationManagerList
if err := p.client.List(ctx, &appManagerList); err != nil {
klog.Errorf("failed to list application managers: %v", err)
} else {
// find all ApplicationManagers with same AppName but different AppOwner
for _, am := range appManagerList.Items {
// Skip if same owner (already handled) or different app
if am.Spec.AppName != p.manager.Spec.AppName || am.Spec.AppOwner == p.manager.Spec.AppOwner {
continue
}
ns := chart.Namespace(appCfg.OwnerName)
// create a shallow copy with target namespace/name for scaling logic
amCopy := p.manager.DeepCopy()
amCopy.Spec.AppNamespace = ns
amCopy.Spec.AppName = chart.Name
klog.Infof("amCopy.Spec.AppNamespace: %s", ns)
klog.Infof("amCopy.Spec.AppName: %s", chart.Name)
if err := suspendOrResumeApp(ctx, p.client, amCopy, int32(0)); err != nil {
klog.Errorf("failed to stop shared chart %s in namespace %s: %v", chart.Name, ns, err)
if am.Spec.Type != appsv1.App && am.Spec.Type != appsv1.Middleware {
continue
}
if am.Status.State == appsv1.Stopped || am.Status.State == appsv1.Stopping {
klog.Infof("app %s owner %s already in stopped/stopping state, skip", am.Spec.AppName, am.Spec.AppOwner)
continue
}
if !IsOperationAllowed(am.Status.State, appsv1.StopOp) {
klog.Infof("app %s owner %s not allowed do stop operation, skip", am.Spec.AppName, am.Spec.AppOwner)
continue
}
opID := strconv.FormatInt(time.Now().Unix(), 10)
now := metav1.Now()
status := appsv1.ApplicationManagerStatus{
OpType: appsv1.StopOp,
OpID: opID,
State: appsv1.Stopping,
StatusTime: &now,
UpdateTime: &now,
Reason: p.manager.Status.Reason,
Message: p.manager.Status.Message,
}
if _, err := apputils.UpdateAppMgrStatus(am.Name, status); err != nil {
return err
}
klog.Infof("stopping client for user %s, app %s", am.Spec.AppOwner, am.Spec.AppName)
}
}
}

View File

@@ -22,7 +22,6 @@ import (
"github.com/pkg/errors"
"helm.sh/helm/v3/pkg/action"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -214,19 +213,8 @@ func (p *UpgradingApp) exec(ctx context.Context) error {
"username": p.manager.Spec.AppOwner,
},
}
var nodes corev1.NodeList
err = p.client.List(ctx, &nodes, &client.ListOptions{})
if err != nil {
klog.Errorf("list node failed %v", err)
return err
}
gpuType, err := utils.FindGpuTypeFromNodes(&nodes)
if err != nil {
klog.Errorf("get gpu type failed %v", gpuType)
return err
}
values["GPU"] = map[string]interface{}{
"Type": gpuType,
"Type": appConfig.GetSelectedGpuTypeValue(),
"Cuda": os.Getenv("OLARES_SYSTEM_CUDA_VERSION"),
}

View File

@@ -2,10 +2,13 @@ package appstate
import (
"context"
"encoding/json"
"fmt"
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -21,25 +24,25 @@ import (
const suspendAnnotation = "bytetrade.io/suspend-by"
const suspendCauseAnnotation = "bytetrade.io/suspend-cause"
func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager, replicas int32) error {
suspend := func(list client.ObjectList) error {
namespace := am.Spec.AppNamespace
err := cli.List(ctx, list, client.InNamespace(namespace))
if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get workload namespace=%s err=%v", namespace, err)
// suspendOrResumeApp suspends or resumes an application.
func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager, replicas int32, stopOrResumeServer bool) error {
suspendOrResume := func(list client.ObjectList, targetNamespace, targetAppName string) error {
err := cli.List(ctx, list, client.InNamespace(targetNamespace))
if err != nil {
klog.Errorf("Failed to get workload namespace=%s err=%v", targetNamespace, err)
return err
}
listObjects, err := apimeta.ExtractList(list)
if err != nil {
klog.Errorf("Failed to extract list namespace=%s err=%v", namespace, err)
klog.Errorf("Failed to extract list namespace=%s err=%v", targetNamespace, err)
return err
}
check := func(appName, deployName string) bool {
if namespace == fmt.Sprintf("user-space-%s", am.Spec.AppOwner) ||
namespace == fmt.Sprintf("user-system-%s", am.Spec.AppOwner) ||
namespace == "os-platform" ||
namespace == "os-framework" {
if targetNamespace == fmt.Sprintf("user-space-%s", am.Spec.AppOwner) ||
targetNamespace == fmt.Sprintf("user-system-%s", am.Spec.AppOwner) ||
targetNamespace == "os-platform" ||
targetNamespace == "os-framework" {
if appName == deployName {
return true
}
@@ -54,7 +57,7 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
workloadName := ""
switch workload := w.(type) {
case *appsv1.Deployment:
if check(am.Spec.AppName, workload.Name) {
if check(targetAppName, workload.Name) {
if workload.Annotations == nil {
workload.Annotations = make(map[string]string)
}
@@ -64,7 +67,7 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
workloadName = workload.Namespace + "/" + workload.Name
}
case *appsv1.StatefulSet:
if check(am.Spec.AppName, workload.Name) {
if check(targetAppName, workload.Name) {
if workload.Annotations == nil {
workload.Annotations = make(map[string]string)
}
@@ -92,15 +95,79 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
} // end of suspend func
var deploymentList appsv1.DeploymentList
err := suspend(&deploymentList)
err := suspendOrResume(&deploymentList, am.Spec.AppNamespace, am.Spec.AppName)
if err != nil {
return err
}
var stsList appsv1.StatefulSetList
err = suspend(&stsList)
err = suspendOrResume(&stsList, am.Spec.AppNamespace, am.Spec.AppName)
if err != nil {
return err
}
return err
// If stopOrResumeServer is true, also suspend/resume shared server charts for V2 apps
if stopOrResumeServer {
var appCfg *appcfg.ApplicationConfig
if err := json.Unmarshal([]byte(am.Spec.Config), &appCfg); err != nil {
klog.Warningf("failed to unmarshal app config for stopServer check: %v", err)
return err
}
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
for _, chart := range appCfg.SubCharts {
if !chart.Shared {
continue
}
ns := chart.Namespace(am.Spec.AppOwner)
if replicas == 0 {
klog.Infof("suspending shared chart %s in namespace %s", chart.Name, ns)
} else {
klog.Infof("resuming shared chart %s in namespace %s", chart.Name, ns)
}
var sharedDeploymentList appsv1.DeploymentList
if err := suspendOrResume(&sharedDeploymentList, ns, chart.Name); err != nil {
klog.Errorf("failed to scale deployments in shared chart %s namespace %s: %v", chart.Name, ns, err)
return err
}
var sharedStsList appsv1.StatefulSetList
if err := suspendOrResume(&sharedStsList, ns, chart.Name); err != nil {
klog.Errorf("failed to scale statefulsets in shared chart %s namespace %s: %v", chart.Name, ns, err)
return err
}
}
}
// Reset the stop-all/resume-all annotation after processing
if am.Annotations != nil {
delete(am.Annotations, api.AppStopAllKey)
delete(am.Annotations, api.AppResumeAllKey)
if err := cli.Update(ctx, am); err != nil {
klog.Warningf("failed to reset stop-all/resume-all annotations for app=%s owner=%s: %v", am.Spec.AppName, am.Spec.AppOwner, err)
// Don't return error, operation already succeeded
}
}
}
return nil
}
func suspendV1AppOrV2Client(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
return suspendOrResumeApp(ctx, cli, am, 0, false)
}
func suspendV2AppAll(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
return suspendOrResumeApp(ctx, cli, am, 0, true)
}
func resumeV1AppOrV2AppClient(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
return suspendOrResumeApp(ctx, cli, am, 1, false)
}
func resumeV2AppAll(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
return suspendOrResumeApp(ctx, cli, am, 1, true)
}
func isStartUp(am *appv1alpha1.ApplicationManager, cli client.Client) (bool, error) {

View File

@@ -78,13 +78,15 @@ const (
SidecarInitContainerName = "olares-sidecar-init"
EnvoyConfigWorkDirName = "envoy-config"
ByteTradeAuthor = "bytetrade.io"
NvshareGPU = "nvshare.com/gpu"
NvidiaGPU = "nvidia.com/gpu"
VirtAiTechVGPU = "virtaitech.com/gpu"
PatchOpAdd = "add"
PatchOpReplace = "replace"
EnvNvshareManagedMemory = "NVSHARE_MANAGED_MEMORY"
ByteTradeAuthor = "bytetrade.io"
PatchOpAdd = "add"
PatchOpReplace = "replace"
EnvGPUType = "GPU_TYPE"
// gpu resource keys
NvidiaGPU = "nvidia.com/gpu"
NvidiaGB10GPU = "nvidia.com/gb10"
AMDAPU = "amd.com/apu"
AuthorizationLevelOfPublic = "public"
AuthorizationLevelOfPrivate = "private"

View File

@@ -3,5 +3,6 @@ package errcode
import "errors"
var (
ErrPodPending = errors.New("pod is pending")
ErrServerSidePodPending = errors.New("server side pod is pending")
ErrPodPending = errors.New("pod is pending")
)

View File

@@ -160,6 +160,7 @@ func PublishAppEventToQueue(p utils.EventParams) {
return p.RawAppName
}(),
Title: p.Title,
Icon: p.Icon,
Reason: p.Reason,
Message: p.Message,
SharedEntrances: p.SharedEntrances,

View File

@@ -233,6 +233,7 @@ func (imc *ImageManagerClient) updateProgress(ctx context.Context, am *appv1alph
RawAppName: am.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: apputils.AppTitle(am.Spec.Config),
Icon: apputils.AppIcon(am.Spec.Config),
})
}
klog.Infof("app %s download progress.... %v", am.Spec.AppName, progressStr)

View File

@@ -273,11 +273,7 @@ func (c *Creator) installSysApps(ctx context.Context, bflPod *corev1.Pod) error
"arch": arch,
}
gpuType, err := utils.FindGpuTypeFromNodes(&nodes)
if err != nil {
return err
}
vals["gpu"] = gpuType
vals["gpu"] = "none" // unused currently
userIndex, userSubnet, err := c.getUserSubnet(ctx)
if err != nil {

View File

@@ -16,6 +16,7 @@ import (
corev1 "k8s.io/api/core/v1"
sysv1alpha1 "github.com/beclab/Olares/framework/app-service/api/sys.bytetrade.io/v1alpha1"
"github.com/go-viper/mapstructure/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
@@ -674,6 +675,7 @@ type ConfigOptions struct {
MarketSource string
IsAdmin bool
RawAppName string
SelectedGpu string
}
// GetAppConfig get app installation configuration from app store
@@ -740,7 +742,7 @@ func getAppConfigFromRepo(ctx context.Context, options *ConfigOptions) (*appcfg.
return getAppConfigFromConfigurationFile(options, chartPath)
}
func toApplicationConfig(app, chart, rawAppName string, cfg *appcfg.AppConfiguration) (*appcfg.ApplicationConfig, string, error) {
func toApplicationConfig(app, chart, rawAppName, selectedGpu string, cfg *appcfg.AppConfiguration) (*appcfg.ApplicationConfig, string, error) {
var permission []appcfg.AppPermission
if cfg.Permission.AppData {
permission = append(permission, appcfg.AppDataRW)
@@ -788,6 +790,57 @@ func toApplicationConfig(app, chart, rawAppName string, cfg *appcfg.AppConfigura
return nil, chart, err
}
// set suppertedGpu to ["nvidia","nvidia-gb10"] by default
if len(cfg.Spec.SupportedGpu) == 0 {
cfg.Spec.SupportedGpu = []interface{}{utils.NvidiaCardType, utils.GB10ChipType}
}
// try to get selected GPU type special resource requirement
if selectedGpu != "" {
found := false
for _, supportedGpu := range cfg.Spec.SupportedGpu {
if str, ok := supportedGpu.(string); ok && str == selectedGpu {
found = true
break
}
if supportedGpuResourceMap, ok := supportedGpu.(map[string]interface{}); ok {
if resourceRequirement, ok := supportedGpuResourceMap[selectedGpu].(map[string]interface{}); ok {
found = true
var specialResource appcfg.SpecialResource
err := mapstructure.Decode(resourceRequirement, &specialResource)
if err != nil {
return nil, chart, fmt.Errorf("failed to decode special resource for selected GPU type %s: %v", selectedGpu, err)
}
for _, resSetter := range []struct {
v **resource.Quantity
s *string
}{
{v: &mem, s: specialResource.RequiredMemory},
{v: &disk, s: specialResource.RequiredDisk},
{v: &cpu, s: specialResource.RequiredCPU},
{v: &gpu, s: specialResource.RequiredGPU},
} {
if resSetter.s != nil && *resSetter.s != "" {
*resSetter.v, err = valuePtr(resource.ParseQuantity(*resSetter.s))
if err != nil {
return nil, chart, fmt.Errorf("failed to parse special resource quantity %s: %v", *resSetter.s, err)
}
}
}
break
} // end if selected gpu's resource requirement found
} // end if supportedGpu is map
} // end for supportedGpu
if !found {
return nil, chart, fmt.Errorf("selected GPU type %s is not supported", selectedGpu)
}
}
// transform from Policy to AppPolicy
var policies []appcfg.AppPolicy
for _, p := range cfg.Options.Policies {
@@ -877,6 +930,7 @@ func toApplicationConfig(app, chart, rawAppName string, cfg *appcfg.AppConfigura
PodsSelectors: podSelectors,
HardwareRequirement: cfg.Spec.Hardware,
SharedEntrances: cfg.SharedEntrances,
SelectedGpuType: selectedGpu,
}, chart, nil
}
@@ -890,7 +944,7 @@ func getAppConfigFromConfigurationFile(opt *ConfigOptions, chartPath string) (*a
return nil, chartPath, err
}
return toApplicationConfig(opt.App, chartPath, opt.RawAppName, &cfg)
return toApplicationConfig(opt.App, chartPath, opt.RawAppName, opt.SelectedGpu, &cfg)
}
func checkVersionFormat(constraint string) error {
@@ -1080,13 +1134,28 @@ func IsClonedApp(appName, rawAppName string) bool {
}
func AppTitle(config string) string {
var cfg appcfg.ApplicationConfig
err := json.Unmarshal([]byte(config), &cfg)
if err != nil {
cfg := unmarshalApplicationConfig(config)
if cfg == nil {
return ""
}
return cfg.Title
}
// AppIcon extracts the application's icon from a JSON-encoded
// ApplicationConfig string. It returns "" when the config cannot be decoded.
func AppIcon(config string) string {
	if cfg := unmarshalApplicationConfig(config); cfg != nil {
		return cfg.Icon
	}
	return ""
}
// unmarshalApplicationConfig decodes an ApplicationConfig from its JSON
// representation, returning nil when the payload cannot be parsed.
func unmarshalApplicationConfig(config string) *appcfg.ApplicationConfig {
	cfg := new(appcfg.ApplicationConfig)
	if err := json.Unmarshal([]byte(config), cfg); err != nil {
		return nil
	}
	return cfg
}
func GetRawAppName(AppName, rawAppName string) string {
if rawAppName == "" {

View File

@@ -24,6 +24,7 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/utils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -233,7 +234,9 @@ func CheckAppRequirement(token string, appConfig *appcfg.ApplicationConfig, op v
return constants.CPU, constants.SystemCPUPressure, fmt.Errorf(constants.SystemCPUPressureMessage, op)
}
}
if appConfig.Requirement.GPU != nil {
// only support nvidia gpu management by HAMi for now
if appConfig.Requirement.GPU != nil && appConfig.GetSelectedGpuTypeValue() == utils.NvidiaCardType {
if !appConfig.Requirement.GPU.IsZero() && metrics.GPU.Total <= 0 {
return constants.GPU, constants.SystemGPUNotAvailable, fmt.Errorf(constants.SystemGPUNotAvailableMessage, op)
@@ -260,42 +263,10 @@ func CheckAppRequirement(token string, appConfig *appcfg.ApplicationConfig, op v
}
}
allocatedResources, err := getRequestResources()
if err != nil {
return "", "", err
}
if len(allocatedResources) == 1 {
sufficientCPU, sufficientMemory := false, false
if appConfig.Requirement.CPU == nil {
sufficientCPU = true
}
if appConfig.Requirement.Memory == nil {
sufficientMemory = true
}
for _, v := range allocatedResources {
if appConfig.Requirement.CPU != nil {
if v.cpu.allocatable.Cmp(*appConfig.Requirement.CPU) > 0 {
sufficientCPU = true
}
}
if appConfig.Requirement.Memory != nil {
if v.memory.allocatable.Cmp(*appConfig.Requirement.Memory) > 0 {
sufficientMemory = true
}
}
}
if !sufficientCPU {
return constants.CPU, constants.K8sRequestCPUPressure, fmt.Errorf(constants.K8sRequestCPUPressureMessage, op)
}
if !sufficientMemory {
return constants.Memory, constants.K8sRequestMemoryPressure, fmt.Errorf(constants.K8sRequestMemoryPressureMessage, op)
}
}
return "", "", nil
return CheckAppK8sRequestResource(appConfig, op)
}
func getRequestResources() (map[string]resources, error) {
func GetRequestResources() (map[string]resources, error) {
config, err := ctrl.GetConfig()
if err != nil {
return nil, err
@@ -429,7 +400,9 @@ func GetClusterResource(token string) (*prometheus.ClusterMetrics, []string, err
arches.Insert(n.Labels["kubernetes.io/arch"])
if quantity, ok := n.Status.Capacity[constants.NvidiaGPU]; ok {
total += quantity.AsApproximateFloat64()
} else if quantity, ok = n.Status.Capacity[constants.VirtAiTechVGPU]; ok {
} else if quantity, ok = n.Status.Capacity[constants.NvidiaGB10GPU]; ok {
total += quantity.AsApproximateFloat64()
} else if quantity, ok = n.Status.Capacity[constants.AMDAPU]; ok {
total += quantity.AsApproximateFloat64()
}
}
@@ -960,3 +933,83 @@ func CheckCloneEntrances(ctrlClient client.Client, appConfig *appcfg.Application
return nil, nil
}
// GetClusterAvailableResource sums the allocatable CPU and memory of all
// ready, schedulable nodes and subtracts the container resource requests of
// every non-terminated pod, yielding the cluster-wide unreserved capacity.
// It returns an error when no node is ready and schedulable, or when any
// Kubernetes API call fails.
func GetClusterAvailableResource() (*resources, error) {
	config, err := ctrl.GetConfig()
	if err != nil {
		return nil, err
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	// BUG FIX: the previous version stored &initAllocatable in BOTH the cpu
	// and memory fields, so the two accumulators aliased one quantity and
	// CPU and memory totals were summed together. Each resource now gets its
	// own backing Quantity.
	cpuAllocatable := resource.MustParse("0")
	memAllocatable := resource.MustParse("0")
	availableResources := resources{
		cpu:    usage{allocatable: &cpuAllocatable},
		memory: usage{allocatable: &memAllocatable},
	}
	nodeList := make([]corev1.Node, 0, len(nodes.Items))
	for _, node := range nodes.Items {
		// Skip nodes that cannot accept new pods.
		if !utils.IsNodeReady(&node) || node.Spec.Unschedulable {
			continue
		}
		nodeList = append(nodeList, node)
	}
	if len(nodeList) == 0 {
		return nil, errors.New("cluster has no suitable node to schedule")
	}
	for _, node := range nodeList {
		availableResources.cpu.allocatable.Add(*node.Status.Allocatable.Cpu())
		availableResources.memory.allocatable.Add(*node.Status.Allocatable.Memory())
		// Only pods still occupying node resources count against capacity;
		// Failed and Succeeded pods have released their requests.
		fieldSelector := fmt.Sprintf("spec.nodeName=%s,status.phase!=Failed,status.phase!=Succeeded", node.Name)
		pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
			FieldSelector: fieldSelector,
		})
		if err != nil {
			return nil, err
		}
		for _, pod := range pods.Items {
			for _, container := range pod.Spec.Containers {
				availableResources.cpu.allocatable.Sub(*container.Resources.Requests.Cpu())
				availableResources.memory.allocatable.Sub(*container.Resources.Requests.Memory())
			}
		}
	}
	return &availableResources, nil
}
// CheckAppK8sRequestResource verifies that the cluster's unreserved CPU and
// memory can satisfy appConfig's declared requirements for operation op.
// A nil CPU or memory requirement is treated as always satisfiable. On
// insufficient capacity it returns the constrained resource type, the
// matching condition, and a formatted error; on success all three are empty.
func CheckAppK8sRequestResource(appConfig *appcfg.ApplicationConfig, op v1alpha1.OpType) (constants.ResourceType, constants.ResourceConditionType, error) {
	// Validate input before doing any cluster queries (previously the nil
	// check ran after the expensive GetClusterAvailableResource call).
	if appConfig == nil {
		return "", "", errors.New("nil appConfig")
	}
	availableResources, err := GetClusterAvailableResource()
	if err != nil {
		return "", "", err
	}
	// Capacity must strictly exceed the requirement (Cmp > 0 in the original),
	// so an exact match still counts as pressure.
	if cpuReq := appConfig.Requirement.CPU; cpuReq != nil &&
		availableResources.cpu.allocatable.Cmp(*cpuReq) <= 0 {
		return constants.CPU, constants.K8sRequestCPUPressure, fmt.Errorf(constants.K8sRequestCPUPressureMessage, op)
	}
	if memReq := appConfig.Requirement.Memory; memReq != nil &&
		availableResources.memory.allocatable.Cmp(*memReq) <= 0 {
		return constants.Memory, constants.K8sRequestMemoryPressure, fmt.Errorf(constants.K8sRequestMemoryPressureMessage, op)
	}
	return "", "", nil
}

View File

@@ -0,0 +1,12 @@
package utils
const (
	// NodeGPUTypeLabel is the node label key whose value records the GPU
	// type available on that node.
	NodeGPUTypeLabel = "gpu.bytetrade.io/type"
)

// Known values of the GPU type label.
const (
	NvidiaCardType = "nvidia" // discrete NVIDIA GPU, handled by HAMi
	AmdGpuCardType = "amd-gpu" // discrete AMD GPU
	AmdApuCardType = "amd-apu" // AMD APU with integrated GPU, AI Max 395 etc.
	GB10ChipType = "nvidia-gb10" // NVIDIA GB10 Superchip with unified system memory
)

View File

@@ -103,24 +103,37 @@ func GetAllNodesTunnelIPCIDRs() (cidrs []string) {
return cidrs
}
func FindGpuTypeFromNodes(nodes *corev1.NodeList) (string, error) {
gpuType := "none"
// func FindGpuTypeFromNodes(nodes *corev1.NodeList) (string, error) {
// gpuType := "none"
// if nodes == nil {
// return gpuType, errors.New("empty node list")
// }
// for _, n := range nodes.Items {
// if _, ok := n.Status.Capacity[constants.NvidiaGPU]; ok {
// if _, ok = n.Status.Capacity[constants.NvshareGPU]; ok {
// return "nvshare", nil
// }
// gpuType = "nvidia"
// }
// if _, ok := n.Status.Capacity[constants.VirtAiTechVGPU]; ok {
// return "virtaitech", nil
// }
// }
// return gpuType, nil
// }
func GetAllGpuTypesFromNodes(nodes *corev1.NodeList) (map[string]struct{}, error) {
gpuTypes := make(map[string]struct{})
if nodes == nil {
return gpuType, errors.New("empty node list")
return gpuTypes, errors.New("empty node list")
}
for _, n := range nodes.Items {
if _, ok := n.Status.Capacity[constants.NvidiaGPU]; ok {
if _, ok = n.Status.Capacity[constants.NvshareGPU]; ok {
return "nvshare", nil
}
gpuType = "nvidia"
}
if _, ok := n.Status.Capacity[constants.VirtAiTechVGPU]; ok {
return "virtaitech", nil
if typeLabel, ok := n.Labels[NodeGPUTypeLabel]; ok {
gpuTypes[typeLabel] = struct{}{} // TODO: add driver version info
}
}
return gpuType, nil
return gpuTypes, nil
}
func IsNodeReady(node *corev1.Node) bool {

View File

@@ -25,6 +25,7 @@ type Event struct {
User string `json:"user"`
EntranceStatuses []v1alpha1.EntranceStatus `json:"entranceStatuses,omitempty"`
Title string `json:"title,omitempty"`
Icon string `json:"icon,omitempty"`
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`
SharedEntrances []v1alpha1.Entrance `json:"sharedEntrances,omitempty"`
@@ -45,6 +46,7 @@ type EventParams struct {
Reason string
Message string
SharedEntrances []v1alpha1.Entrance
Icon string
}
func PublishEvent(nc *nats.Conn, subject string, data interface{}) error {

View File

@@ -30,7 +30,6 @@ import (
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
@@ -544,16 +543,21 @@ type EnvKeyValue struct {
}
// CreatePatchForDeployment add gpu env for deployment and returns patch bytes.
func CreatePatchForDeployment(tpl *corev1.PodTemplateSpec, namespace string, gpuRequired *resource.Quantity, typeKey string, envKeyValues []EnvKeyValue) ([]byte, error) {
patches, err := addResourceLimits(tpl, namespace, gpuRequired, typeKey, envKeyValues)
func CreatePatchForDeployment(tpl *corev1.PodTemplateSpec, typeKey string, envKeyValues []EnvKeyValue) ([]byte, error) {
patches, err := addResourceLimits(tpl, typeKey, envKeyValues)
if err != nil {
return []byte{}, err
}
return json.Marshal(patches)
}
func addResourceLimits(tpl *corev1.PodTemplateSpec, namespace string, gpuRequired *resource.Quantity, typeKey string, envKeyValues []EnvKeyValue) (patch []patchOp, err error) {
if typeKey == constants.NvidiaGPU || typeKey == constants.NvshareGPU {
func addResourceLimits(tpl *corev1.PodTemplateSpec, typeKey string, envKeyValues []EnvKeyValue) (patch []patchOp, err error) {
if typeKey == "" {
klog.Warning("No gpu type selected, skip adding resource limits")
return patch, nil
}
if typeKey == constants.NvidiaGPU || typeKey == constants.NvidiaGB10GPU {
if tpl.Spec.RuntimeClassName != nil {
patch = append(patch, patchOp{
Op: constants.PatchOpReplace,
@@ -584,7 +588,10 @@ func addResourceLimits(tpl *corev1.PodTemplateSpec, namespace string, gpuRequire
t := make(map[string]map[string]string)
t["limits"] = map[string]string{}
for k, v := range container.Resources.Limits {
if k.String() == constants.NvidiaGPU || k.String() == constants.NvshareGPU || k.String() == constants.VirtAiTechVGPU {
if k.String() == constants.NvidiaGPU ||
k.String() == constants.NvidiaGB10GPU ||
k.String() == constants.AMDAPU {
// unset all previous gpu limits
continue
}
t["limits"][k.String()] = v.String()