dubbo重试和超时机制

dubbo重试和超时机制

dubbo 超时机制以及负载均衡、重试机制都是针对客户端进行的。

1. dubbo 重试机制

dubbo 重试机制针对不同的Invoker。主要的集群Invoker 有如下:

默认的集群Invoker是FailoverClusterInvoker。这里有重试机制。其默认的重试次数是2次(调用1次,重试2次,所以总共尝试是3次)。

package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.Version;

import org.apache.dubbo.common.logger.Logger;

import org.apache.dubbo.common.logger.LoggerFactory;

import org.apache.dubbo.common.utils.NetUtils;

import org.apache.dubbo.rpc.Invocation;

import org.apache.dubbo.rpc.Invoker;

import org.apache.dubbo.rpc.Result;

import org.apache.dubbo.rpc.RpcContext;

import org.apache.dubbo.rpc.RpcException;

import org.apache.dubbo.rpc.cluster.Directory;

import org.apache.dubbo.rpc.cluster.LoadBalance;

import org.apache.dubbo.rpc.support.RpcUtils;

import java.util.ArrayList;

import java.util.HashSet;

import java.util.List;

import java.util.Set;

import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_RETRIES;

import static org.apache.dubbo.rpc.cluster.Constants.RETRIES_KEY;

/**

* When invoke fails, log the initial error and retry other invokers (retry n times, which means at most n different invokers will be invoked)

* Note that retry causes latency.

*

* Failover

*

*/

public class FailoverClusterInvoker extends AbstractClusterInvoker {

private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

public FailoverClusterInvoker(Directory directory) {

super(directory);

}

@Override

@SuppressWarnings({"unchecked", "rawtypes"})

public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {

List> copyInvokers = invokers;

checkInvokers(copyInvokers, invocation);

String methodName = RpcUtils.getMethodName(invocation);

int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;

if (len <= 0) {

len = 1;

}

// retry loop.

RpcException le = null; // last exception.

List> invoked = new ArrayList>(copyInvokers.size()); // invoked invokers.

Set providers = new HashSet(len);

for (int i = 0; i < len; i++) {

//Reselect before retry to avoid a change of candidate `invokers`.

//NOTE: if `invokers` changed, then `invoked` also lose accuracy.

if (i > 0) {

checkWhetherDestroyed();

copyInvokers = list(invocation);

// check again

checkInvokers(copyInvokers, invocation);

}

Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked);

invoked.add(invoker);

RpcContext.getContext().setInvokers((List) invoked);

try {

Result result = invoker.invoke(invocation);

if (le != null && logger.isWarnEnabled()) {

logger.warn("Although retry the method " + methodName

+ " in the service " + getInterface().getName()

+ " was successful by the provider " + invoker.getUrl().getAddress()

+ ", but there have been failed providers " + providers

+ " (" + providers.size() + "/" + copyInvokers.size()

+ ") from the registry " + directory.getUrl().getAddress()

+ " on the consumer " + NetUtils.getLocalHost()

+ " using the dubbo version " + Version.getVersion() + ". Last error is: "

+ le.getMessage(), le);

}

return result;

} catch (RpcException e) {

if (e.isBiz()) { // biz exception.

throw e;

}

le = e;

} catch (Throwable e) {

le = new RpcException(e.getMessage(), e);

} finally {

providers.add(invoker.getUrl().getAddress());

}

}

throw new RpcException(le.getCode(), "Failed to invoke the method "

+ methodName + " in the service " + getInterface().getName()

+ ". Tried " + len + " times of the providers " + providers

+ " (" + providers.size() + "/" + copyInvokers.size()

+ ") from the registry " + directory.getUrl().getAddress()

+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "

+ Version.getVersion() + ". Last error is: "

+ le.getMessage(), le.getCause() != null ? le.getCause() : le);

}

}

View Code

如上代码可以看到负载均衡和重试机制:默认最多尝试三次(首次调用1次 + 重试2次),每次尝试都先通过负载均衡选择一个服务提供者,然后再进行RPC调用。

如果正常则返回结果,如果重试都失败之后就抛出RPC异常。

还有一些其他的Invoker 调用策略,比如:

