generated from fastai/nbdev_template
-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathcore.py
More file actions
345 lines (295 loc) · 15.2 KB
/
core.py
File metadata and controls
345 lines (295 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
"""Detailed information on the GhApi API"""
# AUTOGENERATED! DO NOT EDIT! File to edit: ../00_core.ipynb.
# %% auto #0
__all__ = ['GH_HOST', 'img_md_pat', 'EMPTY_TREE_SHA', 'print_summary', 'GhApi', 'date2gh', 'gh2date']
# %% ../00_core.ipynb #5b5cba7b
from fastcore.all import *
from .metadata import funcs
import mimetypes,base64
from inspect import signature,Parameter,Signature
from urllib.request import Request
from urllib.error import HTTPError
from urllib.parse import quote
from datetime import datetime, timedelta, timezone
from pprint import pprint
from time import sleep
import os, shutil, tempfile, subprocess, fnmatch
# %% ../00_core.ipynb #8d2b1a54
# Base URL prepended to relative API paths; the `GH_HOST` environment variable overrides the public default.
GH_HOST = os.getenv('GH_HOST', "https://api.github.com")
# Prefix joined with an endpoint's `url` to build the link to its documentation.
_DOC_URL = 'https://docs.github.com/'
# %% ../00_core.ipynb #ba730c65
def _preview_hdr(preview): return {'Accept': f'application/vnd.github.{preview}-preview+json'} if preview else {}
def _mk_param(nm, **kwargs): return Parameter(nm, kind=Parameter.POSITIONAL_OR_KEYWORD, **kwargs)
def _mk_sig_detls(o):
res = {}
if o[0]!=object: res['annotation']=o[0]
res['default'] = o[1] if len(o)>1 else None
return res
def _mk_sig(req_args, opt_args, anno_args):
    "Assemble a `Signature`: required names first, then `name->default` items, then `name->(type[, default])` items"
    required = [_mk_param(nm) for nm in req_args]
    optional = [_mk_param(nm, default=dflt) for nm, dflt in opt_args.items()]
    annotated = [_mk_param(nm, **_mk_sig_detls(spec)) for nm, spec in anno_args.items()]
    return Signature(required + optional + annotated)
class _GhObj: pass
# %% ../00_core.ipynb #d4a51c1a
# One API endpoint as a callable object: calling it sorts the arguments into route, query and body
# parameters and sends the request through `client`.
# NOTE: deliberately no class docstring -- `__doc__` is one of the `__slots__` (it holds the endpoint
# summary per instance), and Python rejects a class attribute that has the same name as a slot.
class _GhVerb(_GhObj):
    __slots__ = 'path,verb,tag,name,summary,url,route_ps,params,data,preview,client,__doc__'.split(',')
    def __init__(self, path, verb, oper, summary, url, params, data, preview, client, kwargs):
        # `oper` is split on '/': the first segment is the group tag, the remaining segments are joined
        # with '__' (and '-' turned into '_') to form the method name
        tag,*name = oper.split('/')
        name = '__'.join(name)
        name = name.replace('-','_')
        # Fill in the route placeholders that `kwargs` already provides (e.g. the owner/repo given to `GhApi`)
        path,_,_ = partial_format(path, **kwargs)
        # NOTE(review): presumably the placeholder names still left in `path` -- confirm against fastcore
        route_ps = stringfmt_names(path)
        __doc__ = summary
        # Index the body-field descriptions by their first item (the field name); the rest is kept as the value
        data = {o[0]:o[1:] for o in data}
        # NOTE(review): `store_attr` (fastcore) presumably copies the locals named in `__slots__` onto `self`,
        # which would make the local names above significant -- confirm before renaming any of them
        store_attr()
    def __call__(self, *args, headers=None, **kwargs):
        # The preview Accept header goes first so that caller-supplied `headers` win on conflict
        headers = {**_preview_hdr(self.preview),**(headers or {})}
        d = list(self.data)
        # Positional args fill, in order, the route/query/body fields that were not passed by keyword
        flds = [o for o in self.route_ps+self.params+d if o not in kwargs]
        for a,b in zip(args,flds): kwargs[b]=a
        # Split the collected arguments by destination: URL path, query string, request body
        route_p,query_p,data_p = [{p:kwargs[p] for p in o if p in kwargs}
                                 for o in (self.route_ps,self.params,d)]
        return self.client(self.path, self.verb, headers=headers, route=route_p, query=query_p, data=data_p)
    # Text form: 'tag.name(signature)' followed by the documentation URL on the next line
    def __str__(self): return f'{self.tag}.{self.name}{signature(self)}\n{self.doc_url}'
    # Signature built from the route parameters, the query parameters (each defaulting to None) and the body fields
    @property
    def __signature__(self): return _mk_sig(self.route_ps, dict.fromkeys(self.params), self.data)
    # The same property object is also attached to the `__call__` function itself
    __call__.__signature__ = __signature__
    # Documentation link: `_DOC_URL` plus this endpoint's `url`, with spaces replaced by underscores
    @property
    def doc_url(self): return _DOC_URL + self.url.replace(" ","_")
    def _repr_markdown_(self):
        # Markdown link to the docs, then the flat parameter list and the summary in italics
        params = ', '.join(self.route_ps+self.params+list(self.data))
        return f'[{self.tag}.{self.name}]({self.doc_url})({params}): *{self.summary}*'
    # The plain repr reuses the markdown form
    __repr__ = _repr_markdown_
# %% ../00_core.ipynb #dd7e6b61
class _GhVerbGroup(_GhObj):
    "A named collection of verbs, each of which is also reachable as an attribute under its own `name`"
    def __init__(self, name, verbs):
        self.name = name
        self.verbs = verbs
        for verb in verbs:
            setattr(self, verb.name, verb)

    def __str__(self):
        lines = [str(verb) for verb in self.verbs]
        return "\n".join(lines)

    def _repr_markdown_(self):
        # One markdown bullet per verb
        bullets = [f'- {verb._repr_markdown_()}' for verb in self.verbs]
        return "\n".join(bullets)
# %% ../00_core.ipynb #531aee7d
# Base of the per-group reference links rendered by `GhApi._repr_markdown_`.
_docroot = 'https://docs.github.com/rest/reference/'
# %% ../00_core.ipynb #f361159b
def print_summary(req:Request):
    "Pretty-print `req.summary`, asking it to leave out the `Authorization` header (i.e. the token, if any)"
    redacted = req.summary('Authorization')
    pprint(redacted)
# %% ../00_core.ipynb #dd66fcdd
# Content-Type fragments treated as binary: `GhApi.__call__` returns matching responses without decoding them.
_binary_cts = ('octet-stream', 'zip', 'gzip', 'tar', 'image/', 'audio/', 'video/')
# %% ../00_core.ipynb #83e8a9ce
class GhApi(_GhObj):
    "Client for the GitHub API: groups the endpoints listed in `funcs` by tag and sends the HTTP requests"
    def __init__(self, owner=None, repo=None, token=None, jwt_token=None, debug=None, limit_cb=None, gh_host=None,
                 authenticate=True, **kwargs):
        # `owner`/`repo` (and any extra `kwargs`) pre-fill the matching route placeholders of every endpoint.
        # `token`/`jwt_token` fall back to the GITHUB_TOKEN/GITHUB_JWT_TOKEN environment variables; the JWT wins.
        # `debug` is handed to `urlsend`; `limit_cb(remaining, limit)` is called when the remaining quota changes.
        # `gh_host` overrides the module-level `GH_HOST`; `authenticate=False` sends no credentials at all.
        self.headers = { 'Accept': 'application/vnd.github.v3+json' }
        if authenticate:
            token = token or os.getenv('GITHUB_TOKEN', None)
            jwt_token = jwt_token or os.getenv('GITHUB_JWT_TOKEN', None)
            if jwt_token: self.headers['Authorization'] = 'Bearer ' + jwt_token
            elif token: self.headers['Authorization'] = 'token ' + token
            else: warn('Neither GITHUB_TOKEN nor GITHUB_JWT_TOKEN found: running as unauthenticated')
        if owner: kwargs['owner'] = owner
        if repo: kwargs['repo' ] = repo
        funcs_ = L(funcs).starmap(_GhVerb, client=self, kwargs=kwargs)
        # Lookup table used by `__getitem__`, keyed 'path:VERB'
        self.func_dict = {f'{o.path}:{o.verb.upper()}':o for o in funcs_}
        # One attribute group per tag; '-' becomes '_' so that the tag is a valid attribute name
        self.groups = {k.replace('-','_'):_GhVerbGroup(k,v) for k,v in groupby(funcs_, 'tag').items()}
        self.debug,self.limit_cb,self.limit_rem = debug,limit_cb,5000
        self.gh_host = gh_host or GH_HOST

    def __call__(self, path:str, verb:str=None, headers:dict=None, route:dict=None, query:dict=None, data=None, timeout=None):
        "Call a fully specified `path` using HTTP `verb`, passing arguments to `fastcore.core.urlsend`"
        if verb is None: verb = 'POST' if data else 'GET'
        headers = {**self.headers,**(headers or {})}
        # Relative paths are resolved against the configured host; absolute URLs are used as given
        if not path.startswith(('http://', 'https://')):
            path = self.gh_host + path
        # Percent-encode every route value (including '/') into a *new* dict: quoting in place would
        # modify the caller's dict, and a dict reused for a second call would then be encoded twice
        if route: route = {k: quote(str(v), safe='') for k,v in route.items()}
        debug = self.debug if self.debug else print_summary if os.getenv('GHAPI_DEBUG') else None
        res,self.recv_hdrs = urlsend(path, verb, headers=headers or None, decode=False, debug=debug, return_headers=True,
                                     route=route or None, query=query or None, data=data or None, return_json=False, timeout=timeout)
        ct = self.recv_hdrs.get('Content-Type', '')
        # Binary content types stay as bytes; everything else is decoded, and JSON is parsed as well
        if not any(t in ct for t in _binary_cts): res = res.decode()
        if 'json' in ct: res = loads(res)
        if 'X-RateLimit-Remaining' in self.recv_hdrs:
            newlim = self.recv_hdrs['X-RateLimit-Remaining']
            # Only notify when the remaining quota differs from the value seen on the previous response
            if self.limit_cb is not None and newlim != self.limit_rem:
                self.limit_cb(int(newlim),int(self.recv_hdrs['X-RateLimit-Limit']))
            # Stored exactly as received from the header
            self.limit_rem = newlim
        return dict2obj(res) if isinstance(res, (dict, list)) else res

    # Adds the group names to `dir()` output
    def __dir__(self): return super().__dir__() + list(self.groups)
    # Markdown bullet list linking each group (sorted by name) to its reference page
    def _repr_markdown_(self): return "\n".join(f"- [{o}]({_docroot + o.replace('_', '-')})" for o in sorted(self.groups))
    # Group lookup by attribute; the `vars` check avoids touching `self.groups` before `__init__` has set it
    def __getattr__(self,k): return self.groups[k] if 'groups' in vars(self) and k in self.groups else stop(AttributeError(k))

    def __getitem__(self, k):
        "Lookup and call an endpoint by path and verb (which defaults to 'GET')"
        a,b = k if isinstance(k,tuple) else (k,'GET')
        return self.func_dict[f'{a}:{b.upper()}']

    def full_docs(self):
        "Markdown listing of every group (sorted by name), each followed by its endpoints"
        return '\n'.join(f'## {gn}\n\n{group._repr_markdown_()}\n' for gn,group in sorted(self.groups.items()))
# %% ../00_core.ipynb #05cbdf91
def date2gh(dt:datetime)->str:
    "Convert `dt` (naive values are assumed to be UTC, aware values are converted to UTC) to a format suitable for GitHub API operations"
    # An aware datetime would otherwise render its own offset in front of the 'Z' (e.g. '...+00:00Z')
    if dt.utcoffset() is not None: dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return f'{dt.replace(microsecond=0).isoformat()}Z'
# %% ../00_core.ipynb #3f4c8b27
def gh2date(dtstr:str)->datetime:
    "Parse date string `dtstr` received from a GitHub API operation; the `Z` marker is removed before parsing"
    iso = dtstr.replace('Z', '')
    return datetime.fromisoformat(iso)
# %% ../00_core.ipynb #16068542
# Markdown image syntax `![alt](url)`, captured as the named groups `alt` and `url` (the url has no whitespace).
img_md_pat = re.compile(r'!\[(?P<alt>.*?)\]\((?P<url>[^\s]+)\)')
def _run_subp(cmd):
r = subprocess.run(cmd, check=False, capture_output=True, text=True)
if r.returncode != 0: raise RuntimeError(r.stderr)
@patch
def create_gist(self:GhApi, description, content, filename='gist.txt', public=False, img_paths=None):
    'Create a gist, optionally with images: each md img url that names an uploaded image is replaced with its raw gist url.'
    token = None
    if img_paths:
        # Images are pushed over git, which needs the plain token. Check for it before the gist is
        # created, so a client without one fails with a clear message and leaves nothing behind.
        auth = self.headers.get('Authorization', '')
        if not auth.startswith('token '):
            raise ValueError('create_gist with `img_paths` needs a client authenticated with `token`')
        token = auth.split('token ')[1]
    gist = self.gists.create(description, public=public, files={filename: {"content": content}})
    if not img_paths: return gist
    with tempfile.TemporaryDirectory() as clone_dir:
        # Clone the new gist, copy the images next to the text file, then commit and push them
        _run_subp(['git', 'clone', f'https://{token}@gist.github.com/{gist.id}.git', clone_dir])
        clone_dir, img_paths = Path(clone_dir), L(img_paths).map(Path)
        for o in img_paths: shutil.copy2(o, clone_dir/o.name)
        _run_subp(['git', '-C', clone_dir, 'add', '.'])
        _run_subp(['git', '-C', clone_dir, 'commit', '-m', 'Add images'])
        _run_subp(['git', '-C', clone_dir, 'push'])
    # Re-read the gist to learn the raw url of each pushed image
    updated_gist = self.gists.get(gist.id)
    img_urls = {o.name: updated_gist.files[o.name].raw_url for o in img_paths}
    # Only image urls equal to an uploaded file name are rewritten; all others are kept as they are
    content = img_md_pat.sub(lambda m: f"![{m['alt']}]({img_urls.get(m['url'], m['url'])})", content)
    return self.gists.update(gist.id, files={filename:{'content':content}})
# %% ../00_core.ipynb #4b7a278c
@patch
def delete_release(self:GhApi, release):
    "Remove `release` first, then the tag ref named by its `tag_name`"
    self.repos.delete_release(release.id)
    tag = release.tag_name
    self.git.delete_ref(f'tags/{tag}')
# %% ../00_core.ipynb #b2bf7e22
@patch
def upload_file(self:GhApi, rel, fn):
    "POST the file at `fn` to the upload url of release `rel`, using the file's name as the asset name"
    fn = Path(fn)
    # Drop the url-template suffix; the asset name is sent as a query parameter instead
    url = rel.upload_url.replace('{?name,label}','')
    guessed,_ = mimetypes.guess_type(fn, False)
    hdrs = {'Content-Type': guessed or 'application/octet-stream'}
    return self(url, 'POST', headers=hdrs, query={'name':fn.name}, data=fn.read_bytes())
# %% ../00_core.ipynb #3cad71a4
@patch
def create_release(self:GhApi, tag_name, branch='master', name=None, body='',
                   draft=False, prerelease=False, files=None):
    "Create a release through `GhApi.repos.create_release`, then upload each of `files` to it"
    # Without an explicit `name` the release is called 'v' + tag name
    release_name = 'v'+tag_name if name is None else name
    rel = self.repos.create_release(tag_name, target_commitish=branch, name=release_name, body=body,
                                    draft=draft, prerelease=prerelease)
    for asset in listify(files):
        self.upload_file(rel, asset)
    return rel
# %% ../00_core.ipynb #2be73ae0
@patch
def list_tags(self:GhApi, prefix:str=''):
    "Return the tag refs matching `tags/` + `prefix` (all tags when `prefix` is empty)"
    ref = f'tags/{prefix}'
    return self.git.list_matching_refs(ref)
# %% ../00_core.ipynb #303eeec6
@patch
def list_branches(self:GhApi, prefix:str=''):
    "Return the branch refs matching `heads/` + `prefix` (all branches when `prefix` is empty)"
    ref = f'heads/{prefix}'
    return self.git.list_matching_refs(ref)
# %% ../00_core.ipynb #eb85edd7
# SHA of git's empty tree object; `create_branch_empty` passes it as the `base_tree` of the tree it creates.
# See https://stackoverflow.com/questions/9765453
EMPTY_TREE_SHA = '4b825dc642cb6eb9a060e54bf8d69288fbee4904'
# %% ../00_core.ipynb #ba6ab941
@patch
def create_branch_empty(self:GhApi, branch):
    "Create `branch` on a new commit (made without passing any parents) whose tree holds only a `.dummy` file"
    placeholder = dict(path='.dummy', content='ignore me', mode='100644', type='blob')
    tree = self.git.create_tree(base_tree=EMPTY_TREE_SHA, tree=[placeholder])
    commit = self.git.create_commit(f'create {branch}', tree.sha)
    return self.git.create_ref(f'refs/heads/{branch}', commit.sha)
# %% ../00_core.ipynb #68b150fc
@patch
def delete_tag(self:GhApi, tag:str):
    "Delete the ref `tags/{tag}`"
    ref = f'tags/{tag}'
    return self.git.delete_ref(ref)
# %% ../00_core.ipynb #75c168a1
@patch
def delete_branch(self:GhApi, branch:str):
    "Delete the ref `heads/{branch}`"
    ref = f'heads/{branch}'
    return self.git.delete_ref(ref)
# %% ../00_core.ipynb #96053795
@patch
def get_branch(self:GhApi, branch=None):
    "Return the ref of `branch` (the repo's default branch when omitted)"
    branch = branch or self.repos.get().default_branch
    refs = self.list_branches(branch)
    # `list_branches` matches by prefix, so asking for 'dev' can also return 'dev-2'; prefer the exact ref.
    # `refs[0]` is evaluated eagerly: it is the fallback, and it keeps the IndexError when nothing matches.
    return next((r for r in refs if r.ref == f'refs/heads/{branch}'), refs[0])
# %% ../00_core.ipynb #93b24881
@patch
def list_files(self:GhApi, branch=None):
    "Map the path of each entry in the tree at the head of `branch` to that entry (no `recursive` flag is passed)"
    head_sha = self.get_branch(branch).object.sha
    entries = self.git.get_tree(head_sha).tree
    return {entry.path: entry for entry in entries}
# %% ../00_core.ipynb #ffc347b2
@patch
def get_content(self:GhApi, path):
    "Fetch `path` through `repos.get_content` and return its base64-decoded bytes"
    encoded = self.repos.get_content(path).content
    return base64.b64decode(encoded)
# %% ../00_core.ipynb #9582ee1f
@patch
def create_or_update_file(self:GhApi, path, message, committer, author, content=None, sha=None, branch=''):
    "Write `content` (str or bytes) to `path` on `branch`; pass `sha` to replace an existing file"
    # Resolve the default branch through this client (`self`); no module-level `api` object exists here
    if not branch: branch = self.repos.get()['default_branch']
    # The endpoint expects base64 text: str content is encoded to bytes first
    if not isinstance(content,bytes): content = content.encode()
    content = base64.b64encode(content).decode()
    # `sha` is only sent when given
    kwargs = {'sha':sha} if sha else {}
    return self.repos.create_or_update_file_contents(path, message, content=content,
        branch=branch, committer=committer or {}, author=author or {}, **kwargs)
# %% ../00_core.ipynb #5044445d
@patch
def create_file(self:GhApi, path, message, committer, author, content=None, branch=None):
    "Create `path` with `content` on `branch` (the repo's default branch when omitted)"
    # Resolve the default branch through this client (`self`); no module-level `api` object exists here
    if not branch: branch = self.repos.get()['default_branch']
    return self.create_or_update_file(path, message, branch=branch, committer=committer, content=content, author=author)
# %% ../00_core.ipynb #4e74c337
@patch
def delete_file(self:GhApi, path, message, committer, author, sha=None, branch=None):
    "Delete `path` from `branch` (the repo's default branch when omitted), looking up its `sha` if not given"
    # Resolve the default branch through this client (`self`); no module-level `api` object exists here
    if not branch: branch = self.repos.get()['default_branch']
    # Look the sha up on the branch being modified rather than always on the default branch
    if sha is None: sha = self.list_files(branch)[path].sha
    return self.repos.delete_file(path, message=message, sha=sha,
        branch=branch, committer=committer, author=author)
# %% ../00_core.ipynb #93f6b559
@patch
def update_contents(self:GhApi, path, message, committer, author, content, sha=None, branch=None):
    "Replace the content of `path` on `branch` (the repo's default branch when omitted), looking up its `sha` if not given"
    # Resolve the default branch through this client (`self`); no module-level `api` object exists here
    if not branch: branch = self.repos.get()['default_branch']
    # Look the sha up on the branch being modified rather than always on the default branch
    if sha is None: sha = self.list_files(branch)[path].sha
    return self.create_or_update_file(path, message, committer=committer, author=author, content=content, sha=sha, branch=branch)
# %% ../00_core.ipynb #444784ce
def _find_matches(path, pats):
    "Return the patterns in `pats` that `path` matches (`fnmatch` rules)"
    def _hits(pat): return fnmatch.fnmatch(path, pat)
    return L(pats).filter(_hits)
# %% ../00_core.ipynb #9ffb3b8f
def _include(path, include, exclude):
    "True when `path` matches at least one `include` pattern (if any are given) and no `exclude` pattern"
    def _matches_any(pats): return any(fnmatch.fnmatch(path, p) for p in listify(pats))
    if include and not _matches_any(include): return False
    return not (exclude and _matches_any(exclude))
# %% ../00_core.ipynb #802737b1
@patch
def _get_repo_files(self:GhApi, owner, repo, branch="main"):
    "Request the tree of `branch` in `owner`/`repo` with `recursive=True`"
    tree_args = dict(owner=owner, repo=repo, tree_sha=branch, recursive=True)
    return self.git.get_tree(**tree_args)
@patch
def get_repo_files(self:GhApi, owner, repo, branch="main", inc=None, exc=None):
    "Get the blob entries of a repo tree, optionally filtered by the `inc`/`exc` patterns."
    def _wanted(o): return o['type'] == 'blob' and _include(o.path, inc, exc)
    entries = self._get_repo_files(owner, repo, branch)['tree']
    return L(entries).filter(_wanted)
# %% ../00_core.ipynb #5ef71bf5
@patch
def get_file_content(self:GhApi, path, owner, repo, branch="main"):
    "Fetch `path` from `owner`/`repo` at ref `branch`, adding its UTF-8 decoded text under `content_decoded`"
    item = self.repos.get_content(owner, repo, path, ref=branch)
    raw = base64.b64decode(item.content)
    item['content_decoded'] = raw.decode('utf-8')
    return item
# %% ../00_core.ipynb #d02d1bab
@patch
@delegates(GhApi.get_repo_files)
def get_repo_contents(self:GhApi, owner, repo, branch='main', **kwargs):
    "Fetch in parallel the content of every file of `owner`/`repo` at `branch` that passes the `inc`/`exc` filters"
    # List the files on the same branch the contents are read from; `kwargs` carries only the
    # filters, which are optional, so nothing has to be removed from it afterwards
    repo_files = self.get_repo_files(owner, repo, branch=branch, **kwargs)
    return parallel(self.get_file_content, repo_files.attrgot("path"), owner=owner, repo=repo, branch=branch)
# %% ../00_core.ipynb #ac4ab4e0
@patch
def enable_pages(self:GhApi, branch=None, path="/"):
    "Enable pages for a repo, served from `branch` and `path`; returns without changes if the repo already has pages."
    allowed = ('/docs','/')
    if path not in allowed: raise Exception("path not in ('/docs','/')")
    repo_info = self.repos.get()
    if not branch: branch = repo_info.default_branch
    # An existing pages site is left as it is (no update call is made)
    if repo_info.has_pages: return
    # The source branch has to exist before the site is created
    branch_missing = len(self.list_branches(branch)) == 0
    if branch_missing: self.create_branch_empty(branch)
    source = {"branch": branch, "path": path}
    return self.repos.create_pages_site(source=source)