# Copyright 2013 The Servo Project Developers. See the COPYRIGHT # file at the top-level directory of this distribution. # # Licensed under the Apache License, Version 2.0 or the MIT license # , at your # option. This file may not be copied, modified, or distributed # except according to those terms. from __future__ import annotations from concurrent.futures import Future from dataclasses import dataclass import logging import socket import sys from geckordp.actors.root import RootActor from geckordp.actors.descriptors.tab import TabActor from geckordp.actors.watcher import WatcherActor from geckordp.actors.web_console import WebConsoleActor from geckordp.actors.resources import Resources from geckordp.actors.events import Events from geckordp.actors.inspector import InspectorActor from geckordp.actors.walker import WalkerActor from geckordp.actors.node import NodeActor from geckordp.rdp_client import RDPClient import http.server import os.path import socketserver import subprocess import time from threading import Thread from typing import Any, Iterable, Optional, TypeVar import unittest from collections import Counter # Set this to true to log requests in the internal web servers. LOG_REQUESTS = False @dataclass(frozen=True, order=True) class Source: introduction_type: str url: str @dataclass class Devtools: client: RDPClient watcher: WatcherActor targets: list exited: bool = False def connect(*, expected_targets: int = 1) -> Devtools: """ Connect to the Servo devtools server. You should use a `with` statement to ensure we disconnect unconditionally. """ client = RDPClient() client.connect("127.0.0.1", 6080) root = RootActor(client) tabs = root.list_tabs() tab_dict = tabs[0] tab = TabActor(client, tab_dict["actor"]) watcher = tab.get_watcher() watcher = WatcherActor(client, watcher["actor"]) done = Future() targets = [] def on_target(data): try: targets.append(data["target"]) if len(targets) == expected_targets: done.set_result(None) except Exception as e: # Raising here does nothing, for some reason. # Send the exception back so it can be raised. done.set_result(e) client.add_event_listener( watcher.actor_id, Events.Watcher.TARGET_AVAILABLE_FORM, on_target, ) watcher.watch_targets(WatcherActor.Targets.FRAME) watcher.watch_targets(WatcherActor.Targets.WORKER) result: Optional[Exception] = done.result(1) if result: raise result return Devtools(client, watcher, targets) def __getattribute__(self, name: str) -> Any: """ Access a property, raising a ValueError if the instance was previously marked as exited. """ if name != "exited" and object.__getattribute__(self, "exited"): raise ValueError("Devtools instance must not be used after __exit__()") return object.__getattribute__(self, name) def __enter__(self) -> Devtools: """ Enter the `with` context for this instance, raising a ValueError if it was previously marked as exited. """ if self.exited: raise ValueError("Devtools instance must not be used after __exit__()") return self def __exit__(self, exc_type, exc_value, traceback) -> None: """ Exit the `with` context for this instance, disconnecting the client and marking it as exited. Does not raise a ValueError if it was previously marked as exited, so you can nest `with` statements. """ if not self.exited: # Ignore any return value; we never want to return True to suppress exceptions self.client.__exit__(exc_type, exc_value, traceback) self.exited = True # TODO: Use new syntax in python 3.12 # # T = TypeVar("T") FrozenMultiset = tuple[tuple[T, int], ...] def frozen_multiset(items: Iterable[T] = []) -> FrozenMultiset[T]: """ Simulate a frozen multiset using a tuple of tuples. Python does not have one yet: """ # First make a mutable multiset result = Counter(items) # then convert it to a tuple with a stable order return tuple(sorted(result.items())) class DevtoolsTests(unittest.IsolatedAsyncioTestCase): # /path/to/servo/python/servo script_path = None servo_binary: Optional[str] = None base_urls = None web_servers = None web_server_threads = None def __init__(self, methodName="runTest"): super().__init__(methodName) self.servoshell = None # Watcher tests def test_watcher_returns_same_breakpoint_list_actor_every_time(self): self.run_servoshell(url="data:text/html,") with Devtools.connect() as devtools: response1 = devtools.watcher.get_breakpoint_list_actor() response2 = devtools.watcher.get_breakpoint_list_actor() self.assertEqual(response1["breakpointList"]["actor"], response2["breakpointList"]["actor"]) def test_breakpoint_pause(self): self.run_servoshell(url=f"{self.base_urls[0]}/debugger/loop.html") with Devtools.connect() as devtools: thread_actor = devtools.targets[0]["threadActor"] devtools.client.send_receive({"to": thread_actor, "type": "attach"}) # Wait for source source_future = Future() def on_source(data): for [resource_type, sources] in data.get("array", []): if resource_type == "source": for source in sources: if "debugger/loop.html" in source.get("url", ""): source_future.set_result(source["actor"]) devtools.client.add_event_listener( devtools.targets[0]["actor"], Events.Watcher.RESOURCES_AVAILABLE_ARRAY, on_source, ) devtools.watcher.watch_resources([Resources.SOURCE]) source_actor = source_future.result(2) # Get valid breakpoint position positions = devtools.client.send_receive( {"to": source_actor, "type": "getBreakpointPositionsCompressed"} ).get("positions", {}) line_str = min(positions.keys(), key=int) line, column = int(line_str), positions[line_str][0] # Set breakpoint at the first available position breakpoint_list = devtools.watcher.get_breakpoint_list_actor() devtools.client.send_receive( { "to": breakpoint_list["breakpointList"]["actor"], "type": "setBreakpoint", "location": { "sourceUrl": f"{self.base_urls[0]}/debugger/loop.html", "line": line, "column": column, }, } ) # Listen for paused event paused_future = Future() def on_paused(data): paused_future.set_result(data) devtools.client.add_event_listener(thread_actor, "paused", on_paused) # Verify pause paused_data = paused_future.result(3) self.assertEqual(paused_data.get("type"), "paused") self.assertEqual(paused_data.get("why", {}).get("type"), "breakpoint") def test_frame_scoped_eval(self): self.run_servoshell(url=f"{self.base_urls[0]}/debugger/frame_scoped.html") with Devtools.connect() as devtools: thread_actor = devtools.targets[0]["threadActor"] console_actor = devtools.targets[0]["consoleActor"] devtools.client.send_receive({"to": thread_actor, "type": "attach"}) paused_future = Future() def on_paused(data): paused_future.set_result(data) devtools.client.add_event_listener(thread_actor, "paused", on_paused) devtools.client.send_receive({"to": thread_actor, "type": "interrupt", "when": "onNext"}) paused_data = paused_future.result(3) frame_actor = paused_data.get("frame", {}).get("actor") self.assertIsNotNone(frame_actor) eval_future = Future() def on_eval_result(data): eval_future.set_result(data) devtools.client.add_event_listener(console_actor, Events.WebConsole.EVALUATION_RESULT, on_eval_result) devtools.client.send_receive( { "to": console_actor, "type": "evaluateJSAsync", "text": "i", "frameActor": frame_actor, } ) eval_result = eval_future.result(2) self.assertFalse(eval_result.get("hasException", True)) self.assertEqual(eval_result.get("result"), 42) def test_breakpoint_at_invalid_entry_point_does_not_crash(self): self.run_servoshell(url=f"{self.base_urls[0]}/debugger/loop.html") with Devtools.connect() as devtools: breakpoint_list = devtools.watcher.get_breakpoint_list_actor() response = devtools.client.send_receive( { "to": breakpoint_list["breakpointList"]["actor"], "type": "setBreakpoint", "location": { "sourceUrl": f"{self.base_urls[0]}/debugger/loop.html", "line": 1, "column": 0, }, } ) self.assertIn("from", response) def test_manual_pause(self): self.run_servoshell(url=f"{self.base_urls[0]}/debugger/loop.html") with Devtools.connect() as devtools: thread_actor = devtools.targets[0]["threadActor"] devtools.client.send_receive({"to": thread_actor, "type": "attach"}) # Listen for paused event paused_future = Future() def on_paused(data): paused_future.set_result(data) devtools.client.add_event_listener(thread_actor, "paused", on_paused) # Interrupt when entering the next frame devtools.client.send_receive( { "to": thread_actor, "type": "interrupt", "when": "onNext", } ) # Verify pause paused_data = paused_future.result(3) self.assertEqual(paused_data.get("type"), "paused") why = paused_data.get("why", {}) self.assertEqual(why.get("type"), "interrupted") self.assertEqual(why.get("onNext"), True) # Sources list # Classic script vs module script: # - # - # Worker scripts can be classic or module: # - # - # Non-worker(?) script sources can be inline, external, or blob. # Worker script sources can be external or blob. def test_sources_list(self): self.run_servoshell(url=f"{self.base_urls[0]}/sources/test.html") self.assert_sources_list( Counter( [ frozen_multiset( [ Source("srcScript", f"{self.base_urls[0]}/sources/classic.js"), Source("inlineScript", f"{self.base_urls[0]}/sources/test.html"), Source("inlineScript", f"{self.base_urls[0]}/sources/test.html"), Source("srcScript", f"{self.base_urls[1]}/sources/classic.js"), Source("importedModule", f"{self.base_urls[0]}/sources/module.js"), ] ), frozen_multiset([Source("Worker", f"{self.base_urls[0]}/sources/classic_worker.js")]), ] ), ) def test_sources_list_with_data_no_scripts(self): self.run_servoshell(url="data:text/html,") self.assert_sources_list(Counter([frozen_multiset()])) # Sources list for `introductionType` = `inlineScript` and `srcScript` def test_sources_list_with_data_empty_inline_classic_script(self): self.run_servoshell(url="data:text/html,") self.assert_sources_list(Counter([frozen_multiset()])) def test_sources_list_with_data_inline_classic_script(self): self.run_servoshell(url="data:text/html,") self.assert_sources_list( Counter([frozen_multiset([Source("inlineScript", "data:text/html,")])]) ) def test_sources_list_with_data_external_classic_script(self): self.run_servoshell(url=f'data:text/html,') self.assert_sources_list( Counter([frozen_multiset([Source("srcScript", f"{self.base_urls[0]}/sources/classic.js")])]) ) def test_sources_list_with_data_empty_inline_module_script(self): self.run_servoshell(url="data:text/html,") self.assert_sources_list(Counter([frozen_multiset()])) def test_sources_list_with_data_inline_module_script(self): self.run_servoshell(url="data:text/html,") self.assert_sources_list( Counter([frozen_multiset([Source("inlineScript", "data:text/html,")])]) ) def test_sources_list_with_data_external_module_script(self): self.run_servoshell(url=f"{self.base_urls[0]}/sources/test_sources_list_with_data_external_module_script.html") self.assert_sources_list( Counter([frozen_multiset([Source("srcScript", f"{self.base_urls[0]}/sources/module.js")])]) ) # Sources list for `introductionType` = `importedModule` def test_sources_list_with_static_import_module(self): self.run_servoshell(url=f"{self.base_urls[0]}/sources/test_sources_list_with_static_import_module.html") self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", f"{self.base_urls[0]}/sources/test_sources_list_with_static_import_module.html", ), Source("importedModule", f"{self.base_urls[0]}/sources/module.js"), ] ) ] ), ) def test_sources_list_with_dynamic_import_module(self): self.run_servoshell(url=f"{self.base_urls[0]}/sources/test_sources_list_with_dynamic_import_module.html") self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", f"{self.base_urls[0]}/sources/test_sources_list_with_dynamic_import_module.html", ), Source("importedModule", f"{self.base_urls[0]}/sources/module.js"), ] ) ] ), ) # Sources list for `introductionType` = `Worker` def test_sources_list_with_classic_worker(self): self.run_servoshell(url=f"{self.base_urls[0]}/sources/test_sources_list_with_classic_worker.html") self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", f"{self.base_urls[0]}/sources/test_sources_list_with_classic_worker.html", ), ] ), frozen_multiset( [ Source("Worker", f"{self.base_urls[0]}/sources/classic_worker.js"), ] ), ] ), ) def test_sources_list_with_module_worker(self): self.run_servoshell(url=f"{self.base_urls[0]}/sources/test_sources_list_with_module_worker.html") self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", f"{self.base_urls[0]}/sources/test_sources_list_with_module_worker.html" ), ] ), frozen_multiset( [ Source("Worker", f"{self.base_urls[0]}/sources/module_worker.js"), ] ), ] ), ) # Sources list for `introductionType` set to values that require `displayURL` (`//# sourceURL`) def test_sources_list_with_injected_script_write_and_display_url(self): self.run_servoshell( url='data:text/html,' ) self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", 'data:text/html,', ), Source("injectedScript", "http://test/"), ] ) ] ) ) def test_sources_list_with_injected_script_write_but_no_display_url(self): self.run_servoshell(url='data:text/html,') self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", 'data:text/html,', ), ] ) ] ) ) def test_sources_list_with_injected_script_append_and_display_url(self): script = 's=document.createElement("script");s.append("//%23 sourceURL=http://test");document.body.append(s)' self.run_servoshell(url=f"data:text/html,") self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", f"data:text/html,", ), Source("injectedScript", "http://test/"), ] ) ] ) ) def test_sources_list_with_injected_script_append_but_no_display_url(self): script = 's=document.createElement("script");s.append("1");document.body.append(s)' self.run_servoshell(url=f"data:text/html,") self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", f"data:text/html,", ), ] ) ] ) ) def test_sources_list_with_eval_and_display_url(self): self.run_servoshell(url='data:text/html,') self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", 'data:text/html,' ), Source("eval", "http://test/"), ] ) ] ) ) def test_sources_list_with_eval_but_no_display_url(self): self.run_servoshell(url='data:text/html,') self.assert_sources_list( Counter([frozen_multiset([Source("inlineScript", 'data:text/html,')])]) ) def test_sources_list_with_debugger_eval_and_display_url(self): self.run_servoshell(url="data:text/html,") with Devtools.connect() as devtools: console = WebConsoleActor(devtools.client, devtools.targets[0]["consoleActor"]) evaluation_result = Future() async def on_evaluation_result(data: dict): evaluation_result.set_result(data) devtools.client.add_event_listener( console.actor_id, Events.WebConsole.EVALUATION_RESULT, on_evaluation_result ) console.evaluate_js_async("//# sourceURL=http://test") evaluation_result.result(1) self.assert_sources_list( Counter([frozen_multiset([Source("debugger eval", "http://test/")])]), devtools=devtools ) def test_sources_list_with_debugger_eval_but_no_display_url(self): self.run_servoshell(url="data:text/html,") with Devtools.connect() as devtools: console = WebConsoleActor(devtools.client, devtools.targets[0]["consoleActor"]) evaluation_result = Future() async def on_evaluation_result(data: dict): evaluation_result.set_result(data) devtools.client.add_event_listener( console.actor_id, Events.WebConsole.EVALUATION_RESULT, on_evaluation_result ) console.evaluate_js_async("1") evaluation_result.result(1) self.assert_sources_list(Counter([frozen_multiset([])]), devtools=devtools) def test_sources_list_with_function_and_display_url(self): self.run_servoshell(url='data:text/html,') self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", 'data:text/html,', ), Source("Function", "http://test/"), ] ) ] ) ) def test_sources_list_with_function_but_no_display_url(self): self.run_servoshell(url='data:text/html,') self.assert_sources_list( Counter( [ frozen_multiset( [ Source("inlineScript", 'data:text/html,'), ] ) ] ) ) def test_sources_list_with_javascript_url_and_display_url(self): # “1” prefix is a workaround for self.run_servoshell( url='data:text/html,' ) self.assert_sources_list( Counter( [ frozen_multiset( [ Source( "inlineScript", 'data:text/html,', ), Source("javascriptURL", "http://test/"), ] ) ] ) ) def test_sources_list_with_javascript_url_but_no_display_url(self): self.run_servoshell(url='data:text/html,') self.assert_sources_list(Counter([frozen_multiset([])])) @unittest.expectedFailure def test_sources_list_with_event_handler_and_display_url(self): self.run_servoshell(url='data:text/html,') self.assert_sources_list( Counter( [ frozen_multiset( [ Source("eventHandler", "http://test/"), ] ) ] ) ) def test_sources_list_with_event_handler_but_no_display_url(self): self.run_servoshell(url='data:text/html,') self.assert_sources_list(Counter([frozen_multiset([])])) @unittest.expectedFailure def test_sources_list_with_dom_timer_and_display_url(self): self.run_servoshell(url='data:text/html,') self.assert_sources_list( Counter( [ frozen_multiset( [ Source("domTimer", "http://test/"), ] ) ] ) ) @unittest.expectedFailure def test_sources_list_with_dom_timer_but_no_display_url(self): self.run_servoshell(url='data:text/html,') self.assert_sources_list(Counter([frozen_multiset([])])) # Sources list for scripts with `displayURL` (`//# sourceURL`), despite not being required by `introductionType` def test_sources_list_with_inline_script_and_display_url(self): self.run_servoshell(url="data:text/html,") self.assert_sources_list( Counter( [ frozen_multiset( [ Source("inlineScript", "http://test/"), ] ) ] ) ) # Extra test case for situation where `//# sourceURL` can’t be parsed with page url as base. def test_sources_list_with_inline_script_but_invalid_display_url(self): self.run_servoshell(url="data:text/html,") self.assert_sources_list( Counter( [ frozen_multiset( [ Source("inlineScript", "data:text/html,"), ] ) ] ) ) def test_sources_list_with_inline_script_but_no_display_url(self): self.run_servoshell(url="data:text/html,") self.assert_sources_list( Counter( [ frozen_multiset( [ Source("inlineScript", "data:text/html,"), ] ) ] ) ) # Sources list for inline scripts in `