A Neat Use of Apache httpcore-nio: Implementing an Android Push Service

1. Background

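Push services are widely used on Android. They are usually implemented in one of three ways: polling, SMS push, or IP push. Polling is not timely and SMS push is costly, so IP push is the usual choice. Because Google's IP push platform, C2DM, is blocked in China, many good domestic push platforms have emerged, such as 个推 (Getui) and 极光推送 (JPush). Implementing a push service is technically demanding, so many mobile internet applications simply rent these platforms in order to grow their business quickly.

In some enterprise and industry settings, however, phones are not allowed to access the internet, so internet push platforms cannot be used and you have to implement your own push service. The popular approach in China is the open-source androidpn, which is based on XMPP. XMPP is an instant-messaging protocol, so using it for push carries a lot of redundancy and is rather inelegant. This article therefore proposes a simpler alternative.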

2. Feasibility of using HTTP

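Pushing from a server to a phone faces two constraints: 1) a mobile device's IP address changes frequently, which cannot be solved by configuration on the server side; 2) the device is often behind NAT (for example a wireless router), so the server cannot initiate a socket connection to it. The phone must therefore initiate the socket connection to the server and keep it open, and the server pushes messages to the phone over that connection.

This gives the push service an unusual shape: when the socket is established, the phone is the client and the push server is the server; when a message is pushed, the push server initiates the exchange and acts as the client, while the phone receives it and acts as the server. In typical open-source HTTP implementations, such as Apache HttpCore, the socket server is bound to the HTTP server and the socket client to the HTTP client, and the two cannot be separated. Fortunately, the NIO version of Apache HttpCore separates the socket layer from the HTTP layer, so an HTTP client request can be issued over a server-side socket. Because a push server has to hold thousands or tens of thousands of persistent connections, NIO is the natural choice, which makes the NIO model of Apache httpcore-nio a good fit.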
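
The role reversal described above can be illustrated without any HTTP library. The following is a minimal sketch using plain blocking JDK sockets, not the httpcore-nio code of the demo: the class name ReverseHttpSketch, the method names, and the hand-written HTTP messages are all assumptions made for illustration. The "device" dials out, then answers an HTTP request that the "server" sends over the accepted socket.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class ReverseHttpSketch {

    /** Device side: dial out, then wait for one HTTP request on that socket and answer it. */
    static void runDevice(int serverPort) throws IOException {
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), serverPort)) {
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            String requestLine = in.readLine(); // e.g. "GET /Hello1.txt HTTP/1.1"
            String line;
            while ((line = in.readLine()) != null && !line.isEmpty()) {
                // skip request headers until the blank line
            }
            String body = "got " + requestLine;
            String response = "HTTP/1.1 200 OK\r\n"
                    + "Content-Length: " + body.length() + "\r\n"
                    + "Connection: close\r\n\r\n" + body;
            OutputStream out = socket.getOutputStream();
            out.write(response.getBytes(StandardCharsets.US_ASCII));
            out.flush();
        }
    }

    /** Server side: accept the device's connection, then act as the HTTP client on it. */
    static String pushOnce(ServerSocket listener, String path) throws IOException {
        try (Socket socket = listener.accept()) {
            String request = "GET " + path + " HTTP/1.1\r\nHost: device\r\n\r\n";
            OutputStream out = socket.getOutputStream();
            out.write(request.getBytes(StandardCharsets.US_ASCII));
            out.flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            return in.readLine(); // status line of the device's response
        }
    }

    /** Runs one device and one push over the loopback interface; returns the status line. */
    static String demo() throws Exception {
        try (ServerSocket listener = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            final int port = listener.getLocalPort();
            Thread device = new Thread(() -> {
                try {
                    runDevice(port);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            device.start();
            String status = pushOnce(listener, "/Hello1.txt");
            device.join();
            return status;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

A blocking version like this needs one thread per connection, which is the reason the article turns to NIO for holding many idle connections.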

3. Demo description

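In our company project, the server side uses Apache httpcore-nio, while the phone side uses an ordinary HTTP protocol stack to keep phone-side development simple. Because the design builds on a simple protocol (HTTP) and mature open-source code, the development effort was small: roughly a thousand lines of code gave us a simple but sufficient push server. It has been running in production for more than a year, with good performance and stability.

In the demo, both the phone and the server use Apache httpcore-nio and communicate over HTTP. The communication model is shown in the figure below: two phones and one server all run on localhost and are distinguished by port.

The Java project is shown in the figure below.

The demo uses Apache httpcore-nio 4.2.4. ReverseNHttpServer.java implements the push client (the phone side). ReverseNHttpClient.java and ConnectionManager.java implement the push server; ConnectionManager keeps the established connections in a Queue. ReverseHttpTest.java is the test.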

3.1. ReverseNHttpClient

The source code follows.

package com.cangfu.reversehttp;
import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.http.ContentTooLongException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.entity.ContentBufferEntity;
import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.ListenerEndpoint;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.nio.util.SimpleInputBuffer;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;

public class ReverseNHttpClient {
	ListeningIOReactor ioReactor;
    HttpParams params;
    Thread t;
    
    static ConnectionManager<NHttpClientConnection> connMgr = new ConnectionManager<NHttpClientConnection>();
    public static ConnectionManager<NHttpClientConnection> getConnectionManager() {
    	return connMgr;
    }
    
    public ReverseNHttpClient(IOReactorConfig ioconfig) throws IOReactorException {
	this.params = new SyncBasicHttpParams();
        this.params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setParameter(CoreProtocolPNames.USER_AGENT, "HttpTest/1.1")
            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
        
	// Create client-side I/O reactor
        ioReactor = new DefaultListeningIOReactor(ioconfig);
    }
 
	public void start() {
		
        HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor() {
        	@Override
        	public void connected(
                    final NHttpClientConnection conn,
                    final Object attachment) throws IOException, HttpException {
        		
        		try {
					ReverseNHttpClient.getConnectionManager().putConnection(conn);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}

        		System.out.println(conn + ": connection open in ReverseClient side");

        		super.connected(conn, attachment);
        	}
        };
        // Create client-side I/O event dispatch
        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, params);

        // Run the I/O reactor in a separate thread
        t = new Thread(new Runnable() {

            public void run() {
                try {
                    // Ready to go!
                    ioReactor.execute(ioEventDispatch);
                } catch (IOException e) {
                    System.err.println("I/O error: " + e.getMessage());
                }
                System.out.println("Shutdown");
            }

        });
        // Start the client thread
        t.start();	
	}
	
	public void stop() throws IOException {
		// Shut the I/O reactor down explicitly so that execute() returns and the thread ends
		ioReactor.shutdown();
	}
	
	public void accept(int port) throws InterruptedException {
		ListenerEndpoint request = ioReactor.listen(new InetSocketAddress(port));
		request.waitFor();
	}
	
	public void HttpExchange(HttpHost target, BasicHttpRequest request) throws InterruptedException {
		
        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Use standard client-side protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});
        
        HttpAsyncRequester requester = new HttpAsyncRequester(
                httpproc, new DefaultConnectionReuseStrategy(), params);
        
        requester.execute(
                new BasicAsyncRequestProducer(target, request),
                new MyResponseConsumer(),
                ReverseNHttpClient.getConnectionManager().takeConnection()
        		);
	}
	
	static class MyResponseConsumer extends AbstractAsyncResponseConsumer<HttpResponse> {

	    private volatile HttpResponse response;
	    private volatile SimpleInputBuffer buf;

	    public MyResponseConsumer() {
	        super();
	    }

	    @Override
	    protected void onResponseReceived(final HttpResponse response) throws IOException {
	        this.response = response;
	    }

	    @Override
	    protected void onEntityEnclosed(
	            final HttpEntity entity, final ContentType contentType) throws IOException {
	        long len = entity.getContentLength();
	        if (len > Integer.MAX_VALUE) {
	            throw new ContentTooLongException("Entity content is too long: " + len);
	        }
	        if (len < 0) {
	            len = 4096;
	        }
	        this.buf = new SimpleInputBuffer((int) len, new HeapByteBufferAllocator());
	        this.response.setEntity(new ContentBufferEntity(entity, this.buf));
	    }

	    @Override
	    protected void onContentReceived(
	            final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
	        if (this.buf == null) {
	            throw new IllegalStateException("Content buffer is null");
	        }
	        this.buf.consumeContent(decoder);
	    }

	    @Override
	    protected void releaseResources() {
	        this.response = null;
	        this.buf = null;
	    }

	    @Override
	    protected HttpResponse buildResult(final HttpContext context) throws IOException {
	        System.out.println();
	        System.out.println(((BasicHttpResponse)response).toString());
	        response.getEntity().writeTo(System.out);
	        System.out.println();

	        return this.response;
	    }
	}
}

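Here, start() starts the HTTP client, accept() starts listening on a socket, and HttpExchange() sends an HTTP request. MyResponseConsumer is a nested class that handles the returned response.

3.2. ReverseNHttpServer

The source code follows.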

package com.cangfu.reversehttp;

import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URL;
import java.net.URLDecoder;
import java.security.KeyStore;
import java.util.Locale;

import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.DefaultNHttpServerConnection;
import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
import org.apache.http.impl.nio.SSLNHttpServerConnectionFactory;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpConnectionFactory;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.entity.NFileEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;

public class ReverseNHttpServer {
    HttpParams params;
    final ConnectingIOReactor ioReactor;
    Thread t;
    
    static ConnectionManager<NHttpServerConnection> connMgr = new ConnectionManager<NHttpServerConnection>();
    public static ConnectionManager<NHttpServerConnection> getConnectionManager() {
    	return connMgr;
    }

    public ReverseNHttpServer(IOReactorConfig ioconfig) throws IOReactorException {
    
    	this.params = new SyncBasicHttpParams();
        this.params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setParameter(CoreProtocolPNames.USER_AGENT, "HttpTest/1.1")
            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
        
        ioReactor = new DefaultConnectingIOReactor(ioconfig);
    }
     
    public void start(String path, boolean ssl) throws Exception {
          
        // Create HTTP protocol processing chain
        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
                // Use standard server-side protocol interceptors
                new ResponseDate(),
                new ResponseServer(),
                new ResponseContent(),
                new ResponseConnControl()
        });
        // Create request handler registry
        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
        // Register the default handler for all URIs
        registry.register("*", new HttpFileHandler(new File(path)));
        // Create server-side HTTP protocol handler
        HttpAsyncService protocolHandler = new HttpAsyncService(
                httpproc, new DefaultConnectionReuseStrategy(), registry, params) {

            @Override
            public void connected(final NHttpServerConnection conn) {
                System.out.println(conn + ": connection open in ReverseServer side");
                try {
                	ReverseNHttpServer.getConnectionManager().putConnection(conn);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
                super.connected(conn);
            }

            @Override
            public void closed(final NHttpServerConnection conn) {
                System.out.println(conn + ": connection closed");
                super.closed(conn);
            }

        };
        
        // Create HTTP connection factory
        NHttpConnectionFactory<DefaultNHttpServerConnection> connFactory;
        if (ssl) {
            // Initialize SSL context
            ClassLoader cl = ReverseNHttpServer.class.getClassLoader();
            URL url = cl.getResource("my.keystore");
            if (url == null) {
                System.out.println("Keystore not found");
                System.exit(1);
            }
            KeyStore keystore  = KeyStore.getInstance("jks");
            keystore.load(url.openStream(), "secret".toCharArray());
            KeyManagerFactory kmfactory = KeyManagerFactory.getInstance(
                    KeyManagerFactory.getDefaultAlgorithm());
            kmfactory.init(keystore, "secret".toCharArray());
            KeyManager[] keymanagers = kmfactory.getKeyManagers();
            SSLContext sslcontext = SSLContext.getInstance("TLS");
            sslcontext.init(keymanagers, null, null);
            connFactory = new SSLNHttpServerConnectionFactory(sslcontext, null, params);
        } else {
            connFactory = new DefaultNHttpServerConnectionFactory(params);
        }
        // Create server-side I/O event dispatch
        final IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(protocolHandler, connFactory);
 
        // Run the I/O reactor in a separate thread
        t = new Thread(new Runnable() {

            public void run() {
                try {
                    // Ready to go!
                    ioReactor.execute(ioEventDispatch);
                } catch (IOException e) {
                    System.err.println("I/O error: " + e.getMessage());
                }
                System.out.println("Shutdown");
            }

        });
        // Start the server thread
        t.start();	
    }
    
    public void stop() throws IOException {
    	// Shut the I/O reactor down explicitly so that execute() returns and the thread ends
    	ioReactor.shutdown();
    }

    public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress) throws InterruptedException {
	
    	SessionRequest sessionReq = ioReactor.connect(remoteAddress, localAddress, null, 
			new SessionRequestCallback() {
			
				@Override
				public void completed(SessionRequest request) {
					System.out.println("connection established succeed in device side!!!");
				}
		
				@Override
				public void failed(SessionRequest request) {
					System.err.println("Device side connection failed!");
					System.err.println(request.getException().getMessage());
				}
		
				@Override
				public void timeout(SessionRequest request) {
					System.err.println("Device side connection timeout!");
					System.err.println(request.getException().getMessage());
				}
		
				@Override
				public void cancelled(SessionRequest request) {
					System.err.println("Device side connection cancelled!");
					System.err.println(request.getException().getMessage());
				}
			}
    	);
	
    	sessionReq.waitFor();
    }
    
    static class HttpFileHandler implements HttpAsyncRequestHandler<HttpRequest> {

        private final File docRoot;
        
        public HttpFileHandler() {
            docRoot = new File(".");
        }

        public HttpFileHandler(final File docRoot) {
            super();
            this.docRoot = docRoot;
        }

        public HttpAsyncRequestConsumer<HttpRequest> processRequest(
                final HttpRequest request,
                final HttpContext context) {
            // Buffer request content in memory for simplicity
            return new BasicAsyncRequestConsumer();
        }

        public void handle(
                final HttpRequest request,
                final HttpAsyncExchange httpexchange,
                final HttpContext context) throws HttpException, IOException {
            HttpResponse response = httpexchange.getResponse();
            handleInternal(request, response, context);
            httpexchange.submitResponse(new BasicAsyncResponseProducer(response));
        }

        private void handleInternal(
                final HttpRequest request,
                final HttpResponse response,
                final HttpContext context) throws HttpException, IOException {

            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
                throw new MethodNotSupportedException(method + " method not supported");
            }

            String target = request.getRequestLine().getUri();
            final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
            if (!file.exists()) {

                response.setStatusCode(HttpStatus.SC_NOT_FOUND);
                NStringEntity entity = new NStringEntity(
                        "<html><body><h1>File" + file.getPath() +
                        " not found</h1></body></html>",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("File " + file.getPath() + " not found");

            } else if (!file.canRead() || file.isDirectory()) {

                response.setStatusCode(HttpStatus.SC_FORBIDDEN);
                NStringEntity entity = new NStringEntity(
                        "<html><body><h1>Access denied</h1></body></html>",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("Cannot read file " + file.getPath());

            } else {
                NHttpConnection conn = (NHttpConnection) context.getAttribute(
                        ExecutionContext.HTTP_CONNECTION);
                response.setStatusCode(HttpStatus.SC_OK);
                NFileEntity body = new NFileEntity(file, ContentType.create("text/html"));
                response.setEntity(body);
                System.out.println(conn + ": serving file " + file.getPath());
            }
        }

    }

}

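Here, start() starts the HTTP server and connect() initiates the socket connection. HttpFileHandler is a nested handler class for HTTP requests: it reads a text file from local disk and sends it back to the server in the response.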
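
HttpFileHandler maps the request URI directly onto a file under docRoot. If the handler were exposed to untrusted requests, one might want to reject paths that escape the document root. The sketch below is a possible hardening step and is not part of the original demo; DocRootResolver and resolve() are hypothetical names, and it uses only JDK classes.

```java
import java.io.File;
import java.io.IOException;
import java.net.URLDecoder;

public class DocRootResolver {

    /**
     * Resolves a request URI against docRoot. Returns null when the decoded
     * path would end up outside the document root (for example via "..").
     */
    static File resolve(File docRoot, String requestUri) throws IOException {
        // Drop any query string before decoding the path
        int q = requestUri.indexOf('?');
        String path = q >= 0 ? requestUri.substring(0, q) : requestUri;
        String decoded = URLDecoder.decode(path, "UTF-8");

        // Canonical paths collapse "." and ".." segments
        File root = docRoot.getCanonicalFile();
        File candidate = new File(root, decoded).getCanonicalFile();

        String rootPath = root.getPath();
        String prefix = rootPath.endsWith(File.separator) ? rootPath : rootPath + File.separator;
        if (candidate.equals(root) || candidate.getPath().startsWith(prefix)) {
            return candidate;
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        File root = new File(".");
        System.out.println(resolve(root, "/Hello1.txt"));
        System.out.println(resolve(root, "/../outside.txt"));
    }
}
```

In a handler like handleInternal(), a null result could be answered with the same 403 response that the listing already uses for unreadable files.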


3.3. ConnectionManager

The source code follows.

package com.cangfu.reversehttp;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ConnectionManager<E> {
	
	private final BlockingQueue<E> connections;

	public ConnectionManager() {
		
		connections = new LinkedBlockingQueue<E>();
	}

	public void putConnection(E conn) throws InterruptedException {
		this.connections.put(conn);
	}
	
	public E takeConnection() throws InterruptedException {
		return this.connections.take();
	}
}
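
ConnectionManager hands connections out in FIFO order, so as far as the listings show, the target passed to HttpExchange() does not by itself select which phone's connection is used. If per-device routing matters, one option is to key the connections by a device identifier. The sketch below is an illustration under assumptions and not part of the original demo: KeyedConnectionRegistry and its method names are hypothetical, and the key (here the phone's local port) is only one possible identifier.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class KeyedConnectionRegistry<K, C> {

    private final ConcurrentMap<K, C> connections = new ConcurrentHashMap<K, C>();

    /** Registers (or replaces) the connection for a device; returns the previous one, if any. */
    public C register(K deviceId, C conn) {
        return connections.put(deviceId, conn);
    }

    /** Returns the connection registered for a device, or null if there is none. */
    public C lookup(K deviceId) {
        return connections.get(deviceId);
    }

    /** Removes the entry only if it still maps to this connection, e.g. from a closed() callback. */
    public boolean unregister(K deviceId, C conn) {
        return connections.remove(deviceId, conn);
    }

    public static void main(String[] args) {
        // Strings stand in for real connection objects in this sketch
        KeyedConnectionRegistry<Integer, String> registry =
                new KeyedConnectionRegistry<Integer, String>();
        registry.register(60011, "conn-to-phone-1");
        registry.register(60012, "conn-to-phone-2");
        System.out.println(registry.lookup(60012)); // prints "conn-to-phone-2"
    }
}
```

Unlike the queue, a lookup here does not remove the connection, so the same persistent connection can serve repeated pushes to one device.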

3.4. ReverseHttpTest

The source code follows.

package com.cangfu.reversehttp;

import java.net.InetSocketAddress;

import org.apache.http.HttpHost;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.junit.Test;

public class ReverseHttpTest {

	@Test
	public void testReverseHttp() throws Exception {
		
		int sockServerPort = 60010;
		int sockClientPort1 = 60011;
		int sockClientPort2 = 60012;
		
		IOReactorConfig ioconfig = new IOReactorConfig();
		ioconfig.setIoThreadCount(1);
		ioconfig.setSoKeepalive(true);
		ioconfig.setSoReuseAddress(true);
		ioconfig.setSelectInterval(1000);
		
		// Start ReverseClient, at APNS Server side
		ReverseNHttpClient reverseClient = new ReverseNHttpClient(ioconfig);
		reverseClient.start();
		reverseClient.accept(sockServerPort);
		
		// Start ReverseServer, at Device side
		ReverseNHttpServer reverseServer1 = new ReverseNHttpServer(ioconfig);
		reverseServer1.start(".", false);
		reverseServer1.connect(new InetSocketAddress("localhost", sockServerPort), new InetSocketAddress("localhost", sockClientPort1));
		
		// Start another ReverseServer, at Device side
		ReverseNHttpServer reverseServer2 = new ReverseNHttpServer(ioconfig);
		reverseServer2.start(".", false);
		reverseServer2.connect(new InetSocketAddress("localhost", sockServerPort), new InetSocketAddress("localhost", sockClientPort2));

		// send a request from APNS server to phone 1
		HttpHost target1 = new HttpHost("localhost", sockClientPort1, "http");
		BasicHttpRequest request1 = new BasicHttpRequest("GET", "/Hello1.txt");
        reverseClient.HttpExchange(target1, request1);
        
        // send another request from APNS server to phone 2
        HttpHost target2 = new HttpHost("localhost", sockClientPort2, "http");
        BasicHttpRequest request2 = new BasicHttpRequest("GET", "/Hello2.txt");
        reverseClient.HttpExchange(target2, request2);
        
        Thread.sleep(2*1000);
        reverseServer1.stop();
        reverseServer2.stop();
        reverseClient.stop();
	}
}
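
The test above waits a fixed two seconds for the asynchronous responses. A common alternative is to wait on a CountDownLatch that is counted down once per completed exchange. The sketch below shows only the waiting pattern with JDK classes; LatchWaitSketch and awaitAll() are hypothetical names, and the worker tasks merely stand in for the point where a response has been consumed (in the demo that might be MyResponseConsumer.buildResult(), which is an assumption about how it could be wired).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    /** Runs the given number of simulated async exchanges and waits for all of them, up to timeoutMs. */
    static boolean awaitAll(int exchanges, long timeoutMs) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(exchanges);
        ExecutorService pool = Executors.newFixedThreadPool(exchanges);
        try {
            for (int i = 0; i < exchanges; i++) {
                pool.execute(new Runnable() {
                    public void run() {
                        // Stand-in for "response fully received"
                        done.countDown();
                    }
                });
            }
            // Returns true as soon as every exchange has completed, false on timeout
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(awaitAll(2, 2000)); // prints "true"
    }
}
```

With this pattern the test returns as soon as both responses arrive and can fail explicitly when await() times out.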

 

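The test starts one HttpClient for the push server and two HttpServers for the phones. The push server sends a GET request to each phone. Each phone reads the requested local file (Hello1.txt or Hello2.txt in the test), places it in the body of the HTTP response, and returns it to the server. The test run output is shown below: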

 


This article comes from the “伧夫的博客” blog; please retain this source link: http://cangfu.blog.51cto.com/5966711/1580017
