Spring Boot集成ElasticSearch7
环境
我的开发环境如下:
Java 8
ElasticSearch 7.13.4
Spring Boot 2.7.14
一、依赖
<!-- elasticsearch依赖 -->
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>7.17.9</version>
</dependency>
<dependency>
    <groupId>org.glassfish</groupId>
    <artifactId>jakarta.json</artifactId>
    <version>2.0.1</version>
</dependency>
二、项目配置
1、在application.yaml文件中添加配置
spring:
  elasticsearch:
    uris: test.xxxxxx.com:9200 #替换成实际的地址和端口
    username: xxxxxx
    password: xxxxxx #替换成实际的账号密码
    socket-timeout: 10s
    connection-timeout: 15s
    webclient:
      max-in-memory-size: 100MB
2、实现ElasticSearchConfig类
@Configuration
public class ElasticSearchConfig
{
@Value("${spring.elasticsearch.uris}")
private String host;
@Value("${spring.elasticsearch.username}")
private String name;
@Value("${spring.elasticsearch.password}")
private String password;
/**
 * Exposes an {@link ElasticsearchClient} bean built on the low-level {@link RestClient}.
 *
 * <p>The underlying HTTP client is configured with basic-auth credentials, a default
 * JSON {@code Content-Type} request header, a fixed keep-alive duration, and a response
 * interceptor that adds an {@code X-Elastic-Product: Elasticsearch} header to every response.
 *
 * @return a client that talks to the host configured in {@code spring.elasticsearch.uris}
 */
@Bean
public ElasticsearchClient getElasticsearchClient()
{
    final HttpHost target = HttpHost.create(host);
    var jsonHeaders = Collections.singletonList(
            new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()));

