Skip to content

Commit fb6244a

Browse files
committed
sql: introduce and mandate PostgreSQL connections
Introduce a new PostgreSQL connection type, as in... CREATE CONNECTION pgconn FOR POSTGRES HOST postgres, DATABASE postgres, USER postgres, PASSWORD SECRET pgpass; CREATE SOURCE pg FOR POSTGRES CONNECTION pgconn; The connection options correspond to the old libpq connection parameters in the obvious way. As decided in #13374, this commit also removes the old syntax for specifying PostgreSQL connection details inline in the `CREATE SOURCE` statement; connection details *must* now be specified in `CREATE CONNECTION`. Touches #11504. Touches #13374.
1 parent 6c1b273 commit fb6244a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+1102
-230
lines changed

Cargo.lock

Lines changed: 9 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter/src/catalog/builtin_table_updates.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ impl CatalogState {
311311
mz_storage::client::connections::Connection::Csr { .. } => {
312312
"confluent-schema-registry"
313313
}
314+
mz_storage::client::connections::Connection::Postgres { .. } => "postgres",
314315
}),
315316
]),
316317
diff,

src/adapter/src/coord.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5501,7 +5501,7 @@ impl<S: Append + 'static> Coordinator<S> {
55015501
let mut sinks_to_drop = vec![];
55025502
let mut indexes_to_drop = vec![];
55035503
let mut recorded_views_to_drop = vec![];
5504-
let mut replication_slots_to_drop: HashMap<String, Vec<String>> = HashMap::new();
5504+
let mut replication_slots_to_drop: Vec<(tokio_postgres::Config, String)> = vec![];
55055505
let mut secrets_to_drop = vec![];
55065506

