day07 Elasticsearch search engine 3

1. Data aggregation

Aggregations allow us to perform statistics, analysis, and calculations on document data very conveniently. For example:

  • Which brand of mobile phone is the most popular?
  • What are the average, highest, and lowest prices of these phones?
  • How do these phones sell month by month?

Implementing these statistics is much more convenient than writing SQL against a database, and the queries are fast enough to achieve a real-time search experience.

1.1. Classification of aggregation

There are three common types of aggregation:

  • Bucket aggregation: used to group documents
    • Terms aggregation: group by a field's value, such as grouping by brand or by country
    • Date histogram: group by date intervals, for example one bucket per week, per month, or per quarter

  • Metric aggregation: used to calculate values, such as the maximum, minimum, average, etc.
    • Avg: average value
    • Max: maximum value
    • Min: minimum value
    • Stats: max, min, avg, sum, etc. at the same time
  • Pipeline aggregation: aggregation based on the results of other aggregations

Note: Fields participating in aggregation must be of keyword, date, numeric, or Boolean type; that is, aggregated fields must not be analyzed (word-segmented)

1.2. DSL for aggregation

Now we want to count the hotel brands across all the data, which means grouping the documents by brand. Since the grouping is based on the hotel brand name, this is a Bucket aggregation.

Bucket aggregation syntax

Basic syntax:

GET /hotel/_search
{
  "size": 0,  // Set size to 0, the result does not contain documents, only aggregate results
  "aggs": { // define aggregation
    "brandAgg": { // give the aggregation a name
      "terms": { // The type of aggregation, aggregated according to the brand value, so choose term
        "field": "brand", // Fields involved in aggregation
        "size": 20 // The number of aggregation results you want to get
      }
    }
  }
}

Example:

# aggregation function
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}

Aggregation result sorting

By default, Bucket aggregation counts the number of documents in each bucket, records it as _count, and sorts the buckets in descending order of _count.

We can specify the order attribute to customize the ordering of aggregates:

GET /hotel/_search
{
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "order": {
          "_count": "asc" // Sort by _count ascending
        },
        "size": 20
      }
    }
  }
}

Example:

# Aggregation function, custom collation
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
          "_count": "asc"
        }
      }
    }
  }
}

Limit aggregation scope

By default, Bucket aggregation runs over all documents in the index library. In real scenarios, however, users enter search conditions, so the aggregation should apply only to the search results; the aggregation therefore needs to be restricted.

We can limit the range of documents to be aggregated, just add query conditions:

GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 // Only aggregate documents below 200 yuan
      }
    }
  }, 
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}

Example:

# Aggregation function, limit the scope of aggregation
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200
      }
    }
  }, 
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}

Metric Aggregation Syntax

Above we grouped hotels by brand into buckets. Now we need to run calculations on the hotels within each bucket to obtain the min, max, avg and other values of each brand's user rating.

This requires a Metric aggregation, such as the stats aggregation, which returns min, max, avg and other results.

Basic syntax:

GET /hotel/_search
{
  "size": 0, 
  "aggs": {
    "brandAgg": { 
      "terms": { 
        "field": "brand", 
        "size": 20
      },
      "aggs": { // It is a sub-aggregation of brands aggregation, that is, it is calculated separately for each group after grouping
        "score_stats": { // aggregate name
          "stats": { // Aggregation type, where stats can calculate min, max, avg, etc.
            "field": "score" // Aggregate field, here is score
          }
        }
      }
    }
  }
}

Example:

# Nested metric aggregation
GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
          "scoreAgg.avg": "desc"
        }
      },
      "aggs": {
        "scoreAgg": {
          "stats": {
            "field": "score"
          }
        }
      }
    }
  }
}

The scoreAgg aggregation here is a sub-aggregation nested inside the brandAgg aggregation, so it is calculated separately for each bucket.

In addition, we can sort the buckets by the results of the sub-aggregation; in the example above, the brands are ordered by the average hotel score of each bucket (scoreAgg.avg).

1.3. RestAPI implements aggregation

API syntax

Aggregation conditions sit at the same level as query conditions, so request.source() is used to specify them, with AggregationBuilders building the aggregation itself.

The aggregation result also differs from an ordinary query result and has its own parsing API; however, the response JSON is still parsed layer by layer, mirroring its structure:

Example:

		@Test
    void testAggregation() throws IOException {
        // 1. Create a Request object
        SearchRequest request = new SearchRequest("hotel");
        // 2. Prepare DSL
        // 2.1. Set the size
        request.source().size(0);
        // 2.2. Aggregation
        request.source().aggregation(AggregationBuilders
                // aggregate name
                .terms("brandAgg")
                // aggregated fields
                .field("brand")
                // number of aggregations
                .size(10)
        );

        // 3. Send request
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        // 4. Parsing the aggregation results
        Aggregations aggregations = response.getAggregations();
        // 4.1. Obtain the aggregation result according to the aggregation name
        Terms brandTerms = aggregations.get("brandAgg");
        // 4.2. Get buckets
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        // 4.3, traverse each bucket
        for (Terms.Bucket bucket : buckets) {
            // 4.4. Get the key
            String key = bucket.getKeyAsString();
            System.out.println("key = " + key);
        }
    }
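
To also read the nested stats sub-aggregation from the DSL above with the Java client, the sub-aggregation is added via subAggregation() and parsed from each bucket's own getAggregations(). Below is a minimal sketch, assuming the same test class and client as testAggregation; BucketOrder and the Stats interface come from the org.elasticsearch.search.aggregations packages of the 7.x high-level client:

		@Test
    void testSubAggregation() throws IOException {
        // 1. Create a Request object
        SearchRequest request = new SearchRequest("hotel");
        // 2. Prepare DSL: terms aggregation on brand with a nested stats aggregation on score
        request.source().size(0);
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(20)
                // order the brand buckets by the sub-aggregation's average score, descending
                .order(BucketOrder.aggregation("scoreAgg.avg", false))
                // nested metric aggregation, calculated separately for each bucket
                .subAggregation(AggregationBuilders.stats("scoreAgg").field("score")));
        // 3. Send request
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. Parse: brand buckets first, then the stats result inside each bucket
        Terms brandTerms = response.getAggregations().get("brandAgg");
        for (Terms.Bucket bucket : brandTerms.getBuckets()) {
            Stats scoreStats = bucket.getAggregations().get("scoreAgg");
            System.out.println(bucket.getKeyAsString()
                    + ", count = " + bucket.getDocCount()
                    + ", avg score = " + scoreStats.getAvg()
                    + ", max score = " + scoreStats.getMax());
        }
    }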

Business needs

Requirement: The brand, city and other information of the search page should not be hard-coded on the page, but obtained by aggregated hotel data in the index database:

Analysis:

At present, the city list, star list, and brand list on the page are all hard-coded, and will not change as the search results change. But when the user's search conditions change, the search results will change accordingly.

For example, if a user searches for "Oriental Pearl", the searched hotel must be near the Shanghai Oriental Pearl Tower. Therefore, the city can only be Shanghai. At this time, Beijing, Shenzhen, and Hangzhou should not be displayed in the city list.

In other words, the page should list only the cities that appear in the search results, and only the brands that appear in the search results.

How do I know which brands are included in my search results? How do I know which cities are included in my search results?

Use the aggregation function and use Bucket aggregation to group the documents in the search results based on brand and city, and you can know which brands and cities are included.

Because we aggregate over the search results, this is a limited-range aggregation: the restriction on the aggregation must be consistent with the query conditions of the document search.

Looking at the browser, we can find that the front end has actually sent such a request:

The request parameters are exactly the same as those for searching documents.

The return value type is the final result to be displayed on the page:

The result is a Map structure:

  • key is a string, city, star, brand, price
  • value is a collection, such as the names of multiple cities

Business implementation

Add a method to HotelController under the cn.itcast.hotel.web package and follow the requirements below:

  • Request method: POST
  • Request path: /hotel/filters
  • Request parameters: RequestParams, consistent with the parameters of the search document
  • Return value type: Map<String, List<String>>

Code:

		@PostMapping("/filters")
    public Map<String, List<String>> getFilters(@RequestBody RequestParams params) {
        return hotelService.filters(params);
    }

The filters method of IHotelService is called here, which has not been implemented yet.

Define a new method in cn.itcast.hotel.service.IHotelService:

Map<String, List<String>> filters(RequestParams params);

Implement this method in cn.itcast.hotel.service.impl.HotelService:

		/**
     * Query aggregate results of cities, star ratings, and brands
     *
     * @param params
     * @return Aggregation result, format: {"city": ["Shanghai", "Beijing"], "brand": ["Home Inn", "Hilton"]}
     */
    @Override
    public Map<String, List<String>> filters(RequestParams params) {
        try {
            // 1. Create a Request object
            SearchRequest request = new SearchRequest("hotel");
            // 2. Prepare DSL
            // 2.1. Query
            buildBasicQuery(params, request);
            // 2.2, set the size
            request.source().size(0);
            // 2.3. Aggregation
            buildAggregation(request);
            // 3. Send request
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            // 4. Parsing the aggregation results
            Map<String, List<String>> result = new HashMap<>();
            Aggregations aggregations = response.getAggregations();
            // 4.1. According to the brand name, get the brand result
            List<String> brandList = getAggByName(aggregations, "brandAgg");
            result.put("brand", brandList);
            // 4.2. According to the city name, get the city result
            List<String> cityList = getAggByName(aggregations, "cityAgg");
            result.put("City", cityList);
            // 4.3. According to the star name, get the star result
            List<String> starList = getAggByName(aggregations, "starAgg");
            result.put("the star", starList);
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private List<String> getAggByName(Aggregations aggregations, String aggName) {
        // 1. Obtain the aggregation result according to the aggregation name
        Terms brandTerms = aggregations.get(aggName);
        // 2. Get buckets
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        // 3. Traverse each bucket
        List<String> brandList = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            // 4. Get key
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }
        return brandList;
    }

    private void buildAggregation(SearchRequest request) {
        // Aggregate the brands
        request.source().aggregation(AggregationBuilders
                // aggregate name
                .terms("brandAgg")
                // aggregated fields
                .field("brand")
                // number of aggregations
                .size(100));
        // Aggregate cities
        request.source().aggregation(AggregationBuilders
                // aggregate name
                .terms("cityAgg")
                // aggregated fields
                .field("city")
                // number of aggregations
                .size(100));
        // Aggregate star ratings
        request.source().aggregation(AggregationBuilders
                // aggregate name
                .terms("starAgg")
                // aggregated fields
                .field("starName")
                // number of aggregations
                .size(100));
    }

    private void buildBasicQuery(RequestParams params, SearchRequest request) {
        // 1. Build BooleanQuery
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // keyword search
        String key = params.getKey();
        if (key == null || "".equals(key)) {
            boolQuery.must(QueryBuilders.matchAllQuery());
        } else {
            boolQuery.must(QueryBuilders.matchQuery("all", key));
        }
        // City condition
        if (params.getCity() != null && !params.getCity().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
        }
        // Brand conditions
        if (params.getBrand() != null && !params.getBrand().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
        }
        // star condition
        if (params.getStarName() != null && !params.getStarName().equals("")) {
            boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
        }
        // price
        if (params.getMinPrice() != null && params.getMaxPrice() != null) {
            boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
        }

        // 2. Score control
        FunctionScoreQueryBuilder functionScoreQuery =
                // Build the function_score query
                QueryBuilders.functionScoreQuery(
                        // Original query, scored by relevance
                        boolQuery,
                        // Array of function_score filter functions
                        new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
                                // One function_score element
                                new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                        // filter condition
                                        QueryBuilders.termQuery("isAD", true),
                                        // Scoring function
                                        ScoreFunctionBuilders.weightFactorFunction(10))});
        // 3. Put in source
        request.source().query(functionScoreQuery);
    }

2. Auto-completion

When the user enters a character in the search box, we should prompt the search item related to the character, as shown in the figure:

This function of prompting complete entries based on the letters entered by the user is automatic completion.

Because it needs to be inferred based on the pinyin letters, the pinyin word segmentation function is used.

2.1. Pinyin tokenizer

To achieve automatic completion based on letters, the document must be segmented according to pinyin. There happens to be a pinyin word segmentation plugin for elasticsearch on GitHub. address: https://github.com/medcl/elasticsearch-analysis-pinyin

The installation method is the same as for the IK tokenizer:

1. Unzip

2. Upload it to the elasticsearch plugin directory on the virtual machine:

/var/lib/docker/volumes/es-plugins/_data

3. Restart elasticsearch

4. Test

POST /_analyze
{
  "text": ["Home Inn is not bad"],
  "analyzer": "pinyin"
}

2.2. Custom tokenizer

The default pinyin tokenizer converts each Chinese character into pinyin individually, but we want each term to produce a group of pinyin, so we need to customize the pinyin tokenizer's behavior as part of a custom analyzer.

An analyzer in elasticsearch consists of three parts:

  • character filters: process the text before the tokenizer, e.g. deleting or replacing characters
  • tokenizer: cuts the text into terms according to certain rules, for example keyword (no splitting) or ik_smart
  • tokenizer filter: further processes the terms output by the tokenizer, for example case conversion, synonym handling, pinyin handling, etc.

When a document is tokenized, it is processed by these three parts in turn:

We can configure a custom analyzer (word breaker) through settings when creating an index library:

PUT /test // Create a test index library
{
  "settings": {
    "analysis": {
      "analyzer": { // custom tokenizer
        "my_analyzer": {  // tokenizer name
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": { // Custom tokenizer filter
        "py": { // filter name
          "type": "pinyin", // Filter type, here is pinyin
		  		"keep_full_pinyin": false, // Cancel the pinyin of a single word, for example: Andy Lau -> [liu,de,hua]
          "keep_joined_full_pinyin": true, // Plus full spell function, for example: Andy Lau -> [liudehua]
          "keep_original": true, // Keep Chinese
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  }
}

Example:

# Custom pinyin tokenizer
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { 
        "my_analyzer": { 
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": { 
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "my_analyzer"
      }
    }
  }
}

Test:

POST /test/_analyze
{
  "text": ["Home Inn is not bad"],
  "analyzer": "my_analyzer"
}

Note: to avoid matching homophones, do not use the pinyin tokenizer at search time.

The pinyin word breaker is suitable for use when creating an inverted index, but it cannot be used for searching.

# insert data
POST /test/_doc/1
{
  "id": 1,
  "name": "lion"
}
POST /test/_doc/2
{
  "id": 2,
  "name": "lice"
}
# search for the keyword
GET /test/_search
{
  "query": {
    "match": {
      "name": "What to do if you fall into the lion cage"
    }
  }
}

Therefore, the field should use the my_analyzer analyzer when building the inverted index, and the ik_smart analyzer when searching (configured via search_analyzer):

# Delete the test index library
DELETE /test

# custom tokenizer
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { 
        "my_analyzer": { 
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": { 
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "my_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

Test again:

2.3. Auto-complete query

elasticsearch provides Completion Suggester query to implement autocompletion. This query will match terms beginning with the user input and return them. To improve the efficiency of autocompletion queries, there are some constraints on the types of fields in documents:

  • Fields participating in the completion query must be of type completion.
  • The content of the field is generally an array formed by multiple entries for completion.

For example, an index library like this:

# Auto-completion index library
PUT test2
{
  "mappings": {
    "properties": {
      "title":{
        "type": "completion"
      }
    }
  }
}

Then insert the following data:

# sample data
POST test2/_doc
{
  "title": ["Sony", "WH-1000XM3"]
}
POST test2/_doc
{
  "title": ["SK-II", "PITERA"]
}
POST test2/_doc
{
  "title": ["Nintendo", "switch"]
}

The query syntax is as follows:

// autocomplete query
GET /test/_search
{
  "suggest": {
    "title_suggest": { // give the query a name
      "text": "s", // keywords entered by the user
      "completion": { // Types of autocompletion
        "field": "title", // Fields to complete the query
        "skip_duplicates": true, // skip duplicate
        "size": 10 // Get the first 10 results
      }
    }
  }
}

Example:

# autocomplete query
GET /test2/_search
{
  "suggest": {
    "titleSuggest": {
      "text": "s",
      "completion": {
        "field": "title",
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}

2.4. Implement auto-completion for the hotel search box

Now, our hotel index library does not have a pinyin tokenizer configured, and we need to change the index library's configuration. But we know that an index library cannot be modified once created; it can only be deleted and then recreated.

In addition, we need to add a field for auto-completion, and put brand, suggestion, city, etc. into it as a prompt for auto-completion.

So, to summarize, the things we need to do include:

1. Modify the structure of the hotel index library and set a custom pinyin word breaker

2. Modify the name and all fields of the index library to use the custom tokenizer

3. Add a new suggestion field of type completion to the index library, using the custom tokenizer

4. Add a suggestion field to the HotelDoc class, which contains brand and business

5. Re-import the data to the hotel library

Modify the hotel mapping structure

# Delete Hotel Index Library
DELETE /hotel

# Hotel Index
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_anlyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id":{
        "type": "keyword"
      },
      "name":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address":{
        "type": "keyword",
        "index": false
      },
      "price":{
        "type": "integer"
      },
      "score":{
        "type": "integer"
      },
      "brand":{
        "type": "keyword",
        "copy_to": "all"
      },
      "city":{
        "type": "keyword"
      },
      "starName":{
        "type": "keyword"
      },
      "business":{
        "type": "keyword",
        "copy_to": "all"
      },
      "location":{
        "type": "geo_point"
      },
      "pic":{
        "type": "keyword",
        "index": false
      },
      "all":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart"
      },
      "suggestion":{
          "type": "completion",
          "analyzer": "completion_analyzer"
      }
    }
  }
}

Note: text_anlyzer is used for full-text retrieval and requires word segmentation (ik_max_word); while completion_analyzer is used for automatic completion and does not require word segmentation (keyword)

Look at the name and all fields here, "analyzer": "text_anlyzer" means to use text_anlyzer when creating an inverted index, "search_analyzer": "ik_smart" means to use ik_smart when searching.

Look at suggestion again, this field is used for auto-completion, its type is completion, and the tokenizer used is completion_analyzer, that is, it is directly converted into pinyin without word segmentation.

Modify the HotelDoc entity

A field needs to be added to HotelDoc for auto-completion, and its content can be the hotel brand, city, business district and other information. As completion fields require, it is preferably an array of these values.

So we add a suggestion field in HotelDoc, the type is List<String>, and then put the brand, city, business and other information into it.

@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    // brand
    private String brand;
    private String city;
    private String starName;
    // The business district where the hotel is located
    private String business;
    private String location;
    private String pic;
    // distance value when sorting
    private Object distance;
    // ad tag
    private boolean isAD;
    // autocompleted array
    private List<String> suggestion;

    // Note that these two get and set methods need to be added manually, otherwise the automatically generated method name does not contain get, which will cause the advertisement image to not be displayed normally
    public boolean getisAD() {
        return isAD;
    }

    public void setisAD(boolean AD) {
        isAD = AD;
    }

    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
        this.pic = hotel.getPic();
        // Handling of autocomplete fields
        if (this.business.contains("/")) {
            // business has multiple values ​​and needs to be cut
            String[] arr = this.business.split("/");
            this.suggestion = new ArrayList<>();
            // add brand
            this.suggestion.add(this.brand);
            // Add business district
            Collections.addAll(this.suggestion, arr);
        } else {
            this.suggestion = Arrays.asList(this.brand, this.business);
        }
    }
}

Re-import data into es

Re-execute the previously written import data function:

		/**
     * Add documents in batches
     *
     * @throws IOException
     */
    @Test
    void testBulkRequest() throws IOException {
        // Query all hotel data
        List<Hotel> list = hotelService.list();

        // 1. Create a Request object
        BulkRequest request = new BulkRequest();
        // 2. Prepare parameters: add multiple IndexRequests
        for (Hotel hotel : list) {
            // 2.1 Conversion to document type HotelDoc
            HotelDoc hotelDoc = new HotelDoc(hotel);
            // 2.2 Convert to json
            String json = JSON.toJSONString(hotelDoc);
            // 2.3 Add request
            request.add(new IndexRequest("hotel").id(hotel.getId().toString()).source(json, XContentType.JSON));
        }

        // 3. Send request
        client.bulk(request, RequestOptions.DEFAULT);
    }

You can see that the suggestion is included in the new hotel data:

Test autocompletion

GET /hotel/_search
{
  "suggest": {
    "suggestions": {
      "text": "h",
      "completion": {
        "field": "suggestion",
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}

RestAPI implements auto-completion

The suggest request is built through request.source().suggest(...) using SuggestBuilders, at the same level as the query.

The auto-completion result has its own structure, so it is parsed from the Suggest object layer by layer: by suggestion name, then options, then text.
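
A minimal test sketch, assuming the same test class and client as before and the rebuilt hotel index with the suggestion field (the full business implementation follows in the next subsection):

		@Test
    void testSuggest() throws IOException {
        // 1. Create a Request object
        SearchRequest request = new SearchRequest("hotel");
        // 2. Prepare DSL: a completion suggestion named "suggestions" on the suggestion field
        request.source().suggest(new SuggestBuilder().addSuggestion(
                "suggestions",
                SuggestBuilders.completionSuggestion("suggestion")
                        .prefix("h")           // keyword entered by the user
                        .skipDuplicates(true)  // skip duplicates
                        .size(10)));           // get the first 10 results
        // 3. Send request
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4. Parse: suggest -> suggestion name -> options -> text
        CompletionSuggestion suggestions = response.getSuggest().getSuggestion("suggestions");
        for (CompletionSuggestion.Entry.Option option : suggestions.getOptions()) {
            System.out.println(option.getText().toString());
        }
    }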

Autocomplete the search box

Looking at the front-end page, we can find that when we type in the input box, the front-end will initiate an ajax request:

The return value is a collection of completed entries, the type is List<String>

1. Add a new interface to HotelController under the cn.itcast.hotel.web package to receive new requests:

		@GetMapping("/suggestion")
    public List<String> getSuggestions(@RequestParam("key") String prefix) {
        return hotelService.getSuggestions(prefix);
    }

2. Add the method to IHotelService under the cn.itcast.hotel.service package:

List<String> getSuggestions(String prefix);

3. Implement this method in cn.itcast.hotel.service.impl.HotelService:

		@Override
    public List<String> getSuggestions(String prefix) {
        try {
            // 1. Create a Request object
            SearchRequest request = new SearchRequest("hotel");
            // 2. Prepare DSL
            request.source().suggest(new SuggestBuilder()
                    // Add a completion query name
                    .addSuggestion("suggestions",
                            // Autocompleted field names
                            SuggestBuilders.completionSuggestion("suggestion")
                                    // autocomplete prefix
                                    .prefix(prefix)
                                    // skip duplicate
                                    .skipDuplicates(true)
                                    // Up to 10 pieces of data can be displayed
                                    .size(10)));
            // 3. Initiate a request
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            // 4. Parse the response result
            Suggest suggest = response.getSuggest();
            // 4.1. Obtain the completion result according to the name of the completion query
            CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
            // 4.2. Get options
            List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
            // 4.3, traverse options
            List<String> list = new ArrayList<>(options.size());
            for (CompletionSuggestion.Entry.Option option : options) {
                // 4.4. Get the text field in an option, that is, the completed entry
                String text = option.getText().toString();
                list.add(text);
            }
            return list;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

4. Test:

3. Data synchronization (interview focus)

The hotel data in elasticsearch comes from the mysql database, so when the mysql data changes, elasticsearch must change accordingly. This is the data synchronization problem between elasticsearch and mysql.

3.1. Thinking analysis

There are three common data synchronization schemes:

  • synchronous call
  • asynchronous notification
  • monitor binlog

Solution 1: Synchronous call

The process is as follows:

  • hotel-demo provides external interfaces to modify data in elasticsearch
  • After the hotel management service completes the database operation, it directly calls the interface provided by hotel-demo

Advantages and disadvantages:

  • Advantages: simple to implement, rough
  • Disadvantages: high degree of business coupling

Solution 2: Asynchronous notification

The process is as follows:

  • hotel-admin sends MQ messages after adding, deleting, and modifying data in the mysql database
  • hotel-demo listens to MQ and completes the modification of elasticsearch data after receiving the message

Advantages and disadvantages:

  • Advantages: low coupling, moderate implementation difficulty
  • Disadvantages: depend on the reliability of mq

Solution 3: Monitor binlog

The process is as follows:

  • Enable binlog function for mysql
  • The addition, deletion, and modification of mysql will be recorded in the binlog
  • hotel-demo listens to binlog changes based on canal, and updates the content in elasticsearch in real time

Advantages and disadvantages:

  • Advantages: completely decoupling between services
  • Disadvantages: Enabling binlog will increase the burden on the database, and the implementation complexity is high

3.2. Realize data synchronization

Let's take asynchronous notification as an example and use MQ message middleware

Approach

Use the hotel-admin project provided in the pre-course materials as a microservice for hotel management. When the hotel data is added, deleted, or modified, the same operation is required for the data in elasticsearch.

Steps:

  • Import the hotel-admin project provided in the pre-course materials, start and test the CRUD of hotel data
  • Declare exchange, queue, RoutingKey
  • Complete the message sending in the add, delete and modify business in hotel-admin
  • Complete message monitoring in hotel-demo and update data in elasticsearch
  • Start and test the data sync function

Import the demo project

Import the hotel-admin project provided in the pre-course materials:

After running, visit http://localhost:8099

Declare exchanges and queues

The MQ structure is shown in the figure:

1. Introduce dependencies

Introduce the dependency of rabbitmq in hotel-admin and hotel-demo:

<!-- amqp dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Configure rabbitmq

Configure rabbitmq in application.yaml in hotel-admin, hotel-demo

spring:
  rabbitmq:
    host: rabbitmq server ip address
    port: 5672
    username: admin
    password: 283619
    virtual-host: /

3. Declare queue and exchange names

Create a new class MqConstants under the cn.itcast.hotel.constatnts package in hotel-admin and hotel-demo:

/**
 * Declare the names of queues and exchanges
 *
 * @author xiexu
 * @create 2022-11-17 10:12
 */
public class MqConstants {

    /**
     * Exchange name
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * Listen for new or modified queues
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * Listen for deleted queues
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * Added or modified RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * Deleted RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";

}

4. Declare queues and exchanges

Define the configuration class in hotel-demo, declaring the queues and the exchange:

/**
 * Declare queues and exchanges
 *
 * @author xiexu
 * @create 2022-11-17 10:17
 */
@Configuration
public class MqConfig {

    /**
     * Declare the exchange
     *
     * @return
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
    }

    /**
     * Declare new or modified queues
     *
     * @return
     */
    @Bean
    public Queue insertQueue() {
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
    }

    /**
     * Declare the deleted queue
     *
     * @return
     */
    @Bean
    public Queue deleteQueue() {
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
    }

    /**
     * Declare the binding between the insert queue and the exchange
     *
     * @return
     */
    @Bean
    public Binding insertQueueBinding() {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }

    /**
     * Declare the binding between the delete queue and the exchange
     *
     * @return
     */
    @Bean
    public Binding deleteQueueBinding() {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }

}

Send MQ message

Send MQ messages respectively in the add, delete, and modify services in hotel-admin:

@RestController
@RequestMapping("/hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id) {
        return hotelService.getById(id);
    }

    @GetMapping("/list")
    public PageResult hotelList(@RequestParam(value = "page", defaultValue = "1") Integer page, @RequestParam(value = "size", defaultValue = "1") Integer size) {
        Page<Hotel> result = hotelService.page(new Page<>(page, size));

        return new PageResult(result.getTotal(), result.getRecords());
    }

    /**
     * Add
     *
     * @param hotel
     */
    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel) {
        // Add new hotel
        hotelService.save(hotel);
        /**
         * Send the MQ message
         * First parameter: exchange
         * Second parameter: RoutingKey
         * Third parameter: to save resources only the hotel id is sent; the consumer can query the mysql database by id to get the newly inserted hotel data
         */
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    /**
     * Update
     *
     * @param hotel
     */
    @PutMapping()
    public void updateById(@RequestBody Hotel hotel) {
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id cannot be empty");
        }
        hotelService.updateById(hotel);
        /**
         * Send the MQ message
         * First parameter: exchange
         * Second parameter: RoutingKey
         * Third parameter: to save resources only the hotel id is sent; the consumer can query the mysql database by id to get the updated hotel data
         */
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    /**
     * delete
     *
     * @param id
     */
    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        /**
         * Send the MQ message
         * First parameter: exchange
         * Second parameter: RoutingKey
         * Third parameter: hotel id
         */
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
    }
}

Receive MQ message

Things to do when hotel-demo receives MQ messages include:

  • New message: Query hotel information according to the passed hotel id, and then add a piece of data to the es index library
  • Delete message: Delete a piece of data in the index library according to the passed hotel id

Implementation steps

1. First, add insert and delete methods to IHotelService under the cn.itcast.hotel.service package of hotel-demo:

void insertById(Long id);

void deleteById(Long id);

2. Implement the business in HotelService under the cn.itcast.hotel.service.impl package in hotel-demo:

		@Override
    public void insertById(Long id) {
        try {
            // 1. Query hotel data according to id
            Hotel hotel = getById(id);
            // 2. Convert to HotelDoc document type
            HotelDoc hotelDoc = new HotelDoc(hotel);
            // 3. Convert to JSON format
            String json = JSON.toJSONString(hotelDoc);

            // 4. Create a Request object
            IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());
            // 5. Prepare the JSON document
            request.source(json, XContentType.JSON);
            // 6. Send request
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteById(Long id) {
        try {
            // 1. Create a Request object
            DeleteRequest request = new DeleteRequest("hotel", id.toString());
            // 2. Send request
            client.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

3. Write the listener

Add a new class under the cn.itcast.hotel.mq package in hotel-demo:

/**
 * @author xiexu
 * @create 2022-11-17 10:37
 */
@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * Monitor the business added or modified by the hotel
     *
     * @param id hotel id
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE) // Listening queue name
    public void listenHotelInsertOrUpdate(Long id) {
        hotelService.insertById(id);
    }

    /**
     * Monitor the business deleted by the hotel
     *
     * @param id hotel id
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE) // Listening queue name
    public void listenHotelDelete(Long id) {
        hotelService.deleteById(id);
    }

}

4. elasticsearch cluster

Stand-alone elasticsearch for data storage will inevitably face two problems: massive data storage and single point of failure.

  • Massive data storage problem: Logically split the index library into N shards (shards) and store them on multiple nodes
  • Single point of failure problem: back up fragmented data on different nodes (replica)

ES cluster related concepts:

  • Cluster: A group of nodes with a common cluster name.
  • Node: an Elasticsearch instance in the cluster
  • shard: Indexes can be split into different parts for storage, called shards. In a cluster environment, different shards of an index can be split into different nodes

Solve the problem: the amount of data is too large and the storage capacity of a single point is limited.

Here, we divide the data into 3 pieces: shard0, shard1, shard2

  • Primary shard: the main copy of the data, defined relative to replica shards.
  • Replica shard: each primary shard can have one or more replicas whose data is the same as the primary shard.

Data backup can ensure high availability, but if each shard is backed up, the number of nodes required will double, and the cost is too high!

In order to find a balance between high availability and cost, we can do this:

  • First shard the data and store it in different nodes
  • Then back up each shard and put it on the other node to complete mutual backup

In this way, the number of required service nodes can be greatly reduced. As shown in the figure, we take 3 shards, each with one replica, as an example:

Now, each shard has 1 backup, stored on 3 nodes:

  • node0: holds shards 0 and 1
  • node1: holds shards 0 and 2
  • node2: holds shards 1 and 2

4.1. Deploy ES cluster

We will simulate a cluster by running multiple instances of Elasticsearch in Docker containers on a single machine.

This can be done directly with docker-compose, but it requires your Linux server to have at least 4GB of memory.

1. First write a docker-compose file with the following content:

version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1 # image
    container_name: es01 # container name
    environment: # environment variables
      - node.name=es01 # node name
      - cluster.name=es-docker-cluster # cluster name
      - discovery.seed_hosts=es02,es03 # addresses of the other nodes in the cluster; the docker containers share a network, so the container names are enough
      - cluster.initial_master_nodes=es01,es02,es03 # initial master-eligible nodes: these three es nodes can take part in the master election
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m" # JVM heap memory size
    volumes: # data volume
      - data01:/usr/share/elasticsearch/data
    ports: # Port Mapping
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports: # Port Mapping
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    ports: # Port Mapping
      - 9202:9200
    networks:
      - elastic

volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

Example:

version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports: 
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    ports:
      - 9202:9200
    networks:
      - elastic

volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

2. To run es, you need to raise a Linux kernel limit by modifying the /etc/sysctl.conf file:

vi /etc/sysctl.conf

3. Add the following content:

vm.max_map_count=262144

4. Then execute the command to make the configuration take effect

sysctl -p

5. Start the cluster through docker-compose:

docker-compose up -d

6. View the logs of each es node

docker logs -f es01

docker logs -f es02

docker logs -f es03

4.2. Cluster status monitoring

kibana can monitor the es cluster status, but the new version needs to rely on the x-pack function of es, and the configuration is more complicated.

It is recommended to use cerebro to monitor the status of es cluster, official website: https://github.com/lmenezes/cerebro

After downloading, unzip it and run cerebro from the bin directory.

Visit http://localhost:9000 to enter the management interface.

The green line indicates that the es cluster is in a healthy state

4.3. Create an index library

Create an index library using kibana's DevTools

Enter the command in DevTools:

PUT /itcast
{
  "settings": {
    "number_of_shards": 3, // Number of fragments
    "number_of_replicas": 1 // The number of replicas added to each shard
  },
  "mappings": {
    "properties": {
      // mapping definitions ...
    }
  }
}

Create an index library with cerebro

Fill in the index library information:

Go back to the home page, and you can view the fragmentation effect of the index library:

4.4. Division of Cluster Responsibilities

Cluster nodes in elasticsearch can take on different responsibilities (roles): master eligible, data, ingest, and coordinating.

By default, every node in the cluster has all four roles at the same time.

A real cluster must separate cluster responsibilities:

  • master node: high CPU requirements, but low memory requirements
  • data nodes: high CPU and memory requirements
  • coordinating node: High requirements for network bandwidth and CPU

Separation of duties allows us to allocate different hardware for deployment according to the needs of different nodes, avoiding mutual interference between services.

Each node role in elasticsearch has its own responsibilities, so it is recommended that each node take on a single, dedicated role when deploying a cluster.

A typical es cluster responsibility division is shown in the figure:

LB refers to Load Balancer.

4.5. ES cluster split-brain problem

By default, every node is a master eligible node (candidate master), so once the master node goes down, the other candidates elect one of themselves as the new master. A split-brain problem can occur when the master node loses network connectivity with the other nodes.

For example, in a cluster, the master node loses connection with other nodes due to network failure:

At this time, node2 and node3 think that node1 is down, and they will re-elect the master:

After node3 is elected, the cluster continues to provide external services, node2 and node3 form a cluster, node1 forms a cluster, and the data of the two clusters is not synchronized, resulting in data discrepancies.

When the network is restored, because there are two master nodes in the cluster, the status of the cluster is inconsistent, and a split-brain situation occurs:

In order to avoid the split-brain problem, it is required that the number of votes exceeds (number of eligible nodes + 1)/2 to be elected as the master, so the number of eligible nodes should preferably be an odd number. The corresponding configuration item is discovery.zen.minimum_master_nodes, which has become the default configuration after es7.0, so split brain problems generally do not occur.

For example: for a cluster formed by 3 nodes, the votes must exceed (3 + 1)/2, which is 2 votes. node3 gets the votes of node2 and node3, and is elected as the master. And node1 has only 1 vote for itself and was not elected. There is still only one master node in the cluster, and there is no split-brain problem.

Summary

What is the role of the master eligible node?

  • Participate in the master election
  • The master node can manage the cluster state, manage sharding information, and process requests to create and delete index libraries

What is the role of the data node?

  • CRUD of data

What is the role of the coordinator node?

  • Route requests to other nodes
  • Combine the query results and return them to the user

4.6. Cluster distributed storage

When new documents are added, they should be saved in different shards to ensure data balance, so how does the coordinating node determine which shard the data should be stored in?

Distributed storage test

Insert three pieces of data:

You can see from the test that the three pieces of data are in different shards:

Result:

Distributed storage principle

elasticsearch uses a hash algorithm to decide which shard a document should be stored in:

The coordinating node hashes the document's _routing value and takes the remainder modulo the number of shards: shard = hash(_routing) % number_of_shards. The remainder is the shard that stores the document. A small illustrative sketch follows the notes below.

Notes:

  • _routing defaults to the id of the document
  • The algorithm is related to the number of shards, so once the index library is created, the number of shards cannot be modified!
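
The following is an illustrative sketch of this routing rule only, not elasticsearch's internal implementation (which hashes _routing with murmur3); it simply shows why the shard count cannot change once the index has been created:

public class ShardRoutingDemo {

    // shard = hash(_routing) % number_of_shards, where _routing defaults to the document id
    public static int shardFor(String routing, int numberOfShards) {
        int hash = routing.hashCode();              // stand-in for the real murmur3 hash
        return Math.floorMod(hash, numberOfShards); // floorMod keeps the result non-negative
    }

    public static void main(String[] args) {
        for (String id : new String[]{"1", "2", "3"}) {
            // with 3 shards each id maps to a fixed shard; changing the shard count would change every mapping
            System.out.println("doc " + id + " -> shard " + shardFor(id, 3));
        }
    }
}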

The process of adding new documents is as follows:

Dark blue represents the primary shard, light blue represents the shard replica

  • Add a new document with id=1
  • Do hash operation on id, if the result is 2, it should be stored in shard-2
  • The primary shard of shard-2 is on node3, and the data is routed to node3, and node3 saves the document
  • Synchronize the data to shard-2's replica (R-2), which is on node2
  • Return the result to the coordinating-node node (node1)

4.7. Cluster distributed query

elasticsearch queries are divided into two phases:

  • scatter phase: the coordinating node distributes the request to every shard
  • gather phase: the coordinating node gathers the search results from the data nodes, merges them into the final result set, and returns it to the user

4.8. Cluster failover

The master node monitors the status of the nodes in the cluster. If it finds that a node has gone down, it immediately migrates that node's shard data to other nodes to ensure data safety. This is called failover.

For example, a cluster structure is shown in the figure, and all three nodes are healthy.

Now node1 is the master node and the other two nodes are slave nodes. Suddenly, node1 fails:

The first thing after the downtime is to re-elect the master, for example, node2 is selected:

After node2 becomes the master node, it checks the cluster status and finds that the shards that were on node1 now lack either their primary or their replica. Therefore, the data that was on node1 needs to be migrated to node2 and node3, ensuring that every shard still has both a primary and a replica:

Summary:

  • After the master node goes down, a master-eligible (candidate) node is elected as the new master.
  • The master node monitors shard and node status, and transfers the shards on a failed node to healthy nodes to ensure data safety.