    // Basic authentication applied to any auth scope.
    final CredentialsProvider basicAuth = new BasicCredentialsProvider();
    basicAuth.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(name, password));

    // NOTE(review): presumably injected so the client's product check passes against
    // servers that do not send this header themselves - confirm against the server version.
    final HttpResponseInterceptor productHeaderInterceptor =
            (response, context) -> response.addHeader("X-Elastic-Product", "Elasticsearch");

    // Keep-alive duration handed to the HTTP client, in milliseconds (3 minutes).
    final long keepAliveMillis = 180 * 1000;

    RestClient restClient = RestClient
            .builder(target)
            .setHttpClientConfigCallback(http -> http
                    .setDefaultHeaders(jsonHeaders)
                    .addInterceptorLast(productHeaderInterceptor)
                    .setDefaultCredentialsProvider(basicAuth)
                    .setKeepAliveStrategy((response, context) -> keepAliveMillis))
            .build();

    return new ElasticsearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper()));
}
}
三、ElasticSearch工具类
ElasticSearchHandle工具类主要封装了index和document的相关方法。
@Slf4j
@Service
public class ElasticSearchHandle{
private final ElasticsearchClient client;
/**
 * Creates the helper around the given client (constructor injection).
 *
 * @param elasticsearchClient client used by every index and document operation of this class
 */
public ElasticSearchHandle(ElasticsearchClient elasticsearchClient) {
this.client = elasticsearchClient;
}
/**
 * Checks whether an index exists.
 *
 * @param indexName name of the index to look up
 * @return the boolean value carried by the client's exists response
 * @throws IOException if the client call fails at the transport level
 */
public boolean hasIndex(String indexName) throws IOException {
    return client.indices()
            .exists(request -> request.index(indexName))
            .value();
}
/**
 * Creates an index with the given name.
 *
 * @param indexName name of the index to create
 * @throws RuntimeException wrapping the underlying {@link IOException} if the client call fails
 */
public void createIndex(String indexName) {
    try {
        client.indices().create(c -> c.index(indexName));
    } catch (IOException e) {
        log.error("es create index error", e);
        // Keep the original exception as the cause so callers still see the root failure.
        throw new RuntimeException("es create index error", e);
    }
}
/**
 * Deletes an index.
 *
 * @param indexName name of the index to delete
 * @throws IOException if the client call fails at the transport level
 */
public void deleteIndex(String indexName) throws IOException {
    client.indices().delete(request -> request.index(indexName));
}
/**
 * Indexes a document under an explicit id.
 *
 * @param indexName target index
 * @param obj       document body, serialized by the client's JSON mapper
 * @param id        document id
 * @throws RuntimeException wrapping the underlying {@link IOException} if the client call fails
 */
public void insertDocument(String indexName, Object obj, String id) {
    try {
        client.index(i -> i.index(indexName).id(id).document(obj));
    } catch (IOException e) {
        log.error("es insert document error", e);
        // Keep the original exception as the cause so callers still see the root failure.
        throw new RuntimeException("es insert document error", e);
    }
}
/**
 * Deletes a document by id.
 *
 * @param indexName index that holds the document
 * @param id        id of the document to delete
 * @throws RuntimeException wrapping the underlying {@link IOException} if the client call fails
 */
public void deleteDocument(String indexName, String id) {
    try {
        client.delete(d -> d.index(indexName).id(id));
    } catch (IOException e) {
        log.error("es delete document error", e);
        // Keep the original exception as the cause so callers still see the root failure.
        throw new RuntimeException("es delete document error", e);
    }
}
/**
 * Fetches a document by id.
 *
 * @param indexName index that holds the document
 * @param id        id of the document to fetch
 * @param t         class the document source is deserialized into
 * @param <T>       document type
 * @return the raw get response returned by the client
 * @throws RuntimeException wrapping the underlying {@link IOException} if the client call fails
 */
public <T> GetResponse<T> getDocument(String indexName, String id, Class<T> t) {
    try {
        return client.get(d -> d.index(indexName).id(id), t);
    } catch (IOException e) {
        log.error("es get document error", e);
        // Keep the original exception as the cause so callers still see the root failure.
        throw new RuntimeException("es get document error", e);
    }
}
/**
 * Runs a search request and packs the hits into a {@link PageData}.
 *
 * @param request fully built search request (query, sort, paging)
 * @param t       class each hit's source is deserialized into
 * @param <T>     document type
 * @return the source of every returned hit plus the total hit count
 *         ({@code 0} when the response carries no total)
 * @throws RuntimeException wrapping the underlying {@link IOException} if the client call fails
 */
public <T> PageData<T> searchDocumentPage(SearchRequest request, Class<T> t) {
    try {
        var response = client.search(request, t);
        // Read the total once; the response may not carry one.
        var total = response.hits().total();
        long totalCount = total == null ? 0 : total.value();
        var data = response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());
        return new PageData<>(data, totalCount);
    } catch (IOException e) {
        log.error("es search document error", e);
        // The message now matches the failing operation (it used to say "get document"),
        // and the original exception is kept as the cause.
        throw new RuntimeException("es search document error", e);
    }
}
}
条件查询时,定义返回结果:PageData
public class PageData<T>{
public List<T> data;
public long total;
/**
 * Creates a page result.
 *
 * @param data  items stored in the {@code data} field
 * @param total count stored in the {@code total} field
 */
public PageData(List<T> data, long total) {
    this.total = total;
    this.data = data;
}
}
以上,elasticSearch的准备工作就算完成了,后面只需要将ElasticSearchHandle注入到具体的service中,就可以实现具体的业务需求了。
四、查询示例
这是我通过elasticSearch实现的一个示例,包括按月份动态创建index、保存和分页查询。
@Service
public class AlarmHistoryService{
public static final String INDEX = "alarm_history_";
private final ElasticSearchHandle elasticSearchHandle;
/**
 * Creates the service around the given Elasticsearch helper (constructor injection).
 *
 * @param elasticSearchHandle helper used for all index and document operations of this service
 */
public AlarmHistoryService(ElasticSearchHandle elasticSearchHandle) {
this.elasticSearchHandle = elasticSearchHandle;
}
/**
 * Builds the name of the index for the current month, e.g. {@code alarm_history_2024-08}.
 *
 * @return {@link #INDEX} followed by the first seven characters of the current instant's
 *         ISO-8601 text (the {@code yyyy-MM} part)
 */
public String getIndexName() {
    // Instant.toString() renders ISO-8601 in UTC, so the month boundary follows UTC.
    final String isoTimestamp = Instant.now().toString();
    final String yearMonth = isoTimestamp.substring(0, 7);
    return INDEX + yearMonth;
}
/**
 * Makes sure the index for the current month exists, creating it when it does not.
 *
 * @return the name of the current month's index
 * @throws IOException if the existence check fails at the transport level
 */
public String createIndex() throws IOException {
    final String monthlyIndex = getIndexName();
    if (elasticSearchHandle.hasIndex(monthlyIndex)) {
        return monthlyIndex;
    }
    elasticSearchHandle.createIndex(monthlyIndex);
    return monthlyIndex;
}
/**
 * Saves an alarm record into the current month's index, using the record's own id
 * as the document id.
 *
 * @param alarmHistory record to store
 * @throws IOException if ensuring the monthly index exists fails at the transport level
 */
public void commit(AlarmHistory alarmHistory) throws IOException {
    final String targetIndex = createIndex();
    final String documentId = alarmHistory.getId();
    elasticSearchHandle.insertDocument(targetIndex, alarmHistory, documentId);
}
/**
 * Queries alarm records page by page across every index whose name starts with {@link #INDEX}.
 *
 * <p>Results are sorted by {@code alarmTime} in descending order.
 *
 * @param level     when non-null, only records whose {@code level} equals this value match
 * @param startTime exclusive lower bound on {@code alarmTime}; applied only when
 *                  {@code endTime} is also non-null
 * @param endTime   exclusive upper bound on {@code alarmTime}; applied only when
 *                  {@code startTime} is also non-null
 * @param pageIndex zero-based page number; {@code null} leaves the offset to the server default
 * @param pageSize  number of records per page; {@code null} leaves the size to the server default
 * @return the requested page of records together with the total hit count
 */
public PageData<AlarmHistory> queryPage(Integer level, Date startTime, Date endTime, Integer pageIndex, Integer pageSize) {
    List<Query> mustQueries = new ArrayList<>();
    if (level != null) {
        mustQueries.add(Query.of(q -> q.term(t -> t.field("level").value(level))));
    }
    if (startTime != null && endTime != null) {
        mustQueries.add(Query.of(q -> q.range(r -> r.field("alarmTime").gt(JsonData.of(startTime)).lt(JsonData.of(endTime)))));
    }

    // Elasticsearch's "from" is the offset of the first hit, not a page number, so the
    // page index has to be multiplied by the page size. Passing pageIndex directly made
    // every page after the first overlap the previous one.
    // When pageSize is null the server uses its default page size (10), so use the same
    // value to compute the offset.
    final int defaultPageSize = 10;
    final Integer from = pageIndex == null
            ? null
            : pageIndex * (pageSize == null ? defaultPageSize : pageSize);

    // Searches every index starting with "alarm_history_"; a specific index could be used instead.
    // Hits are sorted by "alarmTime" descending and then paged.
    SearchRequest request = SearchRequest.of(r -> r.index(AlarmHistoryService.INDEX + "*")
            .query(q -> q.bool(b -> b.must(mustQueries)))
            .sort(so -> so.field(f -> f.field("alarmTime").order(SortOrder.Desc)))
            .from(from)
            .size(pageSize)
    );
    return elasticSearchHandle.searchDocumentPage(request, AlarmHistory.class);
}
}
點擊查看更多內容
為 TA 點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
正在加載中
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