Compare commits

...

2 Commits

Author SHA1 Message Date
eball
933a093ce4 fix: admin user not found 2025-06-30 14:50:26 +08:00
eball
188028c186 feat: refresh user expiring certs 2025-06-30 14:13:44 +08:00
7 changed files with 182 additions and 12 deletions

View File

@@ -14,6 +14,7 @@ import (
"github.com/beclab/Olares/daemon/internel/ble"
"github.com/beclab/Olares/daemon/internel/mdns"
"github.com/beclab/Olares/daemon/internel/watcher"
"github.com/beclab/Olares/daemon/internel/watcher/cert"
"github.com/beclab/Olares/daemon/internel/watcher/system"
"github.com/beclab/Olares/daemon/internel/watcher/upgrade"
"github.com/beclab/Olares/daemon/internel/watcher/usb"
@@ -96,6 +97,7 @@ func main() {
// usb.NewUsbWatcher(),
usb.NewUmountWatcher(),
upgrade.NewUpgradeWatcher(),
cert.NewCertWatcher(),
}, func() {
if s != nil {
if err := s.Restart(); err != nil {

View File

@@ -89,11 +89,11 @@ func (s *server) Restart() error {
}
if s.registeredIP != ip {
s.registeredIP = ip
if s.server != nil {
s.Close()
}
s.registeredIP = ip
instanceName := s.name
if instanceName == "" {
instanceName = hostname

View File

@@ -0,0 +1,141 @@
package cert
import (
"context"
"fmt"
"time"
"github.com/beclab/Olares/daemon/internel/watcher"
"github.com/beclab/Olares/daemon/pkg/cluster/state"
"github.com/beclab/Olares/daemon/pkg/utils"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
kubeErr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
var _ watcher.Watcher = &userCertWatcher{}
type userCertWatcher struct {
}
func NewCertWatcher() *userCertWatcher {
return &userCertWatcher{}
}
// Watch implements watcher.Watcher.
func (u *userCertWatcher) Watch(ctx context.Context) {
if state.CurrentState.TerminusState != state.TerminusRunning {
return
}
kubeClient, err := utils.GetKubeClient()
if err != nil {
klog.Error("failed to get kube client, ", err)
return
}
dynamicClient, err := utils.GetDynamicClient()
if err != nil {
klog.Error("failed to get dynamic client, ", err)
return
}
users, err := utils.ListUsers(ctx, dynamicClient)
if err != nil {
klog.Error("failed to list users, ", err)
return
}
for _, user := range users {
namespace := fmt.Sprintf("user-space-%s", user.GetName())
config, err := kubeClient.CoreV1().ConfigMaps(namespace).Get(ctx, "zone-ssl-config", metav1.GetOptions{})
if err != nil {
klog.Error("failed to get user config map, ", err, ", namespace: ", namespace)
continue
}
if expired, ok := config.Data["expired_at"]; ok {
expiredTime, err := time.Parse("2006-01-02T15:04:05Z", expired)
if err != nil {
klog.Error("failed to parse expired_at, ", err)
continue
}
// Check if the certificate will expire within 10 days
if expiredTime.Before(time.Now().Add(10 * 24 * time.Hour)) {
klog.Info("user cert expired, ", user.GetName())
err = createOrUpdateJob(ctx, kubeClient, namespace)
if err != nil {
klog.Error("failed to create or update job for user cert, ", err, ", namespace: ", namespace)
} else {
klog.Info("job created for user cert download, ", user.GetName(), ", namespace: ", namespace)
}
}
}
}
}
func createOrUpdateJob(ctx context.Context, kubeClient kubernetes.Interface, namespace string) error {
currentJob, err := kubeClient.BatchV1().Jobs(namespace).Get(ctx, jobDownloadUserCert.Name, metav1.GetOptions{})
if err != nil {
if kubeErr.IsNotFound(err) {
// Create the job if it does not exist
} else {
return fmt.Errorf("failed to get job: %w", err)
}
} else {
// check the existing job
if currentJob.Status.Succeeded == 0 || currentJob.Status.Failed > 0 {
klog.Info("job is still running, skip creating a new one")
return nil
}
// If the job exists and has completed, delete it before creating a new one
klog.Info("delete existing job: ", currentJob.Name)
err = kubeClient.BatchV1().Jobs(namespace).Delete(ctx, currentJob.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete job: %w", err)
}
}
job := jobDownloadUserCert.DeepCopy()
job.Namespace = namespace
_, err = kubeClient.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create job: %w", err)
}
klog.Info("Job created: ", job.Name)
return nil
}
var jobDownloadUserCert = batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "download-user-cert",
},
Spec: batchv1.JobSpec{
BackoffLimit: ptr.To[int32](5),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyOnFailure,
Containers: []corev1.Container{
{
Name: "download-user-cert",
Image: "busybox:1.28",
Command: []string{"wget",
"--header",
"X-FROM-CRONJOB: true",
"-qSO -",
"http://bfl.user-space-pengpeng9/bfl/backend/v1/re-download-cert",
},
},
},
},
},
},
}

View File

@@ -63,6 +63,11 @@ func (i *collectLogs) Execute(ctx context.Context, p any) (res any, err error) {
return
}
if adminUser == nil {
errStr = "admin user not found"
return
}
hostPath, err := utils.GetUserspacePvcHostPath(ctx, adminUser.GetName(), kubeClient)
if err != nil {
errStr = fmt.Sprintf("get admin user host path error, %v", err)

View File

@@ -89,7 +89,7 @@ func detectdStorageDevices(ctx context.Context, bus string) (usbDevs []storageDe
token := strings.Split(syspath, "/")
devPath := filepath.Join("/dev", token[len(token)-1])
klog.Info("device path:", device.Properties())
klog.V(8).Info("device path:", device.Properties())
vender := device.Properties()["ID_VENDOR"]
if vender == "" {
vender = device.Properties()["ID_USB_VENDOR"]
@@ -162,7 +162,7 @@ func getMountedPath(devs []storageDevice) ([]string, error) {
var paths []string
for _, m := range list {
if slices.ContainsFunc(devs, func(u storageDevice) bool { return u.DevPath == m.Device }) {
klog.Infof("mount: %v, %v, %v", m.Path, m.Device, devs)
klog.V(8).Infof("mount: %v, %v, %v", m.Path, m.Device, devs)
paths = append(paths, m.Path)
}
}

View File

@@ -278,25 +278,48 @@ func GetAdminUserTerminusName(ctx context.Context, client dynamic.Interface) (st
}
type Filter func(u *unstructured.Unstructured) bool
func GetAdminUser(ctx context.Context, client dynamic.Interface) (*unstructured.Unstructured, error) {
u, err := ListUsers(ctx, client, func(u *unstructured.Unstructured) bool {
role, ok := u.GetAnnotations()[bflconst.UserAnnotationOwnerRole]
if !ok {
return false
}
return role == bflconst.RolePlatformAdmin
})
if err != nil {
klog.Error("list user error, ", err)
return nil, err
}
if len(u) == 0 {
klog.Info("admin user not found")
return nil, nil
}
return u[0], nil
}
func ListUsers(ctx context.Context, client dynamic.Interface, filters ...Filter) ([]*unstructured.Unstructured, error) {
users, err := client.Resource(UserGVR).List(ctx, metav1.ListOptions{})
if err != nil {
klog.Error("list user error, ", err)
return nil, err
}
var userList []*unstructured.Unstructured
for _, u := range users.Items {
role, ok := u.GetAnnotations()[bflconst.UserAnnotationOwnerRole]
if !ok {
continue
for _, filter := range filters {
if !filter(&u) {
continue
}
}
if role == bflconst.RolePlatformAdmin {
return &u, nil
}
userList = append(userList, &u)
}
return nil, nil
return userList, nil
}
func isKeyPod(pod *corev1.Pod) bool {

View File

@@ -111,7 +111,6 @@ func ManagedAllDevices(ctx context.Context) (map[string]Device, error) {
cmd := exec.CommandContext(ctx, nmcli, "device", "set", d.Name, "managed", "yes")
cmd.Env = os.Environ()
output, err := cmd.CombinedOutput()
klog.Info(string(output))
if err != nil {
klog.Error("exec cmd error, ", err, ", nmcli device set ", d.Name, " managed yes")
return false
@@ -288,7 +287,7 @@ func showDeviceByNM(ctx context.Context, deviceName string, device *Device) erro
case "GENERAL.CONNECTION":
err := showConnectionByNM(ctx, value, device)
if err != nil {
klog.Error("get connection method error, ", err, ", connection name: ", value)
klog.V(8).Info("get connection method error, ", err, ", connection name: ", value)
}
default:
continue