1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
| import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;
/**
 * Schedules {@link Runnable} tasks for delayed execution.
 *
 * <p>Tasks are wrapped in {@code DelayTask} objects and stored in a
 * {@link DelayQueue}. A single monitor thread blocks on the queue and, as each
 * task's delay expires, hands the task's {@code Runnable} to a fixed-size
 * thread pool.
 *
 * <p>The monitor thread is started by the constructor and is not a daemon
 * thread; call {@link #shutdown()} when the manager is no longer needed so the
 * monitor thread and the worker pool can terminate.
 */
public class DelayQueueManager {

    private static final Logger LOG = LoggerFactory.getLogger(DelayQueueManager.class);

    /** Used only to name the monitor thread and in log messages. */
    private final String name;
    /** Pool that runs tasks once their delay has expired. */
    private final ExecutorService executor;
    /** Pending tasks, ordered by the queue according to their remaining delay. */
    private final DelayQueue<DelayTask<?>> delayQueue;
    /** Thread that drains {@link #delayQueue}; assigned in {@link #init()}. */
    private Thread monitorThread;

    /**
     * Creates the manager and immediately starts its monitor thread.
     *
     * @param name     suffix for the monitor thread name ("DelayQueueMonitor-" + name)
     * @param poolSize number of worker threads in the fixed thread pool
     */
    public DelayQueueManager(String name, int poolSize) {
        this.name = name;
        this.executor = Executors.newFixedThreadPool(poolSize);
        this.delayQueue = new DelayQueue<>();
        // Started last so every field is assigned before the monitor thread reads it.
        init();
    }

    /** Creates and starts the monitor thread that runs {@link #execute()}. */
    private void init() {
        monitorThread = new Thread(this::execute, "DelayQueueMonitor-" + name);
        monitorThread.start();
    }

    /**
     * Monitor loop: waits for the next expired task and submits it to the pool.
     * Runs until the monitor thread is interrupted (see {@link #shutdown()}).
     */
    private void execute() {
        while (!Thread.currentThread().isInterrupted()) {
            LOG.info("当前延时任务数量:{}", delayQueue.size());
            try {
                // take() blocks until a task's delay has expired; it never returns null.
                DelayTask<?> delayTask = delayQueue.take();
                Runnable task = delayTask.getTask();
                // Entries without a Runnable are silently dropped.
                if (task != null) {
                    executor.execute(task);
                }
            } catch (InterruptedException e) {
                // take() clears the interrupt flag when it throws; restore it so
                // the loop condition sees the stop request and the thread exits.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // A single failing dispatch must not kill the monitor thread.
                LOG.error("Delay queue monitor " + name + " failed to dispatch a task", e);
            }
        }
    }

    /**
     * Schedules {@code task} to run after the given delay, measured from now.
     *
     * @param id   identifier stored in the queued {@code DelayTask}
     * @param task work to run once the delay has expired
     * @param time delay amount, expressed in {@code unit}
     * @param unit unit of {@code time}
     */
    public void put(String id, Runnable task, long time, TimeUnit unit) {
        long timeout = unit.toMillis(time);
        long now = System.currentTimeMillis();
        // Saturate instead of overflowing: a huge delay must not wrap around to
        // a timestamp in the past and fire immediately.
        long delayTimeMillis = timeout > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeout;
        delayQueue.put(new DelayTask<>(id, delayTimeMillis, task));
    }

    /**
     * Schedules {@code task} with an explicit trigger time.
     *
     * @param id              identifier stored in the queued {@code DelayTask}
     * @param task            work to run once the trigger time is reached
     * @param delayTimeMillis absolute trigger time on the
     *                        {@link System#currentTimeMillis()} clock
     */
    public void putAt(String id, Runnable task, long delayTimeMillis) {
        delayQueue.put(new DelayTask<>(id, delayTimeMillis, task));
    }

    /**
     * Removes a pending task by its id.
     *
     * <p>Builds a probe {@code DelayTask} carrying only the id and asks the
     * queue to remove an element equal to it.
     * NOTE(review): this presumably relies on {@code DelayTask.equals} comparing
     * by id only — confirm against the {@code DelayTask} implementation.
     *
     * @param id identifier of the task to remove
     * @return {@code true} if an element was removed from the queue
     */
    public boolean removeTaskById(String id) {
        DelayTask<?> probe = new DelayTask<>(id, 0, null);
        return delayQueue.remove(probe);
    }

    /**
     * Removes the given pending task from the queue.
     *
     * @param task task to remove
     * @return {@code true} if an element was removed from the queue
     */
    public boolean removeTask(DelayTask<?> task) {
        return delayQueue.remove(task);
    }

    /**
     * Stops the monitor thread and shuts the worker pool down.
     *
     * <p>Tasks already handed to the pool are allowed to finish. Tasks still
     * waiting in the delay queue are not executed, and tasks put after this
     * call are never dispatched.
     */
    public void shutdown() {
        monitorThread.interrupt();
        executor.shutdown();
    }
}
|