Implementing mutexes for file writes

2019-03-07 06:36发布

I am trying to use mutexes to avoid multiple writes to the same thread inC/Cpp. Below is the flow of my program. I am confused as to where to include my lock and unlock code.

main() {
    spawn a worker thread
}
worker_thread() {
    read the input file name 
    read some content
    write the content to the given file name
}

Most of the implementation that I see, seem to have something like this:

main() {
    pthread_mutex_init(&myMutex;,0);
    *spawn a worker thread*
    pthread_join(thread1, 0);
    pthread_mutex_destroy(&myMutex;);
}
worker_thread() {
    read the input file name 
    read some content
    write the content to the given file name
}

What I want it something like this:

main() {
    spawn a worker thread
}
worker_thread() {
    read the input file name 
    read some content
    pthread_mutex_init(&myMutex;,0) --> for the given file?
    write the content to the given file name
    pthread_mutex_destroy(&myMutex;);
}

Any ideas to proceed much appreciated. Thank you!

1条回答
Lonely孤独者°
2楼-- · 2019-03-07 06:46

It's fairly easy to create a wrapper for an iostream that ensures only one thread can write to the stream at a time. Unfortunately, almost as soon as you do so, you run into another problem. It ensures that only one thread can insert into the stream at a time, so you get defined behavior. If, however, you have something like:

thread 1: sync_stream << a << b << c << '\n';
thread 2: sync_stream << x << y << z << '\n';

What you wanted was either:

abc
xyz

...or else:

xyz
abc

Since they're in separate threads, it's fine for the order between them to vary, but a line of output from one thread should remain a single line of output. Something like:

abxy
cz

...probably isn't desired or acceptable. To ensure against this, we really need two separate classes. One is a synchronized stream. The other is something to let us do some (more or less arbitrary) set of insertions into the stream as a single, indivisible "transaction". To do that, we can use a pair of classes like this:

class transaction {
    std::ostringstream buffer;
public:
    transaction(std::string const &s="") : buffer(s, std::ios::out | std::ios::ate) {}

    template <class T>
    transaction &operator<<(T const &t) {
        buffer << t;
        return *this;
    }

    friend std::ostream &operator<<(std::ostream &os, transaction const &t) {
        return os << t.buffer.str();
    }
};

class sync_stream {
    std::ostream &out;
    std::mutex mutex;
public:
    sync_stream(std::ostream &sink) : out(sink) { }

    void operator<<(transaction const &t) {
        std::lock_guard<std::mutex> l(mutex);
        out << t;
    }    
};

Note that the transaction class supports chaining, but the sync_stream does not (and the only thing you can insert into it is a transaction). To use them, we do something like this:

for (int i=0; i<10; i++)
    threads[i] = std::thread([&]{ 
        for (int i=0; i<10; i++) 
            s << (transaction() << "Thread: " << std::this_thread::get_id() << "\n");
    });

This way, what a thread thinks of as a single output actually comes out as a single output, so our result might look like this:

Thread: 140375947724544
Thread: 140376068564736
Thread: 140375964509952
Thread: 140375964509952
Thread: 140375972902656
Thread: 140375964509952

Of course, you'll get different thread IDs than I did, and the order of the lines is likely to vary--but each line will be written as a single, intact unit.

Summary

The worker threads shouldn't work directly with the mutex at all. That should be automated, so the worker thread can focus on its work, and spend only a bare minimum of effort on the underlying mechanism necessary for it to do its job.

查看更多
登录 后发表回答