How to insert data into Druid via Tranquility

Published 2019-05-14 16:02

Question:

By following the tutorial at http://druid.io/docs/latest/tutorials/tutorial-loading-streaming-data.html, I was able to insert data into Druid via the Kafka console.
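
For reference, the Kafka console producer from that tutorial is started with something like this (a sketch, assuming a local Kafka 0.8 broker on the default port and the topic wikipedia, matching the feed in the spec below):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia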

The spec file looks as follows:

examples/indexing/wikipedia.spec

[
  {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
        "type" : "count",
        "name" : "count"
      }, {
        "type" : "doubleSum",
        "name" : "added",
        "fieldName" : "added"
      }, {
        "type" : "doubleSum",
        "name" : "deleted",
        "fieldName" : "deleted"
      }, {
        "type" : "doubleSum",
        "name" : "delta",
        "fieldName" : "delta"
      }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "localhost:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid-example",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "wikipedia"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 500000,
      "intermediatePersistPeriod": "PT10m",
      "windowPeriod": "PT10m",
      "basePersistDirectory": "\/tmp\/realtime\/basePersist",
      "rejectionPolicy": {
        "type": "messageTime"
      }
    }
  }
]

I start the realtime node via

java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath config/_common:config/realtime:lib/* io.druid.cli.Main server realtime

In the Kafka console, I paste and enter the following:

{"timestamp": "2013-08-10T01:02:33Z", "page": "Good Bye", "language" : "en", "user" : "catty", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}

Then I perform a query by creating select.json and running curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d @select.json

select.json

 {
   "queryType": "select",
   "dataSource": "wikipedia",
   "dimensions":[],
   "metrics":[],
   "granularity": "all",
   "intervals": [
     "2000-01-01/2020-01-02"
   ],

   "filter" : {"type":"and",
        "fields" : [
                { "type": "selector", "dimension": "user", "value": "catty" }
        ]
   },

   "pagingSpec":{"pagingIdentifiers": {}, "threshold":500}
 }

I was able to get the following result.

[ {
  "timestamp" : "2013-08-10T01:02:33.000Z",
  "result" : {
    "pagingIdentifiers" : {
      "wikipedia_2013-08-10T00:00:00.000Z_2013-08-11T00:00:00.000Z_2013-08-10T00:00:00.000Z" : 0
    },
    "events" : [ {
      "segmentId" : "wikipedia_2013-08-10T00:00:00.000Z_2013-08-11T00:00:00.000Z_2013-08-10T00:00:00.000Z",
      "offset" : 0,
      "event" : {
        "timestamp" : "2013-08-10T01:02:33.000Z",
        "continent" : "North America",
        "robot" : "false",
        "country" : "United States",
        "city" : "San Francisco",
        "newPage" : "true",
        "unpatrolled" : "true",
        "namespace" : "article",
        "anonymous" : "false",
        "language" : "en",
        "page" : "Good Bye",
        "region" : "Bay Area",
        "user" : "catty",
        "deleted" : 200.0,
        "added" : 57.0,
        "count" : 1,
        "delta" : -143.0
      }
    } ]
  }
} ]

It seems that I have set up Druid correctly.

Now, I would like to insert data via an HTTP endpoint. According to How realtime data input to Druid?, it seems the recommended way is to use Tranquility.

I have the indexing service started via

java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/overlord:lib/*: io.druid.cli.Main server overlord

conf/server.json looks like

{
   "dataSources" : [
      {
         "spec" : {
            "dataSchema" : {
                "dataSource" : "wikipedia",
                "parser" : {
                    "type" : "string",
                    "parseSpec" : {
                      "format" : "json",
                      "timestampSpec" : {
                        "column" : "timestamp",
                        "format" : "auto"
                      },
                      "dimensionsSpec" : {
                        "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
                        "dimensionExclusions" : [],
                        "spatialDimensions" : []
                      }
                    }
                },
                "metricsSpec" : [{
                    "type" : "count",
                    "name" : "count"
                }, {
                    "type" : "doubleSum",
                    "name" : "added",
                    "fieldName" : "added"
                }, {
                    "type" : "doubleSum",
                    "name" : "deleted",
                    "fieldName" : "deleted"
                }, {
                    "type" : "doubleSum",
                    "name" : "delta",
                    "fieldName" : "delta"
                }],
                "granularitySpec" : {
                    "type" : "uniform",
                    "segmentGranularity" : "DAY",
                    "queryGranularity" : "NONE"
                }
            },
            "tuningConfig" : {
               "windowPeriod" : "PT10M",
               "type" : "realtime",
               "intermediatePersistPeriod" : "PT10M",
               "maxRowsInMemory" : "100000"
            }
         },
         "properties" : {
            "task.partitions" : "1",
            "task.replicants" : "1"
         }
      }
   ],
   "properties" : {
      "zookeeper.connect" : "localhost",
      "http.port" : "8200",
      "http.threads" : "8"
   }
}

Then, I start the Tranquility server using

bin/tranquility server -configFile conf/server.json

I perform a POST to http://xx.xxx.xxx.xxx:8200/v1/post/wikipedia with Content-Type set to application/json:

{"timestamp": "2013-08-10T01:02:33Z", "page": "Selamat Pagi", "language" : "en", "user" : "catty", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}

I get the following response:

{"result":{"received":1,"sent":0}}

It seems that Tranquility received the data, but failed to send it to Druid!

I try running curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d @select.json again, but I do not get back the row I inserted via Tranquility.

Any idea why? Thanks.

Answer 1:

This generally happens when the data you send falls outside the window period. If you are inserting data manually, give the exact current UTC timestamp (in milliseconds). This is easy to arrange if you are using a script to generate the data; just make sure the timestamp is the current UTC time.
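
For example, a minimal sketch that stamps the event with the current UTC time before posting it to the Tranquility endpoint from the question (assuming it is reachable on localhost:8200):

curl -X POST 'http://localhost:8200/v1/post/wikipedia' \
  -H 'content-type: application/json' \
  -d "{\"timestamp\": \"$(date -u +%Y-%m-%dT%H:%M:%SZ)\", \"page\": \"Selamat Pagi\", \"user\": \"catty\", \"added\": 57, \"deleted\": 200, \"delta\": -143}"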



Answer 2:

It is extremely difficult to set up Druid to work properly with real-time data insertion.

The best bet I found is to use https://github.com/implydata. Imply is a set of wrappers around Druid that makes it easier to use.

However, real-time insertion in Imply is not perfect either. I experienced an OutOfMemoryException after inserting 30 million items via real-time ingestion, which caused the loss of the previously inserted 30 million rows.

Details regarding the data loss can be found here: https://groups.google.com/forum/#!topic/imply-user-group/95xpYojxiOg

An issue ticket has been filed: https://github.com/implydata/distribution/issues/8



Answer 3:

Druid's streaming windowPeriod here is very short (10 minutes). Events with timestamps outside this period are ignored.



Answer 4:

Since you got {"result":{"received":1,"sent":0}}, your worker threads are working fine. Tranquility decides which data is sent to Druid based on the timestamp associated with each event.

This period is set by the "windowPeriod" configuration. So if your type is realtime ("type":"realtime") and your window period is PT10M ("windowPeriod" : "PT10M"), Tranquility will send any event whose timestamp falls between t-10 and t+10 minutes (where t is the current time) and will not send anything outside this window.
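
A rough sketch of that acceptance check in shell (assuming GNU date; the event timestamp and the PT10M window come from the question):

NOW=$(date -u +%s)
EVENT=$(date -u -d "2013-08-10T01:02:33Z" +%s)
WINDOW=$((10 * 60))   # windowPeriod PT10M, in seconds
if [ "$EVENT" -ge $((NOW - WINDOW)) ] && [ "$EVENT" -le $((NOW + WINDOW)) ]; then
  echo "inside the window: Tranquility will send the event"
else
  echo "outside the window: Tranquility will drop it (sent stays 0)"
fi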

I disagree about the insertion-efficiency problems: we have been sending 3 million rows every 15 minutes since June 2016 and it has been running beautifully. Of course, we have a stronger infrastructure sized for that scale.



Tags: druid