CXF中Web服务请求处理流程

   Web服务其本质上还是HTTP请求,只不过要遵循Web服务特定的规范,比如说服务端与客户端双方通信的媒介为XML文件格式,以完成对一次Web服务调用的描述。当然此XML还有特定的格式,至于这个特定的格式是什么样的取决于使用哪一种数据绑定方式。CXF中称这为数据绑定,个人觉得称之为Java对象的序列化与反序列化更易懂些。

   本文将从源码(CXF版本为2.7.6)层面来分析一下CXF是如何完成对一个Web服务(SOAP类型为例)请求的处理的,注意这里分析的是请求的处理,并不包含这个请求是如何生成的,分析将将从CXF服务端接收到一个请求开始。在CXF中,处理HTTP请求是使用Jetty实现的,其处理请求的Handler为org.apache.cxf.transport.http_jetty.JettyHTTPHandler在其handle方法中调用的是org.apache.cxf.transport.http_jetty.JettyHTTPDestination的doService()方法,其中最重要的一句代码是调用了serviceRequest(context, req, resp)方法,下面是源码:

protected void serviceRequest(final ServletContext context, 
                                  final HttpServletRequest req, 
                                  final HttpServletResponse resp) throws IOException {
	Request baseRequest = (req instanceof Request) ? (Request)req : getCurrentRequest();
	
	if (LOG.isLoggable(Level.FINE)) {
		LOG.fine("Service http request on thread: " + Thread.currentThread());
	}
	Message inMessage = retrieveFromContinuation(req);
	
	if (inMessage == null) {
		
		inMessage = new MessageImpl();
		ExchangeImpl exchange = new ExchangeImpl();
		exchange.setInMessage(inMessage);
		setupMessage(inMessage, context, req, resp);
		
		((MessageImpl)inMessage).setDestination(this);

		exchange.setSession(new HTTPSession(req));
	}
	
	try {
		//处理消息
		//incomingObserver的现实类为org.apache.cxf.transport.ChainInitiationObserver
		incomingObserver.onMessage(inMessage);
		resp.flushBuffer();
		baseRequest.setHandled(true);
	} catch (SuspendedInvocationException ex) {
		//省略...
	} 
	//省略...
}

在这个方法中,完成了对Message的创建,初始化设置(setupMessage);Exchange、Session的创建,并将Message与Session放置进Exchange对象中。Message对象的初始化设置(setupMessage方法中)非常重要,向Message中放置了很多信息,如Http请求对象,响应对象,请求的地址等一系列信息,具体请查看setupMessage方法,需要的对象创建完成后调用ChainInitiationObserver.onMessage()方法,源码如下:

public void onMessage(Message m) {
	Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
	ClassLoaderHolder origLoader = null;
	try {
		if (loader != null) {
			origLoader = ClassLoaderUtils.setThreadContextClassloader(loader);
		}
		InterceptorChain phaseChain = null;
		
		//如果拦截器链不为空,则检查其状态,如果是停止或挂起状态则使其恢复
		if (m.getInterceptorChain() != null) {
			phaseChain = m.getInterceptorChain();
			// To make sure the phase chain is run by one thread once
			synchronized (phaseChain) {
				if (phaseChain.getState() == InterceptorChain.State.PAUSED 
					|| phaseChain.getState() == InterceptorChain.State.SUSPENDED) {
					phaseChain.resume();
					return;
				}
			}
		}
		
		//重新创建Message对象,m为MessageImpl对象,而CXF中支持SOAP协议服务与restful服务
		//所以要根据具体使用的是哪一种类型创建出更具体的Message对象,这里,如果是SOAP协议则创建的是SOAPMessage对象
		//如果是restful服务则创建的是XMLMessage对象
		Message message = getBinding().createMessage(m);
		Exchange exchange = message.getExchange();
		if (exchange == null) {
			exchange = new ExchangeImpl();
			m.setExchange(exchange);
		}
		exchange.setInMessage(message);
		//往Exchage对象中填充信息
		setExchangeProperties(exchange, message);

		InterceptorProvider dbp = null;
		if (endpoint.getService().getDataBinding() instanceof InterceptorProvider) {
			//数据绑定对象不为空
			dbp = (InterceptorProvider)endpoint.getService().getDataBinding();
		}
		// setup chain
		if (dbp == null) {
			phaseChain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
														 bus.getInInterceptors(),
														 endpoint.getService().getInInterceptors(),
														 endpoint.getInInterceptors(),
														 getBinding().getInInterceptors());
		} else {
			//将Bus、Service、Endpoint、协议绑定、数据绑定中的所有输入拦截器汇聚到拦截器链中
			phaseChain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
										bus.getInInterceptors(),
										endpoint.getService().getInInterceptors(),
										endpoint.getInInterceptors(),
										getBinding().getInInterceptors(),
										dbp.getInInterceptors());
		}
	
		
		//将拦截器链设置进Message中
		message.setInterceptorChain(phaseChain);
		phaseChain.setFaultObserver(endpoint.getOutFaultObserver());
		addToChain(phaseChain, message);
		
		//调用拦截器链的doIntercept方法,该方法中即依次调用链中的各个拦截器的handMessage方法。
		phaseChain.doIntercept(message);
		
	} finally {
		if (origBus != bus) {
			BusFactory.setThreadDefaultBus(origBus);
		}
		if (origLoader != null) {
			origLoader.reset();
		}
	}
}

