ELK unified log management



Generally, we need to conduct log analysis scenarios: you can get the information you want by directly grep and awk in the log file. However, in large-scale scenarios, this method is inefficient and faces problems, including how to archive if the log volume is too large, what to do if the text search is too slow, and how to query in multiple dimensions. Centralized log management is required, and the logs on all servers are collected and summarized. The common solution is to establish a centralized log collection system to collect, manage and access the logs on all nodes.

The general large-scale system is a distributed deployment architecture. Different service modules are deployed on different servers. When a problem occurs, in most cases, it is necessary to locate the specific server and service module according to the key information exposed by the problem, and build a set of centralized logging system, which can improve the efficiency of locating the problem.

A complete centralized log system needs to include the following main features:

  • Collection - able to collect log data from multiple sources
  • Transmission - able to stably transmit log data to the central system
  • Storage - how to store log data
  • Analysis - UI analysis can be supported
  • Warning - can provide error reporting and monitoring mechanism

ELK provides a complete set of solutions, which are open-source software. They are used together, perfectly connected, and efficiently meet the applications in many occasions. At present, it is a mainstream log system.

ELK introduction

ELK is the abbreviation of three open source software, which means elasticsearch, logstash and kibana respectively. They are all open source software.

Elasticsearch is an open source distributed search engine, which provides three functions: collecting, analyzing and storing data. Its features include: distributed, zero configuration, automatic discovery, automatic index fragmentation, index copy mechanism, restful style interface, multiple data sources, automatic search load, etc.

Logstash is mainly used to collect, analyze and filter logs. It supports a large number of data acquisition methods. The general working mode is c/s architecture. The client side is installed on the host that needs to collect logs, and the server side is responsible for filtering, modifying and other operations of each node's logs received at one time and going to elastic search.

Kibana is also an open source and free tool. Kibana can provide log analysis friendly Web interface for Logstash and ElasticSearch, which can help summarize, analyze and search important data logs

ELK architecture



In this architecture, Logstash collects relevant logs and data on each node, and sends them to elasticsearch on the remote server for storage after analysis and filtering. Elasticsearch compresses and stores the data in the form of slices, and provides a variety of API s for users to query and operate. Users can also configure Kibana Web more intuitively to query logs conveniently and generate reports according to data.

However, if the remote Logstash server stops running due to failure, the data will be lost. Therefore, redis or kafka may be introduced as message middleware to indirectly transfer the messages or data in the queue to logstash. Logstash will filter and analyze the data and transfer it to Elasticsearch for storage. Finally, Kibana will present the logs and data to the user. In this way, even if the remote logstash goes down, there will be message middleware for messages. Wait for the logstash to recover and continue to consume messages, so as to avoid data loss.

How Logstash works

Logstash event processing has three stages: inputs → filters → outputs. It is a tool for receiving, processing and forwarding logs. It supports system log, webserver log, error log and application log. In short, it includes all types of logs that can be thrown out.



Input: input data to logstash.

Some common inputs are:

  • File: read from a file in the file system, similar to the tail -f command
  • syslog: listen for system log messages on port 514 and parse them according to RFC3164 standard
  • Redis: read from redis service
  • beats: read from filebeat

Filters: data intermediate processing, which operates on data.

Some commonly used filters are:

  • Grok: parse arbitrary text data. Grok is the most important plug-in of Logstash. Its main function is to convert the text format string into specific structured data, which can be used with regular expressions. Built in more than 120 parsing syntax.

Official grok expression: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

grok online debugging: https://grokdebug.herokuapp.com/

  • mutate: convert the field. For example, delete, replace, modify and rename fields.
  • drop: discard some events without processing.
  • clone: copy the event. In this process, you can also add or remove fields.
  • geoip: add geographic information (for kibana graphical display at the front desk)

Outputs: outputs is the last component of the logstash processing pipeline.  

An event can go through multiple outputs during processing, but once all outputs are executed, the event will complete its life cycle. Some common outputs are:

  • Elastic search: it can save data efficiently and query conveniently and simply.
  • File: save event data to a file.
  • graphite: send event data to graphical components, a popular open source storage graphical display component.
  • Codecs: codecs is a filter based on data flow. It can be configured as part of input and output. Codecs can help you easily split the sent data that has been serialized.  

Some common codecs:

  • json: encode / decode data using json format.
  • multiline: summarize the data in multiple events into a single row. For example: java exception information and stack information.

Install ElasticSearch

Download and install ElasticSearch 6.3.0

mkdir es
cd es
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.0.tar.gz
tar -zxvf elasticsearch-6.3.0.tar.gz

Modify es configuration file

vim /config/elasticsearch.yml
#Native ip
network.host: xxx.xxx.xx.xxx
#Default listening port
http.port: 9200

Run ElasticSearch 6.3.0

After configuration, es root is not allowed to run. Create a new account to run

useradd elk
passwd elk

Enter the password twice

Go back to the parent directory and change the owner of elasticsearch

chown -R elk elasticsearch-6.3.0

Switch to elk user

su elk

Run it in the background and record the startup log to es Log (create your own log folder and give elk users permission to operate the log folder. Of course, you can directly start the default log in the background)

nohup ./bin/elasticsearch >/usr/local/es/log/es.log

