package com.aspirecn.jiaoke.data.extract

import java.nio.file.{Files, Paths}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * @author: hwj
 * @date: 2021/4/13 10:39
 * @description: Extracts gantry, ETC-exit and cash-exit CSV data and aggregates per-vehicle,
 *               per-day travel statistics (trip counts, mileage, fees, traversed provinces
 *               and preferred travel time slots).
 */
object DataExtractByReadFile {

  // Exit-lane transaction record (ETC exits and cash exits share this layout).
  // exTime/enTime are kept as raw timestamp strings: the CSV files are read without an explicit
  // schema, and the SQL below parses them with regexp_replace/unix_timestamp.
  case class TrcExetcpu(vehicleid: String, exTime: String, enTime: String, feemileage: String,
                        shortfeemileage: String, fee: String, enweight: String, extolllaneid: String,
                        axlecount: String, vehicletype: String, day: String)

  // Gantry transaction record: vehicle plate, transaction time and day.
  case class GBUPLoad(vehicleid: String, transtime: String, day: String)
  def main(args: Array[String]): Unit = {
    val log: Logger = Logger.getLogger("DataExtractByReadFile")
    log.setLevel(Level.INFO)
    if (args.length != 6) {
      log.error(
        """
          |Please supply the program arguments in order:
          | 1. gantry data input directory
          | 2. ETC data input directory
          | 3. cash exit data input directory
          | 4. result output path (must not exist before the run; it is created automatically, otherwise the job fails)
          | 5. checkpoint directory
          | 6. number of result files to write; choose it according to the data volume - the fewer files, the longer the run; 0 keeps the default number of partitions
          |""".stripMargin)
      System.exit(0)
    }
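    // Illustrative spark-submit invocation (the jar name and paths are hypothetical placeholders,
    // not values taken from this project):
    //   spark-submit --class com.aspirecn.jiaoke.data.extract.DataExtractByReadFile \
    //     data-extract.jar \
    //     /data/menjia /data/etc /data/qitachukou /data/result /data/checkpoint 0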
    // Gantry data input directory
    val menjiaPath: String = args(0)
    // ETC data input directory
    val etcPath: String = args(1)
    // Cash exit data input directory
    val qitachukouPath: String = args(2)
    // Result output directory
    val out_path: String = args(3)
    // Checkpoint directory
    val check_path: String = args(4)
    // Number of result files to write
    val result_partitions: Int = args(5).toInt
    // Abort when any of the input directories does not exist
    if ((!exists(menjiaPath)) || (!exists(etcPath)) || (!exists(qitachukouPath))) {
      log.error("Please supply valid input directories: " +
        "gantry data path: " + menjiaPath +
        ", ETC data path: " + etcPath +
        ", cash exit data path: " + qitachukouPath)
      System.exit(0)
    }
    // Abort when the output directory already exists
    if (exists(out_path)) {
      log.error("The output directory already exists: " + out_path)
      System.exit(0)
    }
    // Initialize the SparkSession
    val spark: SparkSession = formatSparkSession
    // Set the checkpoint directory
    spark.sparkContext.setCheckpointDir(check_path)
    // spark.sparkContext.setCheckpointDir("file:///data/checkpoint")
    log.info("Starting data extraction")
    // todo: start processing the data
    import org.apache.spark.sql.functions._
    // todo: bring in the implicit conversions
    import spark.implicits._
    // Gantry data
    log.info("Extracting gantry data from: " + menjiaPath)
    val gbuLoadDataset: Dataset[GBUPLoad] = spark.read.option("header", true).csv(menjiaPath)
      .toDF("vehicleid", "transtime", "day").as[GBUPLoad].coalesce(36)
    log.info("Gantry data extracted successfully")
    // ETC exit data
    log.info("Extracting ETC exit data from: " + etcPath)
    val trcExetcpuDataset: Dataset[TrcExetcpu] = spark.read.option("header", true).csv(etcPath)
      .toDF("vehicleid", "extime", "entime", "feemileage", "shortfeemileage", "fee", "enweight", "extolllaneid", "axlecount", "vehicletype", "day")
      .as[TrcExetcpu].coalesce(36)
    log.info("ETC exit data extracted successfully")
    // Cash exit data
    log.info("Extracting cash exit data from: " + qitachukouPath)
    val trcOtuDataset: Dataset[TrcExetcpu] = spark.read.option("header", true).csv(qitachukouPath)
      .toDF("vehicleid", "extime", "entime", "feemileage", "shortfeemileage", "fee", "enweight", "extolllaneid", "axlecount", "vehicletype", "day")
      .as[TrcExetcpu].coalesce(36)
    log.info("Cash exit data extracted successfully")
    // Merge the ETC exit and cash exit data
    log.info("Merging ETC exit and cash exit data")
    val trcDataSet: Dataset[TrcExetcpu] = trcExetcpuDataset.unionByName(trcOtuDataset)
    log.info("ETC exit and cash exit data merged successfully")
    // Create a view and derive the exit-related metrics
    trcDataSet.createOrReplaceTempView("trcDataSet")
    log.info("View created; computing exit-related metrics")
    var trcDataSet_p: DataFrame = spark.sql(
      """
        |select
        |vehicleid,
        |exTime,
        |entime,
        |(case when entime is not null then unix_timestamp(regexp_replace(extime,'T',' '))-unix_timestamp(regexp_replace(entime,'T',' ')) else 0 end) as travel_time,
        |(case when feemileage>0 and feemileage is not null then feemileage else shortfeemileage end) as feemileage,
        |cast(fee as double) as fee,
        |(case when enweight>0 then (case when feemileage>0 and feemileage is not null then feemileage else shortfeemileage end) else 0 end) as weight_mileage,
        |substr(exTollLaneId,6,2) as travel_provinces,
        |axlecount,
        |vehicletype,
        |day
        |from
        |trcDataSet
        |""".stripMargin)
    log.info("Exit-related metrics computed")
    // Take the exit and entry transaction times from the exit data and merge them with the gantry data
    log.info("Extracting exit transaction times from the exit data")
    val ex_transtime: Dataset[GBUPLoad] = trcDataSet_p.select($"day", $"vehicleid".as("vehicleid"), $"exTime".as("transtime")).as[GBUPLoad]
    log.info("Exit transaction times extracted successfully")
    log.info("Extracting entry transaction times from the exit data")
    val en_transtime: Dataset[GBUPLoad] = trcDataSet_p.select($"day", $"vehicleid".as("vehicleid"), $"enTime".as("transtime")).as[GBUPLoad]
    log.info("Entry transaction times extracted successfully")
    log.info("Merging gantry transaction times with entry and exit transaction times")
    val exetcpuDataset: Dataset[GBUPLoad] = gbuLoadDataset.unionByName(ex_transtime).unionByName(en_transtime)
    log.info("Transaction times merged successfully")
    // Checkpoint the exit metrics: persist first so the eager checkpoint is computed from the cache,
    // keep the lineage-truncated copy for the downstream joins, then release the cache.
    trcDataSet_p.persist(StorageLevel.MEMORY_AND_DISK)
    val trcDataSetCheckpointed: DataFrame = trcDataSet_p.checkpoint()
    trcDataSet_p.unpersist()
    trcDataSet_p = trcDataSetCheckpointed
    // Create a view and bucket each transaction time into a slot of the day
    exetcpuDataset.createOrReplaceTempView("gbupload_etctu")
    var gbupload_etctu: Dataset[GBUPLoad] = spark.sql(
      """
        |select vehicleid as vehicleid,
        |(case
        | when 0 <= hour(transtime) and hour(transtime)<=3 then 'A'
        | when 3 < hour(transtime) and hour(transtime)<=6 then 'B'
        | when 6 < hour(transtime) and hour(transtime)<=9 then 'C'
        | when 9 < hour(transtime) and hour(transtime)<=12 then 'D'
        | when 12 < hour(transtime) and hour(transtime)<=15 then 'E'
        | when 15 < hour(transtime) and hour(transtime)<=18 then 'F'
        | when 18 < hour(transtime) and hour(transtime)<=21 then 'G'
        | when 21 < hour(transtime) and hour(transtime)<24 then 'H'
        | else 'I' end) as transtime,
        |day
        |from
        |gbupload_etctu
        |""".stripMargin).as[GBUPLoad]
    // Checkpoint the bucketed transaction times and drop the backing view
    gbupload_etctu.persist(StorageLevel.MEMORY_AND_DISK)
    val gbuploadCheckpointed: Dataset[GBUPLoad] = gbupload_etctu.checkpoint()
    gbupload_etctu.unpersist()
    gbupload_etctu = gbuploadCheckpointed
    spark.sqlContext.dropTempTable("gbupload_etctu")
    // todo: group by plate number and day to compute the latest exit time, total travel time,
    // longest single travel time, total mileage, total fee, total loaded mileage and trip count
    val trcDataFrame: DataFrame = trcDataSet_p
      .groupBy($"day", $"vehicleid")
      .agg(
        max($"exTime").as("max_exTime"),
        sum($"travel_time").as("sum_travel_time"),
        max($"travel_time").as("max_travel_time"),
        sum($"feemileage").as("sum_feemileage"),
        sum($"fee").as("sum_fee"),
        sum($"weight_mileage").as("sum_weight_mileage"),
        count($"vehicleid").as("exTimes_count"))
    // todo: pick the axle count and toll vehicle type from the most recent exit record per day and plate
    trcDataSet_p.createOrReplaceTempView("trc_table")
    val vehicleTypeAxlecount: DataFrame = spark.sql(
      """
        |select
        |a.day as day,
        |a.vehicleid as vehicleid,
        |a.axlecount as axlecount,
        |a.vehicletype as vehicletype
        |from
        |(select day,
        |vehicleid,
        |axlecount,
        |vehicletype,
        |row_number() over(partition by day, vehicleid order by exTime desc) as rr
        |from
        |trc_table) a where a.rr=1
        |""".stripMargin)
    // val trc_C_V_DataFrame: DataFrame = trcDataFrame.join(vehicleTypeAxlecount.select($"axlecount",$"vehicletype",$"day",$"vehicleid"), (trcDataFrame.col("vehicleid") === vehicleTypeAxlecount.col("vehicleid")).and(trcDataFrame.col("day") === vehicleTypeAxlecount.col("day")), "left")
    // Join the exit aggregates with the axle count and vehicle type
    val trc_C_V_DataFrame: DataFrame = trcDataFrame.join(vehicleTypeAxlecount, Seq("day", "vehicleid"), "left")
    // todo: extract the traversed province codes with a per-province record count per day and plate
    val trcProvinceDataSet: Dataset[Row] = trcDataSet_p.select($"day", $"vehicleid", $"travel_provinces")
      .groupBy($"day", $"vehicleid", $"travel_provinces")
      .count()
      .withColumnRenamed("count", "travel_provinces_count")
    trcProvinceDataSet.createOrReplaceTempView("trcProvince_table")
    val trcProvinceCountDataSet: DataFrame = spark.sql(
      """
        |select day, vehicleid as vehicleid, concat_ws("#",collect_list(concat_ws("%",travel_provinces,travel_provinces_count))) as travel_provinces_count
        |from trcProvince_table
        |group by day, vehicleid
        |""".stripMargin)
    val trcTransProvinceCountDataFrame: DataFrame = trc_C_V_DataFrame.join(trcProvinceCountDataSet,
      Seq("day", "vehicleid"),
      "outer")
    // Count the trips per plate and time slot
    val transtimeDataFrame: DataFrame = gbupload_etctu.groupBy($"day", $"vehicleid", $"transtime")
      .agg(count("transtime").as("transtime_preferences_count"))
    // Pack the per-slot trip counts into one string per vehicle and day
    transtimeDataFrame.createOrReplaceTempView("transtimeData")
    val transTimeCountMapDataFrame: DataFrame = spark.sql(
      """
        |select day,
        |vehicleid,
        |concat_ws("#",collect_list(concat_ws("%",transtime,transtime_preferences_count))) as transtime_count
        |from transtimeData
        |group by day, vehicleid
        |""".stripMargin)
    var trcResultDataFrame = trcTransProvinceCountDataFrame.join(transTimeCountMapDataFrame,
      Seq("day", "vehicleid"),
      "outer")
    // Drop the temporary views that are no longer needed
    spark.sqlContext.dropTempTable("trcDataSet")
    spark.sqlContext.dropTempTable("trcProvince_table")
    spark.sqlContext.dropTempTable("transtimeData")
    spark.sqlContext.dropTempTable("trc_table")
    log.info("Result schema:\n" + trcResultDataFrame.schema.treeString)
    if (result_partitions > 0) {
      trcResultDataFrame = trcResultDataFrame.repartition(result_partitions)
    }
    trcResultDataFrame.write.csv(out_path)
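    // Note: write.csv uses Spark's defaults, so the result files carry no header row;
    // downstream consumers need to supply the column names themselves.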
    spark.stop()
  }
  def formatSparkSession(): SparkSession = {
    // todo: instantiate the SparkSession
    val spark: SparkSession = SparkSession
      .builder()
      // .appName("DataExtract")
      // .master("local[*]")
      .getOrCreate()
    spark
  }
  def exists(path: String): Boolean = {
    Files.exists(Paths.get(path))
  }
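  // Files.exists only checks the filesystem local to the driver. If the input/output directories
  // live on HDFS (or another Hadoop-compatible filesystem), a check along these lines would be
  // needed instead - a minimal sketch, assuming the Hadoop configuration of the active
  // SparkSession; it is not wired into main() above:
  def existsOnHadoopFs(spark: SparkSession, path: String): Boolean = {
    val hadoopPath = new org.apache.hadoop.fs.Path(path)
    val fs = hadoopPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    fs.exists(hadoopPath)
  }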
}