问答平台(7),任务执行和调度

JDK 线程池

1
2
- ExecutorService: JDK 普通线程池
- ScheduledExecutorService: JDK 定时任务线程池

Spring 线程池

1
2
- ThreadPoolTaskExecutor: Spring 普通线程池
- ThreadPoolTaskScheduler: Spring 定时任务线程池

测试类

  • ThreadPoolTests: 新增
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @ContextConfiguration(classes = CommunityApplication.class)
    public class ThreadPoolTests {

    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolTests.class);

    // JDK普通线程池
    private ExecutorService executorService = Executors.newFixedThreadPool(5);

    // JDK可执行定时任务的线程池
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);

    // Spring普通线程池
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;

    // Spring可执行定时任务的线程池
    @Autowired
    private ThreadPoolTaskScheduler taskScheduler;

    @Autowired
    private AlphaService alphaService;

    private void sleep(long m) {
    try {
    Thread.sleep(m);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    // 1.JDK普通线程池
    @Test
    public void testExecutorService() {
    Runnable task = new Runnable() {
    @Override
    public void run() {
    logger.debug("Hello ExecutorService");
    }
    };

    for (int i = 0; i < 10; i++) {
    executorService.submit(task);
    }

    sleep(10000);
    }

    // 2.JDK定时任务线程池
    @Test
    public void testScheduledExecutorService() {
    Runnable task = new Runnable() {
    @Override
    public void run() {
    logger.debug("Hello ScheduledExecutorService");
    }
    };

    scheduledExecutorService.scheduleAtFixedRate(task, 10000, 1000, TimeUnit.MILLISECONDS);

    sleep(30000);
    }

    // 3.Spring普通线程池
    @Test
    public void testThreadPoolTaskExecutor() {
    Runnable task = new Runnable() {
    @Override
    public void run() {
    logger.debug("Hello ThreadPoolTaskExecutor");
    }
    };

    for (int i = 0; i < 10; i++) {
    taskExecutor.submit(task);
    }

    sleep(10000);
    }

    // 4.Spring定时任务线程池
    @Test
    public void testThreadPoolTaskScheduler() {
    Runnable task = new Runnable() {
    @Override
    public void run() {
    logger.debug("Hello ThreadPoolTaskScheduler");
    }
    };

    Date startTime = new Date(System.currentTimeMillis() + 10000);
    taskScheduler.scheduleAtFixedRate(task, startTime, 1000);

    sleep(30000);
    }

    // 5.Spring普通线程池(简化)
    @Test
    public void testThreadPoolTaskExecutorSimple() {
    for (int i = 0; i < 10; i++) {
    alphaService.execute1();
    }

    sleep(10000);
    }

    // 6.Spring定时任务线程池(简化)
    @Test
    public void testThreadPoolTaskSchedulerSimple() {
    sleep(30000);
    }
    }

参数配置

  • application-develop.properties: 增加内容
    1
    2
    3
    4
    5
    6
    7
    # TaskExecutionProperties
    spring.task.execution.pool.core-size=5
    spring.task.execution.pool.max-size=15
    spring.task.execution.pool.queue-capacity=100

    # TaskSchedulingProperties
    spring.task.scheduling.pool.size=5

config

  • ThreadPoolConfig: 新增
    1
    2
    3
    4
    5
    6
    @Configuration
    @EnableScheduling
    @EnableAsync
    public class ThreadPoolConfig {

    }

业务层

  • AlphaService: 增加内容
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private static final Logger logger = LoggerFactory.getLogger(AlphaService.class);

    // 让该方法在多线程环境下,被异步地调用
    @Async
    public void execute1() {
    logger.debug("execute1");
    }

    @Scheduled(initialDelay = 10000, fixedRate = 1000)
    public void execute2() {
    logger.debug("execute2");
    }

分布式定时任务

Spring Quartz
分布式定时任务-图示

导入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

参数配置

  • application-develop.properties: 增加内容
    1
    2
    3
    4
    5
    6
    7
    8
    9
    # QuartzProperties
    spring.quartz.job-store-type=jdbc
    spring.quartz.scheduler-name=communityScheduler
    spring.quartz.properties.org.quartz.scheduler.instanceId=AUTO
    spring.quartz.properties.org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
    spring.quartz.properties.org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
    spring.quartz.properties.org.quartz.jobStore.isClustered=true
    spring.quartz.properties.org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
    spring.quartz.properties.org.quartz.threadPool.threadCount=5

分析

1
2
3
4
5
一旦任务初始化完成就会存到数据库的qrtz_triggers表中。
- Scheduler: 核心调度接口
- Job: 定义任务接口
- JobDetail: 任务的配置接口
- Trigger: 任务的配置接口

quartz

  • AlphaJob: 新增
    1
    2
    3
    4
    5
    6
    public class AlphaJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
    System.out.println(Thread.currentThread().getName() + ": execute a quartz job.");
    }
    }

config

  • QuartzConfig: 新增
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    // 配置 -> 数据库 -> 调用
    @Configuration
    public class QuartzConfig {

    // FactoryBean可简化Bean的实例化过程:
    // 1.通过FactoryBean封装Bean的实例化过程
    // 2.将FactoryBean装配到Spring容器里
    // 3.将FactoryBean注入给其他的Bean
    // 4.该Bean得到的是FactoryBean所管理的对象实例

    // 配置JobDetail
    @Bean
    public JobDetailFactoryBean alphaJobDetail() {
    JobDetailFactoryBean factoryBean = new JobDetailFactoryBean();
    factoryBean.setJobClass(AlphaJob.class);
    factoryBean.setName("alphaJob");
    factoryBean.setGroup("alphaJobGroup");
    factoryBean.setDurability(true);
    factoryBean.setRequestsRecovery(true);
    return factoryBean;
    }

    // 配置Trigger(SimpleTriggerFactoryBean, CronTriggerFactoryBean)
    @Bean
    public SimpleTriggerFactoryBean alphaTrigger(JobDetail alphaJobDetail) {
    SimpleTriggerFactoryBean factoryBean = new SimpleTriggerFactoryBean();
    factoryBean.setJobDetail(alphaJobDetail);
    factoryBean.setName("alphaTrigger");
    factoryBean.setGroup("alphaTriggerGroup");
    factoryBean.setRepeatInterval(3000);
    factoryBean.setJobDataMap(new JobDataMap());
    return factoryBean;
    }
    }

测试类

  • QuartzTests: 新增
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @ContextConfiguration(classes = CommunityApplication.class)
    public class QuartzTests {

    @Autowired
    private Scheduler scheduler;

    @Test
    public void testDeleteJob() {
    try {
    boolean result = scheduler.deleteJob(new JobKey("alphaJob", "alphaJobGroup"));
    System.out.println(result);
    } catch (SchedulerException e) {
    e.printStackTrace();
    }
    }
    }

参考资料


问答平台(7),任务执行和调度
https://lcf163.github.io/2020/06/16/问答平台(7),任务执行和调度/
作者
乘风的小站
发布于
2020年6月16日
许可协议