
FIWARE Cygnus -> cartodb sinks.NGSISink: Persisten

Posted 2019-03-02 06:50

Question:

I'm trying to connect cygnus (1.4.0_SNAPSHOT) to cartodb. I run it locally, and I use a script to send a notification to cygnus. The script runs Ok, but cygnus says:

ERROR sinks.NGSISink: Persistence error (The query 'INSERT INTO jcarneroatos.x002fpeoplelocation (recvtime,fiwareservicepath,entityid,entitytype,the_geom) VALUES ('2016-10-31T19:04:00.994Z','/peoplelocation','Person:1','Person',ST_SetSRID(ST_MakePoint({"coordinates":[-4.423032856,36.721290055]), 4326))' could not be executed. CartoDB response: 400 Bad Request)

Does anyone know what could be happening? My config files are below for reference. Thanks!

My username at CARTO is "jcarneroatos", and the domain is https://jcarneroatos.carto.com. This is the script I'm using to simulate the notification from Orion Context Broker:

#!/bin/bash
HOST=localhost
PORT=5050
SERVICE=jcarneroatos
SUBSERVICE=/peoplelocation

#send notification
NOTIFICATION=$(\
curl http://$HOST:$PORT/notify \
    -v -s -S \
    --header "Content-Type: application/json; charset=utf-8" \
    --header 'Accept: application/json' \
    --header "Fiware-Service: $SERVICE" \
    --header "Fiware-ServicePath: $SUBSERVICE" \
    -d '
    {
        "contextResponses": [
            {
                "contextElement": {
                    "attributes": [
                        {
                            "metadatas": [
                                {
                                    "name": "location",
                                    "type": "string",
                                    "value": "WGS84"
                                }
                            ],
                            "name": "location",
                            "type": "geo:json",
                            "value": {
                                "coordinates": [
                                    -4.423032856,
                                    36.721290055
                                ],
                                "type": "Point"
                            }
                        }
                    ],
                    "id": "Person:1",
                    "isPattern": "false",
                    "type": "Person"
                },
                "statusCode": {
                    "code": "200",
                    "reasonPhrase": "OK"
                }
            }
        ],
        "originator": "localhost",
        "subscriptionId": "58178396634ded66caac35b2"
    }')
if [ -z "$NOTIFICATION" ]; then
    echo "Ok"
else
    echo "$NOTIFICATION"
fi

This is the structure of the dataset at cartodb:

x002fpeoplelocation
cartodb_id | the_geom | entityid | entitytype | fiwareservicepath | recvtime
  number   | geometry |  string  |   string   |      string       |   date

This is the cygnus config file:

cygnusagent.sources = http-source
cygnusagent.sinks = cartodb-sink
cygnusagent.channels =cartodb-channel

cygnusagent.sources.http-source.channels = cartodb-channel
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnusagent.sources.http-source.port = 5050
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnusagent.sources.http-source.handler.notification_target = /notify
cygnusagent.sources.http-source.handler.default_service = jcarneroatos
cygnusagent.sources.http-source.handler.default_service_path = /peoplelocation
cygnusagent.sources.http-source.interceptors = ts gi
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnusagent.sources.http-source.interceptors.gi.grouping_rules_conf_file = /home/cygnus/APACHE_FLUME_HOME/conf/grouping_rules.conf

cygnusagent.sinks.cartodb-sink.type = com.telefonica.iot.cygnus.sinks.NGSICartoDBSink
cygnusagent.sinks.cartodb-sink.channel = cartodb-channel
cygnusagent.sinks.cartodb-sink.enable_grouping = false
cygnusagent.sinks.cartodb-sink.enable_name_mappings = false
cygnusagent.sinks.cartodb-sink.enable_lowercase = false
cygnusagent.sinks.cartodb-sink.data_model = dm-by-service-path
cygnusagent.sinks.cartodb-sink.keys_conf_file = /home/cygnus/APACHE_FLUME_HOME/conf/cartodb_keys.conf
cygnusagent.sinks.cartodb-sink.flip_coordinates = false
cygnusagent.sinks.cartodb-sink.enable_raw = true
cygnusagent.sinks.cartodb-sink.enable_distance = false
cygnusagent.sinks.cartodb-sink.batch_size = 100
cygnusagent.sinks.cartodb-sink.batch_timeout = 30
cygnusagent.sinks.cartodb-sink.batch_ttl = 10
cygnusagent.sinks.cartodb-sink.backend.max_conns = 500
cygnusagent.sinks.cartodb-sink.backend.max_conns_per_route = 100

cygnusagent.channels.cartodb-channel.type = memory
cygnusagent.channels.cartodb-channel.capacity = 1000
cygnusagent.channels.cartodb-channel.transactionCapacity = 100

And finally the cartodb_keys.conf file (without key):

{
   "cartodb_keys": [
      {
         "username": "jcarneroatos",
         "endpoint": "https://jcarneroatos.carto.com",
         "key": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"   
      }
   ]
}

Update: After running Cygnus in DEBUG mode and checking the logs, it seems that CARTO is returning:

{"error":["syntax error at or near \"{\""]}

This is the complete log: http://pastebin.com/p9VyUU8n

Answer 1:

Finally, after an offline discussion with @Javi Carnero, we found a bug in the Cygnus code related to the way Carto differentiates between "enterprise" and "personal" accounts. Enterprise accounts allow one PostgreSQL schema per user under the enterprise organization, while personal accounts have a "hardcoded" schema named public. Since Cygnus used the notified FIWARE service as the schema name, it did not work properly for "personal" accounts.

This bug has been fixed:

  • Issue: https://github.com/telefonicaid/fiware-cygnus/issues/1382
  • PR: https://github.com/telefonicaid/fiware-cygnus/issues/1393

At the time of writing, the bug is fixed in the master branch. Cygnus 1.7.0, which includes this fix, will be released by the end of the sprint/month.

Please note that my previous answer is still valid if you have an "enterprise" account. In any case, I'll edit it to explain this.
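
To illustrate the schema difference described in this answer, here is a minimal sketch of how the schema-qualified table name could be chosen per account type. It is not the actual Cygnus fix: the helper names carto_schema and qualified_table, and the "enterprise"/"personal" labels passed as arguments, are assumptions made for this example only.

```shell
#!/bin/sh
# Hypothetical helper, not Cygnus code: pick the PostgreSQL schema for a
# Carto account. Assumption taken from this answer: enterprise accounts get
# one schema per user, personal accounts always use "public".
carto_schema() {
    account_type=$1   # "enterprise" or "personal" (labels assumed here)
    username=$2
    case $account_type in
        enterprise) printf '%s\n' "$username" ;;
        personal)   printf '%s\n' "public" ;;
        *) echo "unknown account type: $account_type" >&2; return 1 ;;
    esac
}

# Hypothetical helper: build the schema-qualified table name for a query.
qualified_table() {
    schema=$(carto_schema "$1" "$2") || return 1
    printf '%s.%s\n' "$schema" "$3"
}

qualified_table personal jcarneroatos x002fpeoplelocation
qualified_table enterprise jcarneroatos x002fpeoplelocation
```

With a personal account the table resolves to public.x002fpeoplelocation, whereas the failing query in the question targeted jcarneroatos.x002fpeoplelocation.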



Answer 2:

The problem is that the geo:json type is not currently supported by NGSICartoDBSink. This sink understands certain ways of notifying geolocated attributes, according to the Orion Context Broker specification:

  • Using the geo:point type, and sending the coordinates in the value field with format "latitude, longitude".
  • Using the location metadata, of type string and value WGS84, and sending the coordinates in the value field with format "latitude, longitude".

Please observe:

  • The above options are mutually exclusive, i.e. they cannot be used at the same time.
  • The location metadata is deprecated in Orion; nevertheless, it can still be used.

Until geo:json is supported (I'll start working on that; it could be ready during this sprint/month), I recommend using the geo:point type.
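
As a minimal sketch of that recommendation, the snippet below rewrites the GeoJSON point from the question as a geo:point attribute. Note that GeoJSON positions are ordered longitude, latitude, while geo:point expects "latitude, longitude", so the two numbers swap places. The helper names geojson_to_geo_point and geo_point_attribute are hypothetical and exist only for this example.

```shell
#!/bin/sh
# Hypothetical helper: turn a GeoJSON Point position (longitude, latitude)
# into the "latitude, longitude" string used by a geo:point attribute.
geojson_to_geo_point() {
    lon=$1
    lat=$2
    printf '%s, %s\n' "$lat" "$lon"
}

# Hypothetical helper: print an NGSI attribute object of type geo:point.
geo_point_attribute() {
    name=$1
    value=$(geojson_to_geo_point "$2" "$3")
    printf '{"name": "%s", "type": "geo:point", "value": "%s"}\n' "$name" "$value"
}

# Coordinates from the question: [-4.423032856, 36.721290055]
geo_point_attribute location -4.423032856 36.721290055
```

The printed object can replace the geo:json attribute (and its location metadata) in the notification body sent to Cygnus.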

EDIT 1

I'm adding here an example of Cygnus execution, when receiving a notification involving a geolocated attribute (geo:point type).

Cygnus version:

1.6.0

Cygnus configuration:

cygnus-ngsi.sources = http-source
cygnus-ngsi.sinks = raw-sink
cygnus-ngsi.channels = raw-channel

cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.channels = raw-channel
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = default
cygnus-ngsi.sources.http-source.handler.default_service_path = /
cygnus-ngsi.sources.http-source.interceptors = ts
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp

cygnus-ngsi.sinks.raw-sink.type = com.telefonica.iot.cygnus.sinks.NGSICartoDBSink
cygnus-ngsi.sinks.raw-sink.channel = raw-channel
cygnus-ngsi.sinks.raw-sink.enable_grouping = false
cygnus-ngsi.sinks.raw-sink.keys_conf_file = /usr/cygnus/conf/cartodb_keys.conf
cygnus-ngsi.sinks.raw-sink.swap_coordinates = false
cygnus-ngsi.sinks.raw-sink.enable_raw = true
cygnus-ngsi.sinks.raw-sink.enable_distance = false
cygnus-ngsi.sinks.raw-sink.enable_raw_snapshot = false
cygnus-ngsi.sinks.raw-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.raw-sink.batch_size = 50
cygnus-ngsi.sinks.raw-sink.batch_timeout = 10
cygnus-ngsi.sinks.raw-sink.batch_ttl = 0
cygnus-ngsi.sinks.raw-sink.batch_retries = 5000

cygnus-ngsi.channels.raw-channel.type = com.telefonica.iot.cygnus.channels.CygnusMemoryChannel
cygnus-ngsi.channels.raw-channel.capacity = 1000
cygnus-ngsi.channels.raw-channel.transactionCapacity = 100

Create the table:

$ curl -X GET -G "https://<my_user>.cartodb.com/api/v2/sql?api_key=<my_key>" --data-urlencode "q=CREATE TABLE x002ftestxffffx0043ar1xffffx0043ar (recvTime text, fiwareServicePath text, entityId text, entityType text, speed float, speed_md text, the_geom geometry(POINT,4326))"
{"rows":[],"time":0.005,"fields":{},"total_rows":0}
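
The table name in the command above appears to follow a pattern: with the dm-by-entity data model, the service path (/test), entity id (Car1) and entity type (Car) are encoded and joined with xffff, and characters other than lowercase letters and digits are written as x plus a four-digit hexadecimal code. The sketch below reproduces that pattern. It is inferred only from the names shown in this thread and may not match the real Cygnus encoding for other characters; encode_part and table_name are hypothetical helpers.

```shell
#!/bin/sh
LC_ALL=C   # keep the [a-z0-9] range limited to ASCII

# Hypothetical helper: keep lowercase letters and digits, write any other
# character as "x" plus its 4-digit hexadecimal character code.
encode_part() {
    s=$1
    out=
    while [ -n "$s" ]; do
        rest=${s#?}          # everything after the first character
        c=${s%"$rest"}       # the first character
        case $c in
            [a-z0-9]) out=$out$c ;;
            *) out=$out$(printf 'x%04x' "'$c") ;;
        esac
        s=$rest
    done
    printf '%s\n' "$out"
}

# Hypothetical helper: join service path, entity id and entity type with
# the "xffff" separator seen in the example table name.
table_name() {
    printf '%sxffff%sxffff%s\n' \
        "$(encode_part "$1")" "$(encode_part "$2")" "$(encode_part "$3")"
}

table_name /test Car1 Car
```

The same encoding applied to the service path alone gives x002fpeoplelocation, the table used in the question with dm-by-service-path.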

Script simulating a notification:

$ cat notification.sh
#!/bin/sh

URL=$1

if [ "$2" != "" ]
then
   SERVICE=$2
else
   SERVICE=default
fi

if [ "$3" != "" ]
then
   SERVICE_PATH=$3
else
   SERVICE_PATH=/
fi

# Read the request body from the here-document that follows
curl "$URL" -v -s -S --header 'Content-Type: application/json; charset=utf-8' --header 'Accept: application/json' --header 'User-Agent: orion/0.10.0' --header "Fiware-Service: $SERVICE" --header "Fiware-ServicePath: $SERVICE_PATH" -d @- <<EOF
{
  "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",
  "originator" : "localhost",
  "contextResponses" : [
    {
      "contextElement" : {
        "attributes" : [
          {
            "name" : "speed",
            "type" : "float",
            "value" : "$6"
          },
          {
            "name" : "the_geom",
            "type" : "geo:point",
            "value" : "$4, $5"
          }
        ],
        "type" : "Car",
        "isPattern" : "false",
        "id" : "Car1"
      },
      "statusCode" : {
        "code" : "200",
        "reasonPhrase" : "OK"
      }
    }
  ]
}
EOF

Script execution:

$ ./notification.sh http://localhost:5050/notify <my_user> /test 40.40 -3.4 120
*   Trying ::1...
* Connected to localhost (::1) port 5050 (#0)
> POST /notify HTTP/1.1
> Host: localhost:5050
> Content-Type: application/json; charset=utf-8
> Accept: application/json
> User-Agent: orion/0.10.0
> Fiware-Service: <my_user>
> Fiware-ServicePath: /test
> Content-Length: 569
> 
* upload completely sent off: 569 out of 569 bytes
< HTTP/1.1 200 OK
< Transfer-Encoding: chunked
< Server: Jetty(6.1.26)
< 
* Connection #0 to host localhost left intact

Cygnus logs upon notification reception:

time=2016-12-02T13:48:27.310UTC | lvl=INFO | corr=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | trans=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | srv=<my_user> | subsrv=/test | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[282] : [NGSIRestHandler] Starting internal transaction (eb73b8d5-af9b-48ea-8ce7-ff21edc957f3)
time=2016-12-02T13:48:27.312UTC | lvl=INFO | corr=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | trans=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | srv=<my_user> | subsrv=/test | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[299] : [NGSIRestHandler] Received data ({  "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",  "originator" : "localhost",  "contextResponses" : [    {      "contextElement" : {        "attributes" : [          {            "name" : "speed",            "type" : "float",            "value" : "120"          },          {            "name" : "the_geom",            "type" : "geo:point",            "value" : "40.40, -3.4"          }        ],        "type" : "Car",        "isPattern" : "false",        "id" : "Car1"      },      "statusCode" : {        "code" : "200",        "reasonPhrase" : "OK"      }    }  ]})
time=2016-12-02T13:48:36.404UTC | lvl=INFO | corr=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | trans=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | srv=<my_user> | subsrv=/test | comp=cygnus-ngsi | op=persistRawAggregation | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink[553] : [raw-sink] Persisting data at NGSICartoDBSink. Schema (<my_user>), Table (x002ftestxffffx0043ar1xffffx0043ar), Data (('2016-12-02T13:48:27.381Z','/test','Car1','Car',ST_SetSRID(ST_MakePoint(40.40,-3.4), 4326),'120','[]'))
time=2016-12-02T13:48:38.237UTC | lvl=INFO | corr=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | trans=eb73b8d5-af9b-48ea-8ce7-ff21edc957f3 | srv=<my_user> | subsrv=/test | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[514] : Finishing internal transaction (eb73b8d5-af9b-48ea-8ce7-ff21edc957f3)

Getting the data:

$ curl -X GET -G "https://<my_user>.cartodb.com/api/v2/sql?api_key=<my_key>" --data-urlencode "q=select * from x002ftestxffffx0043ar1xffffx0043ar"
{"rows":[{"recvtime":"2016-12-02T13:48:27.381Z","fiwareservicepath":"/test","entityid":"Car1","entitytype":"Car","speed":120,"speed_md":"[]","the_geom":"0101000020E610000033333333333344403333333333330BC0"}],"time":0.001,"fields":{"recvtime":{"type":"string"},"fiwareservicepath":{"type":"string"},"entityid":{"type":"string"},"entitytype":{"type":"string"},"speed":{"type":"number"},"speed_md":{"type":"string"},"the_geom":{"type":"geometry"}},"total_rows":1}

EDIT 2

This answer is only valid if you own an "enterprise" Carto account. Please see my other answer to this question.