摸鱼大数据——Spark SQL——Spark SQL函数定义二

3、Spark原生自定义UDF函数

3.1 自定义函数流程:
 第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可
 ​
 第二步: 将Python函数注册到Spark SQL中
     注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
         参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
         参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数
         参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型
         udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
     
         说明: 如果通过方式一来注册函数, 【可以用在SQL和DSL】
     
     注册方式二:  udf对象 = F.udf(参数1,参数2)
         参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数
         参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型
         udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
         
         说明: 如果通过方式二来注册函数,【仅能用在DSL中】
         
     注册方式三:  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面
         说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】
     
         
 第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可
 ​

3.2 自定义演示一:

需求1: 请自定义一个函数,完成对 数据 统一添加一个后缀名的操作 , 例如后缀名 '_itheima'

效果如下:

# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType

# 绑定指定的python解释器

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()


    # 2.数据输入
    df = spark.createDataFrame(
        data=[(1,'张三','广州'),(2,'李四','深圳')],
        schema='id int,name string,address string'
    )
    df.show()
    
    # 3.SparkSQL自定义udf函数
    # 第一步.自定义python函数
    def add_suffix(data):
        return data+'_itheima'

    # 第二步.把python函数注册到SparkSQL
    # ① spark.udf.register注册
    dsl1_add_suffix = spark.udf.register('sql_add_suffix',add_suffix,StringType())
    # ②F.udf注册
    dsl2_add_suffix = F.udf(add_suffix, StringType())
    # ③@F.udf注册
    @F.udf( StringType())
    def candy_add_suffix(data):
        return data+'_itheima'

    # 第三步.在SparkSQL中调用自定义函数
    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select id,name,sql_add_suffix(address) as new_address from temp"""
    ).show()
    
    # DSL方式
    # 调用dsl1_add_suffix
    df.select(
        'id', 'name', dsl1_add_suffix('address').alias('new_address')
    ).show()
    # 调用dsl2_add_suffix
    df.select(
        'id', 'name', dsl2_add_suffix('address').alias('new_address')
    ).show()
    # 调用candy_add_suffix
    df.select(
        'id', 'name', candy_add_suffix('address').alias('new_address')
    ).show()

    # 4.关闭资源
    spark.stop()

可能遇到的问题如下

 原因: 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。

3.3 自定义演示二:

需求2: 请自定义一个函数,返回值类型为复杂类型: 列表

效果如下:

参考代码:

 # 导包
 import os
 from pyspark.sql import SparkSession,functions as F
 from pyspark.sql.types import StringType, ArrayType
 ​
 # 绑定指定的python解释器
 ​
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
 ​
 ​
     # 2.数据输入
     df = spark.createDataFrame(
         data=[(1,'张三_广州'),(2,'李四_深圳')],
         schema='id int,name_address string'
     )
     df.show()
 ​
     # 3.SparkSQL自定义udf函数
     # 第一步.自定义python函数
     def my_split(data:str):
         list1 = data.split('_')
         return list1
 ​
     # 第二步.把python函数注册到SparkSQL
     # ① spark.udf.register注册
     dsl1_add_suffix = spark.udf.register('sql_add_suffix',my_split,ArrayType(StringType()))
     # ②F.udf注册
     dsl2_add_suffix = F.udf(my_split, ArrayType(StringType()))
     # ③@F.udf注册
     @F.udf(ArrayType(StringType()))
     def candy_add_suffix(data):
         list1 = data.split('_')
         return list1
 ​
     # 第三步.在SparkSQL中调用自定义函数
     # SQL方式
     df.createTempView('temp')
     spark.sql(
         """select id,sql_add_suffix(name_address) as new_address from temp"""
     ).show()
 ​
     # DSL方式
     # 调用dsl1_add_suffix
     df.select(
         'id',  dsl1_add_suffix('name_address').alias('new_name_address')
     ).show()
     # 调用dsl2_add_suffix
     df.select(
         'id',dsl2_add_suffix('name_address').alias('new_name_address')
     ).show()
     # 调用candy_add_suffix
     df.select(
         'id',candy_add_suffix('name_address').alias('new_name_address')
     ).show()
 ​
 ​
     # 4.关闭资源
     spark.stop()

3.4 自定义演示三:

需求3: 请自定义一个函数,返回值类型为复杂类型: 字典

效果如下:

 注意: 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是null补充
 # 导包
 import os
 from pyspark.sql import SparkSession,functions as F
 from pyspark.sql.types import StringType, ArrayType, StructType
 ​
 # 绑定指定的python解释器
 ​
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
 ​
 ​
     # 2.数据输入
     df = spark.createDataFrame(
         data=[(1,'张三_广州'),(2,'李四_深圳')],
         schema='id int,name_address string'
     )
     df.show()
 ​
     # 3.SparkSQL自定义udf函数
     # 第一步.自定义python函数
     def my_split(data:str):
         list1 = data.split('_')
         return {'name':list1[0],'address':list1[1]}
 ​
     # 第二步.把python函数注册到SparkSQL
     # 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是null
     t = StructType().add('name',StringType()).add('address',StringType())
     # ① spark.udf.register注册
     dsl1_add_suffix = spark.udf.register('sql_add_suffix',my_split,t)
     # ②F.udf注册
     dsl2_add_suffix = F.udf(my_split, t)
     # ③@F.udf注册
     @F.udf(t)
     def candy_add_suffix(data):
         list1 = data.split('_')
         return {'name':list1[0],'address':list1[1]}
 ​
     # 第三步.在SparkSQL中调用自定义函数
     # SQL方式
     df.createTempView('temp')
     spark.sql(
         """select id,sql_add_suffix(name_address) as new_name_address from temp"""
     ).show()
 ​
     # DSL方式
     # 调用dsl1_add_suffix
     df.select(
         'id', dsl1_add_suffix('name_address').alias('new_name_address')
     ).show()
     # 调用dsl2_add_suffix
     df.select(
         'id',dsl2_add_suffix('name_address').alias('new_name_address')
     ).show()
     # 调用candy_add_suffix
     df.select(
         'id',candy_add_suffix('name_address').alias('new_name_address')
     ).show()
 ​
     # 4.关闭资源
     spark.stop()

4、Pandas的自定义函数

4.1 Apache Arrow框架

Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率

Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数

如何安装? 三个节点建议都安装

 检查服务器上是否有安装pyspark
 pip list | grep pyspark  或者 conda list | grep pyspark
 ​
 如果服务器已经安装了pyspark的库,那么仅需要执行以下内容,即可安装。例如在 node1安装
 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]
     
 如果服务器中python环境中没有安装pyspark,建议执行以下操作,即可安装。例如在 node2 和 node3安装
 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyarrow==16.1.0

Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用

如何使用呢? 默认不会自动启动的, 一般建议手动配置

 spark.conf.set('spark.sql.execution.arrow.pyspark.enabled',True)

4.2 基于Arrow完成Pandas和Spark的DataFrame互转

Pandas中DataFrame:

DataFrame:表示一个二维表对象,就是表示整个表

字段、列、索引;Series表示一列

Spark SQL中DataFrame:

使用场景:

1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析

2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame

 Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
 Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()

示例:

 # 导包
 import os
 from pyspark.sql import SparkSession
 ​
 # 绑定指定的python解释器
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
 ​
     # TODO: 手动开启arrow框架
     spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
 ​
     # 2.数据输入
     df = spark.createDataFrame(
         data=[(1,'张三_广州'),(2,'李四_深圳')],
         schema='id int ,name_address string'
     )
     df.show()
     print(type(df))
     print('------------------------')
 ​
     # 3.数据处理(切分,转换,分组聚合)
     # 4.数据输出
     # spark->pandas
     pd_df = df.toPandas()
     print(pd_df)
     print(type(pd_df))
 ​
     print('------------------------')
     # pandas->spark
     df2 = spark.createDataFrame(pd_df)
     df2.show()
     print(type(df2))
     
 ​
     # 5.关闭资源
     spark.stop()
 ​

4.3 基于Pandas自定义函数

基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。

Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型

基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数

4.3.1 自定义函数流程

第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可

第二步: 将Python函数包装成Spark SQL的函数
    注册方式一: udf对象 = spark.udf.register(参数1, 参数2)
        参数1: UDF函数名称。此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
        参数2: Python函数的名称。表示将哪个Python的函数注册为Spark SQL的函数
        使用: udf对象只能在DSL中使用。参数1指定的名称只能在SQL中使用
        
        
    注册方式二: udf对象 = F.pandas_udf(参数1, 参数2)
        参数1: 自定义的Python函数。表示将哪个Python的函数注册为Spark SQL的函数
        参数2: UDF函数的返回值类型。用于表示当前这个Python的函数返回的类型对应到Spark SQL的数据类型
        udf对象: 返回值对象,是一个UDF对象。仅能用在DSL中使用
    
    注册方式三: 语法糖写法  @F.pandas_udf(returnType)  放置到对应Python的函数上面
        说明: 实际是方式二的扩展。仅能用在DSL中使用
    
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可

基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!

4.3.2 自定义UDF函数
  • 自定义Python函数的要求:SeriesToSeries

# 导包
import os
from pyspark.sql import SparkSession,functions as F
import pandas as pd

# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 创建main函数
if __name__ == '__main__':
    # 1.创建SparkContext对象
    spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()

    # TODO: 开启Arrow的使用
    spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'True')

    # 2.数据输入
    df = spark.createDataFrame(
        data = [(1,1),(2,2),(3,3)],
        schema= 'num1 int,num2 int'
    )
    df.show()
    # 3.基于pandas自定义函数 :SeriesTOSeries
    # 第一步: 自定义python函数
    def multiply(num1:pd.Series,num2:pd.Series)->pd.Series:
        return num1*num2

    # 第二步: 把python注册为SparkSQL函数
    # ①spark.udf.register注册
    dsl1_multiply = spark.udf.register('sql_multiply',multiply)
    # ②F.pandas_udf注册
    dsl2_multiply = F.pandas_udf(multiply,IntegerType())
    # ③@F.pandas_udf注册
    @F.pandas_udf(IntegerType())
    def candy_multiply(num1: pd.Series, num2: pd.Series) -> pd.Series:
        return num1 * num2

    # 第三步: 在SparkSQL中调用注册后函数
    # SQL方式
    df.createTempView('temp')
    spark.sql(
        """select num1,num2,sql_multiply(num1,num2) as result from temp"""
    ).show()
    # DSL方式
    #调用dsl1_multiply
    df.select(
        'num1','num2',dsl1_multiply('num1','num2').alias('result')
    ).show()
    # 调用dsl2_multiply
    df.select(
        'num1', 'num2', dsl2_multiply('num1', 'num2').alias('result')
    ).show()
    # 调用candy_multiply
    df.select(
        'num1', 'num2', candy_multiply('num1', 'num2').alias('result')
    ).show()

    # 4.关闭资源
    spark.stop()
 

4.3.3 自定义UDAF函数
  • 自定义Python函数的要求:Series To 标量

    表示:自定义函数的输入数据类型是Pandas中的Series对象,返回值数据类型是标量数据类型。也就是Python中的数据类型,例如:int、float、bool、list....

 基于pandas方式还支持自定义UDAF函数
 注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
 注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!
 # 导包
 import os
 from pyspark.sql import SparkSession, functions as F
 import pandas as pd
 ​
 # 绑定指定的python解释器
 from pyspark.sql.types import LongType, IntegerType, FloatType
 ​
 os.environ['SPARK_HOME'] = '/export/server/spark'
 os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
 os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
 ​
 # 创建main函数
 if __name__ == '__main__':
     # 1.创建SparkContext对象
     spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()
 ​
     # TODO: 开启Arrow的使用
     spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'True')
 ​
     # 2.数据输入
     df = spark.createDataFrame(
         data=[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
         schema='id int,value float'
     )
     df.show()
 ​
 ​
     # 3.基于pandas自定义函数 :SeriesTOSeries
     # 第一步: 自定义python函数
     # ③@F.pandas_udf注册  注意: 理论上UDAF只能用注册方式三语法糖方式,也就意味着只能DSL使用
     @F.pandas_udf(FloatType())
     def candy_mean_v(value: pd.Series) -> float:
         return value.mean()
 ​
 ​
     # 第二步: 注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式一register注册
     # ①spark.udf.register注册
     dsl1_mean_v = spark.udf.register('sql_mean_v', candy_mean_v)
 ​
     # 第三步: 在SparkSQL中调用注册后函数
     # DSL方式
     # 调用candy_mean_v
     df.groupBy('id').agg(
         candy_mean_v('value').alias('result')
     ).show()
 ​
     # 调用dsl1_mean_v
     df.groupBy('id').agg(
         dsl1_mean_v('value').alias('result')
     ).show()
 ​
     # SQL方式
     df.createTempView('temp')
     spark.sql(
         """select id,sql_mean_v(value) as result from temp group by id"""
     ).show()
 ​
     # 4.关闭资源
     spark.stop()

相关推荐

  1. 数据——Hive调优10-12

    2024-07-10 08:44:02       16 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-10 08:44:02       3 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-10 08:44:02       3 阅读
  3. 在Django里面运行非项目文件

    2024-07-10 08:44:02       2 阅读
  4. Python语言-面向对象

    2024-07-10 08:44:02       2 阅读

热门阅读

  1. IT专业入门,高考假期预习指南

    2024-07-10 08:44:02       10 阅读
  2. 强化OT安全英国发布工控网络事件响应实践指南

    2024-07-10 08:44:02       15 阅读
  3. 使用静态图加速

    2024-07-10 08:44:02       7 阅读
  4. 修改ES索引名称

    2024-07-10 08:44:02       8 阅读
  5. asp.netWebForm(.netFramework) CSRF漏洞

    2024-07-10 08:44:02       10 阅读
  6. Redis的使用(三)常见使用场景-session共享

    2024-07-10 08:44:02       8 阅读
  7. DS200CVMAG1AEB处理器 控制器 模块

    2024-07-10 08:44:02       9 阅读
  8. 插8张显卡的服务器有哪些?

    2024-07-10 08:44:02       7 阅读
  9. react antd table拖拽

    2024-07-10 08:44:02       11 阅读
  10. VB 关键字

    2024-07-10 08:44:02       11 阅读
  11. 前端面试题(13)答案版

    2024-07-10 08:44:02       10 阅读
  12. 智能警卫:Conda包依赖的自动监控之道

    2024-07-10 08:44:02       9 阅读
  13. vue处理重复请求

    2024-07-10 08:44:02       9 阅读
  14. 深度学习:从数据采集到模型测试的全面指南

    2024-07-10 08:44:02       9 阅读