package com.aspirecn.jiaoke.data.extract

import java.nio.file.{Files, Paths}
import java.sql.Date

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * @author hwj
 * @date 2021/4/13 10:39
 * @description Extracts gantry, ETC-exit and cash-exit toll records, aggregates per-vehicle,
 *              per-day travel statistics and writes the result as CSV files.
 */
object DataExtractByReadFile {

  case class TrcExetcpu(vehicleid: String, exTime: Date, enTime: Date, feemileage: String,
                        shortfeemileage: String, fee: String, enweight: String, extolllaneid: String,
                        axlecount: String, vehicletype: String, day: String)

  case class GBUPLoad(vehicleid: String, transtime: String, day: String)

  def main(args: Array[String]): Unit = {
    val log: Logger = Logger.getLogger("DataExtractByReadFile")
    log.setLevel(Level.INFO)

    if (args.length != 6) {
      log.error(
        """
          |Please pass the correct program arguments:
          |  1. gantry data input directory
          |  2. ETC exit data input directory
          |  3. cash exit data input directory
          |  4. result output path (must not exist before the run; the program creates it, otherwise it fails)
          |  5. checkpoint directory
          |  6. number of result files - choose it according to the data volume; fewer output files mean a longer run, 0 keeps the default number
          |""".stripMargin)
      System.exit(0)
    }

    // gantry data input directory
    val menjiaPath: String = args(0)
    // ETC exit data input directory
    val etcPath: String = args(1)
    // cash exit data input directory
    val qitachukouPath: String = args(2)
    // result output directory
    val out_path: String = args(3)
    // checkpoint directory
    val check_path: String = args(4)
    // number of result files
    val result_partitions: Int = Integer.valueOf(args(5))

    // stop if any input directory does not exist
    if (!exists(menjiaPath) || !exists(etcPath) || !exists(qitachukouPath)) {
      log.error("Please pass valid input directories: " +
        "gantry data path: " + menjiaPath +
        ", ETC data path: " + etcPath +
        ", cash exit data path: " + qitachukouPath)
      System.exit(0)
    }

    // stop if the output directory already exists
    if (exists(out_path)) {
      log.error("The output directory already exists: " + out_path)
      System.exit(0)
    }

    // initialise the SparkSession
    val spark: SparkSession = formatSparkSession()
    // set the checkpoint directory
    spark.sparkContext.setCheckpointDir(check_path)
    // spark.sparkContext.setCheckpointDir("file:///data/checkpoint")

    log.info("Start extracting data")

    // todo: start processing the data
    import org.apache.spark.sql.functions._
    // todo: import implicit conversions ($-columns, Dataset encoders)
    import spark.implicits._

    // gantry data
    log.info("Start extracting gantry data from: " + menjiaPath)
    val gbuLoadDataset: Dataset[GBUPLoad] = spark.read.option("header", true)
      .csv(menjiaPath)
      .toDF("vehicleid", "transtime", "day")
      .as[GBUPLoad]
      .coalesce(36)
    log.info("Gantry data extracted")

    // ETC exit data
    log.info("Start extracting ETC exit data from: " + etcPath)
    val trcExetcpuDataset = spark.read.option("header", true)
      .csv(etcPath)
      .toDF("vehicleid", "extime", "entime", "feemileage", "shortfeemileage", "fee", "enweight",
        "extolllaneid", "axlecount", "vehicletype", "day")
      .as[TrcExetcpu]
      .coalesce(36)
    log.info("ETC exit data extracted")

    // cash exit data
    log.info("Start extracting cash exit data from: " + qitachukouPath)
    val trcOtuDataset = spark.read.option("header", true)
      .csv(qitachukouPath)
      .toDF("vehicleid", "extime", "entime", "feemileage", "shortfeemileage", "fee", "enweight",
        "extolllaneid", "axlecount", "vehicletype", "day")
      .as[TrcExetcpu]
      .coalesce(36)
    log.info("Cash exit data extracted")

    // union the ETC exit data with the cash exit data
    log.info("Start merging ETC exit and cash exit data")
    val trcDataSet: Dataset[TrcExetcpu] = trcExetcpuDataset.unionByName(trcOtuDataset)
    log.info("ETC exit and cash exit data merged")

    // register a view and derive the exit-related metrics
    trcDataSet.createOrReplaceTempView("trcDataSet")
    log.info("View created, start computing exit metrics")
    val trcDataSet_p: DataFrame = spark.sql(
      """
        |select
        |vehicleid,
        |exTime,
        |entime,
        |(case when entime is not null then unix_timestamp(regexp_replace(extime,'T',' ')) - unix_timestamp(regexp_replace(entime,'T',' ')) else 0 end) as travel_time,
        |(case when feemileage > 0 and feemileage is not null then feemileage else shortfeemileage end) as feemileage,
        |cast(fee as double) as fee,
        |(case when enweight > 0 then (case when feemileage > 0 and feemileage is not null then feemileage else shortfeemileage end) else 0 end) as weight_mileage,
        |substr(exTollLaneId, 6, 2) as travel_provinces,
        |axlecount,
        |vehicletype,
        |day
        |from trcDataSet
        |""".stripMargin)
    log.info("Exit metrics computed")

    // take the exit and entry transaction times from the exit data so they can be merged with the gantry data
    log.info("Start extracting exit transaction times from the exit data")
    val ex_transtime: Dataset[GBUPLoad] = trcDataSet_p
      .select($"day", $"vehicleid".as("vehicleid"), $"exTime".as("transtime")).as[GBUPLoad]
    log.info("Exit transaction times extracted")

    log.info("Start extracting entry transaction times from the exit data")
    val en_transtime: Dataset[GBUPLoad] = trcDataSet_p
      .select($"day", $"vehicleid".as("vehicleid"), $"enTime".as("transtime")).as[GBUPLoad]
    log.info("Entry transaction times extracted")

    log.info("Start merging gantry transaction times with the entry and exit transaction times")
    val exetcpuDataset: Dataset[GBUPLoad] = gbuLoadDataset.unionByName(ex_transtime).unionByName(en_transtime)
    log.info("Transaction times merged")

    // checkpoint() returns a new Dataset with a truncated lineage; keep the returned value
    // (the original code discarded it, so the later aggregations recomputed the full plan)
    trcDataSet_p.persist(StorageLevel.MEMORY_AND_DISK)
    val trcDataSet_ck: DataFrame = trcDataSet_p.checkpoint()
    trcDataSet_p.unpersist()

    // register a view and bucket the travel records by time of day
    exetcpuDataset.createOrReplaceTempView("gbupload_etctu")
    val gbupload_etctu: Dataset[GBUPLoad] = spark.sql(
      """
        |select vehicleid as vehicleid,
        |(case
        |  when 0 <= hour(transtime) and hour(transtime) <= 3 then 'A'
        |  when 3 < hour(transtime) and hour(transtime) <= 6 then 'B'
        |  when 6 < hour(transtime) and hour(transtime) <= 9 then 'C'
        |  when 9 < hour(transtime) and hour(transtime) <= 12 then 'D'
        |  when 12 < hour(transtime) and hour(transtime) <= 15 then 'E'
        |  when 15 < hour(transtime) and hour(transtime) <= 18 then 'F'
        |  when 18 < hour(transtime) and hour(transtime) <= 21 then 'G'
        |  when 21 < hour(transtime) and hour(transtime) < 24 then 'H'
        |  else 'I' end) as transtime,
        |day
        |from
        |gbupload_etctu
        |""".stripMargin).as[GBUPLoad]

    gbupload_etctu.persist(StorageLevel.MEMORY_AND_DISK)
    val gbupload_etctu_ck: Dataset[GBUPLoad] = gbupload_etctu.checkpoint()
    gbupload_etctu.unpersist()
    spark.sqlContext.dropTempTable("gbupload_etctu")

    // todo: group by licence plate and compute the latest exit time, total travel time, longest trip,
    //       total mileage, total fee, total loaded mileage and number of trips.
    // note: the original aggregation used an immutable.Map, which cannot hold both "sum" and "max"
    //       for travel_time (duplicate keys collapse), so the aggregates are listed explicitly here.
    val trcDataFrame = trcDataSet_ck
      .groupBy($"day", $"vehicleid")
      .agg(
        max($"exTime").as("max_exTime"),
        sum($"travel_time").as("sum_travel_time"),
        max($"travel_time").as("max_travel_time"),
        sum($"feemileage").as("sum_feemileage"),
        sum($"fee").as("sum_fee"),
        sum($"weight_mileage").as("sum_weight_mileage"),
        count($"vehicleid").as("exTimes_count")
      )

    // todo: take the axle count and toll type of the most recent exit record per vehicle and day
    trcDataSet_ck.createOrReplaceTempView("trc_table")
    val vehicleTypeAxlecount: DataFrame = spark.sql(
      """
        |select
        |a.day as day,
        |a.vehicleid as vehicleid,
        |a.axlecount as axlecount,
        |a.vehicletype as vehicletype
        |from
        |(select day,
        |vehicleid,
        |axlecount,
        |vehicletype,
        |row_number() over(partition by day, vehicleid order by exTime desc) as rr
        |from
        |trc_table) a where a.rr = 1
        |""".stripMargin)

    // val trc_C_V_DataFrame: DataFrame = trcDataFrame.join(
    //   vehicleTypeAxlecount.select($"axlecount", $"vehicletype", $"day", $"vehicleid"),
    //   (trcDataFrame.col("vehicleid") === vehicleTypeAxlecount.col("vehicleid"))
    //     .and(trcDataFrame.col("day") === vehicleTypeAxlecount.col("day")), "left")

    // join the exit aggregates with the axle count / vehicle type
    val trc_C_V_DataFrame: DataFrame = trcDataFrame.join(vehicleTypeAxlecount, Seq("day", "vehicleid"), "left")

    // todo: extract the province codes visited per vehicle and day
    val trcProvinceDataSet: Dataset[Row] = trcDataSet_ck
      .select($"day", $"vehicleid", $"travel_provinces")
      .groupBy($"day", $"vehicleid", $"travel_provinces")
      .count()
      .withColumnRenamed("count", "travel_provinces_count")
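    // The following two SQL blocks encode multi-valued results as delimited strings rather than maps:
    // each (value, count) pair is joined with '%' and the pairs are joined with '#'. For example, a
    // vehicle that passed province code 32 five times and 44 twice would get "32%5#44%2"
    // (illustrative value, not taken from real data).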
    trcProvinceDataSet.createOrReplaceTempView("trcProvince_table")
    val trcProvinceCountDataSet = spark.sql(
      """
        |select day, vehicleid as vehicleid,
        |concat_ws("#", collect_list(concat_ws("%", travel_provinces, travel_provinces_count))) as travel_provinces_count
        |from trcProvince_table
        |group by day, vehicleid
        |""".stripMargin)
    val trcTransProvinceCountDataFrame = trc_C_V_DataFrame.join(trcProvinceCountDataSet, Seq("day", "vehicleid"), "outer")

    // count how often each licence plate travels in each time-of-day bucket
    val transtimeDataFrame: DataFrame = gbupload_etctu_ck
      .groupBy($"day", $"vehicleid", $"transtime")
      .agg(count("transtime").as("transtime_preferences_count"))

    // collapse the per-bucket counts into a single encoded string per vehicle and day
    transtimeDataFrame.createOrReplaceTempView("transtimeData")
    val transTimeCountMapDataFrame: DataFrame = spark.sql(
      """
        |select day,
        |vehicleid,
        |concat_ws("#", collect_list(concat_ws("%", transtime, transtime_preferences_count))) as transtime_count
        |from transtimeData
        |group by day, vehicleid
        |""".stripMargin)

    var trcResultDataFrame = trcTransProvinceCountDataFrame.join(transTimeCountMapDataFrame, Seq("day", "vehicleid"), "outer")

    spark.sqlContext.dropTempTable("trcDataSet")
    spark.sqlContext.dropTempTable("trcProvince_table")
    spark.sqlContext.dropTempTable("transtimeData")
    spark.sqlContext.dropTempTable("trc_table")

    // printSchema() returns Unit, so it cannot be passed to the logger; print it directly instead
    trcResultDataFrame.printSchema()

    if (result_partitions > 0) {
      trcResultDataFrame = trcResultDataFrame.repartition(result_partitions)
    }
    trcResultDataFrame.write.csv(out_path)

    spark.stop()
  }

  def formatSparkSession(): SparkSession = {
    // todo: instantiate the SparkSession; appName and master are expected to come from spark-submit
    val spark: SparkSession = SparkSession
      .builder()
      // .appName("DataExtract")
      // .master("local[*]")
      .getOrCreate()
    spark
  }

  // checks the driver's local filesystem, so the input/output paths must be visible to the driver
  def exists(path: String): Boolean = {
    Files.exists(Paths.get(path))
  }
}
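// Usage sketch (assumed invocation - the jar name and the paths below are illustrative and are not
// taken from this repository; the class name and the six-argument order match main() above):
//   spark-submit --class com.aspirecn.jiaoke.data.extract.DataExtractByReadFile \
//     jiaoke-data-extract.jar \
//     /data/menjia /data/etc /data/qitachukou /data/result /data/checkpoint 10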