什么是Zeronet?

并发是一个难题,但是可以通过使用强力简单的抽象来显著的简化,为了简化问题,guava扩展了Future接口,即 ListenableFuture (可以监听的Future)。我强烈建议你在你的所有代码里使用ListenableFuture去替代Future,原因如下:

  • 很多的Futures 类的方法需要它。(Futures工具类使用)
  • 它比后来改造为ListenableFutrue更简单。(早点使用比重构更简单)
  • 工具方法的提供者不需要提供Future和ListenableFuture方法的变体。(不需要兼容两套)

接口

一个传统的Futrue代表一个异步计算的结果:一个可能完成也可能没有完成输出结果的计算。一个Future可以用在进度计算,或者说是 一个提供给我们结果的服务的承诺。一个ListenableFuture允许注册当你在计算完成的时候的回调,或者计算已经完成了。这个简单的增强让高效支持多种操作成为可能。而Future接口并不能支持。ListenbleFuture中添加的基本操作是addListener(Runnable , Executor ),它指出了当未来计算完成时,指定的Runnable会在指定的Executor中运行。

增加回调

很多用户喜欢使用 Futures.addCallback(ListenableFuture,FutureCallback,Executor)方法。FutureCallback实现了下面两个方法:

  • onSuccess(v) 当未来成功执行的动作,基于计算结果
  • onFailure(Throwable) 当未来失败执行的动作,基于失败

创建

相较于jdk提供的 ExecutorService.submit(Callable)方法来初始化一个异步计算。它返回一个常规的Future,guava提供了ListeningExecutorService接口,它返回ListenableFuture。把ExecutorService转换为ListenableExecutorService使用:MoreExecutors.listeningDecorator(ExecutorService)

基础用法如下:


/**
* 说明:使用例子代码
* @author carter
* 创建时间: 2020年03月19日 9:54 上午
**/

