I tried looking at cache mechanisms, such as Guava's Cache. Their expiration is only since the last update.
What I'm looking for is a data structure that stores keys and cleans the keys after a time has passed since the first insertion. I'm planning for the value to be some counter.
A scenario might be a silent worker that does some work the first time it is requested, but then stays silent for an expiry period, even if work is requested again. If work is requested after the expiry period has passed, the worker does the work again.
Know of such a data structure? Thanks.
There are a few options for this.
Passive Removal
If expired keys do not need to be cleaned up as soon as they expire or at set intervals, then PassiveExpiringMap from Apache Commons Collections is a good option. When a key in this map is accessed, its Time to Live (TTL) is checked (all keys have the same TTL); if the key has expired, it is removed from the map and null is returned. This data structure has no active clean-up mechanism, so expired entries are only removed when they are accessed after their TTL has elapsed.
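To make the passive idea concrete, here is a minimal JDK-only sketch. It is not the PassiveExpiringMap implementation; the class name FirstInsertExpiringCounter, its hit method, and the injectable clock are hypothetical names chosen for illustration. It assumes the requirement from the question: the TTL is measured from the first insertion and is not refreshed by later requests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical sketch: a per-key counter whose entries expire a fixed time after FIRST insertion. */
final class FirstInsertExpiringCounter<K> {

    /** Insertion time and request count for one key. */
    private static final class Slot {
        final long insertedAtMillis;
        int count = 1;

        Slot(long insertedAtMillis) {
            this.insertedAtMillis = insertedAtMillis;
        }
    }

    private final Map<K, Slot> slots = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // assumption: injectable clock, e.g. System::currentTimeMillis

    FirstInsertExpiringCounter(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /**
     * Records a request for the key and returns how many requests have been
     * seen since the key was (re)inserted. A result of 1 means the key was
     * absent or expired, so the caller should do the work.
     */
    synchronized int hit(K key) {
        long now = clock.getAsLong();
        Slot slot = slots.get(key);
        if (slot == null || now - slot.insertedAtMillis >= ttlMillis) {
            // Passive removal: the stale entry is only replaced when it is accessed.
            slots.put(key, new Slot(now));
            return 1;
        }
        // The insertion time is deliberately left untouched.
        return ++slot.count;
    }
}
```

A caller would do the work only when hit returns 1. As with any passive scheme, a key that is never requested again stays in memory indefinitely.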
Cache
If more cache-based functionality is needed (such as maximum cache capacity and add/remove listening), Google Guava provides the CacheBuilder class. This class is more complex than the Apache Commons alternative, but it also provides much more functionality. The trade-off may be worth it if this is intended for more of a cache-based application.
Threaded Removal
If active removal of expired keys is needed, a separate thread can be spawned that is responsible for removing expired keys. Before looking at a possible implementation structure, it should be noted that this approach may be less performant than the above alternatives. Besides the overhead of starting a thread, the thread may cause contention with clients accessing the map. For example, if a client wants to access a key while the clean-up thread is removing expired keys, the client will either block (if synchronization is used) or see a different view of the map (that is, a different set of key-value pairs) if some concurrent mechanism is employed.
With that said, using this approach is complicated because it requires that the TTL be stored with the key. One approach is to create an ExpiringKey class such as the following (each key can have its own TTL; even if all of the keys end up having the same TTL value, this technique removes the need to create a Map decorator or some other implementation of the Map interface):
// Wraps a key together with the absolute time at which it expires.
// Note: equals/hashCode are not overridden, so map lookups need the same instance.
public class ExpiringKey<T> {

    private final T key;
    private final long expirationTimestamp;

    public ExpiringKey(T key, long ttlInMillis) {
        this.key = key;
        this.expirationTimestamp = System.currentTimeMillis() + ttlInMillis;
    }

    public T getKey() {
        return key;
    }

    public boolean isExpired() {
        return System.currentTimeMillis() > expirationTimestamp;
    }
}
Now the type of the map would be Map<ExpiringKey<K>, V> with some specific K and V type values. The background thread can be represented using a Runnable that resembles the following:
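import java.util.Iterator;
import java.util.Map;

public class ExpiredKeyRemover implements Runnable {

    // Bounded wildcard so that any Map<ExpiringKey<K>, V> can be passed in.
    private final Map<? extends ExpiringKey<?>, ?> map;

    public ExpiredKeyRemover(Map<? extends ExpiringKey<?>, ?> map) {
        this.map = map;
    }

    @Override
    public void run() {
        // Remove through the iterator so that the underlying map is updated.
        Iterator<? extends ExpiringKey<?>> it = map.keySet().iterator();
        while (it.hasNext()) {
            if (it.next().isExpired()) {
                it.remove();
            }
        }
    }
}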
Then the Runnable can be started so that it executes at a fixed interval using a ScheduledExecutorService as follows (which will clean up the map every 5 seconds):
Map<ExpiringKey<K>, V> myMap = // ...
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(new ExpiredKeyRemover(myMap), 0, 5, TimeUnit.SECONDS);
It is important to note that the Map implementation used for myMap must be synchronized or allow for concurrent access. The challenge with a concurrent Map implementation is that the ExpiredKeyRemover may see a different view of the map than a client, and an expired key may be returned to the client if the clean-up thread has not finished removing other keys (even if it has already removed the desired/expired key, since its changes may not have been committed yet). Additionally, the above key-removal code can be implemented using streams, but it has been written this way to illustrate the logic rather than to provide a performant implementation.
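One way to sidestep the stale-view problem is to have readers check the expiry timestamp themselves, so that correctness does not depend on when the sweep runs. The following is a rough sketch under that assumption, using only the JDK. The class SweepingExpiryMap and its method names are hypothetical; it stores the expiry time as the map value instead of wrapping the key, and it uses removeIf on a ConcurrentHashMap view for the sweep.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: keys expire a fixed time after first insertion and are swept periodically. */
final class SweepingExpiryMap<K> {

    // Maps each key to the absolute time (in millis) at which it expires.
    private final Map<K, Long> expiresAtMillis = new ConcurrentHashMap<>();
    private final long ttlMillis;

    SweepingExpiryMap(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Tracks the key if it is absent; an existing key keeps its original expiry time. */
    boolean addIfAbsent(K key, long nowMillis) {
        return expiresAtMillis.putIfAbsent(key, nowMillis + ttlMillis) == null;
    }

    /** Readers check the timestamp themselves, so a late sweep never exposes an expired key. */
    boolean isLive(K key, long nowMillis) {
        Long expiry = expiresAtMillis.get(key);
        return expiry != null && expiry > nowMillis;
    }

    /** One sweep: drops every entry whose expiry time has been reached. */
    void removeExpired(long nowMillis) {
        expiresAtMillis.entrySet().removeIf(entry -> entry.getValue() <= nowMillis);
    }

    int size() {
        return expiresAtMillis.size();
    }

    /** Runs the sweep at a fixed period; the caller is responsible for shutting the executor down. */
    ScheduledExecutorService startSweeper(long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(
                () -> removeExpired(System.currentTimeMillis()),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

With this layout the sweeper only reclaims memory, while isLive decides what a client sees. Note that an expired key must be swept (or explicitly removed) before addIfAbsent will accept it again.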
Hope that helps.
Created a data structure. Called it DuplicateActionFilterByInsertTime.
The correct notion is filtering duplicate messages. The following class filters from insert time for some period (filterMillis).
Implementation:
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DuplicateActionFilterByInsertTime<E extends Runnable> {

    private static final Logger LOGGER = Logger.getLogger(DuplicateActionFilterByInsertTime.class.getName());

    private final long filterMillis;
    private final ConcurrentHashMap<E, SilenceInfoImpl> actionMap = new ConcurrentHashMap<>();
    // Keeps the actions in insertion order, so the oldest entries are purged first.
    private final ConcurrentLinkedQueue<E> actionQueue = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean purgerRegistered = new AtomicBoolean(false);
    private final Set<Listener<E>> listeners = ConcurrentHashMap.newKeySet();

    public DuplicateActionFilterByInsertTime(int filterMillis) {
        this.filterMillis = filterMillis;
    }

    // Returns the silence info for the action, or null if it is absent or expired.
    public SilenceInfo get(E e) {
        SilenceInfoImpl insertionData = actionMap.get(e);
        if (insertionData == null || insertionData.isExpired(filterMillis)) {
            return null;
        }
        return insertionData;
    }

    // Runs the action only if it is not currently being filtered; returns true if it ran.
    public boolean run(E e) {
        actionMap.computeIfPresent(e, (e1, insertionData) -> {
            int count = insertionData.incrementAndGet();
            if (count == 2) {
                notifyFilteringStarted(e1);
            }
            return insertionData;
        });
        boolean isNew = actionMap.computeIfAbsent(e, e1 -> {
            SilenceInfoImpl insertionData = new SilenceInfoImpl();
            actionQueue.add(e1);
            return insertionData;
        }).getCount() == 1;
        tryRegisterPurger();
        if (isNew) {
            e.run();
        }
        return isNew;
    }

    // Schedules a single purge task; the task re-registers itself while the map is non-empty.
    private void tryRegisterPurger() {
        if (actionMap.size() != 0 && purgerRegistered.compareAndSet(false, true)) {
            scheduledExecutorService.schedule(() -> {
                try {
                    for (Iterator<E> iterator = actionQueue.iterator(); iterator.hasNext(); ) {
                        E e = iterator.next();
                        SilenceInfoImpl insertionData = actionMap.get(e);
                        if (insertionData == null || insertionData.isExpired(filterMillis)) {
                            iterator.remove();
                        }
                        if (insertionData != null && insertionData.isExpired(filterMillis)) {
                            SilenceInfoImpl removed = actionMap.remove(e);
                            FilteredItem<E> filteredItem = new FilteredItem<>(e, removed);
                            notifySilenceFinished(filteredItem);
                        } else {
                            // All the elements that were left shouldn't be purged.
                            break;
                        }
                    }
                } finally {
                    purgerRegistered.set(false);
                    tryRegisterPurger();
                }
            }, filterMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void notifySilenceFinished(FilteredItem<E> filteredItem) {
        new Thread(() -> listeners.forEach(l -> {
            try {
                l.onFilteringFinished(filteredItem);
            } catch (Exception e) {
                LOGGER.log(Level.WARNING, "Purge notification failed. Continuing to next one (if exists)", e);
            }
        })).start();
    }

    private void notifyFilteringStarted(final E e) {
        new Thread(() -> listeners.forEach(l -> {
            try {
                l.onFilteringStarted(e);
            } catch (Exception e1) {
                LOGGER.log(Level.WARNING, "Silence started notification failed. Continuing to next one (if exists)", e1);
            }
        })).start();
    }

    public void addListener(Listener<E> listener) {
        listeners.add(listener);
    }

    public void removeListener(Listener<E> listener) {
        listeners.remove(listener);
    }

    public interface SilenceInfo {
        long getInsertTimeMillis();

        int getCount();
    }

    public interface Listener<E> {
        void onFilteringStarted(E e);

        void onFilteringFinished(FilteredItem<E> filteredItem);
    }

    private static class SilenceInfoImpl implements SilenceInfo {
        private final long insertTimeMillis = System.currentTimeMillis();
        private final AtomicInteger count = new AtomicInteger(1);

        int incrementAndGet() {
            return count.incrementAndGet();
        }

        @Override
        public long getInsertTimeMillis() {
            return insertTimeMillis;
        }

        @Override
        public int getCount() {
            return count.get();
        }

        boolean isExpired(long expirationMillis) {
            return insertTimeMillis + expirationMillis < System.currentTimeMillis();
        }
    }

    public static class FilteredItem<E> {
        private final E item;
        private final SilenceInfo silenceInfo;

        FilteredItem(E item, SilenceInfo silenceInfo) {
            this.item = item;
            this.silenceInfo = silenceInfo;
        }

        public E getItem() {
            return item;
        }

        public SilenceInfo getSilenceInfo() {
            return silenceInfo;
        }
    }
}
Test example: (More tests here)
@Test
public void testSimple() throws InterruptedException {
    int filterMillis = 100;
    DuplicateActionFilterByInsertTime<Runnable> expSet = new DuplicateActionFilterByInsertTime<>(filterMillis);
    AtomicInteger purgeCount = new AtomicInteger(0);
    expSet.addListener(new DuplicateActionFilterByInsertTime.Listener<Runnable>() {
        @Override
        public void onFilteringFinished(DuplicateActionFilterByInsertTime.FilteredItem<Runnable> filteredItem) {
            purgeCount.incrementAndGet();
        }

        @Override
        public void onFilteringStarted(Runnable runnable) {
        }
    });
    Runnable key = () -> {
    };

    // The first run executes the action and records the insert time.
    long beforeAddMillis = System.currentTimeMillis();
    boolean added = expSet.run(key);
    long afterAddMillis = System.currentTimeMillis();
    Assert.assertTrue(added);
    DuplicateActionFilterByInsertTime.SilenceInfo silenceInfo = expSet.get(key);
    Assertions.assertThat(silenceInfo.getInsertTimeMillis()).isBetween(beforeAddMillis, afterAddMillis);

    // The second run is filtered: the insert time is unchanged and the count goes up.
    expSet.run(key);
    DuplicateActionFilterByInsertTime.SilenceInfo silenceInfo2 = expSet.get(key);
    Assert.assertEquals(silenceInfo.getInsertTimeMillis(), silenceInfo2.getInsertTimeMillis());
    Assert.assertFalse(silenceInfo.getInsertTimeMillis() + filterMillis < System.currentTimeMillis());
    Assert.assertEquals(2, silenceInfo.getCount());

    // After the filter period the entry is reported as expired.
    Thread.sleep(filterMillis);
    Assertions.assertThat(expSet.get(key)).isNull();

    Thread.sleep(filterMillis * 2); // Give a chance to purge the items.
    Assert.assertEquals(1, purgeCount.get());
    System.out.println("Finished");
}
Source.