Compare commits
8 Commits
module-l4-
...
module-app
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
82a5cbe08b | ||
|
|
a88cedb0ce | ||
|
|
06d0d36042 | ||
|
|
67deaf16ea | ||
|
|
b27854b863 | ||
|
|
4fd22c4e20 | ||
|
|
031d8164ff | ||
|
|
0c6def8f43 |
@@ -170,7 +170,7 @@ spec:
|
||||
priorityClassName: "system-cluster-critical"
|
||||
containers:
|
||||
- name: app-service
|
||||
image: beclab/app-service:0.4.76
|
||||
image: beclab/app-service:0.4.77
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- containerPort: 6755
|
||||
@@ -196,7 +196,7 @@ spec:
|
||||
- name: SYS_APPS
|
||||
value: "market,auth,citus,desktop,did,docs,files,fsnotify,headscale,infisical,intentprovider,ksserver,message,mongo,monitoring,notifications,profile,redis,recommend,seafile,search,search-admin,settings,systemserver,tapr,vault,video,zinc,accounts,control-hub,dashboard,nitro,olares-app"
|
||||
- name: KB_MIDDLEWARES
|
||||
value: "mongodb,minio,mysql,mariadb,elasticsearch,rabbitmq"
|
||||
value: "mongodb,minio,mysql,mariadb,elasticsearch,rabbitmq,clickhouse"
|
||||
- name: GENERATED_APPS
|
||||
value: "citus,mongo-cluster-cfg,mongo-cluster-mongos,mongo-cluster-rs0,frp-agent,l4-bfl-proxy,drc-redis-cluster,appdata-backend,argoworkflows,argoworkflow-workflow-controller,velero,kvrocks"
|
||||
- name: WS_CONTAINER_IMAGE
|
||||
|
||||
@@ -193,6 +193,7 @@ func (r *ApplicationManagerController) publishStateChangeEvent(am *appv1alpha1.A
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
Icon: apputils.AppIcon(am.Spec.Config),
|
||||
Reason: am.Status.Reason,
|
||||
Message: am.Status.Message,
|
||||
})
|
||||
|
||||
@@ -252,6 +252,7 @@ func (r *EntranceStatusManagerController) updateEntranceStatus(ctx context.Conte
|
||||
RawAppName: appCopy.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: app.AppTitle(am.Spec.Config),
|
||||
Icon: app.AppIcon(am.Spec.Config),
|
||||
SharedEntrances: appCopy.Spec.SharedEntrances,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,9 +4,11 @@ import (
|
||||
"context"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
@@ -120,7 +122,7 @@ func (r *PodAbnormalSuspendAppController) Reconcile(ctx context.Context, req ctr
|
||||
|
||||
if pod.Status.Reason == "Evicted" {
|
||||
klog.Infof("pod evicted name=%s namespace=%s, attempting to suspend app=%s owner=%s", pod.Name, pod.Namespace, appName, owner)
|
||||
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppStopDueToEvicted, "evicted pod: "+pod.Namespace+"/"+pod.Name)
|
||||
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppStopDueToEvicted, "evicted pod: "+pod.Namespace+"/"+pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend attempt failed for app=%s owner=%s: %v", appName, owner, err)
|
||||
return ctrl.Result{}, err
|
||||
@@ -147,7 +149,7 @@ func (r *PodAbnormalSuspendAppController) Reconcile(ctx context.Context, req ctr
|
||||
}
|
||||
|
||||
klog.Infof("attempting to suspend app=%s owner=%s due to pending unschedulable timeout", appName, owner)
|
||||
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppUnschedulable, "pending unschedulable timeout on pod: "+pod.Namespace+"/"+pod.Name)
|
||||
ok, err := r.trySuspendApp(ctx, owner, appName, constants.AppUnschedulable, "pending unschedulable timeout on pod: "+pod.Namespace+"/"+pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend attempt failed for app=%s owner=%s: %v", appName, owner, err)
|
||||
return ctrl.Result{}, err
|
||||
@@ -191,7 +193,7 @@ func pendingUnschedulableSince(pod *corev1.Pod) (time.Time, bool) {
|
||||
|
||||
// trySuspendApp attempts to suspend the app and returns (true, nil) if a suspend request was issued.
|
||||
// If the app is not suspendable yet, returns (false, nil) to trigger a short requeue.
|
||||
func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, owner, appName, reason, message string) (bool, error) {
|
||||
func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, owner, appName, reason, message, podNamespace string) (bool, error) {
|
||||
name, err := apputils.FmtAppMgrName(appName, owner, "")
|
||||
if err != nil {
|
||||
klog.Errorf("failed to format app manager name app=%s owner=%s: %v", appName, owner, err)
|
||||
@@ -215,6 +217,11 @@ func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, own
|
||||
return false, nil
|
||||
}
|
||||
|
||||
isServerPod := strings.HasSuffix(podNamespace, "-shared")
|
||||
if isServerPod {
|
||||
am.Annotations[api.AppStopAllKey] = "true"
|
||||
}
|
||||
|
||||
am.Spec.OpType = appv1alpha1.StopOp
|
||||
if err := r.Update(ctx, &am); err != nil {
|
||||
klog.Errorf("failed to update applicationmanager spec to StopOp name=%s app=%s owner=%s: %v", am.Name, appName, owner, err)
|
||||
|
||||
@@ -126,15 +126,16 @@ type UpgradeRequest struct {
|
||||
|
||||
// InstallRequest represents a request to install an application.
|
||||
type InstallRequest struct {
|
||||
Dev bool `json:"devMode"`
|
||||
RepoURL string `json:"repoUrl"`
|
||||
CfgURL string `json:"cfgUrl"`
|
||||
Source AppSource `json:"source"`
|
||||
Images []Image `json:"images"`
|
||||
Envs []sysv1alpha1.AppEnvVar `json:"envs"`
|
||||
RawAppName string `json:"rawAppName"`
|
||||
Title string `json:"title"`
|
||||
Entrances []EntranceClone `json:"entrances"`
|
||||
Dev bool `json:"devMode"`
|
||||
RepoURL string `json:"repoUrl"`
|
||||
CfgURL string `json:"cfgUrl"`
|
||||
Source AppSource `json:"source"`
|
||||
Images []Image `json:"images"`
|
||||
Envs []sysv1alpha1.AppEnvVar `json:"envs"`
|
||||
RawAppName string `json:"rawAppName"`
|
||||
Title string `json:"title"`
|
||||
Entrances []EntranceClone `json:"entrances"`
|
||||
SelectedGpuType string `json:"selectedGpuType"`
|
||||
}
|
||||
|
||||
type Image struct {
|
||||
|
||||
@@ -3,11 +3,14 @@ package apiserver
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
@@ -426,6 +429,11 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
for i := range appconfig.Entrances {
|
||||
if appconfig.Entrances[i].AuthLevel == "" {
|
||||
appconfig.Entrances[i].AuthLevel = "private"
|
||||
}
|
||||
}
|
||||
now := metav1.Now()
|
||||
name, _ := apputils.FmtAppMgrName(am.Spec.AppName, owner, appconfig.Namespace)
|
||||
app := &v1alpha1.Application{
|
||||
@@ -443,6 +451,7 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
|
||||
Owner: owner,
|
||||
Entrances: appconfig.Entrances,
|
||||
SharedEntrances: appconfig.SharedEntrances,
|
||||
Ports: appconfig.Ports,
|
||||
Icon: appconfig.Icon,
|
||||
Settings: map[string]string{
|
||||
"title": am.Annotations[constants.ApplicationTitleLabel],
|
||||
@@ -477,6 +486,8 @@ func (h *Handler) apps(req *restful.Request, resp *restful.Response) {
|
||||
}
|
||||
if v, ok := appsMap[a.Name]; ok {
|
||||
v.Spec.Settings = a.Spec.Settings
|
||||
v.Spec.Entrances = a.Spec.Entrances
|
||||
v.Spec.Ports = a.Spec.Ports
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -738,6 +749,11 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
for i := range appconfig.Entrances {
|
||||
if appconfig.Entrances[i].AuthLevel == "" {
|
||||
appconfig.Entrances[i].AuthLevel = "private"
|
||||
}
|
||||
}
|
||||
|
||||
now := metav1.Now()
|
||||
app := v1alpha1.Application{
|
||||
@@ -754,6 +770,7 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
|
||||
Namespace: am.Spec.AppNamespace,
|
||||
Owner: am.Spec.AppOwner,
|
||||
Entrances: appconfig.Entrances,
|
||||
Ports: appconfig.Ports,
|
||||
SharedEntrances: appconfig.SharedEntrances,
|
||||
Icon: appconfig.Icon,
|
||||
Settings: map[string]string{
|
||||
@@ -788,6 +805,8 @@ func (h *Handler) allUsersApps(req *restful.Request, resp *restful.Response) {
|
||||
}
|
||||
if v, ok := appsMap[a.Name]; ok {
|
||||
v.Spec.Settings = a.Spec.Settings
|
||||
v.Spec.Entrances = a.Spec.Entrances
|
||||
v.Spec.Ports = a.Spec.Ports
|
||||
}
|
||||
}
|
||||
|
||||
@@ -930,12 +949,37 @@ func (h *Handler) oamValues(req *restful.Request, resp *restful.Response) {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
gpuType, err := utils.FindGpuTypeFromNodes(&nodes)
|
||||
gpuTypes, err := utils.GetAllGpuTypesFromNodes(&nodes)
|
||||
if err != nil {
|
||||
klog.Errorf("get gpu type failed %v", gpuType)
|
||||
klog.Errorf("get gpu type failed %v", err)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
|
||||
gpuType := "none"
|
||||
selectedGpuType := req.QueryParameter("gputype")
|
||||
if len(gpuTypes) > 0 {
|
||||
if selectedGpuType != "" {
|
||||
if _, ok := gpuTypes[selectedGpuType]; ok {
|
||||
gpuType = selectedGpuType
|
||||
} else {
|
||||
err := fmt.Errorf("selected gpu type %s not found in cluster", selectedGpuType)
|
||||
klog.Error(err)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if len(gpuTypes) == 1 {
|
||||
gpuType = maps.Keys(gpuTypes)[0]
|
||||
} else {
|
||||
err := fmt.Errorf("multiple gpu types found in cluster, please specify one")
|
||||
klog.Error(err)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
values["GPU"] = map[string]interface{}{
|
||||
"Type": gpuType,
|
||||
"Cuda": os.Getenv("OLARES_SYSTEM_CUDA_VERSION"),
|
||||
|
||||
@@ -1,174 +1,33 @@
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/client/clientset"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
"golang.org/x/exp/maps"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
var running bool = false
|
||||
var switchLock sync.Mutex
|
||||
|
||||
func (h *Handler) disableGpuManagedMemory(req *restful.Request, resp *restful.Response) {
|
||||
if err := h.nvshareSwitch(req, false); err != nil {
|
||||
api.HandleError(resp, req, &errors.StatusError{
|
||||
ErrStatus: metav1.Status{Code: 400, Message: "operation failed, " + err.Error()},
|
||||
})
|
||||
|
||||
func (h *Handler) getGpuTypes(req *restful.Request, resp *restful.Response) {
|
||||
var nodes corev1.NodeList
|
||||
err := h.ctrlClient.List(req.Request.Context(), &nodes, &client.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("list node failed %v", err)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
resp.WriteAsJson(map[string]int{"code": 0})
|
||||
}
|
||||
|
||||
func (h *Handler) enableGpuManagedMemory(req *restful.Request, resp *restful.Response) {
|
||||
if err := h.nvshareSwitch(req, true); err != nil {
|
||||
api.HandleError(resp, req, &errors.StatusError{
|
||||
ErrStatus: metav1.Status{Code: 400, Message: "operation failed, " + err.Error()},
|
||||
})
|
||||
|
||||
gpuTypes, err := utils.GetAllGpuTypesFromNodes(&nodes)
|
||||
if err != nil {
|
||||
klog.Errorf("get gpu type failed %v", err)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
resp.WriteAsJson(map[string]int{"code": 0})
|
||||
}
|
||||
|
||||
func (h *Handler) nvshareSwitch(req *restful.Request, enable bool) error {
|
||||
client := req.Attribute(constants.KubeSphereClientAttribute).(*clientset.ClientSet)
|
||||
switchLock.Lock()
|
||||
defer switchLock.Unlock()
|
||||
|
||||
if running {
|
||||
return fmt.Errorf("last operation is still running")
|
||||
}
|
||||
|
||||
deployments, err := client.KubeClient.Kubernetes().AppsV1().Deployments("").List(req.Request.Context(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Error("list deployment error, ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
envValue := "0"
|
||||
if enable {
|
||||
envValue = "1"
|
||||
}
|
||||
|
||||
for _, d := range deployments.Items {
|
||||
shouldUpdate := false
|
||||
for i, c := range d.Spec.Template.Spec.Containers {
|
||||
found := false
|
||||
for k := range c.Resources.Limits {
|
||||
if k == constants.NvshareGPU {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
// a gpu request container
|
||||
addEnv := true
|
||||
for n, env := range d.Spec.Template.Spec.Containers[i].Env {
|
||||
if env.Name == constants.EnvNvshareManagedMemory {
|
||||
addEnv = false
|
||||
d.Spec.Template.Spec.Containers[i].Env[n].Value = envValue
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if addEnv {
|
||||
d.Spec.Template.Spec.Containers[i].Env =
|
||||
append(d.Spec.Template.Spec.Containers[i].Env,
|
||||
corev1.EnvVar{Name: constants.EnvNvshareManagedMemory, Value: envValue})
|
||||
}
|
||||
|
||||
shouldUpdate = true
|
||||
} // end found
|
||||
} // end of container loop
|
||||
|
||||
if shouldUpdate {
|
||||
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
deployment, err := client.KubeClient.Kubernetes().AppsV1().Deployments(d.Namespace).
|
||||
Get(req.Request.Context(), d.Name, metav1.GetOptions{})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
deployment.Spec.Template.Spec.Containers = d.Spec.Template.Spec.Containers
|
||||
|
||||
_, err = client.KubeClient.Kubernetes().AppsV1().Deployments(d.Namespace).
|
||||
Update(req.Request.Context(), deployment, metav1.UpdateOptions{})
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
klog.Error("update deployment error, ", err, ", ", d.Name, ", ", d.Namespace)
|
||||
return err
|
||||
}
|
||||
} // should update
|
||||
} // end of deployment loop
|
||||
|
||||
// update terminus
|
||||
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
terminus, err := utils.GetTerminus(req.Request.Context(), h.ctrlClient)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
terminus.Spec.Settings[constants.EnvNvshareManagedMemory] = envValue
|
||||
|
||||
return h.ctrlClient.Update(req.Request.Context(), terminus)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
klog.Error("update terminus error, ", err)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
running = true
|
||||
// delay 30s, assume the all pods will be reload in 30s.
|
||||
delay := time.NewTimer(30 * time.Second)
|
||||
go func() {
|
||||
<-delay.C
|
||||
switchLock.Lock()
|
||||
defer switchLock.Unlock()
|
||||
|
||||
running = false
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handler) getManagedMemoryValue(req *restful.Request, resp *restful.Response) {
|
||||
terminus, err := utils.GetTerminus(req.Request.Context(), h.ctrlClient)
|
||||
if err != nil {
|
||||
klog.Error("get terminus value error, ", err)
|
||||
api.HandleError(resp, req, &errors.StatusError{
|
||||
ErrStatus: metav1.Status{Code: 400, Message: "get value error, " + err.Error()},
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
managed := true
|
||||
if v, ok := terminus.Spec.Settings[constants.EnvNvshareManagedMemory]; ok && v == "0" {
|
||||
managed = false
|
||||
}
|
||||
|
||||
resp.WriteAsJson(&map[string]interface{}{
|
||||
"managed_memory": managed,
|
||||
"gpu_types": maps.Keys(gpuTypes),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -21,9 +21,12 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils/config"
|
||||
"golang.org/x/exp/maps"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
"helm.sh/helm/v3/pkg/time"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@@ -37,7 +40,7 @@ type depRequest struct {
|
||||
type installHelperIntf interface {
|
||||
getAdminUsers() (admin []string, isAdmin bool, err error)
|
||||
getInstalledApps() (installed bool, app []*v1alpha1.Application, err error)
|
||||
getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion string) (err error)
|
||||
getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion, selectedGpuType string) (err error)
|
||||
setAppConfig(req *api.InstallRequest, appName string)
|
||||
validate(bool, []*v1alpha1.Application) error
|
||||
setAppEnv(overrides []sysv1alpha1.AppEnvVar) error
|
||||
@@ -105,6 +108,36 @@ func (h *Handler) install(req *restful.Request, resp *restful.Response) {
|
||||
}
|
||||
}
|
||||
|
||||
// check selected gpu type can be supported
|
||||
// if selectedGpuType != "" , then check if the gpu type exists in cluster
|
||||
// if selectedGpuType == "" , and only one gpu type exists in cluster, then use it
|
||||
var nodes corev1.NodeList
|
||||
err = h.ctrlClient.List(req.Request.Context(), &nodes, &client.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("list node failed %v", err)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
gpuTypes, err := utils.GetAllGpuTypesFromNodes(&nodes)
|
||||
if err != nil {
|
||||
klog.Errorf("get gpu type failed %v", err)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
|
||||
if insReq.SelectedGpuType != "" {
|
||||
if _, ok := gpuTypes[insReq.SelectedGpuType]; !ok {
|
||||
klog.Errorf("selected gpu type %s not found in cluster", insReq.SelectedGpuType)
|
||||
api.HandleBadRequest(resp, req, fmt.Errorf("selected gpu type %s not found in cluster", insReq.SelectedGpuType))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if len(gpuTypes) == 1 {
|
||||
insReq.SelectedGpuType = maps.Keys(gpuTypes)[0]
|
||||
klog.Infof("only one gpu type %s found in cluster, use it as selected gpu type", insReq.SelectedGpuType)
|
||||
}
|
||||
}
|
||||
|
||||
apiVersion, appCfg, err := apputils.GetApiVersionFromAppConfig(req.Request.Context(), &apputils.ConfigOptions{
|
||||
App: app,
|
||||
RawAppName: rawAppName,
|
||||
@@ -112,6 +145,7 @@ func (h *Handler) install(req *restful.Request, resp *restful.Response) {
|
||||
RepoURL: insReq.RepoURL,
|
||||
MarketSource: marketSource,
|
||||
Version: chartVersion,
|
||||
SelectedGpu: insReq.SelectedGpuType,
|
||||
})
|
||||
klog.Infof("chartVersion: %s", chartVersion)
|
||||
if err != nil {
|
||||
@@ -188,7 +222,7 @@ func (h *Handler) install(req *restful.Request, resp *restful.Response) {
|
||||
return
|
||||
}
|
||||
|
||||
err = helper.getAppConfig(adminUsers, marketSource, isAdmin, appInstalled, installedApps, chartVersion)
|
||||
err = helper.getAppConfig(adminUsers, marketSource, isAdmin, appInstalled, installedApps, chartVersion, insReq.SelectedGpuType)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get app config err=%v", err)
|
||||
return
|
||||
@@ -423,7 +457,7 @@ func (h *installHandlerHelper) getInstalledApps() (installed bool, app []*v1alph
|
||||
return
|
||||
}
|
||||
|
||||
func (h *installHandlerHelper) getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion string) (err error) {
|
||||
func (h *installHandlerHelper) getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion, selectedGpuType string) (err error) {
|
||||
var (
|
||||
admin string
|
||||
installAsAdmin bool
|
||||
@@ -472,6 +506,7 @@ func (h *installHandlerHelper) getAppConfig(adminUsers []string, marketSource st
|
||||
Admin: admin,
|
||||
IsAdmin: installAsAdmin,
|
||||
MarketSource: marketSource,
|
||||
SelectedGpu: selectedGpuType,
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get appconfig err=%v", err)
|
||||
@@ -685,7 +720,7 @@ func (h *installHandlerHelperV2) _validateClusterScope(isAdmin bool, installedAp
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *installHandlerHelperV2) getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion string) (err error) {
|
||||
func (h *installHandlerHelperV2) getAppConfig(adminUsers []string, marketSource string, isAdmin, appInstalled bool, installedApps []*v1alpha1.Application, chartVersion, selectedGpuType string) (err error) {
|
||||
klog.Info("get app config for install handler v2")
|
||||
|
||||
var (
|
||||
@@ -713,6 +748,7 @@ func (h *installHandlerHelperV2) getAppConfig(adminUsers []string, marketSource
|
||||
Admin: admin,
|
||||
MarketSource: marketSource,
|
||||
IsAdmin: isAdmin,
|
||||
SelectedGpu: selectedGpuType,
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get appconfig err=%v", err)
|
||||
|
||||
@@ -187,6 +187,11 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
for i := range appconfig.Entrances {
|
||||
if appconfig.Entrances[i].AuthLevel == "" {
|
||||
appconfig.Entrances[i].AuthLevel = "private"
|
||||
}
|
||||
}
|
||||
|
||||
appconfig.SharedEntrances, err = appconfig.GenSharedEntranceURL(req.Request.Context())
|
||||
if err != nil {
|
||||
@@ -214,6 +219,7 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
|
||||
Namespace: am.Spec.AppNamespace,
|
||||
Owner: am.Spec.AppOwner,
|
||||
Entrances: appconfig.Entrances,
|
||||
Ports: appconfig.Ports,
|
||||
SharedEntrances: appconfig.SharedEntrances,
|
||||
Icon: appconfig.Icon,
|
||||
Settings: map[string]string{
|
||||
@@ -274,6 +280,8 @@ func (h *Handler) listBackend(req *restful.Request, resp *restful.Response) {
|
||||
}
|
||||
if v, ok := appsMap[a.Name]; ok {
|
||||
v.Spec.Settings = a.Spec.Settings
|
||||
v.Spec.Entrances = a.Spec.Entrances
|
||||
v.Spec.Ports = a.Spec.Ports
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appinstaller"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/client/clientset"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
@@ -520,6 +521,7 @@ type applicationPermission struct {
|
||||
Permissions []permission `json:"permissions"`
|
||||
}
|
||||
|
||||
// Deprecated
|
||||
func (h *Handler) applicationPermissionList(req *restful.Request, resp *restful.Response) {
|
||||
owner := req.Attribute(constants.UserContextAttribute).(string)
|
||||
//token := req.HeaderParameter(constants.AuthorizationTokenKey)
|
||||
@@ -572,46 +574,39 @@ func (h *Handler) applicationPermissionList(req *restful.Request, resp *restful.
|
||||
func (h *Handler) getApplicationPermission(req *restful.Request, resp *restful.Response) {
|
||||
app := req.PathParameter(ParamAppName)
|
||||
owner := req.Attribute(constants.UserContextAttribute).(string)
|
||||
client, err := dynamic.NewForConfig(h.kubeConfig)
|
||||
name, err := apputils.FmtAppMgrName(app, owner, "")
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
var am v1alpha1.ApplicationManager
|
||||
err = h.ctrlClient.Get(req.Request.Context(), types.NamespacedName{Name: name}, &am)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
|
||||
var appConfig appcfg.ApplicationConfig
|
||||
err = am.GetAppConfig(&appConfig)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get app config err=%v", err)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
|
||||
var ret *applicationPermission
|
||||
apClient := provider.NewApplicationPermissionRequest(client)
|
||||
namespace := fmt.Sprintf("user-system-%s", owner)
|
||||
aps, err := apClient.List(req.Request.Context(), namespace, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
for _, ap := range aps.Items {
|
||||
if ap.Object == nil {
|
||||
continue
|
||||
}
|
||||
appName, _, _ := unstructured.NestedString(ap.Object, "spec", "app")
|
||||
if appName == app {
|
||||
perms, _, _ := unstructured.NestedSlice(ap.Object, "spec", "permissions")
|
||||
permissions := appinstaller.ParseAppPermission(appConfig.Permission)
|
||||
for _, ap := range permissions {
|
||||
if perms, ok := ap.([]appcfg.ProviderPermission); ok {
|
||||
permissions := make([]permission, 0)
|
||||
for _, p := range perms {
|
||||
if perm, ok := p.(map[string]interface{}); ok {
|
||||
ops := make([]string, 0)
|
||||
for _, op := range perm["ops"].([]interface{}) {
|
||||
if opStr, ok := op.(string); ok {
|
||||
ops = append(ops, opStr)
|
||||
}
|
||||
}
|
||||
permissions = append(permissions, permission{
|
||||
DataType: perm["dataType"].(string),
|
||||
Group: perm["group"].(string),
|
||||
Version: perm["version"].(string),
|
||||
Ops: ops,
|
||||
})
|
||||
}
|
||||
|
||||
permissions = append(permissions, permission{
|
||||
DataType: p.ProviderName,
|
||||
Group: p.AppName,
|
||||
})
|
||||
}
|
||||
ret = &applicationPermission{
|
||||
App: appName,
|
||||
App: am.Spec.AppName,
|
||||
Owner: owner,
|
||||
Permissions: permissions,
|
||||
}
|
||||
@@ -642,6 +637,7 @@ type opApi struct {
|
||||
URI string `json:"uri"`
|
||||
}
|
||||
|
||||
// Deprecated
|
||||
func (h *Handler) getProviderRegistry(req *restful.Request, resp *restful.Response) {
|
||||
dataTypeReq := req.PathParameter(ParamDataType)
|
||||
groupReq := req.PathParameter(ParamGroup)
|
||||
@@ -708,56 +704,44 @@ func (h *Handler) getProviderRegistry(req *restful.Request, resp *restful.Respon
|
||||
func (h *Handler) getApplicationProviderList(req *restful.Request, resp *restful.Response) {
|
||||
owner := req.Attribute(constants.UserContextAttribute).(string)
|
||||
app := req.PathParameter(ParamAppName)
|
||||
client, err := dynamic.NewForConfig(h.kubeConfig)
|
||||
|
||||
name, err := apputils.FmtAppMgrName(app, owner, "")
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
var am v1alpha1.ApplicationManager
|
||||
err = h.ctrlClient.Get(req.Request.Context(), types.NamespacedName{Name: name}, &am)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
|
||||
var appConfig appcfg.ApplicationConfig
|
||||
err = am.GetAppConfig(&appConfig)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get app config err=%v", err)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
|
||||
ret := make([]providerRegistry, 0)
|
||||
rClient := provider.NewRegistryRequest(client)
|
||||
namespace := fmt.Sprintf("user-system-%s", owner)
|
||||
prs, err := rClient.List(req.Request.Context(), namespace, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
for _, ap := range prs.Items {
|
||||
if ap.Object == nil {
|
||||
continue
|
||||
}
|
||||
deployment, _, _ := unstructured.NestedString(ap.Object, "spec", "deployment")
|
||||
kind, _, _ := unstructured.NestedString(ap.Object, "spec", "kind")
|
||||
|
||||
if app == deployment && kind == "provider" {
|
||||
dataType, _, _ := unstructured.NestedString(ap.Object, "spec", "dataType")
|
||||
group, _, _ := unstructured.NestedString(ap.Object, "spec", "group")
|
||||
description, _, _ := unstructured.NestedString(ap.Object, "spec", "description")
|
||||
endpoint, _, _ := unstructured.NestedString(ap.Object, "spec", "endpoint")
|
||||
ns, _, _ := unstructured.NestedString(ap.Object, "spec", "namespace")
|
||||
version, _, _ := unstructured.NestedString(ap.Object, "spec", "version")
|
||||
opApis := make([]opApi, 0)
|
||||
opApiList, _, _ := unstructured.NestedSlice(ap.Object, "spec", "opApis")
|
||||
for _, op := range opApiList {
|
||||
if aop, ok := op.(map[string]interface{}); ok {
|
||||
opApis = append(opApis, opApi{
|
||||
Name: aop["name"].(string),
|
||||
URI: aop["uri"].(string),
|
||||
})
|
||||
}
|
||||
}
|
||||
ret = append(ret, providerRegistry{
|
||||
DataType: dataType,
|
||||
Deployment: deployment,
|
||||
Description: description,
|
||||
Endpoint: endpoint,
|
||||
Kind: kind,
|
||||
Group: group,
|
||||
Namespace: ns,
|
||||
OpApis: opApis,
|
||||
Version: version,
|
||||
ns := am.Spec.AppNamespace
|
||||
for _, ap := range appConfig.Provider {
|
||||
dataType := ap.Name
|
||||
endpoint := ap.Entrance
|
||||
opApis := make([]opApi, 0)
|
||||
for _, op := range ap.Paths {
|
||||
opApis = append(opApis, opApi{
|
||||
URI: op,
|
||||
})
|
||||
|
||||
}
|
||||
ret = append(ret, providerRegistry{
|
||||
DataType: dataType,
|
||||
Endpoint: endpoint,
|
||||
Namespace: ns,
|
||||
OpApis: opApis,
|
||||
})
|
||||
}
|
||||
resp.WriteAsJson(ret)
|
||||
}
|
||||
|
||||
@@ -37,7 +37,6 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/klog/v2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
|
||||
)
|
||||
|
||||
@@ -308,36 +307,21 @@ func (h *Handler) gpuLimitMutate(ctx context.Context, req *admissionv1.Admission
|
||||
return resp
|
||||
}
|
||||
|
||||
GPUType, err := h.findNvidiaGpuFromNodes(ctx)
|
||||
if err != nil && !errors.Is(err, api.ErrGPUNodeNotFound) {
|
||||
return h.sidecarWebhook.AdmissionError(req.UID, err)
|
||||
}
|
||||
GPUType := appcfg.GetSelectedGpuTypeValue()
|
||||
|
||||
// no gpu found, no need to inject env, just return.
|
||||
if GPUType == "" {
|
||||
if GPUType == "none" || GPUType == "" {
|
||||
return resp
|
||||
}
|
||||
|
||||
terminus, err := utils.GetTerminus(ctx, h.ctrlClient)
|
||||
if err != nil {
|
||||
return h.sidecarWebhook.AdmissionError(req.UID, err)
|
||||
}
|
||||
nvshareManagedMemory := ""
|
||||
if terminus.Spec.Settings != nil {
|
||||
nvshareManagedMemory = terminus.Spec.Settings[constants.EnvNvshareManagedMemory]
|
||||
envs := []webhook.EnvKeyValue{
|
||||
{
|
||||
Key: constants.EnvGPUType,
|
||||
Value: GPUType,
|
||||
},
|
||||
}
|
||||
|
||||
envs := []webhook.EnvKeyValue{}
|
||||
if nvshareManagedMemory != "" {
|
||||
envs = append(envs, webhook.EnvKeyValue{
|
||||
Key: constants.EnvNvshareManagedMemory,
|
||||
Value: nvshareManagedMemory,
|
||||
})
|
||||
}
|
||||
|
||||
envs = append(envs, webhook.EnvKeyValue{Key: "NVSHARE_DEBUG", Value: "1"})
|
||||
|
||||
patchBytes, err := webhook.CreatePatchForDeployment(tpl, req.Namespace, gpuRequired, GPUType, envs)
|
||||
patchBytes, err := webhook.CreatePatchForDeployment(tpl, h.getGPUResourceTypeKey(GPUType), envs)
|
||||
if err != nil {
|
||||
klog.Errorf("create patch error %v", err)
|
||||
return h.sidecarWebhook.AdmissionError(req.UID, err)
|
||||
@@ -347,33 +331,17 @@ func (h *Handler) gpuLimitMutate(ctx context.Context, req *admissionv1.Admission
|
||||
return resp
|
||||
}
|
||||
|
||||
func (h *Handler) findNvidiaGpuFromNodes(ctx context.Context) (string, error) {
|
||||
var nodes corev1.NodeList
|
||||
err := h.ctrlClient.List(ctx, &nodes, &client.ListOptions{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
func (h *Handler) getGPUResourceTypeKey(gpuType string) string {
|
||||
switch gpuType {
|
||||
case utils.NvidiaCardType:
|
||||
return constants.NvidiaGPU
|
||||
case utils.GB10ChipType:
|
||||
return constants.NvidiaGB10GPU
|
||||
case utils.AmdApuCardType:
|
||||
return constants.AMDAPU
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
|
||||
// return nvshare gpu or virtaitech gpu in priority
|
||||
gtype := ""
|
||||
for _, n := range nodes.Items {
|
||||
if _, ok := n.Status.Capacity[constants.NvidiaGPU]; ok {
|
||||
if _, ok = n.Status.Capacity[constants.NvshareGPU]; ok {
|
||||
return constants.NvshareGPU, nil
|
||||
}
|
||||
gtype = constants.NvidiaGPU
|
||||
}
|
||||
|
||||
if _, ok := n.Status.Capacity[constants.VirtAiTechVGPU]; ok {
|
||||
return constants.VirtAiTechVGPU, nil
|
||||
}
|
||||
}
|
||||
|
||||
if gtype != "" {
|
||||
return gtype, nil
|
||||
}
|
||||
|
||||
return "", api.ErrGPUNodeNotFound
|
||||
}
|
||||
|
||||
func (h *Handler) providerRegistryValidate(req *restful.Request, resp *restful.Response) {
|
||||
|
||||
@@ -340,7 +340,9 @@ func GetClusterResource(kubeConfig *rest.Config, token string) (*prometheus.Clus
|
||||
arches.Insert(n.Labels["kubernetes.io/arch"])
|
||||
if quantity, ok := n.Status.Capacity[constants.NvidiaGPU]; ok {
|
||||
total += quantity.AsApproximateFloat64()
|
||||
} else if quantity, ok = n.Status.Capacity[constants.VirtAiTechVGPU]; ok {
|
||||
} else if quantity, ok = n.Status.Capacity[constants.NvidiaGB10GPU]; ok {
|
||||
total += quantity.AsApproximateFloat64()
|
||||
} else if quantity, ok = n.Status.Capacity[constants.AMDAPU]; ok {
|
||||
total += quantity.AsApproximateFloat64()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -254,21 +254,9 @@ func addServiceToContainer(c *restful.Container, handler *Handler) error {
|
||||
Param(ws.PathParameter(ParamEntranceName, "the name of a application entrance")).
|
||||
Returns(http.StatusOK, "Success to set the application entrance policy", nil))
|
||||
|
||||
ws.Route(ws.POST("/gpu/disable/managed-memory").
|
||||
To(handler.disableGpuManagedMemory).
|
||||
Doc("disable nvshare's managed memory ").
|
||||
Metadata(restfulspec.KeyOpenAPITags, MODULE_TAGS).
|
||||
Returns(http.StatusOK, "Success to disable", nil))
|
||||
|
||||
ws.Route(ws.POST("/gpu/enable/managed-memory").
|
||||
To(handler.enableGpuManagedMemory).
|
||||
Doc("enable nvshare's managed memory ").
|
||||
Metadata(restfulspec.KeyOpenAPITags, MODULE_TAGS).
|
||||
Returns(http.StatusOK, "Success to enable", nil))
|
||||
|
||||
ws.Route(ws.GET("/gpu/managed-memory").
|
||||
To(handler.getManagedMemoryValue).
|
||||
Doc("get nvshare's managed memory enabled or not").
|
||||
ws.Route(ws.GET("/gpu/types").
|
||||
To(handler.getGpuTypes).
|
||||
Doc("get all gpu types in the cluster").
|
||||
Metadata(restfulspec.KeyOpenAPITags, MODULE_TAGS).
|
||||
Returns(http.StatusOK, "Success to get ", &ResultResponse{}))
|
||||
|
||||
|
||||
@@ -56,14 +56,19 @@ type AppSpec struct {
|
||||
Developer string `yaml:"developer" json:"developer"`
|
||||
RequiredMemory string `yaml:"requiredMemory" json:"requiredMemory"`
|
||||
RequiredDisk string `yaml:"requiredDisk" json:"requiredDisk"`
|
||||
SupportClient SupportClient `yaml:"supportClient" json:"supportClient"`
|
||||
RequiredGPU string `yaml:"requiredGpu" json:"requiredGpu"`
|
||||
RequiredCPU string `yaml:"requiredCpu" json:"requiredCpu"`
|
||||
LimitedMemory string `yaml:"limitedMemory" json:"limitedMemory"`
|
||||
LimitedDisk string `yaml:"limitedDisk" json:"limitedDisk"`
|
||||
LimitedGPU string `yaml:"limitedGPU" json:"limitedGPU"`
|
||||
LimitedCPU string `yaml:"limitedCPU" json:"limitedCPU"`
|
||||
SupportClient SupportClient `yaml:"supportClient" json:"supportClient"`
|
||||
RunAsUser bool `yaml:"runAsUser" json:"runAsUser"`
|
||||
RunAsInternal bool `yaml:"runAsInternal" json:"runAsInternal"`
|
||||
PodGPUConsumePolicy string `yaml:"podGpuConsumePolicy" json:"podGpuConsumePolicy"`
|
||||
SubCharts []Chart `yaml:"subCharts" json:"subCharts"`
|
||||
Hardware Hardware `yaml:"hardware" json:"hardware"`
|
||||
SupportedGpu []any `yaml:"supportedGpu,omitempty" json:"supportedGpu,omitempty"`
|
||||
}
|
||||
|
||||
type Hardware struct {
|
||||
@@ -188,6 +193,17 @@ type Provider struct {
|
||||
Verbs []string `yaml:"verbs" json:"verbs"`
|
||||
}
|
||||
|
||||
type SpecialResource struct {
|
||||
RequiredMemory *string `yaml:"requiredMemory,omitempty" json:"requiredMemory,omitempty"`
|
||||
RequiredDisk *string `yaml:"requiredDisk,omitempty" json:"requiredDisk,omitempty"`
|
||||
RequiredGPU *string `yaml:"requiredGpu,omitempty" json:"requiredGpu,omitempty"`
|
||||
RequiredCPU *string `yaml:"requiredCpu,omitempty" json:"requiredCpu,omitempty"`
|
||||
LimitedMemory *string `yaml:"limitedMemory,omitempty" json:"limitedMemory,omitempty"`
|
||||
LimitedDisk *string `yaml:"limitedDisk,omitempty" json:"limitedDisk,omitempty"`
|
||||
LimitedGPU *string `yaml:"limitedGPU,omitempty" json:"limitedGPU,omitempty"`
|
||||
LimitedCPU *string `yaml:"limitedCPU,omitempty" json:"limitedCPU,omitempty"`
|
||||
}
|
||||
|
||||
func (c *Chart) Namespace(owner string) string {
|
||||
if c.Shared {
|
||||
return fmt.Sprintf("%s-%s", c.Name, "shared")
|
||||
|
||||
@@ -100,6 +100,7 @@ type ApplicationConfig struct {
|
||||
PodsSelectors []metav1.LabelSelector
|
||||
HardwareRequirement Hardware
|
||||
SharedEntrances []v1alpha1.Entrance
|
||||
SelectedGpuType string
|
||||
}
|
||||
|
||||
func (c *ApplicationConfig) IsMiddleware() bool {
|
||||
@@ -159,6 +160,13 @@ func (c *ApplicationConfig) GenSharedEntranceURL(ctx context.Context) ([]v1alpha
|
||||
return app.GenSharedEntranceURL(ctx)
|
||||
}
|
||||
|
||||
func (c *ApplicationConfig) GetSelectedGpuTypeValue() string {
|
||||
if c.SelectedGpuType == "" {
|
||||
return "none"
|
||||
}
|
||||
return c.SelectedGpuType
|
||||
}
|
||||
|
||||
func (p *ProviderPermission) GetNamespace(ownerName string) string {
|
||||
if p.Namespace != "" {
|
||||
if p.Namespace == "user-space" || p.Namespace == "user-system" {
|
||||
|
||||
@@ -564,7 +564,7 @@ func (h *HelmOps) WaitForStartUp() (bool, error) {
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
if errors.Is(err, errcode.ErrPodPending) {
|
||||
if errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending) {
|
||||
return false, err
|
||||
}
|
||||
|
||||
@@ -576,11 +576,41 @@ func (h *HelmOps) WaitForStartUp() (bool, error) {
|
||||
}
|
||||
|
||||
func (h *HelmOps) isStartUp() (bool, error) {
|
||||
pods, err := h.findAppSelectedPods()
|
||||
if h.app.IsV2() && h.app.IsMultiCharts() {
|
||||
serverPods, err := h.findServerPods()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
podNames := make([]string, 0)
|
||||
for _, p := range serverPods {
|
||||
podNames = append(podNames, p.Name)
|
||||
}
|
||||
klog.Infof("podSErvers: %v", podNames)
|
||||
|
||||
serverStarted, err := checkIfStartup(serverPods, true)
|
||||
if err != nil {
|
||||
klog.Errorf("v2 app %s server pods not ready: %v", h.app.AppName, err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
if !serverStarted {
|
||||
klog.Infof("v2 app %s server pods not started yet, waiting...", h.app.AppName)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
klog.Infof("v2 app %s server pods started, checking client pods", h.app.AppName)
|
||||
}
|
||||
|
||||
clientPods, err := h.findV1OrClientPods()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return checkIfStartup(pods)
|
||||
|
||||
clientStarted, err := checkIfStartup(clientPods, false)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return clientStarted, nil
|
||||
}
|
||||
|
||||
func (h *HelmOps) findAppSelectedPods() (*corev1.PodList, error) {
|
||||
@@ -610,15 +640,49 @@ func (h *HelmOps) findAppSelectedPods() (*corev1.PodList, error) {
|
||||
return pods, nil
|
||||
}
|
||||
|
||||
func checkIfStartup(pods *corev1.PodList) (bool, error) {
|
||||
if len(pods.Items) == 0 {
|
||||
func (h *HelmOps) findV1OrClientPods() ([]corev1.Pod, error) {
|
||||
podList, err := h.client.KubeClient.Kubernetes().CoreV1().Pods(h.app.Namespace).List(h.ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("app %s get pods err %v", h.app.AppName, err)
|
||||
return nil, err
|
||||
}
|
||||
return podList.Items, nil
|
||||
|
||||
}
|
||||
|
||||
func (h *HelmOps) findServerPods() ([]corev1.Pod, error) {
|
||||
pods := make([]corev1.Pod, 0)
|
||||
|
||||
for _, c := range h.app.SubCharts {
|
||||
if !c.Shared {
|
||||
continue
|
||||
}
|
||||
ns := c.Namespace(h.app.OwnerName)
|
||||
podList, err := h.client.KubeClient.Kubernetes().CoreV1().Pods(ns).List(h.ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("app %s get pods err %v", h.app.AppName, err)
|
||||
return nil, err
|
||||
}
|
||||
pods = append(pods, podList.Items...)
|
||||
}
|
||||
|
||||
return pods, nil
|
||||
}
|
||||
|
||||
func checkIfStartup(pods []corev1.Pod, isServerSide bool) (bool, error) {
|
||||
if len(pods) == 0 {
|
||||
return false, errors.New("no pod found")
|
||||
}
|
||||
for _, pod := range pods.Items {
|
||||
startedPods := 0
|
||||
totalPods := len(pods)
|
||||
for _, pod := range pods {
|
||||
creationTime := pod.GetCreationTimestamp()
|
||||
pendingDuration := time.Since(creationTime.Time)
|
||||
|
||||
if pod.Status.Phase == corev1.PodPending && pendingDuration > time.Minute*10 {
|
||||
if isServerSide {
|
||||
return false, errcode.ErrServerSidePodPending
|
||||
}
|
||||
return false, errcode.ErrPodPending
|
||||
}
|
||||
totalContainers := len(pod.Spec.Containers)
|
||||
@@ -630,9 +694,12 @@ func checkIfStartup(pods *corev1.PodList) (bool, error) {
|
||||
}
|
||||
}
|
||||
if startedContainers == totalContainers {
|
||||
return true, nil
|
||||
startedPods++
|
||||
}
|
||||
}
|
||||
if totalPods == startedPods {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -685,7 +752,7 @@ func getApplicationPolicy(policies []appcfg.AppPolicy, entrances []appv1alpha1.E
|
||||
return string(policyStr), nil
|
||||
}
|
||||
|
||||
func parseAppPermission(data []appcfg.AppPermission) []appcfg.AppPermission {
|
||||
func ParseAppPermission(data []appcfg.AppPermission) []appcfg.AppPermission {
|
||||
permissions := make([]appcfg.AppPermission, 0)
|
||||
for _, p := range data {
|
||||
switch perm := p.(type) {
|
||||
@@ -796,7 +863,7 @@ func (h *HelmOps) Install() error {
|
||||
return nil
|
||||
}
|
||||
ok, err := h.WaitForStartUp()
|
||||
if err != nil && errors.Is(err, errcode.ErrPodPending) {
|
||||
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
|
||||
@@ -78,7 +78,7 @@ func (h *HelmOps) Uninstall_(client kubernetes.Interface, actionConfig *action.C
|
||||
return err
|
||||
}
|
||||
|
||||
h.app.Permission = parseAppPermission(h.app.Permission)
|
||||
h.app.Permission = ParseAppPermission(h.app.Permission)
|
||||
var perm []appcfg.ProviderPermission
|
||||
for _, p := range h.app.Permission {
|
||||
if t, ok := p.([]appcfg.ProviderPermission); ok {
|
||||
|
||||
@@ -50,7 +50,7 @@ func (h *HelmOps) SetValues() (values map[string]interface{}, err error) {
|
||||
|
||||
values["domain"] = entries
|
||||
userspace := make(map[string]interface{})
|
||||
h.app.Permission = parseAppPermission(h.app.Permission)
|
||||
h.app.Permission = ParseAppPermission(h.app.Permission)
|
||||
for _, p := range h.app.Permission {
|
||||
switch perm := p.(type) {
|
||||
case appcfg.AppDataPermission, appcfg.AppCachePermission, appcfg.UserDataPermission:
|
||||
@@ -170,17 +170,12 @@ func (h *HelmOps) SetValues() (values map[string]interface{}, err error) {
|
||||
values["cluster"] = map[string]interface{}{
|
||||
"arch": arch,
|
||||
}
|
||||
gpuType, err := utils.FindGpuTypeFromNodes(nodes)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get gpuType err=%v", err)
|
||||
return values, err
|
||||
}
|
||||
values["GPU"] = map[string]interface{}{
|
||||
"Type": gpuType,
|
||||
"Type": h.app.GetSelectedGpuTypeValue(),
|
||||
"Cuda": os.Getenv("OLARES_SYSTEM_CUDA_VERSION"),
|
||||
}
|
||||
|
||||
values["gpu"] = gpuType
|
||||
values["gpu"] = h.app.GetSelectedGpuTypeValue()
|
||||
|
||||
if h.app.OIDC.Enabled {
|
||||
err = h.createOIDCClient(values, zone, h.app.Namespace)
|
||||
|
||||
@@ -51,7 +51,7 @@ func (h *HelmOpsV2) ApplyEnv() error {
|
||||
}
|
||||
|
||||
ok, err := h.WaitForStartUp()
|
||||
if err != nil && errors.Is(err, errcode.ErrPodPending) {
|
||||
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -119,7 +119,7 @@ func (h *HelmOpsV2) Install() error {
|
||||
return nil
|
||||
}
|
||||
ok, err := h.WaitForStartUp()
|
||||
if err != nil && errors.Is(err, errcode.ErrPodPending) {
|
||||
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
|
||||
klog.Errorf("App %s is pending, err=%v", h.App().AppName, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -91,7 +91,7 @@ func (h *HelmOpsV2) Upgrade() error {
|
||||
}
|
||||
|
||||
ok, err := h.WaitForStartUp()
|
||||
if err != nil && errors.Is(err, errcode.ErrPodPending) {
|
||||
if err != nil && (errors.Is(err, errcode.ErrPodPending) || errors.Is(err, errcode.ErrServerSidePodPending)) {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -13,9 +13,9 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/images"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/klog/v2"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
@@ -47,6 +47,29 @@ func (r *downloadingInProgressApp) WaitAsync(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Check Kubernetes request resources before transitioning to Installing
|
||||
var appConfig *appcfg.ApplicationConfig
|
||||
if err := json.Unmarshal([]byte(r.manager.Spec.Config), &appConfig); err != nil {
|
||||
klog.Errorf("failed to unmarshal app config for %s: %v", r.manager.Spec.AppName, err)
|
||||
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.InstallFailed, nil, fmt.Sprintf("invalid app config: %v", err), "")
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.InstallFailed.String(), updateErr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
_, conditionType, checkErr := apputils.CheckAppK8sRequestResource(appConfig, r.manager.Spec.OpType)
|
||||
if checkErr != nil {
|
||||
klog.Errorf("k8s request resource check failed for app %s: %v", r.manager.Spec.AppName, checkErr)
|
||||
opRecord := makeRecord(r.manager, appsv1.InstallFailed, checkErr.Error())
|
||||
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.InstallFailed, opRecord, checkErr.Error(), string(conditionType))
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.InstallFailed.String(), updateErr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
updateErr := r.updateStatus(context.TODO(), r.manager, appsv1.Installing, nil, appsv1.Installing.String(), "")
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update app manager %s to %s state failed %v", r.manager.Name, appsv1.Installing.String(), updateErr)
|
||||
@@ -152,19 +175,8 @@ func (p *DownloadingApp) exec(ctx context.Context) error {
|
||||
},
|
||||
}
|
||||
|
||||
var nodes corev1.NodeList
|
||||
err = p.client.List(ctx, &nodes, &client.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("list node failed %v", err)
|
||||
return err
|
||||
}
|
||||
gpuType, err := utils.FindGpuTypeFromNodes(&nodes)
|
||||
if err != nil {
|
||||
klog.Errorf("get gpu type failed %v", gpuType)
|
||||
return err
|
||||
}
|
||||
values["GPU"] = map[string]interface{}{
|
||||
"Type": gpuType,
|
||||
"Type": appConfig.GetSelectedGpuTypeValue(),
|
||||
"Cuda": os.Getenv("OLARES_SYSTEM_CUDA_VERSION"),
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/klog/v2"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
@@ -104,8 +105,37 @@ func (p *InstallingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
|
||||
err = ops.Install()
|
||||
if err != nil {
|
||||
klog.Errorf("install app %s failed %v", p.manager.Spec.AppName, err)
|
||||
if errors.Is(err, errcode.ErrPodPending) {
|
||||
if errors.Is(err, errcode.ErrServerSidePodPending) {
|
||||
p.finally = func() {
|
||||
klog.Infof("app %s server side pods is pending, set stop-all annotation and update app state to stopping", p.manager.Spec.AppName)
|
||||
|
||||
var am appsv1.ApplicationManager
|
||||
if err := p.client.Get(context.TODO(), types.NamespacedName{Name: p.manager.Name}, &am); err != nil {
|
||||
klog.Errorf("failed to get application manager: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if am.Annotations == nil {
|
||||
am.Annotations = make(map[string]string)
|
||||
}
|
||||
am.Annotations[api.AppStopAllKey] = "true"
|
||||
|
||||
if err := p.client.Update(ctx, &am); err != nil {
|
||||
klog.Errorf("failed to set stop-all annotation: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
updateErr := p.updateStatus(ctx, &am, appsv1.Stopping, nil, err.Error(), constants.AppUnschedulable)
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update status failed %v", updateErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if errors.Is(err, errcode.ErrPodPending) {
|
||||
p.finally = func() {
|
||||
klog.Infof("app %s pods is still pending, update app state to stopping", p.manager.Spec.AppName)
|
||||
updateErr := p.updateStatus(context.TODO(), p.manager, appsv1.Stopping, nil, err.Error(), constants.AppUnschedulable)
|
||||
|
||||
@@ -2,13 +2,11 @@ package appstate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
@@ -60,41 +58,24 @@ func (p *ResumingApp) Exec(ctx context.Context) (StatefulInProgressApp, error) {
|
||||
}
|
||||
|
||||
func (p *ResumingApp) exec(ctx context.Context) error {
|
||||
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(1))
|
||||
if err != nil {
|
||||
klog.Errorf("resume %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("resume app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
|
||||
// If resume-all is requested, also resume v2 server-side shared charts by scaling them up
|
||||
if p.manager.Annotations[api.AppResumeAllKey] == "true" {
|
||||
var appCfg *appcfg.ApplicationConfig
|
||||
if err := json.Unmarshal([]byte(p.manager.Spec.Config), &appCfg); err != nil {
|
||||
klog.Errorf("unmarshal to appConfig failed %v", err)
|
||||
return err
|
||||
// Check if resume-all is requested for V2 apps to also resume server-side shared charts
|
||||
resumeServer := p.manager.Annotations[api.AppResumeAllKey] == "true"
|
||||
if resumeServer {
|
||||
err := resumeV2AppAll(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("resume v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("resume v2 app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
|
||||
for _, chart := range appCfg.SubCharts {
|
||||
if !chart.Shared {
|
||||
continue
|
||||
}
|
||||
ns := chart.Namespace(appCfg.OwnerName)
|
||||
// create a shallow copy with target namespace/name for scaling logic
|
||||
amCopy := p.manager.DeepCopy()
|
||||
amCopy.Spec.AppNamespace = ns
|
||||
amCopy.Spec.AppName = chart.Name
|
||||
klog.Infof("resume-amCopy.Spec.AppNamespace: %s", ns)
|
||||
klog.Infof("resume-amCopy.Spec.AppName: %s", chart.Name)
|
||||
if err := suspendOrResumeApp(ctx, p.client, amCopy, int32(1)); err != nil {
|
||||
klog.Errorf("failed to resume shared chart %s in namespace %s: %v", chart.Name, ns, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
err := resumeV1AppOrV2AppClient(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("resume v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("resume v2 app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
}
|
||||
|
||||
if p.manager.Spec.Type == "middleware" && userspace.IsKbMiddlewares(p.manager.Spec.AppName) {
|
||||
err = p.execMiddleware(ctx)
|
||||
err := p.execMiddleware(ctx)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to resume middleware %s,err=%v", p.manager.Spec.AppName, err)
|
||||
return err
|
||||
|
||||
@@ -2,10 +2,13 @@ package appstate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
|
||||
kbopv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
|
||||
"k8s.io/klog/v2"
|
||||
@@ -44,20 +47,30 @@ func (p *SuspendFailedApp) Exec(ctx context.Context) (StatefulInProgressApp, err
|
||||
}
|
||||
|
||||
func (p *SuspendFailedApp) StateReconcile(ctx context.Context) error {
|
||||
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(0))
|
||||
if err != nil {
|
||||
klog.Errorf("stop-failed-app %s state reconcile failed %v", p.manager.Spec.AppName, err)
|
||||
return err
|
||||
stopServer := p.manager.Annotations[api.AppStopAllKey] == "true"
|
||||
if stopServer {
|
||||
err := suspendV2AppAll(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("suspend v2 app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
} else {
|
||||
err := suspendV1AppOrV2Client(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
}
|
||||
if p.manager.Spec.Type == "middleware" {
|
||||
|
||||
if p.manager.Spec.Type == "middleware" && userspace.IsKbMiddlewares(p.manager.Spec.AppName) {
|
||||
op := kubeblocks.NewOperation(ctx, kbopv1alpha1.StopType, p.manager, p.client)
|
||||
err = op.Stop()
|
||||
err := op.Stop()
|
||||
if err != nil {
|
||||
klog.Errorf("stop-failed-middleware %s state reconcile failed %v", p.manager.Spec.AppName, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SuspendFailedApp) Cancel(ctx context.Context) error {
|
||||
|
||||
@@ -2,17 +2,17 @@ package appstate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubeblocks"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
kbopv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -78,35 +78,68 @@ func (p *SuspendingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
|
||||
}
|
||||
|
||||
func (p *SuspendingApp) exec(ctx context.Context) error {
|
||||
err := suspendOrResumeApp(ctx, p.client, p.manager, int32(0))
|
||||
if err != nil {
|
||||
klog.Errorf("suspend %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
// If stop-all is requested, also stop v2 server-side shared charts by scaling them down
|
||||
if p.manager.Annotations[api.AppStopAllKey] == "true" {
|
||||
var appCfg *appcfg.ApplicationConfig
|
||||
if err := json.Unmarshal([]byte(p.manager.Spec.Config), &appCfg); err != nil {
|
||||
klog.Errorf("unmarshal to appConfig failed %v", err)
|
||||
return err
|
||||
// Check if stop-all is requested for V2 apps to also stop server-side shared charts
|
||||
stopServer := p.manager.Annotations[api.AppStopAllKey] == "true"
|
||||
if stopServer {
|
||||
err := suspendV2AppAll(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend v2 app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("suspend v2 app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
|
||||
for _, chart := range appCfg.SubCharts {
|
||||
if !chart.Shared {
|
||||
} else {
|
||||
err := suspendV1AppOrV2Client(ctx, p.client, p.manager)
|
||||
if err != nil {
|
||||
klog.Errorf("suspend app %s %s failed %v", p.manager.Spec.Type, p.manager.Spec.AppName, err)
|
||||
return fmt.Errorf("suspend app %s failed %w", p.manager.Spec.AppName, err)
|
||||
}
|
||||
}
|
||||
|
||||
if stopServer {
|
||||
// For V2 cluster-scoped apps, when server is down, stop all other users' clients
|
||||
// because they share the same server and cannot function without it
|
||||
klog.Infof("stopping other users' clients for v2 app %s", p.manager.Spec.AppName)
|
||||
|
||||
var appManagerList appsv1.ApplicationManagerList
|
||||
if err := p.client.List(ctx, &appManagerList); err != nil {
|
||||
klog.Errorf("failed to list application managers: %v", err)
|
||||
} else {
|
||||
// find all ApplicationManagers with same AppName but different AppOwner
|
||||
for _, am := range appManagerList.Items {
|
||||
// Skip if same owner (already handled) or different app
|
||||
if am.Spec.AppName != p.manager.Spec.AppName || am.Spec.AppOwner == p.manager.Spec.AppOwner {
|
||||
continue
|
||||
}
|
||||
ns := chart.Namespace(appCfg.OwnerName)
|
||||
// create a shallow copy with target namespace/name for scaling logic
|
||||
amCopy := p.manager.DeepCopy()
|
||||
amCopy.Spec.AppNamespace = ns
|
||||
amCopy.Spec.AppName = chart.Name
|
||||
klog.Infof("amCopy.Spec.AppNamespace: %s", ns)
|
||||
klog.Infof("amCopy.Spec.AppName: %s", chart.Name)
|
||||
|
||||
if err := suspendOrResumeApp(ctx, p.client, amCopy, int32(0)); err != nil {
|
||||
klog.Errorf("failed to stop shared chart %s in namespace %s: %v", chart.Name, ns, err)
|
||||
if am.Spec.Type != appsv1.App && am.Spec.Type != appsv1.Middleware {
|
||||
continue
|
||||
}
|
||||
|
||||
if am.Status.State == appsv1.Stopped || am.Status.State == appsv1.Stopping {
|
||||
klog.Infof("app %s owner %s already in stopped/stopping state, skip", am.Spec.AppName, am.Spec.AppOwner)
|
||||
continue
|
||||
}
|
||||
|
||||
if !IsOperationAllowed(am.Status.State, appsv1.StopOp) {
|
||||
klog.Infof("app %s owner %s not allowed do stop operation, skip", am.Spec.AppName, am.Spec.AppOwner)
|
||||
continue
|
||||
}
|
||||
opID := strconv.FormatInt(time.Now().Unix(), 10)
|
||||
now := metav1.Now()
|
||||
status := appsv1.ApplicationManagerStatus{
|
||||
OpType: appsv1.StopOp,
|
||||
OpID: opID,
|
||||
State: appsv1.Stopping,
|
||||
StatusTime: &now,
|
||||
UpdateTime: &now,
|
||||
Reason: p.manager.Status.Reason,
|
||||
Message: p.manager.Status.Message,
|
||||
}
|
||||
if _, err := apputils.UpdateAppMgrStatus(am.Name, status); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
klog.Infof("stopping client for user %s, app %s", am.Spec.AppOwner, am.Spec.AppName)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"helm.sh/helm/v3/pkg/action"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/klog/v2"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
@@ -214,19 +213,8 @@ func (p *UpgradingApp) exec(ctx context.Context) error {
|
||||
"username": p.manager.Spec.AppOwner,
|
||||
},
|
||||
}
|
||||
var nodes corev1.NodeList
|
||||
err = p.client.List(ctx, &nodes, &client.ListOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("list node failed %v", err)
|
||||
return err
|
||||
}
|
||||
gpuType, err := utils.FindGpuTypeFromNodes(&nodes)
|
||||
if err != nil {
|
||||
klog.Errorf("get gpu type failed %v", gpuType)
|
||||
return err
|
||||
}
|
||||
values["GPU"] = map[string]interface{}{
|
||||
"Type": gpuType,
|
||||
"Type": appConfig.GetSelectedGpuTypeValue(),
|
||||
"Cuda": os.Getenv("OLARES_SYSTEM_CUDA_VERSION"),
|
||||
}
|
||||
|
||||
|
||||
@@ -2,10 +2,13 @@ package appstate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@@ -21,25 +24,25 @@ import (
|
||||
const suspendAnnotation = "bytetrade.io/suspend-by"
|
||||
const suspendCauseAnnotation = "bytetrade.io/suspend-cause"
|
||||
|
||||
func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager, replicas int32) error {
|
||||
suspend := func(list client.ObjectList) error {
|
||||
namespace := am.Spec.AppNamespace
|
||||
err := cli.List(ctx, list, client.InNamespace(namespace))
|
||||
if err != nil && !apierrors.IsNotFound(err) {
|
||||
klog.Errorf("Failed to get workload namespace=%s err=%v", namespace, err)
|
||||
// suspendOrResumeApp suspends or resumes an application.
|
||||
func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager, replicas int32, stopOrResumeServer bool) error {
|
||||
suspendOrResume := func(list client.ObjectList, targetNamespace, targetAppName string) error {
|
||||
err := cli.List(ctx, list, client.InNamespace(targetNamespace))
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get workload namespace=%s err=%v", targetNamespace, err)
|
||||
return err
|
||||
}
|
||||
|
||||
listObjects, err := apimeta.ExtractList(list)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to extract list namespace=%s err=%v", namespace, err)
|
||||
klog.Errorf("Failed to extract list namespace=%s err=%v", targetNamespace, err)
|
||||
return err
|
||||
}
|
||||
check := func(appName, deployName string) bool {
|
||||
if namespace == fmt.Sprintf("user-space-%s", am.Spec.AppOwner) ||
|
||||
namespace == fmt.Sprintf("user-system-%s", am.Spec.AppOwner) ||
|
||||
namespace == "os-platform" ||
|
||||
namespace == "os-framework" {
|
||||
if targetNamespace == fmt.Sprintf("user-space-%s", am.Spec.AppOwner) ||
|
||||
targetNamespace == fmt.Sprintf("user-system-%s", am.Spec.AppOwner) ||
|
||||
targetNamespace == "os-platform" ||
|
||||
targetNamespace == "os-framework" {
|
||||
if appName == deployName {
|
||||
return true
|
||||
}
|
||||
@@ -54,7 +57,7 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
|
||||
workloadName := ""
|
||||
switch workload := w.(type) {
|
||||
case *appsv1.Deployment:
|
||||
if check(am.Spec.AppName, workload.Name) {
|
||||
if check(targetAppName, workload.Name) {
|
||||
if workload.Annotations == nil {
|
||||
workload.Annotations = make(map[string]string)
|
||||
}
|
||||
@@ -64,7 +67,7 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
|
||||
workloadName = workload.Namespace + "/" + workload.Name
|
||||
}
|
||||
case *appsv1.StatefulSet:
|
||||
if check(am.Spec.AppName, workload.Name) {
|
||||
if check(targetAppName, workload.Name) {
|
||||
if workload.Annotations == nil {
|
||||
workload.Annotations = make(map[string]string)
|
||||
}
|
||||
@@ -92,15 +95,79 @@ func suspendOrResumeApp(ctx context.Context, cli client.Client, am *appv1alpha1.
|
||||
} // end of suspend func
|
||||
|
||||
var deploymentList appsv1.DeploymentList
|
||||
err := suspend(&deploymentList)
|
||||
err := suspendOrResume(&deploymentList, am.Spec.AppNamespace, am.Spec.AppName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var stsList appsv1.StatefulSetList
|
||||
err = suspend(&stsList)
|
||||
err = suspendOrResume(&stsList, am.Spec.AppNamespace, am.Spec.AppName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
// If stopOrResumeServer is true, also suspend/resume shared server charts for V2 apps
|
||||
if stopOrResumeServer {
|
||||
var appCfg *appcfg.ApplicationConfig
|
||||
if err := json.Unmarshal([]byte(am.Spec.Config), &appCfg); err != nil {
|
||||
klog.Warningf("failed to unmarshal app config for stopServer check: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if appCfg != nil && appCfg.IsV2() && appCfg.HasClusterSharedCharts() {
|
||||
for _, chart := range appCfg.SubCharts {
|
||||
if !chart.Shared {
|
||||
continue
|
||||
}
|
||||
ns := chart.Namespace(am.Spec.AppOwner)
|
||||
if replicas == 0 {
|
||||
klog.Infof("suspending shared chart %s in namespace %s", chart.Name, ns)
|
||||
} else {
|
||||
klog.Infof("resuming shared chart %s in namespace %s", chart.Name, ns)
|
||||
}
|
||||
|
||||
var sharedDeploymentList appsv1.DeploymentList
|
||||
if err := suspendOrResume(&sharedDeploymentList, ns, chart.Name); err != nil {
|
||||
klog.Errorf("failed to scale deployments in shared chart %s namespace %s: %v", chart.Name, ns, err)
|
||||
return err
|
||||
}
|
||||
|
||||
var sharedStsList appsv1.StatefulSetList
|
||||
if err := suspendOrResume(&sharedStsList, ns, chart.Name); err != nil {
|
||||
klog.Errorf("failed to scale statefulsets in shared chart %s namespace %s: %v", chart.Name, ns, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reset the stop-all/resume-all annotation after processing
|
||||
if am.Annotations != nil {
|
||||
delete(am.Annotations, api.AppStopAllKey)
|
||||
delete(am.Annotations, api.AppResumeAllKey)
|
||||
if err := cli.Update(ctx, am); err != nil {
|
||||
klog.Warningf("failed to reset stop-all/resume-all annotations for app=%s owner=%s: %v", am.Spec.AppName, am.Spec.AppOwner, err)
|
||||
// Don't return error, operation already succeeded
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func suspendV1AppOrV2Client(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
|
||||
return suspendOrResumeApp(ctx, cli, am, 0, false)
|
||||
}
|
||||
|
||||
func suspendV2AppAll(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
|
||||
return suspendOrResumeApp(ctx, cli, am, 0, true)
|
||||
}
|
||||
|
||||
func resumeV1AppOrV2AppClient(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
|
||||
return suspendOrResumeApp(ctx, cli, am, 1, false)
|
||||
}
|
||||
|
||||
func resumeV2AppAll(ctx context.Context, cli client.Client, am *appv1alpha1.ApplicationManager) error {
|
||||
return suspendOrResumeApp(ctx, cli, am, 1, true)
|
||||
}
|
||||
|
||||
func isStartUp(am *appv1alpha1.ApplicationManager, cli client.Client) (bool, error) {
|
||||
|
||||
@@ -78,13 +78,15 @@ const (
|
||||
SidecarInitContainerName = "olares-sidecar-init"
|
||||
EnvoyConfigWorkDirName = "envoy-config"
|
||||
|
||||
ByteTradeAuthor = "bytetrade.io"
|
||||
NvshareGPU = "nvshare.com/gpu"
|
||||
NvidiaGPU = "nvidia.com/gpu"
|
||||
VirtAiTechVGPU = "virtaitech.com/gpu"
|
||||
PatchOpAdd = "add"
|
||||
PatchOpReplace = "replace"
|
||||
EnvNvshareManagedMemory = "NVSHARE_MANAGED_MEMORY"
|
||||
ByteTradeAuthor = "bytetrade.io"
|
||||
PatchOpAdd = "add"
|
||||
PatchOpReplace = "replace"
|
||||
EnvGPUType = "GPU_TYPE"
|
||||
|
||||
// gpu resource keys
|
||||
NvidiaGPU = "nvidia.com/gpu"
|
||||
NvidiaGB10GPU = "nvidia.com/gb10"
|
||||
AMDAPU = "amd.com/apu"
|
||||
|
||||
AuthorizationLevelOfPublic = "public"
|
||||
AuthorizationLevelOfPrivate = "private"
|
||||
|
||||
@@ -3,5 +3,6 @@ package errcode
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
ErrPodPending = errors.New("pod is pending")
|
||||
ErrServerSidePodPending = errors.New("server side pod is pending")
|
||||
ErrPodPending = errors.New("pod is pending")
|
||||
)
|
||||
|
||||
@@ -160,6 +160,7 @@ func PublishAppEventToQueue(p utils.EventParams) {
|
||||
return p.RawAppName
|
||||
}(),
|
||||
Title: p.Title,
|
||||
Icon: p.Icon,
|
||||
Reason: p.Reason,
|
||||
Message: p.Message,
|
||||
SharedEntrances: p.SharedEntrances,
|
||||
|
||||
@@ -233,6 +233,7 @@ func (imc *ImageManagerClient) updateProgress(ctx context.Context, am *appv1alph
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
Icon: apputils.AppIcon(am.Spec.Config),
|
||||
})
|
||||
}
|
||||
klog.Infof("app %s download progress.... %v", am.Spec.AppName, progressStr)
|
||||
|
||||
@@ -273,11 +273,7 @@ func (c *Creator) installSysApps(ctx context.Context, bflPod *corev1.Pod) error
|
||||
"arch": arch,
|
||||
}
|
||||
|
||||
gpuType, err := utils.FindGpuTypeFromNodes(&nodes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vals["gpu"] = gpuType
|
||||
vals["gpu"] = "none" // unused currently
|
||||
|
||||
userIndex, userSubnet, err := c.getUserSubnet(ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
|
||||
sysv1alpha1 "github.com/beclab/Olares/framework/app-service/api/sys.bytetrade.io/v1alpha1"
|
||||
"github.com/go-viper/mapstructure/v2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
@@ -674,6 +675,7 @@ type ConfigOptions struct {
|
||||
MarketSource string
|
||||
IsAdmin bool
|
||||
RawAppName string
|
||||
SelectedGpu string
|
||||
}
|
||||
|
||||
// GetAppConfig get app installation configuration from app store
|
||||
@@ -740,7 +742,7 @@ func getAppConfigFromRepo(ctx context.Context, options *ConfigOptions) (*appcfg.
|
||||
return getAppConfigFromConfigurationFile(options, chartPath)
|
||||
}
|
||||
|
||||
func toApplicationConfig(app, chart, rawAppName string, cfg *appcfg.AppConfiguration) (*appcfg.ApplicationConfig, string, error) {
|
||||
func toApplicationConfig(app, chart, rawAppName, selectedGpu string, cfg *appcfg.AppConfiguration) (*appcfg.ApplicationConfig, string, error) {
|
||||
var permission []appcfg.AppPermission
|
||||
if cfg.Permission.AppData {
|
||||
permission = append(permission, appcfg.AppDataRW)
|
||||
@@ -788,6 +790,57 @@ func toApplicationConfig(app, chart, rawAppName string, cfg *appcfg.AppConfigura
|
||||
return nil, chart, err
|
||||
}
|
||||
|
||||
// set suppertedGpu to ["nvidia","nvidia-gb10"] by default
|
||||
if len(cfg.Spec.SupportedGpu) == 0 {
|
||||
cfg.Spec.SupportedGpu = []interface{}{utils.NvidiaCardType, utils.GB10ChipType}
|
||||
}
|
||||
|
||||
// try to get selected GPU type special resource requirement
|
||||
if selectedGpu != "" {
|
||||
found := false
|
||||
for _, supportedGpu := range cfg.Spec.SupportedGpu {
|
||||
if str, ok := supportedGpu.(string); ok && str == selectedGpu {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
|
||||
if supportedGpuResourceMap, ok := supportedGpu.(map[string]interface{}); ok {
|
||||
if resourceRequirement, ok := supportedGpuResourceMap[selectedGpu].(map[string]interface{}); ok {
|
||||
found = true
|
||||
var specialResource appcfg.SpecialResource
|
||||
err := mapstructure.Decode(resourceRequirement, &specialResource)
|
||||
if err != nil {
|
||||
return nil, chart, fmt.Errorf("failed to decode special resource for selected GPU type %s: %v", selectedGpu, err)
|
||||
}
|
||||
|
||||
for _, resSetter := range []struct {
|
||||
v **resource.Quantity
|
||||
s *string
|
||||
}{
|
||||
{v: &mem, s: specialResource.RequiredMemory},
|
||||
{v: &disk, s: specialResource.RequiredDisk},
|
||||
{v: &cpu, s: specialResource.RequiredCPU},
|
||||
{v: &gpu, s: specialResource.RequiredGPU},
|
||||
} {
|
||||
|
||||
if resSetter.s != nil && *resSetter.s != "" {
|
||||
*resSetter.v, err = valuePtr(resource.ParseQuantity(*resSetter.s))
|
||||
if err != nil {
|
||||
return nil, chart, fmt.Errorf("failed to parse special resource quantity %s: %v", *resSetter.s, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
break
|
||||
} // end if selected gpu's resource requirement found
|
||||
} // end if supportedGpu is map
|
||||
} // end for supportedGpu
|
||||
|
||||
if !found {
|
||||
return nil, chart, fmt.Errorf("selected GPU type %s is not supported", selectedGpu)
|
||||
}
|
||||
}
|
||||
|
||||
// transform from Policy to AppPolicy
|
||||
var policies []appcfg.AppPolicy
|
||||
for _, p := range cfg.Options.Policies {
|
||||
@@ -877,6 +930,7 @@ func toApplicationConfig(app, chart, rawAppName string, cfg *appcfg.AppConfigura
|
||||
PodsSelectors: podSelectors,
|
||||
HardwareRequirement: cfg.Spec.Hardware,
|
||||
SharedEntrances: cfg.SharedEntrances,
|
||||
SelectedGpuType: selectedGpu,
|
||||
}, chart, nil
|
||||
}
|
||||
|
||||
@@ -890,7 +944,7 @@ func getAppConfigFromConfigurationFile(opt *ConfigOptions, chartPath string) (*a
|
||||
return nil, chartPath, err
|
||||
}
|
||||
|
||||
return toApplicationConfig(opt.App, chartPath, opt.RawAppName, &cfg)
|
||||
return toApplicationConfig(opt.App, chartPath, opt.RawAppName, opt.SelectedGpu, &cfg)
|
||||
}
|
||||
|
||||
func checkVersionFormat(constraint string) error {
|
||||
@@ -1080,13 +1134,28 @@ func IsClonedApp(appName, rawAppName string) bool {
|
||||
}
|
||||
|
||||
func AppTitle(config string) string {
|
||||
var cfg appcfg.ApplicationConfig
|
||||
err := json.Unmarshal([]byte(config), &cfg)
|
||||
if err != nil {
|
||||
cfg := unmarshalApplicationConfig(config)
|
||||
if cfg == nil {
|
||||
return ""
|
||||
}
|
||||
return cfg.Title
|
||||
}
|
||||
func AppIcon(config string) string {
|
||||
cfg := unmarshalApplicationConfig(config)
|
||||
if cfg == nil {
|
||||
return ""
|
||||
}
|
||||
return cfg.Icon
|
||||
}
|
||||
|
||||
func unmarshalApplicationConfig(config string) *appcfg.ApplicationConfig {
|
||||
var cfg appcfg.ApplicationConfig
|
||||
err := json.Unmarshal([]byte(config), &cfg)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return &cfg
|
||||
}
|
||||
|
||||
func GetRawAppName(AppName, rawAppName string) string {
|
||||
if rawAppName == "" {
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -233,7 +234,9 @@ func CheckAppRequirement(token string, appConfig *appcfg.ApplicationConfig, op v
|
||||
return constants.CPU, constants.SystemCPUPressure, fmt.Errorf(constants.SystemCPUPressureMessage, op)
|
||||
}
|
||||
}
|
||||
if appConfig.Requirement.GPU != nil {
|
||||
|
||||
// only support nvidia gpu managment by HAMi for now
|
||||
if appConfig.Requirement.GPU != nil && appConfig.GetSelectedGpuTypeValue() == utils.NvidiaCardType {
|
||||
if !appConfig.Requirement.GPU.IsZero() && metrics.GPU.Total <= 0 {
|
||||
return constants.GPU, constants.SystemGPUNotAvailable, fmt.Errorf(constants.SystemGPUNotAvailableMessage, op)
|
||||
|
||||
@@ -260,42 +263,10 @@ func CheckAppRequirement(token string, appConfig *appcfg.ApplicationConfig, op v
|
||||
}
|
||||
}
|
||||
|
||||
allocatedResources, err := getRequestResources()
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
if len(allocatedResources) == 1 {
|
||||
sufficientCPU, sufficientMemory := false, false
|
||||
if appConfig.Requirement.CPU == nil {
|
||||
sufficientCPU = true
|
||||
}
|
||||
if appConfig.Requirement.Memory == nil {
|
||||
sufficientMemory = true
|
||||
}
|
||||
for _, v := range allocatedResources {
|
||||
if appConfig.Requirement.CPU != nil {
|
||||
if v.cpu.allocatable.Cmp(*appConfig.Requirement.CPU) > 0 {
|
||||
sufficientCPU = true
|
||||
}
|
||||
}
|
||||
if appConfig.Requirement.Memory != nil {
|
||||
if v.memory.allocatable.Cmp(*appConfig.Requirement.Memory) > 0 {
|
||||
sufficientMemory = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !sufficientCPU {
|
||||
return constants.CPU, constants.K8sRequestCPUPressure, fmt.Errorf(constants.K8sRequestCPUPressureMessage, op)
|
||||
}
|
||||
if !sufficientMemory {
|
||||
return constants.Memory, constants.K8sRequestMemoryPressure, fmt.Errorf(constants.K8sRequestMemoryPressureMessage, op)
|
||||
}
|
||||
}
|
||||
|
||||
return "", "", nil
|
||||
return CheckAppK8sRequestResource(appConfig, op)
|
||||
}
|
||||
|
||||
func getRequestResources() (map[string]resources, error) {
|
||||
func GetRequestResources() (map[string]resources, error) {
|
||||
config, err := ctrl.GetConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -429,7 +400,9 @@ func GetClusterResource(token string) (*prometheus.ClusterMetrics, []string, err
|
||||
arches.Insert(n.Labels["kubernetes.io/arch"])
|
||||
if quantity, ok := n.Status.Capacity[constants.NvidiaGPU]; ok {
|
||||
total += quantity.AsApproximateFloat64()
|
||||
} else if quantity, ok = n.Status.Capacity[constants.VirtAiTechVGPU]; ok {
|
||||
} else if quantity, ok = n.Status.Capacity[constants.NvidiaGB10GPU]; ok {
|
||||
total += quantity.AsApproximateFloat64()
|
||||
} else if quantity, ok = n.Status.Capacity[constants.AMDAPU]; ok {
|
||||
total += quantity.AsApproximateFloat64()
|
||||
}
|
||||
}
|
||||
@@ -960,3 +933,83 @@ func CheckCloneEntrances(ctrlClient client.Client, appConfig *appcfg.Application
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func GetClusterAvailableResource() (*resources, error) {
|
||||
config, err := ctrl.GetConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client, err := kubernetes.NewForConfig(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
initAllocatable := resource.MustParse("0")
|
||||
availableResources := resources{
|
||||
cpu: usage{allocatable: &initAllocatable},
|
||||
memory: usage{allocatable: &initAllocatable},
|
||||
}
|
||||
nodeList := make([]corev1.Node, 0)
|
||||
for _, node := range nodes.Items {
|
||||
if !utils.IsNodeReady(&node) || node.Spec.Unschedulable {
|
||||
continue
|
||||
}
|
||||
nodeList = append(nodeList, node)
|
||||
}
|
||||
if len(nodeList) == 0 {
|
||||
return nil, errors.New("cluster has no suitable node to schedule")
|
||||
}
|
||||
for _, node := range nodeList {
|
||||
availableResources.cpu.allocatable.Add(*node.Status.Allocatable.Cpu())
|
||||
availableResources.memory.allocatable.Add(*node.Status.Allocatable.Memory())
|
||||
fieldSelector := fmt.Sprintf("spec.nodeName=%s,status.phase!=Failed,status.phase!=Succeeded", node.Name)
|
||||
pods, err := client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
|
||||
FieldSelector: fieldSelector,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, pod := range pods.Items {
|
||||
for _, container := range pod.Spec.Containers {
|
||||
availableResources.cpu.allocatable.Sub(*container.Resources.Requests.Cpu())
|
||||
availableResources.memory.allocatable.Sub(*container.Resources.Requests.Memory())
|
||||
}
|
||||
}
|
||||
}
|
||||
return &availableResources, nil
|
||||
}
|
||||
|
||||
func CheckAppK8sRequestResource(appConfig *appcfg.ApplicationConfig, op v1alpha1.OpType) (constants.ResourceType, constants.ResourceConditionType, error) {
|
||||
availableResources, err := GetClusterAvailableResource()
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
if appConfig == nil {
|
||||
return "", "", errors.New("nil appConfig")
|
||||
}
|
||||
|
||||
sufficientCPU, sufficientMemory := false, false
|
||||
|
||||
if appConfig.Requirement.CPU == nil {
|
||||
sufficientCPU = true
|
||||
}
|
||||
if appConfig.Requirement.Memory == nil {
|
||||
sufficientMemory = true
|
||||
}
|
||||
if appConfig.Requirement.CPU != nil && availableResources.cpu.allocatable.Cmp(*appConfig.Requirement.CPU) > 0 {
|
||||
sufficientCPU = true
|
||||
}
|
||||
if appConfig.Requirement.Memory != nil && availableResources.memory.allocatable.Cmp(*appConfig.Requirement.Memory) > 0 {
|
||||
sufficientMemory = true
|
||||
}
|
||||
if !sufficientCPU {
|
||||
return constants.CPU, constants.K8sRequestCPUPressure, fmt.Errorf(constants.K8sRequestCPUPressureMessage, op)
|
||||
}
|
||||
if !sufficientMemory {
|
||||
return constants.Memory, constants.K8sRequestMemoryPressure, fmt.Errorf(constants.K8sRequestMemoryPressureMessage, op)
|
||||
}
|
||||
return "", "", nil
|
||||
}
|
||||
|
||||
12
framework/app-service/pkg/utils/gpu_types.go
Normal file
12
framework/app-service/pkg/utils/gpu_types.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package utils
|
||||
|
||||
const (
|
||||
NodeGPUTypeLabel = "gpu.bytetrade.io/type"
|
||||
)
|
||||
|
||||
const (
|
||||
NvidiaCardType = "nvidia" // handling by HAMi
|
||||
AmdGpuCardType = "amd-gpu" //
|
||||
AmdApuCardType = "amd-apu" // AMD APU with integrated GPU , AI Max 395 etc.
|
||||
GB10ChipType = "nvidia-gb10" // NVIDIA GB10 Superchip & unified system memory
|
||||
)
|
||||
@@ -103,24 +103,37 @@ func GetAllNodesTunnelIPCIDRs() (cidrs []string) {
|
||||
return cidrs
|
||||
}
|
||||
|
||||
func FindGpuTypeFromNodes(nodes *corev1.NodeList) (string, error) {
|
||||
gpuType := "none"
|
||||
// func FindGpuTypeFromNodes(nodes *corev1.NodeList) (string, error) {
|
||||
// gpuType := "none"
|
||||
// if nodes == nil {
|
||||
// return gpuType, errors.New("empty node list")
|
||||
// }
|
||||
// for _, n := range nodes.Items {
|
||||
// if _, ok := n.Status.Capacity[constants.NvidiaGPU]; ok {
|
||||
// if _, ok = n.Status.Capacity[constants.NvshareGPU]; ok {
|
||||
// return "nvshare", nil
|
||||
|
||||
// }
|
||||
// gpuType = "nvidia"
|
||||
// }
|
||||
// if _, ok := n.Status.Capacity[constants.VirtAiTechVGPU]; ok {
|
||||
// return "virtaitech", nil
|
||||
// }
|
||||
// }
|
||||
// return gpuType, nil
|
||||
// }
|
||||
|
||||
func GetAllGpuTypesFromNodes(nodes *corev1.NodeList) (map[string]struct{}, error) {
|
||||
gpuTypes := make(map[string]struct{})
|
||||
if nodes == nil {
|
||||
return gpuType, errors.New("empty node list")
|
||||
return gpuTypes, errors.New("empty node list")
|
||||
}
|
||||
for _, n := range nodes.Items {
|
||||
if _, ok := n.Status.Capacity[constants.NvidiaGPU]; ok {
|
||||
if _, ok = n.Status.Capacity[constants.NvshareGPU]; ok {
|
||||
return "nvshare", nil
|
||||
|
||||
}
|
||||
gpuType = "nvidia"
|
||||
}
|
||||
if _, ok := n.Status.Capacity[constants.VirtAiTechVGPU]; ok {
|
||||
return "virtaitech", nil
|
||||
if typeLabel, ok := n.Labels[NodeGPUTypeLabel]; ok {
|
||||
gpuTypes[typeLabel] = struct{}{} // TODO: add driver version info
|
||||
}
|
||||
}
|
||||
return gpuType, nil
|
||||
return gpuTypes, nil
|
||||
}
|
||||
|
||||
func IsNodeReady(node *corev1.Node) bool {
|
||||
|
||||
@@ -25,6 +25,7 @@ type Event struct {
|
||||
User string `json:"user"`
|
||||
EntranceStatuses []v1alpha1.EntranceStatus `json:"entranceStatuses,omitempty"`
|
||||
Title string `json:"title,omitempty"`
|
||||
Icon string `json:"icon,omitempty"`
|
||||
Reason string `json:"reason,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
SharedEntrances []v1alpha1.Entrance `json:"sharedEntrances,omitempty"`
|
||||
@@ -45,6 +46,7 @@ type EventParams struct {
|
||||
Reason string
|
||||
Message string
|
||||
SharedEntrances []v1alpha1.Entrance
|
||||
Icon string
|
||||
}
|
||||
|
||||
func PublishEvent(nc *nats.Conn, subject string, data interface{}) error {
|
||||
|
||||
@@ -30,7 +30,6 @@ import (
|
||||
admissionv1 "k8s.io/api/admission/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@@ -544,16 +543,21 @@ type EnvKeyValue struct {
|
||||
}
|
||||
|
||||
// CreatePatchForDeployment add gpu env for deployment and returns patch bytes.
|
||||
func CreatePatchForDeployment(tpl *corev1.PodTemplateSpec, namespace string, gpuRequired *resource.Quantity, typeKey string, envKeyValues []EnvKeyValue) ([]byte, error) {
|
||||
patches, err := addResourceLimits(tpl, namespace, gpuRequired, typeKey, envKeyValues)
|
||||
func CreatePatchForDeployment(tpl *corev1.PodTemplateSpec, typeKey string, envKeyValues []EnvKeyValue) ([]byte, error) {
|
||||
patches, err := addResourceLimits(tpl, typeKey, envKeyValues)
|
||||
if err != nil {
|
||||
return []byte{}, err
|
||||
}
|
||||
return json.Marshal(patches)
|
||||
}
|
||||
|
||||
func addResourceLimits(tpl *corev1.PodTemplateSpec, namespace string, gpuRequired *resource.Quantity, typeKey string, envKeyValues []EnvKeyValue) (patch []patchOp, err error) {
|
||||
if typeKey == constants.NvidiaGPU || typeKey == constants.NvshareGPU {
|
||||
func addResourceLimits(tpl *corev1.PodTemplateSpec, typeKey string, envKeyValues []EnvKeyValue) (patch []patchOp, err error) {
|
||||
if typeKey == "" {
|
||||
klog.Warning("No gpu type selected, skip adding resource limits")
|
||||
return patch, nil
|
||||
}
|
||||
|
||||
if typeKey == constants.NvidiaGPU || typeKey == constants.NvidiaGB10GPU {
|
||||
if tpl.Spec.RuntimeClassName != nil {
|
||||
patch = append(patch, patchOp{
|
||||
Op: constants.PatchOpReplace,
|
||||
@@ -584,7 +588,10 @@ func addResourceLimits(tpl *corev1.PodTemplateSpec, namespace string, gpuRequire
|
||||
t := make(map[string]map[string]string)
|
||||
t["limits"] = map[string]string{}
|
||||
for k, v := range container.Resources.Limits {
|
||||
if k.String() == constants.NvidiaGPU || k.String() == constants.NvshareGPU || k.String() == constants.VirtAiTechVGPU {
|
||||
if k.String() == constants.NvidiaGPU ||
|
||||
k.String() == constants.NvidiaGB10GPU ||
|
||||
k.String() == constants.AMDAPU {
|
||||
// unset all previous gpu limits
|
||||
continue
|
||||
}
|
||||
t["limits"][k.String()] = v.String()
|
||||
|
||||
Reference in New Issue
Block a user