What's the best way to compare / insert / upda

2019-05-12 18:06发布

问题:

At our company we pull a .CSV file from the suppliers FTP server and update our product data (price, stock,..) each morning.

We wrote a cron for this task as it should run automatically.

The current script is working in most cases. However, sometimes we recieve an error: 'Allowed memory size of 134217728 bytes exhausted (tried to allocate 75 bytes)'.

We use CodeIgniter with DataMapper ORM. A possible design error might be the fact that the script is working with objects instead of array's...

Each time 49000 rows are checked.

Can anyone help us find another way of doing this?


The following script is the function that runs after the files are copied.

// Include auth connection params
$udb = $this->_completeParams($db);
// Check if an update was downloaded
$supplier = new Supplier(NULL,$udb);
$supplier->where(array('alias'=>'XX','name'=>'xxxxxxxxx'))->get(1);

$cronStart = date('Y-m-d H:i:s');
$cronStartDate = date('Y-m-d');
//mail($this->adminMail, 'CRON', 'Gestart:' .$cronStart, $this->headerMail);

//$message .= '1: '.memory_get_usage()."\r\n";
if($supplier->import_found) {
//if(true) {
    $rows = 0;
    $updated = 0;
    $new = 0;

    //$aAvailable = array();

    $message .= '<h3>Start: '.$cronStart.'</h3>' . "\r\n";

    $object = new Supplier_product(NULL,$udb);
    $cat = new Supplier_category(NULL, $udb);
    $manu = new Supplier_manufacturer(NULL, $udb);

    $auvibel = new Supplier_auvibel(NULL, $udb);
    $bebat = new Supplier_bebat(NULL, $udb);
    $recupel = new Supplier_recupel(NULL, $udb);
    $reprobel = new Supplier_reprobel(NULL, $udb);

    $files = glob($this->tempDir.'XXXXX/prices/*');
    foreach($files as $file) {

        $ext = pathinfo($file, PATHINFO_EXTENSION);
        $data = ($ext == 'txt')?$this->_csvToArray($file, ';'):false;

        // If the CSV data is in $data
        if($data !== false) {
            $totalCount = count($data);
            for($i = 0; $i <= $totalCount; $i++) {

                //$aAvailable[] = $data[$i]['ArtID'];
                $rows++;
                //$message .= 'loop start: '.memory_get_usage()."\r\n";

                $object->where(array('art_id'=>$data[$i]['ArtID'],'supplier_id'=>$supplier->id))->get(1);

                $auvibel->select('value')->where(array('art_id'=>$data[$i]['ArtID'], 'supplier_id'=>$supplier->id))->get(1);
                $auvibel->value = ($auvibel->exists())?$auvibel->value:0;

                $bebat->select('value')->where(array('art_id'=>$data[$i]['ArtID'], 'supplier_id'=>$supplier->id))->get(1);
                $bebat->value = ($bebat->exists())?$bebat->value:0;

                $recupel->select('value')->where(array('art_id'=>$data[$i]['ArtID'], 'supplier_id'=>$supplier->id))->get(1);
                $recupel->value = ($recupel->exists())?$recupel->value:0;

                $reprobel->select('value')->where(array('art_id'=>$data[$i]['ArtID'], 'supplier_id'=>$supplier->id))->get(1);
                $reprobel->value = ($reprobel->exists())?$reprobel->value:0;

                $intrastat = 0;

                $data[$i]['LP_Eur'] = ($data[$i]['LP_Eur'] != '')?str_replace(',', '.', $data[$i]['LP_Eur']):0;
                $data[$i]['DE_Eur'] = ($data[$i]['DE_Eur'] != '')?str_replace(',', '.', $data[$i]['DE_Eur']):0;
                $data[$i]['D1_Eur'] = ($data[$i]['D1_Eur'] != '')?str_replace(',', '.', $data[$i]['D1_Eur']):0;
                $data[$i]['D1_Eur'] = ($data[$i]['D2_Eur'] != '')?str_replace(',', '.', $data[$i]['D2_Eur']):0;
                $data[$i]['PricePersonal_Eur'] = ($data[$i]['PricePersonal_Eur'] != '')?str_replace(',', '.', $data[$i]['PricePersonal_Eur']):0;
                $data[$i]['BackorderDate'] = ($data[$i]['BackorderDate'] != '')?date('Y-m-d', strtotime($data[$i]['BackorderDate'])):NULL;
                $data[$i]['ModifDate'] = ($data[$i]['ModifDate'] != '')?date('Y-m-d', strtotime($data[$i]['ModifDate'])):NULL;

                if($object->exists()) {
                    if($object->allow_cron_update) { //if($data[$i]['ModifDate'] != $object->modified) {

                        // Check if category group exists
                        $cat->select('id')->where(array(
                            'supplier_id' => $supplier->id,
                            'name_a' => $data[$i]['Class1'],
                            'name_b' => $data[$i]['Class2'],
                            'name_c' => $data[$i]['Class3'],
                        ))->get(1);
                        if(!$cat->exists()) {

                            // Category should be added
                            $cat->supplier_id = $supplier->id;
                            $cat->name_a = $data[$i]['Class1'];
                            $cat->name_b = $data[$i]['Class2'];
                            $cat->name_c = $data[$i]['Class3'];
                            $cat->save();

                            // Log as notification: New supplier categorie
                            $this->_notify('Niewe categorie',array(
                                'body' => $supplier->name.' heeft "'.$cat->name_a.' - '.$cat->name_b.' - '.$cat->name_c.'" als nieuwe categorie toegevoegd.',
                                'controller' => 'leveranciers',
                                'trigger' => 'new_supplier_category',
                                'url' => base_url().'leveranciers/item/'.$supplier->id.'/categorien',
                                'icon' => 'icon-truck',
                                'udb' => $udb,
                            ));
                        }

                        // Check if manufacturer exists
                        $manu->select('id')->where(array(
                            'name' => $data[$i]['PublisherName']
                        ))->get(1);
                        if(!$manu->exists()) {

                            // Manufacturer should be added
                            $manu->name = $data[$i]['PublisherName'];
                            $manu->save($supplier);
                        }

                        // Add the product to the database
                        $object->art_id = $data[$i]['ArtID'];
                        $object->supplier_id = $supplier->id;
                        $object->supplier_category_id = $cat->id;
                        $object->supplier_manufacturer_id = $manu->id;
                        $object->part_id = $data[$i]['PartID'];
                        $object->ean_code = $data[$i]['EanCode'];
                        $object->name = $data[$i]['Description'];
                        $object->description = NULL;
                        $object->version = $data[$i]['Version'];
                        $object->language = $data[$i]['Language'];
                        $object->media = $data[$i]['Media'];
                        $object->trend = $data[$i]['Trend'];
                        $object->price_group = $data[$i]['PriceGroup'];
                        $object->price_code = $data[$i]['PriceCode'];
                        $object->eur_lp = $data[$i]['LP_Eur'];
                        $object->eur_de = $data[$i]['DE_Eur'];
                        $object->eur_d1 = $data[$i]['D1_Eur'];
                        $object->eur_d2 = $data[$i]['D2_Eur'];
                        $object->eur_personal = $data[$i]['PricePersonal_Eur'];
                        $object->stock = $data[$i]['Stock'];
                        $object->backorder = ($data[$i]['BackorderDate'] != '' && !empty($data[$i]['BackorderDate']))?$data[$i]['BackorderDate']:NULL;
                        $object->modified = ($data[$i]['ModifDate'] != '' && !empty($data[$i]['ModifDate']))?$data[$i]['ModifDate']:NULL;
                        $object->flag = 'MODIFIED';
                        $object->auvibel = $auvibel->value;
                        $object->bebat = $bebat->value;
                        $object->intrastat = $intrastat;
                        $object->recupel = $recupel->value;
                        $object->reprobel = $reprobel->value;
                        $object->save();

                        $updated++;
                    }
                    elseif(($object->auvibel != $auvibel) || ($object->bebat != $bebat) || ($object->recupel != $recupel) || ($object->reprobel != $reprobel)) {
                        $object->auvibel = $auvibel->value;
                        $object->bebat = $bebat->value;
                        $object->intrastat = $intrastat;
                        $object->recupel = $recupel->value;
                        $object->reprobel = $reprobel->value;
                        $object->save();
                    }
                }
                else {

                    // Check if category group exists
                    $cat->select('id')->where(array(
                        'supplier_id' => $supplier->id,
                        'name_a' => $data[$i]['Class1'],
                        'name_b' => $data[$i]['Class2'],
                        'name_c' => $data[$i]['Class3'],
                    ))->get(1);
                    if(!$cat->exists()) {

                        // Category should be added
                        $cat->supplier_id = $supplier->id;
                        $cat->name_a = $data[$i]['Class1'];
                        $cat->name_b = $data[$i]['Class2'];
                        $cat->name_c = $data[$i]['Class3'];
                        $cat->save();

                        // Log as notification: New supplier categorie
                        $this->_notify('Niewe categorie',array(
                            'body' => $supplier->name.' heeft "'.$cat->name_a.' - '.$cat->name_b.' - '.$cat->name_c.'" als nieuwe categorie toegevoegd.',
                            'controller' => 'leveranciers',
                            'trigger' => 'new_supplier_category',
                            'url' => '[hidden-url]'.$supplier->id.'/categorien',
                            'icon' => 'icon-truck',
                            'udb' => $udb,
                        ));
                    }

                    // Check if manufacturer exists
                    $manu->select('id')->where(array(
                        'name' => $data[$i]['PublisherName']
                    ))->get(1);
                    if(!$manu->exists()) {

                        // Manufacturer should be added
                        $manu->name = $data[$i]['PublisherName'];
                        $manu->save($supplier);
                    }

                    // Add the product to the database
                    $object->art_id = $data[$i]['ArtID'];
                    $object->supplier_id = $supplier->id;
                    $object->supplier_category_id = $cat->id;
                    $object->supplier_manufacturer_id = $manu->id;
                    $object->part_id = $data[$i]['PartID'];
                    $object->ean_code = $data[$i]['EanCode'];
                    $object->name = $data[$i]['Description'];
                    $object->description = NULL;
                    $object->version = (($data[$i]['Version'] != '')?$data[$i]['Version']:NULL);
                    $object->language = (($data[$i]['Language'] != '')?$data[$i]['Language']:NULL);
                    $object->media = (($data[$i]['Media'] != '')?$data[$i]['Media']:NULL);
                    $object->trend = (($data[$i]['Trend'] != '')?$data[$i]['Trend']:NULL);
                    $object->price_group = (($data[$i]['PriceGroup'] != '')?$data[$i]['PriceGroup']:NULL);
                    $object->price_code = (($data[$i]['PriceCode'] != '')?$data[$i]['PriceCode']:NULL);
                    $object->eur_lp = (($data[$i]['LP_Eur'] != '')?$data[$i]['LP_Eur']:NULL);
                    $object->eur_de = (($data[$i]['DE_Eur'] != '')?$data[$i]['DE_Eur']:NULL);
                    $object->eur_d1 = (($data[$i]['D1_Eur'] != '')?$data[$i]['D1_Eur']:NULL);
                    $object->eur_d2 = (($data[$i]['D2_Eur'] != '')?$data[$i]['D2_Eur']:NULL);
                    $object->eur_personal = $data[$i]['PricePersonal_Eur'];
                    $object->stock = $data[$i]['Stock'];
                    $object->backorder = ($data[$i]['BackorderDate'] != '' && !empty($data[$i]['BackorderDate']))?$data[$i]['BackorderDate']:NULL;
                    $object->modified = ($data[$i]['ModifDate'] != '' && !empty($data[$i]['ModifDate']))?$data[$i]['ModifDate']:NULL;
                    $object->flag = NULL;
                    $object->auvibel = $auvibel->value;
                    $object->bebat = $bebat->value;
                    $object->intrastat = $intrastat;
                    $object->recupel = $recupel->value;
                    $object->reprobel = $reprobel->value;
                    $object->save();
                    //$object->clear_cache();

                    $new++;
                }

                //$message .= 'loop end A: '.memory_get_usage().' - '.$i."\r\n";

                $object->clear();
                $cat->clear();
                $manu->clear();
                $auvibel->clear();
                $bebat->clear();
                $recupel->clear();
                $reprobel->clear();

                unset($data[$i]);

                //$message .= 'loop end B: '.memory_get_usage()."\r\n";
            }
        }
        unset($manu);
        unset($auvibel);
        unset($bebat);
        unset($recupel);
        unset($reprobel);

        if(is_file($file)) {
            unlink($file);
        }

        $object->clear();
        //$message .= 'BEFORE MARK EOL: '.memory_get_usage()."\r\n";
        /**
         * Mark products as EOL when not found in file
         */
        $eolCount = 0;
        $eol = $object
            ->group_start()
                ->where('flag IS NULL')
                ->or_where('flag !=', 'EOL')
            ->group_end()
            ->where('supplier_id', $supplier->id)
            ->group_start()
                ->group_start()->where('updated IS NOT NULL')->where('updated <',$cronStart)->group_end()
                ->or_group_start()->where('updated IS NULL')->where('created <',$cronStart)->group_end()
            ->group_end()
            ->get_iterated();

        $p = new Product(NULL,$udb);
        //unset($aAvailable);
        foreach($eol as $i => $product) {
            $product->flag = "EOL";
            $product->save();

            if($product->art_id != NULL) {
                // The 'copied' products should be marked eol in the webshop!
                $p->where('art_code',$product->art_id)->where('supplier_product_id', $product->id)->get();
                if($p->exists()) {
                    $p->eol = date('Y-m-d H:i:s');
                    $p->save();
                }
                $p->clear();
            }

            $product->clear();
            $eolCount++;
            //unset($eol[$i]);
            //$message .= 'INSIDE MARK EOL: '.memory_get_usage()."\r\n";
        }
        unset($product);
        $object->clear();
        //$message .= 'AFTER MARK EOL: '.memory_get_usage()."\r\n";
        if($eolCount > 0) {
            // Log as notification: supplier products marked EOL
            $this->_notify('EOL melding',array(
                'body' => "Er ".(($eolCount == 1)?'is een product':'zijn '.$eolCount.' producten')." gemarkeerd als EOL",
                'controller' => 'leveranciers',
                'trigger' => 'eol_supplier_product',
                'url' => '[hidden-url]'.$supplier->id.'/artikels',
                'icon' => 'icon-truck',
                'udb' => $udb,
            ));
        }
    }

    // After looping files build e-mail.
    $message .= 'Totaal: '.$rows. "\r\n";
    $message .= 'new: '.$new. "\r\n";
    $message .= 'updated: '.$updated. "\r\n";
    $message .= 'EOL: '.$eolCount. "\r\n";
    $subject = 'Import XXXXX Update';
}
// No updates found
else {
    $subject = 'Import XXXXX No Update Found';
    $message .= "\r\n";
}
$message .= '<h3>Einde: '.date('Y-m-d H:i:s').'</h3>' . "\r\n";
mail($this->adminMail, $subject, $message, $this->headerMail);

