Reading messages in Chronicle Queue Tailer (v5.16.

2019-01-27 06:43发布

问题:

I use CQ v5.16.11 (with openjdk 11) to save data with daily roll cycles. The process runs non-stop from Sun to Fri so I have 5 cq4 files per week. I ran the process for 1.5 weeks and have 8 files (3 for 1st and 5 for 2nd week).

So the files that I have are:

20181003.cq4 cycle=17807,
20181004.cq4 cycle=17808,
20181005.cq4 cycle=17809,
20181007.cq4 cycle=17811,
20181008.cq4 cycle=17812,
20181009.cq4 cycle=17813,
20181010.cq4 cycle=17814,
20181011.cq4 cycle=17815,

Note the missing file for 20181006.cq4 (cycle=17810) as the process does not run on Saturday.

I use this code to read data:

tailer.toEnd();
lastTailerIndex = tailer.index();
tailer.toStart();

while (tailer.index() <= lastTailerIndex) {
    // read data
    if (tailer.readBytes(data) {
     /// do something with data bytes
    }
    if (tailer.index() == lastTailerIndex) {
        break;
    }
}

This correctly reads the 1st week data but does not read the 2nd week data as it does not auto-roll to next cycle.

Any idea why this is happening or how to fix this?

The issue is similar to this which was for an older version

  • Reading message from chronicle queue does not move the current index to the next cycle automatically
  • I have created a single queue with daily rolling

Logs:

2018-10-12 12:41:15,784 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/metadata.cq4t took 19.237 ms.
2018-10-12 12:41:15,876 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181011.cq4 took 0.063 ms.
2018-10-12 12:41:15,881 DEBUG [main] net.openhft.chronicle.queue.impl.single.PretoucherState - /site/data/20181011.cq4 - Reset pretoucher to pos 4835096 as the underlying MappedBytes changed.
2018-10-12 12:41:15,887 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181003.cq4 took 0.065 ms.
2018-10-12 12:41:15,995 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181011.cq4 took 0.082 ms.
2018-10-12 12:41:15,996 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181003.cq4
2018-10-12 12:41:15,997 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181011.cq4
2018-10-12 12:41:16,418 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181004.cq4 took 0.112 ms.
2018-10-12 12:41:16,418 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181003.cq4
2018-10-12 12:41:16,813 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181005.cq4 took 0.084 ms.
2018-10-12 12:41:16,813 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181004.cq4

[Edit 1]: The same thing happened on this last weekend, i.e., as expected no new file for Oct 13. Now I have files from Oct 7th to Oct 15th (with missing Oct 13th file). If I do tailer.toStart(); while(tailer.readBytes() { ...} it only reads files from Oct7th till Oct 12th and does not read Oct 14th and 15th.

[Edit 2]: Replicated the issue as below Chronicle-Queue/issues/537

  1. Setup / Libs: jvm openjdk 11, Ubuntu 16.04, openhft.affinity/3.1.9, chronicle-map/3.16.0, chronicle-queue/5.16.11, chronicle-bytes/1.16.23, chronicle-core/1.16.20, chronicle-wire/1.16.16, chronicle-threads/1.16.3, jna/4.4.0
  2. Steps:
    • Start WriterProcess - let it finish.
    • Start ReaderProcess - see the 5 print statements.
    • Stop ReaderProcess
    • Wait for some time - 10 mins.
    • Start WriterProcess again - let it finish or keep running this process.
    • Start ReaderProcess - it prints only the first 5 print statements and nothing prints after this. Even if the WriterProcess is running/writing to queue the tailer in this process does not move forward.

public class WriterProcess {
        public static void main(String[] args) throws InterruptedException {
            final String dir = "/tmp/demo/";
            final LocalTime localTime = LocalTime.of(17, 0);
            final ZoneId zoneID = ZoneId.of("America/New_York");
            final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
            final SingleChronicleQueue queue  = SingleChronicleQueueBuilder.binary(dir)
                    .blockSize((long) Math.pow(2, 23))
                    .rollCycle(RollCycles.MINUTELY)
                    .rollTime(localTime, zoneID)
                    .build();
            final ExcerptAppender appender = queue.acquireAppender();
            // pre touch
            scheduledExecutorService.scheduleAtFixedRate(appender::pretouch,0,30, TimeUnit.SECONDS);
            // write data
            System.out.println("writing data ...");
            writeData(appender, 5);
            // close queue
            System.out.println("shutting down now ...");
            queue.close();
            scheduledExecutorService.shutdown();
            scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
        }
        public static void writeData(ExcerptAppender appender, int count) {
            int ctr = 0;
            String dateStr;
            Date date = new Date();
            while (true) {
                dateStr = date.toString();
                appender.writeText("["+ctr+"] Written " + dateStr);
                System.out.println("["+ctr+"] Written " + dateStr);
                ctr++;
                if (ctr >= count) {
                    break;
                }
                try {
                    Thread.sleep(65_000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public class ReaderProcess {
        public static void main(String[] args) {
            final String dir = "/tmp/demo/";
            final LocalTime localTime = LocalTime.of(17, 0);
            final ZoneId zoneID = ZoneId.of("America/New_York");
            final SingleChronicleQueue queue  = SingleChronicleQueueBuilder.binary(dir)
                    .blockSize((long) Math.pow(2, 23))
                    .rollCycle(RollCycles.MINUTELY)
                    .rollTime(localTime, zoneID)
                    .build();
            final ExcerptTailer tailer = queue.createTailer();
            tailer.toStart();
            // read data
            System.out.println("reading data ...");
            readData(tailer, 25);
            // close
            System.out.println("shutting down now ...");
            queue.close();
        }
        public static void readData(ExcerptTailer tailer, int count) {
            int ctr = 0;
            Bytes data = Bytes.allocateDirect(new byte[500]);
            while (true) {
                if (tailer.readBytes(data)) {
                    System.out.println("["+ctr+"] Read {"+ data + "}");
                    ctr++;
                    if (ctr >= count) {
                        break;
                    }
                }
            }
        }
    }

回答1:

I have written a slightly simpler version which works with chronicle-bom 2.17 and the versions it uses. The biggest change I made was to clear the Bytes data before reading otherwise it only appends so as to not overwrite anything.

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;

import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class WriterProcess {
    static final String dir = OS.TMP + "/demo-" + System.nanoTime() + "/";

    public static void main(String[] args) throws InterruptedException {
        final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
                .testBlockSize()
                .rollCycle(RollCycles.TEST_SECONDLY)
                .build();
        final ExcerptAppender appender = queue.acquireAppender();
        // pre touch
        scheduledExecutorService.scheduleAtFixedRate(appender::pretouch, 3, 30, TimeUnit.SECONDS);
        new Thread(ReaderProcess::main).start();
        // write data
        System.out.println("writing data ...");
        writeData(appender, 100);
        // close queue
        System.out.println("shutting down now ...");
        queue.close();
        scheduledExecutorService.shutdown();
        scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
    }

    public static void writeData(ExcerptAppender appender, int count) {
        int ctr = 0;
        while (true) {
            LocalDateTime date = LocalDateTime.now();
            appender.writeText("[" + ctr + "] Written " + date);
            System.out.println("[" + ctr + "] Written " + date);
            ctr++;
            if (ctr >= count) {
                break;
            }
            try {
                Thread.sleep(2_200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class ReaderProcess {
    public static void main(String... args) {
        final String dir = WriterProcess.dir;
        final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
                .testBlockSize()
                .rollCycle(RollCycles.TEST_SECONDLY)
                .build();
        final ExcerptTailer tailer = queue.createTailer();
        tailer.toStart();
        // read data
        System.out.println("reading data ...");
        readData(tailer, 100);
        // close
        System.out.println("shutting down now ...");
        queue.close();
    }

    public static void readData(ExcerptTailer tailer, int count) {
        int ctr = 0;
        Bytes data = Bytes.allocateDirect(64);
        while (true) {
            data.clear();
            if (tailer.readBytes(data)) {
                System.out.println("[" + ctr + "] Read {" + data + "}");
                ctr++;
                if (ctr >= count) {
                    break;
                }
            }
        }
    }
}