Compare commits

...

5 Commits

Author SHA1 Message Date
hysyeah
d8d0078bfd fix: delay create nats conn (#2391) 2026-01-09 15:16:11 +08:00
hys
ffbcb13ae5 update appservice image tag to 0.4.74 2026-01-08 21:12:38 +08:00
hysyeah
f3eef1f115 fix: push all nats event to queue (#2374)
* fix: push all nats event to queue and via one connection

* fix: wrap yaml decode error
2026-01-08 21:12:38 +08:00
hys
42c70afad5 fix: helm upgrade do not use atomic param and allow upgrade failed release 2026-01-08 21:12:38 +08:00
hys
7273577f4f fix: failed release upgrade 2026-01-08 21:12:38 +08:00
22 changed files with 213 additions and 318 deletions

View File

@@ -170,7 +170,7 @@ spec:
priorityClassName: "system-cluster-critical"
containers:
- name: app-service
image: beclab/app-service:0.4.73
image: beclab/app-service:0.4.74
imagePullPolicy: IfNotPresent
securityContext:
runAsUser: 0

View File

@@ -138,10 +138,16 @@ func main() {
setupLog.Error(err, "Unable to create controller", "controller", "Security")
os.Exit(1)
}
appEventQueue := appevent.NewAppEventQueue(ictx)
appEventQueue := appevent.NewAppEventQueue(ictx, nil)
appevent.SetAppEventQueue(appEventQueue)
go appEventQueue.Run()
defer func() {
if nc := appEventQueue.GetNatsConn(); nc != nil {
nc.Drain()
}
}()
if err = (&controllers.ApplicationManagerController{
Client: mgr.GetClient(),
KubeConfig: config,
@@ -198,6 +204,7 @@ func main() {
if err = (&controllers.NodeAlertController{
Client: mgr.GetClient(),
KubeConfig: config,
NatsConn: nil,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Unable to create controller", "controller", "NodeAlert")
os.Exit(1)

View File

@@ -6,13 +6,13 @@ import (
"strconv"
"time"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
sysv1alpha1 "github.com/beclab/Olares/framework/app-service/api/sys.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
coordinationv1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -218,20 +218,10 @@ func (r *AppEnvController) triggerApplyEnv(ctx context.Context, appEnv *sysv1alp
UpdateTime: &now,
}
am, err := apputils.UpdateAppMgrStatus(targetAppMgr.Name, status)
_, err = apputils.UpdateAppMgrStatus(targetAppMgr.Name, status)
if err != nil {
return fmt.Errorf("failed to update ApplicationManager Status: %v", err)
}
utils.PublishAppEvent(utils.EventParams{
Owner: am.Spec.AppOwner,
Name: am.Spec.AppName,
OpType: string(am.Status.OpType),
OpID: opID,
State: appv1alpha1.ApplyingEnv.String(),
RawAppName: am.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: apputils.AppTitle(am.Spec.Config),
})
klog.Infof("Successfully triggered ApplyEnv for app: %s owner: %s", appEnv.AppName, appEnv.AppOwner)
return nil

View File

@@ -7,7 +7,10 @@ import (
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
"github.com/beclab/Olares/framework/app-service/pkg/images"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
@@ -167,6 +170,11 @@ func (r *ApplicationManagerController) preEnqueueCheckForUpdate(old, new client.
if curAppMgr.Spec.Type != appv1alpha1.App && curAppMgr.Spec.Type != appv1alpha1.Middleware {
return false
}
if oldAppMgr.Status.State != curAppMgr.Status.State {
r.publishStateChangeEvent(curAppMgr)
}
if curAppMgr.Status.OpGeneration <= oldAppMgr.Status.OpGeneration {
return false
}
@@ -174,6 +182,22 @@ func (r *ApplicationManagerController) preEnqueueCheckForUpdate(old, new client.
return true
}
func (r *ApplicationManagerController) publishStateChangeEvent(am *appv1alpha1.ApplicationManager) {
appevent.PublishAppEventToQueue(utils.EventParams{
Owner: am.Spec.AppOwner,
Name: am.Spec.AppName,
OpType: string(am.Status.OpType),
OpID: am.Status.OpID,
State: am.Status.State.String(),
Progress: am.Status.Progress,
RawAppName: am.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: apputils.AppTitle(am.Spec.Config),
Reason: am.Status.Reason,
Message: am.Status.Message,
})
}
func (r *ApplicationManagerController) loadStatefulAppAndReconcile(ctx context.Context, name string) (appstate.StatefulApp, error) {
statefulApp, err := LoadStatefulApp(ctx, r, name)
if err != nil {

View File

@@ -8,6 +8,7 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/utils"
"github.com/nats-io/nats.go"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
@@ -54,6 +55,8 @@ type NodeAlertController struct {
// lastPressureState tracks the last known pressure state for each node and pressure type
lastPressureState map[string]bool
mutex sync.RWMutex
NatsConn *nats.Conn
natsConnMux sync.Mutex
}
// SetupWithManager sets up the controller with the Manager.
@@ -245,5 +248,31 @@ func (r *NodeAlertController) sendNodeAlert(nodeName string, pressureType NodePr
// publishToNats publishes a message to the specified NATS subject
func (r *NodeAlertController) publishToNats(subject string, data interface{}) error {
return utils.PublishToNats(subject, data)
if err := r.ensureNatsConnected(); err != nil {
return fmt.Errorf("failed to ensure NATS connection: %w", err)
}
return utils.PublishEvent(r.NatsConn, subject, data)
}
func (r *NodeAlertController) ensureNatsConnected() error {
r.natsConnMux.Lock()
defer r.natsConnMux.Unlock()
if r.NatsConn != nil && r.NatsConn.IsConnected() {
return nil
}
if r.NatsConn != nil {
r.NatsConn.Close()
}
klog.Info("NATS connection not established in NodeAlertController, attempting to connect...")
nc, err := utils.NewNatsConn()
if err != nil {
klog.Errorf("NodeAlertController failed to connect to NATS: %v", err)
return err
}
r.NatsConn = nc
klog.Info("NodeAlertController successfully connected to NATS")
return nil
}

View File

@@ -9,7 +9,6 @@ import (
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
corev1 "k8s.io/api/core/v1"
@@ -236,20 +235,6 @@ func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, own
if _, err := apputils.UpdateAppMgrStatus(name, status); err != nil {
return false, err
}
utils.PublishAppEvent(utils.EventParams{
Owner: am.Spec.AppOwner,
Name: am.Spec.AppName,
OpType: string(status.OpType),
OpID: opID,
State: appv1alpha1.Stopping.String(),
Progress: message,
RawAppName: am.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: apputils.AppTitle(am.Spec.Config),
Reason: reason,
Message: message,
})
klog.Infof("suspend requested for app=%s owner=%s, reason=%s", am.Spec.AppName, am.Spec.AppOwner, message)
return true, nil
}

View File

@@ -8,6 +8,7 @@ import (
"strconv"
"time"
natsevent "github.com/beclab/Olares/framework/app-service/pkg/event"
"github.com/beclab/Olares/framework/app-service/pkg/users"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace/v1"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
@@ -165,7 +166,7 @@ func (r *UserController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
klog.Infof("update user failed %v", updateErr)
return ctrl.Result{}, updateErr
}
utils.PublishUserEvent("Delete", user.Name, user.Annotations[users.AnnotationUserDeleter])
r.publish("Delete", user.Name, user.Annotations[users.AnnotationUserDeleter])
}
return ctrl.Result{}, nil
}
@@ -299,11 +300,15 @@ func (r *UserController) handleUserCreation(ctx context.Context, user *iamv1alph
klog.Errorf("failed to update user status to Created %v", updateErr)
} else {
klog.Infof("publish user creation event.....")
utils.PublishUserEvent("Create", user.Name, user.Annotations[users.AnnotationUserCreator])
r.publish("Create", user.Name, user.Annotations[users.AnnotationUserCreator])
}
return ctrl.Result{}, updateErr
}
func (r *UserController) publish(topic, user, operator string) {
natsevent.PublishUserEventToQueue(topic, user, operator)
}
func (r *UserController) checkResource(user *iamv1alpha2.User) error {
metrics, _, err := apputils.GetClusterResource("")
if err != nil {

View File

@@ -9,7 +9,6 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/emicklei/go-restful/v3"
@@ -74,21 +73,11 @@ func (h *Handler) appApplyEnv(req *restful.Request, resp *restful.Response) {
UpdateTime: &now,
}
am, err := apputils.UpdateAppMgrStatus(appMgrName, status)
_, err = apputils.UpdateAppMgrStatus(appMgrName, status)
if err != nil {
api.HandleError(resp, req, err)
return
}
utils.PublishAppEvent(utils.EventParams{
Owner: am.Spec.AppOwner,
Name: am.Spec.AppName,
OpType: string(am.Status.OpType),
OpID: opID,
State: appv1alpha1.ApplyingEnv.String(),
RawAppName: am.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: apputils.AppTitle(am.Spec.Config),
})
resp.WriteEntity(api.Response{Code: 200})
}

View File

@@ -9,8 +9,8 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/emicklei/go-restful/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -82,21 +82,11 @@ func (h *Handler) cancel(req *restful.Request, resp *restful.Response) {
api.HandleError(resp, req, err)
return
}
a, err := apputils.UpdateAppMgrStatus(name, status)
_, err = apputils.UpdateAppMgrStatus(name, status)
if err != nil {
api.HandleError(resp, req, err)
return
}
utils.PublishAppEvent(utils.EventParams{
Owner: a.Spec.AppOwner,
Name: a.Spec.AppName,
OpType: string(a.Status.OpType),
OpID: opID,
State: cancelState.String(),
RawAppName: a.Spec.RawAppName,
Type: a.Spec.Type.String(),
Title: apputils.AppTitle(a.Spec.Config),
})
resp.WriteAsJson(api.InstallationResponse{
Response: api.Response{Code: 200},

View File

@@ -9,8 +9,6 @@ import (
"slices"
"strconv"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
sysv1alpha1 "github.com/beclab/Olares/framework/app-service/api/sys.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
@@ -19,6 +17,7 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/generated/clientset/versioned"
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/beclab/Olares/framework/app-service/pkg/utils/config"
@@ -611,23 +610,12 @@ func (h *installHandlerHelper) applyApplicationManager(marketSource string) (opI
UpdateTime: &now,
OpTime: &now,
}
a, err = apputils.UpdateAppMgrStatus(name, status)
_, err = apputils.UpdateAppMgrStatus(name, status)
if err != nil {
api.HandleError(h.resp, h.req, err)
return
}
utils.PublishAppEvent(utils.EventParams{
Owner: a.Spec.AppOwner,
Name: a.Spec.AppName,
OpType: string(a.Status.OpType),
OpID: opID,
State: v1alpha1.Pending.String(),
RawAppName: a.Spec.RawAppName,
Type: a.Spec.Type.String(),
Title: apputils.AppTitle(a.Spec.Config),
})
return
}

View File

@@ -11,8 +11,8 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/emicklei/go-restful/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -94,21 +94,11 @@ func (h *Handler) uninstall(req *restful.Request, resp *restful.Response) {
UpdateTime: &now,
}
a, err := apputils.UpdateAppMgrStatus(name, status)
_, err = apputils.UpdateAppMgrStatus(name, status)
if err != nil {
api.HandleError(resp, req, err)
return
}
utils.PublishAppEvent(utils.EventParams{
Owner: a.Spec.AppOwner,
Name: a.Spec.AppName,
OpType: string(a.Status.OpType),
OpID: opID,
State: v1alpha1.Uninstalling.String(),
RawAppName: a.Spec.RawAppName,
Type: a.Spec.Type.String(),
Title: apputils.AppTitle(a.Spec.Config),
})
resp.WriteEntity(api.InstallationResponse{
Response: api.Response{Code: 200},

View File

@@ -384,21 +384,11 @@ func (h *Handler) appUpgrade(req *restful.Request, resp *restful.Response) {
UpdateTime: &now,
}
am, err := apputils.UpdateAppMgrStatus(appMgrName, status)
_, err = apputils.UpdateAppMgrStatus(appMgrName, status)
if err != nil {
api.HandleError(resp, req, err)
return
}
utils.PublishAppEvent(utils.EventParams{
Owner: am.Spec.AppOwner,
Name: am.Spec.AppName,
OpType: string(am.Status.OpType),
OpID: opID,
State: appv1alpha1.Upgrading.String(),
RawAppName: am.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: apputils.AppTitle(am.Spec.Config),
})
resp.WriteEntity(api.InstallationResponse{
Response: api.Response{Code: 200},

View File

@@ -150,7 +150,6 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
if err != nil {
klog.Errorf("Failed to uninstall middleware err=%v", err)
}
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.InstallOp), opID, v1alpha1.InstallFailed.String(), "", nil, "")
}
}
}()
@@ -172,7 +171,6 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
api.HandleError(resp, req, err)
return
}
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.InstallOp), opID, v1alpha1.Installing.String(), "", nil, "")
klog.Infof("Start to install middleware name=%v", middlewareConfig.MiddlewareName)
err = middlewareinstaller.Install(req.Request.Context(), h.kubeConfig, middlewareConfig)
@@ -226,7 +224,7 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
klog.Infof("Failed to update status err=%v", err)
return
}
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.CancelOp), opID, v1alpha1.InstallingCanceled.String(), "", nil, "")
return
}
klog.Infof("ticker get middleware status")
@@ -245,8 +243,6 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
e := apputils.UpdateStatus(a, opRecord.Status, &opRecord, opRecord.Message)
if e != nil {
klog.Error(e)
} else {
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.InstallOp), opID, opRecord.Status.String(), "", nil, "")
}
delete(middlewareManager, name)
return
@@ -309,7 +305,6 @@ func (h *Handler) uninstallMiddleware(req *restful.Request, resp *restful.Respon
api.HandleError(resp, req, err)
return
}
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.UninstallOp), opID, v1alpha1.Uninstalling.String(), "", nil, "")
now = metav1.Now()
opRecord := v1alpha1.OpRecord{
@@ -327,8 +322,6 @@ func (h *Handler) uninstallMiddleware(req *restful.Request, resp *restful.Respon
e := apputils.UpdateStatus(mgr, v1alpha1.UninstallFailed, &opRecord, opRecord.Message)
if e != nil {
klog.Errorf("Failed to update applicationmanager status in uninstall middleware name=%s err=%v", mgr.Name, e)
} else {
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.UninstallOp), opID, v1alpha1.UninstallFailed.String(), "", nil, "")
}
}
}()
@@ -344,7 +337,6 @@ func (h *Handler) uninstallMiddleware(req *restful.Request, resp *restful.Respon
api.HandleError(resp, req, err)
return
}
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.Uninstalled), opID, v1alpha1.Uninstalled.String(), "", nil, "")
resp.WriteEntity(api.InstallationResponse{
Response: api.Response{Code: 200},
@@ -390,7 +382,6 @@ func (h *Handler) cancelMiddleware(req *restful.Request, resp *restful.Response)
api.HandleError(resp, req, err)
return
}
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.CancelOp), opID, v1alpha1.InstallingCanceling.String(), "", nil, "")
resp.WriteAsJson(api.InstallationResponse{
Response: api.Response{Code: 200},

View File

@@ -19,7 +19,6 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
"github.com/beclab/Olares/framework/app-service/pkg/provider"
"github.com/beclab/Olares/framework/app-service/pkg/tapr"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/emicklei/go-restful/v3"
@@ -264,22 +263,11 @@ func (h *Handler) setupAppEntranceDomain(req *restful.Request, resp *restful.Res
UpdateTime: &now,
}
am, err := apputils.UpdateAppMgrStatus(appMgr.Name, status)
_, err = apputils.UpdateAppMgrStatus(appMgr.Name, status)
if err != nil {
api.HandleError(resp, req, err)
return
}
utils.PublishAppEvent(utils.EventParams{
Owner: am.Spec.AppOwner,
Name: am.Spec.AppName,
OpType: string(am.Status.OpType),
OpID: opID,
State: v1alpha1.Upgrading.String(),
RawAppName: am.Spec.RawAppName,
Type: am.Spec.Type.String(),
Title: apputils.AppTitle(am.Spec.Config),
Message: fmt.Sprintf("app %s was upgrade via setup domain by user %s", am.Spec.AppName, am.Spec.AppOwner),
})
}
resp.WriteAsJson(appUpdated.Spec.Settings)
}

