package command

import (
	"context"
	"encoding/json"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/olekukonko/tablewriter"
	"github.com/shamaton/msgpack/v2"
	"github.com/urfave/cli/v2"

	userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
	"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
	"github.com/owncloud/ocis/v2/services/storage-users/pkg/config"
	"github.com/owncloud/ocis/v2/services/storage-users/pkg/config/parser"
	"github.com/owncloud/ocis/v2/services/storage-users/pkg/event"
	"github.com/owncloud/ocis/v2/services/storage-users/pkg/revaconfig"
	"github.com/owncloud/reva/v2/pkg/events"
	"github.com/owncloud/reva/v2/pkg/storage"
	"github.com/owncloud/reva/v2/pkg/storage/fs/registry"
	"github.com/owncloud/reva/v2/pkg/storage/utils/decomposedfs/lookup"
	"github.com/owncloud/reva/v2/pkg/storage/utils/decomposedfs/node"
	"github.com/owncloud/reva/v2/pkg/utils"
)

const (
	// MSGPACK_KEY_USER_OCIS_NODESTATUS is the key of the node status attribute
	// inside a node's .mpk metadata file. extractProcessingID reads the
	// processing ID from the value stored under this key.
	MSGPACK_KEY_USER_OCIS_NODESTATUS = "user.ocis.nodestatus"

	// Log indentation levels
	LOG_INDENT_L1 = "  " // 2 spaces
	LOG_INDENT_L2 = LOG_INDENT_L1 + LOG_INDENT_L1
	LOG_INDENT_L3 = LOG_INDENT_L2 + LOG_INDENT_L1
)

// Session contains the information of an upload session
type Session struct {
	ID         string         `json:"id"`
	Space      string         `json:"space"`
	Filename   string         `json:"filename"`
	Offset     int64          `json:"offset"`
	Size       int64          `json:"size"`
	Executant  userpb.UserId  `json:"executant"`
	SpaceOwner *userpb.UserId `json:"spaceowner,omitempty"`
	Expires    time.Time      `json:"expires"`
	Processing bool           `json:"processing"`
	ScanDate   time.Time      `json:"virus_scan_date"`
	ScanResult string         `json:"virus_scan_result"`
}

// Uploads is the entry point for the uploads command. It bundles the
// "sessions" and "delete-stale-nodes" subcommands.
func Uploads(cfg *config.Config) *cli.Command {
	return &cli.Command{
		Name:  "uploads",
		Usage: "manage unfinished uploads",
		Subcommands: []*cli.Command{
			ListUploadSessions(cfg),
			DeleteStaleProcessingNodes(cfg),
		},
	}
}

