Fuzzy logic on big datasets using Python

2019-04-12 20:36发布

问题:

My team has been stuck with running a fuzzy logic algorithm on a two large datasets. The first (subset) is about 180K rows contains names, addresses, and emails for the people that we need to match in the second (superset). The superset contains 2.5M records. Both have the same structure and the data has been cleaned already, i.e. addresses parsed, names normalized, etc.

  • ContactID int,
  • FullName varchar(150),
  • Address varchar(100),
  • Email varchar(100)

The goal is to match values in a row of subset to the corresponding values in superset, so the output would combine the subset and superset and the corresponding similarity percentages for each field (token).

  • ContactID,
  • LookupContactID,
  • FullName,
  • LookupFullName,
  • FullName_Similarity,
  • Address,
  • LookupAddress,
  • Address_Similarity,
  • Email,
  • LookupEmail,
  • Email_Similarity

To simplify and test the code first, we've concatenated strings and we know that the code works on very small superset; however, once we increase the number of records, it gets stuck. We've tried different algorithms, Levenshtein, FuzzyWuzzy, etc. to no avail. The problem, in my opinion, is that Python does it row by row; however, I'm not sure. We've even tried running it on our Hadoop cluster using streaming; however, it has not yielded any positive results.

#!/usr/bin/env python
import sys
from fuzzywuzzy import fuzz
import datetime
import time
import Levenshtein

#init for comparison
with open('normalized_set_record_set.csv') as normalized_records_ALL_file:
# with open('delete_this/xab') as normalized_records_ALL_file:
    normalized_records_ALL_dict = {}
    for line in normalized_records_ALL_file:
        key, value = line.strip('\n').split(':', 1)
        normalized_records_ALL_dict[key] = value
        # normalized_records_ALL_dict[contact_id] = concat_record

def score_it_bag(target_contact_id, target_str, ALL_records_dict):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT sorted list by highest fuzzy match
    '''
    return sorted([(value_str, contact_id_index_str, fuzz.ratio(target_str, value_str)) 
        for contact_id_index_str, value_str in ALL_records_dict.iteritems()], key=lambda x:x[2])[::-1]

def score_it_closest_match_pandas(target_contact_id, target_str, place_holder_delete):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match
    '''
    # simply drop this index target_contact_id
    df_score = df_ALL.concat_record.apply(lambda x: fuzz.ratio(target_str, x))

    return df_ALL.concat_record[df_score.idxmax()], df_score.max(), df_score.idxmax()

def score_it_closest_match_L(target_contact_id, target_str, ALL_records_dict_input):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
    '''
    best_score = 100

    #score it
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = Levenshtein.distance(target_str, comparison_record_str)


            if current_score < best_score:
                best_score = current_score 
                best_match_id = comparison_contactid 
                best_match_str = comparison_record_str 

    return (best_match_str, best_score, best_match_id)



def score_it_closest_match_fuzz(target_contact_id, target_str, ALL_records_dict_input):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
    '''
    best_score = 0

    #score it
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = fuzz.ratio(target_str, comparison_record_str)

            if current_score > best_score:
                best_score = current_score 
                best_match_id = comparison_contactid 
                best_match_str = comparison_record_str 

    return (best_match_str, best_score, best_match_id)

def score_it_threshold_match(target_contact_id, target_str, ALL_records_dict_input):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match tuple (best matching str, score, contact_id of best match str)
    '''
    score_threshold = 95

    #score it
    for comparison_contactid, comparison_record_str in ALL_records_dict_input.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = fuzz.ratio(target_str, comparison_record_str)

            if current_score > score_threshold: 
                return (comparison_record_str, current_score, comparison_contactid)

    return (None, None, None)


def score_it_closest_match_threshold_bag(target_contact_id, target_str, ALL_records_dict):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match
    '''
    threshold_score = 80
    top_matches_list = []
    #score it
    #iterate through dictionary
    for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems():
        if target_contact_id != comparison_contactid:
            current_score = fuzz.ratio(target_str, comparison_record_str)

            if current_score > threshold_score:
                top_matches_list.append((comparison_record_str, current_score, comparison_contactid))


    if len(top_matches_list) > 0:  return top_matches_list

def score_it_closest_match_threshold_bag_print(target_contact_id, target_str, ALL_records_dict):
    '''
    INPUT target_str, ALL_records_dict
    OUTPUT closest match
    '''
    threshold_score = 80


    #iterate through dictionary
    for comparison_contactid, comparison_record_str in ALL_records_dict.iteritems():
        if target_contact_id != comparison_contactid:

            #score it
            current_score = fuzz.ratio(target_str, comparison_record_str)
            if current_score > threshold_score:
                print target_contact_id + ':' + str((target_str,comparison_record_str, current_score, comparison_contactid))


    pass


#stream in all contacts ie large set
for line in sys.stdin:
    # ERROR DIAG TOOL
    ts = time.time()
    st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
    print >> sys.stderr, line, st

    contact_id, target_str = line.strip().split(':', 1)

    score_it_closest_match_threshold_bag_print(contact_id, target_str, normalized_records_ALL_dict)
    # output = (target_str, score_it_closest_match_fuzz(contact_id, target_str, normalized_records_ALL_dict))
    # output = (target_str, score_it_closest_match_threshold_bag(contact_id, target_str, normalized_records_ALL_dict))
    # print contact_id + ':' + str(output)

回答1:

Your approach requires you to make 180,000 * 2,500,000 = 450,000,000,000 comparisons.

450 billion is a lot.

To reduce the number of comparisons, you can first group records that have some features in common, like the first five characters of an address field, or a common token. Then, only compare records that share a feature. This idea is called "blocking" and will usually reduce the number of total comparisons you have to make to something manageable.

The general problem you are trying to solve is called "record linkage." Since you are using python, you might want to look at the dedupe library which provides a comprehensive approach (I am an author of this library).