-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathts_auth.py
More file actions
90 lines (72 loc) · 4.21 KB
/
ts_auth.py
File metadata and controls
90 lines (72 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import base64
from copy import deepcopy
from datetime import datetime
import hmac
import pytz
from urllib.parse import urlparse
import uuid
def add_basic(request_properties, customer_id, api_key):
    """Return a copy of the request headers with Basic credentials added.

    The incoming ``request_properties`` mapping is deep-copied before it is
    touched, so the caller's dictionary and its nested ``headers`` are left
    unchanged.

    Args:
        request_properties: Mapping describing the request; it must contain
            a ``'headers'`` dictionary.
        customer_id: Identifier placed before the colon in the credentials.
        api_key: Secret placed after the colon in the credentials.

    Returns:
        The copied headers dictionary, including the new ``Authorization``
        entry.
    """
    copied_request = deepcopy(request_properties)
    credentials = create_basic_auth_string(customer_id, api_key)
    copied_request['headers']['Authorization'] = credentials
    return copied_request['headers']
def add_digest(request_properties, customer_id, api_key):
    """Return request headers signed with the ``TSA`` HMAC digest scheme.

    Works on a deep copy of ``request_properties``, so the caller's mapping
    is never modified. Headers that the signature needs are filled in with
    defaults when the caller has not supplied them:

    * ``x-ts-auth-method`` defaults to ``HMAC-SHA256``.
    * ``x-ts-nonce`` defaults to a freshly generated random hex string.
    * ``Content-Type`` defaults to ``application/x-www-form-urlencoded``
      for ``PUT`` and ``POST`` requests.
    * ``Date`` is set to the current time unless ``Date`` or ``x-ts-date``
      is already present.

    Args:
        request_properties: Mapping with at least ``'url'``, ``'method'``
            and ``'headers'`` keys; an optional ``'body'`` is passed on to
            ``generate_signature``.
        customer_id: Identifier placed in the ``Authorization`` header.
        api_key: Base64-encoded secret handed to ``generate_signature``.

    Returns:
        A new dictionary holding all headers, ordered alphabetically by
        header name, including ``Authorization: TSA <customer_id>:<sig>``.
    """
    signed_request = deepcopy(request_properties)
    # generate_signature reads the path from 'path_url'; only the path
    # component of the URL is stored there.
    signed_request['path_url'] = urlparse(signed_request['url']).path
    method = signed_request['method']
    headers = signed_request['headers']

    # Add any missing headers with default values; caller values win.
    headers.setdefault('x-ts-auth-method', 'HMAC-SHA256')
    if 'x-ts-nonce' not in headers:
        # uuid4 is purely random. uuid1 would embed this host's network
        # address and a timestamp in a value that is sent with every request.
        headers['x-ts-nonce'] = uuid.uuid4().hex
    if method in ("PUT", "POST"):
        headers.setdefault('Content-Type', 'application/x-www-form-urlencoded')
    if 'Date' not in headers and 'x-ts-date' not in headers:
        headers['Date'] = format_current_date()

    signature = generate_signature(customer_id, api_key, signed_request)
    headers['Authorization'] = 'TSA ' + customer_id + ':' + signature

    # Sort the headers in alpha order
    return {name: headers[name] for name in sorted(headers)}
def create_basic_auth_string(customer_id, api_key):
    """Build the value for an HTTP Basic ``Authorization`` header.

    Args:
        customer_id: Text placed before the colon in the credential pair.
        api_key: Text placed after the colon in the credential pair.

    Returns:
        The string ``"Basic "`` followed by the base64 encoding of
        ``customer_id:api_key``.
    """
    raw_credentials = (customer_id + ":" + api_key).encode()
    encoded_credentials = base64.b64encode(raw_credentials).decode("utf-8")
    return "Basic " + encoded_credentials
def format_current_date():
    """Return the current time as an HTTP date string in GMT.

    The result has the fixed layout ``Wed, 21 Oct 2015 07:28:00 GMT``.

    ``email.utils.formatdate`` is used instead of ``strftime`` because the
    ``%a`` and ``%b`` directives follow the process locale; under a
    non-English locale they would produce day and month names that are not
    valid in an HTTP ``Date`` header. ``formatdate`` always emits the
    English abbreviations.

    Returns:
        The formatted timestamp, always 29 characters long and ending in
        ``GMT``.
    """
    # Function-scope import keeps this helper self-contained.
    from email.utils import formatdate

    return formatdate(usegmt=True)
def generate_signature(customer_id, api_key, new_request_properties):
    """Compute the base64 HMAC signature for a request.

    The string that gets signed is built from newline-separated parts, in
    this order:

    1. the request method,
    2. the ``Content-Type`` header (empty if absent),
    3. the ``Date`` header (empty if absent),
    4. ``x-ts-auth-method:<value>``,
    5. ``x-ts-date:<value>`` when that header is present,
    6. ``x-ts-nonce:<value>`` when that header is present,
    7. the request body when it is non-empty,
    8. the request path (``'path_url'``), with no trailing newline.

    The input mapping is only read, never modified.

    Args:
        customer_id: Not used in the computation; kept so the signature of
            this function stays stable for existing callers.
        api_key: Base64-encoded secret; it is decoded and used as the HMAC
            key.
        new_request_properties: Mapping with ``'method'``, ``'path_url'``
            and ``'headers'`` keys (``headers`` must contain
            ``x-ts-auth-method``) and an optional ``'body'``. The body may
            be ``str`` or ``bytes`` for either supported content type.

    Returns:
        The base64-encoded HMAC digest as a ``str``.

    Raises:
        ValueError: If a non-empty body is supplied and ``Content-Type`` is
            missing or is neither ``application/json`` nor
            ``application/x-www-form-urlencoded``.
    """
    headers = new_request_properties['headers']
    method = new_request_properties['method']
    auth_header = headers['x-ts-auth-method']

    if auth_header.upper() in ('HMAC-SHA1', 'HMAC-SHA256'):
        # "HMAC-SHA256" -> "sha256": the canonical lower-case hashlib name.
        digest_name = auth_header.split('-')[1].lower()
    else:
        # Anything else is handed to hmac as-is.
        digest_name = auth_header

    parts = [
        method,
        headers.get('Content-Type', ''),
        headers.get('Date', ''),
        'x-ts-auth-method:' + auth_header,
    ]
    if 'x-ts-date' in headers:
        parts.append('x-ts-date:' + headers['x-ts-date'])
    if 'x-ts-nonce' in headers:
        parts.append('x-ts-nonce:' + headers['x-ts-nonce'])

    body = new_request_properties.get('body')
    if body:
        supported = ('application/json', 'application/x-www-form-urlencoded')
        if headers.get('Content-Type') not in supported:
            raise ValueError("Can't generate signature for this content type.")
        # Accept the body as text or as raw bytes regardless of content type.
        if isinstance(body, (bytes, bytearray)):
            body = body.decode('utf-8')
        parts.append(body)

    parts.append(new_request_properties['path_url'])
    string_to_sign = '\n'.join(parts)

    secret = base64.b64decode(api_key)
    mac = hmac.new(secret, string_to_sign.encode('utf-8'), digestmod=digest_name)
    return base64.b64encode(mac.digest()).decode('utf-8')
def pretty_print_request(req):
    """Print a readable dump of a request to standard output.

    The output is a ``Request:`` banner, the method and URL on one line,
    one ``name: value`` line per header, a blank line, and then the body.

    Args:
        req: Object exposing ``method``, ``url``, ``headers`` (a mapping
            with ``items()``) and ``body`` attributes.
    """
    request_line = req.method + ' ' + req.url
    header_block = '\r\n'.join(
        '{}: {}'.format(name, value) for name, value in req.headers.items()
    )
    layout = '\n{}\n{}\r\n{}\r\n\r\n{}\n'
    print(layout.format('Request:', request_line, header_block, req.body))