<#if settings.post_mathjax!false>

Elasticsearch Java API Client

admin
17
2025-07-01

Elasticsearch Java API Client

Spring Boot 默认会引入 ​​Elasticsearch High Level REST Client​​(旧版客户端)。7.17.9 是最后一个兼容 Spring Boot 2.x 的版本。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>hmall</artifactId>
        <groupId>com.heima</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>item-service</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- 统一 Elasticsearch 相关版本 -->
        <elasticsearch.version>7.17.9</elasticsearch.version>
    </properties>

    <dependencies>
        <!-- common -->
        <dependency>
            <groupId>com.heima</groupId>
            <artifactId>hm-common</artifactId>
            <version>1.0.0</version>
        </dependency>
        
        <!-- web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <!-- 数据库 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        
        <!-- mybatis -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        
        <!-- 单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        
        <!-- nacos -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-nacos-discovery</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
        </dependency>
        
        <!-- 添加并使用兼容版本的 Elasticsearch 依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
            <exclusions>
                <!-- 排除冲突的旧版客户端 -->
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-high-level-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        
        <!-- 以下依赖保持不变 -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.glassfish</groupId>
            <artifactId>jakarta.json</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents.client5</groupId>
            <artifactId>httpclient5</artifactId>
        </dependency>
    </dependencies>
    
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

索引操作

package com.hmall.item.es;  
  
  
import co.elastic.clients.elasticsearch.ElasticsearchClient;  
import co.elastic.clients.elasticsearch._types.ElasticsearchException;  
import co.elastic.clients.elasticsearch._types.mapping.*;  
import co.elastic.clients.elasticsearch.indices.*;  
import co.elastic.clients.transport.ElasticsearchTransport;  
import co.elastic.clients.transport.rest_client.RestClientTransport;  
import co.elastic.clients.json.jackson.JacksonJsonpMapper;  
import org.apache.http.auth.AuthScope;  
import org.apache.http.auth.UsernamePasswordCredentials;  
import org.apache.http.impl.client.BasicCredentialsProvider;  
import org.apache.http.HttpHost;  
import org.elasticsearch.client.RestClient;  
import org.elasticsearch.client.RestClientBuilder;  
import org.junit.jupiter.api.*;  
  
import java.io.IOException;  
import java.util.HashMap;  
import java.util.Map;  
  
// 使用elasticsearch-java 8.18.2  
public class IndexJACTest {  
    private static final String indexName = "items";  
    private ElasticsearchClient client;  
    private RestClient restClient;  
  
    @BeforeEach  
    public void init() {  
        // 设置认证  
        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();  
        credentialsProvider.setCredentials(  
                AuthScope.ANY,  
                new UsernamePasswordCredentials("elastic", "***")  
        );  
        RestClientBuilder builder = RestClient.builder(new HttpHost("rizx.dpdns.org", 9200, "http"))  
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));  
        restClient = builder.build();  
        // 创建 transport 和 client        
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());  
        client = new ElasticsearchClient(transport);  
    }  
  
    @AfterEach  
    public void close() throws IOException {  
        if (restClient != null) {  
            restClient.close();  
        }  
    }  
    @Test  
    public void testClient() throws IOException {  
        System.out.println("Client connected: " + (client != null));  
    }  
    @Test  
    public void createIndex() throws IOException {  
        try {  
            CreateIndexRequest request = new CreateIndexRequest.Builder()  
                    .index(indexName)  
                    .mappings(m -> m  
                            .properties("id", p -> p.keyword(k -> k))  
                            .properties("name", p -> p.text(t -> t.analyzer("ik_max_word")))  
                            .properties("price", p -> p.integer(i -> i))  
                            .properties("stock", p -> p.integer(i -> i))  
                            .properties("image", p -> p.keyword(k -> k.index(false)))  
                            .properties("category", p -> p.keyword(k -> k))  
                            .properties("brand", p -> p.keyword(k -> k))  
                            .properties("sold", p -> p.integer(i -> i))  
                            .properties("commentCount", p -> p.integer(i -> i))  
                            .properties("isAD", p -> p.boolean_(b -> b))  
                            .properties("updateTime", p -> p.date(d -> d))  
                    )  
                    .build();  
  
            CreateIndexResponse response = client.indices().create(request);  
            if (response.acknowledged()) {  
                System.out.println("索引创建成功!");  
            } else {  
                System.out.println("索引创建未被确认!");  
            }  
        } catch (Exception e) {  
            System.err.println("索引创建失败: " + e.getMessage());  
            e.printStackTrace();  
        }  
    }  
  
    // 判断索引库是否存在  
    @Test  
    public void isIndexExist() throws IOException, ElasticsearchException {  
        // 使用 ExistsRequest 构建请求  
        ExistsRequest existsRequest = ExistsRequest.of(b -> b.index(indexName));  
        System.out.println(client.indices().exists(existsRequest).value() ? "Exists" : "Not Exists");  
    }  
  
    // 删除索引库  
    @Test  
    public void deleteIndex() throws IOException, ElasticsearchException {  
        try {  
            DeleteIndexResponse response = client.indices().delete(DeleteIndexRequest.of(b -> b.index(indexName)));  
            System.out.println(response.acknowledged() ? "删除成功":"删除失败");  
        } catch (Exception e) {  
            System.err.println("删除失败:" + e.getMessage());  
            throw new RuntimeException(e);  
        }  
    }  
}

