package com.aspirecn.jiaoke.data.extract

import java.nio.file.{Files, Paths}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * @author: hwj
 * @date: 2021/4/13 10:39
 * @description: Extracts gantry, ETC-exit and cash-exit toll records and
 *               aggregates per-vehicle daily travel statistics.
 */
object DataExtractByReadFile {
  // All CSV columns are read as strings; exTime/enTime are kept as String
  // because the source stores 'T'-separated timestamps that are parsed in SQL
  // via regexp_replace/unix_timestamp (a Date field would fail the .as[] cast).
  case class TrcExetcpu(vehicleid: String, exTime: String, enTime: String, feemileage: String, shortfeemileage: String, fee: String, enweight: String, extolllaneid: String, axlecount: String, vehicletype: String, day: String)
  case class GBUPLoad(vehicleid: String, transtime: String, day: String)
  def main(args: Array[String]): Unit = {
    val log: Logger = Logger.getLogger("DataExtractByReadFile")
    log.setLevel(Level.INFO)
    if (args.length != 6) {
      log.error(
        """
          |Please pass the program arguments: 1. gantry data input directory
          | 2. ETC data input directory
          | 3. cash-exit data input directory
          | 4. result output path (must not exist before the run; the program creates it, otherwise the write fails)
          | 5. checkpoint directory
          | 6. number of result files; choose it according to the data volume (fewer output files means a longer run); 0 keeps the default
          |""".stripMargin)
      System.exit(0)
    }
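    // For reference, a hypothetical launch command (the class name is real; the
    // jar name and directories below are illustrative assumptions only):
    //   spark-submit --class com.aspirecn.jiaoke.data.extract.DataExtractByReadFile \
    //     data-extract.jar /data/gantry /data/etc /data/cash /data/out /data/checkpoint 36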
    // Gantry data input directory
    val menjiaPath: String = args(0)
    // ETC data input directory
    val etcPath: String = args(1)
    // Cash-exit data input directory
    val qitachukouPath: String = args(2)
    // Result output directory
    val out_path: String = args(3)
    // Checkpoint directory
    val check_path: String = args(4)
    // Number of result files
    val result_partitions: Int = args(5).toInt
    // Exit when any input directory does not exist
    if ((!exists(menjiaPath)) || (!exists(etcPath)) || (!exists(qitachukouPath))) {
      log.error("Please pass valid input directories. " +
        "Gantry data path: " + menjiaPath +
        ", ETC data path: " + etcPath +
        ", cash-exit data path: " + qitachukouPath)
      System.exit(0)
    }
    // Exit when the result output directory already exists
    if (exists(out_path)) {
      log.error("Output directory already exists: " + out_path)
      System.exit(0)
    }
    // Initialize the SparkSession
    val spark: SparkSession = formatSparkSession
    // Set the checkpoint directory
    spark.sparkContext.setCheckpointDir(check_path)
    // spark.sparkContext.setCheckpointDir("file:///data/checkpoint")
    log.info("Starting data extraction")
    import org.apache.spark.sql.functions._
    // Implicit conversions: Dataset encoders and the $"..." column syntax
    import spark.implicits._
    // Gantry data
    log.info("Extracting gantry data from: " + menjiaPath)
    val gbuLoadDataset: Dataset[GBUPLoad] = spark.read.option("header", true).csv(menjiaPath).toDF("vehicleid", "transtime", "day").as[GBUPLoad].coalesce(36)
    log.info("Gantry data extracted")
    // ETC exit data
    log.info("Extracting ETC exit data from: " + etcPath)
    val trcExetcpuDataset = spark.read.option("header", true).csv(etcPath).toDF("vehicleid", "extime", "entime", "feemileage", "shortfeemileage", "fee", "enweight", "extolllaneid", "axlecount", "vehicletype", "day").as[TrcExetcpu].coalesce(36)
    log.info("ETC exit data extracted")
    // Cash-exit data
    log.info("Extracting cash-exit data from: " + qitachukouPath)
    val trcOtuDataset = spark.read.option("header", true).csv(qitachukouPath).toDF("vehicleid", "extime", "entime", "feemileage", "shortfeemileage", "fee", "enweight", "extolllaneid", "axlecount", "vehicletype", "day").as[TrcExetcpu].coalesce(36)
    log.info("Cash-exit data extracted")
    // Union the ETC-exit and cash-exit records
    log.info("Merging ETC-exit and cash-exit data")
    val trcDataSet: Dataset[TrcExetcpu] = trcExetcpuDataset.unionByName(trcOtuDataset)
    log.info("ETC-exit and cash-exit data merged")
    // Register a view and derive per-record exit statistics
    trcDataSet.createOrReplaceTempView("trcDataSet")
    log.info("View created; computing exit statistics")
    // var: reassigned to its checkpointed form below
    var trcDataSet_p: DataFrame = spark.sql(
- """
- |select
- |vehicleid,
- |exTime,
- |entime,
- |(case when entime is not null then unix_timestamp(regexp_replace(extime,'T',' '))-unix_timestamp(regexp_replace(entime,'T',' ')) else 0 end ) as travel_time,
- |(case when feemileage>0 and feemileage is not null then feemileage else shortfeemileage end) as feemileage,
- | cast(fee as double) as fee,
- |(case when enweight >0 then ( case when feemileage>0 and feemileage is not null then feemileage else shortfeemileage end) else 0 end ) as weight_mileage,
- |substr(exTollLaneId,6,2) as travel_provinces,
- |axlecount,
- |vehicletype,
- |day
- |from
- |trcDataSet
- |
- |""".stripMargin)
- log.info("出口相关数据统计完成")
- // 取出出口表的 出口交易时间和入口交易时间,与门架表进行合并
- log.info("开始抽取出口数据中的出口交易时间")
- val ex_transtime: Dataset[GBUPLoad] = trcDataSet_p.select($"day", $"vehicleid".as("vehicleid"), $"exTime".as("transtime")).as[GBUPLoad]
- log.info("抽取出口数据中的出口交易时间成功")
- log.info("开始抽取出口数据中的入口交易时间")
- val en_transtime: Dataset[GBUPLoad] = trcDataSet_p.select($"day", $"vehicleid".as("vehicleid"),$"enTime".as("transtime")).as[GBUPLoad]
- log.info("抽取出口数据中的入口交易时间成功")
- log.info("开始合并门架数据时间与 入口交易时间、出口交易时间")
- val exetcpuDataset: Dataset[GBUPLoad] = gbuLoadDataset.unionByName(ex_transtime).unionByName(en_transtime)
- log.info("合并交易时间成功")
    // checkpoint() returns a new DataFrame backed by the checkpoint files; keep
    // it, otherwise the lineage is never actually truncated
    val trcDataSet_cached = trcDataSet_p
    trcDataSet_cached.persist(StorageLevel.MEMORY_AND_DISK)
    trcDataSet_p = trcDataSet_cached.checkpoint()
    trcDataSet_cached.unpersist()
    // Register a view and bucket each vehicle's transaction times by period of day
    exetcpuDataset.createOrReplaceTempView("gbupload_etctu")
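    // Buckets A..H split the day by hour of transtime (A = hours 0-3, B = 4-6,
    // C = 7-9, ..., H = 22-23); 'I' catches null or unparseable times.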
    // var: reassigned to its checkpointed form below
    var gbupload_etctu: Dataset[GBUPLoad] = spark.sql(
- """
- |select vehicleid as vehicleid,
- |(case
- | when 0 <= hour(transtime) and hour(transtime)<=3 then 'A'
- | when 3 < hour(transtime) and hour(transtime)<=6 then 'B'
- | when 6 < hour(transtime) and hour(transtime)<=9 then 'C'
- | when 9 < hour(transtime) and hour(transtime)<=12 then 'D'
- | when 12 < hour(transtime) and hour(transtime)<=15 then 'E'
- | when 15 < hour(transtime) and hour(transtime)<=18 then 'F'
- | when 18 < hour(transtime) and hour(transtime)<=21 then 'G'
- | when 21 < hour(transtime) and hour(transtime)<24 then 'H'
- | else 'I' end) as transtime,
- |day
- |from
- |gbupload_etctu
- |""".stripMargin).as[GBUPLoad]
    // As above: keep the Dataset returned by checkpoint()
    val gbupload_cached = gbupload_etctu
    gbupload_cached.persist(StorageLevel.MEMORY_AND_DISK)
    gbupload_etctu = gbupload_cached.checkpoint()
    gbupload_cached.unpersist()
    spark.catalog.dropTempView("gbupload_etctu")
    // Group by plate number and day to compute: latest exit time, total travel
    // time, longest single travel time, total mileage, total fee, total loaded
    // mileage, and trip count
    val trcDataFrame = trcDataSet_p
      .groupBy($"day", $"vehicleid")
      // Column expressions rather than a Map-based agg: a Map cannot hold two
      // aggregations ("sum" and "max") for the same travel_time column
      .agg(
        max($"exTime").as("max_exTime"),
        sum($"travel_time").as("sum_travel_time"),
        max($"travel_time").as("max_travel_time"),
        sum($"feemileage").as("sum_feemileage"),
        sum($"fee").as("sum_fee"),
        sum($"weight_mileage").as("sum_weight_mileage"),
        count($"vehicleid").as("exTimes_count")
      )
    // Take each vehicle's axle count and toll type
    trcDataSet_p.createOrReplaceTempView("trc_table")
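    // row_number() over (partition by day, vehicleid order by exTime desc) = 1
    // keeps exactly one row per vehicle per day: its most recent exit record.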
    val vehicleTypeAxlecount: DataFrame = spark.sql(
      """
        |select
        |a.day as day,
        |a.vehicleid as vehicleid,
        |a.axlecount as axlecount,
        |a.vehicletype as vehicletype
        |from
        |(select day,
        |vehicleid,
        |axlecount,
        |vehicletype,
        |row_number() over(partition by day, vehicleid order by exTime desc) as rr
        |from
        |trc_table) a where a.rr=1
        |""".stripMargin)
    // val trc_C_V_DataFrame: DataFrame = trcDataFrame.join(vehicleTypeAxlecount.select($"axlecount",$"vehicletype",$"day",$"vehicleid"), (trcDataFrame.col("vehicleid") === vehicleTypeAxlecount.col("vehicleid")).and(trcDataFrame.col("day") === vehicleTypeAxlecount.col("day")), "left")
    // Join the exit aggregates with the vehicle type and axle-count lookup
    val trc_C_V_DataFrame: DataFrame = trcDataFrame.join(vehicleTypeAxlecount, Seq("day", "vehicleid"), "left")
    // Count trips per traversed province from the exit records
    val trcProvinceDataSet: Dataset[Row] = trcDataSet_p.select($"day", $"vehicleid", $"travel_provinces").groupBy($"day", $"vehicleid", $"travel_provinces").count().withColumnRenamed("count", "travel_provinces_count")
    trcProvinceDataSet.createOrReplaceTempView("trcProvince_table")
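    // collect_list + concat_ws flatten the per-province counts into one string
    // per vehicle per day, encoded as "prov1%count1#prov2%count2#..."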
    val trcProvinceCountDataSet = spark.sql(
      """
        |select day, vehicleid as vehicleid, concat_ws("#",collect_list(concat_ws("%",travel_provinces,travel_provinces_count))) as travel_provinces_count
        |from trcProvince_table
        |group by day, vehicleid
        |""".stripMargin)
    val trcTransProvinceCountDataFrame = trc_C_V_DataFrame.join(trcProvinceCountDataSet,
      Seq("day", "vehicleid"),
      "outer")
    // Count each plate's trips per time-of-day bucket
    val transtimeDataFrame: DataFrame = gbupload_etctu.groupBy($"day", $"vehicleid", $"transtime")
      .agg(count("transtime").as("transtime_preferences_count"))
    // Flatten each vehicle's per-bucket counts into one delimited string, using
    // the same "bucket%count#..." encoding as the province counts
    transtimeDataFrame.createOrReplaceTempView("transtimeData")
    val transTimeCountMapDataFrame: DataFrame = spark.sql(
      """
        |select day,
        |vehicleid,
        |concat_ws("#",collect_list(concat_ws("%",transtime,transtime_preferences_count))) as transtime_count
        |from transtimeData
        |group by day, vehicleid
        |""".stripMargin)
    var trcResultDataFrame = trcTransProvinceCountDataFrame.join(transTimeCountMapDataFrame,
      Seq("day", "vehicleid"),
      "outer")
    spark.catalog.dropTempView("trcDataSet")
    spark.catalog.dropTempView("trcProvince_table")
    spark.catalog.dropTempView("transtimeData")
    spark.catalog.dropTempView("trc_table")
    // printSchema() returns Unit, so log the schema's tree string instead
    log.info(trcResultDataFrame.schema.treeString)
    if (result_partitions > 0) {
      trcResultDataFrame = trcResultDataFrame.repartition(result_partitions)
    }
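    // Note: this writes headerless CSV; add .option("header", true) before
    // .csv(...) if downstream consumers need column names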
    trcResultDataFrame.write.csv(out_path)
    spark.stop()
  }
  def formatSparkSession(): SparkSession = {
    // Build the SparkSession; appName and master are expected to be supplied by
    // spark-submit, which is why they are left commented out here
    val spark: SparkSession = SparkSession
      .builder()
      // .appName("DataExtract")
      // .master("local[*]")
      .getOrCreate()
    spark
  }
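  // Caveat: Files.exists checks the filesystem local to the driver. If the
  // input and output paths live on HDFS, this check would need the Hadoop
  // FileSystem API instead, e.g. FileSystem.get(hadoopConf).exists(new Path(path)).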
  def exists(path: String): Boolean = {
    Files.exists(Paths.get(path))
  }
}