I am working with PySpark on a huge dataset, where I want to filter the data frame based on strings in another data frame. For example,
dd = spark.createDataFrame(["something.google.com","something.google.com.somethingelse.ac.uk","something.good.com.cy", "something.good.com.cy.mal.org"], StringType()).toDF('domains')
+----------------------------------------+
|domains |
+----------------------------------------+
|something.google.com |
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy |
|something.good.com.cy.mal.org |
+----------------------------------------+
dd1 = spark.createDataFrame(["google.com", "good.com.cy"], StringType()).toDF('gooddomains')
+-----------+
|gooddomains|
+-----------+
|google.com |
|good.com.cy|
+-----------+
I assume that domains and gooddomains are valid domain names.
What I want to do is filter out the rows in dd whose domain ends with one of the domains in dd1, keeping only those that do not. So in the above example, I want to filter out row 1 and row 3, to end up with
+----------------------------------------+
|domains |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org |
+----------------------------------------+
My current solution (as shown below) can only account for domains up to 3 'words'. If I were to add, say, verygood.co.ac.uk to dd1 (i.e. the whitelist), then it will fail.
from pyspark.sql import functions as F

def split_filter(x, whitelist):
    # split each domain into its dot-separated labels
    splitted1 = x.select(F.split(x['domains'], r'\.').alias('splitted_domains'))
    # rebuild the last two labels (e.g. google.com)
    last_two = splitted1.select(F.concat(
        splitted1.splitted_domains[F.size(splitted1.splitted_domains) - 2],
        F.lit('.'),
        splitted1.splitted_domains[F.size(splitted1.splitted_domains) - 1]).alias('last_two'))
    # rebuild the last three labels (e.g. good.com.cy)
    last_three = splitted1.select(F.concat(
        splitted1.splitted_domains[F.size(splitted1.splitted_domains) - 3],
        F.lit('.'),
        splitted1.splitted_domains[F.size(splitted1.splitted_domains) - 2],
        F.lit('.'),
        splitted1.splitted_domains[F.size(splitted1.splitted_domains) - 1]).alias('last_three'))
    # line the frames up again by generated row id
    x = x.withColumn('id', F.monotonically_increasing_id())
    last_two = last_two.withColumn('id', F.monotonically_increasing_id())
    last_three = last_three.withColumn('id', F.monotonically_increasing_id())
    final_d = x.join(last_two, ['id']).join(last_three, ['id'])
    # drop every row whose 2- or 3-label suffix appears in the whitelist
    df1 = final_d.join(whitelist, final_d['last_two'] == whitelist['gooddomains'], how='left_anti')
    df2 = df1.join(whitelist, df1['last_three'] == whitelist['gooddomains'], how='left_anti')
    return df2.drop('id')
I am using Spark 2.3.0 with Python 2.7.5.
If I understand correctly, you just want a left anti join using a simple SQL string matching pattern.
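For example, something along these lines (a sketch, assuming dd and dd1 as defined in the question):

from pyspark.sql.functions import expr

result = dd.alias("l").join(
    dd1.alias("r"),
    on=expr("l.domains LIKE concat('%', r.gooddomains)"),
    how="leftanti"
)
result.show(truncate=False)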
The expression concat('%', r.gooddomains) prepends a wildcard to r.gooddomains. Next, we use l.domains LIKE concat('%', r.gooddomains) to find the rows which match this pattern. Finally, specify how="leftanti" in order to keep only the rows that don't match.

Update: As pointed out in the comments by @user10938362, there are 2 flaws with this approach:
1) Since this only looks at matching suffixes, there are edge cases where this produces the wrong results. For example, a domain like notreallygood.com.cy ends with good.com.cy as a plain string even though it is neither good.com.cy nor one of its subdomains, so it would be wrongly filtered out.
There are two ways to approach this. The first is to modify the LIKE expression to handle this. Since we know these are all valid domains, we can check for either an exact match or a match preceded by a dot:
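For example, something like this (a sketch, reusing the aliases from above):

from pyspark.sql.functions import expr

result = dd.alias("l").join(
    dd1.alias("r"),
    on=expr("l.domains = r.gooddomains OR l.domains LIKE concat('%.', r.gooddomains)"),
    how="leftanti"
)
result.show(truncate=False)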
Similarly, one can use RLIKE with a regular expression pattern with a look-behind:
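A sketch of that variant, shown here with a (^|[.]) group, which has the same effect as a look-behind such as (?<=^|[.]); note that the dots inside r.gooddomains are regex metacharacters and would strictly need escaping for exact matching:

from pyspark.sql.functions import expr

result = dd.alias("l").join(
    dd1.alias("r"),
    # '(^|[.])' anchors the match at a label boundary, '$' at the end of the string
    on=expr("l.domains RLIKE concat('(^|[.])', r.gooddomains, '$')"),
    how="leftanti"
)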
2) The larger issue is that, as explained here, joining on a LIKE expression will cause a Cartesian product. If dd1 is small enough to be broadcast, then this isn't an issue. Otherwise, you may run into performance issues and will have to try a different approach.
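For completeness, the broadcast can be made explicit with a hint (a sketch, reusing the join condition from above):

from pyspark.sql.functions import broadcast, expr

result = dd.alias("l").join(
    broadcast(dd1.alias("r")),  # hint that the small whitelist should be broadcast
    on=expr("l.domains = r.gooddomains OR l.domains LIKE concat('%.', r.gooddomains)"),
    how="leftanti"
)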
More on the Spark SQL LIKE operator from the Apache Hive docs:

A LIKE B: NULL if A or B is NULL; TRUE if string A matches the SQL simple regular expression B, otherwise FALSE. The comparison is done character by character. The _ character in B matches any character in A (similar to . in POSIX regular expressions), and the % character in B matches an arbitrary number of characters in A (similar to .* in POSIX regular expressions).
Note: This exploits the "trick" of using pyspark.sql.functions.expr to pass in a column value as a parameter to a function.

Let's extend the domains for slightly better coverage:
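For instance (the extra rows below are purely illustrative, chosen to cover exact matches, look-alike suffixes and additional good domains):

from pyspark.sql.types import StringType

good_domains = spark.createDataFrame(
    ["google.com", "google.co.uk", "good.com.cy", "alpha.com.cy"],
    StringType()
).toDF('gooddomains')

domains = spark.createDataFrame([
    "something.google.com",
    "something.google.com.somethingelse.ac.uk",
    "something.good.com.cy",
    "something.good.com.cy.mal.org",
    "something.bad.com.cy",
    "omgalsogood.com.cy",
    "good.com.cy",
    "example.gov.uk",
    "foo.example.com"
], StringType()).toDF('domains')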
Now, a naive solution, using only Spark SQL primitives, is to simplify your current approach a bit. Since you've stated that it is safe to assume that these are valid public domains, we can define a function like this:
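A possible sketch (here with regexp_extract, which keeps the last two dot-separated labels; the helper name suffix is just illustrative):

from pyspark.sql import functions as f

def suffix(c):
    # keep only the last two dot-separated labels of the domain
    return f.regexp_extract(c, "([^.]+[.][^.]+)$", 1)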
which extracts the top level domain and the first level subdomain:
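For example:

domains.select("domains", suffix("domains").alias("suffix")).show(truncate=False)
# something.google.com  -> google.com
# something.good.com.cy -> com.cy
# example.gov.uk        -> gov.uk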
Now we can outer join:
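A sketch, using the extracted suffix as an equi-join key (every domain that ends with a good domain necessarily shares its last two labels with it, so no candidate pair is lost):

candidates = (
    domains.withColumn("suffix", suffix("domains"))
    .join(
        good_domains.withColumn("suffix", suffix("gooddomains")),
        on="suffix",
        how="left"
    )
)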
and filter the result:
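A sketch of the filtering step: within each group of candidate pairs, keep only the domains for which no good domain actually matched (either an exact match, or the good domain preceded by a dot):

from pyspark.sql import functions as f

matched = f.when(
    (f.col("domains") == f.col("gooddomains"))
    | f.col("domains").endswith(f.concat(f.lit("."), f.col("gooddomains"))),
    1
).otherwise(0)

result = (
    candidates
    .groupBy("domains")
    .agg((f.max(matched) == 0).alias("no_match"))
    .where("no_match")
    .select("domains")
)
result.show(truncate=False)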
This is better than the Cartesian product required for a direct join with LIKE, but it is still brute-force and, in the worst case scenario, requires two shuffles - one for the join (this can be skipped if good_domains is small enough to be broadcast), and another one for the groupBy + agg.

Unfortunately, Spark SQL doesn't allow a custom partitioner to use only one shuffle for both (it is, however, possible with a composite key in the RDD API), and the optimizer is not smart enough yet to optimize join(_, "key1") followed by .groupBy("key1", _).
If you can accept some false negatives, you can go probabilistic. First, let's build a probabilistic counter (here using bounter with a little help from toolz):
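A sketch of that step (assuming the whitelist fits into a single in-memory sketch; size_mb and the chunk size are just illustrative values):

from bounter import bounter
from toolz import partition_all

# CountMinSketch-backed counter: lookups may over-estimate but never miss an
# inserted key, which is where the possible false negatives come from
counter = bounter(need_iteration=False, size_mb=64)

# stream the whitelist from the cluster in chunks instead of collecting it all at once
for chunk in partition_all(10000, (row.gooddomains for row in good_domains.toLocalIterator())):
    counter.update(chunk)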
Next, define a user defined function like this:
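For example (assuming the counter object can be pickled for broadcasting and supports item lookup returning an approximate count):

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

sc = spark.sparkContext
counter_b = sc.broadcast(counter)

def dot_suffixes(domain):
    # all label-boundary suffixes, e.g. a.b.c -> a.b.c, b.c, c
    parts = domain.split(".")
    return (".".join(parts[i:]) for i in range(len(parts)))

@udf(BooleanType())
def is_good_approx(domain):
    # the domain is whitelisted if any of its suffixes was seen by the counter
    return any(counter_b.value[s] > 0 for s in dot_suffixes(domain))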
and filter the domains:
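# keep only the rows that are not (approximately) whitelisted
domains.where(~is_good_approx("domains")).show(truncate=False)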
In Scala this could be done with bloomFilter and, if needed, it shouldn't be hard to call such code from Python.
This might still not be fully satisfying, due to the approximate nature. If you require an exact result, you can try to leverage the redundant nature of the data, for example with a trie (here using the datrie implementation).

If good_domains are relatively small, you can create a single model, in a similar way as in the probabilistic variant:
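A sketch of building the trie (the labels are reversed so that suffix matching becomes prefix matching; it assumes the trie object can be pickled for broadcasting, otherwise datrie's save()/load() could be used instead):

import string
import datrie

def reverse_labels(domain):
    # good.com.cy -> cy.com.good
    return ".".join(reversed(domain.split(".")))

def make_trie(values):
    trie = datrie.Trie(string.printable)
    for value in values:
        trie[reverse_labels(value)] = True
    return trie

sc = spark.sparkContext
trie_b = sc.broadcast(make_trie(row.gooddomains for row in good_domains.toLocalIterator()))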
define a user defined function:
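For example (reusing reverse_labels and the broadcast trie from the previous snippet):

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

@udf(BooleanType())
def is_good_exact(domain):
    rev = reverse_labels(domain)
    # accept only matches at label boundaries: either the whole reversed domain
    # is a whitelisted entry, or the character right after the matched prefix is a dot
    return any(
        rev == prefix or rev[len(prefix)] == "."
        for prefix in trie_b.value.prefixes(rev)
    )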
and apply it to the data:
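# keep only the rows whose domain is not covered by the trie
domains.where(~is_good_exact("domains")).show(truncate=False)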
This specific approach works under the assumption that all good_domains can be compressed into a single trie, but it can easily be extended to handle cases where this assumption is not satisfied. For example, you can build a single trie per top level domain or suffix (as defined in the naive solution) and then either load the models on demand from a serialized version, or use RDD operations*.

The two non-native methods can be further adjusted depending on the data, the business requirements (like tolerance for false negatives in the case of the approximate solution) and the available resources (driver memory, executor memory, cardinality of suffixes, access to a distributed POSIX-compliant file system, and so on). There are also some trade-offs to consider when choosing between applying these to DataFrames and RDDs (memory usage, communication and serialization overhead).

* See Understanding treeReduce() in Spark