Need Time-Efficient Method of Importing Large CSV

2019-01-18 06:38发布

问题:

Okay, I'm having some serious issues here. I'm new to this site, and new to dealing with importing CSV data via PHP, but I'm not new to programming.

Currently, I'm working on building a Customer Relationship Manager. I need to create a script to import a file that will populate the database with leads. The main issue here is that the Lead data consists of Companies and Employees of said Company. Also, a few other tables are split off, such as billing information, from the main tables.

I have a working script that will allow users to map the imported data to specific rows and columns.

function mapData($file) {
    // Open the Text File
    $fd = fopen($file, "r");

    // Return FALSE if file not found
    if(!$fd) {
        return FALSE;
    }

    // Get the First Two Lines
    $first = 0;
    $data = array();
    while(!feof($fd)) {
        if($first == 0) {
            $cols = fgetcsv($fd, 4096);
            $data['cols'] = array();
            if(is_array($cols) && count($cols)) {
                foreach($cols as $col) {
                    if(!$col) {
                        continue;
                    }
                    $data['cols'][] = $col;
                }
            }
            if(empty($data['cols'])) {
                return array();
            }
            $first++;
            continue;
        }
        else {
            $data['first'] = fgetcsv($fd, 4096);
            break;
        }
    }
    fclose($fd);

    // Return Data
    return $data;
}

The above script only activates after CodeIgniter moves the file to a working directory. I already know what the file name is by this point. The file goes in and returns the list of columns and the first row. Any empty columns are ignored.

After this, process passes to a mapping script. Once mapping is done and "Import" is pressed, this piece of code loads.

function importLeads($file, $map) {
    // Open the Text File
    if(!file_exists($file)) {
        return false;
    }
    error_reporting(E_ALL);
    set_time_limit(240);
    ini_set("memory_limit", "512M");
    $fd = fopen($file, "r");

    // Return FALSE if file not found
    if(!$fd) {
        return FALSE;
    }

    // Traverse Each Line of the File
    $true = false;
    $first = 0;
    while(!feof($fd)) {
        if($first == 0) {
            $cols = fgetcsv($fd);
            $first++;
            continue;
        }

        // Get the columns of each line
        $row = fgetcsv($fd);

        // Traverse columns
        $group = array();
        $lead_status = array();
        $lead_type = array();
        $lead_source = array();
        $user = array();
        $user_cstm = array();
        $user_prof = array();
        $acct = array();
        $acct_cstm = array();
        $acct_prof = array();
        $acct_group = array();
        if(!$row) {
            continue;
        }
        foreach($row as $num => $val) {
            if(empty($map[$num])) {
                continue;
            }
            $val = str_replace('"', """, $val);
            $val = str_replace("'", "'", $val);
            switch($map[$num]) {
            // Company Account
            case "company_name":
                $acct['company_name'] = $val;
                break;
            case "lead_type":
                $lead_type['name'] = $val;
                break;
            case "lead_source":
                $lead_source['name'] = $val;
                break;
            case "lead_source_description":
                $lead_source['name'] = $val;
                break;
            case "campaign":
                $campaign['name'] = $val;
                break;
            case "mcn":
                $acct['mcn'] = $val;
                break;
            case "usdot":
                $acct['usdot'] = $val;
                break;
            case "sic_codes":
                $acct_cstm['sic_codes'] = $val;
                break;
            case "naics_codes":
                $acct_cstm['naics_codes'] = $val;
                break;
            case "agent_assigned":
                $acct_cstm['agent_assigned'] = $val;
                break;
            case "group_assigned":
                $group['name'] = $val;
                break;
            case "rating":
                $acct_cstm['rating'] = $val;
                break;
            case "main_phone":
                $acct['phone'] = $val;
                break;
            case "billing_phone":
                $acct_cstm['billing_phone'] = $val;
                break;
            case "company_fax":
                $acct['fax'] = $val;
                break;
            case "company_email":
                $acct['email2'] = $val;
                break;

            // Company Location
            case "primary_address":
                $acct['address'] = $val;
                break;
            case "primary_address2":
                $acct['address2'] = $val;
                break;
            case "primary_city":
                $acct['city'] = $val;
                break;
            case "primary_state":
                $acct['state'] = $val;
                break;
            case "primary_zip":
                $acct['zip'] = $val;
                break;
            case "primary_country":
                $acct['country'] = $val;
                break;
            case "billing_address":
                $billing['address'] = $val;
                break;
            case "billing_address2":
                $billing['address2'] = $val;
                break;
            case "billing_city":
                $billing['city'] = $val;
                break;
            case "billing_state":
                $billing['state'] = $val;
                break;
            case "billing_zip":
                $billing['zip'] = $val;
                break;
            case "billing_country":
                $billing['country'] = $val;
                break;
            case "company_website":
                $acct_cstm['website'] = $val;
                break;
            case "company_revenue":
                $acct_cstm['revenue'] = $val;
                break;
            case "company_about":
                $acct_prof['aboutus'] = $val;
                break;

            // Misc. Company Data
            case "bols_per_mo":
                $acct_cstm['approx_bols_per_mo'] = $val;
                break;
            case "no_employees":
                $acct_cstm['no_employees'] = $val;
                break;
            case "no_drivers":
                $acct_prof['drivers'] = $val;
                break;
            case "no_trucks":
                $acct_prof['power_units'] = $val;
                break;
            case "no_trailers":
                $acct_cstm['no_trailers'] = $acct_prof['trailers'] = $val;
                break;
            case "no_parcels_day":
                $acct_cstm['no_parcels_day'] = $val;
                break;
            case "no_shipping_locations":
                $acct_cstm['no_shipping_locations'] = $val;
                break;
            case "approves_inbound":
                $acct_cstm['approves_inbound'] = $val;
                break;
            case "what_erp_used":
                $acct_cstm['what_erp_used'] = $val;
                break;
            case "birddog":
                $acct_cstm['birddog_referral'] = $val;
                break;
            case "status_notes":
                $acct_cstm['status_notes'] = $val;
                break;
            case "notes":
                $acct_cstm['notes'] = $val;
                break;
            case "internal_notes":
                $acct_cstm['notes_internal'] = $val;
                break;

            // User Data
            case "salutation":
                $user_cstm['salutation'] = $val;
                break;
            case "first_name":
                $user['first_name'] = $billing['first_name'] = $val;
                break;
            case "last_name":
                $user['last_name'] = $billing['last_name'] = $val;
                break;
            case "user_title":
                $user_prof['title'] = $val;
                break;
            case "user_about":
                $user_prof['about'] = $val;
                break;
            case "user_email":
                $user['email'] = $val;
                break;
            case "home_phone":
                $user_prof['phone'] = $val;
                break;
            case "mobile_phone":
                $user_cstm['mobile_phone'] = $val;
                break;
            case "direct_phone":
                $user_cstm['direct_phone'] = $val;
                break;
            case "user_fax":
                $user_prof['fax'] = $val;
                break;
            case "user_locale":
                $user['location'] = $val;
                break;
            case "user_website":
                $user_prof['website_url'] = $val;
                break;
            case "user_facebook":
                $user_prof['fb_url'] = $val;
                break;
            case "user_twitter":
                $user_prof['twitter_url'] = $val;
                break;
            case "user_linkedin":
                $user_prof['linkedin_url'] = $val;
                break;
            }
        }
        if(empty($acct['company_name']) || empty($user['first_name']) || empty($user['last_name'])) {
            continue;
        }
        $this->db = $this->load->database('crm_db', TRUE);
        if(isset($lead_type['name']) && ($name = $lead_type['name'])) {
            $count = $this->db->count_all("lead_types");
            $check = $this->db->get_where("lead_types", array("name" => $name));
            if($check->num_rows() < 1) {
                $this->db->insert("lead_types", array("name" => $name, "order" => $count));
                $ltype = $this->db->insert_id();
                $acct_cstm['lead_type'] = $acct['account_type'] = $user['company_type'] = $ltype;
            }
        }
        if(isset($lead_source['name']) && ($name = $lead_source['name'])) {
            $count = $this->db->count_all("lead_sources");
            $check = $this->db->get_where("lead_sources", array("name" => $name));
            if($check->num_rows() < 1) {
                $this->db->insert("lead_sources", array("name" => $name, "order" => $count));
                $acct_cstm['lead_source'] = $this->db->insert_id();
            }
        }
        if(isset($campaign['name']) && ($name = $campaign['name'])) {
            $check = $this->db->get_where("campaigns", array("name" => $name));
            if($check->num_rows() < 1) {
                $campaign['id'] = $accounts_cstm['campaign'] = $this->Secure_m->generate_sugar_id();
                $campaign['date_entered'] = time();
                $campaign['date_modified'] = time();
                $campaign['modified_user_id'] = $this->session->userdata('id');
                $campaign['created_by'] = $this->session->userdata('id');
                $this->db->insert("campaigns", $campaign);
            }
        }
        if(isset($group['name']) && ($name = $group['name'])) {
            $order = $this->db->count_all("groups");
            $check = $this->db->get_where("groups", array("name" => $name));
            if($check->num_rows() < 1) {
                $this->db->insert("groups", array("name" => $name, "order" => $order));
                $acct_group['id'] = $this->db->insert_id();
            }
        }
        $mem = new stdclass;
        $uid = 0;
        if(is_array($user) && count($user)) {
            $where = "";
            if(!empty($user['phone'])) {
                $where .= "prof.phone = '{$user['phone']}' OR ";
                $where .= "cstm.mobile_phone = '{$user['phone']}' OR ";
                $where .= "cstm.direct_phone = '{$user['phone']}'";
            }
            if(!empty($user['mobile_phone'])) {
                if($where) {
                    $where .= " OR ";
                }
                $where .= "prof.phone = '{$user['mobile_phone']}' OR ";
                $where .= "cstm.mobile_phone = '{$user['mobile_phone']}' OR ";
                $where .= "cstm.direct_phone = '{$user['mobile_phone']}'";
            }
            if(!empty($user['direct_phone'])) {
                if($where) {
                    $where .= " OR ";
                }
                $where .= "prof.phone = '{$user['direct_phone']}' OR ";
                $where .= "cstm.mobile_phone = '{$user['direct_phone']}' OR ";
                $where .= "cstm.direct_phone = '{$user['direct_phone']}'";
            }
            $query = $this->db->query($this->Account_m->userQuery($where));
            $mem = reset($query->result());
            if($where && !empty($mem->id)) {
                $uid = $mem->id;
                $new = array();
                foreach($user as $k => $v) {
                    if(!empty($mem->$k)) {
                        $new[$k] = $mem->$k;
                        unset($user[$k]);
                    }
                    else {
                        $new[$k] = $v;
                    }
                }
                //$this->db->update("leads", $user, array("id" => $uid));
                $user = $new;
            }
            else {
                $user['uxtime'] = time();
                $user['isclient'] = 0;
                $user['flag'] = 0;
                $user['activation_code'] = $this->Secure_m->generate_activate_id();
                $uid = $this->Secure_m->generate_activate_id(10);
                $query = $this->db->get_where("leads", array("id" => $uid), 1);
                $data = reset($query->result());
                while(!empty($data->id)) {
                    $uid = $this->Secure_m->generate_activate_id(10);
                    $query = $this->db->get_where("leads", array("id" => $uid), 1);
                    $data = reset($query->result());
                }
                $user['id'] = $uid;
                $this->db->insert("leads", $user);
            }
        }
        if($uid && is_array($user_prof) && count($user_prof)) {
            if(!empty($mem->uid)) {
                $new = array();
                foreach($user_prof as $k => $v) {
                    if(!empty($mem->$k)) {
                        $new[$k] = $mem->$k;
                        unset($user_prof[$k]);
                    }
                    else {
                        $new[$k] = $v;
                    }
                }
                //$this->db->update("mprofiles", $user_prof, array("uid" => $uid));
                $user_prof = $new;
            }
            else {
                $user_prof['uid'] = $uid;
                $user_prof['flag'] = 0;
                $this->db->insert("ldetails", $user_prof);
            }
        }
        if($uid && is_array($user_cstm) && count($user_cstm)) {
            $query = $this->db->get_where("leads_cstm", array("crm_id" => $cid), 1);
            $data = reset($query->result());
            if(!empty($data->crm_id)) {
                $new = array();
                foreach($user_cstm as $k => $v) {
                    if(!empty($mem->$k)) {
                        $new[$k] = $mem->$k;
                        unset($user_cstm[$k]);
                    }
                    else {
                        $new[$k] = $v;
                    }
                }
                //$this->db->update("leads_cstm", $acct_prof, array("fa_user_id" => $cid));
                $user_cstm = $new;
            }
            else {
                $user_cstm['crm_id'] = $uid;
                $user_cstm['date_entered'] = time();
                $user_cstm['date_modified'] = time();
                $user_cstm['created_by'] = $this->session->userdata('id');
                $user_cstm['modified_user_id'] = $this->session->userdata('id');
                $this->db->insert("leads_cstm", $user_cstm);
            }
        }
        $cmp = new stdclass;
        $cid = 0;
        if(is_array($acct) && count($acct)) {
            $acct['uid'] = $uid;
            $acct['main_contact'] = "{$user['first_name']} {$user['last_name']}";
            if(!empty($user['email'])) {
                $acct['email'] = $user['email'];
            }
            $acct['isprospect'] = 0;
            $acct['flag'] = 0;
            if(!empty($acct['mcn'])) {
                $where .= "fms.mcn = '{$acct['mcn']}'";
            }
            if(!empty($acct['phone'])) {
                if($where) {
                    $where .= " OR ";
                }
                $where .= "fms.phone = '{$acct['phone']}' OR ";
                $where .= "crm.billing_phone = '{$acct['phone']}'";
            }
            if(!empty($acct['billing_phone'])) {
                if($where) {
                    $where .= " OR ";
                }
                $where .= "fms.phone = '{$acct['billing_phone']}' OR ";
                $where .= "crm.billing_phone = '{$acct['billing_phone']}'";
            }
            if(!empty($acct['company_name'])) {
                if($where) {
                    $where .= " OR ";
                }
                $where .= "fms.company_name = '{$acct['company_name']}'";
            }
            $query = $this->db->query($this->Account_m->acctQuery($where));
            $cmp = reset($query->result());
            if($where && !empty($cmp->id)) {
                $cid = $cmp->id;
                $new = array();
                foreach($acct as $k => $v) {
                    if(!empty($cmp->$k)) {
                        $new[$k] = $cmp->$k;
                        unset($acct[$k]);
                    }
                    else {
                        $new[$k] = $v;
                    }
                }
                //$this->db->update("accounts", $billing, array("cid" => $cid));
                $acct = $new;
            }
            else {
                $cid = $this->Secure_m->generate_activate_id(10);
                $query = $this->db->get_where("leads", array("id" => $uid), 1);
                $data = reset($query->result());
                while(!empty($data->id)) {
                    $cid = $this->Secure_m->generate_activate_id(10);
                    $query = $this->db->get_where("accounts", array("id" => $cid), 1);
                    $data = reset($query->result());
                }
                $acct['id'] = $cid;
                $this->db->insert("accounts", $acct);
            }
        }
        if($cid && is_array($acct_group) && count($acct_group)) {
            $grp = $this->db->get_where("accounts_groups", array("cid" => $cid, "gid" => $acct_group['id']));
            if(empty($cmp->id)) {
                $acct_group['cid'] = $cid;
                $this->db->insert("accounts_groups", $acct_group);
            }
        }
        if($cid && is_array($acct_prof) && count($acct_prof)) {
            if(!empty($cmp->id)) {
                $new = array();
                foreach($acct_prof as $k => $v) {
                    if(!empty($cmp->$k)) {
                        $new[$k] = $cmp->$k;
                        unset($acct_prof[$k]);
                    }
                    else {
                        $new[$k] = $v;
                    }
                }
                //$this->db->update("cprofiles", $acct_prof, array("cid" => $cid));
                $acct_prof = $new;
            }
            else {
                $acct_prof['cid'] = $cid;
                $acct_prof['flag'] = 0;
                $this->db->insert("adetails", $acct_prof);
            }
        }
        if($cid && is_array($billing) && count($billing)) {
            $bill = $this->db->get_where("accounts_billing", array("cid" => $cid));
            if(!empty($bill->id)) {
                $new = array();
                foreach($acct_prof as $k => $v) {
                    if(!empty($cmp->$k)) {
                        $new[$k] = $cmp->$k;
                        unset($acct_prof[$k]);
                    }
                    else {
                        $new[$k] = $v;
                    }
                }
                //$this->db->update("accounts_billing", $billing, array("cid" => $cid));
            }
            else {
                $billing['cid'] = $cid;
                $billing['flag'] = 0;
                $this->db->insert("accounts_billing", $billing);
            }
        }
        if($cid && $uid) {
            $this->db->update("leads", array("cid" => $cid), array("id" => $uid));
        }
        if($cid && is_array($acct_cstm) && count($acct_cstm)) {
            $query = $this->db->get_where("accounts_cstm", array("crm_id" => $cid), 1);
            $data = reset($query->result());
            if(!empty($data->crm_id)) {
                $new = array();
                foreach($acct_cstm as $k => $v) {
                    if(!empty($cmp->$k)) {
                        $new[$k] = $cmp->$k;
                        unset($acct_cstm[$k]);
                    }
                    else {
                        $new[$k] = $v;
                    }
                }
                //$this->db->update("accounts_cstm", $acct_cstm, array("crm_id" => $cid));
                $acct_cstm = $new;
            }
            else {
                $acct_cstm['crm_id'] = $cid;
                $acct_cstm['date_entered'] = time();
                $acct_cstm['date_modified'] = time();
                $acct_cstm['created_by'] = $this->session->userdata('id');
                $acct_cstm['modified_user_id'] = $this->session->userdata('id');
                if(empty($acct_cstm['rating'])) {
                    $acct_cstm['rating'] = 1;
                }
                $this->db->insert("accounts_cstm", $acct_cstm);
            }
        }
        $true = TRUE;
    }
    fclose($fd);

    return $true;
}

Now, as far as I can see, the script works perfectly fine. There's nothing wrong with the actual code itself. The problem is that after around 400-500 rows, the script just stops. I don't receive an error, but no further code is processed.

I know this because I have code after this that is supposed to return a redirect page through AJAX. Nothing after my loop in the importLeads function ever loads, though.

I'm not sure how to make this script more efficient... I'm positive it is timing out, but I don't know how to make it run more efficiently. I NEED this script to process all the information above separately. I have a variety of separate tables that all link together, and this import script has to set everything up in different ways.

I've talked with my client about this project. This script works when I drop it to around 400 rows. He has some a lot of these CSV files that are around 75,000 rows. The one I am importing is a smaller one, only about 1,200 rows.

I've tried looking into alternate methods, such as MySQL's import script, but I can't do that because this script must import data into separate tables, and it must check for existing data first. I'm also supposed to have all empty fields update with imported information, but that will make this even worse.

If anyone knows of a more efficient method it would be much appreciated. I tried to be as detailed as I could. Of note, I will mention that I'm using CodeIgniter, but if there's a more efficient way that doesn't use CodeIgniter I'll take it (I can still put it into a CI model, though).

回答1:

I have written PHP scripts to bulk-load the data published by Stack Overflow data dump. I import millions of rows and it doesn't take that long.

Here are some tips:

  • Don't rely on autocommit. The overhead of starting and committing a transaction for every row is enormous. Use explicit transactions, and commit after every 1000 rows (or more).

  • Use prepared statements. Since you are basically doing the same inserts thousands of times, you can prepare each insert before you start looping, and then execute during the loop, passing values as parameters. I don't know how to do this with CodeIgniter's database library, you'll have to figure it out.

  • Tune MySQL for import. Increase cache buffers and so on. See Speed of INSERT Statements for more information.

  • Use LOAD DATA INFILE. If possible. It's literally 20x faster than using INSERT to load data row by row. I understand if you can't because you need to get the last insert id and so on. But in most cases, even if you read the CSV file, rearrange it and write it out to multiple temp CSV files, the data load is still faster than using INSERT.

  • Do it offline. Don't run long-running tasks during a web request. The time limit of a PHP request will terminate the job, if not today then next Tuesday when the job is 10% longer. Instead, make the web request queue the job, and then return control to the user. You should run the data import as a server process, and periodically allow the user to glimpse the rate of progress. For instance, a cheap way to do this is for your import script to output "." to a temp file, and then the user can request to view the temp file and keep reloading in their browser. If you want to get fancy, do something with Ajax.



回答2:

To efficiently import data in MySQL you have to use LOAD DATA INFILE. It will make a huge difference in performance.

If you need to pre-process your data, do it with the above script, then export back to CSV/TSV and use LOAD DATA queries to finally import into your database.

Your script is not going beyond 500 rows because it is most likely reaching the PHP execution time limit. You can use the set_time_limit() function to give your script no time limit at all, in which case you have to call set_time_limit(0) at the start of your script.



回答3:

One other item I HAVE to bring up, is this code NEEDS the following to happen:

currently where you do this :

foreach($row as $num => $val) {
        if(empty($map[$num])) {
            continue;
        }
        $val = str_replace('"', "&#34;", $val);
        $val = str_replace("'", "&#39;", $val);
        switch($map[$num]) {
        // Company Account
        case "company_name":
            $acct['company_name'] = $val;
            break;

You need to change the switch/case to do this instead:

1) create a data map of your mapped fields. The data map should have the correct array that the field maps to, as well as the index of that array. For example:

$dataMap['company_name'] = array($acct, 'company_name');
$dataMap['lead_type']    = array($lead_type, 'name');
.
.
.
$dataMap['bols_per_mo']  = array($acct_cstm, 'approx_bols_per_mo');
.
.
.

And so on

Then 2) Replace your massive switch statement with this simple code snippet:

foreach($row as $num => $val) {
        if(empty($map[$num])) {
            continue;
        }
        $val = str_replace('"', "&#34;", $val);
        $val = str_replace("'", "&#39;", $val);
        $mappingRecord = $dataMap[ $map[$num] ];

        //The first element is the array the data should go in 
        $destinationArray = $mappingRecord[0];

        //the second element is the index of the array it should go in
        $destinationArray[$mappingRecord[1]] = $val;


回答4:

load the raw csv file data into staging (temporary) tables using load data infile method which is nice and fast:

set autocommit = 0;

load data infile..
load data infile..
...

commit;

once the data is loaded run your data cleansing, mapping and validation stored procedures etc:

call cleanse_staging_data();
call map_staging_data();
call validate_staging_data();

once the data has been processed copy the data from the staging tables into correct data tables:

call copy_staging_to_production();

or something like that.



回答5:

Are you hitting the PHP script time limit loading your big files?

Try this:

set_time_limit(0);

to disable the default 30-second time limit. There's a server wide limit you can retrieve using max_execution_time(). If the server wide limit isn't long enough for this bulk load job, you'll need to figure out how to get your local server wrangler to change it or do your upload some other way.



回答6:

I had to do something similar on occasion. There are a couple problems you're potentially running into:

  1. PHP script timeout – After a pre-defined period of time PHP will automatically kill the process. You can disable this in the php.ini file or with set_time_limit(0). Some hosts disable this method, though, and some hosts have separate proc watches setup to kill processes that have run for a set period of time to keep the server from being taken down.
  2. Memory limit – PHP will let you set a maximum memory limit in that same php.ini. If it hits it it'll trigger a fatal error and die. You can see this in the error log, but nothing will be output to the browser.
  3. MySQL Query Overhead – As others have pointed out, there's a lot of overhead to each individual query. I didn't have enough rows to justify throwing load data infile into the storm. I didn't need to get results for each individual query, either, so I just put them all in a single call and fired it off to a single mysql_query (which you can do from your controller in CodeIgniter like so: mysql_query($sql, $this->db->conn_id); since it'll throw a fit if you pass that to DB::query).

Bill Karwin pointed out a lot of good ways to optimize when you're dealing with very large data sets, but if you're having trouble at ~400 rows I don't think it'll do you much good just yet. Check your error logs, get the problem fixed, and then work on optimizing it.