Elasticsearch Java API Client
Spring Boot 2.x 通过 spring-boot-starter-data-elasticsearch 默认引入 Elasticsearch High Level REST Client(旧版客户端);下面的 pom.xml 将其排除,改用新版 Elasticsearch Java API Client。7.17.9 是最后一个兼容 Spring Boot 2.x 的版本。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the item-service module (child of com.heima:hmall).
  Swaps the legacy Elasticsearch High Level REST Client for the
  Elasticsearch Java API Client, pinned through the elasticsearch.version property.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hmall</artifactId>
        <groupId>com.heima</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>item-service</artifactId>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single version shared by every Elasticsearch client artifact below -->
        <elasticsearch.version>7.17.9</elasticsearch.version>
    </properties>
    <dependencies>
        <!-- Shared project module -->
        <dependency>
            <groupId>com.heima</groupId>
            <artifactId>hm-common</artifactId>
            <version>1.0.0</version>
        </dependency>
        <!-- Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Database driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- MyBatis-Plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <!-- Unit testing: test scope keeps JUnit/Mockito off the runtime classpath and out of the packaged jar -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Nacos discovery and config -->
        <!-- NOTE(review): unlike the nacos-config entry this is not a "starter" artifact and its version is hard-coded; confirm that is intended -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-nacos-discovery</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
        </dependency>
        <!-- Elasticsearch: keep the Spring Data starter but drop the legacy client it pulls in -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
            <exclusions>
                <!-- Exclude the conflicting legacy High Level REST Client -->
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-high-level-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Elasticsearch Java API Client and the low-level REST client it runs on -->
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <!-- JSON libraries used alongside the Java API Client (the tests build it with JacksonJsonpMapper) -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.glassfish</groupId>
            <artifactId>jakarta.json</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- NOTE(review): the accompanying test code imports org.apache.http.* (HttpClient 4.x), not client5; confirm this dependency is needed -->
        <dependency>
            <groupId>org.apache.httpcomponents.client5</groupId>
            <artifactId>httpclient5</artifactId>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
