elk使用爬坑记录 3 之 SpringBoot 中 Elasticsearch 相关API的使用

  1. 首先需要在application.properties中配置elasticsearch的相关属性:
# ----------------- Elasticsearch settings ------------------
# Host, port and scheme are combined into the HttpHost that EsUtils connects to.
es.host=192.168.100.102
es.port=9200
es.scheme=http
# Injected into EsUtils.timeOut; presumably milliseconds - TODO confirm.
# NOTE(review): the EsUtils code shown below never applies this value to the client.
es.time-out=60000
# Batch size that EsUtils.deleteByQuery passes to delete-by-query requests.
es.max.batch=10000
  1. EsUtils.java
package com.innovamed.amo.common.log.operation.es.server.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import javax.annotation.PostConstruct;

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.innovamed.amo.common.log.operation.es.server.enums.EsIndexSchemaEnums;
import com.innovamed.amo.common.log.operation.es.server.model.EsEntity;

/**
 * Helper around the Elasticsearch {@link RestHighLevelClient}: creates the
 * operation-log index on start-up and offers index / bulk / search / delete
 * convenience methods.
 *
 * <p>Every data method uses the shared static {@link #client}, which is only
 * created by {@link #initIndex()}; calling them before initialisation fails
 * with a {@link NullPointerException}.
 *
 * Copyright: Copyright (c) 2019 Jun_Zhou
 *
 * @version v1.0.0
 * @author JunZhou (1769676159@qq.com), CERNO
 */
@SuppressWarnings("rawtypes")
// NOTE(review): bean registration is commented out in the source; without it
// Spring neither injects the @Value fields nor runs init().
//@Component
public class EsUtils
{
    /** Elasticsearch host, injected from {@code es.host}. */
    @Value("${es.host}")
    public String host;
    /** Elasticsearch HTTP port, injected from {@code es.port}. */
    @Value("${es.port}")
    public int port;
    /** URI scheme used to reach the node, injected from {@code es.scheme}. */
    @Value("${es.scheme}")
    public String scheme;
    /** Injected from {@code es.time-out}; not read anywhere in this class. */
    @Value("${es.time-out}")
    public String timeOut;
    /** Batch size for delete-by-query, injected from {@code es.max.batch}. */
    @Value("${es.max.batch}")
    public String maxBatchOpNum;

    /** Name of the operation-log index. */
    public static final String INDEX_NAME = EsIndexSchemaEnums.AMO_OPERATION_LOG_INDEX.getIndexName();
    /** JSON mapping applied when the operation-log index is created. */
    public static final String INDEX_SCHEMA = EsIndexSchemaEnums.AMO_OPERATION_LOG_INDEX.getIndexSchema();
    /** Shared client; {@code null} until {@link #initIndex()} has run. */
    public static RestHighLevelClient client = null;

    /**
     * Closes any previously created client, then builds a new one and makes
     * sure the operation-log index exists.
     *
     * @throws IllegalStateException if the client cannot be (re)created or the
     *         index cannot be verified/created; the original cause is attached
     */
    @PostConstruct
    public void init() {
        try {
            if (client != null) {
                client.close();
            }
            initIndex();
        } catch (Exception e) {
            // Fail loudly instead of exiting the JVM with a "success" status,
            // and do not leave a half-initialised client behind.
            releaseClient(e);
            throw new IllegalStateException(
                    "Failed to initialise Elasticsearch client for " + scheme + "://" + host + ":" + port, e);
        }
    }