View File

@@ -8,8 +8,6 @@ import (
"strconv"
"time"
"k8s.io/klog/v2"
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
@@ -17,12 +15,12 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
"github.com/emicklei/go-restful/v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
func (h *Handler) suspend(req *restful.Request, resp *restful.Response) {
@@ -77,23 +75,11 @@ func (h *Handler) suspend(req *restful.Request, resp *restful.Response) {
StatusTime: &now,
UpdateTime: &now,
}
a, err := apputils.UpdateAppMgrStatus(name, status)
_, err = apputils.UpdateAppMgrStatus(name, status)
if err != nil {
api.HandleError(resp, req, err)
return
}
utils.PublishAppEvent(utils.EventParams{
Owner: a.Spec.AppOwner,
Name: a.Spec.AppName,
OpType: string(a.Status.OpType),
OpID: opID,
State: v1alpha1.Stopping.String(),
RawAppName: a.Spec.RawAppName,
Type: a.Spec.Type.String(),
Title: apputils.AppTitle(a.Spec.Config),
Reason: constants.AppStopByUser,
Message: fmt.Sprintf("app %s was stop by user %s", a.Spec.AppName, a.Spec.AppOwner),
})
resp.WriteEntity(api.InstallationResponse{
Response: api.Response{Code: 200},
@@ -185,22 +171,11 @@ func (h *Handler) resume(req *restful.Request, resp *restful.Response) {
StatusTime: &now,
UpdateTime: &now,
}
a, err := apputils.UpdateAppMgrStatus(name, status)
_, err = apputils.UpdateAppMgrStatus(name, status)
if err != nil {
api.HandleError(resp, req, err)
return
}
utils.PublishAppEvent(utils.EventParams{
Owner: a.Spec.AppOwner,
Name: a.Spec.AppName,
OpType: string(a.Status.OpType),
OpID: opID,
State: v1alpha1.Resuming.String(),
RawAppName: a.Spec.RawAppName,
Type: a.Spec.Type.String(),
Title: apputils.AppTitle(a.Spec.Config),
Message: fmt.Sprintf("app %s was resume by user %s", a.Spec.AppName, a.Spec.AppOwner),
})
resp.WriteEntity(api.InstallationResponse{
Response: api.Response{Code: 200},

View File

@@ -15,7 +15,6 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/constants"
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
"github.com/beclab/Olares/framework/app-service/pkg/provider"
"github.com/beclab/Olares/framework/app-service/pkg/users"
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
@@ -1088,14 +1087,13 @@ func (h *Handler) applicationManagerMutate(req *restful.Request, resp *restful.R
if !ok {
return
}
var pam *v1alpha1.ApplicationManager
var admissionReq, admissionResp admissionv1.AdmissionReview
proxyUUID := uuid.New()
if _, _, err := webhook.Deserializer.Decode(admissionRequestBody, nil, &admissionReq); err != nil {
klog.Errorf("Failed to decode admission request body err=%v", err)
admissionResp.Response = h.sidecarWebhook.AdmissionError("", err)
} else {
admissionResp.Response, pam = h.applicationManagerInject(req.Request.Context(), admissionReq.Request, proxyUUID)
admissionResp.Response, _ = h.applicationManagerInject(req.Request.Context(), admissionReq.Request, proxyUUID)
}
admissionResp.TypeMeta = admissionReq.TypeMeta
admissionResp.Kind = admissionReq.Kind
@@ -1109,18 +1107,6 @@ func (h *Handler) applicationManagerMutate(req *restful.Request, resp *restful.R
klog.Errorf("Failed to write response[application-manager inject] admin review in namespace=%s err=%v", requestForNamespace, err)
return
}
if pam != nil {
utils.PublishAppEvent(utils.EventParams{
Owner: pam.Spec.AppOwner,
Name: pam.Spec.AppName,
OpType: string(pam.Spec.OpType),
OpID: pam.Status.OpID,
State: pam.Status.State.String(),
RawAppName: pam.Spec.RawAppName,
Type: pam.Spec.Type.String(),
Title: apputils.AppTitle(pam.Spec.Config),
})
}
klog.Infof("Done[application-manager inject] with uuid=%s in namespace=%s", proxyUUID, requestForNamespace)
}

View File

@@ -130,15 +130,6 @@ func (p *InstallingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
return
} // end of err != nil
p.finally = func() {
klog.Infof("app %s install successfully, update app state to initializing", p.manager.Spec.AppName)
updateErr := p.updateStatus(context.TODO(), p.manager, appsv1.Initializing, nil, appsv1.Initializing.String(), "")
if updateErr != nil {
klog.Errorf("update status failed %v", updateErr)
return
}
}
if p.manager.Spec.Type == appsv1.Middleware {
ok, err := ops.WaitForLaunch()
if !ok {
@@ -165,6 +156,16 @@ func (p *InstallingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
return
}
}
} else {
p.finally = func() {
klog.Infof("app %s install successfully, update app state to initializing", p.manager.Spec.AppName)
updateErr := p.updateStatus(context.TODO(), p.manager, appsv1.Initializing, nil, appsv1.Initializing.String(), "")
if updateErr != nil {
klog.Errorf("update status failed %v", updateErr)
return
}
}
}
}()

View File

@@ -6,7 +6,6 @@ import (
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
"github.com/beclab/Olares/framework/app-service/pkg/helm"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
@@ -100,17 +99,6 @@ func (p *PendingApp) Exec(ctx context.Context) (StatefulInProgressApp, error) {
klog.Error("update app manager status error, ", err, ", ", p.manager.Name)
return err
}
appevent.PublishAppEventToQueue(utils.EventParams{
Owner: p.manager.Spec.AppOwner,
Name: p.manager.Spec.AppName,
OpType: string(p.manager.Spec.OpType),
OpID: p.manager.Status.OpID,
State: appsv1.Downloading.String(),
RawAppName: p.manager.Spec.RawAppName,
Type: p.manager.Spec.Type.String(),
Title: apputils.AppTitle(p.manager.Spec.Config),
})
return nil
},
); err != nil {

View File

@@ -11,9 +11,7 @@ import (
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
"github.com/beclab/Olares/framework/app-service/pkg/appinstaller"
"github.com/beclab/Olares/framework/app-service/pkg/appinstaller/versioned"
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
"github.com/beclab/Olares/framework/app-service/pkg/middlewareinstaller"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
corev1 "k8s.io/api/core/v1"
@@ -82,19 +80,6 @@ func (b *baseStatefulApp) updateStatus(ctx context.Context, am *appsv1.Applicati
klog.Errorf("patch appmgr's %s status failed %v", am.Name, err)
return err
}
appevent.PublishAppEventToQueue(utils.EventParams{
Owner: b.manager.Spec.AppOwner,
Name: b.manager.Spec.AppName,
OpType: string(b.manager.Spec.OpType),
OpID: b.manager.Status.OpID,
State: state.String(),
RawAppName: b.manager.Spec.RawAppName,
Type: b.manager.Spec.Type.String(),
Title: apputils.AppTitle(b.manager.Spec.Config),
Reason: reason,
Message: message,
})
return nil
}

View File

@@ -3,10 +3,12 @@ package event
import (
"context"
"fmt"
"sync"
"time"
"github.com/beclab/Olares/framework/app-service/pkg/utils"
"github.com/nats-io/nats.go"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
@@ -16,8 +18,10 @@ import (
var AppEventQueue *QueuedEventController
type QueuedEventController struct {
wq workqueue.RateLimitingInterface
ctx context.Context
wq workqueue.RateLimitingInterface
ctx context.Context
nc *nats.Conn
ncMux sync.Mutex
}
type QueueEvent struct {
@@ -25,6 +29,17 @@ type QueueEvent struct {
Data interface{}
}
type UserEvent struct {
Topic string `json:"topic"`
Payload Payload `json:"payload"`
}
type Payload struct {
User string `json:"user"`
Operator string `json:"operator"`
Timestamp time.Time `json:"timestamp"`
}
func (qe *QueuedEventController) processNextWorkItem() bool {
obj, shutdown := qe.wq.Get()
if shutdown {
@@ -41,11 +56,19 @@ func (qe *QueuedEventController) process(obj interface{}) {
if !ok {
return
}
err := utils.PublishToNats(eobj.Subject, eobj.Data)
if err != nil {
klog.Errorf("async publish subject %s,data %v, failed %v", eobj.Subject, eobj.Data, err)
} else {
klog.Infof("publish event success data: %#v", eobj.Data)
for {
err := qe.publish(eobj.Subject, eobj.Data)
if err == nil {
klog.Infof("publish event success data: %#v", eobj.Data)
return
}
klog.Errorf("publish subject %s, data %v failed: %v", eobj.Subject, eobj.Data, err)
select {
case <-qe.ctx.Done():
return
case <-time.After(time.Second):
}
}
}
@@ -68,9 +91,46 @@ func (qe *QueuedEventController) enqueue(obj interface{}) {
qe.wq.Add(obj)
}
func NewAppEventQueue(ctx context.Context) *QueuedEventController {
func (qe *QueuedEventController) publish(subject string, data interface{}) error {
if err := qe.ensureNatsConnected(); err != nil {
return fmt.Errorf("failed to ensure NATS connection: %w", err)
}
return utils.PublishEvent(qe.nc, subject, data)
}
func (qe *QueuedEventController) ensureNatsConnected() error {
qe.ncMux.Lock()
defer qe.ncMux.Unlock()
if qe.nc != nil && qe.nc.IsConnected() {
return nil
}
if qe.nc != nil {
qe.nc.Close()
}
klog.Info("NATS connection not established, attempting to connect...")
nc, err := utils.NewNatsConn()
if err != nil {
klog.Errorf("Failed to connect to NATS: %v", err)
return err
}
qe.nc = nc
klog.Info("Successfully connected to NATS")
return nil
}
func (qe *QueuedEventController) GetNatsConn() *nats.Conn {
qe.ncMux.Lock()
defer qe.ncMux.Unlock()
return qe.nc
}
func NewAppEventQueue(ctx context.Context, nc *nats.Conn) *QueuedEventController {
return &QueuedEventController{
ctx: ctx,
nc: nc,
wq: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "app-event-queue"),
}
}
@@ -110,3 +170,16 @@ func PublishAppEventToQueue(p utils.EventParams) {
AppEventQueue.enqueue(&QueueEvent{Subject: subject, Data: data})
}
func PublishUserEventToQueue(topic, user, operator string) {
subject := "os.users"
data := UserEvent{
Topic: topic,
Payload: Payload{
User: user,
Operator: operator,
Timestamp: time.Now(),
},
}
AppEventQueue.enqueue(&QueueEvent{Subject: subject, Data: data})
}

View File

@@ -3,7 +3,6 @@ package utils
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
@@ -12,6 +11,7 @@ import (
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
"github.com/beclab/Olares/framework/app-service/pkg/constants"
refdocker "github.com/containerd/containerd/reference/docker"
"github.com/pkg/errors"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
helmLoader "helm.sh/helm/v3/pkg/chart/loader"
@@ -182,7 +182,7 @@ func GetResourceListFromChart(chartPath string, values map[string]interface{}) (
if err == io.EOF {
return resources, nil
}
return nil, fmt.Errorf("error parsing")
return nil, errors.Wrap(err, "error parsing")
}
ext.Raw = bytes.TrimSpace(ext.Raw)
if len(ext.Raw) == 0 || bytes.Equal(ext.Raw, []byte("null")) {

View File

@@ -47,96 +47,13 @@ type EventParams struct {
SharedEntrances []v1alpha1.Entrance
}
type UserEvent struct {
Topic string `json:"topic"`
Payload Payload `json:"payload"`
func PublishEvent(nc *nats.Conn, subject string, data interface{}) error {
return publish(nc, subject, data)
}
type Payload struct {
User string `json:"user"`
Operator string `json:"operator"`
Timestamp time.Time `json:"timestamp"`
}
func PublishUserEvent(topic, user, operator string) {
subject := "os.users"
data := UserEvent{
Topic: topic,
Payload: Payload{
User: user,
Operator: operator,
Timestamp: time.Now(),
},
}
if err := publish(subject, data); err != nil {
klog.Errorf("async publish subject %s,data %v, failed %v", subject, data, err)
} else {
t, _ := json.Marshal(data)
klog.Infof("publish user event success. data: %v", string(t))
}
}
func PublishAppEvent(p EventParams) {
subject := fmt.Sprintf("os.application.%s", p.Owner)
now := time.Now()
data := Event{
EventID: fmt.Sprintf("%s-%s-%d", p.Owner, p.Name, now.UnixMilli()),
CreateTime: now,
Name: p.Name,
Type: func() string {
if p.Type == "" {
return "app"
}
return p.Type
}(),
OpType: p.OpType,
OpID: p.OpID,
State: p.State,
Progress: p.Progress,
User: p.Owner,
RawAppName: func() string {
if p.RawAppName == "" {
return p.Name
}
return p.RawAppName
}(),
Title: p.Title,
Reason: p.Reason,
Message: p.Message,
}
if len(p.EntranceStatuses) > 0 {
data.EntranceStatuses = p.EntranceStatuses
}
if err := publish(subject, data); err != nil {
klog.Errorf("async publish subject %s,data %v, failed %v", subject, data, err)
} else {
klog.Infof("publish event success data: %#v", data)
}
}
func PublishToNats(subject string, data interface{}) error {
return publish(subject, data)
}
func publish(subject string, data interface{}) error {
natsHost := os.Getenv("NATS_HOST")
natsPort := os.Getenv("NATS_PORT")
username := os.Getenv("NATS_USERNAME")
password := os.Getenv("NATS_PASSWORD")
natsURL := fmt.Sprintf("nats://%s:%s", natsHost, natsPort)
nc, err := nats.Connect(natsURL, nats.UserInfo(username, password))
if err != nil {
klog.Infof("connect error: err=%v", err)
return err
}
defer nc.Drain()
func publish(nc *nats.Conn, subject string, data interface{}) error {
d, err := json.Marshal(data)
if err != nil {
klog.Errorf("marshal failed: %v", err)
return err
}
err = nc.Publish(subject, d)
@@ -147,34 +64,38 @@ func publish(subject string, data interface{}) error {
return nil
}
func PublishMiddlewareEvent(owner, name, opType, opID, state, progress string, entranceStatuses []v1alpha1.EntranceStatus, rawAppName string) {
subject := fmt.Sprintf("os.application.%s", owner)
func NewNatsConn() (*nats.Conn, error) {
natsHost := os.Getenv("NATS_HOST")
natsPort := os.Getenv("NATS_PORT")
username := os.Getenv("NATS_USERNAME")
password := os.Getenv("NATS_PASSWORD")
now := time.Now()
data := Event{
EventID: fmt.Sprintf("%s-%s-%d", owner, name, now.UnixMilli()),
CreateTime: now,
Name: name,
Type: "middleware",
OpType: opType,
OpID: opID,
State: state,
Progress: progress,
User: owner,
RawAppName: func() string {
if rawAppName == "" {
return name
natsURL := fmt.Sprintf("nats://%s:%s", natsHost, natsPort)
opts := []nats.Option{
nats.UserInfo(username, password),
nats.MaxReconnects(-1),
nats.ReconnectWait(2 * time.Second),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
if err != nil {
klog.Warningf("NATS disconnected: %v, will attempt to reconnect", err)
} else {
klog.Infof("NATS disconnected, will attempt to reconnect")
}
return rawAppName
}(),
}
if len(entranceStatuses) > 0 {
data.EntranceStatuses = entranceStatuses
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
klog.Infof("NATS reconnected to %s", nc.ConnectedUrl())
}),
nats.ClosedHandler(func(nc *nats.Conn) {
klog.Errorf("NATS connection closed permanently: %v", nc.LastError())
}),
}
if err := publish(subject, data); err != nil {
klog.Errorf("async publish subject %s,data %v, failed %v", subject, data, err)
} else {
klog.Infof("publish event success data: %#v", data)
nc, err := nats.Connect(natsURL, opts...)
if err != nil {
klog.Errorf("failed to connect to NATS: %v", err)
return nil, err
}
klog.Infof("connected to NATS at %s", natsURL)
return nc, nil
}