Im trying to save some data in a postgreSQL database using cygnus-ngsi, but nothing happens. Im running all services in a docker container using docker-compose.
docker-compose.yml:
...
cygnus:
image: fiware/cygnus-ngsi:latest
hostname: cygnus
container_name: cygnus_fiware
volumes:
- ./config/cygnus/cygnus.conf:/opt/apache-flume/conf/agent.conf
- ./config/cygnus/grouping_rules.conf:/opt/apache-flume/conf/grouping_rules.conf
links:
- orion
- postgres
expose:
- "5050"
ports:
- "5050:5050"
postgres:
restart: always
image: postgres:latest
container_name: postgres_fiware
volumes:
- ./data/db/postgres:/var/lib/postgresql/data
ports:
- "5432:5432"
expose:
- "5432"
environment:
- POSTGRES_USER=teste
- POSTGRES_DB=newdb
- POSTGRES_PASSWORD=123456789
agent.conf
cygnus-ngsi.sources = http-source
cygnus-ngsi.sinks = postgresql-sink
cygnus-ngsi.channels = postgresql-channel
#=============================================
# source configuration
# channel name where to write the notification events
cygnus-ngsi.sources.http-source.channels = postgresql-channel
# source class, must not be changed
cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
# listening port the Flume source will use for receiving incoming notifications
cygnus-ngsi.sources.http-source.port = 5050
# Flume handler that will parse the notifications, must not be changed
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
# URL target
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
# default service (service semantic depends on the persistence sink)
cygnus-ngsi.sources.http-source.handler.default_service = default
# default service path (service path semantic depends on the persistence sink)
cygnus-ngsi.sources.http-source.handler.default_service_path = /
# source interceptors, do not change
cygnus-ngsi.sources.http-source.interceptors = ts gi
# TimestampInterceptor, do not change
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
# GroupingInterceptor, do not change
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
# Grouping rules for the GroupingInterceptor, put the right absolute path to the file if necessary
# see the doc/design/interceptors document for more details
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /opt/apache-flume/conf/grouping_rules.conf
# ============================================
# NGSIPostgreSQLSink configuration
# channel name from where to read notification events
cygnus-ngsi.sinks.postgresql-sink.channel = postgresql-channel
# sink class, must not be changed
cygnus-ngsi.sinks.postgresql-sink.type = com.telefonica.iot.cygnus.sinks.NGSIPostgreSQLSink
# true applies the new encoding, false applies the old encoding.
cygnus-ngsi.sinks.postgresql-sink.enable_encoding = false
# true if name mappings are enabled for this sink, false otherwise
cygnus-ngsi.sinks.postgresql-sink.enable_name_mappings = false
# true if the grouping feature is enabled for this sink, false otherwise
cygnus-ngsi.sinks.postgresql-sink.enable_grouping = false
# true if lower case is wanted to forced in all the element names, false otherwise
cygnus-ngsi.sinks.postgresql-sink.enable_lowercase = false
# the FQDN/IP address where the PostgreSQL server runs
cygnus-ngsi.sinks.postgresql-sink.postgresql_host = postgres
# the port where the PostgreSQL server listens for incomming connections
cygnus-ngsi.sinks.postgresql-sink.postgresql_port = 5432
# the name of the postgresql database
cygnus-ngsi.sinks.postgresql-sink.postgresql_database = newdb
# a valid user in the PostgreSQL server
cygnus-ngsi.sinks.postgresql-sink.postgresql_username = teste
# password for the user above
cygnus-ngsi.sinks.postgresql-sink.postgresql_password = 123456789
# how the attributes are stored, either per row either per column (row, column)
cygnus-ngsi.sinks.postgresql-sink.attr_persistence = row
# select the data_model: dm-by-service-path or dm-by-entity
cygnus-ngsi.sinks.postgresql-sink.data_model = dm-by-entity
# number of notifications to be included within a processing batch
cygnus-ngsi.sinks.postgresql-sink.batch_size = 100
# timeout for batch accumulation
cygnus-ngsi.sinks.postgresql-sink.batch_timeout = 30
# number of retries upon persistence error
cygnus-ngsi.sinks.postgresql-sink.batch_ttl = 10
# =============================================
# postgresql-channel configuration
# channel type (must not be changed)
cygnus-ngsi.channels.postgresql-channel.type = memory
# capacity of the channel
cygnus-ngsi.channels.postgresql-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnus-ngsi.channels.postgresql-channel.transactionCapacity = 100
Messages from docker-compose:
cygnus_fiware | time=2017-06-19T15:11:15.132Z | lvl=WARN | corr=4af15a0c-5501-11e7-aa0a-0242ac130004 | trans=508576db-1443-4c64-bfc9-629d1a0b250e | srv=espometeo | subsrv=/environment | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[257] : [NGSIRestHandler] Unnecessary header
cygnus_fiware | time=2017-06-19T15:11:15.133Z | lvl=INFO | corr=89ac4c66-5501-11e7-850f-0242ac130004 | trans=3ae0dc99-de51-49a3-937f-42d887b7e7d7 | srv=espometeo | subsrv=/environment | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[282] : [NGSIRestHandler] Starting internal transaction (3ae0dc99-de51-49a3-937f-42d887b7e7d7)
cygnus_fiware | time=2017-06-19T15:11:15.133Z | lvl=INFO | corr=89ac4c66-5501-11e7-850f-0242ac130004 | trans=3ae0dc99-de51-49a3-937f-42d887b7e7d7 | srv=espometeo | subsrv=/environment | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[299] : [NGSIRestHandler] Received data ({ "subscriptionId" : "5947d328e143997a02b11008", "originator" : "localhost", "contextResponses" : [ { "contextElement" : { "type" : "EstacaoMeteo", "isPattern" : "false", "id" : "Estacao3", "attributes" : [ { "name" : "Humidity", "type" : "float", "value" : "35.3", "metadatas" : [ { "name" : "TimeInstant", "type" : "ISO8601", "value" : "2017-06-24T13:03:00" } ] }, { "name" : "Temperature", "type" : "float", "value" : "15.2", "metadatas" : [ { "name" : "TimeInstant", "type" : "ISO8601", "value" : "2017-06-24T13:03:00" } ] } ] }, "statusCode" : { "code" : "200", "reasonPhrase" : "OK" } } ]})
cygnus_fiware | time=2017-06-19T15:11:36.141Z | lvl=INFO | corr=89ac4c66-5501-11e7-850f-0242ac130004 | trans=3ae0dc99-de51-49a3-937f-42d887b7e7d7 | srv=espometeo | subsrv=/environment | comp=cygnus-ngsi | op=persistAggregation | msg=com.telefonica.iot.cygnus.sinks.NGSIPostgreSQLSink[479] : [postgresql-sink] Persisting data at NGSIPostgreSQLSink. Schema (espometeo), Table (environment_estacao3_estacaometeo), Fields ((recvTimeTs,recvTime,fiwareServicePath,entityId,entityType,attrName,attrType,attrValue,attrMd)), Values (('1497885075139','2017-06-19T15:11:15.139Z','/environment','Estacao3','EstacaoMeteo','Humidity','float','35.3','[{"name":"TimeInstant","type":"ISO8601","value":"2017-06-24T13:03:00"}]'),('1497885075139','2017-06-19T15:11:15.139Z','/environment','Estacao3','EstacaoMeteo','Temperature','float','15.2','[{"name":"TimeInstant","type":"ISO8601","value":"2017-06-24T13:03:00"}]'))
cygnus_fiware | time=2017-06-19T15:11:36.142Z | lvl=WARN | corr=89ac4c66-5501-11e7-850f-0242ac130004 | trans=3ae0dc99-de51-49a3-937f-42d887b7e7d7 | srv=espometeo | subsrv=/environment | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[541] :
Seems like cygnus is getting all the data from the Orion and putting it right, but when i go to the postgresql db, there nothing. Some one already have this problem?
Persistance ERROR message:
cygnus_fiware | time=2017-06-22T09:45:06.092Z | lvl=ERROR | corr=N/A | trans=N/A | srv=N/A | subsrv=N/A | comp=cygnus-ngsi | op=processRollbackedBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[398] : CygnusPersistenceError. -, null. Stack trace: [com.telefonica.iot.cygnus.sinks.NGSIPostgreSQLSink.persistAggregation(NGSIPostgreSQLSink.java:504), com.telefonica.iot.cygnus.sinks.NGSIPostgreSQLSink.persistBatch(NGSIPostgreSQLSink.java:231), com.telefonica.iot.cygnus.sinks.NGSISink.processRollbackedBatches(NGSISink.java:390), com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:372), org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68), org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147), java.lang.Thread.run(Thread.java:748)]
Cygnus start ERRORS:
cygnus_fiware | + exec /usr/lib/jvm/java-6-sun/bin/java -Xms512m -Xmx1g -Dcom.sun.management.jmxremote -Dflume.root.logger=INFO,console -Duser.timezone=UTC -Dfile.encoding=UTF-8 -cp '/opt/apache-flume/conf:/opt/apache-flume/lib/*:/opt/apache-flume/plugins.d/cygnus/lib/*:/opt/apache-flume/plugins.d/cygnus/libext/*' -Djava.library.path= com.telefonica.iot.cygnus.nodes.CygnusApplication -f /opt/apache-flume/conf/agent.conf -n cygnus-ngsi -p 8081
cygnus_fiware | /opt/apache-flume/bin/cygnus-flume-ng: line 232: /usr/lib/jvm/java-6-sun/bin/java: No such file or directory
cygnus_fiware | /opt/apache-flume/bin/cygnus-flume-ng: line 232: exec: /usr/lib/jvm/java-6-sun/bin/java: cannot execute: No such file or directory
I solved the issue by changing false to true in agent.conf file: cygnus-ngsi.sinks.postgresql-sink.backend.enable_cache = true.
There is a problem with PostgreSQL sink when cache is not enabled. It is described at this issue.