    /** Closes and clears the shared client, attaching any close failure to {@code cause}. */
    private static void releaseClient(Exception cause) {
        RestHighLevelClient stale = client;
        client = null;
        if (stale == null) {
            return;
        }
        try {
            stale.close();
        } catch (IOException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Creates the shared client and, if {@link #INDEX_NAME} does not exist
     * yet, creates it with 3 shards, 2 replicas and the {@link #INDEX_SCHEMA}
     * mapping.
     *
     * @throws RuntimeException if index creation is not acknowledged
     * @throws IOException      on communication failure
     * @throws Exception        on any other client failure
     */
    public void initIndex() throws Exception, IOException
    {
        HttpHost httpHost = new HttpHost(host, port, scheme);
        client = new RestHighLevelClient(RestClient.builder(httpHost));
        if (this.indexExist(INDEX_NAME)) {
            return;
        }
        CreateIndexRequest request = new CreateIndexRequest(INDEX_NAME);
        request.settings(Settings.builder()
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 2));
        request.mapping(INDEX_SCHEMA, XContentType.JSON);
        CreateIndexResponse res = client.indices().create(request, RequestOptions.DEFAULT);
        if (!res.isAcknowledged()) {
            throw new RuntimeException("初始化失败");
        }
    }

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return {@code true} if the index exists
     * @throws Exception if the request fails
     */
    public boolean indexExist(String index) throws Exception {
        GetIndexRequest request = new GetIndexRequest(index);
        request.local(false);
        request.humanReadable(true);
        request.includeDefaults(false);
        return client.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * Indexes one document under the entity's id, replacing any document
     * already stored under that id.
     *
     * @param index  index name
     * @param entity id plus payload; the payload is serialised to JSON
     * @throws RuntimeException wrapping any failure raised by the client
     */
    public void insertOrUpdateOne(String index, EsEntity entity)
    {
        IndexRequest request = new IndexRequest(index);
        request.id(entity.getId());
        request.source(JSON.toJSONString(entity.getData()), XContentType.JSON);
        try {
            client.index(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Indexes a batch of documents with a single bulk request.
     *
     * @param index index name
     * @param list  entities to index; {@code null} or empty is a no-op
     * @throws RuntimeException if the request fails or any item in the batch
     *         is rejected
     */
    public void insertBatch(String index, List<EsEntity> list) {
        if (list == null || list.isEmpty()) {
            // An empty bulk request is rejected by client-side validation.
            return;
        }
        BulkRequest request = new BulkRequest();
        for (EsEntity item : list) {
            request.add(new IndexRequest(index)
                    .id(item.getId())
                    .source(JSON.toJSONString(item.getData()), XContentType.JSON));
        }
        executeBulk(request);
    }

    /**
     * Deletes a batch of documents by id with a single bulk request.
     *
     * @param index  index name
     * @param delSet ids to delete (each converted with {@code toString()});
     *               {@code null} or empty is a no-op
     * @throws RuntimeException if the request fails or any item in the batch
     *         is rejected
     */
    public <T> void deleteBatch(String index, Collection<T> delSet) {
        if (delSet == null || delSet.isEmpty()) {
            return;
        }
        BulkRequest request = new BulkRequest();
        for (T id : delSet) {
            request.add(new DeleteRequest(index, id.toString()));
        }
        executeBulk(request);
    }

    /**
     * Sends a bulk request and surfaces per-item failures, which the bulk API
     * reports in the response body rather than by throwing.
     */
    private void executeBulk(BulkRequest request) {
        BulkResponse response;
        try {
            response = client.bulk(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        if (response.hasFailures()) {
            throw new RuntimeException(response.buildFailureMessage());
        }
    }

    /**
     * Runs a search and maps each hit's source to an instance of {@code c}.
     *
     * @param index   index name
     * @param builder search definition (query, size, ...)
     * @param c       target class for each hit
     * @return the mapped hits, in response order
     * @throws RuntimeException wrapping any failure raised by the client or
     *         the JSON mapping
     */
    public <T> List<T> search(String index, SearchSourceBuilder builder, Class<T> c) {
        SearchRequest request = new SearchRequest(index);
        request.source(builder);
        try {
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            SearchHit[] hits = response.getHits().getHits();
            List<T> res = new ArrayList<>(hits.length);
            for (SearchHit hit : hits) {
                res.add(JSON.parseObject(hit.getSourceAsString(), c));
            }
            return res;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes an entire index.
     *
     * @param index index name
     * @throws RuntimeException wrapping any failure raised by the client
     */
    public void deleteIndex(String index)
    {
        try {
            client.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes every document matching a query.
     *
     * @param index   index name
     * @param builder query selecting the documents to delete
     * @throws NumberFormatException if {@code es.max.batch} is not an integer
     * @throws RuntimeException wrapping any failure raised by the client
     */
    public void deleteByQuery(String index, QueryBuilder builder)
    {
        DeleteByQueryRequest request = new DeleteByQueryRequest(index);
        request.setQuery(builder);
        // Batch size comes from es.max.batch (the original author notes 10000 as the maximum).
        request.setBatchSize(Integer.parseInt(maxBatchOpNum));
        // Version conflicts are counted rather than aborting the operation.
        request.setConflicts("proceed");
        try {
            client.deleteByQuery(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

  1. AmoOperationLogController.java
package com.innovamed.amo.common.log.operation.es.server.controller;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.innovamed.amo.common.log.operation.es.client.annotation.AccessLog;
import com.innovamed.amo.common.log.operation.es.client.model.OperationLog;
import com.innovamed.amo.common.log.operation.es.server.constants.OpTypeConstants;
import com.innovamed.amo.common.log.operation.es.server.model.EsEntity;
import com.innovamed.amo.common.log.operation.es.server.utils.EsUtils;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.UUID;

/**
 * REST endpoints for reading, writing and deleting AMO user operation logs
 * stored in the {@link EsUtils#INDEX_NAME} Elasticsearch index.
 *
 * Copyright: Copyright (c) 2019 Jun_Zhou
 *
 * @version v1.0.0
 * @author JunZhou (1769676159@qq.com), CERNO
 */
// NOTE(review): controller registration is commented out in the source;
// without it Spring does not expose any of the mappings below.
//@RestController
@RequestMapping("/amo_operation_log")
public class AmoOperationLogController
{
    @Autowired
    private EsUtils esUtil;

    /**
     * Looks up one operation log by its {@code uuid} field.
     *
     * <p>The id is a string because logs are stored with UUID-style ids.
     * NOTE(review): this is a term query, which is not analysed; whether a
     * hyphenated UUID matches depends on how the {@code uuid} field is mapped
     * in the live index - confirm.
     *
     * @param uuid value of the log's {@code uuid} field
     * @return the first matching log, or {@code null} if none matches
     */
    @GetMapping("/{uuid}")
    public OperationLog getById(@PathVariable("uuid") String uuid) {
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.query(new TermQueryBuilder("uuid", uuid));
        List<OperationLog> res = esUtil.search(EsUtils.INDEX_NAME, builder, OperationLog.class);
        return res.isEmpty() ? null : res.get(0);
    }

    /**
     * Returns operation logs from the index without any filter.
     *
     * <p>No explicit size is set, so only the server-side default page of
     * hits is returned (10 unless configured otherwise), not the full index.
     *
     * @return the logs in the first result page
     */
    @GetMapping("/")
    public List<OperationLog> getAll() {
        return esUtil.search(EsUtils.INDEX_NAME, new SearchSourceBuilder(), OperationLog.class);
    }

    /**
     * Full-text search on the {@code operator} field, limited to 10 hits.
     *
     * @param uuid     required request parameter that is currently NOT applied
     *                 to the query
     * @param operator text matched against the {@code operator} field
     * @return up to 10 matching logs
     */
    @GetMapping("/search")
    public List<OperationLog> searchByUserIdAndName(@RequestParam("uuid") String uuid,
            @RequestParam("operator") String operator) {
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        boolQueryBuilder.must(QueryBuilders.matchQuery("operator", operator));
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.size(10).query(boolQueryBuilder);
        return esUtil.search(EsUtils.INDEX_NAME, builder, OperationLog.class);
    }

    /**
     * Searches for logs whose {@code uuid} equals the given value and whose
     * {@code operator} fuzzily matches the given value, limited to 10 hits.
     *
     * <p>NOTE(review): this is a GET that reads a request body; some HTTP
     * clients and proxies drop bodies on GET requests.
     *
     * @param operationLog carrier for the {@code uuid} and {@code operator}
     *                     criteria; both are expected to be non-null
     * @return up to 10 matching logs
     */
    @GetMapping("/fuzzyQueryByCondition")
    public List<OperationLog> searchByCondition(@RequestBody OperationLog operationLog) {
        String uuid = operationLog.getUuid();
        String operator = operationLog.getOperator();
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        boolQueryBuilder.must(QueryBuilders.termQuery("uuid", uuid));
        boolQueryBuilder.must(QueryBuilders.fuzzyQuery("operator", operator));
        SearchSourceBuilder builder = new SearchSourceBuilder();
        builder.size(10).query(boolQueryBuilder);
        return esUtil.search(EsUtils.INDEX_NAME, builder, OperationLog.class);
    }

    /**
     * Stores a single operation log, using its uuid as the document id.
     *
     * @param operationLog log to store
     */
    @PutMapping("/")
    public void putOne(@RequestBody OperationLog operationLog) {
        EsEntity<OperationLog> entity = new EsEntity<OperationLog>(operationLog.getUuid(), operationLog);
        esUtil.insertOrUpdateOne(EsUtils.INDEX_NAME, entity);
    }

    /**
     * Stores the supplied operation logs in one bulk request, using each
     * log's uuid as its document id.
     *
     * @param operationLogSet logs to store; {@code null} or empty is a no-op
     */
    @SuppressWarnings("rawtypes")
    @PutMapping("/many")
    @AccessLog(type=OpTypeConstants.ADD,des="批量插入用户的操作日志")
    public void putList(@RequestBody List<OperationLog> operationLogSet) {
        if (operationLogSet == null || operationLogSet.isEmpty()) {
            // Nothing to index; an empty bulk request would be rejected.
            return;
        }
        List<EsEntity> list = new ArrayList<EsEntity>(operationLogSet.size());
        for (OperationLog item : operationLogSet) {
            list.add(new EsEntity<OperationLog>(item.getUuid(), item));
        }
        esUtil.insertBatch(EsUtils.INDEX_NAME, list);
    }

    /**
     * Deletes the documents with the given ids in one bulk request.
     *
     * @param list document ids, read from the JSON request body;
     *             {@code null} or empty is a no-op
     */
    @DeleteMapping("/deleteBatch")
    public void deleteBatch(@RequestBody List<String> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        esUtil.deleteBatch(EsUtils.INDEX_NAME, list);
    }

    /**
     * Deletes every log whose {@code operator} field equals the given term.
     *
     * <p>NOTE(review): this is a term query, which is not analysed; whether a
     * mixed-case name matches depends on how the {@code operator} field is
     * mapped in the live index - confirm.
     *
     * @param operator operator term to match
     */
    @DeleteMapping("/operator/{operator}")
    public void deleteByUserId(@PathVariable("operator") String operator) {
        esUtil.deleteByQuery(EsUtils.INDEX_NAME, new TermQueryBuilder("operator", operator));
    }

}

  1. EsIndexConstants.java
package com.innovamed.amo.common.log.operation.es.server.constants;
/**
 * Default constants describing Elasticsearch indices.
 *
 * <p>Pure constants holder: it cannot be instantiated or subclassed.
 *
 * Copyright: Copyright (c) 2019 Jun_Zhou
 *
 * @version v1.0.0
 * @author JunZhou (1769676159@qq.com), CERNO
 */
public final class EsIndexConstants
{
    /**
     * JSON mapping ({@code properties} section) for the AMO user operation-log
     * index. Text fields {@code creator} and {@code operationDes} use the
     * {@code ik_max_word} / {@code ik_smart} analyzers; {@code createTime} and
     * {@code operateTime} are dates accepting several textual formats or
     * epoch milliseconds.
     */
    public static final String AMO_OPERATION_LOG_INDEX = 
            "{\r\n" + 
            "   \"properties\":{\r\n" + 
            "    \"referMac\":            {\"type\":\"text\"},\r\n" + 
            "    \"creator\":             {\r\n" + 
            "                        \"type\":\"text\",\r\n" + 
            "                        \"index\":true,\r\n" + 
            "                        \"analyzer\":\"ik_max_word\",\r\n" + 
            "                        \"search_analyzer\":\"ik_smart\"\r\n" + 
            "                        },\r\n" + 
            "    \"operationDes\":     {\r\n" + 
            "                        \"type\":\"text\",\r\n" + 
            "                        \"index\":true,\r\n" + 
            "                        \"analyzer\":\"ik_max_word\",\r\n" + 
            "                        \"search_analyzer\":\"ik_smart\"\r\n" + 
            "                        },\r\n" + 
            "    \"operateResult\":    {\"type\":\"text\",\r\n" + 
            "                         \"index\":true\r\n" + 
            "                        },\r\n" + 
            "    \"createTime\":         {\"type\":\"date\",\r\n" + 
            "                         \"index\":true,\r\n" + 
            "                         \"format\":\"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd ||epoch_millis\"\r\n" + 
            "                                    },\r\n" + 
            "    \"operateTime\":         {\"type\":\"date\",\r\n" + 
            "                         \"index\":true,\r\n" + 
            "                         \"format\":\"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd ||epoch_millis\"\r\n" + 
            "                                    },\r\n" + 
            "    \"operationType\":     {\"type\":\"text\"},\r\n" + 
            "    \"referIp\":             {\"type\":\"text\",\r\n" + 
            "                         \"index\":true\r\n" + 
            "                        },\r\n" + 
            "    \"uuid\":             {\"type\":\"text\",\r\n" + 
            "                         \"index\":true\r\n" + 
            "                         },\r\n" + 
            "    \"operator\":         {\"type\":\"text\",\r\n" + 
            "                         \"index\":true\r\n" + 
            "                        }\r\n" + 
            "            }\r\n" + 
            "}";

    /** Not instantiable: this class only holds constants. */
    private EsIndexConstants()
    {
    }
}

  1. 注意事项:
    a.在elasticsearch中时间戳格式对应的type为date,定义格式如下:
"createTime":         {"type":"date",
                         "index":true,
                         "format":"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd ||epoch_millis"
                                    },
    "operateTime":         {"type":"date",
                         "index":true,
                         "format":"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || yyyy/MM/dd HH:mm:ss|| yyyy/MM/dd ||epoch_millis"
                                    },

b.Kibana中的时间会比上海时间早八个小时,为了最简单地解决时间匹配问题,建议在将数据导入elasticsearch中的时候自动向后偏移8小时(即减去8小时),如下所示:

//用户行为的发生时间,注意向后偏移8小时,否则在kibana中将显示有误;
Date date = DateUtil.offsetHour(DateUtil.date(), -8);

代码资源(代码已做保密处理):操作elasticsearch的java restful api


Previous
elk使用爬坑记录 2 之 SpringBoot 集成 Elasticsearch elk使用爬坑记录 2 之 SpringBoot 集成 Elasticsearch
目前最新版的ElasticSearch为7.3.1,SpringBoot2.x版本还没有及时的更新,其父级依赖还是<elasticsearch.version>5.6.10</elasticsearch.version>…
2019-09-07
Next
elk使用爬坑记录 1 之 Elasticsearch和Kibana的安装 elk使用爬坑记录 1 之 Elasticsearch和Kibana的安装
背景: 最近公司要做用户行为日志的可视化展示,因为Elasticsearch对数据的可视化展示提供了初步支撑所以决定使用Elasticsearch。个人理解: Elasticsearch相当于提供数据存储功能的应用系
2019-09-07