小工具      在线工具  汉语词典  dos游戏  css  js  c++  java

简单的延时消费队列

Java 额外说明

收录于:52天前

1.一个简单的延迟消费队列单元测试,添加2个任务到队列然后消费。代码如下:

package com.robinboot.facade;

import com.robinboot.service.task.DelayRunnable;
import com.robinboot.service.task.DelayRunnable;
import com.robinboot.service.task.TaskManager;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @auther: TF12778
 * @date: 2020/11/26 11:22
 * @description:简单的延时消费队列单元测试,将2个任务添加到队列中,然后消费
 */
public class TaskTest extends BaseTest{

    public static final Logger logger=Logger.getLogger(TaskTest.class);

    ExecutorService singlePooling = Executors.newSingleThreadExecutor();
    DelayRunnable delayTask = new DelayRunnable();

    @Before
    public void tastttt1() {
        singlePooling.submit(delayTask);
    }

    @Test
    public void tastttt() {

        delayTask.put(new Runnable() {
            @Override
            public void run() {
                System.out.println("delayTask------1");
            }
        }, 1);

        delayTask.put(new Runnable() {
            @Override
            public void run() {
                System.out.println("delayTask------2");
            }
        }, 5);
    }
}

结果:

2、DelayRunnable主要功能:

声明一个延迟任务队列,将任务添加到任务队列中,每隔100ms取出一个任务执行,并将任务提交到线程池

package com.robinboot.service.task;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;

/**
 * @auther: TF12778
 * @date: 2020/11/26 11:02
 * @description:声明了一个延时任务队列,将任务添加到任务队列中,每100ms取出一个任务去执行, 任务提交到线程池
 */
public class DelayRunnable implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(DelayRunnable.class);

    private ExecutorService pooling  = new ThreadPoolExecutor(2, 3, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));

    private DelayQueue<DelayedTask<Runnable>> dQueue = new DelayQueue<>();
    private volatile boolean started = true;

    public void put(Runnable r, long delay){
        dQueue.add(new DelayedTask<Runnable>(r, delay));
    }

    @Override
    public void run() {
        DelayedTask<Runnable> task = null;
        while (started){
            try {
                task = dQueue.poll(100, TimeUnit.MILLISECONDS); // 每100ms取出一个任务去执行
            }catch (InterruptedException e){
                logger.warn("dQueue thread is interrupted.",e);
            }
            if(task != null){
                logger.info("tasks in delayQueue: " + dQueue.size());
                fireTask(task.getItem());
                task = null;
            }
        }
    }

    public void stop(){
        started = false;
    }


    private void fireTask(Runnable r){
        this.pooling.submit(r);  // 任务提交到线程池
    }
}

package com.robinboot.service.task;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @auther: sww
 * @date: 2020/11/26 10:57
 * @description:
 */
public class DelayedTask<T> implements Delayed {

    /**
     * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied
     * entries.
     */
    private static final AtomicLong SEQUENCER = new AtomicLong(0);
    /** The time the task is enabled to execute in nanoTime units */
    private long time;
    /** Sequence number to break ties FIFO */
    private final long SEQUENCE_NUMBER;
    /** Base of nanosecond timings, to avoid wrapping */
    private static final long NANO_ORIGIN = System.nanoTime();
    private final T item;

    public DelayedTask(T submit, long timeout) {
        this.time = now() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
        this.item = submit;
        this.SEQUENCE_NUMBER = SEQUENCER.getAndIncrement();
    }

    public T getItem() {
        return this.item;
    }

    final static long now() {
        return System.nanoTime() - NANO_ORIGIN;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if(o == this){
            return 0;
        }
        if(o instanceof DelayedTask){
            DelayedTask t = (DelayedTask) o;
            long diff = this.time - t.time;
            if(diff < 0){
                return -1;
            }else if(diff > 0){
                return 1;
            }else if(SEQUENCE_NUMBER < t.SEQUENCE_NUMBER){
                return -1;
            }else {
                return 1;
            }
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    @Override
    public boolean equals(Object object){
        if(object ==null || !(object instanceof DelayedTask)){
            return false;
        }
        return this.getItem().equals(((DelayedTask<T>)object).getItem());
    }
}

. . .

相关推荐

额外说明

PDF转换工具安装教程

目录 一、下载地址 二、软件介绍  三、安装步骤 1、下载PDF转换软件后解压,然后双击打开exe程序。 2、 选择自定义安装。 3、点击浏览选择软件安装路径,可在除C盘以外新建一个PDF的文件夹。 4、点击下一步。 5、点击下一步。 6、点击安装。 7

额外说明

2.运行官方示例

1.部署例子 1.找到如下3个 war 包 2.放到 tomcat7 的 webapps 目录中 3. 启动 tomcat (使用的是H2数据库) 2.测试流程 4.登录 5.点击 身份管理 (这个是退出按钮) 6.创建用户 7.流程创建 (activi

额外说明

Influx Sql系列教程四:series/point/tag/field

        influxdb中的一条记录point,主要可以分为三类,必须存在的time(时间),string类型的tag,以及其他成员field;而series则是一个measurement中保存策略和tag集构成;本篇教程将介绍一些这几个概念 1

额外说明

rabbitmq下载地址

    https://packagecloud.io/rabbitmq    

额外说明

逆波兰表达式

逆波兰表达式又称作后缀表达式,在四则混合运算的程序设计中用到。 例如: 1+2写成后缀表达式就是12+ 4+5*(3-2)的后缀表达式就是4532-*+ 后缀表达式在四则运算中带来了意想不到的方便,在生成过程中自动保持了优先级; 生成逆波兰表达式的算法如

额外说明

C.9 文档级关系抽取:基于结构先验产生注意力偏差SSAN模型

NLP专栏简介:数据增强、智能标注、意图识别算法|多分类算法、文本信息抽取、多模态信息抽取、可解释性分析、性能调优、模型压缩算法等 专栏详细介绍:NLP专栏简介:数据增强、智能标注、意图识别算法|多分类算法、文本信息抽取、多模态信息抽取、可解释性分析、性

额外说明

Target runtime Apache Tomcat 5.5 is not defined

在他们机器上整合好的工程在我机器上总是出现Target runtime Apache Tomcat 6.0 is 解决not defined.这个错误,找工程属性也没有,后来网上找到的方法。   在工程目录下的.settings文件夹里,打开org.ec

额外说明

java:自定义变量加载到系统变量后替换shell模版并执行shell

这里的需求前提是,在项目中进行某些操作前,需要在命令后对shell配置文件的进行修改(如ip、port),这个对于用户是不友好的,需要改为用户页面输入ip、port,后台自动去操作修改配置;那么这篇博客的由来就有了。 上面图片是AI创作,未经允许,不可商

额外说明

全网最新的vue.js下载和安装的3种方法(2023年)

文章目录 1. 文章引言 2. 环境搭建 3. 安装vue.js 3.1 方法一:官网下载vue.js源代码 3.2 方法二:使用npm install创建 3.3 方法三:使用bower下载 4. 总结 1. 文章引言 我主要从事java后端开发,但对

额外说明

日期工具类-操作字符串和Date、LocalDate互转,两个日期的时间差等

避免重复造轮子,相关方法基于hutool日期时间工具封装并做部分增强。需要先引入如下坐标 <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifact

ads via 小工具