文档操作

package com.hmall.item.es;  
  
  
import cn.hutool.json.JSONUtil;  
import co.elastic.clients.elasticsearch.ElasticsearchClient;  
import co.elastic.clients.elasticsearch._types.ElasticsearchException;  
import co.elastic.clients.elasticsearch._types.Refresh;  
import co.elastic.clients.elasticsearch._types.mapping.*;  
import co.elastic.clients.elasticsearch.core.*;  
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;  
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;  
import co.elastic.clients.elasticsearch.indices.*;  
import co.elastic.clients.json.JsonData;  
import co.elastic.clients.transport.ElasticsearchTransport;  
import co.elastic.clients.transport.rest_client.RestClientTransport;  
import co.elastic.clients.json.jackson.JacksonJsonpMapper;  
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;  
import com.hmall.common.utils.BeanUtils;  
import com.hmall.item.domain.po.Item;  
import com.hmall.item.domain.po.ItemDoc;  
import com.hmall.item.service.IItemService;  
import org.apache.http.auth.AuthScope;  
import org.apache.http.auth.UsernamePasswordCredentials;  
import org.apache.http.impl.client.BasicCredentialsProvider;  
import org.apache.http.HttpHost;  
import org.elasticsearch.client.Request;  
import org.elasticsearch.client.RequestOptions;  
import org.elasticsearch.client.RestClient;  
import org.elasticsearch.client.RestClientBuilder;  
import org.junit.jupiter.api.*;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.boot.test.context.SpringBootTest;  
  
import java.io.IOException;  
import java.io.StringReader;  
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.stream.Collectors;  
  
// 使用elasticsearch-java 8.18.2  
// 如果有多个环境可以切换查询,可以通过Properties设置  
//@SpringBootTest(properties = "spring.profiles.active=local")  
@SpringBootTest // 启动整个 Spring Boot 应用,包括所有配置类、Bean 依赖(如 Service、Repository、Controller 等),确保测试环境与生产环境一致.支持自动注入(@Autowired)被测试的组件  
public class DocumentJACTest {  
    private static final String indexName = "items";  
    private ElasticsearchClient client;  
    private RestClient restClient;  
  
    @Autowired  
    private IItemService itemService;  
  
