Is there a better way to process a 300,000 line text file?

Posted 2020-08-01 06:47

Question:

What I'm doing right now is reading the contents of the text file and storing them in a variable. After reading the whole content, I loop over it in chunks; inside the loop I call a function that reads each line of the chunk and passes every line to another function that processes each column of data and inserts it into the database in batches. One batch is one whole chunk.

The code takes far too long for any file larger than 500 KB. My problem is that there is no unique identifier in the text file that I could use to apply "LOAD DATA INFILE", which is what put me in this situation of processing the text file in chunks.

A 700 KB file took almost a whole day to process, though that also depends on the machine's specifications. The code runs on CentOS. After the first text file was processed, the next file of 800 KB+ took almost a week, and the same goes for the other files over 800 KB: each took a week or more to process, especially the 1 MB file.

Can somebody tell me what I'm doing wrong, and what options I have to make my code run efficiently?


/*
====================================================================================
                RECORDS FETCH
====================================================================================

Needs the path and the filename with extension.
The page iterates over the records in the file line by line.
Each line is then split on the "," delimiter.
It concatenates the parts of the records for the bulk insert process.
PID addresses are incremental; every three PID addresses correspond to one
Chamber, and the reading in each Chamber is CO2 for the first PID address,
RH for the second PID address, and TEMP for the third PID address.

====================================================================================
*/
$path = "";
$filename = "";
error_reporting(0);
include_once ("connect.php");
$p_results = mysql_query("SELECT PATH, FILENAME FROM tbl_path");
if (mysql_num_rows($p_results) > 0) {
    while ($rows = mysql_fetch_assoc($p_results)) {
        $path = $rows['PATH'];
        $filename = $rows['FILENAME'];
    }
} else {
    mysql_query("INSERT INTO tbl_log (LOG_DATE, DETAILS) VALUES ( NOW(), 'There is no path and filename to be accessed. Please provide.' )");
}
// Holds the path. NOTE: change backslash (\) to forward slash (/).
$path = str_replace('\\', '/', $path);
//$path = "E:/";
// Holds the filename. NOTE: include the file extension.
//$filename = "sample2.txt"; //"chamber_1_con.txt";
if ($path <> "" && $filename <> "")
     is_connected($path, $filename);

echo ('<script language="javascript">window.location = "chambers_monitoring.php"  </script>');

//function for DB writing in table data
function InsertData($rec, &$errorDataCnt, &$sql, $y, $i, $x, &$dCnt)
{
    $dDate   = (!isset($rec[0]) ? 0 : (trim($rec[0]) == "" ? 0 : trim($rec[0])));
    $dTime   = (!isset($rec[1]) ? 0 : (trim($rec[1]) == "" ? 0 : trim($rec[1])));
    $address = (!isset($rec[2]) ? 0 : (trim($rec[2]) == "" ? 0 : trim($rec[2])));
    $co2SV   = (!isset($rec[3]) ? 0 : (trim($rec[3]) == "" ? 0 : trim($rec[3])));
    $co2PV   = (!isset($rec[4]) ? 0 : (trim($rec[4]) == "" ? 0 : trim($rec[4])));
    $tempSV  = (!isset($rec[5]) ? 0 : (trim($rec[5]) == "" ? 0 : trim($rec[5])));
    $tempPV  = (!isset($rec[6]) ? 0 : (trim($rec[6]) == "" ? 0 : trim($rec[6])));
    $rhSV    = (!isset($rec[7]) ? 0 : (trim($rec[7]) == "" ? 0 : trim($rec[7])));
    $rhPV    = (!isset($rec[8]) ? 0 : (trim($rec[8]) == "" ? 0 : trim($rec[8])));

    /* include('connect.php'); */
    set_time_limit(36000);
    ini_set('max_execution_time', '43200');

    $e_results = mysql_query("SELECT ID FROM tbl_reading WHERE (READING_DATE = '".date("Y-m-d", strtotime($dDate))."' AND READING_TIME = '".date("H:i:s", strtotime($dTime))."') AND READING_ADDRESS = $address LIMIT 1");
    if (mysql_num_rows($e_results) <= 0) {
        if (!($dDate == 0 || $dTime == 0 || $address == 0)) {
            if ($y == 0) {
                $sql = "INSERT INTO tbl_reading (READING_DATE, READING_TIME, READING_ADDRESS, CO2_SET_VALUE, CO2_PROCESS_VALUE, TEMP_SET_VALUE, TEMP_PROCESS_VALUE, RH_SET_VALUE, RH_PROCESS_VALUE) VALUES ('".date("Y/m/d", strtotime($dDate))."','".date("H:i:s", strtotime($dTime))."', ".mysql_real_escape_string($address).",".mysql_real_escape_string($co2SV).",".mysql_real_escape_string($co2PV).",".mysql_real_escape_string($tempSV).",".mysql_real_escape_string($tempPV).",".mysql_real_escape_string($rhSV).",".mysql_real_escape_string($rhPV).")";
            } else {
                $sql .= ", ('".date("Y/m/d", strtotime($dDate))."','".date("H:i:s", strtotime($dTime))."', ".mysql_real_escape_string($address).",".mysql_real_escape_string($co2SV).",".mysql_real_escape_string($co2PV).",".mysql_real_escape_string($tempSV).",".mysql_real_escape_string($tempPV).",".mysql_real_escape_string($rhSV).",".mysql_real_escape_string($rhPV).")";
            }
        }
    }

    if (($x + 1) == $i) {
        if (substr($sql, 0, 1) == ",")
            $sql = "INSERT INTO tbl_reading (READING_DATE, READING_TIME, READING_ADDRESS, CO2_SET_VALUE, CO2_PROCESS_VALUE, TEMP_SET_VALUE, TEMP_PROCESS_VALUE, RH_SET_VALUE, RH_PROCESS_VALUE) VALUES".substr($sql, 1);
        set_time_limit(36000);
        try {
            $result = mysql_query($sql);
            $dCnt = mysql_affected_rows();
            if ($dCnt == 0) {
                $errorDataCnt = $errorDataCnt + 1;
            }
        } catch (Exception $e) {
            mysql_query("INSERT INTO tbl_log (LOG_DATE, DETAILS) VALUES ( NOW(), '".$e->getMessage()."' )");
        }
        //mysql_free_result($result);
    }

    unset($dDate);
    unset($dTime);
    unset($address);
    unset($co2SV);
    unset($co2PV);
    unset($tempSV);
    unset($tempPV);
    unset($rhSV);
    unset($rhPV);
}

//function for looping into the records per line
function loop($data)
{
    $errorDataCnt = 0; $sql = ""; $exist = 0;
    $i = count($data); $x = 0; $y = 0; $tmpAdd = ""; $cnt = 0; $t = 0; $dCnt = 0;

    ini_set('max_execution_time', '43200');
    while ($x < $i) {
        $rec = explode(",", $data[$x]);
        InsertData($rec, $errorDataCnt, $sql, $y, $i, $x, $dCnt);
        $x++;
        $y++;
        unset($rec);
    }

    $errFetch = ($i - $dCnt);
    if ($errorDataCnt > 0)
        mysql_query("INSERT INTO tbl_log (LOG_DATE, DETAILS) VALUES ( NOW(), 'Error inserting $errFetch records. Check if there is a NULL or empty value or if it is the correct data type.' )");
    if ($dCnt > 0)
        mysql_query("INSERT INTO tbl_log (LOG_DATE, DETAILS) VALUES ( NOW(), 'Saved $dCnt of $i records into the database. Total $exist records already existing in the database.' )");
}

// functions in looping records and passing into $contents variable
function DataLoop($file)
{
    ini_set("auto_detect_line_endings", true);
    set_time_limit(36000);
    ini_set('max_execution_time', '43200');
    $contents = ''; $j = 0;
    if ($handle = fopen($file, "rb")) {
        while (!feof($handle)) {
            $rdata = fgets($handle, 3359232); //filesize($file)
            if (trim($rdata) != "" || $rdata === FALSE) {
                if (feof($handle)) {
                    break;
                } else {
                    $contents .= $rdata;
                    $j = $j + 1;
                }
            }
        }
        fclose($handle);
        $data = explode("\n", $contents);
        unset($contents);
        unset($rdata);
    }
    $p = 0;
    ini_set('memory_limit', '512M');
    if ($j != 0) {
        foreach (array_chunk($data, ceil(count($data) / 200)) as $rec_data) {
            loop($rec_data);
            $p++;
        }
    }
}
//function to test if filename exists
function IsExist($file)
{
    if ($con = fopen($file, "r")) { // file_exists($file)
        fclose($con);
        DataLoop($file);
    } else {
        // Note: $filename and $path are not in scope here; log $file instead.
        mysql_query("INSERT INTO tbl_log (LOG_DATE, DETAILS) VALUES ( NOW(), '$file does not exist. Check if the filename or the path is correct.' )");
    }
}

//function to test connection to where the file is.
function is_connected($path, $filename)
{
    //check to see if the local machine is connected to the network
    $errno = ""; $errstr = "";
    if (substr(trim($path), -1) == '/')
        $file = $path.$filename;
    else
        $file = $path."/".$filename;

    IsExist($file);
}

Answer 1:

From your code, it appears that your "unique identifier" (for the purposes of this insertion, at least) is the composite (READING_DATE, READING_TIME, READING_ADDRESS).

If you define such a UNIQUE key in your database, then LOAD DATA with the IGNORE keyword should do exactly what you require:

ALTER TABLE tbl_reading
  ADD UNIQUE KEY (READING_DATE, READING_TIME, READING_ADDRESS)
;

LOAD DATA INFILE '/path/to/csv'
    IGNORE
    INTO TABLE tbl_reading
    FIELDS
        TERMINATED BY ','
        OPTIONALLY ENCLOSED BY '"'
        ESCAPED BY ''
    LINES
        TERMINATED BY '\r\n'
    (@rec_0, @rec_1, @rec_2, @rec_3, @rec_4, @rec_5, @rec_6, @rec_7, @rec_8)
    SET
        READING_DATE = DATE_FORMAT(STR_TO_DATE(TRIM(@rec_0), '???'), '%Y/%m/%d'),
        READING_TIME = DATE_FORMAT(STR_TO_DATE(TRIM(@rec_1), '???'), '%H:%i:%s'),
        READING_ADDRESS    = TRIM(@rec_2),
        CO2_SET_VALUE      = TRIM(@rec_3),
        CO2_PROCESS_VALUE  = TRIM(@rec_4),
        TEMP_SET_VALUE     = TRIM(@rec_5),
        TEMP_PROCESS_VALUE = TRIM(@rec_6),
        RH_SET_VALUE       = TRIM(@rec_7),
        RH_PROCESS_VALUE   = TRIM(@rec_8)
;

(Where each '???' is replaced with a format string matching the date or time layout in your CSV: for example, '%m/%d/%Y' if your dates look like 01/31/2012.)

Note that you should really be storing READING_DATE and READING_TIME together in a single DATETIME or TIMESTAMP column:

ALTER TABLE tbl_reading
  ADD COLUMN READING_DATETIME DATETIME AFTER READING_TIME,
  ADD UNIQUE KEY (READING_DATETIME, READING_ADDRESS)
;

UPDATE tbl_reading SET READING_DATETIME = STR_TO_DATE(
  CONCAT(READING_DATE, ' ', READING_TIME),
  '%Y/%m/%d %H:%i:%s'
);

ALTER TABLE tbl_reading
  DROP COLUMN READING_DATE,
  DROP COLUMN READING_TIME
;

In which case, the SET clause of the LOAD DATA command would include instead:

READING_DATETIME = STR_TO_DATE(CONCAT(TRIM(@rec_0), ' ', TRIM(@rec_1)), '???')
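
If you want to run this from PHP instead of the mysql console, a minimal sketch of the first (two-column) variant could look like the following. Assumptions: the MySQL server can read the file directly, connect.php opens the connection as in the question's code, and the '%m/%d/%Y' and '%H:%i:%s' format strings are placeholders you must adjust to your CSV:

include_once("connect.php");

$csv = mysql_real_escape_string("/path/to/csv");
$load = "LOAD DATA INFILE '$csv'
    IGNORE
    INTO TABLE tbl_reading
    FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' ESCAPED BY ''
    LINES TERMINATED BY '\\r\\n'
    (@rec_0, @rec_1, @rec_2, @rec_3, @rec_4, @rec_5, @rec_6, @rec_7, @rec_8)
    SET READING_DATE       = DATE_FORMAT(STR_TO_DATE(TRIM(@rec_0), '%m/%d/%Y'), '%Y/%m/%d'),
        READING_TIME       = DATE_FORMAT(STR_TO_DATE(TRIM(@rec_1), '%H:%i:%s'), '%H:%i:%s'),
        READING_ADDRESS    = TRIM(@rec_2),
        CO2_SET_VALUE      = TRIM(@rec_3),
        CO2_PROCESS_VALUE  = TRIM(@rec_4),
        TEMP_SET_VALUE     = TRIM(@rec_5),
        TEMP_PROCESS_VALUE = TRIM(@rec_6),
        RH_SET_VALUE       = TRIM(@rec_7),
        RH_PROCESS_VALUE   = TRIM(@rec_8)";

// One statement replaces the whole per-line loop; failures are logged
// the same way the rest of the question's code logs them.
if (!mysql_query($load)) {
    mysql_query("INSERT INTO tbl_log (LOG_DATE, DETAILS) VALUES ( NOW(), '" .
        mysql_real_escape_string(mysql_error()) . "' )");
}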


Answer 2:

Reading a 1 MB file line by line takes less than a second. Even concatenating all the lines and then splitting them again takes hardly any time.

With a simple test, inserting 100,000 rows took about 90 seconds.

But doing a select query before each insert increases the time needed by more than an order of magnitude.

The lesson to learn from this: if you need to insert large amounts of data, do it in big chunks (see LOAD DATA INFILE). If you can't do that for whatever reason, do inserts and inserts alone.

Update:

As @eggyal already suggested, add a unique key to your table definition. In my small one-column test, I added a unique key and changed the insert to INSERT IGNORE. Wall-clock time increased by 15%-30% (to ~100-110 sec), which is much better than the increase to 38 min (25 times!) with the separate select + insert.

So, as a conclusion (stealing from eggyal), add

ALTER TABLE tbl_reading
  ADD UNIQUE KEY (READING_DATE, READING_TIME, READING_ADDRESS)

to your table, remove the select in InsertData(), and change the insert to INSERT IGNORE.
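
With that key in place, the batched statement that InsertData() builds just needs the IGNORE keyword. For example (the sample rows below are made up, purely to show the shape):

-- Rows whose (READING_DATE, READING_TIME, READING_ADDRESS) already
-- exist in the table are silently skipped by the unique key.
INSERT IGNORE INTO tbl_reading
    (READING_DATE, READING_TIME, READING_ADDRESS,
     CO2_SET_VALUE, CO2_PROCESS_VALUE,
     TEMP_SET_VALUE, TEMP_PROCESS_VALUE,
     RH_SET_VALUE, RH_PROCESS_VALUE)
VALUES
    ('2012/01/01', '00:00:00', 1, 400, 398, 25.0, 24.8, 60, 59.5),
    ('2012/01/01', '00:00:00', 2, 400, 401, 25.0, 25.1, 60, 60.2);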



Answer 3:

You need to do some preparation before starting your inserts, because the InnoDB engine is very slow for row-by-row inserts with its default settings.

Either set this option before inserting:

innodb_flush_log_at_trx_commit=0

or make all your inserts one single transaction. Either way it will be blazing fast, no matter what syntax or driver you choose.
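
A sketch of both options (note: changing the global variable requires the SUPER privilege and trades some crash durability for speed):

-- Option 1: flush the InnoDB log about once per second instead of
-- at every commit (less durable if the server crashes mid-load).
SET GLOBAL innodb_flush_log_at_trx_commit = 0;

-- Option 2: wrap the whole load in one transaction, so the log is
-- flushed once at COMMIT instead of once per inserted row.
START TRANSACTION;
-- ... all the INSERT statements ...
COMMIT;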