-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathplugin.py
More file actions
367 lines (312 loc) · 15.1 KB
/
plugin.py
File metadata and controls
367 lines (312 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
import concurrent.futures
import fnmatch
from functools import lru_cache, partial
import os.path
import pathlib
import re
import threading
import time
from typing import Dict, List, Optional, Set
import urllib.parse
import uuid
from bs4 import BeautifulSoup, SoupStrainer
from markdown.extensions.toc import slugify
from mkdocs import utils
from mkdocs.config import Config, config_options
from mkdocs.exceptions import PluginError
from mkdocs.plugins import BasePlugin
from mkdocs.structure.files import File, Files
from mkdocs.structure.pages import Page
import requests
import urllib3
URL_TIMEOUT = 10.0
_URL_BOT_ID = f'Bot {uuid.uuid4()}'
URL_HEADERS = {'User-Agent': _URL_BOT_ID, 'Accept-Language': '*'}
NAME = "htmlproofer"
MARKDOWN_ANCHOR_PATTERN = re.compile(r'([^#]+)(#(.+))?')
HEADING_PATTERN = re.compile(r'\s*#+\s*(.*)')
HTML_LINK_PATTERN = re.compile(r'<a (?:id|name)=\"([^\"]+)\">')
IMAGE_PATTERN = re.compile(r'\[\!\[.*\]\(.*\)\].*|\!\[.*\]\[.*\].*')
LOCAL_PATTERNS = [
re.compile(rf'https?://{local}')
for local in ('localhost', '127.0.0.1', 'app_server')
]
ATTRLIST_ANCHOR_PATTERN = re.compile(r'\{.*?\#([^\s\}]*).*?\}')
ATTRLIST_PATTERN = re.compile(r'\{.*?\}')
# Example emojis:
# :banana:
# :smiley_cat:
# :octicons-apps-16:
# :material-star:
EMOJI_PATTERN = re.compile(r'\:[a-z0-9_-]+\:')
urllib3.disable_warnings()
def log_info(msg, *args, **kwargs):
utils.log.info(f"{NAME}: {msg}", *args, **kwargs)
def log_warning(msg, *args, **kwargs):
utils.log.warning(f"{NAME}: {msg}", *args, **kwargs)
def log_error(msg, *args, **kwargs):
utils.log.error(f"{NAME}: {msg}", *args, **kwargs)
class HtmlProoferPlugin(BasePlugin):
files: List[File]
invalid_links = False
config_scheme = (
("enabled", config_options.Type(bool, default=True)),
('raise_error', config_options.Type(bool, default=False)),
('raise_error_after_finish', config_options.Type(bool, default=False)),
('raise_error_excludes', config_options.Type(dict, default={})),
('skip_downloads', config_options.Type(bool, default=False)),
('validate_external_urls', config_options.Type(bool, default=True)),
('validate_rendered_template', config_options.Type(bool, default=False)),
('ignore_urls', config_options.Type(list, default=[])),
('warn_on_ignored_urls', config_options.Type(bool, default=False)),
('ignore_pages', config_options.Type(list, default=[])),
('retry_max_times', config_options.Type(int, default=0)),
('max_workers', config_options.Type(int, default=None)),
)
def __init__(self):
self._local = threading.local()
self.files = []
self.scheme_handlers = {
"http": partial(HtmlProoferPlugin.resolve_web_scheme, self),
"https": partial(HtmlProoferPlugin.resolve_web_scheme, self),
}
super().__init__()
def _get_session(self) -> requests.Session:
"""Return a per-thread `requests.Session`, creating one lazily if needed."""
session = getattr(self._local, 'session', None)
if session is None:
session = requests.Session()
session.verify = False
session.headers.update(URL_HEADERS)
session.max_redirects = 5
self._local.session = session
return session
def on_post_build(self, config: Config) -> None:
if self.config['raise_error_after_finish'] and self.invalid_links:
raise PluginError("Invalid links present.")
def on_files(self, files: Files, config: Config) -> None:
# Store files to allow inspecting Markdown files in later stages.
# The values in files at this point are not guaranteed to be the same as the ones in the Page objects.
# For example, material blog plugin may modify the files after this event.
for f in files:
self.files.append(f)
def on_post_page(self, output_content: str, page: Page, config: Config) -> None:
if not self.config['enabled']:
return
# Optimization: At this point, we have all the files, so we can create
# a dictionary for faster lookups. Prior to this point, files are
# still being updated so creating a dictionary before now would result
# in incorrect values appearing as the key.
opt_files = {}
opt_files.update({os.path.normpath(file.url): file for file in self.files})
opt_files.update({os.path.normpath(file.src_uri): file for file in self.files})
# Optimization: only parse links and headings
# li, sup are used for footnotes
strainer = SoupStrainer(('a', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'sup', 'img'))
content = output_content if self.config['validate_rendered_template'] else page.content
soup = BeautifulSoup(str(content), 'html.parser', parse_only=strainer)
all_element_ids = set(str(tag['id']) for tag in soup.select('[id]'))
all_element_ids.add('') # Empty anchor is commonly used, but not real
urls = (set(str(a['href']) for a in soup.find_all('a', href=True)) |
set(str(img['src']) for img in soup.find_all('img')))
urls_to_check: List[str] = []
for url in urls:
if any(fnmatch.fnmatch(url, ignore_url) for ignore_url in self.config['ignore_urls']):
if self.config['warn_on_ignored_urls']:
log_warning(f"ignoring URL {url} from {page.file.src_path}")
elif any(
fnmatch.fnmatch(page.file.src_path, ignore_page)
for ignore_page in self.config['ignore_pages']
):
if self.config['warn_on_ignored_urls']:
log_warning(f"ignoring URL {url} from {page.file.src_path}")
else:
urls_to_check.append(url)
# Note on exception propagation: `future.result()` re-raises any exception
# from a worker thread. If `raise_error` is `True` and multiple URLs fail
# concurrently, only the first exception to be observed here will propagate;
# remaining futures continue to execute but their exceptions are not raised.
# This is acceptable because each thread independently logs/reports its
# failure via `report_invalid_url` before raising, so no errors are silently
# lost. When `raise_error_after_finish` is used instead, all failures are
# recorded via the `invalid_links` flag and surfaced in `on_post_build`.
with concurrent.futures.ThreadPoolExecutor(max_workers=self.config['max_workers']) as executor:
for future in concurrent.futures.as_completed(
executor.submit(self.check_url, url, page.file.src_path, all_element_ids, opt_files) for url in urls_to_check
):
future.result()
def report_invalid_url(self, url, url_status, src_path):
error = f'invalid url - {url} [{url_status}] [{src_path}]'
if self.config['raise_error']:
raise PluginError(error)
elif self.config['raise_error_after_finish']:
log_error(error)
self.invalid_links = True
else:
log_warning(error)
def get_external_url(self, url, scheme, src_path):
try:
return self.scheme_handlers[scheme](url)
except KeyError:
log_info(f'Unknown url-scheme "{scheme}:" detected. "{url}" from "{src_path}" will not be checked.')
return 0
@lru_cache(maxsize=1000)
def resolve_web_scheme(self, url: str) -> int:
try:
response = self._get_session().get(url, timeout=URL_TIMEOUT, stream=True)
if self.config['skip_downloads'] is False:
# Download the entire contents as to not break previous behaviour.
for _ in response.iter_content(chunk_size=1024 * 1024):
pass
return response.status_code
except requests.exceptions.Timeout:
return 504
except requests.exceptions.TooManyRedirects:
return -1
except requests.exceptions.ConnectionError:
return -1
def check_url(
self,
url: str,
src_path: str,
all_element_ids: Set[str],
files: Dict[str, File],
) -> None:
retry_times = 0
retry_max_times = self.config['retry_max_times']
retry_duration = 2
while retry_times <= retry_max_times:
url_status = self.get_url_status(url, src_path, all_element_ids, files)
retry_times += 1
if self.bad_url(url_status) and self.is_error(self.config, url, url_status):
if retry_times > retry_max_times:
self.report_invalid_url(url, url_status, src_path)
else:
log_info(f"Retrying URL {url} from {src_path} after {retry_duration} seconds...")
time.sleep(retry_duration)
retry_duration *= 2
def get_url_status(
self,
url: str,
src_path: str,
all_element_ids: Set[str],
files: Dict[str, File]
) -> int:
if any(pat.match(url) for pat in LOCAL_PATTERNS):
return 0
scheme, _, path, _, fragment = urllib.parse.urlsplit(url)
if scheme:
if self.config['validate_external_urls']:
return self.get_external_url(url, scheme, src_path)
return 0
if fragment and not path:
return 0 if url[1:] in all_element_ids else 404
else:
is_valid = self.is_url_target_valid(url, src_path, files)
url_status = 404
if not is_valid and self.is_error(self.config, url, url_status):
log_warning(f"Unable to locate source file for: {url}")
return url_status
return 0
@staticmethod
def is_url_target_valid(url: str, src_path: str, files: Dict[str, File]) -> bool:
match = MARKDOWN_ANCHOR_PATTERN.match(url)
if match is None:
return True
url_target, _, optional_anchor = match.groups()
source_file = HtmlProoferPlugin.find_source_file(url_target, src_path, files)
if source_file is None:
return False
# If there's an anchor (fragment) on the link, we try to find it in the source_file
if optional_anchor:
_, extension = os.path.splitext(source_file.src_uri)
# Currently only Markdown-based pages are supported, but conceptually others could be added below
if extension == ".md":
if source_file.page is None or source_file.page.markdown is None:
return False
if not HtmlProoferPlugin.contains_anchor(source_file.page.markdown, optional_anchor):
return False
return True
@staticmethod
def find_target_markdown(url: str, src_path: str, files: Dict[str, File]) -> Optional[str]:
"""From a built URL, find the original Markdown source from the project that built it."""
file = HtmlProoferPlugin.find_source_file(url, src_path, files)
if file and file.page:
return file.page.markdown
return None
@staticmethod
def find_source_file(url: str, src_path: str, files: Dict[str, File]) -> Optional[File]:
"""From a built URL, find the original file from the project that built it."""
if len(url) > 1 and url[0] == '/':
# Convert root/site paths
search_path = os.path.normpath(url[1:])
else:
# Handle relative links by looking up the destination url for the
# src_path and getting the parent directory.
try:
dest_uri = files[src_path].dest_uri
src_dir = urllib.parse.quote(str(pathlib.Path(dest_uri).parent), safe='/\\')
search_path = os.path.normpath(str(pathlib.Path(src_dir) / pathlib.Path(url)))
except KeyError:
return None
try:
return files[search_path]
except KeyError:
return None
@staticmethod
def contains_anchor(markdown: str, anchor: str) -> bool:
"""Check if a set of Markdown source text contains a heading that corresponds to a
given anchor."""
for line in markdown.splitlines():
# Markdown allows whitespace before headers and an arbitrary number of #'s.
heading_match = HEADING_PATTERN.match(line)
if heading_match is not None:
heading = heading_match.groups()[0]
# Headings are allowed to have attr_list after them, of the form:
# # Heading { #testanchor .testclass }
# # Heading {: #testanchor .testclass }
# # Heading {.testclass #testanchor}
# # Heading {.testclass}
# these can override the headings anchor id, or alternatively just provide additional class etc.
attr_list_anchor_match = ATTRLIST_ANCHOR_PATTERN.match(heading)
if attr_list_anchor_match is not None:
attr_list_anchor = heading_match.groups()[1]
if anchor == attr_list_anchor:
return True
heading = re.sub(ATTRLIST_PATTERN, '', heading) # remove any attribute list from heading, before slugify
# Headings are allowed to have images after them, of the form:
# # Heading [] or ![Image][image-reference]
# But these images are not included in the generated anchor, so remove them.
heading = re.sub(IMAGE_PATTERN, '', heading)
# Headings are allowed to have emojis in them under certain Mkdocs themes.
# https://squidfunk.github.io/mkdocs-material/setup/extensions/python-markdown-extensions/#emoji
heading = re.sub(EMOJI_PATTERN, '', heading)
anchor_slug = slugify(heading, '-')
if anchor == anchor_slug:
return True
# Check for HTML anchors using id or name attributes
# Multiple anchors can exist on a single line, so find all of them
for html_anchor in re.findall(HTML_LINK_PATTERN, line):
if anchor == html_anchor:
return True
# Any attribute list at end of paragraphs or after images can also generate an anchor (in addition to
# the heading ones) so gather those and check as well (multiple could be a line so gather all)
for attr_list_anchor in re.findall(ATTRLIST_ANCHOR_PATTERN, line):
if anchor == attr_list_anchor:
return True
return False
@staticmethod
def bad_url(url_status: int) -> bool:
if url_status == -1:
return True
elif url_status >= 400:
return True
else:
return False
@staticmethod
def is_error(config: Config, url: str, url_status: int) -> bool:
excludes = config['raise_error_excludes'].get(url_status, [])
if any(fnmatch.fnmatch(url, exclude_url) for exclude_url in excludes):
return False
else:
return True