dubbo的rpc服务如果使用的是长连接的协议,如dubbo协议,tcp长连接本身升双向异步的,dubbo内部通过监听返回结果,来实现方法调用的同步,所以性能上是有牺牲的;
在某些场景可以通过异步提升性能
服务消费者Callback接口及实现:
consumer.xml
<beanid="demoCallback"class="com.alibaba.dubbo.callback.implicit.NofifyImpl"/> < id="demoService"interface="com.alibaba.dubbo.callback.implicit.IDemoService"version="1.0.0"group="cn"> < name="get"async="true"onreturn="demoCallback.onreturn"onthrow="demoCallback.onthrow"/> </ > |
classNofifyImplimplementsNofify { publicMap<Integer, Person> ret =newHashMap<Integer, Person>(); publicMap<Integer, Throwable> errors =newHashMap<Integer, Throwable>(); publicvoidonreturn(Person msg, Integer id) { //第一个参数为返回对象,第二个开始为入参 System.out.println("onreturn:"+ msg); ret.put(id, msg); } publicvoidonthrow(Throwable ex, Integer id) {//第一个参数为异常,第二个开始参数为入参,异常只有在onreturn的时候发生发生异常(返回异常,这里onreturn调用也是异常的),才调用, errors.put(id, ex); } } |
注意点:因为是回调异步,异步处理的时候有时候需要,当时调用时候的一些对象,所有你可以
自己写个子类,继承入参,在新类里设置需要的内容,回调就可以带过来,(但新子类必须都实现序列号接口,不然回调不成功);
详细参考:
com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
异常只有在onreturn的时候发生发生异常(返回异常,这里onreturn调用也是异常的),才调用
private void fireInvokeCallback(final Invoker invoker, final Invocation invocation) { final Method onInvokeMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY)); final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY)); if (onInvokeMethod == null && onInvokeInst == null ){ return ; } if (onInvokeMethod == null || onInvokeInst == null ){ throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() +" has a onreturn callback config , but no such "+(onInvokeMethod == null ? "method" : "instance")+" found. url:"+invoker.getUrl()); } if (onInvokeMethod != null && ! onInvokeMethod.isAccessible()) { onInvokeMethod.setAccessible(true); } Object[] params = invocation.getArguments(); try { onInvokeMethod.invoke(onInvokeInst, params); } catch (InvocationTargetException e) { fireThrowCallback(invoker, invocation, e.getTargetException()); } catch (Throwable e) { fireThrowCallback(invoker, invocation, e); } }
public class Test { public static void main(String[] args) { DubboParam p = new DubboParam(); DubboServiceManager dubboServiceManager = new DubboServiceManager(p); RetrunNotify notify=new RetrunNotify(); ReferenceConfigref=dubboServiceManager.getChatHisServiceReference(); List list=new ArrayList (); MethodConfig methodConfig=new MethodConfig(); methodConfig.setName("insert"); methodConfig.setAsync(true); methodConfig.setOnreturn(notify); methodConfig.setOnreturnMethod("onreturn"); methodConfig.setOnthrow(notify); methodConfig.setOnthrowMethod("onthrow"); list.add(methodConfig); ref.setMethods(list); ChatHisDTONotify his = new ChatHisDTONotify(); Random r = new Random(); try { his.setSenderId(r.nextLong()); his.setMsg(" msg" + r.nextInt()); his.setReceiverId(r.nextLong()); his.setDomainId(1L); his.setA(100); his.setC(new Date()); ResponseDTO a = ref.get().insert(his); System.out.println("onreturn:" + a); } catch (Exception e) { // TODO: handle exception } } public static class ChatHisDTONotify extends ChatHisDTO{ Object c; int a; public int getA() { return a; } public void setA(int a) { this.a = a; } @Override public String toString() { return "ChatHisDTONotify [c=" + c + ", a=" + a + "]"; } public Object getC() { return c; } public void setC(Object c) { this.c = c; } } public static class RetrunNotify{ private int maxSize=20000; Map map=new ConcurrentHashMap (); public void onreturn(ResponseDTO a,ChatHisDTONotify msg) { System.out.println(a.getDataResult()); System.out.println("onreturn:" + msg); throw new RuntimeException("xxxxx"); } public void onthrow(Throwable ex,ChatHisDTONotify msg ){ System.out.println(msg.getA()); } }}
问题:
dubbo的异步调用发现个问题
A -----[异步]--> B --[同步调用]-->C B同步dubbo调用C,就好直接返回null如果B下一步还有同步调用D,就会正确;
待调查