索引操作
package com.hmall.item.es;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.mapping.*;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.junit.jupiter.api.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Index-level tests for the Elasticsearch Java API Client: create, check and delete
 * the {@code items} index on the cluster configured in {@link #init()}.
 *
 * <p>These tests talk to a real cluster; any client or transport error now fails the
 * test instead of being printed and ignored.
 *
 * <p>NOTE(review): the original comment here said "elasticsearch-java 8.18.2", while the
 * pom.xml shown alongside this class pins {@code elasticsearch.version} to 7.17.9.
 * Confirm which client version is actually on the classpath.
 */
public class IndexJACTest {
    private static final String INDEX_NAME = "items";

    private ElasticsearchClient client;
    private RestClient restClient;

    /** Builds a basic-auth REST client and wraps it in a Jackson-backed {@link ElasticsearchClient}. */
    @BeforeEach
    public void init() {
        // Basic authentication for every request sent through the low-level client.
        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY,
                new UsernamePasswordCredentials("elastic", "***")
        );
        RestClientBuilder builder = RestClient.builder(new HttpHost("rizx.dpdns.org", 9200, "http"))
                .setHttpClientConfigCallback(httpClientBuilder ->
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        restClient = builder.build();

        // Transport + typed client on top of the low-level REST client.
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        client = new ElasticsearchClient(transport);
    }

    /** Releases the low-level REST client created in {@link #init()}. */
    @AfterEach
    public void close() throws IOException {
        if (restClient != null) {
            restClient.close();
        }
    }

    /** Verifies that {@link #init()} produced a client instance (no request is sent). */
    @Test
    public void testClient() throws IOException {
        Assertions.assertNotNull(client, "client was not initialised");
        System.out.println("Client connected: " + (client != null));
    }

    /**
     * Creates the {@code items} index with its field mappings.
     *
     * @throws IOException on transport failure; an {@link ElasticsearchException} from the
     *         server (for example when the index already exists) also fails the test
     */
    @Test
    public void createIndex() throws IOException {
        CreateIndexRequest request = new CreateIndexRequest.Builder()
                .index(INDEX_NAME)
                .mappings(m -> m
                        .properties("id", p -> p.keyword(k -> k))
                        .properties("name", p -> p.text(t -> t.analyzer("ik_max_word")))
                        .properties("price", p -> p.integer(i -> i))
                        .properties("stock", p -> p.integer(i -> i))
                        // Stored but not searchable.
                        .properties("image", p -> p.keyword(k -> k.index(false)))
                        .properties("category", p -> p.keyword(k -> k))
                        .properties("brand", p -> p.keyword(k -> k))
                        .properties("sold", p -> p.integer(i -> i))
                        .properties("commentCount", p -> p.integer(i -> i))
                        .properties("isAD", p -> p.boolean_(b -> b))
                        .properties("updateTime", p -> p.date(d -> d))
                )
                .build();

        CreateIndexResponse response = client.indices().create(request);

        // Previously failures were caught and printed, so this test could never fail.
        Assertions.assertTrue(response.acknowledged(), "索引创建未被确认!");
        System.out.println("索引创建成功!");
    }

    /** Prints whether the {@code items} index exists. */
    @Test
    public void isIndexExist() throws IOException, ElasticsearchException {
        ExistsRequest existsRequest = ExistsRequest.of(b -> b.index(INDEX_NAME));
        System.out.println(client.indices().exists(existsRequest).value() ? "Exists" : "Not Exists");
    }

    /**
     * Deletes the {@code items} index.
     *
     * @throws IOException on transport failure; server-side errors propagate and fail the test
     */
    @Test
    public void deleteIndex() throws IOException, ElasticsearchException {
        DeleteIndexResponse response =
                client.indices().delete(DeleteIndexRequest.of(b -> b.index(INDEX_NAME)));
        Assertions.assertTrue(response.acknowledged(), "删除失败");
        System.out.println("删除成功");
    }
}
文档操作
package com.hmall.item.es;
import cn.hutool.json.JSONUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch._types.mapping.*;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.hmall.common.utils.BeanUtils;
import com.hmall.item.domain.po.Item;
import com.hmall.item.domain.po.ItemDoc;
import com.hmall.item.service.IItemService;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
// 使用elasticsearch-java 8.18.2
// 如果有多个环境可以切换查询,可以通过Properties设置
//@SpringBootTest(properties = "spring.profiles.active=local")
@SpringBootTest // 启动整个 Spring Boot 应用,包括所有配置类、Bean 依赖(如 Service、Repository、Controller 等),确保测试环境与生产环境一致.支持自动注入(@Autowired)被测试的组件
public class DocumentJACTest {
private static final String indexName = "items";
private ElasticsearchClient client;
private RestClient restClient;
@Autowired
private IItemService itemService;
@BeforeEach
public void init() {
// 设置认证
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "****")
);
RestClientBuilder builder = RestClient.builder(new HttpHost("rizx.dpdns.org", 9200, "http"))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
restClient = builder.build();
// 创建 transport 和 client
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
client = new ElasticsearchClient(transport);
}
@AfterEach
public void close() throws IOException {
if (restClient != null) {
restClient.close();
}
}
/**
* 根据商品id查询Mysql数据库中商品,将改商品保存到ElasticSearch
*/ @Test
public void testCreateDocument() throws IOException {
System.out.println(itemService);
// 1. 根据商品id查询mysql 数据库中的商品
Item item = itemService.getById(577967);
// 2. 将item转为ElasticSearch可接受的ItemDoc
ItemDoc itemDoc = BeanUtils.copyBean(item, ItemDoc.class);
// 3. 创建索引请求 (新API)
String jsonStr = JSONUtil.toJsonStr(itemDoc);
IndexRequest<ItemDoc> request = IndexRequest.of(
b -> b.index(indexName)
.id(itemDoc.getId().toString())
// .document(itemDoc) // 直接传入Java对象,自动序列化
.withJson(new StringReader(jsonStr)) // 使用手动序列化的JSON
);
// 4. 发送请求
IndexResponse response = client.index(request);
System.out.println("文档ID: " + response.id() + " 版本: " + response.version());
}
// 查询文档
@Test
public void testGetDocument() throws IOException {
System.out.println("test get document");
GetRequest getRequest = new GetRequest.Builder()
.index(indexName) // 设置索引名
.id("577967") // 设置文档 ID .build();
System.out.println(getRequest.toString());
GetResponse<ItemDoc> response = client.get(getRequest, ItemDoc.class);
ItemDoc itemDoc = response.source();
System.out.println(JSONUtil.toJsonStr(itemDoc));
// GetResponse<ItemDoc> response = client.get(b -> b
// .index(indexName)
// .id("577967"), ItemDoc.class
// );
//
// ItemDoc itemDoc = response.source();
// System.out.println(JSONUtil.toJsonStr(itemDoc));
}
// 更新文档
@Test
public void testUpdateDocument() throws IOException {
ItemDoc partialUpdate = new ItemDoc();
partialUpdate.setName("i love jbl very much");
partialUpdate.setPrice(10000);
client.update(b -> b
.index(indexName)
.id("577967")
.doc(partialUpdate), // 直接传入对象
ItemDoc.class
);
}
// 删除文档
@Test
public void testDeleteDocument() throws IOException {
// DeleteIndexRequest request = new DeleteIndexRequest.Builder().index(indexName, "11").build();
// client.delete(request, RequestOptions.DEFAULT);
client.delete(b -> b.index(indexName).id("577967"));
}
// 批量导入商品到es
@Test
public void testImportItem() throws IOException {
int pageNo = 1, pageSize = 1000;
while (true) {
// 1、根据页号、页大小(1000);每次查询1000条数据
System.out.println("正在导入第 " + pageNo + " 页数据");
Page<Item> page = itemService.lambdaQuery().eq(Item::getStatus, 1).page(new Page<>(pageNo, pageSize));
List<Item> itemList = page.getRecords();
if (itemList.isEmpty()) break;
// 2. 转换为ItemDoc并构建批量操作
List<BulkOperation> operations = itemList.stream()
.map(item -> {
ItemDoc itemDoc = BeanUtils.copyProperties(item, ItemDoc.class);
return BulkOperation.of(op -> op
.create(CreateOperation.of(c -> c
.index(indexName)
.id(itemDoc.getId().toString())
.document(JsonData.of(itemDoc))
))
);
})
.collect(Collectors.toList());
// 3. 构建批量请求(优化刷新策略)
BulkRequest bulkRequest = BulkRequest.of(b -> b
.operations(operations)
.refresh(Refresh.WaitFor)
);
// 4. 执行批量操作并检查错误
System.out.println(bulkRequest.toString()); // BulkRequest: POST /_bulk?refresh=wait_for [{"create":{"_id":"317578","_index":"items"}},{"create":{"_id":"317580","_index":"items"}},{"create":{"_id":"546872","_index":"items"}}]
BulkResponse response = client.bulk(bulkRequest);
if (response.errors()) {
response.items().stream()
.filter(item -> item.error() != null)
.forEach(item -> System.err.println("ID " + item.id() + " 错误: " + item.error().reason()));
}
System.out.println("----------------第" + pageNo + "页完成(耗时: " + response.took() + "ms)----------------");
++pageNo;
}
}
}