Compare commits
12 Commits
daemon/fix
...
module-app
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b7e54d63b8 | ||
|
|
0aef8c3c99 | ||
|
|
b9605307cc | ||
|
|
61fec6d056 | ||
|
|
be46c64783 | ||
|
|
4989ced9f3 | ||
|
|
754e949855 | ||
|
|
0901056794 | ||
|
|
4b421e0976 | ||
|
|
7d433cf8ac | ||
|
|
6fc22a4778 | ||
|
|
96c12d2682 |
@@ -317,7 +317,7 @@ spec:
|
||||
chown -R 1000:1000 /uploadstemp && \
|
||||
chown -R 1000:1000 /appdata
|
||||
- name: olares-app-init
|
||||
image: beclab/system-frontend:v1.6.37
|
||||
image: beclab/system-frontend:v1.6.40
|
||||
imagePullPolicy: IfNotPresent
|
||||
command:
|
||||
- /bin/sh
|
||||
|
||||
@@ -29,7 +29,7 @@ spec:
|
||||
|
||||
containers:
|
||||
- name: wizard
|
||||
image: beclab/wizard:v1.6.30
|
||||
image: beclab/wizard:v1.6.40
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- containerPort: 80
|
||||
|
||||
@@ -287,7 +287,7 @@ func (a *Argument) LoadReleaseInfo() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Argument) SaveReleaseInfo() error {
|
||||
func (a *Argument) SaveReleaseInfo(withoutName bool) error {
|
||||
if a.BaseDir == "" {
|
||||
return errors.New("invalid: empty base directory")
|
||||
}
|
||||
@@ -300,15 +300,17 @@ func (a *Argument) SaveReleaseInfo() error {
|
||||
ENV_OLARES_VERSION: a.OlaresVersion,
|
||||
}
|
||||
|
||||
if a.User != nil && a.User.UserName != "" && a.User.DomainName != "" {
|
||||
releaseInfoMap["OLARES_NAME"] = fmt.Sprintf("%s@%s", a.User.UserName, a.User.DomainName)
|
||||
} else {
|
||||
if util.IsExist(OlaresReleaseFile) {
|
||||
// if the user is not set, try to load the user name from the release file
|
||||
envs, err := godotenv.Read(OlaresReleaseFile)
|
||||
if err == nil {
|
||||
if userName, ok := envs["OLARES_NAME"]; ok {
|
||||
releaseInfoMap["OLARES_NAME"] = userName
|
||||
if !withoutName {
|
||||
if a.User != nil && a.User.UserName != "" && a.User.DomainName != "" {
|
||||
releaseInfoMap["OLARES_NAME"] = fmt.Sprintf("%s@%s", a.User.UserName, a.User.DomainName)
|
||||
} else {
|
||||
if util.IsExist(OlaresReleaseFile) {
|
||||
// if the user is not set, try to load the user name from the release file
|
||||
envs, err := godotenv.Read(OlaresReleaseFile)
|
||||
if err == nil {
|
||||
if userName, ok := envs["OLARES_NAME"]; ok {
|
||||
releaseInfoMap["OLARES_NAME"] = userName
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,6 +111,7 @@ func (p *phaseBuilder) phaseInstall() *phaseBuilder {
|
||||
PhaseFile: common.TerminusStateFileInstalled,
|
||||
BaseDir: p.runtime.GetBaseDir(),
|
||||
},
|
||||
&terminus.WriteReleaseFileModule{WithoutName: true},
|
||||
)
|
||||
}
|
||||
return p
|
||||
|
||||
@@ -74,6 +74,7 @@ func (m *PreparedModule) Init() {
|
||||
|
||||
type WriteReleaseFileModule struct {
|
||||
common.KubeModule
|
||||
WithoutName bool
|
||||
}
|
||||
|
||||
func (m *WriteReleaseFileModule) Init() {
|
||||
@@ -82,7 +83,7 @@ func (m *WriteReleaseFileModule) Init() {
|
||||
m.Tasks = []task.Interface{
|
||||
&task.LocalTask{
|
||||
Name: "WriteReleaseFile",
|
||||
Action: new(WriteReleaseFile),
|
||||
Action: &WriteReleaseFile{WithoutName: m.WithoutName},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -250,13 +250,14 @@ func (t *PrepareFinished) Execute(runtime connector.Runtime) error {
|
||||
|
||||
type WriteReleaseFile struct {
|
||||
common.KubeAction
|
||||
WithoutName bool
|
||||
}
|
||||
|
||||
func (t *WriteReleaseFile) Execute(runtime connector.Runtime) error {
|
||||
if util.IsExist(common.OlaresReleaseFile) {
|
||||
logger.Debugf("found existing release file: %s, overriding ...", common.OlaresReleaseFile)
|
||||
}
|
||||
return t.KubeConf.Arg.SaveReleaseInfo()
|
||||
return t.KubeConf.Arg.SaveReleaseInfo(t.WithoutName)
|
||||
}
|
||||
|
||||
type RemoveReleaseFile struct {
|
||||
|
||||
@@ -199,7 +199,7 @@ func MountedHddPath(ctx context.Context) ([]string, error) {
|
||||
|
||||
func FilterBySerial(serial string) func(dev storageDevice) bool {
|
||||
return func(dev storageDevice) bool {
|
||||
return dev.IDSerial == serial || dev.IDSerialShort == serial
|
||||
return strings.HasSuffix(serial, dev.IDSerial) || strings.HasSuffix(serial, dev.IDSerialShort)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -170,7 +170,7 @@ spec:
|
||||
priorityClassName: "system-cluster-critical"
|
||||
containers:
|
||||
- name: app-service
|
||||
image: beclab/app-service:0.4.72
|
||||
image: beclab/app-service:0.4.73
|
||||
imagePullPolicy: IfNotPresent
|
||||
securityContext:
|
||||
runAsUser: 0
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/generated/clientset/versioned"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/images"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
|
||||
kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
|
||||
kbopv1alphav1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
|
||||
@@ -138,7 +139,13 @@ func main() {
|
||||
setupLog.Error(err, "Unable to create controller", "controller", "Security")
|
||||
os.Exit(1)
|
||||
}
|
||||
appEventQueue := appevent.NewAppEventQueue(ictx)
|
||||
natsConn, err := utils.NewNatsConn()
|
||||
if err != nil {
|
||||
setupLog.Error(err, "Failed to connect to NATS")
|
||||
os.Exit(1)
|
||||
}
|
||||
defer natsConn.Drain()
|
||||
appEventQueue := appevent.NewAppEventQueue(ictx, natsConn)
|
||||
appevent.SetAppEventQueue(appEventQueue)
|
||||
go appEventQueue.Run()
|
||||
|
||||
@@ -198,6 +205,7 @@ func main() {
|
||||
if err = (&controllers.NodeAlertController{
|
||||
Client: mgr.GetClient(),
|
||||
KubeConfig: config,
|
||||
NatsConn: natsConn,
|
||||
}).SetupWithManager(mgr); err != nil {
|
||||
setupLog.Error(err, "Unable to create controller", "controller", "NodeAlert")
|
||||
os.Exit(1)
|
||||
|
||||
@@ -6,13 +6,13 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
|
||||
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
sysv1alpha1 "github.com/beclab/Olares/framework/app-service/api/sys.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
coordinationv1 "k8s.io/api/coordination/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@@ -218,20 +218,10 @@ func (r *AppEnvController) triggerApplyEnv(ctx context.Context, appEnv *sysv1alp
|
||||
UpdateTime: &now,
|
||||
}
|
||||
|
||||
am, err := apputils.UpdateAppMgrStatus(targetAppMgr.Name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(targetAppMgr.Name, status)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update ApplicationManager Status: %v", err)
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(am.Status.OpType),
|
||||
OpID: opID,
|
||||
State: appv1alpha1.ApplyingEnv.String(),
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
})
|
||||
|
||||
klog.Infof("Successfully triggered ApplyEnv for app: %s owner: %s", appEnv.AppName, appEnv.AppOwner)
|
||||
return nil
|
||||
|
||||
@@ -7,7 +7,10 @@ import (
|
||||
|
||||
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/images"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/rest"
|
||||
|
||||
@@ -167,6 +170,11 @@ func (r *ApplicationManagerController) preEnqueueCheckForUpdate(old, new client.
|
||||
if curAppMgr.Spec.Type != appv1alpha1.App && curAppMgr.Spec.Type != appv1alpha1.Middleware {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldAppMgr.Status.State != curAppMgr.Status.State {
|
||||
r.publishStateChangeEvent(curAppMgr)
|
||||
}
|
||||
|
||||
if curAppMgr.Status.OpGeneration <= oldAppMgr.Status.OpGeneration {
|
||||
return false
|
||||
}
|
||||
@@ -174,6 +182,22 @@ func (r *ApplicationManagerController) preEnqueueCheckForUpdate(old, new client.
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *ApplicationManagerController) publishStateChangeEvent(am *appv1alpha1.ApplicationManager) {
|
||||
appevent.PublishAppEventToQueue(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(am.Status.OpType),
|
||||
OpID: am.Status.OpID,
|
||||
State: am.Status.State.String(),
|
||||
Progress: am.Status.Progress,
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
Reason: am.Status.Reason,
|
||||
Message: am.Status.Message,
|
||||
})
|
||||
}
|
||||
|
||||
func (r *ApplicationManagerController) loadStatefulAppAndReconcile(ctx context.Context, name string) (appstate.StatefulApp, error) {
|
||||
statefulApp, err := LoadStatefulApp(ctx, r, name)
|
||||
if err != nil {
|
||||
|
||||
@@ -146,7 +146,7 @@ func LoadStatefulApp(ctx context.Context, appmgr *ApplicationManagerController,
|
||||
case appv1alpha1.ApplyingEnvCanceling:
|
||||
return appstate.NewApplyingEnvCancelingApp(appmgr, &am)
|
||||
case appv1alpha1.Uninstalling:
|
||||
return appstate.NewUninstallingApp(appmgr, &am, 15*time.Minute)
|
||||
return appstate.NewUninstallingApp(appmgr, &am, 30*time.Minute)
|
||||
case appv1alpha1.StopFailed:
|
||||
return appstate.NewSuspendFailedApp(appmgr, &am)
|
||||
case appv1alpha1.UninstallFailed:
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/rest"
|
||||
@@ -54,6 +55,7 @@ type NodeAlertController struct {
|
||||
// lastPressureState tracks the last known pressure state for each node and pressure type
|
||||
lastPressureState map[string]bool
|
||||
mutex sync.RWMutex
|
||||
NatsConn *nats.Conn
|
||||
}
|
||||
|
||||
// SetupWithManager sets up the controller with the Manager.
|
||||
@@ -245,5 +247,5 @@ func (r *NodeAlertController) sendNodeAlert(nodeName string, pressureType NodePr
|
||||
|
||||
// publishToNats publishes a message to the specified NATS subject
|
||||
func (r *NodeAlertController) publishToNats(subject string, data interface{}) error {
|
||||
return utils.PublishToNats(subject, data)
|
||||
return utils.PublishEvent(r.NatsConn, subject, data)
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
appv1alpha1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@@ -236,20 +235,6 @@ func (r *PodAbnormalSuspendAppController) trySuspendApp(ctx context.Context, own
|
||||
if _, err := apputils.UpdateAppMgrStatus(name, status); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(status.OpType),
|
||||
OpID: opID,
|
||||
State: appv1alpha1.Stopping.String(),
|
||||
Progress: message,
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
})
|
||||
klog.Infof("suspend requested for app=%s owner=%s, reason=%s", am.Spec.AppName, am.Spec.AppOwner, message)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
natsevent "github.com/beclab/Olares/framework/app-service/pkg/event"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace/v1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
@@ -165,7 +166,7 @@ func (r *UserController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
|
||||
klog.Infof("update user failed %v", updateErr)
|
||||
return ctrl.Result{}, updateErr
|
||||
}
|
||||
utils.PublishUserEvent("Delete", user.Name, user.Annotations[users.AnnotationUserDeleter])
|
||||
r.publish("Delete", user.Name, user.Annotations[users.AnnotationUserDeleter])
|
||||
}
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
@@ -299,11 +300,15 @@ func (r *UserController) handleUserCreation(ctx context.Context, user *iamv1alph
|
||||
klog.Errorf("failed to update user status to Created %v", updateErr)
|
||||
} else {
|
||||
klog.Infof("publish user creation event.....")
|
||||
utils.PublishUserEvent("Create", user.Name, user.Annotations[users.AnnotationUserCreator])
|
||||
r.publish("Create", user.Name, user.Annotations[users.AnnotationUserCreator])
|
||||
}
|
||||
return ctrl.Result{}, updateErr
|
||||
}
|
||||
|
||||
func (r *UserController) publish(topic, user, operator string) {
|
||||
natsevent.PublishUserEventToQueue(topic, user, operator)
|
||||
}
|
||||
|
||||
func (r *UserController) checkResource(user *iamv1alpha2.User) error {
|
||||
metrics, _, err := apputils.GetClusterResource("")
|
||||
if err != nil {
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
@@ -74,21 +73,11 @@ func (h *Handler) appApplyEnv(req *restful.Request, resp *restful.Response) {
|
||||
UpdateTime: &now,
|
||||
}
|
||||
|
||||
am, err := apputils.UpdateAppMgrStatus(appMgrName, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(appMgrName, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(am.Status.OpType),
|
||||
OpID: opID,
|
||||
State: appv1alpha1.ApplyingEnv.String(),
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
})
|
||||
|
||||
resp.WriteEntity(api.Response{Code: 200})
|
||||
}
|
||||
|
||||
@@ -9,8 +9,8 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@@ -82,21 +82,11 @@ func (h *Handler) cancel(req *restful.Request, resp *restful.Response) {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
a, err := apputils.UpdateAppMgrStatus(name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: a.Spec.AppOwner,
|
||||
Name: a.Spec.AppName,
|
||||
OpType: string(a.Status.OpType),
|
||||
OpID: opID,
|
||||
State: cancelState.String(),
|
||||
RawAppName: a.Spec.RawAppName,
|
||||
Type: a.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(a.Spec.Config),
|
||||
})
|
||||
|
||||
resp.WriteAsJson(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
|
||||
@@ -9,8 +9,6 @@ import (
|
||||
"slices"
|
||||
"strconv"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
sysv1alpha1 "github.com/beclab/Olares/framework/app-service/api/sys.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
@@ -19,6 +17,7 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/generated/clientset/versioned"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils/config"
|
||||
@@ -611,23 +610,12 @@ func (h *installHandlerHelper) applyApplicationManager(marketSource string) (opI
|
||||
UpdateTime: &now,
|
||||
OpTime: &now,
|
||||
}
|
||||
a, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
|
||||
_, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
if err != nil {
|
||||
api.HandleError(h.resp, h.req, err)
|
||||
return
|
||||
}
|
||||
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: a.Spec.AppOwner,
|
||||
Name: a.Spec.AppName,
|
||||
OpType: string(a.Status.OpType),
|
||||
OpID: opID,
|
||||
State: v1alpha1.Pending.String(),
|
||||
RawAppName: a.Spec.RawAppName,
|
||||
Type: a.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(a.Spec.Config),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -11,8 +11,8 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appstate"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@@ -94,21 +94,11 @@ func (h *Handler) uninstall(req *restful.Request, resp *restful.Response) {
|
||||
UpdateTime: &now,
|
||||
}
|
||||
|
||||
a, err := apputils.UpdateAppMgrStatus(name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: a.Spec.AppOwner,
|
||||
Name: a.Spec.AppName,
|
||||
OpType: string(a.Status.OpType),
|
||||
OpID: opID,
|
||||
State: v1alpha1.Uninstalling.String(),
|
||||
RawAppName: a.Spec.RawAppName,
|
||||
Type: a.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(a.Spec.Config),
|
||||
})
|
||||
|
||||
resp.WriteEntity(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
|
||||
@@ -384,21 +384,11 @@ func (h *Handler) appUpgrade(req *restful.Request, resp *restful.Response) {
|
||||
UpdateTime: &now,
|
||||
}
|
||||
|
||||
am, err := apputils.UpdateAppMgrStatus(appMgrName, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(appMgrName, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(am.Status.OpType),
|
||||
OpID: opID,
|
||||
State: appv1alpha1.Upgrading.String(),
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
})
|
||||
|
||||
resp.WriteEntity(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
|
||||
@@ -150,7 +150,6 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to uninstall middleware err=%v", err)
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.InstallOp), opID, v1alpha1.InstallFailed.String(), "", nil, "")
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -172,7 +171,6 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.InstallOp), opID, v1alpha1.Installing.String(), "", nil, "")
|
||||
|
||||
klog.Infof("Start to install middleware name=%v", middlewareConfig.MiddlewareName)
|
||||
err = middlewareinstaller.Install(req.Request.Context(), h.kubeConfig, middlewareConfig)
|
||||
@@ -226,7 +224,7 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
|
||||
klog.Infof("Failed to update status err=%v", err)
|
||||
return
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.CancelOp), opID, v1alpha1.InstallingCanceled.String(), "", nil, "")
|
||||
|
||||
return
|
||||
}
|
||||
klog.Infof("ticker get middleware status")
|
||||
@@ -245,8 +243,6 @@ func (h *Handler) installMiddleware(req *restful.Request, resp *restful.Response
|
||||
e := apputils.UpdateStatus(a, opRecord.Status, &opRecord, opRecord.Message)
|
||||
if e != nil {
|
||||
klog.Error(e)
|
||||
} else {
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.InstallOp), opID, opRecord.Status.String(), "", nil, "")
|
||||
}
|
||||
delete(middlewareManager, name)
|
||||
return
|
||||
@@ -309,7 +305,6 @@ func (h *Handler) uninstallMiddleware(req *restful.Request, resp *restful.Respon
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.UninstallOp), opID, v1alpha1.Uninstalling.String(), "", nil, "")
|
||||
|
||||
now = metav1.Now()
|
||||
opRecord := v1alpha1.OpRecord{
|
||||
@@ -327,8 +322,6 @@ func (h *Handler) uninstallMiddleware(req *restful.Request, resp *restful.Respon
|
||||
e := apputils.UpdateStatus(mgr, v1alpha1.UninstallFailed, &opRecord, opRecord.Message)
|
||||
if e != nil {
|
||||
klog.Errorf("Failed to update applicationmanager status in uninstall middleware name=%s err=%v", mgr.Name, e)
|
||||
} else {
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.UninstallOp), opID, v1alpha1.UninstallFailed.String(), "", nil, "")
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -344,7 +337,6 @@ func (h *Handler) uninstallMiddleware(req *restful.Request, resp *restful.Respon
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.Uninstalled), opID, v1alpha1.Uninstalled.String(), "", nil, "")
|
||||
|
||||
resp.WriteEntity(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
@@ -390,7 +382,6 @@ func (h *Handler) cancelMiddleware(req *restful.Request, resp *restful.Response)
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishMiddlewareEvent(owner, app, string(v1alpha1.CancelOp), opID, v1alpha1.InstallingCanceling.String(), "", nil, "")
|
||||
|
||||
resp.WriteAsJson(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/provider"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/tapr"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
@@ -264,22 +263,11 @@ func (h *Handler) setupAppEntranceDomain(req *restful.Request, resp *restful.Res
|
||||
UpdateTime: &now,
|
||||
}
|
||||
|
||||
am, err := apputils.UpdateAppMgrStatus(appMgr.Name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(appMgr.Name, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: am.Spec.AppOwner,
|
||||
Name: am.Spec.AppName,
|
||||
OpType: string(am.Status.OpType),
|
||||
OpID: opID,
|
||||
State: v1alpha1.Upgrading.String(),
|
||||
RawAppName: am.Spec.RawAppName,
|
||||
Type: am.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(am.Spec.Config),
|
||||
Message: fmt.Sprintf("app %s was upgrade via setup domain by user %s", am.Spec.AppName, am.Spec.AppOwner),
|
||||
})
|
||||
}
|
||||
resp.WriteAsJson(appUpdated.Spec.Settings)
|
||||
}
|
||||
|
||||
@@ -8,8 +8,6 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/apiserver/api"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
@@ -17,12 +15,12 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
"github.com/emicklei/go-restful/v3"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
func (h *Handler) suspend(req *restful.Request, resp *restful.Response) {
|
||||
@@ -77,23 +75,11 @@ func (h *Handler) suspend(req *restful.Request, resp *restful.Response) {
|
||||
StatusTime: &now,
|
||||
UpdateTime: &now,
|
||||
}
|
||||
a, err := apputils.UpdateAppMgrStatus(name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: a.Spec.AppOwner,
|
||||
Name: a.Spec.AppName,
|
||||
OpType: string(a.Status.OpType),
|
||||
OpID: opID,
|
||||
State: v1alpha1.Stopping.String(),
|
||||
RawAppName: a.Spec.RawAppName,
|
||||
Type: a.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(a.Spec.Config),
|
||||
Reason: constants.AppStopByUser,
|
||||
Message: fmt.Sprintf("app %s was stop by user %s", a.Spec.AppName, a.Spec.AppOwner),
|
||||
})
|
||||
|
||||
resp.WriteEntity(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
@@ -185,22 +171,11 @@ func (h *Handler) resume(req *restful.Request, resp *restful.Response) {
|
||||
StatusTime: &now,
|
||||
UpdateTime: &now,
|
||||
}
|
||||
a, err := apputils.UpdateAppMgrStatus(name, status)
|
||||
_, err = apputils.UpdateAppMgrStatus(name, status)
|
||||
if err != nil {
|
||||
api.HandleError(resp, req, err)
|
||||
return
|
||||
}
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: a.Spec.AppOwner,
|
||||
Name: a.Spec.AppName,
|
||||
OpType: string(a.Status.OpType),
|
||||
OpID: opID,
|
||||
State: v1alpha1.Resuming.String(),
|
||||
RawAppName: a.Spec.RawAppName,
|
||||
Type: a.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(a.Spec.Config),
|
||||
Message: fmt.Sprintf("app %s was resume by user %s", a.Spec.AppName, a.Spec.AppOwner),
|
||||
})
|
||||
|
||||
resp.WriteEntity(api.InstallationResponse{
|
||||
Response: api.Response{Code: 200},
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/kubesphere"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/provider"
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/users/userspace"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
@@ -1088,14 +1087,13 @@ func (h *Handler) applicationManagerMutate(req *restful.Request, resp *restful.R
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
var pam *v1alpha1.ApplicationManager
|
||||
var admissionReq, admissionResp admissionv1.AdmissionReview
|
||||
proxyUUID := uuid.New()
|
||||
if _, _, err := webhook.Deserializer.Decode(admissionRequestBody, nil, &admissionReq); err != nil {
|
||||
klog.Errorf("Failed to decode admission request body err=%v", err)
|
||||
admissionResp.Response = h.sidecarWebhook.AdmissionError("", err)
|
||||
} else {
|
||||
admissionResp.Response, pam = h.applicationManagerInject(req.Request.Context(), admissionReq.Request, proxyUUID)
|
||||
admissionResp.Response, _ = h.applicationManagerInject(req.Request.Context(), admissionReq.Request, proxyUUID)
|
||||
}
|
||||
admissionResp.TypeMeta = admissionReq.TypeMeta
|
||||
admissionResp.Kind = admissionReq.Kind
|
||||
@@ -1109,18 +1107,6 @@ func (h *Handler) applicationManagerMutate(req *restful.Request, resp *restful.R
|
||||
klog.Errorf("Failed to write response[application-manager inject] admin review in namespace=%s err=%v", requestForNamespace, err)
|
||||
return
|
||||
}
|
||||
if pam != nil {
|
||||
utils.PublishAppEvent(utils.EventParams{
|
||||
Owner: pam.Spec.AppOwner,
|
||||
Name: pam.Spec.AppName,
|
||||
OpType: string(pam.Spec.OpType),
|
||||
OpID: pam.Status.OpID,
|
||||
State: pam.Status.State.String(),
|
||||
RawAppName: pam.Spec.RawAppName,
|
||||
Type: pam.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(pam.Spec.Config),
|
||||
})
|
||||
}
|
||||
|
||||
klog.Infof("Done[application-manager inject] with uuid=%s in namespace=%s", proxyUUID, requestForNamespace)
|
||||
}
|
||||
|
||||
@@ -130,15 +130,6 @@ func (p *InstallingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
|
||||
return
|
||||
} // end of err != nil
|
||||
|
||||
p.finally = func() {
|
||||
klog.Infof("app %s install successfully, update app state to initializing", p.manager.Spec.AppName)
|
||||
updateErr := p.updateStatus(context.TODO(), p.manager, appsv1.Initializing, nil, appsv1.Initializing.String(), "")
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update status failed %v", updateErr)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
if p.manager.Spec.Type == appsv1.Middleware {
|
||||
ok, err := ops.WaitForLaunch()
|
||||
if !ok {
|
||||
@@ -165,6 +156,16 @@ func (p *InstallingApp) Exec(ctx context.Context) (StatefulInProgressApp, error)
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
p.finally = func() {
|
||||
klog.Infof("app %s install successfully, update app state to initializing", p.manager.Spec.AppName)
|
||||
updateErr := p.updateStatus(context.TODO(), p.manager, appsv1.Initializing, nil, appsv1.Initializing.String(), "")
|
||||
if updateErr != nil {
|
||||
klog.Errorf("update status failed %v", updateErr)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
|
||||
appsv1 "github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/helm"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
@@ -100,17 +99,6 @@ func (p *PendingApp) Exec(ctx context.Context) (StatefulInProgressApp, error) {
|
||||
klog.Error("update app manager status error, ", err, ", ", p.manager.Name)
|
||||
return err
|
||||
}
|
||||
appevent.PublishAppEventToQueue(utils.EventParams{
|
||||
Owner: p.manager.Spec.AppOwner,
|
||||
Name: p.manager.Spec.AppName,
|
||||
OpType: string(p.manager.Spec.OpType),
|
||||
OpID: p.manager.Status.OpID,
|
||||
State: appsv1.Downloading.String(),
|
||||
RawAppName: p.manager.Spec.RawAppName,
|
||||
Type: p.manager.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(p.manager.Spec.Config),
|
||||
})
|
||||
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
|
||||
@@ -11,15 +11,14 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appcfg"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appinstaller"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/appinstaller/versioned"
|
||||
appevent "github.com/beclab/Olares/framework/app-service/pkg/event"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/middlewareinstaller"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
apputils "github.com/beclab/Olares/framework/app-service/pkg/utils/app"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilwait "k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/klog/v2"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
@@ -81,19 +80,6 @@ func (b *baseStatefulApp) updateStatus(ctx context.Context, am *appsv1.Applicati
|
||||
klog.Errorf("patch appmgr's %s status failed %v", am.Name, err)
|
||||
return err
|
||||
}
|
||||
appevent.PublishAppEventToQueue(utils.EventParams{
|
||||
Owner: b.manager.Spec.AppOwner,
|
||||
Name: b.manager.Spec.AppName,
|
||||
OpType: string(b.manager.Spec.OpType),
|
||||
OpID: b.manager.Status.OpID,
|
||||
State: state.String(),
|
||||
RawAppName: b.manager.Spec.RawAppName,
|
||||
Type: b.manager.Spec.Type.String(),
|
||||
Title: apputils.AppTitle(b.manager.Spec.Config),
|
||||
Reason: reason,
|
||||
Message: message,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -138,6 +124,13 @@ func (p *baseStatefulApp) forceDeleteApp(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for namespace to be fully deleted before updating status
|
||||
if err = p.waitForNamespaceDeleted(ctx); err != nil {
|
||||
klog.Errorf("wait for namespace %s deleted failed %v", p.manager.Spec.AppNamespace, err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = p.updateStatus(ctx, p.manager, appsv1.Uninstalled, nil, appsv1.Uninstalled.String(), "")
|
||||
if err != nil {
|
||||
klog.Errorf("update app manager %s to state %s failed", p.manager.Name, appsv1.Uninstalled)
|
||||
@@ -146,6 +139,32 @@ func (p *baseStatefulApp) forceDeleteApp(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForNamespaceDeleted waits for the namespace to be completely deleted
|
||||
func (p *baseStatefulApp) waitForNamespaceDeleted(ctx context.Context) error {
|
||||
namespace := p.manager.Spec.AppNamespace
|
||||
if apputils.IsProtectedNamespace(namespace) {
|
||||
return nil
|
||||
}
|
||||
|
||||
klog.Infof("waiting for namespace %s to be fully deleted", namespace)
|
||||
err := utilwait.PollImmediate(time.Second, 30*time.Minute, func() (done bool, err error) {
|
||||
var ns corev1.Namespace
|
||||
err = p.client.Get(ctx, types.NamespacedName{Name: namespace}, &ns)
|
||||
if err != nil && !apierrors.IsNotFound(err) {
|
||||
klog.Errorf("failed to get namespace %s: %v", namespace, err)
|
||||
return false, err
|
||||
}
|
||||
if apierrors.IsNotFound(err) {
|
||||
klog.Infof("namespace %s has been fully deleted", namespace)
|
||||
return true, nil
|
||||
}
|
||||
klog.Infof("namespace %s still exists, waiting...", namespace)
|
||||
return false, nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
type OperationApp interface {
|
||||
StatefulApp
|
||||
IsTimeout() bool
|
||||
|
||||
@@ -100,7 +100,7 @@ func (p *UninstallingApp) waitForDeleteNamespace(ctx context.Context) error {
|
||||
if apputils.IsProtectedNamespace(p.manager.Spec.AppNamespace) {
|
||||
return nil
|
||||
}
|
||||
err := utilwait.PollImmediate(time.Second, 15*time.Minute, func() (done bool, err error) {
|
||||
err := utilwait.PollImmediate(time.Second, 30*time.Minute, func() (done bool, err error) {
|
||||
klog.Infof("waiting for namespace %s to be deleted", p.manager.Spec.AppNamespace)
|
||||
nsName := p.manager.Spec.AppNamespace
|
||||
var ns corev1.Namespace
|
||||
|
||||
@@ -176,6 +176,7 @@ func (p *UpgradingApp) exec(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
appConfig.Ports = cfg.Ports
|
||||
appConfig.TailScale = cfg.TailScale
|
||||
|
||||
} else {
|
||||
_, err = apputils.GetIndexAndDownloadChart(ctx, &apputils.ConfigOptions{
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/utils"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
@@ -18,6 +19,7 @@ var AppEventQueue *QueuedEventController
|
||||
type QueuedEventController struct {
|
||||
wq workqueue.RateLimitingInterface
|
||||
ctx context.Context
|
||||
nc *nats.Conn
|
||||
}
|
||||
|
||||
type QueueEvent struct {
|
||||
@@ -25,6 +27,17 @@ type QueueEvent struct {
|
||||
Data interface{}
|
||||
}
|
||||
|
||||
type UserEvent struct {
|
||||
Topic string `json:"topic"`
|
||||
Payload Payload `json:"payload"`
|
||||
}
|
||||
|
||||
type Payload struct {
|
||||
User string `json:"user"`
|
||||
Operator string `json:"operator"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
func (qe *QueuedEventController) processNextWorkItem() bool {
|
||||
obj, shutdown := qe.wq.Get()
|
||||
if shutdown {
|
||||
@@ -41,11 +54,19 @@ func (qe *QueuedEventController) process(obj interface{}) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
err := utils.PublishToNats(eobj.Subject, eobj.Data)
|
||||
if err != nil {
|
||||
klog.Errorf("async publish subject %s,data %v, failed %v", eobj.Subject, eobj.Data, err)
|
||||
} else {
|
||||
klog.Infof("publish event success data: %#v", eobj.Data)
|
||||
for {
|
||||
err := qe.publish(eobj.Subject, eobj.Data)
|
||||
if err == nil {
|
||||
klog.Infof("publish event success data: %#v", eobj.Data)
|
||||
return
|
||||
}
|
||||
klog.Errorf("publish subject %s, data %v failed: %v", eobj.Subject, eobj.Data, err)
|
||||
select {
|
||||
case <-qe.ctx.Done():
|
||||
return
|
||||
case <-time.After(time.Second):
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,9 +89,14 @@ func (qe *QueuedEventController) enqueue(obj interface{}) {
|
||||
qe.wq.Add(obj)
|
||||
}
|
||||
|
||||
func NewAppEventQueue(ctx context.Context) *QueuedEventController {
|
||||
func (qe *QueuedEventController) publish(subject string, data interface{}) error {
|
||||
return utils.PublishEvent(qe.nc, subject, data)
|
||||
}
|
||||
|
||||
func NewAppEventQueue(ctx context.Context, nc *nats.Conn) *QueuedEventController {
|
||||
return &QueuedEventController{
|
||||
ctx: ctx,
|
||||
nc: nc,
|
||||
wq: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "app-event-queue"),
|
||||
}
|
||||
}
|
||||
@@ -110,3 +136,16 @@ func PublishAppEventToQueue(p utils.EventParams) {
|
||||
|
||||
AppEventQueue.enqueue(&QueueEvent{Subject: subject, Data: data})
|
||||
}
|
||||
|
||||
func PublishUserEventToQueue(topic, user, operator string) {
|
||||
subject := "os.users"
|
||||
data := UserEvent{
|
||||
Topic: topic,
|
||||
Payload: Payload{
|
||||
User: user,
|
||||
Operator: operator,
|
||||
Timestamp: time.Now(),
|
||||
},
|
||||
}
|
||||
AppEventQueue.enqueue(&QueueEvent{Subject: subject, Data: data})
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package utils
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@@ -12,6 +11,7 @@ import (
|
||||
"github.com/beclab/Olares/framework/app-service/api/app.bytetrade.io/v1alpha1"
|
||||
"github.com/beclab/Olares/framework/app-service/pkg/constants"
|
||||
refdocker "github.com/containerd/containerd/reference/docker"
|
||||
"github.com/pkg/errors"
|
||||
"helm.sh/helm/v3/pkg/action"
|
||||
"helm.sh/helm/v3/pkg/chart"
|
||||
helmLoader "helm.sh/helm/v3/pkg/chart/loader"
|
||||
@@ -182,7 +182,7 @@ func GetResourceListFromChart(chartPath string, values map[string]interface{}) (
|
||||
if err == io.EOF {
|
||||
return resources, nil
|
||||
}
|
||||
return nil, fmt.Errorf("error parsing")
|
||||
return nil, errors.Wrap(err, "error parsing")
|
||||
}
|
||||
ext.Raw = bytes.TrimSpace(ext.Raw)
|
||||
if len(ext.Raw) == 0 || bytes.Equal(ext.Raw, []byte("null")) {
|
||||
|
||||
@@ -47,96 +47,13 @@ type EventParams struct {
|
||||
SharedEntrances []v1alpha1.Entrance
|
||||
}
|
||||
|
||||
type UserEvent struct {
|
||||
Topic string `json:"topic"`
|
||||
Payload Payload `json:"payload"`
|
||||
func PublishEvent(nc *nats.Conn, subject string, data interface{}) error {
|
||||
return publish(nc, subject, data)
|
||||
}
|
||||
|
||||
type Payload struct {
|
||||
User string `json:"user"`
|
||||
Operator string `json:"operator"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
func PublishUserEvent(topic, user, operator string) {
|
||||
subject := "os.users"
|
||||
data := UserEvent{
|
||||
Topic: topic,
|
||||
Payload: Payload{
|
||||
User: user,
|
||||
Operator: operator,
|
||||
Timestamp: time.Now(),
|
||||
},
|
||||
}
|
||||
if err := publish(subject, data); err != nil {
|
||||
klog.Errorf("async publish subject %s,data %v, failed %v", subject, data, err)
|
||||
} else {
|
||||
t, _ := json.Marshal(data)
|
||||
klog.Infof("publish user event success. data: %v", string(t))
|
||||
}
|
||||
}
|
||||
|
||||
func PublishAppEvent(p EventParams) {
|
||||
subject := fmt.Sprintf("os.application.%s", p.Owner)
|
||||
|
||||
now := time.Now()
|
||||
data := Event{
|
||||
EventID: fmt.Sprintf("%s-%s-%d", p.Owner, p.Name, now.UnixMilli()),
|
||||
CreateTime: now,
|
||||
Name: p.Name,
|
||||
Type: func() string {
|
||||
if p.Type == "" {
|
||||
return "app"
|
||||
}
|
||||
return p.Type
|
||||
}(),
|
||||
OpType: p.OpType,
|
||||
OpID: p.OpID,
|
||||
State: p.State,
|
||||
Progress: p.Progress,
|
||||
User: p.Owner,
|
||||
RawAppName: func() string {
|
||||
if p.RawAppName == "" {
|
||||
return p.Name
|
||||
}
|
||||
return p.RawAppName
|
||||
}(),
|
||||
Title: p.Title,
|
||||
Reason: p.Reason,
|
||||
Message: p.Message,
|
||||
}
|
||||
if len(p.EntranceStatuses) > 0 {
|
||||
data.EntranceStatuses = p.EntranceStatuses
|
||||
}
|
||||
|
||||
if err := publish(subject, data); err != nil {
|
||||
klog.Errorf("async publish subject %s,data %v, failed %v", subject, data, err)
|
||||
} else {
|
||||
klog.Infof("publish event success data: %#v", data)
|
||||
}
|
||||
}
|
||||
|
||||
func PublishToNats(subject string, data interface{}) error {
|
||||
return publish(subject, data)
|
||||
}
|
||||
|
||||
func publish(subject string, data interface{}) error {
|
||||
natsHost := os.Getenv("NATS_HOST")
|
||||
natsPort := os.Getenv("NATS_PORT")
|
||||
|
||||
username := os.Getenv("NATS_USERNAME")
|
||||
password := os.Getenv("NATS_PASSWORD")
|
||||
|
||||
natsURL := fmt.Sprintf("nats://%s:%s", natsHost, natsPort)
|
||||
nc, err := nats.Connect(natsURL, nats.UserInfo(username, password))
|
||||
if err != nil {
|
||||
klog.Infof("connect error: err=%v", err)
|
||||
return err
|
||||
}
|
||||
defer nc.Drain()
|
||||
func publish(nc *nats.Conn, subject string, data interface{}) error {
|
||||
d, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
klog.Errorf("marshal failed: %v", err)
|
||||
return err
|
||||
}
|
||||
err = nc.Publish(subject, d)
|
||||
@@ -147,34 +64,38 @@ func publish(subject string, data interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func PublishMiddlewareEvent(owner, name, opType, opID, state, progress string, entranceStatuses []v1alpha1.EntranceStatus, rawAppName string) {
|
||||
subject := fmt.Sprintf("os.application.%s", owner)
|
||||
func NewNatsConn() (*nats.Conn, error) {
|
||||
natsHost := os.Getenv("NATS_HOST")
|
||||
natsPort := os.Getenv("NATS_PORT")
|
||||
username := os.Getenv("NATS_USERNAME")
|
||||
password := os.Getenv("NATS_PASSWORD")
|
||||
|
||||
now := time.Now()
|
||||
data := Event{
|
||||
EventID: fmt.Sprintf("%s-%s-%d", owner, name, now.UnixMilli()),
|
||||
CreateTime: now,
|
||||
Name: name,
|
||||
Type: "middleware",
|
||||
OpType: opType,
|
||||
OpID: opID,
|
||||
State: state,
|
||||
Progress: progress,
|
||||
User: owner,
|
||||
RawAppName: func() string {
|
||||
if rawAppName == "" {
|
||||
return name
|
||||
natsURL := fmt.Sprintf("nats://%s:%s", natsHost, natsPort)
|
||||
|
||||
opts := []nats.Option{
|
||||
nats.UserInfo(username, password),
|
||||
nats.MaxReconnects(-1),
|
||||
nats.ReconnectWait(2 * time.Second),
|
||||
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
|
||||
if err != nil {
|
||||
klog.Warningf("NATS disconnected: %v, will attempt to reconnect", err)
|
||||
} else {
|
||||
klog.Infof("NATS disconnected, will attempt to reconnect")
|
||||
}
|
||||
return rawAppName
|
||||
}(),
|
||||
}
|
||||
if len(entranceStatuses) > 0 {
|
||||
data.EntranceStatuses = entranceStatuses
|
||||
}),
|
||||
nats.ReconnectHandler(func(nc *nats.Conn) {
|
||||
klog.Infof("NATS reconnected to %s", nc.ConnectedUrl())
|
||||
}),
|
||||
nats.ClosedHandler(func(nc *nats.Conn) {
|
||||
klog.Errorf("NATS connection closed permanently: %v", nc.LastError())
|
||||
}),
|
||||
}
|
||||
|
||||
if err := publish(subject, data); err != nil {
|
||||
klog.Errorf("async publish subject %s,data %v, failed %v", subject, data, err)
|
||||
} else {
|
||||
klog.Infof("publish event success data: %#v", data)
|
||||
nc, err := nats.Connect(natsURL, opts...)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to connect to NATS: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
klog.Infof("connected to NATS at %s", natsURL)
|
||||
return nc, nil
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ spec:
|
||||
name: check-auth
|
||||
containers:
|
||||
- name: auth-front
|
||||
image: beclab/login:v1.6.30
|
||||
image: beclab/login:v1.6.38
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- containerPort: 80
|
||||
|
||||
@@ -210,7 +210,7 @@ spec:
|
||||
command:
|
||||
- /samba_share
|
||||
- name: files
|
||||
image: beclab/files-server:v0.2.144
|
||||
image: beclab/files-server:v0.2.145
|
||||
imagePullPolicy: IfNotPresent
|
||||
securityContext:
|
||||
allowPrivilegeEscalation: true
|
||||
|
||||
@@ -226,7 +226,7 @@ spec:
|
||||
spec:
|
||||
initContainers:
|
||||
- name: seahub-init
|
||||
image: beclab/seahub-init:v0.0.5
|
||||
image: beclab/seahub-init:v0.0.6
|
||||
env:
|
||||
- name: DB_HOST
|
||||
value: citus-headless.os-platform
|
||||
@@ -248,7 +248,7 @@ spec:
|
||||
|
||||
containers:
|
||||
- name: seafile-server
|
||||
image: beclab/pg_seafile_server:v0.0.17
|
||||
image: beclab/pg_seafile_server:v0.0.18
|
||||
imagePullPolicy: IfNotPresent
|
||||
ports:
|
||||
- containerPort: 8082
|
||||
|
||||
@@ -4,7 +4,7 @@ nameOverride: ""
|
||||
fullnameOverride: ""
|
||||
namespaceOverride: ""
|
||||
imagePullSecrets: []
|
||||
version: "v2.6.6"
|
||||
version: "v2.6.7"
|
||||
|
||||
# Nvidia GPU Parameters
|
||||
resourceName: "nvidia.com/gpu"
|
||||
|
||||
@@ -3,7 +3,7 @@ target: prebuilt
|
||||
output:
|
||||
containers:
|
||||
-
|
||||
name: beclab/hami:v2.6.6
|
||||
name: beclab/hami:v2.6.7
|
||||
-
|
||||
name: beclab/hami-webui-fe-oss:v1.0.8
|
||||
-
|
||||
|
||||
Reference in New Issue
Block a user