- failsafe, 这个策略在调用失败时只记录错误日志并返回一个空结果,不会向调用方抛出异常。

public class FailsafeClusterInvoker extends AbstractClusterInvoker {

private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

public FailsafeClusterInvoker(Directory directory) {

super(directory);

}

@Override

public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

try {

checkInvokers(invokers, invocation);

Invoker invoker = select(loadbalance, invocation, invokers, null);

return invoker.invoke(invocation);

} catch (Throwable e) {

logger.error("Failsafe ignore exception: " + e.getMessage(), e);

return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore

}

}

}

View Code

- failfast, 这个策略不进行重试,会直接抛出异常

public class FailfastClusterInvoker extends AbstractClusterInvoker {

public FailfastClusterInvoker(Directory directory) {

super(directory);

}

@Override

public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

checkInvokers(invokers, invocation);

Invoker invoker = select(loadbalance, invocation, invokers, null);

try {

return invoker.invoke(invocation);

} catch (Throwable e) {

if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.

throw (RpcException) e;

}

throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,

"Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()

+ " select from all providers " + invokers + " for service " + getInterface().getName()

+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()

+ " use dubbo version " + Version.getVersion()

+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),

e.getCause() != null ? e.getCause() : e);

}

}

}

View Code

重试次数可以通过修改全局配置,也可以针对单个服务进行修改:

(1) 全局配置

dubbo:
  consumer:
    retries: 3

(2) 单个配置

@Reference(version = "1.0.0", retries = 5)

private UserService userService;

2. dubbo 负载均衡

dubbo的负载均衡选择器如下:

(1) Random - 随机算法,(根据权重进行随机) 默认算法

(2) RoundRobin - 轮询(基于权重)负载均衡算法

(3) LeastActive - 最少活跃调用数,优先选择当前活跃调用数最少的提供者

(4) consistenthash - 一致性hash, 计算参数的hash, 根据hash 进行选择

3. 超时机制

参考:https://juejin.cn/post/6887553443880255501

dubbo 超时一般也是针对消费者端。消费者端的超时时间默认是1000 ms,可以通过配置文件进行修改。下面研究其超时实现:

(1) dubbo客户端服务调用会调用到如下方法: org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke:

/**
 * Delegates to the wrapped invoker and, for synchronous invocations, blocks until the
 * asynchronous result completes.
 * <p>
 * Failures observed while waiting are translated to {@link RpcException}: a timeout cause gets
 * {@code RpcException.TIMEOUT_EXCEPTION}, a remoting cause gets
 * {@code RpcException.NETWORK_EXCEPTION}. An {@link ExecutionException} with any other cause is
 * not rethrown here; the (exceptionally completed) result is returned as-is.
 *
 * @param invocation the call to perform; cast to {@code RpcInvocation} to read its invoke mode
 * @return the result produced by the wrapped invoker
 * @throws RpcException if waiting is interrupted, times out, hits a remoting error or fails unexpectedly
 */
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);

    try {
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            // Only SYNC mode waits; other modes hand the pending result straight back.
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag: the InterruptedException is replaced by an RpcException,
        // so without this callers further up could no longer detect the interruption.
        Thread.currentThread().interrupt();
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}

这里可以看出: 如果是同步模式的话,需要调用asyncResult.get 阻塞等待结果;如果是异步模式的话直接返回asyncResult, 不等待返回结果。所以同步是通过java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) 同步阻塞等待结果实现的。如果等待过程中捕获到的ExecutionException 的原因是超时异常(TimeoutException),则转换为错误码为TIMEOUT_EXCEPTION 的RpcException 抛出。

get 后面会调用到: java.util.concurrent.CompletableFuture#reportGet 然后抛出异常 ExecutionException

/**
 * Reports the outcome of a completed future for {@code get()}-style callers.
 *
 * @param r   the raw result object: {@code null}, an {@code AltResult} wrapper, or the value
 * @param <T> the expected value type
 * @return the value, or {@code null} when {@code r} is an {@code AltResult} without an exception
 * @throws InterruptedException  if {@code r} is {@code null} (by convention: interrupted)
 * @throws ExecutionException    if the {@code AltResult} carries an exception; a
 *                               {@link CompletionException} with a cause is unwrapped first
 * @throws CancellationException if the carried exception is a cancellation (rethrown unwrapped)
 */
