Saturation Policies in Thread Pools

ThreadPoolExecutor lets you supply a BlockingQueue to hold tasks that are waiting to be executed.


The BlockingQueue methods we need to pay attention to are offer(E), put(E), and take().
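To make the difference between these methods concrete, here is a minimal sketch on a queue of capacity 1. The class and method names (QueueMethodsSketch, offerTwice, putThenTake) are made up for illustration. In the JDK implementation, ThreadPoolExecutor.execute() enqueues with the non-blocking offer(), which is why a full queue leads to rejection rather than to a blocked caller, while idle workers fetch tasks with take() (or a timed poll()).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original article.
class QueueMethodsSketch {

    // Returns {result of first offer, result of second offer} on a queue of capacity 1.
    static boolean[] offerTwice() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        boolean first = queue.offer("a");   // there is room, so the element is accepted
        boolean second = queue.offer("b");  // the queue is full, so offer returns false at once
        return new boolean[] {first, second};
    }

    // put() would block on a full queue and take() on an empty one;
    // here neither blocks because the queue has room and then holds an element.
    static String putThenTake() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put("a");
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        boolean[] results = offerTwice();
        System.out.println("first offer: " + results[0] + ", second offer: " + results[1]);
        System.out.println("taken: " + putThenTake());
    }
}
```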

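By default, newFixedThreadPool and newSingleThreadExecutor use an unbounded queue (LinkedBlockingQueue). If all threads are busy, new tasks wait in the queue; if tasks arrive faster than the threads can execute them, the queue grows without bound.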
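One way to see that the default queue is effectively unbounded is to inspect it. The sketch below assumes the OpenJDK behavior that Executors.newFixedThreadPool returns a ThreadPoolExecutor instance (the declared return type is only ExecutorService, so the cast is an implementation detail); the class name UnboundedQueueSketch is hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the original article.
class UnboundedQueueSketch {

    // Returns the remaining capacity of the work queue behind newFixedThreadPool(1).
    static int defaultQueueCapacity() {
        // Assumption: the returned ExecutorService is a ThreadPoolExecutor (true for OpenJDK).
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("remaining capacity: " + defaultQueueCapacity());
        System.out.println("Integer.MAX_VALUE:  " + Integer.MAX_VALUE);
    }
}
```

A LinkedBlockingQueue created without a capacity argument uses Integer.MAX_VALUE as its capacity, so in practice memory runs out long before the queue reports itself full.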

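A more robust approach is to use a bounded queue, such as an ArrayBlockingQueue or a LinkedBlockingQueue constructed with a capacity. (PriorityBlockingQueue is often listed alongside them, but the JDK implementation is unbounded; its constructor argument is only an initial capacity.) A bounded queue prevents resource exhaustion, but it raises a new question: what should happen to new tasks once the queue is full and the pool is already at its maximum size? The answer is the saturation (rejection) policy.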

The JDK provides several RejectedExecutionHandler implementations, each a different saturation policy: AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy.
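As a quick orientation, the sketch below shows the two ways a policy ends up on a pool: the default (AbortPolicy) when none is given, and an explicit handler passed to the constructor. The examples in this article use a third route, setRejectedExecutionHandler. The class name HandlerConfigSketch and its methods are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class HandlerConfigSketch {

    // No handler given: the pool falls back to its default policy.
    static String defaultHandlerName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        try {
            return pool.getRejectedExecutionHandler().getClass().getSimpleName();
        } finally {
            pool.shutdown();
        }
    }

    // Handler passed as the last constructor argument.
    static String constructorHandlerName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            return pool.getRejectedExecutionHandler().getClass().getSimpleName();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("default:     " + defaultHandlerName());
        System.out.println("constructor: " + constructorHandlerName());
    }
}
```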

AbortPolicy: when a task is rejected, this policy throws a RejectedExecutionException. It is the default policy. For example:

package cn.concurrent.executor;

import java.util.concurrent.*;

/**
 * Created by spark on 17-9-24.
 */
public class AbortPolicyDemo {

    public static void main(String[] args) {
        // Create a pool with corePoolSize 1, maximumPoolSize 1, and a work queue of capacity 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        // Set the saturation policy to AbortPolicy; the caller can catch the exception it throws
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // Submit five tasks back to back
        for (int i = 0; i < 5; i++) {
            MyRunnable myRunnable = new MyRunnable();
            pool.execute(myRunnable);
        }
        pool.shutdown();
    }


    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            // Prints "<thread id>:正在执行" ("is running")
            System.err.println(Thread.currentThread().getId() + ":正在执行");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

The output is:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task cn.concurrent.executor.AbortPolicyDemo$MyRunnable@1d44bcfa rejected from java.util.concurrent.ThreadPoolExecutor@266474c2[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2066)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at cn.concurrent.executor.AbortPolicyDemo.main(AbortPolicyDemo.java:19)
10:正在执行
10:正在执行

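A RejectedExecutionException was thrown by the saturation policy. The first task occupied the only worker thread, the second filled the queue, and the third submission was rejected. Because the exception propagated out of main, the remaining tasks were never submitted, so only two tasks ran.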
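Since the caller is allowed to catch this exception, a submission loop can keep going after a rejection instead of aborting. The following is a minimal sketch of that idea, not the author's code: the class AbortPolicyCatchSketch and its helper are hypothetical, and a CountDownLatch (instead of Thread.sleep) keeps the single worker busy so the result does not depend on timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class AbortPolicyCatchSketch {

    // Submits `total` tasks to a saturated 1-thread pool with a queue of capacity 1
    // and returns how many submissions were rejected.
    static int countRejections(int total) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < total; i++) {
                try {
                    // Each task blocks until released, so the pool stays saturated.
                    pool.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++; // the caller decides what to do: log, retry later, drop, ...
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected submissions: " + countRejections(5));
    }
}
```

With five submissions, one task goes to the worker, one waits in the queue, and the other three are rejected. The finally block also ensures the pool is shut down even if something unexpected is thrown.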

Now modify AbortPolicyDemo so that the main thread sleeps for 500 ms before each submission:

package cn.concurrent.executor;

import java.util.concurrent.*;

/**
 * Created by spark on 17-9-24.
 */
public class AbortPolicyDemo {

    public static void main(String[] args) {
        // Create a pool with corePoolSize 1, maximumPoolSize 1, and a work queue of capacity 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        // Set the saturation policy to AbortPolicy; the caller can catch the exception it throws
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // Submit five tasks, pausing 500 ms before each one
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            MyRunnable myRunnable = new MyRunnable();
            pool.execute(myRunnable);
        }
        pool.shutdown();
    }


    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            // Prints "<thread id>:正在执行" ("is running")
            System.err.println(Thread.currentThread().getId() + ":正在执行");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

The output is:

10:正在执行
10:正在执行
10:正在执行
10:正在执行
10:正在执行

Process finished with exit code 0

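The reason is that each task takes about 300 ms while submissions are 500 ms apart, so every task finishes before the next one arrives. The bounded queue never fills up, and the program runs normally.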

DiscardOldestPolicy: when a task is rejected, the pool discards the oldest unhandled task in the queue (the one that would have been executed next) and then retries submitting the rejected task. If the work queue is a priority queue, the "oldest" task is the head of the queue, which is the highest-priority one, so this policy should not be combined with a priority queue. For example:

package cn.concurrent.executor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by spark on 17-9-24.
 */
public class DiscardOldestPolicyDemo {

    public static void main(String[] args) {

        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        // Set the saturation policy to DiscardOldestPolicy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

        for (int i = 0; i < 6; i++) {
            MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
            pool.submit(myRunnable);
        }
        pool.shutdown();
    }

    static class MyRunnable implements Runnable {

        private final String name;

        public MyRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.err.println(this.name + ": is running.");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

The output is:

this is 0 task: is running.
this is 5 task: is running.

Process finished with exit code 0

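As the output shows, tasks 1, 2, 3, and 4 were all discarded. Task 0 went straight to the worker thread and task 1 was queued; each later submission then evicted the task waiting in the queue and took its place, so only task 5 was left to run.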
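The eviction sequence can be reproduced without relying on sleep timings. The sketch below is an illustration under stated assumptions, not the author's code: the class name DiscardOldestSketch is hypothetical, and a CountDownLatch holds the worker inside the first task until all submissions are done, so every later task has to go through the queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class DiscardOldestSketch {

    // Submits tasks 0..total-1 to a saturated pool and returns the ids that actually ran.
    static List<Integer> executedTasks(int total) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> executed = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < total; i++) {
            final int id = i;
            pool.execute(() -> {
                executed.add(id);
                try {
                    release.await(); // keep the worker busy until all tasks are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(executed);
    }

    public static void main(String[] args) {
        System.out.println("executed task ids: " + executedTasks(6));
    }
}
```

With six submissions the surviving ids are 0 (running on the worker) and 5 (the last task to claim the single queue slot), matching the output above.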

DiscardPolicy: this policy silently drops a task that cannot be accepted, without any notification.

The code is as follows:

package cn.concurrent.executor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by spark on 17-9-24.
 */
public class DiscardPolicy {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        // Set the saturation policy to DiscardPolicy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 6; i++) {
            MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
            pool.submit(myRunnable);
        }
        pool.shutdown();
    }

    static class MyRunnable implements Runnable {

        private final String name;

        public MyRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.err.println(this.name + ": is running.");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

The output is:

this is 0 task: is running.
this is 1 task: is running.

Process finished with exit code 0

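As the output shows, tasks 2, 3, 4, and 5 were all discarded.

Both the maximum pool size and the core pool size are 1, so the pool can run at most one task at a time. The pool's work queue is an ArrayBlockingQueue, a bounded blocking queue, with a capacity of 1, so at most one task can wait in the queue. Following the logic of execute(), only two tasks ran in total: the first was handed directly to a Worker and executed by its thread, and the second waited in the queue. All the other tasks were discarded.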
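The decision that execute() makes for each submission can be summarized as a small pure function. This is a deliberately simplified model, not the JDK source: it ignores pool shutdown state and the re-checks the real implementation performs, and every name in it (ExecuteDecisionSketch, Outcome, decide) is hypothetical.

```java
// Hypothetical, simplified model of ThreadPoolExecutor.execute(); not the JDK source.
class ExecuteDecisionSketch {

    enum Outcome { NEW_CORE_WORKER, QUEUED, NEW_EXTRA_WORKER, REJECTED }

    static Outcome decide(int workers, int corePoolSize, int maximumPoolSize,
                          int queued, int queueCapacity) {
        if (workers < corePoolSize) {
            return Outcome.NEW_CORE_WORKER;   // start a worker with this task as its first task
        }
        if (queued < queueCapacity) {
            return Outcome.QUEUED;            // workQueue.offer(task) succeeds
        }
        if (workers < maximumPoolSize) {
            return Outcome.NEW_EXTRA_WORKER;  // queue is full, grow beyond the core size
        }
        return Outcome.REJECTED;              // hand the task to the RejectedExecutionHandler
    }

    public static void main(String[] args) {
        // Six back-to-back submissions to a pool with core = max = 1 and queue capacity 1,
        // assuming no task finishes in the meantime.
        int workers = 0;
        int queued = 0;
        for (int i = 0; i < 6; i++) {
            Outcome outcome = decide(workers, 1, 1, queued, 1);
            if (outcome == Outcome.NEW_CORE_WORKER || outcome == Outcome.NEW_EXTRA_WORKER) {
                workers++;
            } else if (outcome == Outcome.QUEUED) {
                queued++;
            }
            System.out.println("task " + i + " -> " + outcome);
        }
    }
}
```

In this model, task 0 becomes a core worker's first task, task 1 is queued, and tasks 2 through 5 are rejected, which is the pattern seen in the DiscardPolicy output.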

Now change the queue in the DiscardPolicy example to a PriorityBlockingQueue and see what happens:

package cn.concurrent.executor;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by spark on 17-9-24.
 */
public class DiscardPolicy2 {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new PriorityBlockingQueue<>(1));
        // Set the saturation policy to DiscardPolicy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 6; i++) {
            MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
            pool.submit(myRunnable);
        }
        pool.shutdown();
    }

    static class MyRunnable implements Runnable {

        private final String name;

        public MyRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.err.println(this.name + ": is running.");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

The output is:

Exception in thread "main" java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
    at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at cn.concurrent.executor.DiscardPolicy2.main(DiscardPolicy2.java:23)
this is 0 task: is running.

The error occurs because the queued tasks cannot be compared: a PriorityBlockingQueue created without a Comparator requires its elements to implement Comparable. Task 0 still ran because the first task goes straight to the worker and never enters the queue. Let's implement the Comparable interface and try again:

package cn.concurrent.executor;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by spark on 17-9-24.
 * The pool's core and maximum sizes are both 1, so it runs at most one task at a time.
 * The work queue is a PriorityBlockingQueue, so queued tasks must be comparable.
 */
public class DiscardPolicy2 {

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new PriorityBlockingQueue<>(1));
        // Set the saturation policy to DiscardPolicy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        for (int i = 0; i < 6; i++) {
            MyRunnable myRunnable = new MyRunnable("this is " + i + " task", i);
            // execute() queues the task object itself, so its compareTo is what the queue sees
            pool.execute(myRunnable);
        }
        pool.shutdown();
    }

    static class MyRunnable implements Runnable, Comparable<MyRunnable> {

        private final String name;
        private final int num;

        public MyRunnable(String name, int num) {
            this.name = name;
            this.num = num;
        }

        @Override
        public void run() {
            System.err.println(this.name + ": is running.");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public int compareTo(MyRunnable o) {
            // All tasks compare as equal, so the queue guarantees no particular order among them
            return 0;
        }
    }
}

The output is:

this is 0 task: is running.
this is 1 task: is running.
this is 5 task: is running.
this is 4 task: is running.
this is 3 task: is running.
this is 2 task: is running.

Process finished with exit code 0

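execute() is used here instead of submit() because submit() wraps the task in a FutureTask, and FutureTask does not implement Comparable. Note also that all six tasks ran: PriorityBlockingQueue is unbounded (the constructor argument 1 is only an initial capacity), so the queue never fills up and DiscardPolicy is never invoked. The order after the first two tasks looks arbitrary because compareTo always returns 0.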
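The unbounded nature of PriorityBlockingQueue is easy to check in isolation. A minimal sketch follows, with a hypothetical class name (PriorityQueueCapacitySketch); it only uses the queue, not a thread pool.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the original article.
class PriorityQueueCapacitySketch {

    // Offers n elements to a queue created with initial capacity 1
    // and returns how many were accepted.
    static int offerMany(int n) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(1); // 1 = initial capacity only
        int accepted = 0;
        for (int i = 0; i < n; i++) {
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    // The queue reports itself as never capacity-constrained.
    static int remainingCapacity() {
        return new PriorityBlockingQueue<Integer>(1).remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + offerMany(6));
        System.out.println("remaining capacity: " + remainingCapacity());
    }
}
```

Because offer() never fails for lack of space, a pool backed by this queue only rejects tasks after it has been shut down, so the choice of saturation policy has little effect while the pool is running.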

CallerRunsPolicy: as long as the pool has not been shut down, this policy runs the rejected task directly in the caller's thread, that is, the thread that called execute(). In plain terms, it neither discards the task nor throws an exception; it pushes the work back to the caller, which slows down the flow of new tasks. While the caller is busy running a task it cannot submit new work, so this can reduce throughput (QPS).

The code is as follows:

package cn.concurrent.executor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by spark on 17-9-24.
 */
public class CallerRunsPolicyDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        // Set the saturation policy to CallerRunsPolicy
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 6; i++) {
            MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
            pool.submit(myRunnable);
        }
        pool.shutdown();
    }

    static class MyRunnable implements Runnable {

        private final String name;

        public MyRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.err.println(this.name + ": is running.");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

The output is:

this is 2 task: is running.
this is 0 task: is running.
this is 3 task: is running.
this is 1 task: is running.
this is 5 task: is running.
this is 4 task: is running.

Process finished with exit code 0
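The output above does not show which thread ran each task. To make the caller-runs behavior visible, the following sketch counts how many tasks end up executing on the submitting thread. It is an illustration under stated assumptions rather than the author's code: the class name CallerRunsSketch is hypothetical, and a CountDownLatch keeps the pool thread busy so the count does not depend on timing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class CallerRunsSketch {

    // Returns how many of `total` tasks were executed by the submitting thread.
    static int tasksRunByCaller(int total) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ranOnCaller = new AtomicInteger();
        for (int i = 0; i < total; i++) {
            pool.execute(() -> {
                if (Thread.currentThread() == caller) {
                    // Rejected task: CallerRunsPolicy runs it inline in execute()
                    ranOnCaller.incrementAndGet();
                } else {
                    // Pool thread: stay busy so the pool remains saturated
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        release.countDown();
        pool.shutdown();
        return ranOnCaller.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run by the caller: " + tasksRunByCaller(6));
    }
}
```

With six submissions, one task runs on the pool thread, one waits in the queue, and the remaining four are executed by the caller itself. While the caller is inside such a task it cannot submit anything else, which is the throttling effect described above.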

We can also define a custom saturation policy:

package cn.concurrent;

import java.util.concurrent.*;

/**
 * Created by spark on 17-9-3.
 * Demonstrates a custom rejection policy via the RejectedExecutionHandler interface.
 */
public class RejectThreadPoolDemo {

    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask myTask = new MyTask();
        // Create a thread pool with a custom RejectedExecutionHandler
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(10),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    // Custom handling of rejected tasks
                    @Override
                    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                        System.out.println(runnable.toString() + " is discard");
                    }
                });
        // NOTE: the loop bound is truncated in the source ("for(int i=0;i").
        // 100 is a placeholder chosen only so that the example compiles and terminates.
        for (int i = 0; i < 100; i++) {
            es.submit(myTask);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

The output is:

1506244149620:Thread ID:10
1506244149630:Thread ID:11
1506244149640:Thread ID:12
1506244149651:Thread ID:13
1506244149661:Thread ID:14
1506244149720:Thread ID:10
1506244149730:Thread ID:11
1506244149740:Thread ID:12
1506244149751:Thread ID:13
1506244149761:Thread ID:14
1506244149821:Thread ID:10
1506244149830:Thread ID:11
1506244149841:Thread ID:12
1506244149851:Thread ID:13
1506244149862:Thread ID:14
java.util.concurrent.FutureTask@63947c6b is discard
java.util.concurrent.FutureTask@2b193f2d is discard
java.util.concurrent.FutureTask@355da254 is discard
java.util.concurrent.FutureTask@4dc63996 is discard
java.util.concurrent.FutureTask@d716361 is discard
1506244149921:Thread ID:10
1506244149930:Thread ID:11
1506244149941:Thread ID:12
1506244149951:Thread ID:13
1506244149962:Thread ID:14
java.util.concurrent.FutureTask@6ff3c5b5 is discard
java.util.concurrent.FutureTask@3764951d is discard
java.util.concurrent.FutureTask@4b1210ee is discard
java.util.concurrent.FutureTask@4d7e1886 is discard
java.util.concurrent.FutureTask@3cd1a2f1 is discard
1506244150023:Thread ID:10
1506244150031:Thread ID:11

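As you can see, the saturation policy took effect. In a real application, the handler is a good place to write a log entry, so that system load and task loss can be analyzed later.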
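As one possible shape for such a logging handler, the sketch below counts rejections and prints the pool state at the moment of each one. All names (CountingRejectionHandler, rejectedCount, demo) are hypothetical, and System.err stands in for whatever logging framework the application uses.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical handler, not part of the original article.
class CountingRejectionHandler implements RejectedExecutionHandler {

    private final AtomicLong rejected = new AtomicLong();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        long n = rejected.incrementAndGet();
        // Record enough context to analyze load later; the task itself is dropped.
        System.err.println("rejected #" + n + ": " + task
                + " [pool size=" + executor.getPoolSize()
                + ", queued=" + executor.getQueue().size() + "]");
    }

    long rejectedCount() {
        return rejected.get();
    }

    // Saturates a 1-thread pool with a queue of capacity 1 and returns the rejection count.
    static long demo(int total) {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < total; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the pool saturated during submission
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        System.out.println("total rejected: " + demo(6));
    }
}
```

Exposing the counter (for example through a metrics endpoint) makes it possible to alert on task loss instead of discovering it after the fact.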

I'm writing these notes down bit by bit as I learn. There is still a lot to learn, and feedback is welcome.
