I'm using logstash input jdbc plugin to read two (or more) databases and send the data to elasticsearch, and using kibana 4 to vizualize these data.
This is my logstash config:
input {
jdbc {
type => "A"
jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
jdbc_user => "user"
jdbc_password => "pass"
schedule => "5 * * * *"
statement => "SELECT id, date, content, status from test_table"
}
jdbc {
type => "B"
jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
jdbc_user => "user"
jdbc_password => "pass"
schedule => "5 * * * *"
statement => "SELECT id, date, content, status from test_table"
}
}
filter {
}
output {
if [type] == "A" {
elasticsearch {
host => "localhost"
protocol => http
index => "logstash-servera-%{+YYYY.MM.dd}"
}
}
if [type] == "B" {
elasticsearch {
host => "localhost"
protocol => http
index => "logstash-serverb-%{+YYYY.MM.dd}"
}
}
stdout { codec => rubydebug }
}
The problem is that every time run the logstash, it starts to save all data that is already in elastic search.
After run with the where clause = date > '2015-09-10' I'd stoped the logstash and run again (with --debug) with the 'special parameter' :sql_last_date. After the logstash startup It starts to show this in the log:
←[36mExecuting JDBC query {:statement=>"SELECT \n\tSUBSTRING(R.RECEBEDOR, 1, 2)
AS 'DDD',\nCASE WHEN R.STATUS <> 'RCON' AND R.COD_RESPOSTA in (428,429,230,425,
430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO' \n W
HEN R.STATUS = 'RCON' THEN 'SUCESSO'\n\t ELSE 'ERRO'\n END AS 'TIPO_MENSAGEM
',\nAP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC
_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APL
ICACAO, RECEBEDOR, R.ID_OPERADORA, R.TIPO_PRODUTO \n\nFROM RECARGA R (NOLOCK)\nJ
OIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO \nwhere R.DT_RECARGA > :sql
_last_start\nORDER BY R.DT_RECARGA ASC", :parameters=>{:sql_last_start=>2015-09-
10 18:48:00 UTC}, :level=>:debug, :file=>"/DEV/logstash-1.5.4/vendor/bundle/jrub
y/1.9/gems/logstash-input-jdbc-1.0.0/lib/logstash/plugin_mixins/jdbc.rb", :line=
>"107", :method=>"execute_statement"}←[0m
This time i ran with the 'real' statement that is:
SELECT
SUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',
CASE WHEN R.STATUS <> 'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'
WHEN R.STATUS = 'RCON' THEN 'SUCESSO'
ELSE 'ERRO'
END AS 'TIPO_MENSAGEM',
AP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA
FROM RECARGA R (NOLOCK)
JOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO
where R.DT_RECARGA > :sql_last_start
ORDER BY R.DT_RECARGA ASC
Anyone knows how to solve it?
Thanks!
By default, the
jdbc
input will execute the configured SQL statement. In your case, your statement selects everything intest_table
. You need to instruct your SQL statement to only load data from the last time thejdbc
input ran by using the predefinedsql_last_start
parameter in your SQL query.Also if by any coincidence the same record is loaded twice from your DB and you don't want dups to be created in your ES server, you can also specify to use the record ID as the document ID in your
elasticsearch
output, that way the document will be updated in ES and not duplicated.sql_last_start
is nowsql_last_value
please check here the special parametersql_last_start
is now renamed tosql_last_value
for better clarity as it is not only limited to datetime but may have other column type as well. so now solution may be something like thisi have tested with sql Server DB
please run for first time with clean_run=>ture to avoid datatype error while in development we may have different datatype value stored in
sql_last_value
variable