// Remove import_found marker for supplier
$supplier->import_found = false;
$supplier->save();

回答1:

We had a similar situation. After a lot of attempts at making the script better, we decided that we needed another approach to make our import work and not take ~10 hours.

What we did was dump all the PHP code, and instead use mysqlimport to load the contents of the CSV file directly into a table. That table now contains everything we need, but not in a form that's useful for us (no structure, some fields need some processing, etc.)

However, because everything is now in the database, we can do everything we want with a query. For example, deleting all data that is no longer in the import file, thats just DELETE FROM structured_table AS st LEFT JOIN unstructured_table AS ut ON st.someField = ut.someField WHERE ut.someField IS NULL;, updating existing records is just UPDATE structured_table AS st INNER JOIN unstructured_table AS ut ON st.someField = ut.someField SET st.anotherField = CONCAT(ut.aField, ' ', ut.yetAnotherField);.

Obviously, for a complex import script, your queries will be more complex and you'll need more of them. You might even need to throw some stored procedures in to do processing on individual fields. But if you can take this kind of approach you'll end up with a process that can handle a lot of data and is very scalable.



回答2:

I have a similar situation... Compare around 20M records every day to update a few records with changes and add / remove the delta. Data source is CSV as well. I use perl, while I think php also work.

  1. Each record must have a linking key, SKU of product? Or something like that. May already be the primary key /unique key in your DB table.
  2. You know the lst of fields that you want to compare and update.

Step 1: read ALL records from DB, store in an array using the linking key as named index.

1.1: value is concat of all fields need to compare, or md5() of the concat result to save memory.

Step 2: loop through the CSV file, extract the linking key and new values per row.

2.1: if linking key is NOT in the array, INSERT action to DB.

2.2: isset() return true so compare the values (or md5() of the value concat), if different, UPDATE action to DB.

2.3: delete this entry from the array.

Step 3: by the end of reading the CSV, the entries remains in the array were records to DELETE.

In my case, it use less than 2GB RAM for the process and runs around 3 minutes, which should be feasible and acceptable.