Filtering RDD Based on condition and extracting ma

2019-01-20 19:08发布

问题:

I have the data like,

cl_id      cn_id        cn_value
10004,     77173296     ,390.0
10004,     77173299     ,376.0
10004,     77173300     ,0.0
20005,     77173296     ,0.0
20005,     77173299     ,6.0
2005,      77438800     ,2.0

Cl_id IDs: 10004 ,20005

Filter by 10004

10004,     77173296     ,390.0
10004,     77173299     ,376.0

Filter by 20005

20005,    77173296    ,0.0
20005,    77173299     ,6.0

Now I want the return RDD like,

10004,cn_id,x1(77173296.value,77173300.value) ==> 10004,77173296,390.0,376.0
20005,cn_id,x1(77173296.value,77173300.value) ==> 20005,77173296,0.0,6.0 

And I want to perform some operation on this return_RDD:

 def cal_for(rdd_list):
     #list.map(position1).filter(cn_id for this formula)-> calculate that formula -> store in a separate RDD -> Return that RDD

     rdd_list = rdd_list.map(lambda line:line.split(','))
     new_list = rdd_list.map(lambda x: (x[0]+', '+x[1],float(x[2])))
     new_list = rdd_list.filter(lambda x: x[1] == '77173296' && x[1] ==  '77173299')
     ## then  get the  RDD containing respective cn_values for cn_id 77173296 & cn_id 77173299
     ## and apply the following formula whre a=77173296.value b=77173299.value for cl_id 1004

    try:
        # want to process RDD with this  Formula
        return ((float(a)/float(a+b))*100)
    except ZeroDivisionError:
        return 0

#return or save cal_RDD

回答1:

Instead of filtering the RDD twice, modifying and recombining the resulting RDDs, simply group by id, then map over the values to make any changes you need. If you want to further limit the results based on some criteria, then perform a filter while mapping.

I can't really give you a more precise answer as:

a) It doesn't look like you've really tried to implement this yet, and b) I'm not entirely certain what you want.