private static <T> T reportGet(Object r)
    throws InterruptedException, ExecutionException {
    if (r == null) // by convention below, null means interrupted
        throw new InterruptedException();
    if (r instanceof AltResult) {
        Throwable x, cause;
        if ((x = ((AltResult)r).ex) == null)
            return null;
        if (x instanceof CancellationException)
            throw (CancellationException)x;
        // Report the underlying cause rather than the CompletionException wrapper.
        if ((x instanceof CompletionException) &&
            (cause = x.getCause()) != null)
            x = cause;
        throw new ExecutionException(x);
    }
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}

(2) 下面研究其生成AltResult 的过程

这里可以理解为后面有个定时任务(HashedWheelTimer, 哈希轮定时器)来对发送之后的请求进行判断。如果超过超时时间的设置就设置响应结果为超时,这样前面在get 调用到 reportGet 的时候可以获取到相关的超时异常。

org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke 经过一系列调用会调用到 org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)

/**
 * Wraps {@code request} in a two-way {@code Request}, registers a {@code DefaultFuture} for it
 * (which also arms the timeout check) and sends it over the channel.
 *
 * @param request the payload to send
 * @param timeout the timeout passed to {@code DefaultFuture.newFuture}
 * @return the future that completes when a response (or a timeout response) is received
 * @throws RemotingException if this channel is closed or sending fails; on a send failure the
 *                           future is cancelled before the exception is rethrown
 */
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    // The future is created before sending, so the timeout check is already scheduled.
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

1》org.apache.dubbo.remoting.exchange.support.DefaultFuture#newFuture

/**
 * Creates a future for {@code request} and schedules its timeout check.
 *
 * @param channel the channel the request is sent on
 * @param request the outgoing request
 * @param timeout the timeout handed to the {@code DefaultFuture} constructor
 * @return the newly created future, with its timeout check already scheduled
 */
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
    final DefaultFuture created = new DefaultFuture(channel, request, timeout);
    timeoutCheck(created);
    return created;
}

/**
 * Registers a {@code TimeoutCheckTask} for the future's id on the shared timer, delayed by the
 * future's own timeout (milliseconds), and stores the timer handle on the future.
 */
private static void timeoutCheck(DefaultFuture future) {
    final TimeoutCheckTask checkTask = new TimeoutCheckTask(future.getId());
    final long delayMillis = future.getTimeout();
    future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(checkTask, delayMillis, TimeUnit.MILLISECONDS);
}

2》org.apache.dubbo.common.timer.HashedWheelTimer#newTimeout 这里创建超时检测任务并且添加到timeouts 队列中

/**
 * Schedules {@code task} to run after {@code delay}.
 * <p>
 * The new timeout is only appended to the {@code timeouts} queue here; it is moved into the
 * proper wheel bucket when the worker processes the queue on a later tick.
 *
 * @param task  the task to run; must not be {@code null}
 * @param delay the delay before expiry, in {@code unit}
 * @param unit  the unit of {@code delay}; must not be {@code null}
 * @return the handle of the scheduled timeout
 * @throws NullPointerException       if {@code task} or {@code unit} is {@code null}
 * @throws RejectedExecutionException if a positive {@code maxPendingTimeouts} is exceeded
 */
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    // Reserve a slot first, and release it again if the limit turns out to be exceeded.
    final long pendingNow = pendingTimeouts.incrementAndGet();
    final boolean overLimit = maxPendingTimeouts > 0 && pendingNow > maxPendingTimeouts;
    if (overLimit) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingNow + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
    }

    start();

    // The deadline is expressed relative to the timer's start time.
    long relativeDeadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    if (relativeDeadline < 0 && delay > 0) {
        // A positive delay yielding a negative deadline means the sum overflowed.
        relativeDeadline = Long.MAX_VALUE;
    }

    final HashedWheelTimeout scheduled = new HashedWheelTimeout(this, task, relativeDeadline);
    timeouts.add(scheduled);
    return scheduled;
}

3》接下来就交给org.apache.dubbo.common.timer.HashedWheelTimer.Worker#run 方法定时跑任务

