In some circumstances, we may need to push MySQL data into other security and compliance tools, such as a SIEM, or into a search and analytics engine such as Elasticsearch, to get a better understanding of the data stored in the MySQL database. In such circumstances, we can use Logstash.
Logstash is designed to collect, parse, transform, and enrich data from various sources and then send it to a variety of destinations, typically Elasticsearch for search and analytics, but it can also send data to other systems such as databases and message brokers.
In this post, we will ingest MySQL data into Elasticsearch using Logstash, so let's see how we can achieve it.
1. Configure the Yum Repo & Install Logstash, Elasticsearch & the mysql-connector-java Connector
Before installing Logstash, Elasticsearch, and the mysql-connector-java connector, let's enable the Elasticsearch yum repository.
[root@siddhesh ~]# cat /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
[root@siddhesh ~]#
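Optionally, you can import Elastic's GPG signing key up front so that yum can verify the packages without prompting; the key URL matches the gpgkey entry in the repo file above:
[root@siddhesh ~]# rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch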
Install Logstash :
[root@siddhesh ~]# yum install logstash
Install Elasticsearch :
[root@siddhesh ~]# yum install elasticsearch
Install mysql-connector-java :
[root@siddhesh ~]# yum install mysql-connector-java
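After installation, enable and start Elasticsearch so that Logstash has something to write to. This sketch assumes a systemd-based system, which is the norm on recent RPM-based distributions:
[root@siddhesh ~]# systemctl daemon-reload
[root@siddhesh ~]# systemctl enable --now elasticsearch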
2. Create a Logstash Configuration File.
Let's create a Logstash configuration file named "mysql_ingest.conf":
[root@siddhesh ~]# cat mysql_ingest.conf
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testdatabase"
    jdbc_user => "root"
    jdbc_password => ""
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "SELECT * FROM inventory"
    type => "your_type"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "builddevops_index"
  }
}
[root@siddhesh ~]#
Where,
jdbc: This specifies that you are using the JDBC (Java Database Connectivity) input plugin to connect to a database.
jdbc_connection_string: The URL to connect to the MySQL database. In this case, it connects to a MySQL server running on the local machine on port 3306 and uses the testdatabase.
jdbc_user: The username used to authenticate with the MySQL server. In this case, it's "root".
jdbc_password: The password for the MySQL user. It is empty ("") in this example, so no password is provided. In a production environment, you should never store passwords in plain text; use environment variables or another secure method for password storage (see the sketch after this list).
jdbc_driver_library: The path to the MySQL JDBC driver JAR file. Logstash needs this to establish a connection to the MySQL database. Ensure that the JAR file exists at the specified location.
jdbc_driver_class: The fully-qualified class name of the MySQL JDBC driver. This should be set to "com.mysql.jdbc.Driver" for the MySQL driver.
statement: The SQL query to execute on the MySQL database. In this case, it's a simple SELECT * FROM inventory, which selects all rows from the "inventory" table.
type: The type field to assign to the events ingested from this input. It's set to "your_type" in the example, but you can choose any type name that is meaningful to you. The type field can be used for filtering or routing data within the Logstash pipeline.
elasticsearch: This specifies that you are using the Elasticsearch output plugin to send data to an Elasticsearch cluster.
hosts: The Elasticsearch cluster's URL. In this case, it's set to "http://localhost:9200", assuming Elasticsearch is running on the local machine on port 9200. Modify this to match your Elasticsearch cluster's location.
index: The name of the Elasticsearch index where the data will be stored. In this example, it's "builddevops_index." You can change this to a more meaningful name for your data.
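Picking up the password and scheduling points above, here is a minimal sketch of a hardened input block. It assumes an environment variable named MYSQL_PWD is exported in the Logstash process's environment (the variable name is just an example; Logstash expands ${VAR} references in pipeline configs), and it uses the jdbc plugin's schedule option and sql_last_value tracking so that repeated runs only pick up new rows:
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testdatabase"
    jdbc_user => "root"
    # Read the password from the environment instead of hard-coding it.
    jdbc_password => "${MYSQL_PWD}"
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # Run every minute; the plugin persists the highest id seen so far
    # and exposes it to the query as :sql_last_value.
    schedule => "* * * * *"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    statement => "SELECT * FROM inventory WHERE id > :sql_last_value"
    type => "your_type"
  }
}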
3. To test, we'll add some sample data to MySQL.
MariaDB [testdatabase]> CREATE TABLE inventory ( id INT NOT NULL AUTO_INCREMENT, name VARCHAR(255) NOT NULL, PRIMARY KEY (id) );
Query OK, 0 rows affected (0.01 sec)
MariaDB [testdatabase]> insert into inventory values ("","Laptop");
Query OK, 1 row affected, 1 warning (0.01 sec)
MariaDB [testdatabase]> insert into inventory values ("","Desktop");
Query OK, 1 row affected, 1 warning (0.01 sec)
MariaDB [testdatabase]> insert into inventory values ("","Server");
Query OK, 1 row affected, 1 warning (0.01 sec)
MariaDB [testdatabase]> insert into inventory values ("","Server Rack");
Query OK, 1 row affected, 1 warning (0.01 sec)
MariaDB [testdatabase]> insert into inventory values ("","Keyboard");
Query OK, 1 row affected, 1 warning (0.01 sec)
MariaDB [testdatabase]>
MariaDB [testdatabase]> select * from inventory;
+----+-------------+
| id | name |
+----+-------------+
| 1 | Laptop |
| 2 | Desktop |
| 3 | Server |
| 4 | Server Rack |
| 5 | Keyboard |
+----+-------------+
5 rows in set (0.00 sec)
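Incidentally, the "1 warning" on each INSERT appears because we assigned an empty string to the integer AUTO_INCREMENT column; MySQL coerces it to 0 and still generates the next id. Listing the columns explicitly avoids the warning, for example:
MariaDB [testdatabase]> INSERT INTO inventory (name) VALUES ("Laptop");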
4. Create the Elasticsearch Index
Let's create an Elasticsearch index "builddevops_index" using the curl API. Strictly speaking this step is optional, since the elasticsearch output plugin will create the index on first write, but creating it yourself gives you control over its settings.
[root@siddhesh ~]# curl -X PUT "http://localhost:9200/builddevops_index"
{"acknowledged":true,"shards_acknowledged":true,"index":"builddevops_index"}
[root@siddhesh ~]#
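Had we not already created the index above, we could also have set an explicit mapping at creation time instead of relying on dynamic mapping. A minimal sketch, with field types assumed from our inventory table:
[root@siddhesh ~]# curl -X PUT "http://localhost:9200/builddevops_index" -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "id":   { "type": "integer" },
      "name": { "type": "keyword" }
    }
  }
}'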
5. Run the Logstash Configuration File and Ingest Data Into Elasticsearch
Note that because our config has no schedule option, the jdbc input executes the statement once and the pipeline then terminates, which is why the log below ends with Logstash shutting down.
[root@siddhesh ~]# /usr/share/logstash/bin/logstash -f mysql_ingest.conf
[INFO ] 2023-10-23 17:13:32.269 [[main]-pipeline-manager] javapipeline - Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["/root/mysql_ingest.conf"], :thread=>"#<Thread:0x1f6eeba3 run>"}
[INFO ] 2023-10-23 17:13:33.327 [[main]-pipeline-manager] javapipeline - Pipeline Java execution initialization time {"seconds"=>1.05}
[INFO ] 2023-10-23 17:13:33.441 [[main]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"main"}
[INFO ] 2023-10-23 17:13:33.497 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2023-10-23 17:13:34.222 [[main]<jdbc] jdbc - (0.006595s) SELECT * FROM inventory
[INFO ] 2023-10-23 17:13:35.038 [[main]-pipeline-manager] javapipeline - Pipeline terminated {"pipeline.id"=>"main"}
[INFO ] 2023-10-23 17:13:35.569 [Converge PipelineAction::Delete<main>] pipelinesregistry - Removed pipeline from registry successfully {:pipeline_id=>:main}
[INFO ] 2023-10-23 17:13:35.626 [LogStash::Runner] runner - Logstash shut down.
[root@siddhesh ~]#
6. Verify Data in Elasticsearch Using the curl API
[root@siddhesh ~]# curl -X GET "http://localhost:9200/builddevops_index/_search?q=name:*&pretty=true"
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 5,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "builddevops_index",
"_type" : "_doc",
"_id" : "OjNYXIsBV53rzcJZ8DRG",
"_score" : 1.0,
"_source" : {
"id" : 1,
"name" : "Laptop",
"type" : "your_type",
"@version" : "1",
"@timestamp" : "2023-10-23T11:43:34.259Z"
}
},
{
"_index" : "builddevops_index",
"_type" : "_doc",
"_id" : "OzNYXIsBV53rzcJZ8DRG",
"_score" : 1.0,
"_source" : {
"id" : 5,
"name" : "Keyboard",
"type" : "your_type",
"@version" : "1",
"@timestamp" : "2023-10-23T11:43:34.272Z"
}
},
{
"_index" : "builddevops_index",
"_type" : "_doc",
"_id" : "PjNYXIsBV53rzcJZ8DRl",
"_score" : 1.0,
"_source" : {
"id" : 3,
"name" : "Server",
"type" : "your_type",
"@version" : "1",
"@timestamp" : "2023-10-23T11:43:34.272Z"
}
},
{
"_index" : "builddevops_index",
"_type" : "_doc",
"_id" : "PDNYXIsBV53rzcJZ8DRU",
"_score" : 1.0,
"_source" : {
"id" : 4,
"name" : "Server Rack",
"type" : "your_type",
"@version" : "1",
"@timestamp" : "2023-10-23T11:43:34.272Z"
}
},
{
"_index" : "builddevops_index",
"_type" : "_doc",
"_id" : "PTNYXIsBV53rzcJZ8DRW",
"_score" : 1.0,
"_source" : {
"id" : 2,
"name" : "Desktop",
"type" : "your_type",
"@version" : "1",
"@timestamp" : "2023-10-23T11:43:34.271Z"
}
}
]
}
}
[root@siddhesh ~]#
As you can see, Elasticsearch now holds exactly the same data as the MySQL table; the documents simply come back in a different order, since the search query does not sort by id.
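As a final sanity check, you can compare counts with Elasticsearch's _count API; it should report 5 documents, matching the five rows in MySQL:
[root@siddhesh ~]# curl -X GET "http://localhost:9200/builddevops_index/_count?pretty"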