skaka的博客

every flight begins with a fall

微服务框架Finagle介绍 Part2: 在Finagle中开发基于Http协议的应用

| Comments

上篇文章中我介绍了Finagle中的Future/Service/Filter. 这篇文章里, 我们将构建一个基于Http协议的echo服务端和客户端, 下篇文章将构建一个基于thrift协议的客户端和服务端. 这两篇文章对应的源代码地址在Github. 代码中有Java和Scala版本两套版本的实现, 但是这里我只会介绍Java版本.

首先来看echo应用的Server端代码, 打开java-finagle-example/src/main/java/com/akkafun/finagle/Server.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Server extends Service<Request, Response> {                             //1

    @Override
    public Future<Response> apply(Request request) {                                 //2
        System.out.println("request: " + request.getContentString());
        Response response = Response.apply(Version.Http11$.MODULE$, Status.Ok());
        response.setContentString(request.getContentString());
        return Future.value(response);
    }

    public static void main(String[] args) throws Exception {
        Server service = new Server();

        ListeningServer server = Http.server().                                      //3
                withLabel("echo-server").
                withTracer(ZipkinTracer.mk("192.168.99.100",
                    9410, DefaultStatsReceiver$.MODULE$, 1.0f)).
                serve(new InetSocketAddress(8081), service);

        Await.result(server);
    }
}
  1. 在Finagle中, 实现一个RPC服务非常简单. 只需要继承Service抽象类, 实现它的apply方法. Service抽象类有两个类型参数, 第一个类型参数代表的是请求对象, 第二个类型参数代表的是返回对象. 这两个对象的具体类型与Service实现类使用的具体协议有关. 例如我们在echo服务中使用Http协议, 对应的Request类就是com.twitter.finagle.http.Request, 对应的Response类是com.twitter.finagle.http.Response. 如果是thrift协议, 则这两个类型参数在Service实现类中都是scala.Array<scala.Byte>(Array和Byte都是scala中的类, 对应Java中的数组与byte).

  2. apply方法中, 我们首先使用Response的工厂方法构造一个Response对象. 然后将Request中的请求内容原封不动的设置到Response中, 再将Response设置到Future中返回. 需要最后一步的原因是apply方法的返回值类型是Future<Response>, 但是我们在这个方法中不需要进行异步操作, 所以可以直接使用Future.value(response)将对象包装成Future返回. 另外, 细心的你应该发现了一行比较碍眼的代码: Response.apply(Version.Http11$.MODULE$, Status.Ok()), 其中Version的用法很古怪. 这是Java调用Scala伴生对象的副作用, Scala有一些语法和特性在Java中没有对应的概念, 这种情况下Java调用Scala的代码就会比较晦涩.

  3. 为了启动Service实例, 我们需要构造一个com.twitter.finagle.ListeningServer. withLabel设置服务名称, withTracer设置监控信息, 这个等后面介绍zipkin的时候在解释. 最后指定端口启动服务.

现在来看echo应用的Client端代码, 打开java-finagle-example/src/main/java/com/akkafun/finagle/Client.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import static scala.compat.java8.JFunction.*;

public class Client {

    public static void main(String[] args) throws TimeoutException, InterruptedException {
        Service<Request, Response> service = Http.client().                             //1
                withLabel("echo-client").
                withTracer(ZipkinTracer.mk("192.168.99.100",
                    9410, DefaultStatsReceiver$.MODULE$, 1.0f)).
                newService("127.0.0.1:8081");

        //create a "Greetings!" request.
        Reader data = Reader$.MODULE$.fromStream(                                       //2
            new ByteArrayInputStream("Greetings!".getBytes(StandardCharsets.UTF_8)));
        Request request = Request.apply(Version.Http11$.MODULE$,
            Method.Post$.MODULE$, "/", data);

        Future<Response> responseFuture = Await.ready(service.apply(request));          //3
        responseFuture.onSuccess(func(response -> {                                     //4
            System.out.println(String.format("response status: %s, response string: %s",
                    response.status().toString(), response.contentString()));
            return BoxedUnit.UNIT;
        }));
        responseFuture.onFailure(func(e -> {
            System.out.println("error: " + e.toString());
            return BoxedUnit.UNIT;
        }));
        responseFuture.ensure(func(() -> {
            service.close();
            //IDE may complain here, just ignore
            return BoxedUnit.UNIT;
        }));

    }
}
  1. 这部分代码和我们之前的Server类代码很像. 在Server类中, 我们创建了一个Service实例并监听了8081端口, 现在客户端通过newService创建了一个Service的stub.

  2. 这部分代码用来构造一个消息内容为Greetings的Http请求.

  3. service.apply(request)就是一次客户端到服务端的RPC调用. 这个调用的返回值是Future<Response>.
    service.apply(request)是一个异步操作, 主线程调用这个方法并不会阻塞, 有可能主线程退出了实际调用还没有完成. 所以这里就要用到Await.ready了. Await.ready的作用是等待一个Future执行完成再返回, 是一个同步操作. 通过调用Await.ready我们就能将一个异步操作转化成一个同步操作.

  4. 接下来我们在Future上注册请求成功与失败的回调函数. 请求成功的回调函数中只是简单的打印出响应的消息内容.
    这里有个细节需要说明一下. Future的onSuccess方法需要传入一个Scala的函数特质: scala.Function1[Response, BoxedUnit]. 如果是Java6或7, 我们可以这样实现这个特质:

1
2
3
4
5
6
7
8
responseFuture.onSuccess(new AbstractFunction1<Response, BoxedUnit>(){
    @Override
    public BoxedUnit apply(Response response) {
        System.out.println(String.format("response status: %s, response string: %s",
                response.status().toString(), response.contentString()));
        return BoxedUnit.UNIT;
    }
});

在Java8中, 这种匿名类我们一般会使用Lambda代替, 理想情况下写法是这样:

1
2
3
4
5
responseFuture.onSuccess(response -> {
    System.out.println(String.format("response status: %s, response string: %s",
            response.status().toString(), response.contentString()));
    return BoxedUnit.UNIT;
});

可惜的是这种写法编译不会通过, 因为只有符合FunctionalInterface定义的接口才能使用Lambda表达式(什么是FunctionalInterface, 请参考Javadoc), 而在Scala2.11中, scala.Function1不是一个FunctionalInterface(Scala2.12会兼容Java8). 为了在这里使用Lambda, 我们使用了scala-java8-compat这个库, 调用scala.compat.java8.JFunction.func方法将一个FunctionalInterface转化成scala.Function1.

可以看出, 在Java中调用Finagle的API不是很方便. 所以Finagle适合以Scala为主, Java为辅的项目. 如果项目全是Java, 则值得为Finagle主要的API写一层Java的适配层, 来屏蔽Java调用Scala代码会出现的一些晦涩代码.

现在我们启动服务端和客户端来看看运行结果. 首先启动Server类, 然后启动Client. Client运行完毕自动结束, 你应该能在Client的控制台看到如下输出:

1
response status: Status(200), response string: Greetings!

Server控制台的输出:

1
request: Greetings!

Http协议比较适合用于对外提供服务, 并且一般会使用REST. 在Finagle中使用REST可以使用Finch库. 这个库轻量小巧, API简单, 提供了一套很方便的对Http消息进行操作的DSL. 如果是内网服务调用, 一般推荐使用结构紧凑, 传输效率高的协议. 比如protocol buffer, thrift或Avro. Finagle对thrift有很好的支持, 下篇文章我将介绍在Finagle中如何开发thrift应用.

Comments