@Slf4j
public class ListenableFutureUtils {

public static void main(String[] args) {

ListeningExecutorService service = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(10));


final ListenableFuture<AResult> listenableFuture = service.submit(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new AResult(30, \"male\", 1);

});


Futures.addCallback(listenableFuture,
new FutureCallback<AResult>() {
@Override
public void onSuccess(AResult aResult) {
log.info(\"计算成功,{}\",aResult);
}

@Override
public void onFailure(Throwable throwable) {

log.error(\"计算错误\",throwable);

}
},service);

}

@Data
@AllArgsConstructor
public static class AResult{

private Integer age;

private String sex;

private Integer id;


}

}

相对的,如果你想从基于FutureTask的API转换过来,Guava提供了ListenableFutureTask.create(Callable)和ListenableFutureTask.create(Runnable)不同于jdk,ListenableFutureTask并不是直接扩展的。

如果你喜欢抽象的设置future的值,而不是实现一个方法然后计算值,可以考虑使用AbstractFuture或使用SettableFuture ;

如果你必须转换Future为ListenableFuture,你别无选择,必须使用 JdkFutureAdapters.listenInPoolThread(Future)来转换Future为ListenableFuture任何时候只要可能,推荐你修改源码让它返回一个 ListenableFuture

应用

使用ListenablFuture最重要的原因是可以使用链式异步操作。

代码如下:

package com.xxx.demo;

import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;

/**
* 说明:异步操作链
* @author carter
* 创建时间: 2020年03月19日 10:11 上午
**/

public class ApplicationUtils {


public static void main(String[] args) {

Query query = new Query(30);

ListenableFuture<RowKey> rowKeyFuture = lookUp(query);

AsyncFunction<RowKey, QueryResult> queryFun = rowKey -> readData(rowKey);

final ListenableFuture<QueryResult> queryResultListenableFuture =
Futures.transformAsync(rowKeyFuture, queryFun);

}

private static ListenableFuture<QueryResult> readData(RowKey rowKey) {
return null;
}

private static ListenableFuture<RowKey> lookUp(Query query) {
return null;
}


@Data
@AllArgsConstructor
public static class RowKey {

private String id;

}

@Data
@AllArgsConstructor
public static class Query {

private Integer age;

}


@Data
@AllArgsConstructor
public static class QueryResult {

private String id;
private String age;

}


}

很多其他高效支持的操作ListenableFuture提供,而Future不提供。不同的操作可以被不同的线程池执行,一个简单的ListenableFuture可以有多个操作去等待。

只要一个操作开始,其他多个操作应该开始,fan-out, 千帆竞发。

ListenableFuture可以实现这样的操作:它触发了所有请求的回调。

通过少量的工作,我们可以 fan-in.

触发一个ListenableFuture 来获得计算结果,当其他的Future结束的时候。

Futures.allAsList是一个例子。

方法介绍:

方法描述transformAsync(ListenableFuture , AsyncFunction , Executor)返回一个新的ListenableFuture,它的结果是执行异步函数的返回,函数入参是ListenableFuture的返回结果;transform(ListenableFuture , Function , Executor)返回一个新的ListenableFuture,它的结果是执行函数的返回,函数入参是ListenableFuture的返回结果;allAsList(Iterable)返回一个ListenableFuture,它的结果是一个list,包含每一个列表中的ListenableFuture的执行结果,任何一个ListenableFuture执行失败或者取消,最后的返回结果取消successfullAsList(Iterable)返回一个ListenableFuture,它的结果是一个list,包含每一个列表中的ListenableFuture的执行结果,成功的是结果,失败或者取消的值使用null替代

AsyncFunction<A,B> 提供了一个方法 , ListenableFuture apply(A inpunt),它可以用来异步的转换值。

代码如下:

package com.xxx.demo;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
* 说明:成功执行结果汇集
* @author carter
* 创建时间: 2020年03月19日 10:34 上午
**/
@Slf4j
public class Test3 {

public static void main(String[] args) {

List<ListenableFuture<QueryResult>> querys = Lists.newLinkedList();
final ListenableFuture<List<QueryResult>> successfulAsList =
Futures.successfulAsList(querys);

Futures.addCallback(successfulAsList, new FutureCallback<List<QueryResult>>() {
@Override
public void onSuccess(List<QueryResult> queryResults) {
log.info(\"执行结果列表:{}\",queryResults);
}

@Override
public void onFailure(Throwable throwable) {
log.error(\"执行失败\",throwable);
}
});


}

@Data
@AllArgsConstructor
public static class QueryResult{


private Integer age;

}


}

嵌套的Future

你的代码调用一个通用接口并返回一个Future,很可能最终返回一个嵌套的Future.

package com.xxx.demo;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
* 说明:嵌套的ListenableFuture
* @author carter
* 创建时间: 2020年03月19日 10:43 上午
**/

public class Test4 {

public static void main(String[] args) {


final ListeningExecutorService executorService = MoreExecutors
.listeningDecorator(Executors.newFixedThreadPool(2));
final ListeningExecutorService otherExecutorService = MoreExecutors
.listeningDecorator(Executors.newFixedThreadPool(2));


Callable<Foo> otherCallback = ()->new Foo(\"aaa\");


final ListenableFuture<ListenableFuture<Foo>> submit =
executorService.submit(() -> otherExecutorService.submit(otherCallback));


}

@Data
@AllArgsConstructor
public static class Foo{

private String name;
}

}

例子最后返回的是: ListenableFuture<ListenableFuture> ,
这个代码不对,因为当外层的Future 取消的时候,无法传播到内层的Future,
这也是一个 使用get()检查别的Future或者Listnener的常规的错误,

但是,除非特别关注 否则 otherCallback抛出的异常会被压制。为了避免这种情况,所有的guava的Future处理方法(有些从jdk来),有 *Async版本来安全的解开这个嵌套。

比如:transform,transformAsyn, submit, submitAsync方法。

深入研究

guava并发工具

guava并发工具

上一篇

肾友谨记:你最该担心的从来都不是尿毒症!

下一篇

最坏打算其实空间也有限

你也可能喜欢

  • 暂无相关文章!

发表评论

您的电子邮件地址不会被公开。 必填项已用 * 标注

提示:点击验证后方可评论!

插入图片
返回顶部