@@ -7,9 +7,11 @@
# option. This file may not be copied, modified, or distributed
# except according to those terms.
from __future__ import annotations
from concurrent . futures import Future
from dataclasses import dataclass
import logging
import socket
from geckordp . actors . root import RootActor
from geckordp . actors . descriptors . tab import TabActor
from geckordp . actors . watcher import WatcherActor
@@ -23,7 +25,7 @@ import socketserver
import subprocess
import time
from threading import Thread
from typing import Optional
from typing import Any , Optional
import unittest
from servo . command_base import BuildType
@@ -43,28 +45,96 @@ class Devtools:
client : RDPClient
watcher : WatcherActor
targets : list
exited : bool = False
def connect ( * , expected_targets : int = 1 ) - > Devtools :
"""
Connect to the Servo devtools server.
You should use a `with` statement to ensure we disconnect unconditionally.
"""
client = RDPClient ( )
client . connect ( " 127.0.0.1 " , 6080 )
root = RootActor ( client )
tabs = root . list_tabs ( )
tab_dict = tabs [ 0 ]
tab = TabActor ( client , tab_dict [ " actor " ] )
watcher = tab . get_watcher ( )
watcher = WatcherActor ( client , watcher [ " actor " ] )
done = Future ( )
targets = [ ]
def on_target ( data ) :
try :
targets . append ( data [ " target " ] )
if len ( targets ) == expected_targets :
done . set_result ( None )
except Exception as e :
# Raising here does nothing, for some reason.
# Send the exception back so it can be raised.
done . set_result ( e )
client . add_event_listener (
watcher . actor_id ,
Events . Watcher . TARGET_AVAILABLE_FORM ,
on_target ,
)
watcher . watch_targets ( WatcherActor . Targets . FRAME )
watcher . watch_targets ( WatcherActor . Targets . WORKER )
result : Optional [ Exception ] = done . result ( 1 )
if result :
raise result
return Devtools ( client , watcher , targets )
def __getattribute__ ( self , name : str ) - > Any :
"""
Access a property, raising a ValueError if the instance was previously marked as exited.
"""
if name != " exited " and object . __getattribute__ ( self , " exited " ) :
raise ValueError ( " Devtools instance must not be used after __exit__() " )
return object . __getattribute__ ( self , name )
def __enter__ ( self ) - > Devtools :
"""
Enter the `with` context for this instance, raising a ValueError if it was previously marked as exited.
"""
if self . exited :
raise ValueError ( " Devtools instance must not be used after __exit__() " )
return self
def __exit__ ( self , exc_type , exc_value , traceback ) - > None :
"""
Exit the `with` context for this instance, disconnecting the client and marking it as exited.
Does not raise a ValueError if it was previously marked as exited, so you can nest `with` statements.
"""
if not self . exited :
# Ignore any return value; we never want to return True to suppress exceptions
self . client . __exit__ ( exc_type , exc_value , traceback )
self . exited = True
class DevtoolsTests ( unittest . IsolatedAsyncioTestCase ) :
# /path/to/servo/python/servo
script_path = None
build_type : Optional [ BuildType ] = None
base_urls = None
web_servers = None
web_server_threads = None
def __init__ ( self , methodName = " runTest " ) :
super ( ) . __init__ ( methodName )
self . servoshell = None
self . base_urls = None
self . web_servers = None
self . web_server_threads = None
# Watcher tests
def test_watcher_returns_same_breakpoint_list_actor_every_time ( self ) :
self . run_servoshell ( url = " data:text/html, " )
devtools = self . _setup_devtools_client ( )
response1 = devtools . watcher . get_breakpoint_list_actor ( )
response2 = devtools . watcher . get_breakpoint_list_actor ( )
self . assertEqual ( response1 [ " breakpointList " ] [ " actor " ] , response2 [ " breakpointList " ] [ " actor " ] )
with Devtools . connect ( ) as devtools :
response1 = devtools . watcher . get_breakpoint_list_actor ( )
response2 = devtools . watcher . get_breakpoint_list_actor ( )
self . assertEqual ( response1 [ " breakpointList " ] [ " actor " ] , response2 [ " breakpointList " ] [ " actor " ] )
# Sources list
# Classic script vs module script:
@@ -77,22 +147,20 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
# Worker script sources can be external or blob.
def test_sources_list ( self ) :
self . start_web_server ( test_dir = os . path . join ( DevtoolsTests . script_path , " devtools_tests/sources " ) )
self . run_servoshell ( )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /sources/test.html " )
self . assert_sources_list (
set (
[
# TODO: update expectations when we fix ES modules
tuple (
[
Source ( " srcScript " , f " { self . base_urls [ 0 ] } /classic.js " ) ,
Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /test.html " ) ,
Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /test.html " ) ,
Source ( " srcScript " , f " { self . base_urls [ 1 ] } /classic.js " ) ,
Source ( " importedModule " , f " { self . base_urls [ 0 ] } /module.js " ) ,
Source ( " srcScript " , f " { self . base_urls [ 0 ] } /sources/ classic.js " ) ,
Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /sources/ test.html " ) ,
Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /sources/ test.html " ) ,
Source ( " srcScript " , f " { self . base_urls [ 1 ] } /sources/ classic.js " ) ,
Source ( " importedModule " , f " { self . base_urls [ 0 ] } /sources/ module.js " ) ,
]
) ,
tuple ( [ Source ( " Worker " , f " { self . base_urls [ 0 ] } /classic_worker.js " ) ] ) ,
tuple ( [ Source ( " Worker " , f " { self . base_urls [ 0 ] } /sources/ classic_worker.js " ) ] ) ,
]
) ,
)
@@ -112,9 +180,8 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
self . assert_sources_list ( set ( [ tuple ( [ Source ( " inlineScript " , " data:text/html,<script>;</script> " ) ] ) ] ) )
def test_sources_list_with_data_external_classic_script ( self ) :
self . start_web_server ( test_dir = os . path . join ( DevtoolsTests . script_path , " devtools_tests/sources " ) )
self . run_servoshell ( url = f ' data:text/html,<script src= "{ self . base_urls [ 0 ] } /classic.js "></script> ' )
self . assert_sources_list ( set ( [ tuple ( [ Source ( " srcScript " , f " { self . base_urls [ 0 ] } /classic.js " ) ] ) ] ) )
self . run_servoshell ( url = f ' data:text/html,<script src= " { self . base_urls [ 0 ] } /sources/classic.js " ></script> ' )
self . assert_sources_list ( set ( [ tuple ( [ Source ( " srcScript " , f "{ self . base_urls [ 0 ] } /sources /classic.js ") ] ) ] ) )
def test_sources_list_with_data_empty_inline_module_script ( self ) :
self . run_servoshell ( url = " data:text/html,<script type=module></script> " )
@@ -127,24 +194,23 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
)
def test_sources_list_with_data_external_module_script ( self ) :
self . start_web_server ( test_dir = os . path . join ( DevtoolsTests . script_path , " devtools_tests/sources " ) )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /test_ sources_list_with_data_external_module_script.html " )
self . assert_sources_list ( set ( [ tuple ( [ Source ( " srcScript " , f " { self . base_urls [ 0 ] } /module.js " ) ] ) ] ) )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /sources/test_sources_list_with_data_external_module_script.html " )
self . assert_sources_list ( set ( [ tuple ( [ Source ( " srcScript " , f " { self . base_urls [ 0 ] } /sources/module.js " ) ] ) ] ) )
# Sources list for `introductionType` = `importedModule`
def test_sources_list_with_static_import_module ( self ) :
self . start_web_server ( test_dir = os . path . join ( DevtoolsTests . script_path , " devtools_ tests/ sources") )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /test_sources_list_with_static_import_module.html " )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /sources/ test_ sources_list_with_static_import_module.html " )
self . assert_sources_list (
set (
[
tuple (
[
Source (
" inlineScript " , f " { self . base_urls [ 0 ] } /test_sources_list_with_static_import_module.html "
" inlineScript " ,
f " { self . base_urls [ 0 ] } /sources/test_sources_list_with_static_import_module.html " ,
) ,
Source ( " importedModule " , f " { self . base_urls [ 0 ] } /module.js " ) ,
Source ( " importedModule " , f " { self . base_urls [ 0 ] } /sources/ module.js " ) ,
]
)
]
@@ -152,17 +218,17 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
)
def test_sources_list_with_dynamic_import_module ( self ) :
self . start_web_server ( test_dir = os . path . join ( DevtoolsTests . script_path , " devtools_ tests/ sources") )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /test_sources_list_with_dynamic_import_module.html " )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /sources/ test_ sources_list_with_dynamic_import_module.html " )
self . assert_sources_list (
set (
[
tuple (
[
Source (
" inlineScript " , f " { self . base_urls [ 0 ] } /test_sources_list_with_dynamic_import_module.html "
" inlineScript " ,
f " { self . base_urls [ 0 ] } /sources/test_sources_list_with_dynamic_import_module.html " ,
) ,
Source ( " importedModule " , f " { self . base_urls [ 0 ] } /module.js " ) ,
Source ( " importedModule " , f " { self . base_urls [ 0 ] } /sources/ module.js " ) ,
]
)
]
@@ -172,19 +238,21 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
# Sources list for `introductionType` = `Worker`
def test_sources_list_with_classic_worker ( self ) :
self . start_web_server ( test_dir = os . path . join ( DevtoolsTests . script_path , " devtools_ tests/ sources") )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /test_sources_list_with_classic_worker.html " )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /sources/ test_ sources_list_with_classic_worker.html " )
self . assert_sources_list (
set (
[
tuple (
[
Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /test_sources_list_with_classic_worker.html " ) ,
Source (
" inlineScript " ,
f " { self . base_urls [ 0 ] } /sources/test_sources_list_with_classic_worker.html " ,
) ,
]
) ,
tuple (
[
Source ( " Worker " , f " { self . base_urls [ 0 ] } /classic_worker.js " ) ,
Source ( " Worker " , f " { self . base_urls [ 0 ] } /sources/ classic_worker.js " ) ,
]
) ,
]
@@ -192,19 +260,20 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
)
def test_sources_list_with_module_worker ( self ) :
self . start_web_server ( test_dir = os . path . join ( DevtoolsTests . script_path , " devtools_ tests/ sources") )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /test_sources_list_with_module_worker.html " )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } /sources/ test_ sources_list_with_module_worker.html " )
self . assert_sources_list (
set (
[
tuple (
[
Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /test_sources_list_with_module_worker.html " ) ,
Source (
" inlineScript " , f " { self . base_urls [ 0 ] } /sources/test_sources_list_with_module_worker.html "
) ,
]
) ,
tuple (
[
Source ( " Worker " , f " { self . base_urls [ 0 ] } /module_worker.js " ) ,
Source ( " Worker " , f " { self . base_urls [ 0 ] } /sources/ module_worker.js " ) ,
]
) ,
]
@@ -310,31 +379,35 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
def test_sources_list_with_debugger_eval_and_display_url ( self ) :
self . run_servoshell ( url = " data:text/html, " )
devtools = self . _setup_devtools_client ( )
console = WebConsoleActor ( devtools . client , devtools . targets [ 0 ] [ " consoleActor " ] )
evaluation_result = Future ( )
with Devtools . connect ( ) as devtools :
console = WebConsoleActor ( devtools . client , devtools . targets [ 0 ] [ " consoleActor " ] )
evaluation_result = Future ( )
async def on_evaluation_result ( data : dict ) :
evaluation_result . set_result ( data )
async def on_evaluation_result ( data : dict ) :
evaluation_result . set_result ( data )
devtools . client . add_event_listener ( console . actor_id , Events . WebConsole . EVALUATION_RESULT , on_evaluation_result )
console . evaluate_js_async ( " //# sourceURL=http://test " )
evaluation_result . result ( 1 )
self . assert_sources_list ( set ( [ tuple ( [ Source ( " debugger eval " , " http://test/ " ) ] ) ] ) )
devtools . client . add_event_listener (
console . actor_id , Events . WebConsole . EVALUATION_RESULT , on_evaluation_result
)
console . evaluate_js_async ( " //# sourceURL= http://test " )
evaluation_result . result ( 1 )
self . assert_sources_list ( set ( [ tuple ( [ Source ( " debugger eval " , " http://test/ " ) ] ) ] ) , devtools = devtools )
def test_sources_list_with_debugger_eval_but_no_display_url ( self ) :
self . run_servoshell ( url = " data:text/html, " )
devtools = self . _setup_devtools_client ( )
console = WebConsoleActor ( devtools . client , devtools . targets [ 0 ] [ " consoleActor " ] )
evaluation_result = Future ( )
with Devtools . connect ( ) as devtools :
console = WebConsoleActor ( devtools . client , devtools . targets [ 0 ] [ " consoleActor " ] )
evaluation_result = Future ( )
async def on_evaluation_result ( data : dict ) :
evaluation_result . set_result ( data )
async def on_evaluation_result ( data : dict ) :
evaluation_result . set_result ( data )
devtools . client . add_event_listener ( console . actor_id , Events . WebConsole . EVALUATION_RESULT , on_evaluation_result )
console . evaluate_js_async ( " 1 " )
evaluation_result . result ( 1 )
self . assert_sources_list ( set ( [ tuple ( [ ] ) ] ) )
devtools . client . add_event_listener (
console . actor_id , Events . WebConsole . EVALUATION_RESULT , on_evaluation_result
)
console . evaluate_js_async ( " 1 " )
evaluation_result . result ( 1 )
self . assert_sources_list ( set ( [ tuple ( [ ] ) ] ) , devtools = devtools )
def test_sources_list_with_function_and_display_url ( self ) :
self . run_servoshell ( url = ' data:text/html,<script>new Function( " // % 23 sourceURL=http://test " )</script> ' )
@@ -537,24 +610,20 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
self . assert_source_content ( Source ( " inlineScript " , f " data:text/html, { script_tag } " ) , script_tag )
def test_source_content_external_script ( self ) :
self . start_web_server ( test_dir = os . path . join ( DevtoolsTests . script_path , " devtools_tests/sources " ) )
self . run_servoshell ( url = f ' data:text/html,<script src= " { self . base_urls [ 0 ] } /classic.js " ></script> ' )
self . run_servoshell ( url = f ' data:text/html,<script src= " { self . base_urls [ 0 ] } /sources/classic.js " ></script> ' )
expected_content = ' console.log( " external classic " ); \n '
self . assert_source_content ( Source ( " srcScript " , f " { self . base_urls [ 0 ] } /classic.js " ) , expected_content )
self . assert_source_content ( Source ( " srcScript " , f " { self . base_urls [ 0 ] } /sources/ classic.js " ) , expected_content )
def test_source_content_html_file ( self ) :
self . start_web_server ( test_dir = self . get_test_path ( " sources ") )
self . run_servoshell ( )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } / sources/test.html " )
expected_content = open ( self . get_test_path ( " sources/test.html " ) ) . read ( )
self . assert_source_content ( Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /test.html " ) , expected_content )
self . assert_source_content ( Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /sources/ test.html " ) , expected_content )
def test_source_content_with_inline_module_import_external ( self ) :
self . start_web_server ( test_dir = self . get_test_path ( " sources_content_with_inline_module_import_external ") )
self . run_servoshell ( )
expected_content = open (
self . get_test_path ( " sources _content_with_inline_module_import_external/test.html " )
) . read ( )
self . assert_source_content ( Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /test.html " ) , expected_content )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } / sources_content_with_inline_module_import_external/test.html " )
path = " sources_content_with_inline_module_import_external/test.html "
expected_content = open ( self . get_test_path ( path ) ) . read ( )
self . assert_ source_content( Source ( " inlineScript " , f " { self . base_urls [ 0 ] } / { path } " ) , expected_content )
# Test case that uses innerHTML and would actually need the HTML parser
# (innerHTML has a fast path for values that don’ t contain b'&' | b'\0' | b'<' | b'\r')
@@ -580,16 +649,16 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
# Test case that uses XMLHttpRequest#responseXML and would actually need the HTML parser
# (innerHTML has a fast path for values that don’ t contain b'&' | b'\0' | b'<' | b'\r')
def test_source_content_inline_script_with_responsexml ( self ) :
self . start_web_server ( test_dir = self . get_test_path ( " sources_content_with_responsexml ") )
self . run_servoshell ( )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } / sources_content_with_responsexml/test.html " )
expected_content = open ( self . get_test_path ( " sources_content_with_responsexml/test.html " ) ) . read ( )
self . assert_source_content ( Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /test.html " ) , expected_content )
self . assert_source_content (
Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /sources_content_with_responsexml/test.html " ) , expected_content
)
def test_source_breakable_lines_and_positions ( self ) :
self . start_web_server ( test_dir = self . get_test_path ( " sources_breakable_lines_and_positions ") )
self . run_servoshell ( )
self . run_servoshell ( url = f " { self . base_urls [ 0 ] } / sources_breakable_lines_and_positions/test.html " )
self . assert_source_breakable_lines_and_positions (
Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /test.html " ) ,
Source ( " inlineScript " , f " { self . base_urls [ 0 ] } /sources_breakable_lines_and_positions/ test.html " ) ,
[ 4 , 5 , 6 , 7 ] ,
{
" 4 " : [ 4 , 12 , 20 , 28 ] ,
@@ -616,13 +685,14 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
)
# Sets `base_url` and `web_server` and `web_server_thread`.
def start_web_server ( self , * , test_dir = None , num_servers = 2 ) :
assert self . base_urls is None and self . web_servers is None and self . web_server_threads is None
if test_dir is None :
test_dir = os . path . join ( DevtoolsTests . script_path , " devtools_tests " )
@classmethod
def setUpClass ( cls ) :
assert cls . base_urls is None and cls . web_servers is None and cls . web_server_threads is None
test_dir = os . path . join ( DevtoolsTests . script_path , " devtools_tests " )
num_servers = 2
base_urls = [ Future ( ) for i in range ( num_servers ) ]
self . web_servers = [ None for i in range ( num_servers ) ]
self . web_server_threads = [ None for i in range ( num_servers ) ]
cls . web_servers = [ None for i in range ( num_servers ) ]
cls . web_server_threads = [ None for i in range ( num_servers ) ]
class Handler ( http . server . SimpleHTTPRequestHandler ) :
def __init__ ( self , * args , * * kwargs ) :
@@ -641,165 +711,142 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
web_server = socketserver . TCPServer ( ( " 127.0.0.1 " , 10000 + index ) , Handler )
base_url = f " http://127.0.0.1: { web_server . server_address [ 1 ] } "
base_urls [ index ] . set_result ( base_url )
self . web_servers [ index ] = web_server
cls . web_servers [ index ] = web_server
web_server . serve_forever ( )
# Start a web server for the test.
for index in range ( num_servers ) :
thread = Thread ( target = server_thread , args = [ index ] )
self . web_server_threads [ index ] = thread
cls . web_server_threads [ index ] = thread
thread . start ( )
self . base_urls = [ base_url . result ( 1 ) for base_url in base_urls ]
cls . base_urls = [ base_url . result ( 1 ) for base_url in base_urls ]
# Sets `servoshell`.
def run_servoshell ( self , * , url = None ) :
def run_servoshell ( self , * , url ) :
# Change this setting if you want to debug Servo.
os . environ [ " RUST_LOG " ] = " error,devtools=warn "
# Run servoshell.
if url is None :
url = f " { self . base_urls [ 0 ] } /test.html "
self . servoshell = subprocess . Popen ( [ f " target/ { self . build_type . directory_name ( ) } /servo " , " --devtools=6080 " , url ] )
self . servoshell = subprocess . Popen (
[ f " target/ { self . build_type . directory_name ( ) } /servo " , " --headless " , " --devtools=6080 " , url ]
)
# FIXME: Don’ t do thi s
time . sleep ( 1 )
sleep_per_try = 1 / 8 # second s
remaining_tries = 5 / sleep_per_try # 5 seconds
while True :
print ( " . " , end = " " , flush = True )
stream = socket . socket ( socket . AF_INET , socket . SOCK_STREAM )
try :
stream . connect ( ( " 127.0.0.1 " , 6080 ) )
stream . recv ( 4096 ) # FIXME: without this, geckordp RDPClient.connect() may fail
stream . shutdown ( socket . SHUT_RDWR )
print ( " + " , end = " " , flush = True )
break
except Exception :
time . sleep ( sleep_per_try )
self . assertGreater ( remaining_tries , 0 )
remaining_tries - = 1
continue
def tearDown ( self ) :
# Terminate servoshell.
# Terminate servoshell, but do not stop the web servers .
if self . servoshell is not None :
self . servoshell . terminate ( )
self . servoshell = None
@classmethod
def tearDownClass ( cls ) :
# Stop the web servers.
if self . web_servers is not None :
for web_server in self . web_servers :
if cls . web_servers is not None :
for web_server in cls . web_servers :
web_server . shutdown ( )
web_server . server_close ( )
self . web_servers = None
if self . web_server_threads is not None :
for web_server_thread in self . web_server_threads :
cls . web_servers = None
if cls . web_server_threads is not None :
for web_server_thread in cls . web_server_threads :
web_server_thread . join ( )
self . web_server_threads = None
if self . base_urls is not None :
self . base_urls = None
def _setup_devtools_client ( self , * , expected_targets = 1 ) - > Devtools :
client = RDPClient ( )
client . connect ( " 127.0.0.1 " , 6080 )
root = RootActor ( client )
tabs = root . list_tabs ( )
tab_dict = tabs [ 0 ]
tab = TabActor ( client , tab_dict [ " actor " ] )
watcher = tab . get_watcher ( )
watcher = WatcherActor ( client , watcher [ " actor " ] )
done = Future ( )
targets = [ ]
def on_target ( data ) :
try :
targets . append ( data [ " target " ] )
if len ( targets ) == expected_targets :
done . set_result ( None )
except Exception as e :
# Raising here does nothing, for some reason.
# Send the exception back so it can be raised.
done . set_result ( e )
client . add_event_listener (
watcher . actor_id ,
Events . Watcher . TARGET_AVAILABLE_FORM ,
on_target ,
)
watcher . watch_targets ( WatcherActor . Targets . FRAME )
watcher . watch_targets ( WatcherActor . Targets . WORKER )
result : Optional [ Exception ] = done . result ( 1 )
if result :
raise result
return Devtools ( client , watcher , targets )
cls . web_server_threads = None
if cls . base_urls is not None :
cls . base_urls = None
def assert_sources_list (
self , expected_sources_by_target : set [ tuple [ Source ] ] , * , devtools : Optional [ Devtools ] = None
) :
expected_targets = len ( expected_sources_by_target )
if devtools is None :
devtools = self . _setup_devtools_clien t( expected_targets = expected_targets )
done = Future ( )
# NOTE: breaks if two targets have the same list of source urls.
# This should really be a multiset, but Python does not have multiset s.
actual_sources_by_target : set [ tuple [ Source ] ] = set ( )
devtools = Devtools . connec t( expected_targets = expected_targets )
with devtools :
done = Future ( )
# NOTE: breaks if two targets have the same list of source url s.
# This should really be a multiset, but Python does not have multisets.
actual_sources_by_target : set [ tuple [ Source ] ] = set ( )
def on_source_resource ( data ) :
for [ resource_type , sources ] in data [ " array " ] :
try :
self . assertEqual ( resource_type , " source " )
source_urls = tuple ( [ Source ( source [ " introductionType " ] , source [ " url " ] ) for source in sources ] )
self . assertFalse ( source_urls in actual_sources_by_target ) # See NOTE above
actual_sources_by_target . add ( source_urls )
if len ( actual_sources_by_target ) == expected_targets :
done . set_result ( None )
except Exception as e :
# Raising here does nothing, for some reason.
# Send the exception back so it can be raised.
done . set_result ( e )
def on_source_resource ( data ) :
for [ resource_type , sources ] in data [ " array " ] :
try :
self . assertEqual ( resource_type , " source " )
source_urls = tuple ( [ Source ( source [ " introductionType " ] , source [ " url " ] ) for source in sources ] )
self . assertFalse ( source_urls in actual_sources_by_target ) # See NOTE above
actual_sources_by_target . add ( source_urls )
if len ( actual_sources_by_target ) == expected_targets :
done . set_result ( None )
except Exception as e :
# Raising here does nothing, for some reason.
# Send the exception back so it can be raised.
done . set_result ( e )
for target in devtools . targets :
devtools . client . add_event_listener (
target [ " actor " ] ,
Events . Watcher . RESOURCES_AVAILABLE_ARRAY ,
on_source_resource ,
)
devtools . watcher . watch_resources ( [ Resources . SOURCE ] )
for target in devtools . targets :
devtools . client . add_event_listener (
target [ " actor " ] ,
Events . Watcher . RESOURCES_AVAILABLE_ARRAY ,
on_source_resource ,
)
devtools . watcher . watch_resources ( [ Resources . SOURCE ] )
result : Optional [ Exception ] = done . result ( 1 )
if result :
raise result
self . assertEqual ( actual_sources_by_target , expected_sources_by_target )
devtools . client . disconnect ( )
result : Optional [ Exception ] = done . result ( 1 )
if result :
raise result
self . assertEqual ( actual_sources_by_target , expected_sources_by_target )
def assert_source_content (
self , expected_source : Source , expected_content : str , * , devtools : Optional [ Devtools ] = None
) :
if devtools is None :
devtools = self . _setup_devtools_clien t( )
devtools = Devtools . connec t( )
with devtools :
done = Future ( )
source_actors = { }
done = Fut ure( )
source_actors = { }
def on_source_reso urc e ( data ) :
for [ resource_type , sources ] in data [ " array " ] :
try :
self . assertEqual ( resource_type , " source " )
for source in sources :
if Source ( source [ " introductionType " ] , source [ " url " ] ) == expected_source :
source_actors [ expected_source ] = source [ " actor " ]
done . set_result ( None )
except Exception as e :
done . set_result ( e )
def on_source_resource ( data ) :
for [ resource_type , sources ] in data [ " array " ] :
try :
self . assertEqual ( resource_type , " source " )
for source in sources :
if Source ( source [ " introductionType " ] , source [ " url " ] ) == expected_source :
source_actors [ expected_ source ] = source [ " actor " ]
done . set_result ( None )
except Exception as e :
done . set_result ( e )
for target in devtools . targets :
devtools . client . add_event_listener (
target [ " actor " ] ,
Events . Watcher . RESOURCES_AVAILABLE_ARRAY ,
on_source_re source,
)
devtools . watcher . watch_re sources ( [ Resources . SOURCE ] )
for target in devtools . targets :
devtools . client . add_event_listener (
target [ " actor " ] ,
Events . Watcher . RESOURCES_AVAILABLE_ARRAY ,
on_source_resource ,
)
devtools . watcher . watch_resources ( [ Resources . SOURCE ] )
result : Optional [ Exception ] = done . result ( 1 )
if result :
raise result
result : Optional [ Exception ] = done . result ( 1 )
if result :
raise result
# We found at least one source with the given url.
self . assertIn ( expected_source , source_actors )
source_actor = source_actors [ expected_source ]
# We found at least one source with the given url.
self . assertIn ( expected_source , source_actors )
source_actor = source_actors [ expected_source ]
response = devtools . client . send_receive ( { " to " : source_actor , " type " : " source " } )
response = devtools . client . send_receive ( { " to " : source_actor , " type " : " source " } )
self . assertEqual ( response [ " source " ] , expected_content )
devtools . client . disconnect ( )
self . assertEqual ( response [ " source " ] , expected_content )
def assert_source_breakable_lines_and_positions (
self ,
@@ -810,45 +857,43 @@ class DevtoolsTests(unittest.IsolatedAsyncioTestCase):
devtools : Optional [ Devtools ] = None ,
) :
if devtools is None :
devtools = self . _setup_devtools_clien t( )
devtools = Devtools . connec t( )
with devtools :
done = Future ( )
source_actors = { }
done = Fut ure( )
source_actors = { }
def on_source_reso urc e ( data ) :
for [ resource_type , sources ] in data [ " array " ] :
try :
self . assertEqual ( resource_type , " source " )
for source in sources :
if Source ( source [ " introductionType " ] , source [ " url " ] ) == expected_source :
source_actors [ expected_source ] = source [ " actor " ]
done . set_result ( None )
except Exception as e :
done . set_result ( e )
def on_source_resource ( data ) :
for [ resource_type , sources ] in data [ " array " ] :
try :
self . assertEqual ( resource_type , " source " )
for source in sources :
if Source ( source [ " introductionType " ] , source [ " url " ] ) == expected_source :
source_actors [ expected_ source ] = source [ " actor " ]
done . set_result ( None )
except Exception as e :
done . set_result ( e )
for target in devtools . targets :
devtools . client . add_event_listener (
target [ " actor " ] ,
Events . Watcher . RESOURCES_AVAILABLE_ARRAY ,
on_source_re source,
)
devtools . watcher . watch_re sources ( [ Resources . SOURCE ] )
for target in devtools . targets :
devtools . client . add_event_listener (
target [ " actor " ] ,
Events . Watcher . RESOURCES_AVAILABLE_ARRAY ,
on_source_resource ,
)
devtools . watcher . watch_resources ( [ Resources . SOURCE ] )
result : Optional [ Exception ] = done . result ( 1 )
if result :
raise result
result : Optional [ Exception ] = done . result ( 1 )
if result :
raise result
# We found at least one source with the given url.
self . assertIn ( expected_source , source_actors )
source_actor = source_actors [ expected_source ]
# We found at least one source with the given url.
self . assertIn ( expected_source , source_actor s )
source_actor = source_actors [ expected_source ]
response = devtools . client . send_receive ( { " to " : source_actor , " type " : " getBreakableLines " } )
self . assertEqual ( response [ " lines " ] , expected_breakable_line s )
response = devtools . client . send_receive ( { " to " : source_actor , " type " : " getBreakableLines " } )
self . assertEqual ( response [ " line s" ] , expected_breakable_line s )
response = devtools . client . send_receive ( { " to " : source_actor , " type " : " getBreakpointPositionsCompressed " } )
self . assertEqual ( response [ " positions " ] , expected_positions )
devtools . client . disconnect ( )
response = devtools . client . send_receive ( { " to " : source_actor , " type " : " getBreakpointPositionsCompressed " } )
self . assertEqual ( response [ " position s" ] , expected_position s )
def get_test_path ( self , path : str ) - > str :
return os . path . join ( DevtoolsTests . script_path , os . path . join ( " devtools_tests " , path ) )