// ListUploadSessions prints a list of upload sessions.
//
// The sessions are filtered by the id/processing/expired/has-virus flags and
// printed either as a table or, with --json, as a JSON array. Optionally a
// restart or resume event is published for every listed session, or the
// session is purged (--clean). If several of restart/resume/clean are set,
// only the first one in that order is applied.
func ListUploadSessions(cfg *config.Config) *cli.Command {
	return &cli.Command{
		Name:  "sessions",
		Usage: "Print a list of upload sessions",
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:        "id",
				DefaultText: "unset",
				Usage:       "filter sessions by upload session id",
			},
			&cli.BoolFlag{
				Name:        "processing",
				DefaultText: "unset",
				Usage:       "filter sessions by processing status",
			},
			&cli.BoolFlag{
				Name:        "expired",
				DefaultText: "unset",
				Usage:       "filter sessions by expired status",
			},
			&cli.BoolFlag{
				Name:        "has-virus",
				DefaultText: "unset",
				Usage:       "filter sessions by virus scan result",
			},
			&cli.BoolFlag{
				Name:  "json",
				Usage: "output as json",
			},
			&cli.BoolFlag{
				Name:  "restart",
				Usage: "send restart event for all listed sessions. Only one of resume/restart/clean can be set.",
			},
			&cli.BoolFlag{
				Name:  "resume",
				Usage: "send resume event for all listed sessions. Only one of resume/restart/clean can be set.",
			},
			&cli.BoolFlag{
				Name:  "clean",
				Usage: "remove uploads for all listed sessions. Only one of resume/restart/clean can be set.",
			},
		},
		Before: func(c *cli.Context) error {
			return configlog.ReturnFatal(parser.ParseConfig(cfg))
		},
		Action: func(c *cli.Context) error {
			f, ok := registry.NewFuncs[cfg.Driver]
			if !ok {
				fmt.Fprintf(os.Stderr, "Unknown filesystem driver '%s'\n", cfg.Driver)
				os.Exit(1)
			}
			drivers := revaconfig.StorageProviderDrivers(cfg)
			// Named storageFS (not fs) so the imported io/fs package is not shadowed.
			storageFS, err := f(drivers[cfg.Driver].(map[string]interface{}), nil, nil)
			if err != nil {
				fmt.Fprintf(os.Stderr, "Failed to initialize filesystem driver '%s'\n", cfg.Driver)
				return err
			}

			managingFS, ok := storageFS.(storage.UploadSessionLister)
			if !ok {
				fmt.Fprintf(os.Stderr, "'%s' storage does not support listing upload sessions\n", cfg.Driver)
				os.Exit(1)
			}

			// The event stream is only needed when events are going to be published.
			var stream events.Stream
			if c.Bool("restart") || c.Bool("resume") {
				stream, err = event.NewStream(cfg)
				if err != nil {
					fmt.Fprintf(os.Stderr, "Failed to create event stream: %v\n", err)
					os.Exit(1)
				}
			}

			filter := buildFilter(c)
			uploads, err := managingFS.ListUploadSessions(c.Context, filter)
			if err != nil {
				return err
			}

			asJSON := c.Bool("json")

			var (
				table *tablewriter.Table
				raw   []Session
			)

			if !asJSON {
				fmt.Println(buildInfo(filter))

				table = tablewriter.NewTable(os.Stdout)
				table.Header("Space", "Upload Id", "Name", "Offset", "Size", "Executant", "Owner", "Expires", "Processing", "Scan Date", "Scan Result")
			}

			for _, u := range uploads {
				ref := u.Reference()
				sr, sd := u.ScanData()

				session := Session{
					Space:      ref.GetResourceId().GetSpaceId(),
					ID:         u.ID(),
					Filename:   u.Filename(),
					Offset:     u.Offset(),
					Size:       u.Size(),
					Executant:  u.Executant(),
					SpaceOwner: u.SpaceOwner(),
					Expires:    u.Expires(),
					Processing: u.IsProcessing(),
					ScanDate:   sd,
					ScanResult: sr,
				}

				if asJSON {
					raw = append(raw, session)
				} else {
					table.Append([]string{
						session.Space,
						session.ID,
						session.Filename,
						strconv.FormatInt(session.Offset, 10),
						strconv.FormatInt(session.Size, 10),
						session.Executant.OpaqueId,
						session.SpaceOwner.GetOpaqueId(),
						session.Expires.Format(time.RFC3339),
						strconv.FormatBool(session.Processing),
						session.ScanDate.Format(time.RFC3339),
						session.ScanResult,
					})
				}

				switch {
				case c.Bool("restart"):
					if err := events.Publish(context.Background(), stream, events.RestartPostprocessing{
						UploadID:  u.ID(),
						Timestamp: utils.TSNow(),
					}); err != nil {
						fmt.Fprintf(os.Stderr, "Failed to send restart event for upload session '%s'\n", u.ID())
						// if publishing fails there is no need to try publishing other events - they will fail too.
						os.Exit(1)
					}

				case c.Bool("resume"):
					if err := events.Publish(context.Background(), stream, events.ResumePostprocessing{
						UploadID:  u.ID(),
						Timestamp: utils.TSNow(),
					}); err != nil {
						fmt.Fprintf(os.Stderr, "Failed to send resume event for upload session '%s'\n", u.ID())
						// if publishing fails there is no need to try publishing other events - they will fail too.
						os.Exit(1)
					}

				case c.Bool("clean"):
					// A failed purge is reported but does not stop the loop;
					// the remaining sessions are still processed.
					if err := u.Purge(c.Context); err != nil {
						fmt.Fprintf(os.Stderr, "Failed to clean upload session '%s'\n", u.ID())
					}
				}
			}

			if !asJSON {
				table.Render()
				return nil
			}

			j, err := json.Marshal(raw)
			if err != nil {
				fmt.Println(err)
				return err
			}
			fmt.Println(string(j))
			return nil
		},
	}
}

// buildFilter translates the filter flags of the sessions command into an
// upload session filter. A filter field is only populated when the matching
// flag was explicitly set, so unset flags do not restrict the result.
func buildFilter(c *cli.Context) storage.UploadSessionFilter {
	filter := storage.UploadSessionFilter{}
	if c.IsSet("processing") {
		processingValue := c.Bool("processing")
		filter.Processing = &processingValue
	}
	if c.IsSet("expired") {
		expiredValue := c.Bool("expired")
		filter.Expired = &expiredValue
	}
	if c.IsSet("has-virus") {
		infectedValue := c.Bool("has-virus")
		filter.HasVirus = &infectedValue
	}
	if c.IsSet("id") {
		idValue := c.String("id")
		filter.ID = &idValue
	}
	return filter
}

