How to read files in multithreaded mode?

Posted 2020-03-03 08:28

I currently have a program that reads a very large file in a single thread and builds a search index, but indexing takes too long in a single-threaded environment.

Now I am trying to make it multithreaded, but I am not sure of the best way to achieve that.

My main program creates a BufferedReader and passes the instance to each thread, and each thread uses that instance to read the file.

I don't think this works as expected; each thread seems to read the same lines again and again.

Is there a way to make each thread read only the lines that no other thread has read? Do I need to split the file? Is there a way to implement this without splitting the file?

Sample Main program:

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.ArrayList;

public class TestMTFile {
    public static void main(String args[]) {
        BufferedReader reader = null;
        ArrayList<Thread> threads = new ArrayList<Thread>();
        try {
            reader = new BufferedReader(new FileReader(
                    "test.tsv"));
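        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
            // Without the file there is nothing to read; stop here instead of
            // handing a null reader to the worker threads.
            return;
        }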
        for (int i = 0; i <= 10; i++) {
            Runnable task = new ReadFileMT(reader);
            Thread worker = new Thread(task);
            // We can set the name of the thread
            worker.setName(String.valueOf(i));
            // Start the thread, never call method run() direct
            worker.start();
            // Remember the thread for later usage
            threads.add(worker);
        }

        // Wait for every worker to finish instead of busy-polling isAlive().
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
        System.out.println("Ended");
    }
}

Thread:

import java.io.BufferedReader;
import java.io.IOException;

public class ReadFileMT implements Runnable {
    BufferedReader bReader = null;

    ReadFileMT(BufferedReader reader) {
        this.bReader = reader;
    }

    public synchronized void run() {
        String line;
        try {
            while ((line = bReader.readLine()) != null) {
                // Placeholder for the real indexing work.
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

4 Answers
贼婆χ
#2 · 2020-03-03 09:01

Your bottleneck is most likely the indexing, not the file reading. Assuming your indexing system supports multiple threads, you probably want a producer/consumer setup: one thread reads the file and pushes each line into a BlockingQueue (the producer), and multiple threads pull lines from the BlockingQueue and push them into the index (the consumers).
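The producer/consumer setup described above could be sketched roughly as follows. This is only an illustration under assumptions: the class name, the `indexAll` helper, the queue capacity of 1024, the POISON end marker, and the counter that stands in for the real indexing call are all hypothetical and not from the original post.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ProducerConsumerSketch {
    // Hypothetical end-of-input marker; consumers compare it by identity.
    private static final String POISON = new String("<<EOF>>");

    /** Feeds every line to `consumers` worker threads and returns how many lines were handled. */
    static int indexAll(Iterable<String> lines, int consumers) {
        // Bounded queue: the producer blocks when the consumers fall behind.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        AtomicInteger handled = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            Thread t = new Thread(() -> {
                try {
                    String line;
                    while ((line = queue.take()) != POISON) {
                        handled.incrementAndGet(); // stand-in for the real indexing call
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            workers.add(t);
        }
        try {
            // Producer: the calling thread is the only one that touches the input.
            for (String line : lines) {
                queue.put(line);
            }
            // One marker per consumer so that every worker terminates.
            for (int i = 0; i < consumers; i++) {
                queue.put(POISON);
            }
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            for (Thread t : workers) {
                t.interrupt();
            }
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while indexing", e);
        }
        return handled.get();
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: java ProducerConsumerSketch.java <file>");
            return;
        }
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(args[0]))) {
            Iterable<String> lines = reader.lines()::iterator;
            System.out.println(indexAll(lines, 4) + " lines indexed");
        }
    }
}
```

Because the queue is bounded, a slow index throttles the reader rather than letting the whole file pile up in memory.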

淡お忘
#3 · 2020-03-03 09:17

See this thread: if your files are all on the same disk, then you can't do better than reading them with a single thread, although it may be possible to process the files with multiple threads once you've read them into main memory.
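As a rough sketch of that idea (read with one thread, then process in memory with several), something like the following might work. It assumes the file fits in the heap, which may not hold for a very large file; the class name, the `processInParallel` helper, and the counter standing in for the indexing work are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ReadThenProcessSketch {
    /** Splits the in-memory lines into one slice per thread and returns how many lines were processed. */
    static int processInParallel(List<String> lines, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Ceiling division so that every line lands in exactly one slice.
            int chunk = Math.max(1, (lines.size() + threads - 1) / threads);
            List<Future<Integer>> parts = new ArrayList<>();
            for (int start = 0; start < lines.size(); start += chunk) {
                List<String> slice = lines.subList(start, Math.min(lines.size(), start + chunk));
                parts.add(pool.submit(() -> {
                    int n = 0;
                    for (String line : slice) {
                        n++; // stand-in for indexing `line`
                    }
                    return n;
                }));
            }
            int total = 0;
            for (Future<Integer> part : parts) {
                total += part.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a processing task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: java ReadThenProcessSketch.java <file>");
            return;
        }
        // A single thread does all of the disk IO.
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(processInParallel(lines, 4) + " lines processed");
    }
}
```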

成全新的幸福
#4 · 2020-03-03 09:20

First, I agree with @Zim-Zam that it is the file IO, not the indexing, that is likely the rate-determining step. (So I disagree with @jtahlborn.) It depends on how complex the indexing is.

Second, in your code, each thread has its own, independent BufferedReader. Therefore they will all read the entire file. One possible fix is to use a single BufferedReader that they all share. You would then need to synchronize calls to BufferedReader.readLine() (I think), since the javadocs are silent on whether BufferedReader is thread-safe. And since I think the IO is the bottleneck, this lock will become the bottleneck too, so I doubt multithreading will gain you much. But give it a try; I have been wrong occasionally. :-)
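A minimal sketch of the shared-reader idea might look like this. The class name and the `readWithThreads` helper are hypothetical, and a StringReader stands in for the real file so the example is self-contained. Each worker holds the lock only for the readLine() call, so the per-line work runs outside it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class SharedReaderSketch {
    /** Reads every line of `reader` using `threadCount` threads; each line is returned exactly once. */
    static List<String> readWithThreads(BufferedReader reader, int threadCount) {
        ConcurrentLinkedQueue<String> seen = new ConcurrentLinkedQueue<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String line;
                        // All workers lock on the same reader object.
                        synchronized (reader) {
                            line = reader.readLine();
                        }
                        if (line == null) {
                            break; // end of input; later calls keep returning null
                        }
                        seen.add(line); // stand-in for indexing, done outside the lock
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            t.start();
            threads.add(t);
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while reading", e);
            }
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc\nd\ne\n"));
        System.out.println(readWithThreads(reader, 3).size() + " lines read");
    }
}
```

Note that the order in which lines are handled is not preserved, which matters only if the index depends on line order.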

P.S. I agree with @jtahlborn that a producer/consumer pattern is better than my shared-BufferedReader idea, but that would be much more work for you.

爷的心禁止访问
#5 · 2020-03-03 09:26

If you can use Java 8, you may be able to do this quickly and easily using the Streams API. Read the file into a MappedByteBuffer, which can open a file of up to 2GB very quickly, then read the lines out of the buffer (you need to make sure your JVM has enough extra memory to hold the file):

package com.objective.stream;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class StreamsFileProcessor {
    private MappedByteBuffer buffer;

    public static void main(String[] args){
        // Check the length first: args[0] throws when no argument is given.
        if (args.length > 0) {
            Path myFile = Paths.get(args[0]);
            StreamsFileProcessor proc = new StreamsFileProcessor();
            try {
                proc.process(myFile);
            } catch (IOException e) {
                e.printStackTrace();
            }   
        }
    }

    public void process(Path file) throws IOException {
        readFileIntoBuffer(file);
        getBufferStream().parallel()
            .forEach(this::doIndex);
    }

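    private Stream<String> getBufferStream() throws IOException {
        // A mapped buffer is direct and has no backing array, so buffer.array()
        // would throw; copy the mapped bytes onto the heap instead.
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        // Do not close the reader here: lines() is lazy and only reads when the
        // stream is consumed. A byte array holds nothing that needs releasing.
        BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
        return reader.lines();
    }

    private void readFileIntoBuffer(Path file) throws IOException {
        try (FileInputStream fis = new FileInputStream(file.toFile())) {
            FileChannel channel = fis.getChannel();
            // The channel of a FileInputStream is read-only, so map READ_ONLY;
            // MapMode.PRIVATE needs a channel opened for reading and writing.
            buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
        }
    }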

    private void doIndex(String s){
        // Do whatever I need to do to index the line here
    }
}
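If the memory mapping is not essential, a shorter variant of the same Streams idea might use Files.lines, which streams the file lazily instead of loading it first. This is a sketch under assumptions, not the answer's method: the class name, the `indexFile` helper, and the counter standing in for doIndex are hypothetical, and whether the parallel stream actually speeds things up depends on how expensive the per-line work is.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;

class FilesLinesSketch {
    /** Streams the file's lines in parallel and returns how many lines were handled. */
    static long indexFile(Path file) {
        LongAdder indexed = new LongAdder();
        // try-with-resources closes the underlying file once the stream is consumed.
        try (Stream<String> lines = Files.lines(file)) {
            lines.parallel().forEach(line -> indexed.increment()); // stand-in for doIndex(line)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return indexed.sum();
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.err.println("usage: java FilesLinesSketch.java <file>");
            return;
        }
        System.out.println(indexFile(Paths.get(args[0])) + " lines indexed");
    }
}
```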