I am having the same problem as here and try to solve it, but I do not know how to properly format the datastore so cygnus will not throw the persistence error.
My orion suscription is this one:
(curl localhost:1026/v1/subscribeContext -s -S --header 'Content-Type: application/json' \
--header 'Accept: application/json' -d @- | python -mjson.tool) <<EOF
{
"entities": [
{
"type": "Event",
"isPattern": "false",
"id": "es-leon-0"
},
{
"type": "Event",
"isPattern": "false",
"id": "es-leon-1"
}
],
"attributes": [
"IdEvent", "IdUser", "Title"
],
"reference": "http://localhost:5050/notify",
"duration": "P1M",
"notifyConditions": [
{
"type": "ONCHANGE",
"condValues": [ ]
}
],
"throttling": "PT5S"
}
EOF
My cygnus config:
ygnusagent.sources = http-source
cygnusagent.sinks = ckan-sink
cygnusagent.channels = ckan-channel
cygnusagent.sources.http-source.channels = ckan-channel
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnusagent.sources.http-source.port = 5050
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler
cygnusagent.sources.http-source.handler.notification_target = /notify
cygnusagent.sources.http-source.handler.default_service = Papel
cygnusagent.sources.http-source.handler.default_service_path = Test
cygnusagent.sources.http-source.handler.events_ttl = 5
cygnusagent.sources.http-source.interceptors = ts gi
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
cygnusagent.sources.http-source.interceptors.gi.gropuing_rules_conf_file = /Applications/apache-flume-1.4.0-bin/conf/grouping_rules.conf
cygnusagent.channels.ckan-channel.type = memory
cygnusagent.channels.ckan-channel.capacity = 1000
cygnusagent.channels.ckan-channel.transactionCapacity = 100
# ============================================
# OrionCKANSink configuration
# channel name from where to read notification events
cygnusagent.sinks.ckan-sink.channel = ckan-channel
# sink class, must not be changed
cygnusagent.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.OrionCKANSink
# true if the grouping feature is enabled for this sink, false otherwise
cygnusagent.sinks.ckan-sink.enable_grouping = false
# true if lower case is wanted to forced in all the element names, false otherwise
cygnusagent.sinks.hdfs-sink.enable_lowercase = false
# the CKAN API key to use
cygnusagent.sinks.ckan-sink.api_key = xxxxx
# the FQDN/IP address for the CKAN API endpoint
cygnusagent.sinks.ckan-sink.ckan_host = ckan-demo.ckan.io
# the port for the CKAN API endpoint
cygnusagent.sinks.ckan-sink.ckan_port = 80
# Orion URL used to compose the resource URL with the convenience operation URL to query it
cygnusagent.sinks.ckan-sink.orion_url = http://localhost:1026
# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.ckan-sink.attr_persistence = column
# enable SSL for secure Http transportation; 'true' or 'false'
cygnusagent.sinks.ckan-sink.ssl = false
# number of notifications to be included within a processing batch
cygnusagent.sinks.ckan-sink.batch_size = 100
# timeout for batch accumulation
cygnusagent.sinks.ckan-sink.batch_timeout = 60
# number of retries upon persistence error
cygnusagent.sinks.ckan-sink.batch_ttl = 10
Cygnus is receiving right it but then shows the following error:
time=2016-04-21T07:44:57.504CDT | lvl=INFO | trans=1461242686-614-0000000001 | srv=Papel | subsrv=Test | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.OrionRestHandler[231] : Starting transaction (1461242686-614-0000000001)
time=2016-04-21T07:44:57.528CDT | lvl=INFO | trans=1461242686-614-0000000001 | srv=Papel | subsrv=Test | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.OrionRestHandler[258] : Received data ({ "subscriptionId" : "571897360e94f9fa53829885", "originator" : "localhost", "contextResponses" : [ { "contextElement" : { "type" : "Event", "isPattern" : "false", "id" : "es-leon-0", "attributes" : [ { "name" : "IdEvent", "type" : "text", "value" : "1084" }, { "name" : "IdUser", "type" : "text", "value" : "18" }, { "name" : "Title", "type" : "text", "value" : "Papes" } ] }, "statusCode" : { "code" : "200", "reasonPhrase" : "OK" } } ]})
time=2016-04-21T07:44:57.528CDT | lvl=INFO | trans=1461242686-614-0000000001 | srv=Papel | subsrv=Test | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.OrionRestHandler[280] : Event put in the channel, id=2024732986
time=2016-04-21T07:45:50.771CDT | lvl=INFO | trans=1461242686-614-0000000001 | srv=Papel | subsrv=Test | function=persistAggregation | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.OrionCKANSink[417] : [ckan-sink] Persisting data at OrionCKANSink (orgName=papel, pkgName=papel_test, resName=es-leon-0_event, data={"recvTime": "2016-04-21T12:44:57.497Z","fiwareServicePath": "Test","entityId": "es-leon-0","entityType": "Event","Title": "Papes"},{"recvTime": "2016-04-21T12:44:57.528Z","fiwareServicePath": "Test","entityId": "es-leon-0","entityType": "Event","IdEvent": "1084","IdUser": "18","Title": "Papes"})
time=2016-04-21T07:45:51.875CDT | lvl=ERROR | trans=1461242686-614-0000000001 | srv=Papel | subsrv=Test | function=processNewBatches | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.OrionSink[426] : Runtime error (Cannot persist the data (orgName=papel, pkgName=papel_test, resName=es-leon-0_event))
As said in here, I created the corresponding datastore: http://ckan-demo.ckan.io/dataset/papel-test/resource/8d7cb489-878e-465e-8c8c-60ea537411e0 But don't know how to format it or if the csv is the correct format.
Thanks
*Note: I tried in row mode and all works, but it's not what I want.
**Note: I also found an error in the previewer software changing the title of my column "Title" to the title of the page "CKAN Demo".
EDITED:
I have done what is said in the documentation:
Column: A single row is upserted for all the notified context attributes. This kind of row will contain two fields per each entity's attribute (one for the value, called <attrName>
, and other for the metadata, called <attrName>_md
), plus four additional fields:
- recvTime: UTC timestamp in human-redable format (ISO 8601).
- fiwareServicePath: The notified one or the default one.
- entityId: Notified entity identifier.
- entityType: Notified entity type.
But still have the same error:
time=2016-04-25T05:17:48.790CDT | lvl=ERROR | trans=1461579403-571-0000000000 | srv=Papel | subsrv=Test | function=processNewBatches | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.OrionSink[426] : Runtime error (Cannot persist the data (orgName=papel, pkgName=papel_test, resName=es-leon-0_event))
First of all, you'll need a CKAN organization and package/dataset before creating a resource and an associated datastore in order to persist the data.
Creating an organization, let's say in
demo.ckan.org
; the organization name isfrb
, because our entity will be in that FIWARE service:Creating a package/dataset within the above organization; the package name is
frb_test
, because our entity will be in the FIWARE servicefrb
and in the FIWARE service pathtest
:Creating a resource within the above package/dataset (the package ID is given in the response to the above package creation request); the name of the resource is
room1_room
because the entity ID will beroom1
and its typeroom
:Finally, and answering to your question, creating a datastore associated to the above resource and suitable for receiving Cgynus data in column mode (the resource ID is given in the response to the above resource creation request):
Now, Cygnus is able to persist data for an entity with ID
room1
of typeroom
in thefrb
service,test
service path:The insertion can be checked through the CKAN API as well: