|
@@ -50,10 +50,9 @@ public class RecordStorageServiceImpl implements RecordStorageService {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public CommonResponseObject recordStorage(List<MonthResult> monthResultList) {
|
|
public CommonResponseObject recordStorage(List<MonthResult> monthResultList) {
|
|
|
|
+ log.info("待存入ES:{}", monthResultList.size());
|
|
//设置保存的索引和id
|
|
//设置保存的索引和id
|
|
String indexName = "index-record-" + monthResultList.get(0).getMonth();
|
|
String indexName = "index-record-" + monthResultList.get(0).getMonth();
|
|
- // todo
|
|
|
|
-// String indexName = "index-record-test" + monthResultList.get(0).getMonth();
|
|
|
|
BulkRequest bulkRequest=new BulkRequest();
|
|
BulkRequest bulkRequest=new BulkRequest();
|
|
|
|
|
|
// 发送es失败id集合
|
|
// 发送es失败id集合
|
|
@@ -61,26 +60,23 @@ public class RecordStorageServiceImpl implements RecordStorageService {
|
|
|
|
|
|
//保存的数据 这里指定json类型
|
|
//保存的数据 这里指定json类型
|
|
for (MonthResult monthResult : monthResultList) {
|
|
for (MonthResult monthResult : monthResultList) {
|
|
- // todo
|
|
|
|
String id = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date()) + UUID.randomUUID().toString();
|
|
String id = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date()) + UUID.randomUUID().toString();
|
|
-// if (monthResult.getVehicleid().equals("浙K9C262_0") || monthResult.getVehicleid().equals("苏B061F3_0")) {
|
|
|
|
-// id = "test-202107260500";
|
|
|
|
-// } else {
|
|
|
|
-// id = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date()) + UUID.randomUUID().toString();
|
|
|
|
-// }
|
|
|
|
IndexRequest indexRequest = new IndexRequest(indexName).id(id);
|
|
IndexRequest indexRequest = new IndexRequest(indexName).id(id);
|
|
indexRequest.source(JSON.toJSONString(monthResult), XContentType.JSON);
|
|
indexRequest.source(JSON.toJSONString(monthResult), XContentType.JSON);
|
|
bulkRequest.add(indexRequest);
|
|
bulkRequest.add(indexRequest);
|
|
monthResult.setId(id);
|
|
monthResult.setId(id);
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
try {
|
|
try {
|
|
// esRestClient.bulk()
|
|
// esRestClient.bulk()
|
|
//执行保存 index同步 indexAsync异步
|
|
//执行保存 index同步 indexAsync异步
|
|
BulkResponse response = esRestClient.bulk(bulkRequest, RequestOptions.DEFAULT);
|
|
BulkResponse response = esRestClient.bulk(bulkRequest, RequestOptions.DEFAULT);
|
|
|
|
+ log.info("推送结果:{}", !response.hasFailures());
|
|
// 判断发送es是否存在失败
|
|
// 判断发送es是否存在失败
|
|
if (response.hasFailures()) {
|
|
if (response.hasFailures()) {
|
|
|
|
+ log.error("pushEsError-推送失败条数:{}", response.getItems().length);
|
|
|
|
+ log.error("pushEsError-推送失败条数:{}", response.getItems()[0].getFailureMessage());
|
|
|
|
+ log.error("pushEsError-推送失败条数:{}", response.getItems()[0].getFailure());
|
|
// 将发送es失败数据的id存入集合
|
|
// 将发送es失败数据的id存入集合
|
|
while (response.iterator().hasNext()) {
|
|
while (response.iterator().hasNext()) {
|
|
BulkItemResponse bulkItemResponse = response.iterator().next();
|
|
BulkItemResponse bulkItemResponse = response.iterator().next();
|
|
@@ -90,13 +86,12 @@ public class RecordStorageServiceImpl implements RecordStorageService {
|
|
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- System.out.println(response.toString());
|
|
|
|
|
|
+
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- e.printStackTrace();
|
|
|
|
|
|
+ log.error("pushEsException:", e);
|
|
}
|
|
}
|
|
|
|
|
|
// 存在发送失败数据,将发送失败数据找出并返回
|
|
// 存在发送失败数据,将发送失败数据找出并返回
|
|
- // todo 返回失败数据集合 待测试
|
|
|
|
if (errorIds.size() != 0) {
|
|
if (errorIds.size() != 0) {
|
|
Map<String, MonthResult> tempMonthMap = new HashMap<>();
|
|
Map<String, MonthResult> tempMonthMap = new HashMap<>();
|
|
List<MonthResult> errorList = new ArrayList<>();
|
|
List<MonthResult> errorList = new ArrayList<>();
|