|
@@ -0,0 +1,265 @@
|
|
|
+package info.aspirecn.cloud.yysj.dao.es;
|
|
|
+
|
|
|
+import info.aspirecn.cloud.yysj.config.IndexConfig;
|
|
|
+import info.aspirecn.cloud.yysj.model.entity.UserProduce;
|
|
|
+import info.aspirecn.cloud.yysj.model.response.ConsumptionInfo;
|
|
|
+import info.aspirecn.cloud.yysj.model.response.ExceptionStatistics;
|
|
|
+import info.aspirecn.cloud.yysj.model.response.Statistics;
|
|
|
+import lombok.SneakyThrows;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.elasticsearch.action.search.SearchRequest;
|
|
|
+import org.elasticsearch.action.search.SearchResponse;
|
|
|
+import org.elasticsearch.client.RequestOptions;
|
|
|
+import org.elasticsearch.client.RestHighLevelClient;
|
|
|
+import org.elasticsearch.index.query.QueryBuilders;
|
|
|
+import org.elasticsearch.rest.RestStatus;
|
|
|
+import org.elasticsearch.search.aggregations.AggregationBuilders;
|
|
|
+import org.elasticsearch.search.aggregations.Aggregations;
|
|
|
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
|
|
|
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
|
|
+import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
|
|
|
+import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
|
|
|
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
|
|
|
+import org.elasticsearch.search.aggregations.metrics.sum.ParsedSum;
|
|
|
+import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Repository;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+/**
|
|
|
+ * ElasticSearch 云验数据平台子订单索引查询
|
|
|
+ *
|
|
|
+ * @author dingliqiang
|
|
|
+ */
|
|
|
+@Repository
|
|
|
+public class YysjGatewaySecondIndex {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 是否计费(0:不计费,1:计费)
|
|
|
+ */
|
|
|
+ private static final int IS_CHARGE = 1;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 一致
|
|
|
+ */
|
|
|
+ private static final int QUERY_RESULT_COINCIDE = 1;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 不一致
|
|
|
+ */
|
|
|
+ private static final int QUERY_RESULT_NOT_COINCIDE = 0;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 查询产品类型
|
|
|
+ */
|
|
|
+ private static final int[] QUERY_RESULT = {QUERY_RESULT_COINCIDE, QUERY_RESULT_NOT_COINCIDE};
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IndexConfig indexConfig;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private RestHighLevelClient restHighLevelClient;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取一定时间内异常效验统计结果(获取不一致结果的统计信息)
|
|
|
+ *
|
|
|
+ * @param startTime 开始时间
|
|
|
+ * @param endTime 结束时间
|
|
|
+ * @param userId 云验数据平台用户ID
|
|
|
+ * @return 异常效验统计结果
|
|
|
+ */
|
|
|
+ @SneakyThrows(IOException.class)
|
|
|
+ public List<Statistics> getCountStatistics(String startTime, String endTime, String userId) {
|
|
|
+ // 设置返回结果
|
|
|
+ List<Statistics> exceptionStatisticsList = new ArrayList<>();
|
|
|
+ // 创建 Bool 查询构建器
|
|
|
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
|
|
+ searchSourceBuilder.size(0);
|
|
|
+ searchSourceBuilder.query(QueryBuilders.boolQuery()
|
|
|
+ .filter(QueryBuilders.rangeQuery("DateTime").gte(startTime).lte(endTime).format("yyyy-MM-dd"))
|
|
|
+ .must(QueryBuilders.termQuery("MG-UserId", userId))
|
|
|
+ .must(QueryBuilders.termsQuery("MG-Result", QUERY_RESULT)));
|
|
|
+ searchSourceBuilder.aggregation(AggregationBuilders.dateHistogram("daily_count")
|
|
|
+ .field("DateTime").format("yyyy-MM-dd").dateHistogramInterval(DateHistogramInterval.DAY)
|
|
|
+ .subAggregation(AggregationBuilders.terms("normal_num").field("MG-Result"))
|
|
|
+ );
|
|
|
+ // 创建查询请求对象,将查询对象配置到其中
|
|
|
+ SearchRequest request = new SearchRequest(indexConfig.getIndexYysjSecond()).source(searchSourceBuilder);
|
|
|
+ // 执行请求
|
|
|
+ SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
+ if (!RestStatus.OK.equals(response.status()) || response.getAggregations() == null) {
|
|
|
+ return exceptionStatisticsList;
|
|
|
+ }
|
|
|
+ // 获取响应中的聚合信息
|
|
|
+ Aggregations aggregations = response.getAggregations();
|
|
|
+ // 转换为 DateHistogram 对象
|
|
|
+ ParsedDateHistogram aggregation = aggregations.get("daily_count");
|
|
|
+ List<? extends Histogram.Bucket> buckets = aggregation.getBuckets();
|
|
|
+ for (Histogram.Bucket bucket : buckets) {
|
|
|
+ Statistics statistics = new Statistics();
|
|
|
+ // 设置日期
|
|
|
+ statistics.setDate(bucket.getKeyAsString());
|
|
|
+ // 设置总数
|
|
|
+ statistics.setSum(bucket.getDocCount());
|
|
|
+ // 设置一致结果统计数
|
|
|
+ ParsedTerms termsAggregations = bucket.getAggregations().get("normal_num");
|
|
|
+ List<? extends Terms.Bucket> termsBuckets = termsAggregations.getBuckets();
|
|
|
+ for (Terms.Bucket buck : termsBuckets) {
|
|
|
+ if (StringUtils.equalsIgnoreCase(String.valueOf(QUERY_RESULT_COINCIDE), buck.getKeyAsString())) {
|
|
|
+ statistics.setNormalNum(buck.getDocCount());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 加入到集合
|
|
|
+ exceptionStatisticsList.add(statistics);
|
|
|
+ }
|
|
|
+ return exceptionStatisticsList;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取一定时间内总量与正常验统计结果(获取总数和一致结果的统计信息)
|
|
|
+ *
|
|
|
+ * @param startTime 开始时间
|
|
|
+ * @param endTime 结束时间
|
|
|
+ * @param userId 云验数据平台用户ID
|
|
|
+ * @return 总量与正常验统计结果
|
|
|
+ */
|
|
|
+ @SneakyThrows(IOException.class)
|
|
|
+ public List<ExceptionStatistics> getErrorStatistics(String startTime, String endTime, String userId) {
|
|
|
+ // 设置返回结果
|
|
|
+ List<ExceptionStatistics> exceptionStatisticsList = new ArrayList<>();
|
|
|
+ // 创建 Bool 查询构建器
|
|
|
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
|
|
+ searchSourceBuilder.size(0);
|
|
|
+ searchSourceBuilder.query(QueryBuilders.boolQuery()
|
|
|
+ .filter(QueryBuilders.rangeQuery("DateTime").gte(startTime).lte(endTime).format("yyyy-MM-dd"))
|
|
|
+ .must(QueryBuilders.termQuery("MG-UserId", userId))
|
|
|
+ .must(QueryBuilders.termQuery("MG-Result", QUERY_RESULT_NOT_COINCIDE)));
|
|
|
+ searchSourceBuilder.aggregation(AggregationBuilders.dateHistogram("daily_count")
|
|
|
+ .field("DateTime").dateHistogramInterval(DateHistogramInterval.DAY).format("yyyy-MM-dd"));
|
|
|
+ // 创建查询请求对象,将查询对象配置到其中
|
|
|
+ SearchRequest request = new SearchRequest(indexConfig.getIndexYysjSecond()).source(searchSourceBuilder);
|
|
|
+ // 执行请求
|
|
|
+ SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
+ if (!RestStatus.OK.equals(response.status()) || response.getAggregations() == null) {
|
|
|
+ return exceptionStatisticsList;
|
|
|
+ }
|
|
|
+ // 获取响应中的聚合信息
|
|
|
+ Aggregations aggregations = response.getAggregations();
|
|
|
+ // 转换为 DateHistogram 对象
|
|
|
+ ParsedDateHistogram aggregation = aggregations.get("daily_count");
|
|
|
+ List<? extends Histogram.Bucket> buckets = aggregation.getBuckets();
|
|
|
+ for (Histogram.Bucket bucket : buckets) {
|
|
|
+ ExceptionStatistics orderStatistics = new ExceptionStatistics();
|
|
|
+ // 设置日期
|
|
|
+ orderStatistics.setDate(bucket.getKeyAsString());
|
|
|
+ // 设置总数
|
|
|
+ orderStatistics.setNum(bucket.getDocCount());
|
|
|
+ // 加入到集合
|
|
|
+ exceptionStatisticsList.add(orderStatistics);
|
|
|
+ }
|
|
|
+ return exceptionStatisticsList;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取用户当日消费金额
|
|
|
+ *
|
|
|
+ * @param userId 云验平台用户ID
|
|
|
+ * @return 当日消费金额
|
|
|
+ */
|
|
|
+ @SneakyThrows(IOException.class)
|
|
|
+ public Long getAccountConsumptionAmount(String userId) {
|
|
|
+ // 构建查询源构建器
|
|
|
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
|
|
+ searchSourceBuilder.size(0);
|
|
|
+ searchSourceBuilder.query(QueryBuilders.boolQuery()
|
|
|
+ .filter(QueryBuilders.rangeQuery("DateTime").gte("now/d").lte("now"))
|
|
|
+ .must(QueryBuilders.termQuery("MG-UserId", userId))
|
|
|
+ .must(QueryBuilders.termQuery("MG-IsCharge", IS_CHARGE)));
|
|
|
+ searchSourceBuilder.aggregation(AggregationBuilders.sum("product_price_sum").field("MG-Money"));
|
|
|
+ // 创建查询请求对象,将查询对象配置到其中
|
|
|
+ SearchRequest request = new SearchRequest(indexConfig.getIndexYysjSecond()).source(searchSourceBuilder);
|
|
|
+ // 执行请求
|
|
|
+ SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
+ if (!RestStatus.OK.equals(response.status()) || response.getAggregations() == null) {
|
|
|
+ return 0L;
|
|
|
+ }
|
|
|
+ // 获取响应中的聚合信息
|
|
|
+ Aggregations aggregations = response.getAggregations();
|
|
|
+ ParsedSum aggregation = aggregations.get("product_price_sum");
|
|
|
+ return Math.round(aggregation.getValue());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取云验平台用户当月订购产品的消费及占比信息
|
|
|
+ *
|
|
|
+ * @param userId 云验平台用户ID
|
|
|
+ * @param produceIds 产品ID列表
|
|
|
+ * @return 用户当月订购产品的消费及占比信息
|
|
|
+ */
|
|
|
+ @SneakyThrows(IOException.class)
|
|
|
+ public Map<Integer, ConsumptionInfo> getConsumptionInfo(String userId, List<UserProduce> userProduceList, Integer[] produceIds) {
|
|
|
+ Map<Integer, ConsumptionInfo> consumptionInfoMap = new HashMap<>();
|
|
|
+ // 构建查询源构建器
|
|
|
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
|
|
+ searchSourceBuilder.size(0);
|
|
|
+ searchSourceBuilder.query(QueryBuilders.boolQuery().filter(QueryBuilders.rangeQuery("DateTime").gte("now/M"))
|
|
|
+ .must(QueryBuilders.termQuery("MG-UserId", userId))
|
|
|
+ .must(QueryBuilders.termQuery("MG-IsCharge", IS_CHARGE))
|
|
|
+ .must(QueryBuilders.termsQuery("MG-ProductId", produceIds)));
|
|
|
+ searchSourceBuilder.aggregation(AggregationBuilders.terms("product_usage_analysis").field("MG-ProductId")
|
|
|
+ .subAggregation(AggregationBuilders.sum("price_sum").field("MG-Money")));
|
|
|
+ // 创建查询请求对象,将查询对象配置到其中
|
|
|
+ SearchRequest request = new SearchRequest(indexConfig.getIndexYysjSecond()).source(searchSourceBuilder);
|
|
|
+ // 执行查询请求,获取响应信息
|
|
|
+ SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
|
|
|
+ if (!RestStatus.OK.equals(response.status()) || response.getAggregations() == null) {
|
|
|
+ return consumptionInfoMap;
|
|
|
+ }
|
|
|
+ // 获取响应中的聚合信息
|
|
|
+ Aggregations aggregations = response.getAggregations();
|
|
|
+ // 转换为 Terms 对象
|
|
|
+ ParsedTerms aggregation = aggregations.get("product_usage_analysis");
|
|
|
+ List<? extends Terms.Bucket> buckets = aggregation.getBuckets();
|
|
|
+ // 合并各个产品信息,然后返回数据
|
|
|
+ consumptionInfoMap.putAll(getCarDriverWaybill(buckets, userProduceList));
|
|
|
+ return consumptionInfoMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取数据交互中各个产品信息,如果数据交互多个产品同属于云验平台,则将这些产品汇总
|
|
|
+ *
|
|
|
+ * @param buckets 分桶信息
|
|
|
+ */
|
|
|
+ private Map<Integer, ConsumptionInfo> getCarDriverWaybill(List<? extends Terms.Bucket> buckets, List<UserProduce> userProduceList) {
|
|
|
+ Map<Integer, Long> produceMap = new HashMap<>(50);
|
|
|
+ for (UserProduce userProduce : userProduceList) {
|
|
|
+ for (Terms.Bucket bucket : buckets) {
|
|
|
+ if (StringUtils.equalsIgnoreCase(String.valueOf(userProduce.getProduceId()), bucket.getKeyAsString())) {
|
|
|
+ ParsedSum parsedSum = bucket.getAggregations().get("price_sum");
|
|
|
+ long value;
|
|
|
+ if (produceMap.containsKey(userProduce.getProduceId())) {
|
|
|
+ value = produceMap.get(userProduce.getProduceId()) + Math.round(parsedSum.getValue());
|
|
|
+ } else {
|
|
|
+ value = Math.round(parsedSum.getValue());
|
|
|
+ }
|
|
|
+ produceMap.put(userProduce.getProduceId(), value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Map<Integer, ConsumptionInfo> consumptionInfoMap = new HashMap<>(50);
|
|
|
+ // 设置各个产品的金额比例
|
|
|
+ for (Map.Entry<Integer, Long> entry : produceMap.entrySet()) {
|
|
|
+ // 设置产品消费信息
|
|
|
+ consumptionInfoMap.put(entry.getKey(), new ConsumptionInfo()
|
|
|
+ .setProductId(entry.getKey())
|
|
|
+ .setConsumptionAmount(String.valueOf(entry.getValue())));
|
|
|
+ }
|
|
|
+ return consumptionInfoMap;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|