    @BeforeEach  
    public void init() {  
        // 设置认证  
        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();  
        credentialsProvider.setCredentials(  
                AuthScope.ANY,  
                new UsernamePasswordCredentials("elastic", "****")  
        );  
        RestClientBuilder builder = RestClient.builder(new HttpHost("rizx.dpdns.org", 9200, "http"))  
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));  
        restClient = builder.build();  
        // 创建 transport 和 client        
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());  
        client = new ElasticsearchClient(transport);  
    }  
  
    @AfterEach  
    public void close() throws IOException {  
        if (restClient != null) {  
            restClient.close();  
        }  
    }  
  
    /**  
     * 根据商品id查询Mysql数据库中商品,将改商品保存到ElasticSearch  
     */    @Test  
    public void testCreateDocument() throws IOException {  
        System.out.println(itemService);  
        // 1. 根据商品id查询mysql 数据库中的商品  
        Item item = itemService.getById(577967);  
        // 2. 将item转为ElasticSearch可接受的ItemDoc  
        ItemDoc itemDoc = BeanUtils.copyBean(item, ItemDoc.class);  
        // 3. 创建索引请求 (新API)  
        String jsonStr = JSONUtil.toJsonStr(itemDoc);  
        IndexRequest<ItemDoc> request = IndexRequest.of(  
                b -> b.index(indexName)  
                                .id(itemDoc.getId().toString())  
                                // .document(itemDoc)  // 直接传入Java对象,自动序列化  
                                .withJson(new StringReader(jsonStr))    // 使用手动序列化的JSON  
        );  
  
        // 4. 发送请求  
        IndexResponse response = client.index(request);  
        System.out.println("文档ID: " + response.id() + " 版本: " + response.version());  
    }  
  
    // 查询文档  
    @Test  
    public void testGetDocument() throws IOException {  
        System.out.println("test get document");  
        GetRequest getRequest = new GetRequest.Builder()  
                .index(indexName)   // 设置索引名  
                .id("577967")        // 设置文档 ID                .build();  
        System.out.println(getRequest.toString());  
        GetResponse<ItemDoc> response = client.get(getRequest, ItemDoc.class);  
        ItemDoc itemDoc = response.source();  
        System.out.println(JSONUtil.toJsonStr(itemDoc));  
  
//        GetResponse<ItemDoc> response = client.get(b -> b  
//                .index(indexName)  
//                .id("577967"), ItemDoc.class  
//        );  
//  
//        ItemDoc itemDoc = response.source();  
//        System.out.println(JSONUtil.toJsonStr(itemDoc));  
    }  
  
    // 更新文档  
    @Test  
    public void testUpdateDocument() throws IOException {  
        ItemDoc partialUpdate = new ItemDoc();  
        partialUpdate.setName("i love jbl very much");  
        partialUpdate.setPrice(10000);  
  
        client.update(b -> b  
                        .index(indexName)  
                        .id("577967")  
                        .doc(partialUpdate), // 直接传入对象  
                ItemDoc.class  
        );  
    }  
  
    // 删除文档  
    @Test  
    public void testDeleteDocument() throws IOException {  
//        DeleteIndexRequest request = new DeleteIndexRequest.Builder().index(indexName, "11").build();  
//        client.delete(request, RequestOptions.DEFAULT);  
        client.delete(b -> b.index(indexName).id("577967"));  
    }  
  
  
    // 批量导入商品到es  
    @Test  
    public void testImportItem() throws IOException {  
        int pageNo = 1, pageSize = 1000;  
        while (true) {  
            // 1、根据页号、页大小(1000);每次查询1000条数据  
            System.out.println("正在导入第 " + pageNo + " 页数据");  
            Page<Item> page = itemService.lambdaQuery().eq(Item::getStatus, 1).page(new Page<>(pageNo, pageSize));  
            List<Item> itemList = page.getRecords();  
            if (itemList.isEmpty()) break;  
            // 2. 转换为ItemDoc并构建批量操作  
            List<BulkOperation> operations = itemList.stream()  
                    .map(item -> {  
                        ItemDoc itemDoc = BeanUtils.copyProperties(item, ItemDoc.class);  
                        return BulkOperation.of(op -> op  
                                .create(CreateOperation.of(c -> c  
                                        .index(indexName)  
                                        .id(itemDoc.getId().toString())  
                                        .document(JsonData.of(itemDoc))  
                                ))  
                        );  
                    })  
                    .collect(Collectors.toList());  
            // 3. 构建批量请求(优化刷新策略)  
            BulkRequest bulkRequest = BulkRequest.of(b -> b  
                    .operations(operations)  
                    .refresh(Refresh.WaitFor)  
            );  
            // 4. 执行批量操作并检查错误  
            System.out.println(bulkRequest.toString()); // BulkRequest: POST /_bulk?refresh=wait_for [{"create":{"_id":"317578","_index":"items"}},{"create":{"_id":"317580","_index":"items"}},{"create":{"_id":"546872","_index":"items"}}]  
  
            BulkResponse response = client.bulk(bulkRequest);  
            if (response.errors()) {  
                response.items().stream()  
                        .filter(item -> item.error() != null)  
                        .forEach(item -> System.err.println("ID " + item.id() + " 错误: " + item.error().reason()));  
            }  
            System.out.println("----------------第" + pageNo + "页完成(耗时: " + response.took() + "ms)----------------");  
  
            ++pageNo;  
        }  
    }  
}
动物装饰