到这里我们知道了拦截器是如何被调用的,但奇怪是我们并还没有看到服务的调用,而当doIntercept执行完成后onMessage()方法将退出,serviceRequest方法也就基本上执行完成了。大家应该猜得出来,服务的调用正是在doIntercept方法中完成的,确切点是在拦截器链接中的某个拦截器的handMessage()方法中完成的。使用拦截器来实现框架自身某些特定的功能是不是和struts2中的拦截器有点类似呢(并不是说拦截器的实现方式)。

真实情况是拦截器链从Service注册的拦截器中获取了一个org.apache.cxf.interceptor.ServiceInvokerInterceptor对象,服务的调用由该拦截器完成。


public void handleMessage(final Message message) {
	final Exchange exchange = message.getExchange();
	final Endpoint endpoint = exchange.get(Endpoint.class);
	final Service service = endpoint.getService();
	//获取调用都对象org.apache.cxf.jaxws.JAXWSMethodInvoker
	final Invoker invoker = service.getInvoker();        
	
	//创建一个Runnable可执行对象
	Runnable invocation = new Runnable() {

		public void run() {
			Exchange runableEx = message.getExchange();
			//调用JAXWSMethodInvoker的invoke方法,其返回值result为MessageContentsList对象
			Object result = invoker.invoke(runableEx, getInvokee(message));
			if (!exchange.isOneWay()) {//如果不是单向Exchange,即是双向的,也就是还有输出
				Endpoint ep = exchange.get(Endpoint.class);
				//创建输出Message
				Message outMessage = runableEx.getOutMessage();
				if (outMessage == null) {
					outMessage = new MessageImpl();
					outMessage.setExchange(exchange);
					outMessage = ep.getBinding().createMessage(outMessage);
					exchange.setOutMessage(outMessage);
				}
				copyJaxwsProperties(message, outMessage);
				if (result != null) {
					MessageContentsList resList = null;
					if (result instanceof MessageContentsList) {
						resList = (MessageContentsList)result;
					} else if (result instanceof List) {
						resList = new MessageContentsList((List<?>)result);
					} else if (result.getClass().isArray()) {
						resList = new MessageContentsList((Object[])result);
					} else {
						outMessage.setContent(Object.class, result);                            
					}
					if (resList != null) {
						//将结果放置在outMessage中
						outMessage.setContent(List.class, resList);
					}
				}                   
			}
		}

	};
	
	//从Service中获取Executor,不为null
	Executor executor = getExecutor(endpoint);
	//从Exchange中获取Executor,为null
	Executor executor2 = exchange.get(Executor.class);
	if (executor2 == executor || executor == null) {
		// already executing on the appropriate executor
		invocation.run();
	} else {
		//执行
		exchange.put(Executor.class, executor);
		FutureTask<Object> o = new FutureTask<Object>(invocation, null) {
			@Override
			protected void done() {
				super.done();
				synchronized (this) {
					this.notifyAll();
				}
			}
		};
		synchronized (o) {
			//执行FutureTask,最后还是执行了刚创建Runnable的run方法,返回run方法中
			executor.execute(o);
			
			//省略...
		}
	}
}

调用JAXWSMethodInvoker的invoke(Exchange exchange, Object o)从AbstractInvoker类中继承而来,如下:

