尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

PySpark和PyFlink如何读取Hive中的表?

PySpark和PyFlink如何读取Hive中的表?
📅 发布时间:2026/6/20 2:04:46
PySpark和PyFlink如何读取Hive中的表?

在 PySpark 和 PyFlink 中读取 Hive 表,其核心逻辑都是通过特定的 Catalog 机制连接到 Hive Metastore (HMS),获取元数据后,直接读取底层的存储文件。

以下是具体的实现方式:


1. PySpark 读取 Hive

PySpark 具有原生的 Hive 支持。它通过 enableHiveSupport() 开启连接。

核心步骤

  1. 环境准备:将 hive-site.xml 放置在 Spark 的 conf 目录下,或者确保环境变量 HADOOP_CONF_DIR 指向包含该文件的路径。

  2. 代码实现:

from pyspark.sql import SparkSession# 必须调用 enableHiveSupport()
spark = SparkSession.builder \.appName("PySpark_Hive_Access") \.config("spark.sql.warehouse.dir", "/user/hive/warehouse") \.enableHiveSupport() \.getOrCreate()# 1. 使用 SQL 查询
df = spark.sql("SELECT * FROM my_database.my_table LIMIT 10")# 2. 使用 DataFrame API
df_table = spark.table("my_database.my_table")df.show()

关键调优:

  • 读取格式转换:Spark 读取 Hive 的 ORC/Parquet 时,默认会使用 Spark 优化的向量化读取器。如果遇到兼容性问题(如 Hive 独有的某些特性),可以设置 spark.sql.hive.convertMetastoreOrc=false 强制走 Hive 的原生 SerDe。


2. PyFlink 读取 Hive

PyFlink 访问 Hive 需要依赖 Hive Catalog。由于 Flink 不自带 Hive 依赖,你需要手动添加相应的 Jar 包(如 flink-sql-connector-hive-xxx.jar)。

核心步骤

  1. 添加依赖:在启动时指定 Jar 包或放入 Flink 的 lib 目录。

  2. 代码实现:

from pyflink.table import TableEnvironment, EnvironmentSettings, HiveCatalog# 1. 创建环境设置
settings = EnvironmentSettings.new_instance().in_batch_mode().build()
t_env = TableEnvironment.create(settings)# 2. 定义 Hive Catalog
catalog_name = "my_hive"
default_database = "default"
hive_conf_dir = "/etc/hive/conf" # 包含 hive-site.xml 的目录catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)# 3. 注册并切换 Catalog
t_env.register_catalog(catalog_name, catalog)
t_env.use_catalog(catalog_name)# 4. 读取表
table = t_env.from_path("my_database.my_table")
result = table.to_pandas() # 或进行后续 transformation
print(result)

 

  • in_batch_mode(): This tells Flink to treat the data as a finite set (like a traditional SQL database) rather than an endless stream.

  • TableEnvironment: This is the main entry point for Flink SQL and Table API. It’s where you register tables, run queries, and manage catalogs.

 

  • HiveCatalog: This is a specific connector that tells Flink: "I want to talk to the Hive Metastore."

  • hive_conf_dir: This is the most critical part. It points to the folder containing your hive-site.xml. Flink reads this file to find the Thrift URL (e.g., thrift://localhost:9083) to actually talk to Hive.

 

  • register_catalog: You are giving the catalog a nickname (my_hive) within the Flink session.

  • use_catalog: By default, Flink uses an internal memory catalog. This command switches the context so that when you say "table A," Flink looks for it in Hive instead of its own memory.

 

  • from_path: This locates the table. Because you switched to the Hive catalog, Flink asks Hive: "Where is this table stored on HDFS and what is its schema (SerDe)?"

  • to_pandas(): This triggers the execution. Flink will:

    1. Start a mini-job.

    2. Open the files on HDFS (ORC, Parquet, etc.).

    3. Convert the rows into a Python Pandas DataFrame.

 

3. 核心差异对比

特性 PySpark PyFlink
连接机制 enableHiveSupport() 自动集成 需显式注册 HiveCatalog
依赖处理 通常已集成在 Spark 发行版中 必须手动匹配 Hive 版本的 Connector Jar
主要用途 离线大规模批处理、机器学习 流批一体、实时入湖 (CDC) 后的查询
SQL 语法 Spark SQL (兼容部分 HiveQL) Flink SQL (高度兼容 Hive 方言)

4. 常见问题排查(性能与报错)

  • 元数据同步:

    • Spark:如果 Hive 中新增了分区,Spark 侧有时需要执行 MSCK REPAIR TABLE table_name 或 REFRESH TABLE table_name 来更新缓存。

    • Flink:在流式读取 Hive 时,需要注意 streaming-source.monitor-interval 参数,否则 Flink 只会读取当前快照。

  • 权限问题 (Kerberos):

    如果集群开启了 Kerberos 认证,PySpark/PyFlink 程序在启动前必须先执行 kinit 获取票据,且配置中需指定 principal 和 keytab。

 

相关新闻

  • 《道德情操论》
  • 基于S7-200 PLC与组态王的机械手自动化搬运控制策略
  • 《生命的进程》

最新新闻

  • 商用车车联网:场景篇 - 金融风控(第6篇):风控评分模型——从规则到算法
  • LangGraph 工作流:把关键流程跑顺
  • 2026年长沙GEO优化服务商TOP5榜单 - GEO优化
  • 2026茂名2026正规漏水检测维修公司精选口碑榜TOP5权威推荐-精准定位检测漏水点-专业防水补漏堵漏维修、卫生间/厨房/屋顶/天沟/地下室/阳台防水漏水检测维修 - 安佳防水
  • Steam成就管理器完整指南:如何免费轻松管理你的游戏成就
  • 2026年当下上海诚信的硼化锆源头厂家选型全指南 - 品牌鉴赏官2026

日新闻

  • 信任的进化:技术实现详解——如何用JavaScript构建博弈论模拟器
  • Terrakube自定义工作流:如何集成OPA、Infracost等工具扩展IaC能力
  • grunt-concurrent快速入门:5分钟学会并行运行Grunt任务

周新闻

  • 3步解锁iOS设备:applera1n激活锁绕过完全指南
  • 39 2026 人工智能证书终极盘点,普通人选 AI 证书可以从这些方向入手
  • Redis 暴露公网有多危险?从端口检查到补救步骤

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号