(五)Spark大数据开发实战:灵活运用PySpark常用DataFrame API

news/2024/11/5 1:54:48 标签: 大数据, spark, 分布式

目录

一、PySpark

二、数据介绍

三、PySpark大数据开发实战

1、数据文件上传HDFS

2、导入模块及数据

3、数据统计与分析

①、计算演员参演电影数

②、依次罗列电影番位前十的演员

③、按照番位计算演员参演电影数

④、求每位演员所有参演电影中的最早、最晚上映时间及其相隔天数、年数

⑤、求每位演员所有电影中的评分最高值、最低值、电影数量、评分均值、标准差、方差、最高最低评分之差值

⑥、求参演大于等于10部电影的每位演员的平均评分,计算规则:去掉一个最高分和一个最低分,然后再计算电影平均分

⑦、求投票数在所有电影中排前80%、评分在所有电影中排前20%的电影信息

⑧、求美国、中国(含港澳台)、英国、法国、俄罗斯5个国家各个电影类型的上映电影数目

⑨、求各个地区评分最高的电影,若并列第一则以“|”拼接

⑩、统计从数据中最早年份到最晚年份的每月上映电影数量,若某个月份无电影上映则数量为0

4、下载统计结果

四、总结


一、PySpark

Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。

PySpark与Spark-Scala的对比:

1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。

2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。

4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。

5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。

总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。

本文软件环境如下:

操作系统:CentOS Linux 7

Hadoop版本:3.1.3,安装教程可见我另一篇博客内容:Linux CentOS安装Hadoop3.1.3(单机版)详细教程

Spark版本:3.5.2,安装教程可见我另一篇博客内容:Linux CentOS安装PySpark3.5(单机版)详细教程及机器学习实战

Python版本:Python(Anaconda)3.11.4

PySpark基础学习可看 PySpark系列文章:

(一)PySpark3:安装教程及RDD编程

(二)PySpark3:SparkSQL编程

(三)PySpark3:SparkSQL40题

(四)PySpark3:Mlib机器学习实战-信用卡交易数据异常检测


二、数据介绍

本文数据来自采集豆瓣网分类排行榜 (“https://movie.douban.com/chart”)中各分类类别所有电影的相关信息并存储为csv文件。

爬虫代码在我另一篇博客:豆瓣电影信息爬取与可视化分析

数据放在了百度云上:https://pan.baidu.com/s/1YWB2iEOsMmXHkEUFpY2_TA?pwd=ej3z

数据如下图所示,包含电影名、上映日期、上映地区、类型、豆瓣链接、参演演员、演员数、评分、打分人数,共有3357部电影:

三、PySpark大数据开发实战

1、数据文件上传HDFS

首先通过xftp上传linux服务器,然后通过以下命令上传至HDFS:

hdfs dfs -mkdir /data
hdfs dfs -mkdir /output
hdfs dfs -put film_info.csv /data

2、导入模块及数据

使用SparkSession.builder.config(conf=SparkConf()).getOrCreate()创建Spark会话。使用spark.read.csv()读取CSV文件,并设置header=True以识别首行为列名,inferSchema=True自动推断数据类型。

from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# 主程序:
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

df = spark.read.csv("/data/film_info.csv", header=True, inferSchema=True)

3、数据统计与分析

①、计算演员参演电影数

以下代码中使用了spark sql进行统计,也可以通过DataFrame API进行统计:

df.groupBy("actor").agg(count("title").alias("act_film_num"))
# 按分隔符切分列表
df_split = df.withColumn("actors", F.split(df["actors"], "\|")) \
    .withColumn("types", F.split(df["types"], "\|")) \
    .withColumn("regions", F.split(df["regions"], "\|"))

# 演员:拆分多行
df_exploded = df_split.withColumn("actor", F.explode(F.col("actors")))
df_exploded.drop(*["actors", "regions", "types"]).createOrReplaceTempView("actor_exploded")

df1 = spark.sql('''
    select actor,
           count(*) as act_film_num
    from actor_exploded 
    group by actor
    ''')
df1.sort(df1["act_film_num"].desc()).repartition(1).write.mode("overwrite").option("header", "true").csv(
    "/output/result1.csv")

结果如下:

+-------------+------------+
|        actor|act_film_num|
+-------------+------------+
|       童自荣|          43|
|     户田惠子|          37|
|         林雪|          33|
|       张国荣|          32|
|       刘德华|          31|
|       周星驰|          31|
|         成龙|          31|
|       任达华|          31|
|         刘洵|          30|
|塞缪尔·杰克逊|          29|
|  汤姆·汉克斯|          29|
|       梁家辉|          28|
|       吴孟达|          28|
|       梁朝伟|          27|
|      斯坦·李|          27|
|       吴君如|          27|
|    威廉·达福|          27|
|       黄秋生|          27|
|       胡立成|          27|
|  布拉德·皮特|          26|
+-------------+------------+
only showing top 20 rows
②、依次罗列电影番位前十的演员

这一题考察了窗口函数、行转列等等。

# 定义窗口函数,按电影标题和演员顺序排序
windowSpec = Window.partitionBy("title").orderBy("actors")
# 添加序号列
df2 = df_exploded.withColumn("rank", F.row_number().over(windowSpec))
# 过滤出前10个演员
rank_num = 10
rank_num_list = [str(i + 1) for i in range(rank_num)]
# 将演员重新组合成单行
df2_tmp1 = df2.groupBy("title").pivot("rank", rank_num_list).agg(F.collect_list("actor"))
df2_tmp2 = df2_tmp1.select("title", *[F.col(f"{i + 1}")[0].alias(f"actor{i + 1}") for i in range(rank_num)])
df2_tmp2.repartition(1).write.mode("overwrite").option("header", "true").csv("/output/result2.csv")

结果如下:

+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
|                   title|             actor1|               actor2|            actor3|         actor4|           actor5|                actor6|             actor7|               actor8|           actor9|          actor10|
+------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
|                 101忠狗|          罗德·泰勒|            凯特·鲍尔|           本·怀特|    丽莎·戴维斯| 贝蒂·洛乌·格尔森|         J·帕特·奥马利|      玛莎·温特沃思|      大卫·弗兰克海姆|弗莱德里克·沃洛克|        汤姆·康威|
|                   11:14|        亨利·托马斯|          布莱克·赫伦|       芭芭拉·赫希|  克拉克·格雷格|    希拉里·斯万克|           肖恩·海托西|      斯塔克·桑德斯|          科林·汉克斯|        本·福斯特|  帕特里克·斯威兹|
|                    2012|        约翰·库萨克|          阿曼达·皮特|     切瓦特·埃加福|      坦迪·牛顿|    奥利弗·普莱特|           汤姆·麦卡锡|        伍迪·哈里森|          丹尼·格洛弗|      连姆·詹姆斯|        摩根·莉莉|
|                    2046|             梁朝伟|               章子怡|              王菲|       木村拓哉|             巩俐|                刘嘉玲|               张震|               张曼玉|             董洁|      通猜·麦金泰|
|                    21克|            西恩·潘|          娜奥米·沃茨|本尼西奥·德尔·托罗|  夏洛特·甘斯布|      梅丽莎·里奥|         迈克尔·芬内尔|     

http://www.niftyadmin.cn/n/5738782.html

相关文章

PyTorch核心概念:从梯度、计算图到连续性的全面解析(一)

文章目录 梯度requires_gradtorch.no_grad() 反向传播及网络的更新tensor.detach()CPU和GPUtensor.item()参考文献 梯度 requires_grad 当我们创建一个张量(tensor)的时候,如果没有特殊指定的话,那么这个张量是默认不需要求导的…

Dubbo Telnet服务追踪源码分析

前言 Dubbo 的 Telnet 特性允许开发者通过 Telnet 客户端直接连接到 Dubbo 服务提供者,执行一系列命令以获取服务状态、调试信息和进行服务管理。这种特性在开发和运维过程中非常有用,尤其是在需要快速定位问题或查看服务状态时。 本文介绍 Telnet 中最…

springboot 自动装配和bean注入原理及实现

装配:创建bean,并加入IOC容器。 注入:创建bean之间的依赖关系。 1、类自动装配 SpringBoot 自动装配使得开发人员可以轻松地搭建、配置和运行应用程序,而无需手动管理大部分的 bean 和配置。 Spring Boot 的自动装配机制与模块…

HarmonyOS鸿蒙开发入门,常用ArkUI组件学习(二)

书接上回,让我们继续来学习ArkUI的其他组件 目录,可以点击跳转到想要了解的组件详细内容 组件四:Button组件五:Slider组件六: Column & Row组件七:循环控制组件八: List 组件四:…

[每周一更]-(第121期):模拟面试|微服务架构面试思路解析

这一系列针对Go面试题整理,仅供参考 文章目录 00|综合服务治理方案:怎么保证微服务应用的高可用?1. **什么是微服务架构?**2. **怎么保证微服务架构的高可用?**3. **怎么判定服务是否已经健康?**4. **如果服务不健康该怎么办?**5. **怎么判定服务已经从不健康状态恢复过…

24下软考中级软件设计师,就剩几天了!

这几页纸,快背! 一 计算机组成与体系结构 1、寻址方式:立即寻址最快(操作数本身),寄存器寻址次之(操作数地址的地址),直接寻址最慢(操作数的地址&#xff0…

MySQL日志——针对实习面试

目录 MySQL日志MySQL有哪些日志?请解释一下MySQL的二进制日志(Binlog)的作用?复制(Replication)数据恢复(Point-in-Time Recovery) Binlog日志的三种格式是什么?如何使用…

企业培训考试系统源码

企业培训考试系统,是一个集成了多元化学习资源的综合性平台,其核心模块包括章节练习、历年真题、错题记录、模拟考试和正式考试等,旨在全方位提升员工的业务能力和应试技巧。 章节练习模块根据培训课程的内容进行细致划分,员工可…