55075507
for op in &ops {
@@ -5514,14 +5514,17 @@ impl<S: Append + 'static> Coordinator<S> {
55145514
sources_to_drop.push(*id);
55155515
match &source.source_desc.connection {
55165516
SourceConnection::Postgres(PostgresSourceConnection {
5517-
conn,
5517+
connection,
55185518
details,
55195519
..
55205520
}) => {
5521-
replication_slots_to_drop
5522-
.entry(conn.clone())
5523-
.or_insert_with(Vec::new)
5524-
.push(details.slot.clone());
5521+
let config = connection
5522+
.config(&self.connection_context.secrets_reader)
5523+
.await
5524+
.unwrap_or_else(|e| {
5525+
panic!("Postgres source {id} missing secrets: {e}")
5526+
});
5527+
replication_slots_to_drop.push((config, details.slot.clone()));
55255528
}
55265529
_ => {}
55275530
}
@@ -5607,12 +5610,16 @@ impl<S: Append + 'static> Coordinator<S> {
56075610
if !replication_slots_to_drop.is_empty() {
56085611
// TODO(guswynn): see if there is more relevant info to add to this name
56095612
task::spawn(|| "drop_replication_slots", async move {
5610-
for (conn, slot_names) in replication_slots_to_drop {
5613+
for (config, slot_name) in replication_slots_to_drop {
56115614
// Try to drop the replication slots, but give up after a while.
56125615
let _ = Retry::default()
56135616
.max_duration(Duration::from_secs(30))
5614-
.retry_async(|_state| {
5615-
mz_postgres_util::drop_replication_slots(&conn, &slot_names)
5617+
.retry_async(|_state| async {
5618+
mz_postgres_util::drop_replication_slots(
5619+
config.clone(),
5620+
&[&slot_name],
5621+
)
5622+
.await
56165623
})
56175624
.await;
56185625
}

src/persist/src/postgres.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use std::time::Instant;
1414
use anyhow::{anyhow, bail, Context};
1515
use async_trait::async_trait;
1616
use bytes::Bytes;
17-
use openssl::ssl::{SslConnector, SslFiletype, SslMethod, SslVerifyMode};
17+
use openssl::pkey::PKey;
18+
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
19+
use openssl::x509::X509;
1820
use postgres_openssl::MakeTlsConnector;
1921
use tokio::task::JoinHandle;
2022
use tokio_postgres::config::SslMode;
@@ -208,15 +210,17 @@ fn make_tls(config: &tokio_postgres::Config) -> Result<MakeTlsConnector, anyhow:
208210
// Configure certificates
209211
match (config.get_ssl_cert(), config.get_ssl_key()) {
210212
(Some(ssl_cert), Some(ssl_key)) => {
211-
builder.set_certificate_file(ssl_cert, SslFiletype::PEM)?;
212-
builder.set_private_key_file(ssl_key, SslFiletype::PEM)?;
213+
builder.set_certificate(&*X509::from_pem(ssl_cert)?)?;
214+
builder.set_private_key(&*PKey::private_key_from_pem(ssl_key)?)?;
213215
}
214216
(None, Some(_)) => bail!("must provide both sslcert and sslkey, but only provided sslkey"),
215217
(Some(_), None) => bail!("must provide both sslcert and sslkey, but only provided sslcert"),
216218
_ => {}
217219
}
218220
if let Some(ssl_root_cert) = config.get_ssl_root_cert() {
219-
builder.set_ca_file(ssl_root_cert)?
221+
builder
222+
.cert_store_mut()
223+
.add_cert(X509::from_pem(ssl_root_cert)?)?;
220224
}
221225

222226
let mut tls_connector = MakeTlsConnector::new(builder.build());

src/postgres-util/src/lib.rs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
//! PostgreSQL utility library.
1111
1212
use anyhow::{anyhow, bail};
13-
use openssl::ssl::{SslConnector, SslFiletype, SslMethod, SslVerifyMode};
13+
use openssl::pkey::PKey;
14+
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
15+
use openssl::x509::X509;
1416
use postgres_openssl::MakeTlsConnector;
1517
use std::time::Duration;
1618
use tokio_postgres::config::{ReplicationMode, SslMode};
@@ -52,15 +54,17 @@ pub fn make_tls(config: &Config) -> Result<MakeTlsConnector, anyhow::Error> {
5254
// Configure certificates
5355
match (config.get_ssl_cert(), config.get_ssl_key()) {
5456
(Some(ssl_cert), Some(ssl_key)) => {
55-
builder.set_certificate_file(ssl_cert, SslFiletype::PEM)?;
56-
builder.set_private_key_file(ssl_key, SslFiletype::PEM)?;
57+
builder.set_certificate(&*X509::from_pem(ssl_cert)?)?;
58+
builder.set_private_key(&*PKey::private_key_from_pem(ssl_key)?)?;
5759
}
5860
(None, Some(_)) => bail!("must provide both sslcert and sslkey, but only provided sslkey"),
5961
(Some(_), None) => bail!("must provide both sslcert and sslkey, but only provided sslcert"),
6062
_ => {}
6163
}
6264
if let Some(ssl_root_cert) = config.get_ssl_root_cert() {
63-
builder.set_ca_file(ssl_root_cert)?
65+
builder
66+
.cert_store_mut()
67+
.add_cert(X509::from_pem(ssl_root_cert)?)?;
6468
}
6569

6670
let mut tls_connector = MakeTlsConnector::new(builder.build());
@@ -85,13 +89,12 @@ pub fn make_tls(config: &Config) -> Result<MakeTlsConnector, anyhow::Error> {
8589
/// - Invalid connection string, user information, or user permissions.
8690
/// - Upstream publication does not exist or contains invalid values.
8791
pub async fn publication_info(
88-
conn: &str,
92+
config: &Config,
8993
publication: &str,
9094
) -> Result<Vec<PostgresTableDesc>, anyhow::Error> {
91-
let config = conn.parse()?;
9295
let tls = make_tls(&config)?;
9396
let (client, connection) = config.connect(tls).await?;
94-
task::spawn(|| format!("postgres_publication_info:{conn}"), connection);
97+
task::spawn(|| "postgres_publication_info", connection);
9598

9699
client
97100
.query(
@@ -169,16 +172,12 @@ pub async fn publication_info(
169172
Ok(table_infos)
170173
}
171174

172-
pub async fn drop_replication_slots(conn: &str, slots: &[String]) -> Result<(), anyhow::Error> {
173-
let config = conn.parse()?;
175+
pub async fn drop_replication_slots(config: Config, slots: &[&str]) -> Result<(), anyhow::Error> {
174176
let tls = make_tls(&config)?;
175-
let (client, connection) = tokio_postgres::connect(&conn, tls).await?;
176-
task::spawn(
177-
|| format!("postgres_drop_replication_slots:{conn}"),
178-
connection,
179-
);
177+
let (client, connection) = config.connect(tls).await?;
178+
task::spawn(|| "postgres_drop_replication_slots", connection);
180179

181-
let replication_client = connect_replication(conn).await?;
180+
let replication_client = connect_replication(config).await?;
182181
for slot in slots {
183182
let rows = client
184183
.query(
@@ -209,18 +208,14 @@ pub async fn drop_replication_slots(conn: &str, slots: &[String]) -> Result<(),
209208
}
210209

211210
/// Starts a replication connection to the upstream database
212-
pub async fn connect_replication(conn: &str) -> Result<Client, anyhow::Error> {
213-
let mut config: Config = conn.parse()?;
211+
pub async fn connect_replication(mut config: Config) -> Result<Client, anyhow::Error> {
214212
let tls = make_tls(&config)?;
215213
let (client, connection) = config
216214
.replication_mode(ReplicationMode::Logical)
217215
.connect_timeout(Duration::from_secs(30))
218216
.keepalives_idle(Duration::from_secs(10 * 60))
219217
.connect(tls)
220218
.await?;
221-
task::spawn(
222-
|| format!("postgres_connect_replication:{conn}"),
223-
connection,
224-
);
219+
task::spawn(|| "postgres_connect_replication", connection);
225220
Ok(client)
226221
}

src/proto/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ mz-ore = { path = "../ore", default-features = false }
1515
proptest = { git = "https://github.com/MaterializeInc/proptest.git", default-features = false, features = ["std"] }
1616
prost = { version = "0.10.3", features = ["no-recursion-limit"] }
1717
regex = "1.6.0"
18+
serde = { version = "1.0.138", features = ["derive"] }
1819
serde_json = { version = "1.0.82", features = ["arbitrary_precision"] }
20+
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", optional = true }
1921
url = { version = "2.2.2", features = ["serde"] }
2022
uuid = "1.1.2"
2123

src/proto/build.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99

1010
fn main() {
1111
prost_build::Config::new()
12-
.compile_protos(&["proto/src/proto.proto"], &[".."])
12+
.compile_protos(
13+
&["proto/src/proto.proto", "proto/src/tokio_postgres.proto"],
14+
&[".."],
15+
)
1316
.unwrap();
1417
}

src/proto/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ use uuid::Uuid;
1616

1717
use mz_ore::cast::CastFrom;
1818

19+
#[cfg(feature = "tokio-postgres")]
20+
pub mod tokio_postgres;
21+
1922
include!(concat!(env!("OUT_DIR"), "/mz_proto.rs"));
2023

2124
/// An error thrown when trying to convert from a `*.proto`-generated type
@@ -38,6 +41,8 @@ pub enum TryFromProtoError {
3841
/// Indicates an `Option<U>` field in the `Proto$T` that should be set,
3942
/// but for some reason it is not. In practice this should never occur.
4043
MissingField(String),
44+
/// Indicates an unknown enum variant in `Proto$T`.
45+
UnknownEnumVariant(String),
4146
/// Indicates that the serialized ShardId value failed to deserialize, according
4247
/// to its custom deserialization logic.
4348
InvalidShardId(String),
@@ -59,6 +64,11 @@ impl TryFromProtoError {
5964
pub fn missing_field<T: ToString>(s: T) -> TryFromProtoError {
6065
TryFromProtoError::MissingField(s.to_string())
6166
}
67+
68+
/// Construct a new [`TryFromProtoError::UnknownEnumVariant`] instance.
69+
pub fn unknown_enum_variant<T: ToString>(s: T) -> TryFromProtoError {
70+
TryFromProtoError::UnknownEnumVariant(s.to_string())
71+
}
6272
}
6373

6474
impl From<TryFromIntError> for TryFromProtoError {
@@ -127,6 +137,7 @@ impl std::fmt::Display for TryFromProtoError {
127137
DeserializationError(error) => error.fmt(f),
128138
RowConversionError(msg) => write!(f, "Row packing failed: `{}`", msg),
129139
MissingField(field) => write!(f, "Missing value for `{}`", field),
140+
UnknownEnumVariant(field) => write!(f, "Unknown enum value for `{}`", field),
130141
InvalidShardId(value) => write!(f, "Invalid value of ShardId found: `{}`", value),
131142
CodecMismatch(error) => error.fmt(f),
132143
InvalidPersistState(error) => error.fmt(f),
@@ -156,6 +167,7 @@ impl std::error::Error for TryFromProtoError {
156167
DateConversionError(_) => None,
157168
RowConversionError(_) => None,
158169
MissingField(_) => None,
170+
UnknownEnumVariant(_) => None,
159171
InvalidShardId(_) => None,
160172
CodecMismatch(_) => None,
161173
InvalidPersistState(_) => None,

src/proto/src/tokio_postgres.proto

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
syntax = "proto3";
11+
12+
import "google/protobuf/empty.proto";
13+
14+
package mz_proto.tokio_postgres;
15+
16+
message ProtoSslMode {
17+
oneof kind {
18+
google.protobuf.Empty unknown = 1;
19+
google.protobuf.Empty disable = 2;
20+
google.protobuf.Empty prefer = 3;
21+
google.protobuf.Empty require = 4;
22+
google.protobuf.Empty verify_ca = 5;
23+
google.protobuf.Empty verify_full = 6;
24+
}
25+
}

0 commit comments

Comments
 (0)