连接到外部数据源(JDBC)
平台包含很多 JDBC 数据库的驱动,可能需要安装相应版本的驱动程序才能连接到首选数据库。目前支持的数据库包括
使用 JDBC 读取数据
下面示例使用 Python 或 Scala 通过 JDBC 读取数据,注意每个数据库的<jdbc-url>使用不同的格式。
# 获取 dataframe 对象
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
# 读取表结构
employees_table.printSchema
# 打印查询结果
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
// 获取 dataframe 对象
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
// 读取表结构
employees_table.printSchema
// 打印查询结果
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
可以在option中配置JDBC的查询并行度和下推:
# 为具有 8 个核心的群集配置并行度, 设置并行度太大可能会使数据源服务不堪重负
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("numPartitions", 8)
.load()
)
# 将整个查询向下推送到数据库,且只返回结果
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
# 通过fetchSize参数控制一次从远程数据库中提取的行数
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
使用 JDBC 写入数据
下面示例通过 JDBC 将数据保存到表:
# 创建新表并写入数据,如果同名的表已存在会引发错误
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
# 将数据追加到现有的表
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
# 覆盖已有的数据表
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
使用 JDBC 写入数据库时,Apache Spark 使用内存中的分区数来控制并行度。 可以在进行写入之前重新分区数据以控制并行度。
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
查询 PostgreSQL
下面示例使用 PostgreSQL 的 JDBC 驱动程序对其进行查询。
driver = "org.postgresql.Driver"
database_host = "<database-host-url>"
database_port = "5432" # update if you use a non-default port
database_name = "<database-name>"
table = "<table-name>"
user = "<username>"
password = "<password>"
url = f"jdbc:postgresql://{database_host}:{database_port}/{database_name}"
remote_table = (spark.read
.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.load()
)
查询 MySQL
下面示例使用 MySQL 的 JDBC 驱动程序对其进行查询。
driver = "com.mysql.cj.jdbc.Driver"
database_host = "<database-host-url>"
database_port = "3306" # update if you use a non-default port
database_name = "<database-name>"
table = "<table-name>"
user = "<username>"
password = "<password>"
url = f"jdbc:mysql://{database_host}:{database_port}/{database_name}"
remote_table = (spark.read
.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.load()
)
查询 SQL Server
下面示例使用 SQL Server 的 JDBC 驱动程序对其进行查询。
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
database_host = "<database-host-url>"
database_port = "1433" # update if you use a non-default port
database_name = "<database-name>"
table = "<table-name>"
user = "<username>"
password = "<password>"
url = f"jdbc:sqlserver://{database_host}:{database_port};database={database_name}"
remote_table = (spark.read
.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.load()
)