Fail to Read Through Shared Memory

2019-01-28 10:42发布

I am trying to publish some random things over shared memory; and for some weird reason, the reader doesn't pick up what the sender has written

#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <sys/types.h>
#include <cstdio>

class SHM {
    volatile char* _ptr;
public:
    SHM() {
        const auto handle = shm_open("myTest", O_RDWR|O_CREAT, 0666);
        const auto size =  4 * 1024 * 1024;
        if (-1 == ftruncate(handle, size)) {
            throw;
        }
        _ptr = (volatile char*)mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);

        if(_ptr == MAP_FAILED){
            throw;
        }

                int rc = fchmod(handle, 0666);
                if (rc == -1) {
            throw;
                }
    }

    bool read(uint64_t& magic, uint64_t& time) {
        const uint64_t newVal = *(uint64_t*)_ptr;
        if (newVal != magic) {
            magic = newVal;
            printf("value changed!!!\n");
            time = *(uint64_t*)(_ptr + sizeof(magic));
            return true;
        }
        //printf("old value: %lu\n", newVal);
        return false;
    }

    void publish(const uint64_t time) {
        __sync_fetch_and_add((uint64_t*)_ptr, time);
        __sync_synchronize();
        *(uint64_t*)(_ptr + sizeof(uint64_t)) = time;
    }
};

Here is the sender:

#include <ctime>
#include <unistd.h>
#include <cstdlib>
#include <cstdint>
#include "shm.h"

int main() {
    SHM shm;
    timespec t;
    for (auto i = 0; i < 10000; i++) {
        if (0 == clock_gettime(CLOCK_REALTIME, &t)) {
            const uint64_t v = t.tv_sec * 1000 * 1000 * 1000 + t.tv_nsec;
            shm.publish(v);
            printf("published %lu\n", v);
            usleep(100);
        }
    }
}

Here is the reader:

#include <iostream>
#include "shm.h"

int main() {
    SHM shm;
    uint64_t magic = 0;
    uint64_t t = 0;
    while (true) {
        if (shm.read(magic, t)) {
            printf("%lu, %lu\n", magic, t);
        }
    }
}

If I restart the reader, the reader is indeed able to read the last value that the sender has written.

However, if I start the reader first, and then the sender, all the values the sender writes aren't picked up by the reader.

To make this even weirder, if I uncomment the printf statement in SHM::read(), then the reader is able to pick up sometimes.

Any idea?

GCC version:

g++ (GCC) 7.2.1 20170829 (Red Hat 7.2.1-1)

标签: c++ linux g++
1条回答
冷血范
2楼-- · 2019-01-28 11:24

I spotted a couple of issues, however, I am unsure if they would fix your problem.

  1. name for shm_open should start with / for portable use.
  2. In read and publish the casts must not discard volatile. E.g.: const uint64_t newVal = *(uint64_t volatile*)_ptr;. Even better, drop volatile and use std::atomic.

Although there are different processes involved, this is still the case of same objects being accessed by more than one thread of execution and at least one of these threads modifies the shared objects.


I made the above changes. Using std::atomic fixed it:

class SHM {
    void* _ptr;
public:
    SHM() {
        const auto handle = shm_open("/myTest", O_RDWR|O_CREAT, 0666);
        const auto size =  4 * 1024 * 1024;
        if (-1 == ftruncate(handle, size))
            throw;

        _ptr = mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);

        if(_ptr == MAP_FAILED)
            throw;
    }

    bool read(uint64_t& magic, uint64_t& time) {
        auto p = static_cast<std::atomic<uint64_t>*>(_ptr);
        const uint64_t newVal = p[0];
        if (newVal != magic) {
            magic = newVal;
            printf("value changed!!!\n");
            time = p[1];
            return true;
        }
        return false;
    }

    void publish(const uint64_t time) {
        auto p = static_cast<std::atomic<uint64_t>*>(_ptr);
        p[0] += time;
        p[1] = time;
    }
};

void sender() {
    SHM shm;
    timespec t;
    for (auto i = 0; i < 10000; i++) {
        if (0 == clock_gettime(CLOCK_REALTIME, &t)) {
            const uint64_t v = t.tv_sec * 1000 * 1000 * 1000 + t.tv_nsec;
            shm.publish(v);
            printf("published %lu\n", v);
            usleep(100);
        }
    }
}

void reader() {
    SHM shm;
    uint64_t magic = 0;
    uint64_t t = 0;
    while (true) {
        if (shm.read(magic, t)) {
            printf("%lu, %lu\n", magic, t);
        }
    }
}

int main(int ac, char**) {
    if(ac > 1)
        reader();
    else
        sender();
}

With std::atomic you can have more control. E.g.:

struct Data {
    std::atomic<uint64_t> time;
    std::atomic<uint64_t> generation;
};

// ...

    bool read(uint64_t& generation, uint64_t& time) {
        auto data = static_cast<Data*>(_ptr);

        auto new_generation = data->generation.load(std::memory_order_acquire); // 1. Syncronizes with (2).
        if(generation == new_generation)
            return false;

        generation = new_generation;
        time = data->time.load(std::memory_order_relaxed);
        printf("value changed!!!\n");
        return true;
    }

    void publish(const uint64_t time) {
        auto data = static_cast<Data*>(_ptr);

        data->time.store(time, std::memory_order_relaxed);
        data->generation.fetch_add(time, std::memory_order_release);  // 2. (1) Synchronises with this store.
    }
查看更多
登录 后发表回答