Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Fix `query_selector_all` and `select_all(include_frames=True)` not searching nested iframes. CDP's `querySelectorAll` only queries within a single document boundary, so elements inside nested iframes were never found. Now walks the full DOM tree to collect each iframe's `content_document` and queries them individually. Also adds a guard for cross-origin iframes where `content_document` is `None`. @chronoAP
- Fix `Connection._register_handlers` re-enabling domains that had already been enabled manually @S-Tarr
- Fix flaky behavior in `api-reponses-tutorial-2` tutorial @S-Tarr

Expand Down
119 changes: 119 additions & 0 deletions tests/core/test_tab.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from collections.abc import Generator
from typing import Any

import pytest
Expand All @@ -10,6 +11,30 @@
from zendriver.core.connection import ProtocolException


def make_node(
    node_id: int,
    node_name: str,
    *,
    children: list[zd.cdp.dom.Node] | None = None,
    content_document: zd.cdp.dom.Node | None = None,
    attributes: list[str] | None = None,
    parent_id: int | None = None,
) -> zd.cdp.dom.Node:
    """Build a CDP DOM node for tests from a handful of plain values.

    The same integer is used for both the node id and the backend node id.
    A ``node_name`` of ``"#document"`` yields node type 9 with an empty local
    name; any other name yields node type 1 with the lower-cased name as its
    local name.

    :param node_id: integer used for ``node_id`` and ``backend_node_id``.
    :param node_name: node name, e.g. ``"SPAN"``, ``"IFRAME"`` or ``"#document"``.
    :param children: child nodes; ``child_node_count`` is their length, or
        ``None`` when no list is given.
    :param content_document: document node attached to the node, if any.
    :param attributes: attribute list passed through unchanged.
    :param parent_id: integer id of the parent node, or ``None`` for no parent.
    :return: the constructed node.
    """
    dom = zd.cdp.dom
    is_document = node_name == "#document"

    if is_document:
        node_type, local_name = 9, ""
    else:
        node_type, local_name = 1, node_name.lower()

    parent = None if parent_id is None else dom.NodeId(parent_id)
    child_count = None if children is None else len(children)

    return dom.Node(
        node_id=dom.NodeId(node_id),
        backend_node_id=dom.BackendNodeId(node_id),
        node_type=node_type,
        node_name=node_name,
        local_name=local_name,
        node_value="",
        parent_id=parent,
        child_node_count=child_count,
        children=children,
        attributes=attributes,
        content_document=content_document,
    )


async def test_set_user_agent_sets_navigator_values(browser: zd.Browser) -> None:
tab = browser.main_tab
assert tab is not None
Expand Down Expand Up @@ -67,6 +92,100 @@ async def test_select(browser: zd.Browser) -> None:
assert result.text == "Apples"


