Spark Python: How to calculate Jaccard Similarity

Posted 2020-04-17 04:32

I have a table of around 50k distinct rows and two columns. You can think of each row as a movie and the columns as that movie's attributes: "ID", the id of the movie, and "Tags", some content tags for the movie, stored as a list of strings per movie.

Data looks something like this:

movie_1, ['romantic','comedy','English']
movie_2, ['action','kongfu','Chinese']

My goal is to first calculate the Jaccard similarity between every pair of movies based on their tags (the Jaccard similarity of two tag sets is the size of their intersection divided by the size of their union; for example, movie_1 and movie_2 above share no tags, so their similarity is 0). Once that's done, I will be able to find, for any movie (say movie_1), the top 5 movies most similar to it. And I want those top 5 results for every movie, not just movie_1.

I have tried solving the problem in plain Python, but run time is a big challenge: even using multiprocessing on 6 cores, the total run time was still over 20 hours.

Python code below:

import pandas as pd
from collections import Counter
from multiprocessing import Pool

# Load the table: one movie id and one tag list per row
col_names = ['movie_id', 'tag_name']
df = pd.read_csv("movies.csv", names=col_names)
movie_ids = df['movie_id'].tolist()
tag_list = df['tag_name'].tolist()

def jaccard_similarity(tags1, tags2):
    # Jaccard similarity: |intersection| / |union| of the two tag sets
    intersection = set(tags1).intersection(set(tags2))
    union = set(tags1).union(set(tags2))
    return len(intersection) / float(len(union))

def jc_results(movie_id):
    result = Counter()
    this_index = movie_ids.index(movie_id)
    tag_1 = tag_list[this_index]
    # enumerate gives the index directly, avoiding an O(n) .index() lookup
    # on every iteration of the inner loop
    for that_index, another_id in enumerate(movie_ids):
        if another_id == movie_id:
            continue
        tag_2 = tag_list[that_index]
        result[(movie_id, another_id)] = jaccard_similarity(tag_1, tag_2)
    return result.most_common(10)


# Fan the per-movie computations out across 6 worker processes
pool = Pool(6)
results = {}
for movie_id in movie_ids:
    results[movie_id] = pool.apply_async(jc_results, args=(movie_id,))
pool.close()
pool.join()
for movie_id, res in results.items():
    results[movie_id] = res.get()

Then I wanted to switch to PySpark, but I am still very new to it and got stuck after writing a few lines; the only progress I have made was reading the data into an RDD using sc.textFile. I have read the existing posts, but they all use Scala. It would be great if anyone could help or provide guidance with PySpark. Thanks a lot!

1 answer
再贱就再见
#2 · 2020-04-17 05:02

You could try a solution similar to this stackoverflow answer, though since your data is already tokenized (a list of tag strings per movie), you wouldn't need that step or the ngram step.

I'll also mention that approxSimilarityJoin in PySpark calculates the Jaccard distance rather than the Jaccard similarity, but since distance = 1 - similarity, you can just subtract the result from 1 if you need the similarity in particular.
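If you need to build the DataFrame from your movies.csv first (you mentioned getting stuck after sc.textFile), here is a rough sketch. It assumes the file stores a movie_id plus a pipe-separated tag string like "romantic|comedy|English", so adjust the delimiter and column handling to your real format:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

# Read the raw CSV, name the columns, and split the tag string into an array
db = (spark.read.csv("movies.csv")
        .toDF("movie_id", "genres")
        .withColumn("genres", f.split("genres", r"\|")))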

Your code would end up looking similar to:

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH
import pyspark.sql.functions as f

db = spark.createDataFrame([
        ('movie_1', ['romantic','comedy','English']),
        ('movie_2', ['action','kongfu','Chinese']),
        ('movie_3', ['romantic', 'action'])
    ], ['movie_id', 'genres'])


# Hash each tag list into a sparse term-frequency vector, then build
# MinHash tables over those vectors
model = Pipeline(stages=[
        HashingTF(inputCol="genres", outputCol="vectors"),
        MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=10)
    ]).fit(db)

db_hashed = model.transform(db)

# Self-join: keep every pair of movies whose Jaccard distance is at most 0.9
db_matches = model.stages[-1].approxSimilarityJoin(db_hashed, db_hashed, 0.9)

# show all matches (including self-matches and mirrored pairs)
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).show()

# show each pair only once (filters out self-matches and mirrored duplicates)
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).filter('movie_id_A < movie_id_B').show()

With the corresponding output:

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_3|   movie_3|    0.0|
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
|   movie_1|   movie_1|    0.0|
|   movie_2|   movie_2|    0.0|
|   movie_3|   movie_2|   0.75|
|   movie_3|   movie_1|   0.75|
+----------+----------+-------+

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
+----------+----------+-------+
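Since you also want the top 5 most similar movies for every movie, one option (a sketch, assuming the db_matches DataFrame built above) is to convert the distance back to a similarity and rank each movie's neighbours with a window function:

from pyspark.sql import Window

# Convert Jaccard distance back to similarity and drop self-matches
pairs = db_matches.select(
        f.col('datasetA.movie_id').alias('movie_id_A'),
        f.col('datasetB.movie_id').alias('movie_id_B'),
        (1 - f.col('distCol')).alias('similarity')
    ).filter('movie_id_A != movie_id_B')

# Keep the 5 highest-similarity neighbours per movie
w = Window.partitionBy('movie_id_A').orderBy(f.col('similarity').desc())
pairs.withColumn('rank', f.row_number().over(w)).filter('rank <= 5').show()

Note that approxSimilarityJoin only returns pairs below the distance threshold, so a movie can come back with fewer than 5 neighbours; raise the threshold toward 1.0 if you need more candidates.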