public Object invoke(Exchange exchange, Object o) {
	
	//获取服务实现类对象,默认使用单例工厂创建
	final Object serviceObject = getServiceObject(exchange);
	try {
		
		//获取方法信息
		BindingOperationInfo bop = exchange.get(BindingOperationInfo.class);
		MethodDispatcher md = (MethodDispatcher) 
			exchange.get(Service.class).get(MethodDispatcher.class.getName());
		//查找出要执行的方法
		Method m = bop == null ? null : md.getMethod(bop);
		if (m == null && bop == null) {
			LOG.severe(new Message("MISSING_BINDING_OPERATION", LOG).toString());
			throw new Fault(new Message("EXCEPTION_INVOKING_OBJECT", LOG, 
										 "No binding operation info", "unknown method", "unknown"));
		}
		List<Object> params = null;
		if (o instanceof List) {
			params = CastUtils.cast((List<?>)o);
		} else if (o != null) {
			params = new MessageContentsList(o);
		}
		
		m = adjustMethodAndParams(m, exchange, params);
		
		//Method m = (Method)bop.getOperationInfo().getProperty(Method.class.getName());
		m = matchMethod(m, serviceObject);//再次匹配方法
		
		//将服务对象,方法,方法参数传入
		return invoke(exchange, serviceObject, m, params);
	} finally {
		releaseServiceObject(exchange, serviceObject);
	}
}

将服务对象,方法,方法参数传入入调用了重载的protected Object invoke(Exchange exchange,  final Object serviceObject, Method m, List<Object> params)方法该方法位于JAXWSMethodInvoker类中,其夫归调用父类的invoke方法,最终调用AbstractInvoker的performInvocation方法,如下:

protected Object performInvocation(Exchange exchange, final Object serviceObject, Method m, Object[] paramArray) throws Exception {
	paramArray = insertExchange(m, paramArray, exchange);
	//省略...
	return m.invoke(serviceObject, paramArray);//通过反射调用服务实现对象方法
}

到这里,你应该知道,服务是如何被调用的,那么服务被调用后又是如何返回的呢?现在回到ServiceInvokerInterceptor拦截器中创建的Runnable对象,因为Exchange是双向的,所以会创建出outMessage,然后将服务执行的结果封装成一个MessageContentsList对象放置在outMessage的content中。

在输入拦截器链中还注册了另外一个org.apache.cxf.interceptor.OutgoingChainInterceptor拦截器:


public void handleMessage(Message message) {
	Exchange ex = message.getExchange();
	BindingOperationInfo binding = ex.get(BindingOperationInfo.class);
	if (null != binding && null != binding.getOperationInfo() && binding.getOperationInfo().isOneWay()) {
		closeInput(message);
		return;
	}
	Message out = ex.getOutMessage();
	if (out != null) {
		getBackChannelConduit(message);
		if (binding != null) {
			out.put(MessageInfo.class, binding.getOperationInfo().getOutput());
			out.put(BindingMessageInfo.class, binding.getOutput());
		}
		
		InterceptorChain outChain = out.getInterceptorChain();
		if (outChain == null) {
			//对于输出消息来说,输出拦截器链就是其输入,这里与收集输入拦截器类似,这只过这里是
			//获取Bus、Service、Endpoint、协议绑定、数据绑定中的输出拦截器
			outChain = OutgoingChainInterceptor.getChain(ex, chainCache);
			out.setInterceptorChain(outChain);
		}
		//负责收集输出拦截器并创建出输出拦截器链,然后调用其doIntercept方法
		outChain.doIntercept(out);
	}
}

在输出拦截器链中有几个关于向客端输出的拦截器:

1. org.apache.cxf.interceptor.MessageSenderInterceptor

在其handleMessage方法中执行了getConduit(message).prepare(message);而在prepare方法中向Message中设置了OutputStream,并传入了HttpServletResponse对象


public void prepare(Message message) throws IOException {
	message.put(HTTP_RESPONSE, response);
	OutputStream os = message.getContent(OutputStream.class);
	if (os == null) {
		message.setContent(OutputStream.class, 
					   new WrappedOutputStream(message, response));
	}
}

Conduit实现类为org.apache.cxf.transport.http.AbstractHTTPDestination.BackChannelConduit


2. org.apache.cxf.interceptor.StaxOutInterceptor
创建XMLStreamWriter对象,并设置进Message中


3. org.apache.cxf.interceptor.BareOutInterceptor
获取服务调用的返回结果,使用XMLStreamWriter对象将包装好的返回结果以XML格式写入WrappedOutputStream中,而WrappedOutputStream是包装了HttpServletRespone.getOutputStream()对象
即,将结果返回给了客户端。

当输出拦截器链执行完成后返回到JettyHTTPDestination的serviceRequest()方法,刷新HttpServletRespone并标记请求处理完成。

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。