1.Dubboä¹SPIå®ç°åç详解
2.Dubbo调用超时那些事儿
Dubboä¹SPIå®ç°åç详解
SPIå ¨ç§°ä¸ºService Provider Interfaceï¼æ¯ä¸ç§æå¡æä¾æºå¶ï¼æ¯å¦å¨ç°å®ä¸æ们ç»å¸¸ä¼æè¿ç§åºæ¯ï¼å°±æ¯å¯¹äºä¸ä¸ªè§èå®ä¹æ¹èè¨ï¼å¯ä»¥ç解为ä¸ä¸ªæå¤ä¸ªæ¥å£ï¼ï¼å ·ä½çæå¡å®ç°æ¹æ¯ä¸å¯ç¥çï¼å¯ä»¥ç解为对è¿äºæ¥å£çå®ç°ç±»ï¼ï¼é£ä¹å¨å®ä¹è¿äºè§èçæ¶åï¼å°±éè¦è§èå®ä¹æ¹è½å¤éè¿ä¸å®çæ¹å¼æ¥è·åå°è¿äºæå¡æä¾æ¹å ·ä½æä¾çæ¯åªäºæå¡ï¼èSPIå°±æ¯è¿è¡è¿ç§å®ä¹çã说æï¼
Dubbo çæ©å±ç¹å è½½æ¯åºäºJDK æ åç SPI æ©å±ç¹åç°æºå¶å¢å¼ºèæ¥çï¼Dubbo æ¹è¿äº JDK æ åç SPI ç以ä¸é®é¢ï¼
dubbo对äºSPIçå®ç°ä¸»è¦æ¯å¨ExtensionLoaderè¿ä¸ªç±»ä¸ï¼è¿ä¸ªç±»ä¸»è¦æä¸ä¸ªæ¹æ³ï¼
å¦ä¸æ¯getExtension()æ¹æ³çæºç ï¼
createExtension()æ¹æ³çæºç ï¼
å¨createExtension()æ¹æ³ä¸ï¼å ¶ä¸»è¦åäºä¸ä»¶äºï¼
å ³äºwrapper对象ï¼è¿ééè¦è¯´æçæ¯ï¼å ¶ä¸»è¦ä½ç¨æ¯ä¸ºç®æ 对象å®ç°AOPãwrapper对象æ两个ç¹ç¹ï¼
getExtensionClasses()æ¹æ³çæºç
loadDirectory()æ¹æ³çæºç ï¼
loadClass()æ¹æ³çæºç
loadClass()æ¹æ³ä¸»è¦ä½ç¨æ¯å¯¹åç±»è¿è¡ååï¼è¿é主è¦ååæäºä¸é¨åï¼
æ»ç»èè¨ï¼getExtension()æ¹æ³ä¸»è¦æ¯è·åæå®å称对åºçåç±»ãå¨è·åè¿ç¨ä¸ï¼é¦å ä¼ä»ç¼åä¸è·åæ¯å¦å·²ç»å è½½è¿è¯¥åç±»ï¼å¦æ没å è½½è¿åéè¿å®ä¹æ件å è½½ï¼å¹¶ä¸ä½¿ç¨è·åå°çwrapper对象å°è£ ç®æ 对象è¿åã
getAdaptiveExtension()æ¹æ³æºç
Dubbo调用超时那些事儿
其实之前很早就看过Dubbo源码中关于超时这部分的码下处理逻辑,但是码下没有记录下来,最近在某脉上看到有人问了这个问题,码下想着再回顾一下。码下stormkafka源码开始从dubbo的码下请求开始,看看dubbo(2.6.6)在超时这块是码下怎么处理的:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)@Overridepublic ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);DefaultFuture future = new DefaultFuture(channel, req, timeout);try { channel.send(req);} catch (RemotingException e) { future.cancel();throw e;}return future;}DefaultFuture从返回值ResponseFuture类型可以看出,这是码下一个异步方法(不等同于Dubbo的异步调用)。那么调用超时的码下关键可以从ResponseFuture来看:
public interface ResponseFuture { Object get() throws RemotingException;Object get(int timeoutInMillis) throws RemotingException;void setCallback(ResponseCallback callback);boolean isDone();}可以看到这是一个接口,从request方法可以得知实现类是码下DefaultFuture,从构造函数入手:
public DefaultFuture(Channel channel,码下 Request request, int timeout) { this.channel = channel;this.request = request;this.id = request.getId();this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);// put into waiting map.FUTURES.put(id, this);CHANNELS.put(id, channel);}可以得知每一个DefaultFuture都有一个id,并且等于requestId,码下酷q网站源码timeout是码下从url中获取的配置,没有时默认ms。码下
从代码的码下注释可以看到FUTURES这个map应该就是关键,是码下一个waiting map。
DefaultFuture中还有一个方法:
public static void received(Channel channel,企业引导页源码 Response response) { try { DefaultFuture future = FUTURES.remove(response.getId());if (future != null) { future.doReceived(response);} else { logger.warn("The timeout response finally returned at "+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))+ ", response " + response+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()+ " -> " + channel.getRemoteAddress()));}} finally { CHANNELS.remove(response.getId());}}可以看到调用的地方为:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#received
@Overridepublic void received(Channel channel, Object message) throws RemotingException { //省略一些代码} else if (message instanceof Response) { handleResponse(channel, (Response) message);//省略一些代码}}com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleResponse
static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response);}}回到DefaultFuture.received,可以看到通过Response id从FUTURES中拿了一个DefaultFuture出来,然后调用了doReceived方法,也就是说Response id和Request id 相同。结下来看看doReceived做了什么:
private void doReceived(Response res) { lock.lock();try { response = res;if (done != null) { done.signal();}} finally { lock.unlock();}if (callback != null) { invokeCallback(callback);}}首先是加锁,然后通过唤醒了阻塞在Condition上的人像采集软件 源码线程。看看什么地方会阻塞在done这个条件上:
@Overridepublic Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT;}if (!isDone()) { long start = System.currentTimeMillis();lock.lock();try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS);if (isDone() || System.currentTimeMillis() - start > timeout) { break;}}} catch (InterruptedException e) { throw new RuntimeException(e);} finally { lock.unlock();}if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));}}return returnFromResponse();}是get方法,get方法确实在request请求后被调用:
(Result) currentClient.request(inv, timeout).get()可以看到get方法的大致逻辑为,先获取锁,然后循环判断isDone,并阻塞等到条件,买底部指标源码当条件超时,如果任务完成,或者超过timeout结束循环,接着判断isDone,如果超时抛出TimeoutException。并且通过sent(request请求时间)是否>0()来判断是clientSide还是serverSide超时。
isDone逻辑如下:
@Overridepublic boolean isDone() { return response != null;}如果是正常Response,也有可能是超时的现象,可以看到get方法最后调用了一个函数:
public interface ResponseFuture { Object get() throws RemotingException;Object get(int timeoutInMillis) throws RemotingException;void setCallback(ResponseCallback callback);boolean isDone();}0TIMEOUT SIDESERVER_TIMEOUT(服务端超时): 这个就是正常的我们消费端请求一个RPC接口,服务端由于性能等一些原因处理时间超过了timeout配置时间。
CLIENT_TIMEOUT:我们可以看到是通过sent(上面有说sent>0)这个来判断是否clientTimeout,那么这个sent什么时候改变呢?就在发送请求的地方:
public interface ResponseFuture { Object get() throws RemotingException;Object get(int timeoutInMillis) throws RemotingException;void setCallback(ResponseCallback callback);boolean isDone();}1也就是说handler.sent一旦调用成功返回,那么就不算clientSide Timeout了。那么CLIENT_TIMEOUT大概率就是由于client端网络,系统等原因超时。
原文:/post/2024-11-29 23:58
2024-11-29 23:54
2024-11-29 23:18
2024-11-29 23:16
2024-11-29 22:34
2024-11-29 22:24