// buildInfo renders a human readable headline for the given filter, e.g.
// "Sessions:", "Not processing, expired sessions:" or
// "Session with id 'abc':". Only the first word of the headline is
// capitalized.
func buildInfo(filter storage.UploadSessionFilter) string {
	var b strings.Builder

	if filter.Processing != nil {
		if !*filter.Processing {
			b.WriteString("Not ")
		}
		if b.Len() == 0 {
			b.WriteString("Processing")
		} else {
			b.WriteString("processing")
		}
	}

	if filter.Expired != nil {
		if b.Len() != 0 {
			b.WriteString(", ")
		}
		if !*filter.Expired {
			if b.Len() == 0 {
				b.WriteString("Not ")
			} else {
				b.WriteString("not ")
			}
		}
		if b.Len() == 0 {
			b.WriteString("Expired")
		} else {
			b.WriteString("expired")
		}
	}

	if filter.HasVirus != nil {
		if b.Len() != 0 {
			b.WriteString(", ")
		}
		if !*filter.HasVirus {
			if b.Len() == 0 {
				b.WriteString("Not ")
			} else {
				b.WriteString("not ")
			}
		}
		if b.Len() == 0 {
			b.WriteString("Virusinfected")
		} else {
			b.WriteString("virusinfected")
		}
	}

	if b.Len() == 0 {
		b.WriteString("Session")
	} else {
		b.WriteString(" session")
	}

	if filter.ID != nil {
		b.WriteString(" with id '" + *filter.ID + "'")
	} else {
		// to make `session` plural
		b.WriteString("s")
	}

	b.WriteString(":")
	return b.String()
}

// DeleteStaleProcessingNodes is the entry point for the delete-stale-nodes command.
//
// It scans either the space given with --spaceid or all spaces found in the
// storage root and handles every node in processing state whose upload
// session info file is missing. The dry-run flag defaults to true, in which
// case stale nodes are only counted and reported, not deleted.
func DeleteStaleProcessingNodes(cfg *config.Config) *cli.Command {
	return &cli.Command{
		Name:  "delete-stale-nodes",
		Usage: "Delete all nodes in processing state that are not referenced by any upload session",
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:     "spaceid",
				Usage:    "Space ID to check for processing nodes (omit to check all spaces)",
				Required: false,
			},
			&cli.BoolFlag{
				Name:  "dry-run",
				Usage: "Only show what would be deleted without actually deleting",
				Value: true,
			},
			&cli.BoolFlag{
				Name:  "verbose",
				Usage: "Enable verbose logging",
				Value: false,
			},
		},
		Before: func(c *cli.Context) error {
			return configlog.ReturnFatal(parser.ParseConfig(cfg))
		},
		Action: func(ctx *cli.Context) error {
			spaceIDs := []string{}
			dryRun := ctx.Bool("dry-run")
			verbose := ctx.Bool("verbose")
			start := time.Now()

			// Check if specific space ID provided
			if ctx.IsSet("spaceid") {
				spaceIDs = append(spaceIDs, ctx.String("spaceid"))
			} else {
				fmt.Println("Scanning all spaces for stale processing nodes...")
				spaceIDs = globSpaceIDs(cfg)
			}

			if verbose {
				fmt.Printf("Spaces to cleanup: %d\n", len(spaceIDs))
				for _, spaceID := range spaceIDs {
					fmt.Printf(" - %s\n", spaceID)
				}
			}

			staleCount := 0
			for _, spaceID := range spaceIDs {
				staleCount += deleteStaleUploads(cfg, spaceID, dryRun, verbose)
			}

			if verbose {
				fmt.Printf("Took %ds\n", int(time.Since(start).Seconds()))
			}
			fmt.Printf("Total stale nodes: %d\n", staleCount)
			return nil
		},
	}
}

// globSpaceIDs returns a list of all space IDs in the storage root.
// On a glob error the error is printed to stderr and an empty list is
// returned.
func globSpaceIDs(cfg *config.Config) []string {
	fsys := os.DirFS(cfg.Drivers.OCIS.Root)
	dirs, err := fs.Glob(fsys, "spaces/*/*/nodes")
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error globbing spaces root directory %s: %v\n", cfg.Drivers.OCIS.Root, err)
		return []string{}
	}

	spaceIDs := []string{}
	for _, dir := range dirs {
		// For dir i.e. spaces/9d/408cec-8f0a-4d33-8715-89df1217a10c/nodes
		// spaceID is 9d408cec-8f0a-4d33-8715-89df1217a10c
		spaceIDs = append(spaceIDs, strings.ReplaceAll(strings.TrimSuffix(strings.TrimPrefix(dir, "spaces/"), "/nodes"), "/", ""))
	}
	return spaceIDs
}

