mirror of
https://github.com/owncloud/ocis
synced 2026-04-25 17:25:21 +02:00
graph concurrent share listing
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
@@ -17,6 +17,7 @@ import (
|
||||
storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
|
||||
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
|
||||
libregraph "github.com/owncloud/libre-graph-api-go"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/protobuf/types/known/fieldmaskpb"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
@@ -350,77 +351,204 @@ func (g BaseGraphService) listPublicShares(ctx context.Context, filters []*link.
|
||||
}
|
||||
|
||||
func (g BaseGraphService) cs3UserSharesToDriveItems(ctx context.Context, shares []*collaboration.Share, driveItems driveItemsByResourceID) (driveItemsByResourceID, error) {
|
||||
for _, s := range shares {
|
||||
g.logger.Debug().Interface("CS3 UserShare", s).Msg("Got Share")
|
||||
resIDStr := storagespace.FormatResourceID(s.ResourceId)
|
||||
item, ok := driveItems[resIDStr]
|
||||
if !ok {
|
||||
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: s.ResourceId})
|
||||
if err != nil {
|
||||
g.logger.Debug().Err(err).Interface("Share", s.ResourceId).Msg("could not stat share, skipping")
|
||||
continue
|
||||
}
|
||||
item = *itemptr
|
||||
}
|
||||
errg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
var condition string
|
||||
switch {
|
||||
case item.Root != nil:
|
||||
condition = unifiedrole.UnifiedRoleConditionDrive
|
||||
case item.Folder != nil:
|
||||
condition = unifiedrole.UnifiedRoleConditionFolder
|
||||
case item.File != nil:
|
||||
condition = unifiedrole.UnifiedRoleConditionFile
|
||||
}
|
||||
perm, err := g.cs3UserShareToPermission(ctx, s, condition)
|
||||
|
||||
var errcode errorcode.Error
|
||||
switch {
|
||||
case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound:
|
||||
// The Grantee couldn't be found (user/group does not exist anymore)
|
||||
continue
|
||||
case err != nil:
|
||||
return driveItems, err
|
||||
}
|
||||
item.Permissions = append(item.Permissions, *perm)
|
||||
driveItems[resIDStr] = item
|
||||
// group shares by resource id
|
||||
sharesByResource := make(map[string][]*collaboration.Share)
|
||||
for _, share := range shares {
|
||||
sharesByResource[share.GetResourceId().String()] = append(sharesByResource[share.GetResourceId().String()], share)
|
||||
}
|
||||
|
||||
type resourceShares struct {
|
||||
ResourceID *storageprovider.ResourceId
|
||||
Shares []*collaboration.Share
|
||||
}
|
||||
|
||||
work := make(chan resourceShares, len(shares))
|
||||
results := make(chan *libregraph.DriveItem, len(shares))
|
||||
|
||||
// Distribute work
|
||||
errg.Go(func() error {
|
||||
defer close(work)
|
||||
|
||||
for _, shares := range sharesByResource {
|
||||
select {
|
||||
case work <- resourceShares{ResourceID: shares[0].GetResourceId(), Shares: shares}:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Spawn workers that'll concurrently work the queue
|
||||
numWorkers := g.config.MaxConcurrency
|
||||
if len(sharesByResource) < numWorkers {
|
||||
numWorkers = len(sharesByResource)
|
||||
}
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
errg.Go(func() error {
|
||||
for sharesByResource := range work {
|
||||
resIDStr := storagespace.FormatResourceID(sharesByResource.ResourceID)
|
||||
// check if we already have the drive item in the map
|
||||
item, ok := driveItems[resIDStr]
|
||||
if !ok {
|
||||
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: sharesByResource.ResourceID})
|
||||
if err != nil {
|
||||
g.logger.Debug().Err(err).Str("storage", sharesByResource.ResourceID.StorageId).Str("space", sharesByResource.ResourceID.SpaceId).Str("node", sharesByResource.ResourceID.OpaqueId).Msg("could not stat resource, skipping")
|
||||
continue
|
||||
}
|
||||
item = *itemptr
|
||||
}
|
||||
|
||||
var condition string
|
||||
switch {
|
||||
case item.Root != nil:
|
||||
condition = unifiedrole.UnifiedRoleConditionDrive
|
||||
case item.Folder != nil:
|
||||
condition = unifiedrole.UnifiedRoleConditionFolder
|
||||
case item.File != nil:
|
||||
condition = unifiedrole.UnifiedRoleConditionFile
|
||||
}
|
||||
for _, share := range sharesByResource.Shares {
|
||||
perm, err := g.cs3UserShareToPermission(ctx, share, condition)
|
||||
var errcode errorcode.Error
|
||||
switch {
|
||||
case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound:
|
||||
// The Grantee couldn't be found (user/group does not exist anymore)
|
||||
continue
|
||||
case err != nil:
|
||||
return err
|
||||
}
|
||||
item.Permissions = append(item.Permissions, *perm)
|
||||
}
|
||||
if len(item.Permissions) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case results <- &item:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
// Wait for things to settle down, then close results chan
|
||||
go func() {
|
||||
_ = errg.Wait() // error is checked later
|
||||
close(results)
|
||||
}()
|
||||
|
||||
for item := range results {
|
||||
driveItems[item.GetId()] = *item
|
||||
}
|
||||
|
||||
if err := errg.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return driveItems, nil
|
||||
}
|
||||
|
||||
func (g BaseGraphService) cs3OCMSharesToDriveItems(ctx context.Context, shares []*ocm.Share, driveItems driveItemsByResourceID) (driveItemsByResourceID, error) {
|
||||
for _, s := range shares {
|
||||
g.logger.Debug().Interface("CS3 OCMShare", s).Msg("Got Share")
|
||||
resIDStr := storagespace.FormatResourceID(s.ResourceId)
|
||||
item, ok := driveItems[resIDStr]
|
||||
if !ok {
|
||||
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: s.ResourceId})
|
||||
if err != nil {
|
||||
g.logger.Debug().Err(err).Interface("Share", s.ResourceId).Msg("could not stat ocm share, skipping")
|
||||
continue
|
||||
}
|
||||
item = *itemptr
|
||||
}
|
||||
errg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
var condition string
|
||||
switch {
|
||||
case item.Folder != nil:
|
||||
condition = unifiedrole.UnifiedRoleConditionFolderFederatedUser
|
||||
case item.File != nil:
|
||||
condition = unifiedrole.UnifiedRoleConditionFileFederatedUser
|
||||
}
|
||||
perm, err := g.cs3OCMShareToPermission(ctx, s, condition)
|
||||
|
||||
var errcode errorcode.Error
|
||||
switch {
|
||||
case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound:
|
||||
// The Grantee couldn't be found (user/group does not exist anymore)
|
||||
continue
|
||||
case err != nil:
|
||||
return driveItems, err
|
||||
}
|
||||
item.Permissions = append(item.Permissions, *perm)
|
||||
driveItems[resIDStr] = item
|
||||
// group shares by resource id
|
||||
sharesByResource := make(map[string][]*ocm.Share)
|
||||
for _, share := range shares {
|
||||
sharesByResource[share.GetResourceId().String()] = append(sharesByResource[share.GetResourceId().String()], share)
|
||||
}
|
||||
|
||||
type resourceShares struct {
|
||||
ResourceID *storageprovider.ResourceId
|
||||
Shares []*ocm.Share
|
||||
}
|
||||
|
||||
work := make(chan resourceShares, len(shares))
|
||||
results := make(chan *libregraph.DriveItem, len(shares))
|
||||
|
||||
// Distribute work
|
||||
errg.Go(func() error {
|
||||
defer close(work)
|
||||
|
||||
for _, shares := range sharesByResource {
|
||||
select {
|
||||
case work <- resourceShares{ResourceID: shares[0].GetResourceId(), Shares: shares}:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Spawn workers that'll concurrently work the queue
|
||||
numWorkers := g.config.MaxConcurrency
|
||||
if len(sharesByResource) < numWorkers {
|
||||
numWorkers = len(sharesByResource)
|
||||
}
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
errg.Go(func() error {
|
||||
for sharesByResource := range work {
|
||||
resIDStr := storagespace.FormatResourceID(sharesByResource.ResourceID)
|
||||
item, ok := driveItems[resIDStr]
|
||||
if !ok {
|
||||
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: sharesByResource.ResourceID})
|
||||
if err != nil {
|
||||
g.logger.Debug().Err(err).Interface("Share", sharesByResource.ResourceID).Msg("could not stat ocm share, skipping")
|
||||
continue
|
||||
}
|
||||
item = *itemptr
|
||||
}
|
||||
|
||||
var condition string
|
||||
switch {
|
||||
case item.Folder != nil:
|
||||
condition = unifiedrole.UnifiedRoleConditionFolderFederatedUser
|
||||
case item.File != nil:
|
||||
condition = unifiedrole.UnifiedRoleConditionFileFederatedUser
|
||||
}
|
||||
for _, share := range sharesByResource.Shares {
|
||||
perm, err := g.cs3OCMShareToPermission(ctx, share, condition)
|
||||
|
||||
var errcode errorcode.Error
|
||||
switch {
|
||||
case errors.As(err, &errcode) && errcode.GetCode() == errorcode.ItemNotFound:
|
||||
// The Grantee couldn't be found (user/group does not exist anymore)
|
||||
continue
|
||||
case err != nil:
|
||||
return err
|
||||
}
|
||||
item.Permissions = append(item.Permissions, *perm)
|
||||
}
|
||||
if len(item.Permissions) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case results <- &item:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
// Wait for things to settle down, then close results chan
|
||||
go func() {
|
||||
_ = errg.Wait() // error is checked later
|
||||
close(results)
|
||||
}()
|
||||
|
||||
for item := range results {
|
||||
driveItems[item.GetId()] = *item
|
||||
}
|
||||
|
||||
if err := errg.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return driveItems, nil
|
||||
}
|
||||
|
||||
@@ -593,26 +721,89 @@ func (g BaseGraphService) cs3OCMShareToPermission(ctx context.Context, share *oc
|
||||
}
|
||||
|
||||
func (g BaseGraphService) cs3PublicSharesToDriveItems(ctx context.Context, shares []*link.PublicShare, driveItems driveItemsByResourceID) (driveItemsByResourceID, error) {
|
||||
for _, s := range shares {
|
||||
g.logger.Debug().Interface("CS3 PublicShare", s).Msg("Got Share")
|
||||
resIDStr := storagespace.FormatResourceID(s.ResourceId)
|
||||
item, ok := driveItems[resIDStr]
|
||||
if !ok {
|
||||
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: s.ResourceId})
|
||||
if err != nil {
|
||||
g.logger.Debug().Err(err).Interface("Share", s.ResourceId).Msg("could not stat share, skipping")
|
||||
continue
|
||||
}
|
||||
item = *itemptr
|
||||
}
|
||||
perm, err := g.libreGraphPermissionFromCS3PublicShare(s)
|
||||
if err != nil {
|
||||
g.logger.Error().Err(err).Interface("Link", s.ResourceId).Msg("could not convert link to libregraph")
|
||||
return driveItems, err
|
||||
}
|
||||
errg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
item.Permissions = append(item.Permissions, *perm)
|
||||
driveItems[resIDStr] = item
|
||||
// group shares by resource id
|
||||
sharesByResource := make(map[string][]*link.PublicShare)
|
||||
for _, share := range shares {
|
||||
sharesByResource[share.GetResourceId().String()] = append(sharesByResource[share.GetResourceId().String()], share)
|
||||
}
|
||||
|
||||
type resourceShares struct {
|
||||
ResourceID *storageprovider.ResourceId
|
||||
Shares []*link.PublicShare
|
||||
}
|
||||
|
||||
work := make(chan resourceShares, len(shares))
|
||||
results := make(chan *libregraph.DriveItem, len(shares))
|
||||
|
||||
// Distribute work
|
||||
errg.Go(func() error {
|
||||
defer close(work)
|
||||
|
||||
for _, shares := range sharesByResource {
|
||||
select {
|
||||
case work <- resourceShares{ResourceID: shares[0].GetResourceId(), Shares: shares}:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Spawn workers that'll concurrently work the queue
|
||||
numWorkers := g.config.MaxConcurrency
|
||||
if len(sharesByResource) < numWorkers {
|
||||
numWorkers = len(sharesByResource)
|
||||
}
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
errg.Go(func() error {
|
||||
for sharesByResource := range work {
|
||||
resIDStr := storagespace.FormatResourceID(sharesByResource.ResourceID)
|
||||
item, ok := driveItems[resIDStr]
|
||||
if !ok {
|
||||
itemptr, err := g.getDriveItem(ctx, &storageprovider.Reference{ResourceId: sharesByResource.ResourceID})
|
||||
if err != nil {
|
||||
g.logger.Debug().Err(err).Interface("Share", sharesByResource.ResourceID).Msg("could not stat share, skipping")
|
||||
continue
|
||||
}
|
||||
item = *itemptr
|
||||
}
|
||||
for _, share := range sharesByResource.Shares {
|
||||
|
||||
perm, err := g.libreGraphPermissionFromCS3PublicShare(share)
|
||||
if err != nil {
|
||||
g.logger.Error().Err(err).Interface("Link", sharesByResource.ResourceID).Msg("could not convert link to libregraph")
|
||||
return err
|
||||
}
|
||||
|
||||
item.Permissions = append(item.Permissions, *perm)
|
||||
}
|
||||
if len(item.Permissions) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case results <- &item:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
// Wait for things to settle down, then close results chan
|
||||
go func() {
|
||||
_ = errg.Wait() // error is checked later
|
||||
close(results)
|
||||
}()
|
||||
|
||||
for item := range results {
|
||||
driveItems[item.GetId()] = *item
|
||||
}
|
||||
|
||||
if err := errg.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return driveItems, nil
|
||||
|
||||
Reference in New Issue
Block a user