Compare commits
5 Commits
module-l4-
...
module-app
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d8d0078bfd | ||
|
|
ffbcb13ae5 | ||
|
|
f3eef1f115 | ||
|
|
42c70afad5 | ||
|
|
7273577f4f |
@@ -170,7 +170,7 @@ spec:
|
||||
priorityClassName: "system-cluster-critical"
|
||||
containers:
|
||||
- name: app-service
|
||||
image: beclab/app-service:0.4.73
|
||||
image: beclab/app-service:0.4.74
|
||||
imagePullPolicy: IfNotPresent
|
||||
securityContext:
|
||||
runAsUser: 0
|
||||
|
||||
@@ -138,10 +138,16 @@ func main() {
|
||||
setupLog.Error(err, "Unable to create controller", "controller", "Security")
|
||||
os.Exit(1)
|
||||
}
|
||||
appEventQueue := appevent.NewAppEventQueue(ictx)
|
||||
appEventQueue := appevent.NewAppEventQueue(ictx, nil)
|
||||
appevent.SetAppEventQueue(appEventQueue)
|
||||
go appEventQueue.Run()
|
||||
|
||||
defer func() {
|
||||
if nc := appEventQueue.GetNatsConn(); nc != nil {
|
||||
nc.Drain()
|
||||
}
|
||||
}()
|
||||
|
||||
if err = (&controllers.ApplicationManagerController{
|
||||
Client: mgr.GetClient(),
|
||||
KubeConfig: config,
|
||||
@@ -198,6 +204,7 @@ func main() {
|
||||
if err = (&controllers.NodeAlertController{
|
||||
Client: mgr.GetClient(),
|
||||
KubeConfig: config,
|
||||
NatsConn: nil,
|
||||
}).SetupWithManager(mgr); err != nil {
|
||||
setupLog.Error(err, "Unable to create controller", "controller", "NodeAlert")
|
||||
os.Exit(1)
|
||||
|
||||
@@ -6,13 +6,13 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
|
||||
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
sysv1alpha1 "github.com/beclab/Olares/framework/app-service/api/sys.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
coordinationv1 "k8s.io/api/coordination/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@@ -218,20 +218,10 @@ func (r *AppEnvController) triggerApplyEnv(ctx context.Context, appEnv *sysv1alp
|
||||
UpdateTime: &now,
|
||||
}
|
||||
|
||||
am, err := apputils.UpdateAppMgrStatus(targetAppMgr.Name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(targetAppMgr.Name, status)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update ApplicationManager Status: %v", err)
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(am.Status.OpType),
|
||||
OpID: opID,
|
||||
State: appv1alpha1.ApplyingEnv.String(),
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
})
|
||||
|
||||
klog.Infof("Successfully triggered ApplyEnv for app: %s owner: %s", appEnv.AppName, appEnv.AppOwner)
|
||||
return nil
|
||||
|
||||
@@ -7,7 +7,10 @@ import (
|
||||
|
||||
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/images"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/rest"
|
||||
|
||||
@@ -167,6 +170,11 @@ func (r *ApplicationManagerController) preEnqueueCheckForUpdate(old, new client.
|
||||
if curAppMgr.Spec.Type != appv1alpha1.App && curAppMgr.Spec.Type != appv1alpha1.Middleware {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldAppMgr.Status.State != curAppMgr.Status.State {
|
||||
r.publishStateChangeEvent(curAppMgr)
|
||||
}
|
||||
|
||||
if curAppMgr.Status.OpGeneration <= oldAppMgr.Status.OpGeneration {
|
||||
return false
|
||||
}
|
||||
@@ -174,6 +182,22 @@ func (r *ApplicationManagerController) preEnqueueCheckForUpdate(old, new client.
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *ApplicationManagerController) publishStateChangeEvent(am *appv1alpha1.ApplicationManager) {
|
||||
appevent.PublishAppEventToQueue(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(am.Status.OpType),
|
||||
OpID: am.Status.OpID,
|
||||
State: am.Status.State.String(),
|
||||
Progress: am.Status.Progress,
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
Reason: am.Status.Reason,
|
||||
Message: am.Status.Message,
|
||||
})
|
||||
}
|
||||
|
||||
func (r *ApplicationManagerController) loadStatefulAppAndReconcile(ctx context.Context, name string) (appstate.StatefulApp, error) {
|
||||
statefulApp, err := LoadStatefulApp(ctx, r, name)
|
||||
if err != nil {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/rest"
|
||||
@@ -54,6 +55,8 @@ type NodeAlertController struct {
|
||||
// lastPressureState tracks the last known pressure state for each node and pressure type
|
||||
lastPressureState map[string]bool
|
||||
mutex sync.RWMutex
|
||||
NatsConn *nats.Conn
|
||||
natsConnMux sync.Mutex
|
||||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
@@ -245,5 +248,31 @@ func (r *NodeAlertController) sendNodeAlert(nodeName string, pressureType NodePr
|
||||
|
||||
// publishToNats publishes a message to the specified NATS subject
|
||||
func (r *NodeAlertController) publishToNats(subject string, data interface{}) error {
|
||||
return utils.PublishToNats(subject, data)
|
||||
if err := r.ensureNatsConnected(); err != nil {
|
||||
return fmt.Errorf("failed to ensure NATS connection: %w", err)
|
||||
}
|
||||
return utils.PublishEvent(r.NatsConn, subject, data)
|
||||
}
|
||||
|
||||
func (r *NodeAlertController) ensureNatsConnected() error {
|
||||
r.natsConnMux.Lock()
|
||||
defer r.natsConnMux.Unlock()
|
||||
|
||||
if r.NatsConn != nil && r.NatsConn.IsConnected() {
|
||||
return nil
|
||||
}
|
||||
if r.NatsConn != nil {
|
||||
r.NatsConn.Close()
|
||||
}
|
||||
|
||||
klog.Info("NATS connection not established in NodeAlertController, attempting to connect...")
|
||||
nc, err := utils.NewNatsConn()
|
||||
if err != nil {
|
||||
klog.Errorf("NodeAlertController failed to connect to NATS: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
r.NatsConn = nc
|
||||
klog.Info("NodeAlertController successfully connected to NATS")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@@ -236,20 +235,6 @@ func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, own
|
||||
if _, err := apputils.UpdateAppMgrStatus(name, status); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(status.OpType),
|
||||
OpID: opID,
|
||||
State: appv1alpha1.Stopping.String(),
|
||||
Progress: message,
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
})
|
||||
klog.Infof("suspend requested for app=%s owner=%s, reason=%s", am.Spec.AppName, am.Spec.AppOwner, message)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
natsevent "github.com/beclab/Olares/framework/app-service/pkg/event"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace/v1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
@@ -165,7 +166,7 @@ func (r *UserController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
|
||||
klog.Infof("update user failed %v", updateErr)
|
||||
return ctrl.Result{}, updateErr
|
||||
}
|
||||
utils.PublishUserEvent("Delete", user.Name, user.Annotations[users.AnnotationUserDeleter])
|
||||
r.publish("Delete", user.Name, user.Annotations[users.AnnotationUserDeleter])
|
||||
}
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
@@ -299,11 +300,15 @@ func (r *UserController) handleUserCreation(ctx context.Context, user *iamv1alph
|
||||
klog.Errorf("failed to update user status to Created %v", updateErr)
|
||||
} else {
|
||||
klog.Infof("publish user creation event.....")
|
||||
utils.PublishUserEvent("Create", user.Name, user.Annotations[users.AnnotationUserCreator])
|
||||
r.publish("Create", user.Name, user.Annotations[users.AnnotationUserCreator])
|
||||
}
|
||||
return ctrl.Result{}, updateErr
|
||||
}
|
||||
|
||||
func (r *UserController) publish(topic, user, operator string) {
|
||||
natsevent.PublishUserEventToQueue(topic, user, operator)
|
||||
}
|
||||
|
||||
func (r *UserController) checkResource(user *iamv1alpha2.User) error {
|
||||
metrics, _, err := apputils.GetClusterResource("")
|
||||
if err != nil {
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
@@ -74,21 +73,11 @@ func (h *Handler) appApplyEnv(req *restful.Request, resp *restful.Response) {
|
||||
UpdateTime: &now,
|
||||
}
|
||||
|
||||
am, err := apputils.UpdateAppMgrStatus(appMgrName, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(appMgrName, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(am.Status.OpType),
|
||||
OpID: opID,
|
||||
State: appv1alpha1.ApplyingEnv.String(),
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
})
|
||||
|
||||
resp.WriteEntity(api.Response{Code: 200})
|
||||
}
|
||||
|
||||
@@ -9,8 +9,8 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@@ -82,21 +82,11 @@ func (h *Handler) cancel(req *restful.Request, resp *restful.Response) {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
a, err := apputils.UpdateAppMgrStatus(name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: a.Spec.AppOwner,
|
||||
Name: a.Spec.AppName,
|
||||
OpType: string(a.Status.OpType),
|
||||
OpID: opID,
|
||||
State: cancelState.String(),
|
||||
RawAppName: a.Spec.RawAppName,
|
||||
Type: a.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(a.Spec.Config),
|
||||
})
|
||||
|
||||
resp.WriteAsJson(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
|
||||
@@ -9,8 +9,6 @@ import (
|
||||
"slices"
|
||||
"strconv"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
sysv1alpha1 "github.com/beclab/Olares/framework/app-service/api/sys.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
@@ -19,6 +17,7 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/generated/clientset/versioned"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils/config"
|
||||
@@ -611,23 +610,12 @@ func (h *installHandlerHelper) applyApplicationManager(marketSource string) (opI
|
||||
UpdateTime: &now,
|
||||
OpTime: &now,
|
||||
}
|
||||
a, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
|
||||
_, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
if err != nil {
|
||||
api.HandleError(h.resp, h.req, err)
|
||||
return
|
||||
}
|
||||
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: a.Spec.AppOwner,
|
||||
Name: a.Spec.AppName,
|
||||
OpType: string(a.Status.OpType),
|
||||
OpID: opID,
|
||||
State: v1alpha1.Pending.String(),
|
||||
RawAppName: a.Spec.RawAppName,
|
||||
Type: a.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(a.Spec.Config),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -11,8 +11,8 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@@ -94,21 +94,11 @@ func (h *Handler) uninstall(req *restful.Request, resp *restful.Response) {
|
||||
UpdateTime: &now,
|
||||
}
|
||||
|
||||
a, err := apputils.UpdateAppMgrStatus(name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: a.Spec.AppOwner,
|
||||
Name: a.Spec.AppName,
|
||||
OpType: string(a.Status.OpType),
|
||||
OpID: opID,
|
||||
State: v1alpha1.Uninstalling.String(),
|
||||
RawAppName: a.Spec.RawAppName,
|
||||
Type: a.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(a.Spec.Config),
|
||||
})
|
||||
|
||||
resp.WriteEntity(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
|
||||
@@ -384,21 +384,11 @@ func (h *Handler) appUpgrade(req *restful.Request, resp *restful.Response) {
|
||||
UpdateTime: &now,
|
||||
}
|
||||
|
||||
am, err := apputils.UpdateAppMgrStatus(appMgrName, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(appMgrName, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(am.Status.OpType),
|
||||
OpID: opID,
|
||||
State: appv1alpha1.Upgrading.String(),
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
})
|
||||
|
||||
resp.WriteEntity(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
|
||||
@@ -150,7 +150,6 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to uninstall middleware err=%v", err)
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.InstallOp), opID, v1alpha1.InstallFailed.String(), "", nil, "")
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -172,7 +171,6 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.InstallOp), opID, v1alpha1.Installing.String(), "", nil, "")
|
||||
|
||||
klog.Infof("Start to install middleware name=%v", middlewareConfig.MiddlewareName)
|
||||
err = middlewareinstaller.Install(req.Request.Context(), h.kubeConfig, middlewareConfig)
|
||||
@@ -226,7 +224,7 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
|
||||
klog.Infof("Failed to update status err=%v", err)
|
||||
return
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.CancelOp), opID, v1alpha1.InstallingCanceled.String(), "", nil, "")
|
||||
|
||||
return
|
||||
}
|
||||
klog.Infof("ticker get middleware status")
|
||||
@@ -245,8 +243,6 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
|
||||
e := apputils.UpdateStatus(a, opRecord.Status, &opRecord, opRecord.Message)
|
||||
if e != nil {
|
||||
klog.Error(e)
|
||||
} else {
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.InstallOp), opID, opRecord.Status.String(), "", nil, "")
|
||||
}
|
||||
delete(middlewareManager, name)
|
||||
return
|
||||
@@ -309,7 +305,6 @@ func (h *Handler) uninstallMiddleware(req *restful.Request, resp *restful.Respon
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.UninstallOp), opID, v1alpha1.Uninstalling.String(), "", nil, "")
|
||||
|
||||
now = metav1.Now()
|
||||
opRecord := v1alpha1.OpRecord{
|
||||
@@ -327,8 +322,6 @@ func (h *Handler) uninstallMiddleware(req *restful.Request, resp *restful.Respon
|
||||
e := apputils.UpdateStatus(mgr, v1alpha1.UninstallFailed, &opRecord, opRecord.Message)
|
||||
if e != nil {
|
||||
klog.Errorf("Failed to update applicationmanager status in uninstall middleware name=%s err=%v", mgr.Name, e)
|
||||
} else {
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.UninstallOp), opID, v1alpha1.UninstallFailed.String(), "", nil, "")
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -344,7 +337,6 @@ func (h *Handler) uninstallMiddleware(req *restful.Request, resp *restful.Respon
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.Uninstalled), opID, v1alpha1.Uninstalled.String(), "", nil, "")
|
||||
|
||||
resp.WriteEntity(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
@@ -390,7 +382,6 @@ func (h *Handler) cancelMiddleware(req *restful.Request, resp *restful.Response)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.CancelOp), opID, v1alpha1.InstallingCanceling.String(), "", nil, "")
|
||||
|
||||
resp.WriteAsJson(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/provider"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/tapr"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
@@ -264,22 +263,11 @@ func (h *Handler) setupAppEntranceDomain(req *restful.Request, resp *restful.Res
|
||||
UpdateTime: &now,
|
||||
}
|
||||
|
||||
am, err := apputils.UpdateAppMgrStatus(appMgr.Name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(appMgr.Name, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(am.Status.OpType),
|
||||
OpID: opID,
|
||||
State: v1alpha1.Upgrading.String(),
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
Message: fmt.Sprintf("app %s was upgrade via setup domain by user %s", am.Spec.AppName, am.Spec.AppOwner),
|
||||
})
|
||||
}
|
||||
resp.WriteAsJson(appUpdated.Spec.Settings)
|
||||
}
|
||||
|
||||
@@ -8,8 +8,6 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
@@ -17,12 +15,12 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
func (h *Handler) suspend(req *restful.Request, resp *restful.Response) {
|
||||
@@ -77,23 +75,11 @@ func (h *Handler) suspend(req *restful.Request, resp *restful.Response) {
|
||||
StatusTime: &now,
|
||||
UpdateTime: &now,
|
||||
}
|
||||
a, err := apputils.UpdateAppMgrStatus(name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: a.Spec.AppOwner,
|
||||
Name: a.Spec.AppName,
|
||||
OpType: string(a.Status.OpType),
|
||||
OpID: opID,
|
||||
State: v1alpha1.Stopping.String(),
|
||||
RawAppName: a.Spec.RawAppName,
|
||||
Type: a.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(a.Spec.Config),
|
||||
Reason: constants.AppStopByUser,
|
||||
Message: fmt.Sprintf("app %s was stop by user %s", a.Spec.AppName, a.Spec.AppOwner),
|
||||
})
|
||||
|
||||
resp.WriteEntity(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
@@ -185,22 +171,11 @@ func (h *Handler) resume(req *restful.Request, resp *restful.Response) {
|
||||
StatusTime: &now,
|
||||
UpdateTime: &now,
|
||||
}
|
||||
a, err := apputils.UpdateAppMgrStatus(name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: a.Spec.AppOwner,
|
||||
Name: a.Spec.AppName,
|
||||
OpType: string(a.Status.OpType),
|
||||
OpID: opID,
|
||||
State: v1alpha1.Resuming.String(),
|
||||
RawAppName: a.Spec.RawAppName,
|
||||
Type: a.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(a.Spec.Config),
|
||||
Message: fmt.Sprintf("app %s was resume by user %s", a.Spec.AppName, a.Spec.AppOwner),
|
||||
})
|
||||
|
||||
resp.WriteEntity(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/provider"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
@@ -1088,14 +1087,13 @@ func (h *Handler) applicationManagerMutate(req *restful.Request, resp *restful.R
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
var pam *v1alpha1.ApplicationManager
|
||||
var admissionReq, admissionResp admissionv1.AdmissionReview
|
||||
proxyUUID := uuid.New()
|
||||
if _, _, err := webhook.Deserializer.Decode(admissionRequestBody, nil, &admissionReq); err != nil {
|
||||
klog.Errorf("Failed to decode admission request body err=%v", err)
|
||||
admissionResp.Response = h.sidecarWebhook.AdmissionError("", err)
|
||||
} else {
|
||||
admissionResp.Response, pam = h.applicationManagerInject(req.Request.Context(), admissionReq.Request, proxyUUID)
|
||||
admissionResp.Response, _ = h.applicationManagerInject(req.Request.Context(), admissionReq.Request, proxyUUID)
|
||||
}
|
||||
admissionResp.TypeMeta = admissionReq.TypeMeta
|
||||
admissionResp.Kind = admissionReq.Kind
|
||||
@@ -1109,18 +1107,6 @@ func (h *Handler) applicationManagerMutate(req *restful.Request, resp *restful.R
|
||||
klog.Errorf("Failed to write response[application-manager inject] admin review in namespace=%s err=%v", requestForNamespace, err)
|
||||
return
|
||||
}
|
||||
if pam != nil {
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: pam.Spec.AppOwner,
|
||||
Name: pam.Spec.AppName,
|
||||
OpType: string(pam.Spec.OpType),
|
||||
OpID: pam.Status.OpID,
|
||||
State: pam.Status.State.String(),
|
||||
RawAppName: pam.Spec.RawAppName,
|
||||
Type: pam.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(pam.Spec.Config),
|
||||
})
|
||||
}
|
||||
|
||||
klog.Infof("Done[application-manager inject] with uuid=%s in namespace=%s", proxyUUID, requestForNamespace)
|
||||
}
|
||||
|
||||
@@ -130,15 +130,6 @@ func (p *InstallingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
|
||||
return
|
||||
} // end of err != nil
|
||||
|
||||
p.finally = func() {
|
||||
klog.Infof("app %s install successfully, update app state to initializing", p.manager.Spec.AppName)
|
||||
updateErr := p.updateStatus(context.TODO(), p.manager, appsv1.Initializing, nil, appsv1.Initializing.String(), "")
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update status failed %v", updateErr)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
if p.manager.Spec.Type == appsv1.Middleware {
|
||||
ok, err := ops.WaitForLaunch()
|
||||
if !ok {
|
||||
@@ -165,6 +156,16 @@ func (p *InstallingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
p.finally = func() {
|
||||
klog.Infof("app %s install successfully, update app state to initializing", p.manager.Spec.AppName)
|
||||
updateErr := p.updateStatus(context.TODO(), p.manager, appsv1.Initializing, nil, appsv1.Initializing.String(), "")
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update status failed %v", updateErr)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
|
||||
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/helm"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
@@ -100,17 +99,6 @@ func (p *PendingApp) Exec(ctx context.Context) (StatefulInProgressApp, error) {
|
||||
klog.Error("update app manager status error, ", err, ", ", p.manager.Name)
|
||||
return err
|
||||
}
|
||||
appevent.PublishAppEventToQueue(utils.EventParams{
|
||||
Owner: p.manager.Spec.AppOwner,
|
||||
Name: p.manager.Spec.AppName,
|
||||
OpType: string(p.manager.Spec.OpType),
|
||||
OpID: p.manager.Status.OpID,
|
||||
State: appsv1.Downloading.String(),
|
||||
RawAppName: p.manager.Spec.RawAppName,
|
||||
Type: p.manager.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(p.manager.Spec.Config),
|
||||
})
|
||||
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
|
||||
@@ -11,9 +11,7 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appinstaller"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appinstaller/versioned"
|
||||
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/middlewareinstaller"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@@ -82,19 +80,6 @@ func (b *baseStatefulApp) updateStatus(ctx context.Context, am *appsv1.Applicati
|
||||
klog.Errorf("patch appmgr's %s status failed %v", am.Name, err)
|
||||
return err
|
||||
}
|
||||
appevent.PublishAppEventToQueue(utils.EventParams{
|
||||
Owner: b.manager.Spec.AppOwner,
|
||||
Name: b.manager.Spec.AppName,
|
||||
OpType: string(b.manager.Spec.OpType),
|
||||
OpID: b.manager.Status.OpID,
|
||||
State: state.String(),
|
||||
RawAppName: b.manager.Spec.RawAppName,
|
||||
Type: b.manager.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(b.manager.Spec.Config),
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -3,10 +3,12 @@ package event
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
@@ -16,8 +18,10 @@ import (
|
||||
var AppEventQueue *QueuedEventController
|
||||
|
||||
type QueuedEventController struct {
|
||||
wq workqueue.RateLimitingInterface
|
||||
ctx context.Context
|
||||
wq workqueue.RateLimitingInterface
|
||||
ctx context.Context
|
||||
nc *nats.Conn
|
||||
ncMux sync.Mutex
|
||||
}
|
||||
|
||||
type QueueEvent struct {
|
||||
@@ -25,6 +29,17 @@ type QueueEvent struct {
|
||||
Data interface{}
|
||||
}
|
||||
|
||||
type UserEvent struct {
|
||||
Topic string `json:"topic"`
|
||||
Payload Payload `json:"payload"`
|
||||
}
|
||||
|
||||
type Payload struct {
|
||||
User string `json:"user"`
|
||||
Operator string `json:"operator"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
func (qe *QueuedEventController) processNextWorkItem() bool {
|
||||
obj, shutdown := qe.wq.Get()
|
||||
if shutdown {
|
||||
@@ -41,11 +56,19 @@ func (qe *QueuedEventController) process(obj interface{}) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
err := utils.PublishToNats(eobj.Subject, eobj.Data)
|
||||
if err != nil {
|
||||
klog.Errorf("async publish subject %s,data %v, failed %v", eobj.Subject, eobj.Data, err)
|
||||
} else {
|
||||
klog.Infof("publish event success data: %#v", eobj.Data)
|
||||
for {
|
||||
err := qe.publish(eobj.Subject, eobj.Data)
|
||||
if err == nil {
|
||||
klog.Infof("publish event success data: %#v", eobj.Data)
|
||||
return
|
||||
}
|
||||
klog.Errorf("publish subject %s, data %v failed: %v", eobj.Subject, eobj.Data, err)
|
||||
select {
|
||||
case <-qe.ctx.Done():
|
||||
return
|
||||
case <-time.After(time.Second):
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,9 +91,46 @@ func (qe *QueuedEventController) enqueue(obj interface{}) {
|
||||
qe.wq.Add(obj)
|
||||
}
|
||||
|
||||
func NewAppEventQueue(ctx context.Context) *QueuedEventController {
|
||||
func (qe *QueuedEventController) publish(subject string, data interface{}) error {
|
||||
if err := qe.ensureNatsConnected(); err != nil {
|
||||
return fmt.Errorf("failed to ensure NATS connection: %w", err)
|
||||
}
|
||||
return utils.PublishEvent(qe.nc, subject, data)
|
||||
}
|
||||
|
||||
func (qe *QueuedEventController) ensureNatsConnected() error {
|
||||
qe.ncMux.Lock()
|
||||
defer qe.ncMux.Unlock()
|
||||
|
||||
if qe.nc != nil && qe.nc.IsConnected() {
|
||||
return nil
|
||||
}
|
||||
if qe.nc != nil {
|
||||
qe.nc.Close()
|
||||
}
|
||||
|
||||
klog.Info("NATS connection not established, attempting to connect...")
|
||||
nc, err := utils.NewNatsConn()
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to connect to NATS: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
qe.nc = nc
|
||||
klog.Info("Successfully connected to NATS")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (qe *QueuedEventController) GetNatsConn() *nats.Conn {
|
||||
qe.ncMux.Lock()
|
||||
defer qe.ncMux.Unlock()
|
||||
return qe.nc
|
||||
}
|
||||
|
||||
func NewAppEventQueue(ctx context.Context, nc *nats.Conn) *QueuedEventController {
|
||||
return &QueuedEventController{
|
||||
ctx: ctx,
|
||||
nc: nc,
|
||||
wq: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "app-event-queue"),
|
||||
}
|
||||
}
|
||||
@@ -110,3 +170,16 @@ func PublishAppEventToQueue(p utils.EventParams) {
|
||||
|
||||
AppEventQueue.enqueue(&QueueEvent{Subject: subject, Data: data})
|
||||
}
|
||||
|
||||
func PublishUserEventToQueue(topic, user, operator string) {
|
||||
subject := "os.users"
|
||||
data := UserEvent{
|
||||
Topic: topic,
|
||||
Payload: Payload{
|
||||
User: user,
|
||||
Operator: operator,
|
||||
Timestamp: time.Now(),
|
||||
},
|
||||
}
|
||||
AppEventQueue.enqueue(&QueueEvent{Subject: subject, Data: data})
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package utils
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@@ -12,6 +11,7 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
refdocker "github.com/containerd/containerd/reference/docker"
|
||||
"github.com/pkg/errors"
|
||||
"helm.sh/helm/v3/pkg/action"
|
||||
"helm.sh/helm/v3/pkg/chart"
|
||||
helmLoader "helm.sh/helm/v3/pkg/chart/loader"
|
||||
@@ -182,7 +182,7 @@ func GetResourceListFromChart(chartPath string, values map[string]interface{}) (
|
||||
if err == io.EOF {
|
||||
return resources, nil
|
||||
}
|
||||
return nil, fmt.Errorf("error parsing")
|
||||
return nil, errors.Wrap(err, "error parsing")
|
||||
}
|
||||
ext.Raw = bytes.TrimSpace(ext.Raw)
|
||||
if len(ext.Raw) == 0 || bytes.Equal(ext.Raw, []byte("null")) {
|
||||
|
||||
@@ -47,96 +47,13 @@ type EventParams struct {
|
||||
SharedEntrances []v1alpha1.Entrance
|
||||
}
|
||||
|
||||
type UserEvent struct {
|
||||
Topic string `json:"topic"`
|
||||
Payload Payload `json:"payload"`
|
||||
func PublishEvent(nc *nats.Conn, subject string, data interface{}) error {
|
||||
return publish(nc, subject, data)
|
||||
}
|
||||
|
||||
type Payload struct {
|
||||
User string `json:"user"`
|
||||
Operator string `json:"operator"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
func PublishUserEvent(topic, user, operator string) {
|
||||
subject := "os.users"
|
||||
data := UserEvent{
|
||||
Topic: topic,
|
||||
Payload: Payload{
|
||||
User: user,
|
||||
Operator: operator,
|
||||
Timestamp: time.Now(),
|
||||
},
|
||||
}
|
||||
if err := publish(subject, data); err != nil {
|
||||
klog.Errorf("async publish subject %s,data %v, failed %v", subject, data, err)
|
||||
} else {
|
||||
t, _ := json.Marshal(data)
|
||||
klog.Infof("publish user event success. data: %v", string(t))
|
||||
}
|
||||
}
|
||||
|
||||
func PublishAppEvent(p EventParams) {
|
||||
subject := fmt.Sprintf("os.application.%s", p.Owner)
|
||||
|
||||
now := time.Now()
|
||||
data := Event{
|
||||
EventID: fmt.Sprintf("%s-%s-%d", p.Owner, p.Name, now.UnixMilli()),
|
||||
CreateTime: now,
|
||||
Name: p.Name,
|
||||
Type: func() string {
|
||||
if p.Type == "" {
|
||||
return "app"
|
||||
}
|
||||
return p.Type
|
||||
}(),
|
||||
OpType: p.OpType,
|
||||
OpID: p.OpID,
|
||||
State: p.State,
|
||||
Progress: p.Progress,
|
||||
User: p.Owner,
|
||||
RawAppName: func() string {
|
||||
if p.RawAppName == "" {
|
||||
return p.Name
|
||||
}
|
||||
return p.RawAppName
|
||||
}(),
|
||||
Title: p.Title,
|
||||
Reason: p.Reason,
|
||||
Message: p.Message,
|
||||
}
|
||||
if len(p.EntranceStatuses) > 0 {
|
||||
data.EntranceStatuses = p.EntranceStatuses
|
||||
}
|
||||
|
||||
if err := publish(subject, data); err != nil {
|
||||
klog.Errorf("async publish subject %s,data %v, failed %v", subject, data, err)
|
||||
} else {
|
||||
klog.Infof("publish event success data: %#v", data)
|
||||
}
|
||||
}
|
||||
|
||||
func PublishToNats(subject string, data interface{}) error {
|
||||
return publish(subject, data)
|
||||
}
|
||||
|
||||
func publish(subject string, data interface{}) error {
|
||||
natsHost := os.Getenv("NATS_HOST")
|
||||
natsPort := os.Getenv("NATS_PORT")
|
||||
|
||||
username := os.Getenv("NATS_USERNAME")
|
||||
password := os.Getenv("NATS_PASSWORD")
|
||||
|
||||
natsURL := fmt.Sprintf("nats://%s:%s", natsHost, natsPort)
|
||||
nc, err := nats.Connect(natsURL, nats.UserInfo(username, password))
|
||||
if err != nil {
|
||||
klog.Infof("connect error: err=%v", err)
|
||||
return err
|
||||
}
|
||||
defer nc.Drain()
|
||||
func publish(nc *nats.Conn, subject string, data interface{}) error {
|
||||
d, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
klog.Errorf("marshal failed: %v", err)
|
||||
return err
|
||||
}
|
||||
err = nc.Publish(subject, d)
|
||||
@@ -147,34 +64,38 @@ func publish(subject string, data interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func PublishMiddlewareEvent(owner, name, opType, opID, state, progress string, entranceStatuses []v1alpha1.EntranceStatus, rawAppName string) {
|
||||
subject := fmt.Sprintf("os.application.%s", owner)
|
||||
func NewNatsConn() (*nats.Conn, error) {
|
||||
natsHost := os.Getenv("NATS_HOST")
|
||||
natsPort := os.Getenv("NATS_PORT")
|
||||
username := os.Getenv("NATS_USERNAME")
|
||||
password := os.Getenv("NATS_PASSWORD")
|
||||
|
||||
now := time.Now()
|
||||
data := Event{
|
||||
EventID: fmt.Sprintf("%s-%s-%d", owner, name, now.UnixMilli()),
|
||||
CreateTime: now,
|
||||
Name: name,
|
||||
Type: "middleware",
|
||||
OpType: opType,
|
||||
OpID: opID,
|
||||
State: state,
|
||||
Progress: progress,
|
||||
User: owner,
|
||||
RawAppName: func() string {
|
||||
if rawAppName == "" {
|
||||
return name
|
||||
natsURL := fmt.Sprintf("nats://%s:%s", natsHost, natsPort)
|
||||
|
||||
opts := []nats.Option{
|
||||
nats.UserInfo(username, password),
|
||||
nats.MaxReconnects(-1),
|
||||
nats.ReconnectWait(2 * time.Second),
|
||||
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
||||
if err != nil {
|
||||
klog.Warningf("NATS disconnected: %v, will attempt to reconnect", err)
|
||||
} else {
|
||||
klog.Infof("NATS disconnected, will attempt to reconnect")
|
||||
}
|
||||
return rawAppName
|
||||
}(),
|
||||
}
|
||||
if len(entranceStatuses) > 0 {
|
||||
data.EntranceStatuses = entranceStatuses
|
||||
}),
|
||||
nats.ReconnectHandler(func(nc *nats.Conn) {
|
||||
klog.Infof("NATS reconnected to %s", nc.ConnectedUrl())
|
||||
}),
|
||||
nats.ClosedHandler(func(nc *nats.Conn) {
|
||||
klog.Errorf("NATS connection closed permanently: %v", nc.LastError())
|
||||
}),
|
||||
}
|
||||
|
||||
if err := publish(subject, data); err != nil {
|
||||
klog.Errorf("async publish subject %s,data %v, failed %v", subject, data, err)
|
||||
} else {
|
||||
klog.Infof("publish event success data: %#v", data)
|
||||
nc, err := nats.Connect(natsURL, opts...)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to connect to NATS: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
klog.Infof("connected to NATS at %s", natsURL)
|
||||
return nc, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user