// deleteStaleUploads handles the stale processing nodes of a given spaceID.
// It collects all .mpk files below the space directory, checks each of them
// with deleteStaleNode and returns the number of stale nodes that were found.
// If walking the space directory fails, 0 is returned.
func deleteStaleUploads(cfg *config.Config, spaceID string, dryRun bool, verbose bool) int {
	if verbose {
		fmt.Printf("\nDeleting stale processing nodes for space: %s\n", spaceID)
	}

	// Find .mpk files in space directory
	spaceRoot := filepath.Join(cfg.Drivers.OCIS.Root, "spaces", lookup.Pathify(spaceID, 1, 2))
	mpkFiles := []string{}
	err := filepath.Walk(spaceRoot, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// info may be nil here, so it must not be touched before returning.
			fmt.Fprintf(os.Stderr, "Error accessing path %s: %s\n", path, err)
			return filepath.SkipDir
		}
		if !info.IsDir() && strings.HasSuffix(path, ".mpk") {
			mpkFiles = append(mpkFiles, path)
		}
		return nil
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error walking space directory %s: %s\n", spaceRoot, err)
		return 0
	}

	if verbose {
		fmt.Printf("%sFound total %d .mpk files\n", LOG_INDENT_L1, len(mpkFiles))
	}

	staleCount := 0
	for _, path := range mpkFiles {
		staleCount += deleteStaleNode(cfg, path, dryRun, verbose)
	}

	if verbose {
		fmt.Printf("%sFound total %d stale nodes\n", LOG_INDENT_L1, staleCount)
	}
	return staleCount
}

// deleteStaleNode deletes a stale node: a node in processing state that is
// not referenced by any upload session.
//
// path is the .mpk metadata file of the node; on deletion the directory that
// contains it is removed. It returns 1 if a stale node was detected (and, when
// not in dry-run mode, successfully deleted) and 0 otherwise, for counting
// purposes.
func deleteStaleNode(cfg *config.Config, path string, dryRun bool, verbose bool) int {
	nodeDir := filepath.Dir(path)

	// Read .mpk file to get processing info
	b, err := os.ReadFile(path)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error reading file %s: %s\n", path, err)
		return 0
	}

	var mpkData map[string]interface{}
	if err := msgpack.Unmarshal(b, &mpkData); err != nil {
		fmt.Fprintf(os.Stderr, "Error unmarshaling file %s: %s\n", path, err)
		return 0
	}

	processingID := extractProcessingID(mpkData)
	if processingID == "" {
		return 0
	}

	// Construct path to upload info file:
	// i.e. ~/.ocis/storage/users/uploads/5329c14b-b786-4b27-8f7d-7429f03009d7.info
	// Only continue when the .info file does not exist (stat fails with ErrNotExist).
	pathUploadInfo := filepath.Join(cfg.Drivers.OCIS.Root, "uploads", processingID) + ".info"
	_, infoStatErr := os.Stat(pathUploadInfo)
	if infoStatErr == nil {
		// The upload session still exists, so the node is not stale.
		return 0
	}
	if !os.IsNotExist(infoStatErr) {
		// There was an error other than file not existing, log and return
		fmt.Fprintf(os.Stderr, "Error checking upload info %s: %s\n", pathUploadInfo, infoStatErr)
		return 0
	}

	if verbose {
		fmt.Printf("%sFound stale upload at %s (Processing ID: %s)\n", LOG_INDENT_L1, path, processingID)
		fmt.Printf("%sUpload info missing at: %s\n", LOG_INDENT_L2, pathUploadInfo)
	}

	if dryRun {
		return 1
	}

	if err := os.RemoveAll(nodeDir); err != nil {
		fmt.Fprintf(os.Stderr, "%sError deleting stale node %s: %v\n", LOG_INDENT_L2, nodeDir, err)
		return 0
	}

	if verbose {
		fmt.Printf("%sDeleted stale node: %s\n", LOG_INDENT_L2, nodeDir)
	}
	return 1
}

// extractProcessingID returns the processing ID stored in the node status
// attribute of the decoded .mpk data, or "" if there is none.
//
// The ID is the segment after the first ":" of the status value, and is only
// returned when the value contains node.ProcessingStatus. A missing key, a
// value that is neither []byte nor string, or a status without a ":" all
// yield "" instead of panicking.
func extractProcessingID(mpkData map[string]interface{}) string {
	raw, ok := mpkData[MSGPACK_KEY_USER_OCIS_NODESTATUS]
	if !ok {
		return ""
	}

	// NOTE(review): the attribute is expected to decode as []byte; string is
	// accepted as well in case the value was encoded as a msgpack str - confirm.
	var status string
	switch v := raw.(type) {
	case []byte:
		status = string(v)
	case string:
		status = v
	default:
		return ""
	}

	if !strings.Contains(status, node.ProcessingStatus) {
		return ""
	}

	parts := strings.Split(status, ":")
	if len(parts) < 2 {
		return ""
	}
	return parts[1]
}