/**
 * Worker loop of the timer: initialises the start time, then on every tick processes
 * cancellations, moves queued timeouts into buckets and expires the current bucket, for as
 * long as the timer state is {@code WORKER_STATE_STARTED}. After the loop ends, all timeouts
 * that were never run are collected into {@code unprocessedTimeouts}.
 */
public void run() {
    // 0 marks "not initialised", so a real reading of 0 is replaced by 1.
    startTime = System.nanoTime();
    if (startTime == 0) {
        startTime = 1;
    }

    // Release the threads waiting in start() for the initialisation.
    startTimeInitialized.countDown();

    while (true) {
        final long tickDeadline = waitForNextTick();
        if (tickDeadline > 0) {
            final int slot = (int) (tick & mask);
            processCancelledTasks();
            final HashedWheelBucket currentBucket = wheel[slot];
            transferTimeoutsToBuckets();
            currentBucket.expireTimeouts(tickDeadline);
            tick++;
        }
        if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) != WORKER_STATE_STARTED) {
            break;
        }
    }

    // Collect what is still sitting in the buckets so stop() can return it.
    for (HashedWheelBucket leftover : wheel) {
        leftover.clearTimeouts(unprocessedTimeouts);
    }

    // Drain timeouts that never left the queue; cancelled ones are dropped.
    HashedWheelTimeout queued;
    while ((queued = timeouts.poll()) != null) {
        if (!queued.isCancelled()) {
            unprocessedTimeouts.add(queued);
        }
    }

    processCancelledTasks();
}

然后会调用到: org.apache.dubbo.common.timer.HashedWheelTimer.HashedWheelBucket#expireTimeouts

/**
 * Walks this bucket's linked list once for the given tick {@code deadline}: timeouts with no
 * remaining rounds are removed and expired, cancelled ones are removed, and all others have
 * their remaining rounds decremented.
 *
 * @throws IllegalStateException if a timeout with no remaining rounds has a deadline later
 *                               than {@code deadline} (it was placed into a wrong slot)
 */
void expireTimeouts(long deadline) {
    HashedWheelTimeout current = head;

    while (current != null) {
        HashedWheelTimeout successor = current.next;

        if (current.remainingRounds > 0) {
            if (current.isCancelled()) {
                successor = remove(current);
            } else {
                current.remainingRounds--;
            }
        } else {
            // Due in this round: unlink first, then fire.
            successor = remove(current);
            if (current.deadline > deadline) {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", current.deadline, deadline));
            }
            current.expire();
        }

        current = successor;
    }
}

继续调用到:org.apache.dubbo.common.timer.HashedWheelTimer.HashedWheelTimeout#expire

/**
 * Runs the task if this timeout can be moved from {@code ST_INIT} to {@code ST_EXPIRED};
 * otherwise does nothing. Anything thrown by the task is logged at warn level (when enabled)
 * and not propagated.
 */
public void expire() {
    final boolean wonTransition = compareAndSetState(ST_INIT, ST_EXPIRED);
    if (wonTransition) {
        try {
            task.run(this);
        } catch (Throwable failure) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', failure);
            }
        }
    }
}

继续调用:org.apache.dubbo.remoting.exchange.support.DefaultFuture.TimeoutCheckTask#run

/**
 * Timer callback: if the future for {@code requestID} still exists and is not done, builds a
 * timeout response and feeds it through {@code DefaultFuture.received} as a timeout.
 * The status is {@code SERVER_TIMEOUT} when the future reports the request as sent, and
 * {@code CLIENT_TIMEOUT} otherwise.
 */
@Override
public void run(Timeout timeout) {
    final DefaultFuture pending = DefaultFuture.getFuture(requestID);
    final boolean stillWaiting = pending != null && !pending.isDone();
    if (stillWaiting) {
        // Synthesise the response the caller would otherwise never get.
        final Response expired = new Response(pending.getId());
        expired.setStatus(pending.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
        expired.setErrorMessage(pending.getTimeoutMessage(true));
        DefaultFuture.received(pending.getChannel(), expired, true);
    }
}

然后调用 org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response, boolean) 设置结果为超时结果。 这里的res.getStatus 为SERVER_TIMEOUT。 也就是请求已经发出去,但是在指定的超时时间内没有响应结果。如果在指定时间内,请求还没有发出去则认为是客户端超时。

