- 首先需要在 application.properties 中配置 Elasticsearch 的相关属性:
#-----------------elasticsearch相关的配置------------------
es.host=192.168.100.102
es.port=9200
es.scheme=http
es.time-out=60000
es.max.batch=10000
- EsUtils.java
package com.innovamed.amo.common.log.operation.es.server.utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.innovamed.amo.common.log.operation.es.server.enums.EsIndexSchemaEnums;
import com.innovamed.amo.common.log.operation.es.server.model.EsEntity;
/**
 * Copyright: Copyright (c) 2019 Jun_Zhou
 *
 * @ClassName: EsUtils.java
 * @Description: Helper around a shared Elasticsearch {@link RestHighLevelClient}:
 *               creates the operation-log index on start-up and offers simple
 *               index / bulk / search / delete operations.
 *
 * <p>The client is held in a {@code static} field that is (re)assigned by
 * {@link #initIndex()}, so every instance of this class uses the same client.
 *
 * @version: v1.0.0
 * @author: JunZhou
 * @Email: 1769676159@qq.com
 * @Site: CERNO
 * @date: 2019-09-06 09:32:44
 */
@SuppressWarnings("rawtypes")
//@Component
public class EsUtils
{
    /** Elasticsearch host, bound to the {@code es.host} property. */
    @Value("${es.host}")
    public String host;

    /** Elasticsearch HTTP port, bound to the {@code es.port} property. */
    @Value("${es.port}")
    public int port;

    /** URI scheme used to reach Elasticsearch, bound to the {@code es.scheme} property. */
    @Value("${es.scheme}")
    public String scheme;

    /**
     * Bound to the {@code es.time-out} property.
     * NOTE(review): this value is not read anywhere in this class, so it is
     * currently not applied to the client - confirm whether that is intended.
     */
    @Value("${es.time-out}")
    public String timeOut;

    /** Batch size used by {@link #deleteByQuery}, bound to the {@code es.max.batch} property. */
    @Value("${es.max.batch}")
    public String maxBatchOpNum;

    /** Name of the operation-log index. */
    public static final String INDEX_NAME = EsIndexSchemaEnums.AMO_OPERATION_LOG_INDEX.getIndexName();

    /** JSON mapping used when the operation-log index is created. */
    public static final String INDEX_SCHEMA = EsIndexSchemaEnums.AMO_OPERATION_LOG_INDEX.getIndexSchema();

    /** Client shared by all instances; assigned in {@link #initIndex()}. */
    public static RestHighLevelClient client = null;

    /**
     * Post-construction hook: closes a previously created shared client (if any)
     * and then calls {@link #initIndex()}.
     *
     * @throws IllegalStateException if closing the old client or initialising the
     *         index fails. The failure is propagated with its cause instead of
     *         terminating the JVM with exit status 0, which would report a failed
     *         start-up as a success.
     */
    @PostConstruct
    public void init()
    {
        try {
            if (client != null) {
                client.close();
            }
            initIndex();
        } catch (Exception e) {
            throw new IllegalStateException(
                    "Failed to initialise Elasticsearch index '" + INDEX_NAME + "'", e);
        }
    }

    /**
     * Creates the shared client from {@link #host}, {@link #port} and
     * {@link #scheme}, then creates {@link #INDEX_NAME} with 3 shards,
     * 2 replicas and the {@link #INDEX_SCHEMA} mapping unless the index
     * already exists.
     *
     * @throws Exception if the existence check or the create request fails
     * @throws IOException declared by the original signature
     * @throws RuntimeException if the create request is not acknowledged
     */
    public void initIndex() throws Exception, IOException
    {
        HttpHost httpHost = new HttpHost(host, port, scheme);
        client = new RestHighLevelClient(RestClient.builder(httpHost));
        if (this.indexExist(INDEX_NAME)) {
            return;
        }
        CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME);
        request.settings(Settings.builder()
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 2));
        request.mapping(INDEX_SCHEMA, XContentType.JSON);
        CreateIndexResponse res = client.indices().create(request, RequestOptions.DEFAULT);
        if (!res.isAcknowledged()) {
            throw new RuntimeException("初始化失败");
        }
    }

    /**
     * Checks whether an index exists.
     *
     * @param index name of the index
     * @return the result of the client's {@code indices().exists} call
     * @throws Exception if the request fails
     */
    public boolean indexExist(String index) throws Exception {
        GetIndexRequest request = new GetIndexRequest(index);
        request.local(false);
        request.humanReadable(true);
        request.includeDefaults(false);
        return client.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * Indexes one document under the entity's id; the entity's data is
     * serialised to JSON as the document source.
     *
     * @param index  name of the index
     * @param entity id/data pair to index
     * @throws RuntimeException wrapping any failure of the index request
     */
    public void insertOrUpdateOne(String index, EsEntity entity)
    {
        IndexRequest request = new IndexRequest(index);
        request.id(entity.getId());
        request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
        try {
            client.index(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Indexes all given entities with a single bulk request.
     *
     * @param index name of the index
     * @param list  entities to index; a {@code null} or empty list is a no-op
     *              (no bulk request is sent for zero actions)
     * @throws RuntimeException if the bulk call fails or any item in it failed
     */
    public void insertBatch(String index, List<EsEntity> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        BulkRequest request = new BulkRequest();
        for (EsEntity item : list) {
            request.add(new IndexRequest(index).id(item.getId())
                    .source(JSON.toJSONString(item.getData()), XContentType.JSON));
        }
        executeBulk(request);
    }

    /**
     * Deletes documents by id with a single bulk request; each element's
     * {@code toString()} is used as the document id.
     *
     * @param index  name of the index
     * @param delSet ids to delete; a {@code null} or empty collection is a no-op
     * @throws RuntimeException if the bulk call fails or any item in it failed
     */
    public <T> void deleteBatch(String index, Collection<T> delSet) {
        if (delSet == null || delSet.isEmpty()) {
            return;
        }
        BulkRequest request = new BulkRequest();
        for (T item : delSet) {
            request.add(new DeleteRequest(index, item.toString()));
        }
        executeBulk(request);
    }

    /**
     * Sends a bulk request and surfaces item-level failures, which the bulk API
     * reports in the response rather than by throwing.
     */
    private void executeBulk(BulkRequest request) {
        BulkResponse response;
        try {
            response = client.bulk(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        if (response.hasFailures()) {
            throw new RuntimeException(response.buildFailureMessage());
        }
    }

    /**
     * Runs a search and converts the source of every returned hit to {@code c}.
     *
     * @param index   name of the index to search
     * @param builder search parameters
     * @param c       class each hit's JSON source is parsed into
     * @return the parsed hits, in the order the response returned them
     * @throws RuntimeException wrapping any failure of the search or parsing
     */
    public <T> List<T> search(String index, SearchSourceBuilder builder, Class<T> c) {
        SearchRequest request = new SearchRequest(index);
        request.source(builder);
        try {
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            SearchHit[] hits = response.getHits().getHits();
            List<T> res = new ArrayList<>(hits.length);
            for (SearchHit hit : hits) {
                res.add(JSON.parseObject(hit.getSourceAsString(), c));
            }
            return res;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes an index.
     *
     * @param index name of the index
     * @throws RuntimeException wrapping any failure of the delete request
     */
    public void deleteIndex(String index)
    {
        try {
            client.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes every document matching a query.
     *
     * @param index   name of the index
     * @param builder query selecting the documents to delete
     * @throws NumberFormatException if {@link #maxBatchOpNum} is not an integer
     * @throws RuntimeException wrapping any failure of the delete-by-query request
     */
    public void deleteByQuery(String index, QueryBuilder builder)
    {
        DeleteByQueryRequest request = new DeleteByQueryRequest(index);
        request.setQuery(builder);
        // Batch size comes from configuration; per the original author's note the maximum is 10000.
        request.setBatchSize(Integer.parseInt(maxBatchOpNum));
        // Continue on version conflicts instead of aborting the whole operation.
        request.setConflicts("proceed");
        try {
            client.deleteByQuery(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
- AmoOperationLogController.java
package com.innovamed.amo.common.log.operation.es.server.controller;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.innovamed.amo.common.log.operation.es.client.annotation.AccessLog;
import com.innovamed.amo.common.log.operation.es.client.model.OperationLog;
import com.innovamed.amo.common.log.operation.es.server.constants.OpTypeConstants;
import com.innovamed.amo.common.log.operation.es.server.model.EsEntity;
import com.innovamed.amo.common.log.operation.es.server.utils.EsUtils;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.UUID;
/***
*
* Copyright: Copyright (c) 2019 Jun_Zhou
*
* @ClassName: AmoOperationLogController.java
* @Description: Amo项目的用户行为日志操作控制器;
*
* @version: v1.0.0
* @author: JunZhou
* @Email: 1769676159@qq.com
* @Site: CERNO
* @date: 2019年9月6日 上午8:57:56
*/
//@RestController
@RequestMapping("/amo_operation_log")
public class AmoOperationLogController
{
@Autowired
private EsUtils esUtil;
/**
* @Des: 根据记录的Id获取某一个索引下的文档记录;
* @param: uuid
* @return
*/
@GetMapping("/{uuid}")
public OperationLog getById(@PathVariable("uuid") int uuid) {
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(new TermQueryBuilder("uuid", uuid));
List<OperationLog> res = esUtil.search(EsUtils.INDEX_NAME, builder, OperationLog.class);
if (res.size() > 0) {
return res.get(0);
} else {
return null;
}
}
/**
* @Des: 获取指定索引下的全部文档记录;
* @return
*/
@GetMapping("/")
public List<OperationLog> getAll() {
return esUtil.search(EsUtils.INDEX_NAME, new SearchSourceBuilder(), OperationLog.class);
}
/***
* @Des: 根据关键信息搜索指定索引下的文档;
* @param uuid: 文档的uuid;
* @param operator: 操作者;
* @return
*/
@GetMapping("/search")
public List<OperationLog> searchByUserIdAndName(@RequestParam("uuid") String uuid,@RequestParam("operator") String operator) {
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
//boolQueryBuilder.must(QueryBuilders.termQuery("uuid", uuid));
boolQueryBuilder.must(QueryBuilders.matchQuery("operator", operator));
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.size(10).query(boolQueryBuilder);
return esUtil.search(EsUtils.INDEX_NAME, builder, OperationLog.class);
}
/***
* @Des: 根据关键词模糊搜索某用户的行为日志;
* @param: operationLog:操作日志实体;
* @return
*/
@GetMapping("/fuzzyQueryByCondition")
public List<OperationLog> searchByCondition(@RequestBody OperationLog operationLog) {
String uuid = operationLog.getUuid();
String operator = operationLog.getOperator();
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.must(QueryBuilders.termQuery("uuid", uuid));
boolQueryBuilder.must(QueryBuilders.fuzzyQuery("operator", operator));
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.size(10).query(boolQueryBuilder);
return esUtil.search(EsUtils.INDEX_NAME, builder, OperationLog.class);
}
/***
* @Des: 单个插入用户的操作日志;
* @param: operationLog:用户行为日志实体;
*/
@PutMapping("/")
public void putOne(@RequestBody OperationLog operationLog) {
EsEntity<OperationLog> entity = new EsEntity<OperationLog>(operationLog.getUuid(), operationLog);
esUtil.insertOrUpdateOne(EsUtils.INDEX_NAME, entity);
}
/***
* @Des: 批量插入用户的操作日志;
* @param operationLogSet: 用户行为日志的集合;
*/
@SuppressWarnings("rawtypes")
@PutMapping("/many")
@AccessLog(type=OpTypeConstants.ADD,des="批量插入用户的操作日志")
public void putList(@RequestBody List<OperationLog> operationLogSet) {
List<OperationLog> logSet = new ArrayList<OperationLog>();
for (int i = 0; i < 1; i++)
{
Date date = new Date();
OperationLog opLog = new OperationLog();
opLog.setUuid(UUID.fastUUID().toString());
opLog.setOperationType("DEL");
opLog.setOperationDes("Del by Id");
opLog.setOperator("JunZhou");
opLog.setReferIp("127.0.0.1");
opLog.setReferMac("0000::0000::0000");
opLog.setCreateTime(DateUtil.formatDateTime(date));
opLog.setOperateTime(DateUtil.formatDateTime(date));
opLog.setOperateResult("SUCCESS");
opLog.setCreator("LOGGER");
logSet.add(opLog);
}
operationLogSet = logSet;
List<EsEntity> list = new ArrayList<EsEntity>();
operationLogSet.forEach(item -> list.add(new EsEntity<OperationLog>(item.getUuid(), item)));
esUtil.insertBatch(EsUtils.INDEX_NAME, list);
}
/***
* @Des: 批量删除指定索引下的文档数据;
* @param: list
*/
@DeleteMapping("/deleteBatch")
public void deleteBatch(List<Integer> list) {
esUtil.deleteBatch(EsUtils.INDEX_NAME, list);
}
/***
* @Des: 根据操作用户的信息删除指定索引下的用户行为日志的文档;
* @param: operator
*/
@DeleteMapping("/operator/{operator}")
public void deleteByUserId(@PathVariable("operator") int operator) {
esUtil.deleteByQuery(EsUtils.INDEX_NAME, new TermQueryBuilder("operator", operator));
}
}
- EsIndexConstants.java
package com.innovamed.amo.common.log.operation.es.server.constants;
/**
 * Copyright: Copyright (c) 2019 Jun_Zhou
 *
 * @ClassName: EsIndexConstants.java
 * @Description: Default constants describing Elasticsearch indices.
 *
 * <p>Constants holder only: the class is final and cannot be instantiated.
 *
 * @version: v1.0.0
 * @author: JunZhou
 * @Email: 1769676159@qq.com
 * @Site: CERNO
 * @date: 2019-09-06 09:24:30
 */
public final class EsIndexConstants
{
    /** Not instantiable: this class only holds constants. */
    private EsIndexConstants()
    {
    }

    /**
     * JSON mapping ("properties" definition) of the Elasticsearch index that
     * stores Amo user operation logs.
     */
    public static final String AMO_OPERATION_LOG_INDEX =
            "{\r\n" +
            " \"properties\":{\r\n" +
            " \"referMac\": {\"type\":\"text\"},\r\n" +
            " \"creator\": {\r\n" +
            " \"type\":\"text\",\r\n" +
            " \"index\":true,\r\n" +
            " \"analyzer\":\"ik_max_word\",\r\n" +
            " \"search_analyzer\":\"ik_smart\"\r\n" +
            " },\r\n" +
            " \"operationDes\": {\r\n" +
            " \"type\":\"text\",\r\n" +
            " \"index\":true,\r\n" +
            " \"analyzer\":\"ik_max_word\",\r\n" +
            " \"search_analyzer\":\"ik_smart\"\r\n" +
            " },\r\n" +
            " \"operateResult\": {\"type\":\"text\",\r\n" +
            " \"index\":true\r\n" +
            " },\r\n" +
            " \"createTime\": {\"type\":\"date\",\r\n" +
            " \"index\":true,\r\n" +
            " \"format\":\"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd ||epoch_millis\"\r\n" +
            " },\r\n" +
            " \"operateTime\": {\"type\":\"date\",\r\n" +
            " \"index\":true,\r\n" +
            " \"format\":\"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd ||epoch_millis\"\r\n" +
            " },\r\n" +
            " \"operationType\": {\"type\":\"text\"},\r\n" +
            " \"referIp\": {\"type\":\"text\",\r\n" +
            " \"index\":true\r\n" +
            " },\r\n" +
            " \"uuid\": {\"type\":\"text\",\r\n" +
            " \"index\":true\r\n" +
            " },\r\n" +
            " \"operator\": {\"type\":\"text\",\r\n" +
            " \"index\":true\r\n" +
            " }\r\n" +
            " }\r\n" +
            "}";
}
- 注意事项:
a.在 Elasticsearch 中，时间戳字段对应的 type 为 date，定义格式如下:
"createTime": {"type":"date",
"index":true,
"format":"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd ||epoch_millis"
},
"operateTime": {"type":"date",
"index":true,
"format":"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd ||epoch_millis"
},
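b.Elasticsearch 内部以 UTC 存储时间，写入时不带时区的时间字符串会被当作 UTC；而 Kibana 默认按浏览器时区（上海为 UTC+8）显示。因此如果直接写入上海本地时间，Kibana 中显示的时间会与实际时间相差 8 小时。最简单的解决办法是在将数据导入 Elasticsearch 时把时间减去 8 小时（即换算为 UTC），如下所示: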
//用户行为的发生时间,注意向后偏移8小时,否则在kibana中将显示有误;
Date date = DateUtil.offsetHour(DateUtil.date(), -8);
代码资源(代码已做保密处理):操作elasticsearch的java restful api