Cygnus 0.7.1 does not create tables (MySQL and HDFS)

Posted 2019-02-20 12:18

I've installed Cygnus 0.7.1 (from source), and after configuring it (MySQL and HDFS sinks) I can start it without problems. When I subscribe Cygnus to an Orion context, it receives the information fine, but there is a problem with MySQL and HDFS. This is the log:

15/03/17 13:58:52 INFO handlers.OrionRestHandler: Starting transaction (1426597123-891-0000000000)
15/03/17 13:58:52 INFO handlers.OrionRestHandler: Received data ({  "subscriptionId" : "5508250c1860a36e55240c84",  "originator" : "localhost",  "contextResponses" : [    {      "contextElement" : {        "type" : "ubk-temp",        "isPattern" : "false",        "id" : "ubk:temp:1",        "attributes" : [          {            "name" : "temperature",            "type" : "float",            "value" : "11"          }        ]      },      "statusCode" : {        "code" : "200",        "reasonPhrase" : "OK"      }    }  ]})
15/03/17 13:58:52 INFO handlers.OrionRestHandler: Event put in the channel (id=1549700267, ttl=10)
15/03/17 13:58:52 INFO sinks.OrionSink: Event got from the channel (id=1549700267, headers={fiware-servicepath=ubktemp, destination=ubk_temp_1_ubk-temp, content-type=application/json, fiware-service=ubikwa, ttl=10, transactionId=1426597123-891-0000000000, timestamp=1426597132511}, bodyLength=462)
15/03/17 13:58:52 INFO sinks.OrionSink: Event got from the channel (id=1549700267, headers={fiware-servicepath=ubktemp, destination=ubk_temp_1_ubk-temp, content-type=application/json, fiware-service=ubikwa, ttl=10, transactionId=1426597123-891-0000000000, timestamp=1426597132511}, bodyLength=462)
15/03/17 13:58:52 INFO sinks.OrionMySQLSink: [mysql-sink] Persisting data at OrionMySQLSink. Database: ubikwa, Table: ubktemp_ubk_temp_1_ubk-temp, Timestamp: 2015-03-17T13:58:52.511, Data (attrs): {temperature=11}, (metadata): {temperature_md=[]}
15/03/17 13:58:53 INFO sinks.OrionHDFSSink: [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (ubikwa/ubktemp/ubk_temp_1_ubk-temp/ubk_temp_1_ubk-temp.txt), Data ({"recvTime":"2015-03-17T13:58:52.511","temperature":"11", "temperature_md":[]})
15/03/17 13:58:53 WARN sinks.OrionSink: Bad context data (Table 'ubikwa.ubktemp_ubk_temp_1_ubk-temp' doesn't exist)
15/03/17 13:58:53 INFO sinks.OrionSink: Finishing transaction (1426597123-891-0000000000)

The MySQL sink does not raise any errors, but no tables are created, and the HDFS sink seems unable to create the files. I previously installed Cygnus 0.6 and it worked with the same configuration.

Edit:

Here is my configuration:

cygnusagent.sources = http-source
cygnusagent.sinks = hdfs-sink mysql-sink
cygnusagent.channels = hdfs-channel mysql-channel

#=============================================
# source configuration
# channel name where to write the notification events
cygnusagent.sources.http-source.channels = hdfs-channel mysql-channel
# source class, must not be changed
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
# listening port the Flume source will use for receiving incoming notifications
cygnusagent.sources.http-source.port = 5050
# Flume handler that will parse the notifications, must not be changed
cygnusagent.sources.http-source.handler = es.tid.fiware.fiwareconnectors.cygnus.handlers.OrionRestHandler
# URL target
cygnusagent.sources.http-source.handler.notification_target = /notify
# Default service (service semantic depends on the persistence sink)
cygnusagent.sources.http-source.handler.default_service = ubikwa
# Default service path (service path semantic depends on the persistence sink)
cygnusagent.sources.http-source.handler.default_service_path = ubktemp
# Number of channel re-injection retries before a Flume event is definitely discarded (-1 means infinite retries)
cygnusagent.sources.http-source.handler.events_ttl = 10
# Source interceptors, do not change
cygnusagent.sources.http-source.interceptors = ts de
# Timestamp interceptor, do not change
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
# Destination extractor interceptor, do not change
cygnusagent.sources.http-source.interceptors.de.type = es.tid.fiware.fiwareconnectors.cygnus.interceptors.DestinationExtractor$Builder
# Matching table for the destination extractor interceptor, put the right absolute path to the file if necessary
# See the doc/design/interceptors document for more details
cygnusagent.sources.http-source.interceptors.de.matching_table = /opt/cygnus/conf/matching_table.conf

# ============================================
# OrionHDFSSink configuration
# channel name from where to read notification events
cygnusagent.sinks.hdfs-sink.channel = hdfs-channel
# sink class, must not be changed
cygnusagent.sinks.hdfs-sink.type = es.tid.fiware.fiwareconnectors.cygnus.sinks.OrionHDFSSink
# Comma-separated list of FQDN/IP address regarding the Cosmos Namenode endpoints
# If you are using Kerberos authentication, then the usage of FQDNs instead of IP addresses is mandatory
cygnusagent.sinks.hdfs-sink.cosmos_host = 130.206.80.46
# port of the Cosmos service listening for persistence operations; 14000 for httpfs, 50070 for webhdfs and free choice for infinity
cygnusagent.sinks.hdfs-sink.cosmos_port = 14000
# default username allowed to write in HDFS
cygnusagent.sinks.hdfs-sink.cosmos_default_username = ***
# default password for the default username
cygnusagent.sinks.hdfs-sink.cosmos_default_password = ***
# HDFS backend type (webhdfs, httpfs or infinity)
cygnusagent.sinks.hdfs-sink.hdfs_api = httpfs
# how the attributes are stored, either per row or per column (row, column)
cygnusagent.sinks.hdfs-sink.attr_persistence = column
# Hive FQDN/IP address of the Hive server
cygnusagent.sinks.hdfs-sink.hive_host = 130.206.80.46
# Hive port for Hive external table provisioning
cygnusagent.sinks.hdfs-sink.hive_port = 10000
# Kerberos-based authentication enabling
cygnusagent.sinks.hdfs-sink.krb5_auth = false
# Kerberos username
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_user = krb5_username
# Kerberos password
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_password = xxxxxxxxxxxxx
# Kerberos login file
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_login_conf_file = /usr/cygnus/conf/krb5_login.conf
# Kerberos configuration file
cygnusagent.sinks.hdfs-sink.krb5_auth.krb5_conf_file = /usr/cygnus/conf/krb5.conf

# ============================================
# OrionMySQLSink configuration
# channel name from where to read notification events
cygnusagent.sinks.mysql-sink.channel = mysql-channel
# sink class, must not be changed
cygnusagent.sinks.mysql-sink.type = es.tid.fiware.fiwareconnectors.cygnus.sinks.OrionMySQLSink
# the FQDN/IP address where the MySQL server runs
cygnusagent.sinks.mysql-sink.mysql_host = 127.0.0.1
# the port where the MySQL server listens for incoming connections
cygnusagent.sinks.mysql-sink.mysql_port = 3306
# a valid user in the MySQL server
cygnusagent.sinks.mysql-sink.mysql_username = ***
# password for the user above
cygnusagent.sinks.mysql-sink.mysql_password = ***
# how the attributes are stored, either per row or per column (row, column)
cygnusagent.sinks.mysql-sink.attr_persistence = column

#=============================================
# hdfs-channel configuration
# channel type (must not be changed)
cygnusagent.channels.hdfs-channel.type = memory
# capacity of the channel
cygnusagent.channels.hdfs-channel.capacity = 1000
# maximum number of events that can be sent per transaction
cygnusagent.channels.hdfs-channel.transactionCapacity = 100

#=============================================
# mysql-channel configuration
# channel type (must not be changed)
cygnusagent.channels.mysql-channel.type = memory
# capacity of the channel
cygnusagent.channels.mysql-channel.capacity = 1000
# maximum number of events that can be sent per transaction
cygnusagent.channels.mysql-channel.transactionCapacity = 100

Any hints?

Thanks

2 answers
淡お忘
Reply #2 · 2019-02-20 13:08

As Petark suggests, "column mode" does not create the tables automatically; they must be provisioned in advance by you. Why? Depending on the subscription you made to Orion CB in order to send notifications to Cygnus, a notification may sometimes include a few attribute updates and sometimes a very different set of attributes.

For instance, let's consider an entity called "car" with attributes "speed", "location" and "oil-level". You may say "notify Cygnus each time the speed changes, but send only the speed value". At the same time, you may also say "notify Cygnus each time the oil level changes, and send the values of all the attributes". If the car starts moving and only the speed and the location change, but not the oil level, then only the speed updates will be notified to Cygnus, which has no way of knowing about the other attributes.

Thus, if you want rows of data containing all 3 attributes, you have to provision the table yourself. Note that subscriptions like the ones in this example will lead to a lot of "speed-value,null,null" rows in your tables.
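To make the car example concrete, here is a minimal Java sketch of how column-mode rows look when each notification only fills the columns it carries. The class and method names are hypothetical (this is not Cygnus code), it assumes the table was provisioned with the three columns speed, location and oil-level, and the attribute values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration of column-mode persistence (not Cygnus code):
 * the table columns are fixed in advance, and each notification only fills
 * the columns of the attributes it actually carries.
 */
class ColumnModeCarExample {

    // Columns of the manually provisioned table. Cygnus cannot infer this list
    // from a single notification, which is why the table must exist beforehand.
    static final List<String> CAR_COLUMNS = Arrays.asList("speed", "location", "oil-level");

    /** Builds one column-mode row; attributes missing from the notification become null. */
    static List<String> toColumnRow(List<String> columns, Map<String, String> notifiedAttrs) {
        List<String> row = new ArrayList<>();
        for (String column : columns) {
            row.add(notifiedAttrs.get(column)); // null when the attribute was not notified
        }
        return row;
    }

    public static void main(String[] args) {
        // Subscription 1: notify on speed changes, sending only the speed.
        Map<String, String> speedOnly = new LinkedHashMap<>();
        speedOnly.put("speed", "80");

        // Subscription 2: notify on oil-level changes, sending all attributes.
        Map<String, String> allAttrs = new LinkedHashMap<>();
        allAttrs.put("speed", "0");
        allAttrs.put("location", "parking-3");
        allAttrs.put("oil-level", "0.6");

        System.out.println(toColumnRow(CAR_COLUMNS, speedOnly)); // prints [80, null, null]
        System.out.println(toColumnRow(CAR_COLUMNS, allAttrs));  // prints [0, parking-3, 0.6]
    }
}
```

The speed-only subscription produces exactly the "speed-value,null,null" rows mentioned above; from such a notification alone, Cygnus could never have derived the location and oil-level columns.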

The difference with "row mode" is that, regardless of how many attributes are notified, a new row is added for each notified attribute, and all the rows share the same format (entityId, entityType, attrName, attrType, attrValue, attrMd); this fixed format can therefore be provisioned automatically by Cygnus.
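A comparable sketch for row mode, using the notification from the question. Again the names are hypothetical and this is not the actual Cygnus implementation; it only builds the six fields listed above and leaves out the reception-time fields. Notice that more attributes mean more rows, never more columns:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical illustration of row-mode persistence (not Cygnus code): every
 * notified attribute becomes its own row with the same fixed fields, so the
 * table layout never depends on which attributes a notification carries.
 * The reception-time fields (recvTimeTs, recvTime) are left out for brevity.
 */
class RowModeExample {

    /** One notified attribute: name, type, value and serialized metadata. */
    static final class Attr {
        final String name;
        final String type;
        final String value;
        final String md;

        Attr(String name, String type, String value, String md) {
            this.name = name;
            this.type = type;
            this.value = value;
            this.md = md;
        }
    }

    /** Expands one notified entity into one row per attribute. */
    static List<List<String>> toRows(String entityId, String entityType, List<Attr> attrs) {
        List<List<String>> rows = new ArrayList<>();
        for (Attr a : attrs) {
            // Fields: entityId, entityType, attrName, attrType, attrValue, attrMd
            rows.add(Arrays.asList(entityId, entityType, a.name, a.type, a.value, a.md));
        }
        return rows;
    }

    public static void main(String[] args) {
        // The entity notified in the question's log.
        List<Attr> attrs = Arrays.asList(new Attr("temperature", "float", "11", "[]"));
        System.out.println(toRows("ubk:temp:1", "ubk-temp", attrs));
        // prints [[ubk:temp:1, ubk-temp, temperature, float, 11, []]]
    }
}
```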

仙女界的扛把子
Reply #3 · 2019-02-20 13:09

I believe it's because you are using the value column for the attr_persistence parameter in your OrionMySQLSink configuration:

# how the attributes are stored, either per row or per column (row, column)
cygnusagent.sinks.mysql-sink.attr_persistence = column 

The documentation states that when using column, the databases and tables must be created before starting Cygnus. When using row, the tables, with all 8 fields, are created automatically before the first insert.

Within tables, we can find two options:

- Fixed 8-field rows, as usual: recvTimeTs, recvTime, entityId, entityType, attrName, attrType, attrValue and attrMd. These tables (and the databases) are created at execution time if the table does not exist prior to the row insertion. Regarding attrValue, in its simplest form this value is just a string, but since Orion 0.11.0 it can be a JSON object or a JSON array. Regarding attrMd, it contains a string serialization of the attribute's metadata array in JSON (if the attribute has no metadata, an empty array [] is inserted).

- Two columns per entity attribute (one for the value and another for the metadata), plus an additional column for the reception time of the data (recv_time). This kind of table (and the databases) must be provisioned before running Cygnus, because each entity may have a different number of attributes, and the notifications must ensure that a value is notified for each attribute.
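As a starting point for provisioning, here is a minimal Java sketch that prints MySQL DDL for a column-mode table. The class name, the TEXT column types and the exact recv_time column name are assumptions based on the description above (the HDFS log in the question shows recvTime, so check the naming against the documentation of your Cygnus version); the database and table names are the ones reported in the question's log. Identifiers are backtick-quoted because the table name contains a hyphen, which MySQL does not accept in unquoted identifiers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not part of Cygnus) that builds the MySQL DDL needed to
 * provision a column-mode table before starting Cygnus: recv_time plus
 * <attr> and <attr>_md per attribute. TEXT column types are an assumption.
 */
class ColumnModeDdl {

    /** Quotes a MySQL identifier with backticks, doubling any embedded backtick. */
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    static List<String> ddl(String database, String table, List<String> attrNames) {
        List<String> columns = new ArrayList<>();
        columns.add(quote("recv_time") + " TEXT");          // reception time (assumed name)
        for (String attr : attrNames) {
            columns.add(quote(attr) + " TEXT");             // attribute value
            columns.add(quote(attr + "_md") + " TEXT");     // attribute metadata (JSON array)
        }
        List<String> statements = new ArrayList<>();
        statements.add("CREATE DATABASE IF NOT EXISTS " + quote(database) + ";");
        statements.add("CREATE TABLE IF NOT EXISTS " + quote(database) + "." + quote(table)
                + " (" + String.join(", ", columns) + ");");
        return statements;
    }

    public static void main(String[] args) {
        // Database and table names as reported in the question's Cygnus log.
        for (String sql : ddl("ubikwa", "ubktemp_ubk_temp_1_ubk-temp", Arrays.asList("temperature"))) {
            System.out.println(sql);
        }
        // Expected output:
        // CREATE DATABASE IF NOT EXISTS `ubikwa`;
        // CREATE TABLE IF NOT EXISTS `ubikwa`.`ubktemp_ubk_temp_1_ubk-temp` (`recv_time` TEXT, `temperature` TEXT, `temperature_md` TEXT);
    }
}
```

The generated statements could then be run with the mysql client before starting Cygnus; with attr_persistence = row none of this is needed.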

The behaviour of the connector regarding the internal representation of the data is governed through a configuration parameter, attr_persistence, whose values can be row or column.
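Since everything hinges on attr_persistence, a small Java diagnostic can list the mode of each sink in an agent configuration, making it easy to spot the sinks running in column mode. This is a hypothetical helper built on java.util.Properties (which accepts the key = value syntax used by the Flume/Cygnus configuration file), not part of Cygnus. It trims values because Properties keeps trailing whitespace in values.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/**
 * Hypothetical diagnostic (not part of Cygnus): maps every sink of an agent
 * to its attr_persistence value so column-mode sinks are easy to spot.
 */
class AttrPersistenceCheck {

    static Map<String, String> persistenceBySink(String agent, Properties conf) {
        String prefix = agent + ".sinks.";
        String suffix = ".attr_persistence";
        Map<String, String> result = new TreeMap<>(); // sorted by sink name
        for (String key : conf.stringPropertyNames()) {
            if (key.startsWith(prefix) && key.endsWith(suffix)) {
                String sink = key.substring(prefix.length(), key.length() - suffix.length());
                // Properties keeps trailing spaces in values, so trim before comparing.
                result.put(sink, conf.getProperty(key).trim());
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Excerpt of the configuration from the question.
        String excerpt = String.join("\n",
                "cygnusagent.sinks = hdfs-sink mysql-sink",
                "cygnusagent.sinks.hdfs-sink.attr_persistence = column",
                "cygnusagent.sinks.mysql-sink.attr_persistence = column ");
        Properties conf = new Properties();
        conf.load(new StringReader(excerpt));
        persistenceBySink("cygnusagent", conf).forEach((sink, mode) ->
                System.out.println(sink + ": " + mode + ("column".equals(mode) ? "  <- column mode" : "")));
    }
}
```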