/**
 * Completes the pending future that matches {@code response}.
 * <p>
 * The future is removed from {@code FUTURES}; if none is registered any more, a warning is
 * logged instead. For a real response ({@code timeout == false}) the scheduled timeout check
 * is cancelled before the response is applied. The {@code CHANNELS} entry for the response id
 * is removed in every case.
 *
 * @param channel  the channel the response belongs to; may be {@code null} (only used for logging)
 * @param response the response to deliver
 * @param timeout  {@code true} when the response was synthesised by the timeout check itself
 */
public static void received(Channel channel, Response response, boolean timeout) {
    try {
        final DefaultFuture pending = FUTURES.remove(response.getId());
        if (pending == null) {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
            return;
        }

        final Timeout checkTask = pending.timeoutCheckTask;
        if (!timeout) {
            // A genuine response arrived, so the pending timeout check is no longer needed.
            checkTask.cancel();
        }
        pending.doReceived(response);
    } finally {
        CHANNELS.remove(response.getId());
    }
}

/**
 * Completes this future from {@code res}: normally for status {@code OK}, with a
 * {@code TimeoutException} for either timeout status, and with a {@code RemotingException}
 * for every other status.
 *
 * @throws IllegalStateException if {@code res} is {@code null}
 */
private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }

    if (res.getStatus() == Response.OK) {
        this.complete(res.getResult());
        return;
    }

    final boolean serverSide = res.getStatus() == Response.SERVER_TIMEOUT;
    if (serverSide || res.getStatus() == Response.CLIENT_TIMEOUT) {
        // The flag tells the exception whether the timeout was the server-side status.
        this.completeExceptionally(new TimeoutException(serverSide, channel, res.getErrorMessage()));
        return;
    }

    this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}

然后调用 java.util.concurrent.CompletableFuture#completeExceptionally 设置结果为:AltResult, 并且记录异常为超时异常。这样在前面的get 阻塞获取结果的时候就可以获取到结果。

/**
 * Attempts to complete this future with {@code ex} wrapped in an {@code AltResult}, then runs
 * post-completion processing.
 *
 * @param ex the exception to record; must not be {@code null}
 * @return the value returned by {@code internalComplete}
 * @throws NullPointerException if {@code ex} is {@code null}
 */
public boolean completeExceptionally(Throwable ex) {
    if (ex == null) {
        throw new NullPointerException();
    }
    final boolean completedNow = internalComplete(new AltResult(ex));
    postComplete();
    return completedNow;
}

补充:重试次数、是否异步等都可以单独配置。

这里需要注意,如果配置不生效,可能是单个服务里面配置了多个Reference。 注解生成的代理对象是单例,所以导致不生效。 最好是将dubbo 接口集中统一管理。

@Reference(version = "1.0.0", retries = 5, async = true)

private UserService userService;

查看注解如下:

