I'm new to Java and Trident. I imported a project for getting tweets, but there is something I want to understand: how does this code get more than one tweet? As I understand the code, tuple.getValue(0); means the first tweet only?!
My problem is collecting all tweets in a HashSet or HashMap to get the total number of distinct words in each tweet.
public void execute(TridentTuple tuple, TridentCollector collector) {
    // ... (body not shown)
}

This method is used to execute equations on the tweet:

public Values getValues(Tweet tweet, String[] words) {
    // ... (body not shown)
}
This code gets the first tweet, then gets its body and converts it to an array of strings. I know what I need to solve, but I couldn't write it well.
My idea: make a for loop like this:
for (int i = 0; i < 10; i++) {
    Tweet tweet = (Tweet) tuple.getValue(i);
}
If the word already exists in the set, remove it from the set.
The "problem" is a mismatch between "get the count of distinct words over all Tweets" and Storm as a stream processor. The query you want to answer can only be computed on a finite set of Tweets. However, in stream processing you process a potentially infinite stream of input data.
If you have a finite set of Tweets, you might want to use a batch processing framework such as Flink, Spark, or MapReduce. If you indeed have an infinite number of Tweets, you must rephrase your question...
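To illustrate the finite case, here is a minimal plain-Java sketch (no Storm, Flink, or Spark involved) that counts distinct words per tweet and over a whole list of tweets. The class and method names are hypothetical, and lowercasing plus splitting on whitespace is an assumed tokenizer; real tweets would need something better. The loop over all tweets terminates only because the list is finite, which is exactly what a stream does not give you.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

public class DistinctWordsBatch {

    // Hypothetical tokenizer: lowercase, then split on runs of whitespace.
    // Punctuation stays attached to words; a real tokenizer would do more.
    public static String[] tokenize(String body) {
        String trimmed = body.trim().toLowerCase(Locale.ROOT);
        if (trimmed.isEmpty()) {
            return new String[0];
        }
        return trimmed.split("\\s+");
    }

    // Distinct words inside one tweet: a HashSet silently ignores duplicates.
    public static int distinctWordsInTweet(String body) {
        Set<String> words = new HashSet<>(Arrays.asList(tokenize(body)));
        return words.size();
    }

    // Distinct words over the whole collection. The loop ends only because
    // the list is finite; a stream has no "last tweet".
    public static int distinctWordsOverAll(List<String> tweets) {
        Set<String> allWords = new HashSet<>();
        for (String body : tweets) {
            // addAll() leaves words that are already in the set unchanged
            allWords.addAll(Arrays.asList(tokenize(body)));
        }
        return allWords.size();
    }

    public static void main(String[] args) {
        List<String> tweets = Arrays.asList(
                "storm is a stream processor",
                "spark is a batch processor",
                "storm storm storm");
        for (String t : tweets) {
            System.out.println(distinctWordsInTweet(t) + " distinct words in: " + t);
        }
        System.out.println("distinct words over all tweets: " + distinctWordsOverAll(tweets));
    }
}
```

A batch framework does the same thing at scale; the point is that the final size() is only meaningful once all input has been read.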
As you mentioned already, you actually want to "loop over all Tweets". As you are doing stream processing, there is no such concept. You have an infinite number of input tuples, and Storm applies execute() on each of them (i.e., you can think of it as if Storm "loops over the input" automatically, even if "looping" is not the correct term for it). As your computation is "over all Tweets", you need to maintain state in your Bolt code, so that you can update this state for each Tweet. The simplest form of state in Storm is a member variable in your Bolt class.

Right now, this code does not emit anything, because it is unclear what you actually want to emit. As a stream never ends, you cannot say "emit the final count of words contained in allWords". What you could do is emit the current count after each update. For this, add collector.emit(new Values(this.allWords.size())); at the end of execute().

Furthermore, I want to add that the presented solution only works correctly if no parallelism is applied to MyBolt; otherwise, the sets in the different instances might contain the same word. To resolve this, you would need to tokenize each Tweet into its words in a stateless Bolt and feed this stream of words into an adapted MyBolt that uses an internal Set as state. MyBolt must also receive its input via fieldsGrouping to ensure distinct sets of words on each instance.
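The pipeline described above (a stateless tokenizer, then a stateful MyBolt fed via fieldsGrouping) can be simulated in plain Java to see why the grouping matters. This is a sketch under stated assumptions, not Storm code: DistinctWordState is a hypothetical stand-in for one MyBolt instance, route() imitates fieldsGrouping with a simple hash modulo the number of instances (Storm's actual hash function may differ), and println stands in for collector.emit(...). Each instance emits only its own partial count; because every word is routed to exactly one instance, the sets are disjoint and summing their sizes gives the overall distinct count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

public class PartitionedDistinctWords {

    // Stand-in for one MyBolt instance: the member variable is its state.
    public static class DistinctWordState {
        private final Set<String> allWords = new HashSet<>();

        // Called once per incoming word (like execute()); returns the current
        // count, i.e. what the bolt would emit after each update.
        public int onWord(String word) {
            allWords.add(word);
            return allWords.size();
        }

        public Set<String> words() {
            return allWords;
        }
    }

    // Stateless tokenizer step (the hypothetical "split" bolt).
    public static List<String> tokenize(String body) {
        List<String> out = new ArrayList<>();
        for (String w : body.toLowerCase(Locale.ROOT).split("\\s+")) {
            if (!w.isEmpty()) {
                out.add(w);
            }
        }
        return out;
    }

    // Hash-based routing in the spirit of fieldsGrouping: the same word
    // always goes to the same instance. (Storm's real hash may differ.)
    public static int route(String word, int numInstances) {
        return Math.floorMod(word.hashCode(), numInstances);
    }

    // Feeds tweets through tokenizer -> routing -> per-instance state.
    public static List<DistinctWordState> process(Iterable<String> tweets, int numInstances) {
        List<DistinctWordState> instances = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) {
            instances.add(new DistinctWordState());
        }
        for (String tweet : tweets) {
            for (String word : tokenize(tweet)) {
                int target = route(word, numInstances);
                int current = instances.get(target).onWord(word);
                System.out.println("instance " + target + " emits " + current);
            }
        }
        return instances;
    }

    // Sets are disjoint thanks to the routing, so the sizes can be summed.
    public static int totalDistinct(List<DistinctWordState> instances) {
        int total = 0;
        for (DistinctWordState s : instances) {
            total += s.words().size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList(
                "storm is a stream processor",
                "spark is a batch processor",
                "storm storm storm");
        List<DistinctWordState> instances = process(stream, 3);
        System.out.println("total distinct so far: " + totalDistinct(instances));
    }
}
```

In a real topology, the wiring would be along the lines of builder.setBolt("count", new MyBolt(), 3).fieldsGrouping("split", new Fields("word")), where "count", "split", and "word" are hypothetical component and field names. With shuffleGrouping instead, the same word could land in several instances and the summed count would be too high.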