async def test_query_selector_all_include_frames_queries_nested_iframe_documents(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """query_selector_all(_include_frames=True) queries every iframe document.

    The fake DOM is a top document whose body holds one matching span, an
    iframe with a content document, and an iframe without one. The first
    iframe's document holds a matching span plus a further nested iframe that
    has its own matching span. ``Tab.send`` is replaced by a stub that serves
    this tree and records which node ids ``DOM.querySelectorAll`` is run on.
    """

    def match_span(node_id: int, location: str, parent_id: int) -> zd.cdp.dom.Node:
        # Every matching element is a SPAN tagged with where it lives.
        return make_node(
            node_id,
            "SPAN",
            attributes=["class", "match", "data-location", location],
            parent_id=parent_id,
        )

    # Innermost frame: document 8 is hosted by iframe 7.
    inner_match = match_span(9, "inner", 8)
    inner_doc = make_node(8, "#document", children=[inner_match])
    inner_iframe = make_node(7, "IFRAME", content_document=inner_doc, parent_id=5)

    # Outer frame: document 5 is hosted by iframe 4 and contains iframe 7.
    outer_match = match_span(6, "outer", 5)
    outer_doc = make_node(5, "#document", children=[outer_match, inner_iframe])
    outer_iframe = make_node(4, "IFRAME", content_document=outer_doc, parent_id=2)

    # An iframe without a content document (the cross-origin case).
    cross_origin_iframe = make_node(
        10,
        "IFRAME",
        content_document=None,
        parent_id=2,
    )

    # Top-level document.
    top_match = match_span(3, "top", 2)
    body = make_node(
        2,
        "BODY",
        children=[top_match, outer_iframe, cross_origin_iframe],
        parent_id=1,
    )
    doc = make_node(1, "#document", children=[body])

    matches_by_document_id = {
        doc.node_id: [top_match.node_id],
        outer_doc.node_id: [outer_match.node_id],
        inner_doc.node_id: [inner_match.node_id],
    }
    queried_document_ids: list[zd.cdp.dom.NodeId] = []

    async def send(
        cdp_obj: Generator[dict[str, Any], dict[str, Any], Any],
        _is_update: bool = False,
    ) -> Any:
        # The first value yielded by a CDP command generator is the request.
        command = next(cdp_obj)
        if command["method"] == "DOM.getDocument":
            return doc
        if command["method"] != "DOM.querySelectorAll":
            raise AssertionError(f"Unexpected CDP command: {command['method']}")
        node_id = zd.cdp.dom.NodeId(command["params"]["nodeId"])
        queried_document_ids.append(node_id)
        return matches_by_document_id[node_id]

    # Bypass Tab.__init__ so that no browser connection is needed.
    tab = zd.Tab.__new__(zd.Tab)
    monkeypatch.setattr(tab, "send", send)

    results = await tab.query_selector_all(".match", _include_frames=True)

    found_locations = {result.attrs["data-location"] for result in results}
    assert found_locations == {"top", "outer", "inner"}

    expected_document_ids = {doc.node_id, outer_doc.node_id, inner_doc.node_id}
    assert set(queried_document_ids) == expected_document_ids


async def test_xpath(browser: zd.Browser) -> None:
tab = await browser.get(sample_file("groceries.html"))

Expand Down
60 changes: 49 additions & 11 deletions zendriver/core/tab.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,9 @@

while True:
items = []
if include_frames:
frames = await self.query_selector_all("iframe")
for fr in frames:
items.extend(await fr.query_selector_all(selector))

items.extend(await self.query_selector_all(selector))
items.extend(
await self.query_selector_all(selector, _include_frames=include_frames)
)

if items:
return items
Expand Down Expand Up @@ -408,72 +405,111 @@
await self.wait()
return self

async def query_selector_all(
self,
selector: str,
_node: cdp.dom.Node | Element | None = None,
_include_frames: bool = False,
) -> List[Element]:
"""
equivalent of javascripts document.querySelectorAll.
this is considered one of the main methods to use in this package.

it returns all matching :py:obj:`zendriver.Element` objects.

:param selector: css selector. (first time? => https://www.w3schools.com/cssref/css_selectors.php )
:param _node: internal use. when given, the query is rooted at this node
(or at its ``content_document`` when the node is an IFRAME) instead of a
freshly fetched document.
:param _include_frames: internal use. when true and no ``_node`` is given,
the selector is additionally run against every ``content_document`` found
while walking the fetched document tree.
:return: the matching elements; an empty list when nothing matched or when
an IFRAME ``_node`` has no ``content_document``.
:rtype: List[Element]
"""
doc: Any
# content documents of iframes discovered in the tree; stays empty unless
# `_include_frames` is set and no `_node` was supplied
content_doc_nodes = []
if not _node:
# fetch the document tree to use as the query root
# (arguments are positional; presumably depth=-1 and pierce=True — confirm
# against cdp.dom.get_document)
doc = await self.send(cdp.dom.get_document(-1, True))
if _include_frames:
# Collect all iframe content_document nodes
# iterative depth-first walk using an explicit stack
stack = [doc]
while stack:
# pop off items to grab the nodes
node = stack.pop()
if node.content_document:
# save the nodes
content_doc_nodes.append(node.content_document)
# also walk into the frame's document so nested iframes are found
stack.append(node.content_document)
if node.children:
# add back child nodes to pop later on
stack.extend(node.children)

else:
doc = _node
if _node.node_name == "IFRAME":
doc = _node.content_document
if doc is None:
return [] # cross-origin iframes block access to content_document, skip gracefully
node_ids = []

try:
# query the root document (or supplied node) first
node_ids = await self.send(
cdp.dom.query_selector_all(doc.node_id, selector)
)
if _include_frames:
# then query each collected iframe document separately
for cd_node in content_doc_nodes:
try:
node_ids.extend(
await self.send(
cdp.dom.query_selector_all(cd_node.node_id, selector)
)
)
# NOTE(review): any failure for a single frame is printed and
# swallowed so the remaining frames are still queried; consider
# a logger and a narrower exception type.
except Exception as e:
print(f"Exception in new content doc ids loop {e}")

except ProtocolException as e:
if _node is not None:
if e.message is not None and "could not find node" in e.message.lower():
# "__last" marks a node that has already been retried once
if getattr(_node, "__last", None):
delattr(_node, "__last")
return []
# if supplied node is not found, the dom has changed since acquiring the element
# therefore we need to update our passed node and try again
if isinstance(_node, element.Element):
await _node.update()
# make sure this isn't turned into infinite loop
setattr(_node, "__last", True)
# NOTE(review): the single-line return below and the multi-line return
# that follows it look like the removed/added pair of this diff; only
# one of them should remain — confirm.
return await self.query_selector_all(selector, _node)
return await self.query_selector_all(
selector, _node, _include_frames=_include_frames
)
else:
if e.message is not None and "could not find node" in e.message.lower():
# The document node is stale; refetch and retry once
doc = await self.send(cdp.dom.get_document(-1, True))
# Prevent double-retry by marking this node as 'last attempt'
setattr(doc, "__last", True)
# NOTE(review): same removed/added pair as above. Also, the retry
# passes `doc` as `_node`, so the `if not _node` branch that collects
# iframe documents is skipped in the retried call — confirm whether
# frames are still meant to be searched on this path.
return await self.query_selector_all(selector, doc)
return await self.query_selector_all(
selector, doc, _include_frames=_include_frames
)

await self.disable_dom_agent()
raise
if not node_ids:
return []
items = []

for nid in node_ids:
# look the id up in the root tree first, then in each iframe document
node = util.filter_recurse(doc, lambda n: n.node_id == nid)
if not node:
for cd_node in content_doc_nodes:
node = util.filter_recurse(cd_node, lambda n: n.node_id == nid)
if node:
break
# we pass along the retrieved document tree,
# to improve performance
if not node:
# id not present in any tree we hold; skip it
continue
elem = element.create(node, self, doc)
items.append(elem)

return items

Check notice on line 512 in zendriver/core/tab.py

View check run for this annotation

codefactor.io / CodeFactor

zendriver/core/tab.py#L408-L512

Complex Method

async def query_selector(
self,
Expand Down Expand Up @@ -608,8 +644,10 @@
if iframe_elem.content_document:
iframe_text_nodes = util.filter_recurse_all(
iframe_elem,
lambda node: node.node_type == 3 # noqa
and text.lower() in node.node_value.lower(),
lambda node: (
node.node_type == 3 # noqa
and text.lower() in node.node_value.lower()
),
)
if iframe_text_nodes:
iframe_text_elems = [
Expand Down Expand Up @@ -970,7 +1008,7 @@

async def fullscreen(self) -> None:
"""
switch the page/tab/window to fullscreen.

delegates to ``set_window_state`` with ``state="fullscreen"``.
"""
return await self.set_window_state(state="fullscreen")

Expand Down