/**
 * Marks a field, method or annotation type as a Dubbo service reference and carries its
 * consumer-side configuration (retries, timeout, cluster strategy, load balance, ...).
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
public @interface Reference {

    /**
     * Interface class, default value is void.class
     */
    Class<?> interfaceClass() default void.class;

    /**
     * Interface class name, default value is empty string
     */
    String interfaceName() default "";

    /**
     * Service version, default value is empty string
     */
    String version() default "";

    /**
     * Service group, default value is empty string
     */
    String group() default "";

    /**
     * Service target URL for direct invocation, if this is specified, then registry center takes no effect.
     */
    String url() default "";

    /**
     * Client transport type, default value is "netty"
     */
    String client() default "";

    /**
     * Whether to enable generic invocation, default value is false
     */
    boolean generic() default false;

    /**
     * When enable, prefer to call local service in the same JVM if it's present, default value is true
     */
    boolean injvm() default true;

    /**
     * Check if service provider is available during boot up, default value is true
     */
    boolean check() default true;

    /**
     * Whether eager initialize the reference bean when all properties are set, default value is false
     */
    boolean init() default false;

    /**
     * Whether to make connection when the client is created, the default value is false
     */
    boolean lazy() default false;

    /**
     * Export an stub service for event dispatch, default value is false.
     *
     * @see org.apache.dubbo.rpc.Constants#STUB_EVENT_METHODS_KEY
     */
    boolean stubevent() default false;

    /**
     * Whether to reconnect if connection is lost, if not specify, reconnect is enabled by default, and the interval
     * for retry connecting is 2000 ms
     *
     * @see org.apache.dubbo.remoting.Constants#DEFAULT_RECONNECT_PERIOD
     */
    String reconnect() default "";

    /**
     * Whether to stick to the same node in the cluster, the default value is false
     *
     * @see Constants#DEFAULT_CLUSTER_STICKY
     */
    boolean sticky() default false;

    /**
     * How the proxy is generated, legal values include: jdk, javassist
     */
    String proxy() default "";

    /**
     * Service stub name, use interface name + Local if not set
     */
    String stub() default "";

    /**
     * Cluster strategy, legal values include: failover, failfast, failsafe, failback, forking
     */
    String cluster() default "";

    /**
     * Maximum connections service provider can accept, default value is 0 - connection is shared
     */
    int connections() default 0;

    /**
     * The callback instance limit peer connection
     *
     * @see org.apache.dubbo.rpc.Constants#DEFAULT_CALLBACK_INSTANCES
     */
    int callbacks() default 0;

    /**
     * Callback method name when connected, default value is empty string
     */
    String onconnect() default "";

    /**
     * Callback method name when disconnected, default value is empty string
     */
    String ondisconnect() default "";

    /**
     * Service owner, default value is empty string
     */
    String owner() default "";

    /**
     * Service layer, default value is empty string
     */
    String layer() default "";

    /**
     * Service invocation retry times
     *
     * @see Constants#DEFAULT_RETRIES
     */
    int retries() default 2;

    /**
     * Load balance strategy, legal values include: random, roundrobin, leastactive
     *
     * @see Constants#DEFAULT_LOADBALANCE
     */
    String loadbalance() default "";

    /**
     * Whether to enable async invocation, default value is false
     */
    boolean async() default false;

    /**
     * Maximum active requests allowed, default value is 0
     */
    int actives() default 0;

    /**
     * Whether the async request has already been sent, the default value is false
     */
    boolean sent() default false;

    /**
     * Service mock name, use interface name + Mock if not set
     */
    String mock() default "";

    /**
     * Whether to use JSR303 validation, legal values are: true, false
     */
    String validation() default "";

    /**
     * Timeout value for service invocation, default value is 0
     */
    int timeout() default 0;

    /**
     * Specify cache implementation for service invocation, legal values include: lru, threadlocal, jcache
     */
    String cache() default "";

    /**
     * Filters for service invocation
     *
     * @see Filter
     */
    String[] filter() default {};

    /**
     * Listeners for service exporting and unexporting
     *
     * @see ExporterListener
     */
    String[] listener() default {};

    /**
     * Customized parameter key-value pair, for example: {key1, value1, key2, value2}
     */
    String[] parameters() default {};

    /**
     * Application associated name
     */
    String application() default "";

    /**
     * Module associated name
     */
    String module() default "";

    /**
     * Consumer associated name
     */
    String consumer() default "";

    /**
     * Monitor associated name
     */
    String monitor() default "";

    /**
     * Registry associated name
     */
    String[] registry() default {};

    /**
     * The communication protocol of Dubbo Service
     *
     * @return the default value is ""
     * @since 2.6.6
     */
    String protocol() default "";

    /**
     * Service tag name
     */
    String tag() default "";

    /**
     * methods support
     *
     * @return the method-level configurations, empty by default
     */
    Method[] methods() default {};

    /**
     * The id
     *
     * @return default value is empty
     * @since 2.7.3
     */
    String id() default "";

}

View Code

相关数据流

汽车之家
365bet娱乐网址

汽车之家

⌚ 07-11 👁️‍🗨️ 1351
做外贸GV号码那里可以买到
365bet娱乐网址

做外贸GV号码那里可以买到

⌚ 09-15 👁️‍🗨️ 6365