|
@@ -0,0 +1,248 @@
|
|
|
+package com.aspirecn.jiaoke.data.extract
|
|
|
+
|
|
|
+import java.nio.file.{Files, Paths}
|
|
|
+import java.sql.Date
|
|
|
+
|
|
|
+import org.apache.log4j.{Level, Logger}
|
|
|
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
|
|
|
+import org.apache.spark.storage.StorageLevel
|
|
|
+
|
|
|
+import scala.collection.immutable
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * @auther: hwj
|
|
|
+ * @date: 2021/4/13 10:39
|
|
|
+ * @description:
|
|
|
+ */
|
|
|
+object DataExtractByReadFile {
|
|
|
+ case class TrcExetcpu(vehicleid:String, exTime:Date, enTime:Date,feemileage :String, shortfeemileage:String,fee:String, enweight:String, extolllaneid:String, axlecount:String, vehicletype:String, day:String )
|
|
|
+ case class GBUPLoad(vehicleid:String, transtime:String, day:String)
|
|
|
+ def main(args: Array[String]): Unit = {
|
|
|
+ val log: Logger = Logger.getLogger("DataExtractByReadFile")
|
|
|
+ log.setLevel(Level.INFO)
|
|
|
+ if (args.size != 6){
|
|
|
+ log.error(
|
|
|
+ """
|
|
|
+ |请正确传入程序参数:1.门架数据读取目录
|
|
|
+ | 2.ETC数据读取目录
|
|
|
+ | 3.现金出口数据读取目录
|
|
|
+ | 4.结果文件的输出路径(文件输出目录在执行不可存在,程序会自动进行创建,否则会报错)
|
|
|
+ | 5.checkpoint目录
|
|
|
+ | 6.结果文件输出个数,请根据数据量合理输入该参数,输出的文件越少,执行时间越长,输入0则为默认文件个数
|
|
|
+ |""".stripMargin)
|
|
|
+ System.exit(0)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 门架数据读取目录
|
|
|
+ val menjiaPath: String = args(0)
|
|
|
+ // etc数据读取目录
|
|
|
+ val etcPath: String = args(1)
|
|
|
+ // 现金出口数据读取目录
|
|
|
+ val qitachukouPath: String = args(2)
|
|
|
+ // 结果文件输出目录
|
|
|
+ val out_path: String = args(3)
|
|
|
+ // checkpoint目录
|
|
|
+ val check_path: String = args(4)
|
|
|
+ // 结果文件输出个数
|
|
|
+ val result_partitions: Int = Integer.valueOf(args(5))
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ // 当数据读取目录不存在时,结束程序
|
|
|
+ if((!exists(menjiaPath))||(!exists(etcPath))||(!exists(qitachukouPath))){
|
|
|
+ log.error("请输入正确的文件目录:" +
|
|
|
+ "高架数据路径:"+menjiaPath +
|
|
|
+ "etc数据路径:"+etcPath +
|
|
|
+ "现金出口数据路径:"+qitachukouPath)
|
|
|
+ System.exit(0)
|
|
|
+ }
|
|
|
+ // 当结果数据输出目录已存在时,结束程序
|
|
|
+ if(exists(out_path)){
|
|
|
+ log.error("数据输出目录已存在"+out_path)
|
|
|
+ System.exit(0)
|
|
|
+ }
|
|
|
+ // 初始化SparkSession
|
|
|
+ val spark: SparkSession = formatSparkSession
|
|
|
+ //设置checkpoint
|
|
|
+ spark.sparkContext.setCheckpointDir(check_path)
|
|
|
+ // spark.sparkContext.setCheckpointDir("file:///data/checkpoint")
|
|
|
+
|
|
|
+ log.info("开始抽取数据")
|
|
|
+ //todo:开始处理数据
|
|
|
+ import org.apache.spark.sql.functions._
|
|
|
+ // todo: 引入隐式转换
|
|
|
+ import spark.implicits._
|
|
|
+ // 高架数据
|
|
|
+ log.info("开始抽取门架数据,抽取路径为:"+menjiaPath)
|
|
|
+ val gbuLoadDataset: Dataset[GBUPLoad] = spark.read.option("header", true).csv(menjiaPath).toDF("vehicleid", "transtime", "day").as[GBUPLoad].coalesce(36)
|
|
|
+ log.info("抽取门架数据成功")
|
|
|
+ //etc数据
|
|
|
+ log.info("开始抽取etc出口数据,抽取路径为:"+etcPath)
|
|
|
+ val trcExetcpuDataset = spark.read.option("header", true).csv(etcPath).toDF("vehicleid","extime","entime","feemileage","shortfeemileage","fee","enweight","extolllaneid","axlecount","vehicletype","day").as[TrcExetcpu].coalesce(36)
|
|
|
+ log.info("抽取etc出口数据成功")
|
|
|
+ // 其他出口数据
|
|
|
+ log.info("抽取开始抽取现金出口数据,抽取路径为:"+qitachukouPath)
|
|
|
+ val trcOtuDataset = spark.read.option("header", true).csv(qitachukouPath).toDF("vehicleid","extime","entime","feemileage","shortfeemileage","fee","enweight","extolllaneid","axlecount","vehicletype","day").as[TrcExetcpu].coalesce(36)
|
|
|
+ log.info("现金出口数据成功")
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ //合并etc出口和现金出口的数据
|
|
|
+ log.info("开始合并etc出口和现金出口数据")
|
|
|
+ val trcDataSet: Dataset[TrcExetcpu] = trcExetcpuDataset.unionByName(trcOtuDataset)
|
|
|
+ log.info("合并etc出口和现金出口数据成功")
|
|
|
+ // 创建视图 对出口数据进行处理
|
|
|
+ trcDataSet.createOrReplaceTempView("trcDataSet")
|
|
|
+ log.info("创建视图,开始统计出口相关数据")
|
|
|
+ val trcDataSet_p: DataFrame = spark.sql(
|
|
|
+ """
|
|
|
+ |select
|
|
|
+ |vehicleid,
|
|
|
+ |exTime,
|
|
|
+ |entime,
|
|
|
+ |(case when entime is not null then unix_timestamp(regexp_replace(extime,'T',' '))-unix_timestamp(regexp_replace(entime,'T',' ')) else 0 end ) as travel_time,
|
|
|
+ |(case when feemileage>0 and feemileage is not null then feemileage else shortfeemileage end) as feemileage,
|
|
|
+ | cast(fee as double) as fee,
|
|
|
+ |(case when enweight >0 then ( case when feemileage>0 and feemileage is not null then feemileage else shortfeemileage end) else 0 end ) as weight_mileage,
|
|
|
+ |substr(exTollLaneId,6,2) as travel_provinces,
|
|
|
+ |axlecount,
|
|
|
+ |vehicletype,
|
|
|
+ |day
|
|
|
+ |from
|
|
|
+ |trcDataSet
|
|
|
+ |
|
|
|
+ |""".stripMargin)
|
|
|
+ log.info("出口相关数据统计完成")
|
|
|
+ // 取出出口表的 出口交易时间和入口交易时间,与门架表进行合并
|
|
|
+ log.info("开始抽取出口数据中的出口交易时间")
|
|
|
+ val ex_transtime: Dataset[GBUPLoad] = trcDataSet_p.select($"day", $"vehicleid".as("vehicleid"), $"exTime".as("transtime")).as[GBUPLoad]
|
|
|
+ log.info("抽取出口数据中的出口交易时间成功")
|
|
|
+ log.info("开始抽取出口数据中的入口交易时间")
|
|
|
+ val en_transtime: Dataset[GBUPLoad] = trcDataSet_p.select($"day", $"vehicleid".as("vehicleid"),$"enTime".as("transtime")).as[GBUPLoad]
|
|
|
+ log.info("抽取出口数据中的入口交易时间成功")
|
|
|
+ log.info("开始合并门架数据时间与 入口交易时间、出口交易时间")
|
|
|
+ val exetcpuDataset: Dataset[GBUPLoad] = gbuLoadDataset.unionByName(ex_transtime).unionByName(en_transtime)
|
|
|
+ log.info("合并交易时间成功")
|
|
|
+ trcDataSet_p.persist(StorageLevel.MEMORY_AND_DISK)
|
|
|
+ trcDataSet_p.checkpoint()
|
|
|
+ trcDataSet_p.unpersist()
|
|
|
+ // 创建视图 对用户出行数据进行分段统计次数
|
|
|
+ exetcpuDataset.createOrReplaceTempView("gbupload_etctu")
|
|
|
+ val gbupload_etctu: Dataset[GBUPLoad] = spark.sql(
|
|
|
+ """
|
|
|
+ |select vehicleid as vehicleid,
|
|
|
+ |(case
|
|
|
+ | when 0 <= hour(transtime) and hour(transtime)<=3 then 'A'
|
|
|
+ | when 3 < hour(transtime) and hour(transtime)<=6 then 'B'
|
|
|
+ | when 6 < hour(transtime) and hour(transtime)<=9 then 'C'
|
|
|
+ | when 9 < hour(transtime) and hour(transtime)<=12 then 'D'
|
|
|
+ | when 12 < hour(transtime) and hour(transtime)<=15 then 'E'
|
|
|
+ | when 15 < hour(transtime) and hour(transtime)<=18 then 'F'
|
|
|
+ | when 18 < hour(transtime) and hour(transtime)<=21 then 'G'
|
|
|
+ | when 21 < hour(transtime) and hour(transtime)<24 then 'H'
|
|
|
+ | else 'I' end) as transtime,
|
|
|
+ |day
|
|
|
+ |from
|
|
|
+ |gbupload_etctu
|
|
|
+ |""".stripMargin).as[GBUPLoad]
|
|
|
+ gbupload_etctu.persist(StorageLevel.MEMORY_AND_DISK)
|
|
|
+ gbupload_etctu.checkpoint()
|
|
|
+ gbupload_etctu.unpersist()
|
|
|
+ spark.sqlContext.dropTempTable("gbupload_etctu")
|
|
|
+
|
|
|
+ //todo:根据车牌号进行分组聚合,统计最近一次出行时间、出行总时长、最长出行时间、出行总里程、出行花费总金额、负重出行总里程、出行次数
|
|
|
+ //合并etc出口和现金出口的数据
|
|
|
+ val trcDataFrame = trcDataSet_p
|
|
|
+ .groupBy($"day", $"vehicleid")
|
|
|
+ .agg(immutable.Map("exTime" -> "max", "travel_time" -> "sum", "travel_time" -> "max", "feemileage" -> "sum", "fee" -> "sum", "weight_mileage" -> "sum", "vehicleid" -> "count"))
|
|
|
+ .withColumnRenamed("max(exTime)","max_exTime")
|
|
|
+ .withColumnRenamed("max(travel_time)","max_travel_time")
|
|
|
+ .withColumnRenamed("sum(travel_time)","sum_travel_time")
|
|
|
+ .withColumnRenamed("sum(feemileage)","sum_feemileage")
|
|
|
+ .withColumnRenamed("sum(fee)","sum_fee")
|
|
|
+ .withColumnRenamed("sum(weight_mileage)","sum_weight_mileage")
|
|
|
+ .withColumnRenamed("count(vehicleid)","exTimes_count")
|
|
|
+ //todo: 取车辆轴数和收费类型
|
|
|
+ trcDataSet_p.createOrReplaceTempView("trc_table")
|
|
|
+ val vehicleTypeAxlecount: DataFrame = spark.sql(
|
|
|
+ """
|
|
|
+ |select
|
|
|
+ |a.day as day,
|
|
|
+ |a.vehicleid as vehicleid,
|
|
|
+ |a.axlecount as axlecount,
|
|
|
+ |a.vehicletype as vehicletype
|
|
|
+ |from
|
|
|
+ |(select day ,
|
|
|
+ |vehicleid ,
|
|
|
+ | axlecount ,
|
|
|
+ | vehicletype ,
|
|
|
+ | row_number() over(partition by day , vehicleid order by exTime desc) as rr
|
|
|
+ |from
|
|
|
+ |trc_table) a where a.rr=1
|
|
|
+ |""".stripMargin)
|
|
|
+ // val trc_C_V_DataFrame: DataFrame = trcDataFrame.join(vehicleTypeAxlecount.select($"axlecount",$"vehicletype",$"day",$"vehicleid"), (trcDataFrame.col("vehicleid") === vehicleTypeAxlecount.col("vehicleid")).and(trcDataFrame.col("day") === vehicleTypeAxlecount.col("day")), "left")
|
|
|
+ // 合并出口统计数据与用户轴数
|
|
|
+ val trc_C_V_DataFrame: DataFrame = trcDataFrame.join(vehicleTypeAxlecount, Seq("day","vehicleid"), "left")
|
|
|
+ //todo: 抽取出口数据集中的省份编码
|
|
|
+ val trcProvinceDataSet: Dataset[Row] = trcDataSet_p.select($"day", $"vehicleid", $"travel_provinces").groupBy($"day", $"vehicleid", $"travel_provinces").count().withColumnRenamed("count","travel_provinces_count")
|
|
|
+ trcProvinceDataSet.createOrReplaceTempView("trcProvince_table")
|
|
|
+ val trcProvinceCountDataSet = spark.sql(
|
|
|
+ """
|
|
|
+ |select day , vehicleid as vehicleid ,concat_ws("#",collect_list(concat_ws("%",travel_provinces,travel_provinces_count) )) as travel_provinces_count
|
|
|
+ |from trcProvince_table
|
|
|
+ |group by day , vehicleid
|
|
|
+ |""".stripMargin)
|
|
|
+ val trcTransProvinceCountDataFrame = trc_C_V_DataFrame.join(trcProvinceCountDataSet,
|
|
|
+ Seq("day","vehicleid"),
|
|
|
+ "outer")
|
|
|
+
|
|
|
+
|
|
|
+ //统计每个牌照出行时间段的次数
|
|
|
+ val transtimeDataFrame:DataFrame = gbupload_etctu.groupBy($"day",$"vehicleid",$"transtime")
|
|
|
+ .agg(count("transtime").as("transtime_preferences_count"))
|
|
|
+ // 汇总车辆各个时间段出行次数,变成了map结构
|
|
|
+ transtimeDataFrame.createOrReplaceTempView("transtimeData")
|
|
|
+ val transTimeCountMapDataFrame: DataFrame = spark.sql(
|
|
|
+ """
|
|
|
+ |select day ,
|
|
|
+ |vehicleid ,
|
|
|
+ |concat_ws("#",collect_list(concat_ws("%",transtime,transtime_preferences_count) )) as transtime_count
|
|
|
+ |from transtimeData
|
|
|
+ |group by day , vehicleid
|
|
|
+ |""".stripMargin)
|
|
|
+ var trcResultDataFrame = trcTransProvinceCountDataFrame.join(transTimeCountMapDataFrame,
|
|
|
+ Seq("day","vehicleid"),
|
|
|
+ "outer")
|
|
|
+ spark.sqlContext.dropTempTable("trcDataSet")
|
|
|
+ spark.sqlContext.dropTempTable("trcProvince_table")
|
|
|
+ spark.sqlContext.dropTempTable("transtimeData")
|
|
|
+ spark.sqlContext.dropTempTable("gbupload_etctu")
|
|
|
+ spark.sqlContext.dropTempTable("trc_table")
|
|
|
+ log.info(trcResultDataFrame.printSchema())
|
|
|
+ if (result_partitions> 0){
|
|
|
+ trcResultDataFrame= trcResultDataFrame.repartition(result_partitions)
|
|
|
+ }
|
|
|
+ trcResultDataFrame.write.csv(out_path)
|
|
|
+
|
|
|
+ spark.stop()
|
|
|
+
|
|
|
+ }
|
|
|
+ def formatSparkSession(): SparkSession ={
|
|
|
+ // todo: 实例化SparkSession
|
|
|
+ val spark: SparkSession = SparkSession
|
|
|
+ .builder()
|
|
|
+ // .appName("DataExtract")
|
|
|
+ // .master("local[*]")
|
|
|
+ .getOrCreate()
|
|
|
+ spark
|
|
|
+ }
|
|
|
+
|
|
|
+ def exists(path:String):Boolean={
|
|
|
+ Files.exists(Paths.get(path))
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+}
|