Skip to content

Commit 763919d

Browse files
authored
Adding custom Salesforce connection type + SalesforceToS3Operator updates (#17162)
1 parent 45a49e0 commit 763919d

File tree

8 files changed

+71
-71
lines changed

8 files changed

+71
-71
lines changed

airflow/providers/amazon/aws/example_dags/example_salesforce_to_s3.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,18 @@
5454

5555
store_to_s3_data_lake = S3CopyObjectOperator(
5656
task_id="store_to_s3_data_lake",
57-
source_bucket_key=upload_salesforce_data_to_s3_landing.output["s3_uri"],
57+
source_bucket_key=upload_salesforce_data_to_s3_landing.output,
5858
dest_bucket_name="data_lake",
5959
dest_bucket_key=f"{BASE_PATH}/{date_prefixes}/{FILE_NAME}",
6060
)
6161

6262
delete_data_from_s3_landing = S3DeleteObjectsOperator(
6363
task_id="delete_data_from_s3_landing",
64-
bucket=upload_salesforce_data_to_s3_landing.output["s3_bucket_name"],
65-
keys=upload_salesforce_data_to_s3_landing.output["s3_key"],
64+
bucket=upload_salesforce_data_to_s3_landing.s3_bucket_name,
65+
keys=upload_salesforce_data_to_s3_landing.s3_key,
6666
)
6767

6868
store_to_s3_data_lake >> delete_data_from_s3_landing
6969

7070
# Task dependencies created via `XComArgs`:
7171
# upload_salesforce_data_to_s3_landing >> store_to_s3_data_lake
72-
# upload_salesforce_data_to_s3_landing >> delete_data_from_s3_landing

airflow/providers/amazon/aws/transfers/salesforce_to_s3.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ def __init__(
106106
self.gzip = gzip
107107
self.acl_policy = acl_policy
108108

109-
def execute(self, context: Dict) -> Dict:
110-
salesforce_hook = SalesforceHook(conn_id=self.salesforce_conn_id)
109+
def execute(self, context: Dict) -> str:
110+
salesforce_hook = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
111111
response = salesforce_hook.make_query(
112112
query=self.salesforce_query,
113113
include_deleted=self.include_deleted,
@@ -138,4 +138,4 @@ def execute(self, context: Dict) -> Dict:
138138
s3_uri = f"s3://{self.s3_bucket_name}/{self.s3_key}"
139139
self.log.info(f"Salesforce data uploaded to S3 at {s3_uri}.")
140140

141-
return {"s3_uri": s3_uri, "s3_bucket_name": self.s3_bucket_name, "s3_key": self.s3_key}
141+
return s3_uri

airflow/providers/google/cloud/transfers/salesforce_to_gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def __init__(
9797
self.query_params = query_params
9898

9999
def execute(self, context: Dict):
100-
salesforce = SalesforceHook(conn_id=self.salesforce_conn_id)
100+
salesforce = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
101101
response = salesforce.make_query(
102102
query=self.query, include_deleted=self.include_deleted, query_params=self.query_params
103103
)

airflow/providers/salesforce/hooks/salesforce.py

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
"""
2626
import logging
2727
import time
28-
from typing import Iterable, List, Optional
28+
from typing import Any, Dict, Iterable, List, Optional
2929

3030
import pandas as pd
3131
from simple_salesforce import Salesforce, api
@@ -37,29 +37,58 @@
3737

3838
class SalesforceHook(BaseHook):
3939
"""
40-
Create new connection to Salesforce and allows you to pull data out of SFDC and save it to a file.
40+
Creates new connection to Salesforce and allows you to pull data out of SFDC and save it to a file.
4141
4242
You can then use that file with other Airflow operators to move the data into another data source.
4343
44-
:param conn_id: the name of the connection that has the parameters we need to connect to Salesforce.
45-
The connection should be type `http` and include a user's security token in the `Extras` field.
44+
:param conn_id: The name of the connection that has the parameters needed to connect to Salesforce.
45+
The connection should be of type `Salesforce`.
4646
:type conn_id: str
4747
4848
.. note::
49-
For the HTTP connection type, you can include a
50-
JSON structure in the `Extras` field.
51-
We need a user's security token to connect to Salesforce.
52-
So we define it in the `Extras` field as `{"security_token":"YOUR_SECURITY_TOKEN"}`
53-
54-
For sandbox mode, add `{"domain":"test"}` in the `Extras` field
49+
To connect to Salesforce make sure the connection includes a Username, Password, and Security Token.
50+
If in sandbox, enter a Domain value of 'test'. Login methods such as IP filtering and JWT are not
51+
supported currently.
5552
5653
"""
5754

58-
def __init__(self, conn_id: str) -> None:
55+
conn_name_attr = "salesforce_conn_id"
56+
default_conn_name = "salesforce_default"
57+
conn_type = "salesforce"
58+
hook_name = "Salesforce"
59+
60+
def __init__(self, salesforce_conn_id: str = default_conn_name) -> None:
5961
super().__init__()
60-
self.conn_id = conn_id
62+
self.conn_id = salesforce_conn_id
6163
self.conn = None
6264

65+
@staticmethod
66+
def get_connection_form_widgets() -> Dict[str, Any]:
67+
"""Returns connection widgets to add to connection form"""
68+
from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget
69+
from flask_babel import lazy_gettext
70+
from wtforms import PasswordField, StringField
71+
72+
return {
73+
"extra__salesforce__security_token": PasswordField(
74+
lazy_gettext("Security Token"), widget=BS3PasswordFieldWidget()
75+
),
76+
"extra__salesforce__domain": StringField(lazy_gettext("Domain"), widget=BS3TextFieldWidget()),
77+
}
78+
79+
@staticmethod
80+
def get_ui_field_behaviour() -> Dict:
81+
"""Returns custom field behaviour"""
82+
return {
83+
"hidden_fields": ["schema", "port", "extra", "host"],
84+
"relabeling": {
85+
"login": "Username",
86+
},
87+
"placeholders": {
88+
"extra__salesforce__domain": "(Optional) Set to 'test' if working in sandbox mode.",
89+
},
90+
}
91+
6392
def get_conn(self) -> api.Salesforce:
6493
"""Sign into Salesforce, only if we are not already signed in."""
6594
if not self.conn:
@@ -68,9 +97,8 @@ def get_conn(self) -> api.Salesforce:
6897
self.conn = Salesforce(
6998
username=connection.login,
7099
password=connection.password,
71-
security_token=extras['security_token'],
72-
instance_url=connection.host,
73-
domain=extras.get('domain'),
100+
security_token=extras["extra__salesforce__security_token"],
101+
domain=extras["extra__salesforce__domain"] or "login",
74102
)
75103
return self.conn
76104

airflow/providers/salesforce/provider.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,5 @@ hooks:
5555
- airflow.providers.salesforce.hooks.salesforce
5656

5757
hook-class-names:
58+
- airflow.providers.salesforce.hooks.salesforce.SalesforceHook
5859
- airflow.providers.salesforce.hooks.tableau.TableauHook

docs/apache-airflow-providers-salesforce/connections/salesforce.rst

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,55 +19,32 @@
1919

2020
Salesforce Connection
2121
=====================
22-
The HTTP connection type provides connection to Salesforce.
22+
The Salesforce connection type provides connection to Salesforce.
2323

2424
Configuring the Connection
2525
--------------------------
26-
Host (required)
27-
specify the host address to connect: ``https://your_host.lightning.force.com``
28-
29-
Login (required)
26+
Username (required)
3027
Specify the email address used to login to your account.
3128

3229
Password (required)
3330
Specify the password associated with the account.
3431

35-
Extra (required)
36-
Specify the extra parameters (as json dictionary) that can be used in Salesforce
37-
connection.
38-
The following parameter is required:
39-
40-
* ``security_token``: Salesforce token.
41-
42-
The following parameter is optional:
43-
44-
* ``domain``: set to ``test`` if working in sandbox mode.
45-
46-
For security reason we suggest you to use one of the secrets Backend to create this
47-
connection (Using ENVIRONMENT VARIABLE or Hashicorp Vault, GCP Secrets Manager etc).
48-
49-
50-
When specifying the connection as URI (in :envvar:`AIRFLOW_CONN_{CONN_ID}` variable) you should specify it
51-
following the standard syntax of DB connections - where extras are passed as parameters
52-
of the URI.
53-
54-
For example:
55-
56-
.. code-block:: bash
32+
Security Token (required)
33+
Specify the Salesforce security token for the username.
5734

58-
export AIRFLOW_CONN_SALESFORCE_DEFAULT='http://your_username:your_password@https%3A%2F%2Fyour_host.lightning.force.com?security_token=your_token'
35+
Domain (optional)
36+
The domain to using for connecting to Salesforce. Use common domains, such as 'login'
37+
or 'test', or Salesforce My domain. If not used, will default to 'login'.
5938

39+
For security reason we suggest you to use one of the secrets Backend to create this
40+
connection (Using ENVIRONMENT VARIABLE or Hashicorp Vault, GCP Secrets Manager etc).
6041

61-
Examples for the **Extra** field
62-
--------------------------------
63-
Setting up sandbox mode:
42+
When specifying the connection as URI (in :envvar:`AIRFLOW_CONN_{CONN_ID}` variable) you should specify it
43+
following the standard syntax of DB connections - where extras are passed as parameters of the URI. For example:
6444

65-
.. code-block:: json
45+
.. code-block:: bash
6646
67-
{
68-
"security_token": "your_token",
69-
"domain":"test"
70-
}
47+
export AIRFLOW_CONN_SALESFORCE_DEFAULT='http://your_username:your_password@https%3A%2F%2Fyour_host.lightning.force.com?security_token=your_token'
7148
7249
.. note::
7350
Airflow currently does not support other login methods such as IP filtering and JWT.

tests/providers/amazon/aws/transfers/test_salesforce_to_s3.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,7 @@ def test_execute(self, mock_make_query, mock_write_object_to_file, mock_load_fil
9595
assert operator.gzip == GZIP
9696
assert operator.acl_policy == ACL_POLICY
9797

98-
expected_op_output = {
99-
"s3_uri": f"s3://{S3_BUCKET}/{S3_KEY}",
100-
"s3_bucket_name": S3_BUCKET,
101-
"s3_key": S3_KEY,
102-
}
103-
104-
assert expected_op_output == operator.execute({})
98+
assert f"s3://{S3_BUCKET}/{S3_KEY}" == operator.execute({})
10599

106100
mock_make_query.assert_called_once_with(
107101
query=QUERY, include_deleted=INCLUDE_DELETED, query_params=QUERY_PARAMS

tests/providers/salesforce/hooks/test_salesforce.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
class TestSalesforceHook(unittest.TestCase):
3333
def setUp(self):
34-
self.salesforce_hook = SalesforceHook(conn_id="conn_id")
34+
self.salesforce_hook = SalesforceHook(salesforce_conn_id="conn_id")
3535

3636
def test_get_conn_exists(self):
3737
self.salesforce_hook.conn = Mock(spec=Salesforce)
@@ -43,7 +43,9 @@ def test_get_conn_exists(self):
4343
@patch(
4444
"airflow.providers.salesforce.hooks.salesforce.SalesforceHook.get_connection",
4545
return_value=Connection(
46-
login="username", password="password", extra='{"security_token": "token", "domain": "test"}'
46+
login="username",
47+
password="password",
48+
extra='{"extra__salesforce__security_token": "token", "extra__salesforce__domain": "login"}',
4749
),
4850
)
4951
@patch("airflow.providers.salesforce.hooks.salesforce.Salesforce")
@@ -54,9 +56,8 @@ def test_get_conn(self, mock_salesforce, mock_get_connection):
5456
mock_salesforce.assert_called_once_with(
5557
username=mock_get_connection.return_value.login,
5658
password=mock_get_connection.return_value.password,
57-
security_token=mock_get_connection.return_value.extra_dejson["security_token"],
58-
instance_url=mock_get_connection.return_value.host,
59-
domain=mock_get_connection.return_value.extra_dejson.get("domain"),
59+
security_token=mock_get_connection.return_value.extra_dejson["extra__salesforce__security_token"],
60+
domain=mock_get_connection.return_value.extra_dejson.get("extra__salesforce__domain"),
6061
)
6162

6263
@patch("airflow.providers.salesforce.hooks.salesforce.Salesforce")

0 commit comments

Comments
 (0)