Install Logstash

Download and install logstash

mkdir logstash
cd logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.0.tar.gz
tar -zxvf logstash-6.3.0.tar.gz
cd logstash-6.3.0/

Create a configuration file (logstash-es.conf)

vim config/logstash-es.conf
input {
  tcp {
    port => 4560
    codec => json_lines
output {
  elasticsearch {
    hosts => "3x.1xx.7x.1xx"
    index => "log_%{+YYYY.MM.dd}_%{[appname]}"
  stdout {
    codec => rubydebug

be careful:

1.input.tcp: the local address is configured in. The ip and port must be the same as the logback of springboot The configuration in XML is exactly the same. One ip and one localhost cannot be configured

2.output.elasticsearch: configure the ip address of the elasticsearch server

3.%{[appname]}: reference the logback of springboot Variables configured in XML

4.output.stdout: display the output information on the terminal (can not be configured)

5. The conf file must be indented in strict accordance with the tab, otherwise an exception will occur when starting logstash

Run logstash

nohup ./bin/logstash -f ./config/logstash-es.conf >/usr/local/logstash/log/logstash.log &

Install Kibana

mkdir kibana
cd kibana
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.3.0-linux-x86_64.tar.gz
tar -zxvf kibana-6.3.0-linux-x86_64.tar.gz
cd kibana-6.3.0-linux-x86_64

Modify profile

vim config/kibana.y
server.host: ""
elasticsearch.url: "http://your es ip:port"
elasticsearch.username: "elastic"
elasticsearch.password: "changeme

Run kibana

nohup ./bin/kibana >/usr/local/kibana/log/kibana.log &

SpringBoot integrates Logback and prints logs to logstash

Project POM Introducing logback dependency into XML file


Add logback in resource XML configuration file

<?xml version="1.0" encoding="UTF-8"?>
<property resource="application.properties"/>
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!-- <filter class="ch.qos.logback.classic.filter.LevelFilter">-->
<!-- &lt;!&ndash;filter INFO&ndash;&gt;-->
<!-- <level>INFO</level>-->
<!-- &lt;!&ndash;Prohibit after matching&ndash;&gt;-->
<!-- <onMatch>DENY</onMatch>-->
<!-- &lt;!&ndash;No match is allowed&ndash;&gt;-->
<!-- <onMismatch>ACCEPT</onMismatch>-->
<!-- </filter>-->
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder charset="UTF-8"> <!--encoder The character set can be specified, which is meaningful for Chinese output-->
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
<logger name="com.ntnikka" level="Error" additivity="false">
<appender-ref ref="LOGSTASH"/>
<logger name="com.ntnikka" level="INFO" additivity="false">
<appender-ref ref="STDOUT" />
<!-- <root level="Error">-->
<!-- <appender-ref ref="LOGSTASH" />-->
<!-- <appender-ref ref="STDOUT" />-->
<!-- </root>-->

Configure logstash IP and PORT


logstash configuration of multiple IP S or ports

        <connectionTTL>5 minutes</connectionTTL>

Output logs to logstash if there are multiple logstash IP S or ports, you can poll the load ports

Different application modules can be configured with different module names to facilitate log search


{"appname":"app-server1","xxx":"xxx",..} If necessary, you can also add other custom parameters


Writing demo examples

public class demoController {
  private static final Logger logger = LoggerFactory.getLogger(demoController.class);
  demoService demoService;
  public String testSysLog(){
    // demoService.demoMethod();
    return "test demo";
  public static void divide(){
    // int i = 10 /0;
    throw new MaynException("MaynException", "test MaynException",40000);

The exceptions in the project are handled uniformly, and the handle class is written. You can also try the exception in the program and print the exception log through logback in the catch code block

* Exception handler
public class MaynExceptionHandler {
  private Logger logger = LoggerFactory.getLogger(this.getClass());

  * Handle custom exceptions
  public R handleMaynException(MaynException e) { R r = new R();
    r.put("code", e.getCode());
    r.put("msg", e.getMessage()); logger.error(e.getMessage(),e); return r;

  public R handleDuplicateKeyException(DuplicateKeyException e) { logger.error(e.getMessage(), e);
    return R.error("The record already exists in the database");

public R handleException(Exception e) { System.out.println("===================== enter exceptionHandle ================="); logger.error(e.getMessage(), e); return R.error(); } }

Record exception log in unified processing

logger.error(e.getMessage(), e);

Start the springboot project, call the interface test, and output the console

08:48:23.369 [http-nio-9090-exec-5] ERROR com.ntnikka.common.RRExceptionHandler - MaynException
com.ntnikka.exception.MaynException: MaynException
at com.ntnikka.rhlogsystem.controller.demoController.divide(demoController.java:36)
at com.ntnikka.rhlogsystem.controller.demoController.testSysLog(demoController.java:30)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at com.ntnikka.aspect.SysLogAspect.around(SysLogAspect.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:897)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at com.alibaba.druid.support.http.WebStatFilter.doFilter(WebStatFilter.java:123)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)

View kibana



The exception log is successfully output to kibana, and the project integration logback is successful

Tags: Java ELK

Posted by tj71587 on Sun, 17 Apr 2022 07:58:17 +0930