Preface
Recently, after a newly developed service went online, users reported that data updates were not taking effect. Everything worked when tested locally. The main difference between the two environments is that the local setup runs a single instance while production runs a cluster. With that in mind, I started a second instance of the service locally. The test then failed with high probability: the distributed transaction was not committed. The framework we chose is no longer maintained, so the only option was to enable debug mode and trace the problem myself.

After two days of investigation, I found the cause in how the TM looks up participants by module name. The framework derives the module name from the project (application) name. In a cluster, every instance of a service therefore registers with the TM under the same module name. The TM cannot identify the specific participant, and the transaction commit fails.
1. Version
2. Process analysis
The flow chart of TC proxy control processing below is the basis for a source-level analysis of why distributed transactions fail in a cluster.
[Figure: flow chart of TC proxy control processing]
As the figure shows, the TM ultimately controls the commit of the transaction. It notifies each participant to commit or roll back. In a cluster environment, however, the real participant is very likely never receiving the TM's message. The natural starting point is therefore the code path through which the TM sends its notifications.
2.1 TM NotifyGroup source code analysis
How the TM notifies the TCs:
private void notifyTransaction(DTXContext dtxContext, int transactionState) throws TransactionException {
    List<TransactionUnit> transactionUnits = dtxContext.transactionUnits(); // @@1
    log.debug("group[{}]'s transaction units: {}", dtxContext.getGroupId(), transactionUnits);
    for (TransactionUnit transUnit : transactionUnits) {
        NotifyUnitParams notifyUnitParams = new NotifyUnitParams();
        notifyUnitParams.setGroupId(dtxContext.getGroupId());
        notifyUnitParams.setUnitId(transUnit.getUnitId());
        notifyUnitParams.setUnitType(transUnit.getUnitType());
        notifyUnitParams.setState(transactionState);
        txLogger.txTrace(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify {}'s unit: {}",
                transUnit.getModId(), transUnit.getUnitId());
        try {
            List<String> modChannelKeys = rpcClient.remoteKeys(transUnit.getModId()); // @@2
            if (modChannelKeys.isEmpty()) {
                // record exception
                throw new RpcException("offline mod.");
            }
            MessageDto respMsg =
                    rpcClient.request(modChannelKeys.get(0), MessageCreator.notifyUnit(notifyUnitParams)); // @@3
            if (!MessageUtils.statusOk(respMsg)) {
                // Failed message processing for commit/rollback
                List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
                rpcExceptionHandler.handleNotifyUnitBusinessException(params, respMsg.loadBean(Throwable.class));
            }
        } catch (RpcException e) {
            // Failed to commit/rollback communication
            List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
            rpcExceptionHandler.handleNotifyUnitMessageException(params, e);
        } finally {
            txLogger.txTrace(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify unit over");
        }
    }
}
- @@1 Gets all participants (transaction units) of the transaction group
- @@2 Looks up the connection keys that match the participant's modId
- @@3 Sends the notification to the first matching connection only
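@@2 and @@3 show the weak spot. The TM resolves a participant by its modId and then notifies only the first matching connection. If several TCs in the cluster share a modId, the notification can go to an instance that never joined the transaction. With this suspicion in mind, let's look at the logic behind @@2: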
// NettyRpcClient
public List<String> remoteKeys(String moduleName) {
    return SocketManager.getInstance().remoteKeys(moduleName); // @@1
}

// SocketManager
public List<String> remoteKeys(String moduleName) {
    List<String> allKeys = new ArrayList<>();
    for (Channel channel : channels) {
        if (moduleName.equals(getModuleName(channel))) { // @@2
            allKeys.add(channel.remoteAddress().toString());
        }
    }
    return allKeys;
}

// SocketManager
public String getModuleName(Channel channel) { // @@3
    String key = channel.remoteAddress().toString();
    return getModuleName(key);
}

// SocketManager
public String getModuleName(String remoteKey) { // @@4
    AppInfo appInfo = appNames.get(remoteKey);
    return appInfo == null ? null : appInfo.getAppName();
}
- @@1 Delegates to SocketManager to obtain the TC connection keys for moduleName
- @@2 Iterates over all connections established with the TM and keeps those whose module name equals moduleName
- @@3 Gets a channel's module name from its remote address
- @@4 Looks up the module name by remote address in appNames, a ConcurrentHashMap from remote key to AppInfo, and returns the AppInfo's appName
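To make the effect of @@2 concrete, here is a minimal, framework-free sketch of the same filter. It is not TX-LCN code. It assumes a plain map from remote key to app name in place of the Netty ChannelGroup and AppInfo. The addresses and the service name order-service are made up. With two instances registered under one app name, both connections match, and notifyTransaction would only ever use the first element:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of SocketManager's lookup: remote key -> app name.
public class ModuleLookupDemo {

    // Mirrors remoteKeys(moduleName): collect every connection whose
    // registered module name equals the requested one.
    static List<String> remoteKeys(Map<String, String> appNames, String moduleName) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, String> e : appNames.entrySet()) {
            if (moduleName.equals(e.getValue())) {
                keys.add(e.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Two instances of the same service register with the same app name.
        Map<String, String> appNames = new LinkedHashMap<>();
        appNames.put("/192.168.1.10:51234", "order-service");
        appNames.put("/192.168.1.11:51888", "order-service");

        List<String> keys = remoteKeys(appNames, "order-service");
        System.out.println(keys);        // [/192.168.1.10:51234, /192.168.1.11:51888]
        System.out.println(keys.get(0)); // the only key notifyTransaction would use
    }
}
```

The LinkedHashMap is used only to make the demo repeatable. The real code iterates a ChannelGroup, whose iteration order is not something to rely on, so which instance counts as "first" is effectively arbitrary.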
transUnit.getModId() and AppInfo are therefore the focus of the investigation. Following their call relationships leads to where each of them is initialized.
Initialization of AppInfo
// On the TM, reached from InitClientService
public void bindModuleName(String remoteKey, String appName, String labelName) throws RpcException {
    AppInfo appInfo = new AppInfo();
    appInfo.setAppName(appName);
    appInfo.setLabelName(labelName);
    appInfo.setCreateTime(new Date());
    if (containsLabelName(labelName)) {
        throw new RpcException("labelName:" + labelName + " has exist.");
    }
    appNames.put(remoteKey, appInfo);
}

// Where appName comes from: appName = applicationName
String appName = environment.getProperty("spring.application.name");
this.applicationName = StringUtils.hasText(appName) ? appName : "application";

// Where labelName comes from: labelName = modIdProvider.getModId
public static String modId(ConfigurableEnvironment environment, ServerProperties serverProperties) {
    String applicationName = environment.getProperty("spring.application.name");
    applicationName = StringUtils.hasText(applicationName) ? applicationName : "application";
    return applicationName + ":" + serverPort(serverProperties);
}
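bindModuleName stores both names per connection but enforces uniqueness only on labelName. The sketch below reproduces that check. It uses a simplified, hypothetical AppInfo, and IllegalStateException stands in for RpcException so the example is self-contained. The label values are illustrative. Two instances of one service share an appName while their labelNames differ, and a repeated labelName is rejected:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LabelBindingDemo {

    // Hypothetical stand-in for TX-LCN's AppInfo, reduced to the two names used here.
    static final class AppInfo {
        final String appName;
        final String labelName;

        AppInfo(String appName, String labelName) {
            this.appName = appName;
            this.labelName = labelName;
        }
    }

    // remote key -> AppInfo, the same shape as SocketManager.appNames
    private final Map<String, AppInfo> appNames = new ConcurrentHashMap<>();

    // Same rule as bindModuleName: reject a labelName that is already bound.
    void bind(String remoteKey, String appName, String labelName) {
        for (AppInfo info : appNames.values()) {
            if (labelName.equals(info.labelName)) {
                throw new IllegalStateException("labelName:" + labelName + " has exist.");
            }
        }
        appNames.put(remoteKey, new AppInfo(appName, labelName));
    }

    String appNameOf(String remoteKey) {
        AppInfo info = appNames.get(remoteKey);
        return info == null ? null : info.appName;
    }

    String labelNameOf(String remoteKey) {
        AppInfo info = appNames.get(remoteKey);
        return info == null ? null : info.labelName;
    }

    public static void main(String[] args) {
        LabelBindingDemo tm = new LabelBindingDemo();
        tm.bind("/192.168.1.10:51234", "order-service", "order-service:192.168.1.10:8080");
        tm.bind("/192.168.1.11:51888", "order-service", "order-service:192.168.1.11:8080");
        // Same appName for both instances, distinct labelNames.
        System.out.println(tm.appNameOf("/192.168.1.10:51234") + " / " + tm.appNameOf("/192.168.1.11:51888"));
        System.out.println(tm.labelNameOf("/192.168.1.10:51234") + " / " + tm.labelNameOf("/192.168.1.11:51888"));
    }
}
```

Because labelName is already guaranteed unique per connection, it is the natural key for telling instances apart.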
Initialization of TransactionUnit
// JoinGroupExecuteService
transactionManager.join(dtxContext, joinGroupParams.getUnitId(), joinGroupParams.getUnitType(),
        rpcClient.getAppName(transactionCmd.getRemoteKey()), joinGroupParams.getTransactionState());

// NettyRpcClient
public String getAppName(String remoteKey) {
    return SocketManager.getInstance().getModuleName(remoteKey);
}

// SocketManager
public String getModuleName(String remoteKey) {
    AppInfo appInfo = appNames.get(remoteKey);
    return appInfo == null ? null : appInfo.getAppName();
}

// SimpleTransactionManager
public void join(DTXContext dtxContext, String unitId, String unitType, String modId, int userState) throws TransactionException {
    // Set the state to rollback state 0 for manual rollback
    if (userState == 0) {
        dtxContext.resetTransactionState(0);
    }
    TransactionUnit transactionUnit = new TransactionUnit();
    transactionUnit.setModId(modId);
    transactionUnit.setUnitId(unitId);
    transactionUnit.setUnitType(unitType);
    dtxContext.join(transactionUnit);
}
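In other words, the modId stored in each TransactionUnit is the appName bound to the joining TC's connection. That appName comes from environment.getProperty("spring.application.name"). Every instance of the same service in a cluster has the same spring.application.name, so all of them end up with the same modId.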
3. Conclusion
In a cluster, every instance has the same application name. When the TM sends a notification, it cannot find the exact participant, and the transaction commit fails. There are two ways to address this:

- Give each instance its own application name at startup. This is cumbersome and impractical, so it was dropped.
- Modify the source code to change the lookup logic so that the modId of a TransactionUnit is the labelName, built from service name + IP + port. The related methods need to be rewritten.
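The whole failure, and the effect of switching the lookup to labelName, can be sketched end to end. This is a simplified model, not TX-LCN code:

- connect plays the role of bindModuleName.
- moduleName plays the role of SocketManager.getModuleName(remoteKey), which join uses to record the modId.
- notifyTarget plays the remoteKeys(...).get(0) step in notifyTransaction.

The useLabelName flag switches between returning appName (the framework's behavior) and labelName (the change proposed here). Addresses and names are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NotifyRoutingDemo {

    // Hypothetical stand-in for AppInfo: the two names a TC reports when it connects.
    static final class AppInfo {
        final String appName;
        final String labelName;

        AppInfo(String appName, String labelName) {
            this.appName = appName;
            this.labelName = labelName;
        }
    }

    // remote key -> AppInfo; LinkedHashMap only to make "first match" repeatable.
    private final Map<String, AppInfo> appNames = new LinkedHashMap<>();
    // false: module name = appName (framework behavior); true: module name = labelName (patched)
    private final boolean useLabelName;

    NotifyRoutingDemo(boolean useLabelName) {
        this.useLabelName = useLabelName;
    }

    void connect(String remoteKey, String appName, String labelName) {
        appNames.put(remoteKey, new AppInfo(appName, labelName));
    }

    // Role of SocketManager.getModuleName(remoteKey).
    String moduleName(String remoteKey) {
        AppInfo info = appNames.get(remoteKey);
        if (info == null) {
            return null;
        }
        return useLabelName ? info.labelName : info.appName;
    }

    // Role of notifyTransaction: remoteKeys(modId), then use only the first key.
    String notifyTarget(String modId) {
        List<String> keys = new ArrayList<>();
        for (String key : appNames.keySet()) {
            if (modId.equals(moduleName(key))) {
                keys.add(key);
            }
        }
        if (keys.isEmpty()) {
            throw new IllegalStateException("offline mod.");
        }
        return keys.get(0);
    }

    // Two instances of order-service; the participant joins from joiningKey.
    static String route(boolean useLabelName, String joiningKey) {
        NotifyRoutingDemo tm = new NotifyRoutingDemo(useLabelName);
        tm.connect("/192.168.1.10:51234", "order-service", "order-service:192.168.1.10:8080");
        tm.connect("/192.168.1.11:51888", "order-service", "order-service:192.168.1.11:8080");
        // join() records the module name of the joining connection as the unit's modId
        String modId = tm.moduleName(joiningKey);
        return tm.notifyTarget(modId);
    }

    public static void main(String[] args) {
        String instanceB = "/192.168.1.11:51888";
        System.out.println("original: " + route(false, instanceB)); // original: /192.168.1.10:51234
        System.out.println("patched:  " + route(true, instanceB));  // patched:  /192.168.1.11:51888
    }
}
```

With both instances named order-service, the original lookup sends the notification meant for a participant on instance B to instance A. Keyed by labelName, the notification reaches B.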
4. Solutions
There are two ways to modify the source code:
- Download the framework source, modify it directly, and package the modified build for use
- Create classes with the same package and file names inside the project, so that the project's own compiled classes take precedence over the originals in the framework jar
This solution uses the second method. The first couples the project tightly to a custom build of the framework and makes version upgrades harder.
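The second method relies on the project's copy of a framework class winning over the copy inside the TX-LCN jar. It is therefore worth confirming at runtime which copy was actually loaded. Below is a small sketch using the standard ProtectionDomain/CodeSource API. The helper name ClassOriginCheck is made up for illustration.

```java
import java.net.URL;
import java.security.CodeSource;

public class ClassOriginCheck {

    // Returns where a class was loaded from (a directory or jar URL),
    // or null when there is no code source (e.g. JDK bootstrap classes).
    static String originOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null) {
            return null;
        }
        URL location = source.getLocation();
        return location == null ? null : location.toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        String name = args.length > 0 ? args[0] : ClassOriginCheck.class.getName();
        // Load without initializing, so static initializers of the checked class do not run.
        Class<?> type = Class.forName(name, false, ClassOriginCheck.class.getClassLoader());
        System.out.println(name + " loaded from " + originOf(type));
    }
}
```

Calling originOf(com.codingapi.txlcn.txmsg.netty.bean.SocketManager.class) from inside the application should point at the project's own classes directory or jar, not the TX-LCN jar. If it points at the framework jar, the override is not in effect. Which copy wins depends on classpath order, so it is worth checking in the packaged artifact and not only in the IDE.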
4.1 Overriding modId
Place this class in the project's common module:
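import java.util.Objects;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.util.StringUtils;
// plus the import of TX-LCN's ModIdProvider interface (package depends on the TX-LCN version)

@Configuration
public class LcnConfig {

    @Bean
    @ConditionalOnMissingBean
    @Primary
    public ModIdProvider modIdProvider(ConfigurableEnvironment environment,
                                       @Autowired(required = false) ServerProperties serverProperties) {
        return () -> modId(environment, serverProperties);
    }

    // labelName = application name + ":" + IP + ":" + port, unique per instance
    private String modId(ConfigurableEnvironment environment, ServerProperties serverProperties) {
        String applicationName = environment.getProperty("spring.application.name");
        applicationName = StringUtils.hasText(applicationName) ? applicationName : "application";
        return applicationName + ":" + serverPort(serverProperties);
    }

    /**
     * Returns IP:PORT for this instance.
     * If this method is not accurate, obtain the address from the configuration file instead.
     *
     * @param serverProperties serverProperties
     * @return IP:PORT
     */
    private String serverPort(ServerProperties serverProperties) {
        // getAddress() is null unless server.address is configured, so check it before dereferencing
        if (Objects.isNull(serverProperties)
                || Objects.isNull(serverProperties.getPort())
                || Objects.isNull(serverProperties.getAddress())) {
            return "127.0.0.1:8080";
        }
        return serverProperties.getAddress().getHostAddress() + ":" + serverProperties.getPort();
    }
}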
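One caveat about the fixed "127.0.0.1:8080" fallback: if several instances of the same service fall back to it, they produce identical labelNames, and bindModuleName rejects the second one with "labelName ... has exist.". Below is a framework-free sketch of the same "name:ip:port" composition. As an assumption that is not part of the original code, it falls back to InetAddress.getLocalHost() instead of a fixed address. On some hosts that call resolves to a loopback or container address, so an explicitly configured IP, as the javadoc above suggests, is still the safer choice. The class name ModIdBuilder is hypothetical.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ModIdBuilder {

    // Hypothetical framework-free version of LcnConfig#modId:
    // "<application name>:<ip>:<port>", one value per running instance.
    static String modId(String applicationName, String host, Integer port) {
        // Same default as the original: "application" when no name is configured
        String app = (applicationName == null || applicationName.trim().isEmpty())
                ? "application" : applicationName;
        // Assumption: use this machine's address rather than a fixed 127.0.0.1
        String ip = (host == null || host.trim().isEmpty()) ? localHostAddress() : host;
        // 8080 is Spring Boot's default server.port
        int p = (port == null) ? 8080 : port;
        return app + ":" + ip + ":" + p;
    }

    static String localHostAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return "127.0.0.1";
        }
    }

    public static void main(String[] args) {
        System.out.println(modId("order-service", "192.168.1.10", 8080)); // order-service:192.168.1.10:8080
        System.out.println(modId("order-service", null, 8081));           // host resolved at runtime
    }
}
```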
4.2 Rewriting the SocketManager Class
Note: create this file in the project's common module, under the same package as the original (com.codingapi.txlcn.txmsg.netty.bean).
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.codingapi.txlcn.txmsg.netty.bean;
import com.codingapi.txlcn.txmsg.RpcConfig;
import com.codingapi.txlcn.txmsg.dto.AppInfo;
import com.codingapi.txlcn.txmsg.dto.MessageDto;
import com.codingapi.txlcn.txmsg.dto.RpcCmd;
import com.codingapi.txlcn.txmsg.dto.RpcResponseState;
import com.codingapi.txlcn.txmsg.exception.RpcException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.*;
/** Created by lorne on 2017/6/30. */
@Slf4j
public class SocketManager {
private Map<String, AppInfo> appNames;
private ScheduledExecutorService executorService;
private ChannelGroup channels;
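// volatile so that the double-checked locking in getInstance() publishes a fully constructed instance
private static volatile SocketManager manager = null;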
private long attrDelayTime = 1000 * 60;
private SocketManager() {
channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
appNames = new ConcurrentHashMap<>();
executorService = Executors.newSingleThreadScheduledExecutor();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
executorService.shutdown();
try {
executorService.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException ignored) {
}
}));
}
public static SocketManager getInstance() {
if (manager == null) {
synchronized (SocketManager.class) {
if (manager == null) {
manager = new SocketManager();
}
}
}
return manager;
}
public void addChannel(Channel channel) {
channels.add(channel);
}
public void removeChannel(Channel channel) {
channels.remove(channel);
String key = channel.remoteAddress().toString();
// The expiration time is not set
if (attrDelayTime < 0) {
appNames.remove(key);
return;
}
// Set the expiration time
try {
executorService.schedule(() -> {
appNames.remove(key);
}, attrDelayTime, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException ignored) {
// caused down server.
}
}
private Channel getChannel(String key) throws RpcException {
for (Channel channel : channels) {
String val = channel.remoteAddress().toString();
if (key.equals(val)) {
return channel;
}
}
throw new RpcException("channel not online.");
}
public RpcResponseState send(String key, RpcCmd cmd) throws RpcException {
Channel channel = getChannel(key);
ChannelFuture future = channel.writeAndFlush(cmd).syncUninterruptibly();
return future.isSuccess() ? RpcResponseState.success : RpcResponseState.fail;
}
public MessageDto request(String key, RpcCmd cmd, long timeout) throws RpcException {
NettyRpcCmd nettyRpcCmd = (NettyRpcCmd) cmd;
log.debug("get channel, key:{}", key);
Channel channel = getChannel(key);
channel.writeAndFlush(nettyRpcCmd);
log.debug("await response");
if (timeout < 1) {
nettyRpcCmd.await();
} else {
nettyRpcCmd.await(timeout);
}
MessageDto res = cmd.loadResult();
log.debug("response is: {}", res);
nettyRpcCmd.loadRpcContent().clear();
return res;
}
public MessageDto request(String key, RpcCmd cmd) throws RpcException {
return request(key, cmd, -1);
}
public List<String> loadAllRemoteKey() {
List<String> allKeys = new ArrayList<>();
for (Channel channel : channels) {
allKeys.add(channel.remoteAddress().toString());
}
return allKeys;
}
public ChannelGroup getChannels() {
return channels;
}
public int currentSize() {
return channels.size();
}
public boolean noConnect(SocketAddress socketAddress) {
for (Channel channel : channels) {
if (channel.remoteAddress().toString().equals(socketAddress.toString())) {
return false;
}
}
return true;
}
/**
 * Get the module's remote identifier keys
 *
 * @param moduleName moduleName
 * @return remoteKeys
 */
public List<String> remoteKeys(String moduleName) {
List<String> allKeys = new ArrayList<>();
for (Channel channel : channels) {
if (moduleName.equals(getModuleName(channel))) {
allKeys.add(channel.remoteAddress().toString());
}
}
return allKeys;
}
/**
 * Bind connection data
 *
 * @param remoteKey remote identifier
 * @param appName   module name
 * @param labelName TC identifier name
 */
public void bindModuleName(String remoteKey, String appName, String labelName) throws RpcException {
AppInfo appInfo = new AppInfo();
appInfo.setAppName(appName);
appInfo.setLabelName(labelName);
appInfo.setCreateTime(new Date());
if (containsLabelName(labelName)) {
throw new RpcException("labelName:" + labelName + " has exist.");
}
appNames.put(remoteKey, appInfo);
}
public boolean containsLabelName(String moduleName) {
// Iterate the values directly: an entry removed by the delayed cleanup task cannot yield a null AppInfo here
for (AppInfo appInfo : appNames.values()) {
if (moduleName.equals(appInfo.getLabelName())) {
return true;
}
}
return false;
}
public void setRpcConfig(RpcConfig rpcConfig) {
attrDelayTime = rpcConfig.getAttrDelayTime();
}
/**
 * Get the module name
 *
 * @param channel channel information
 * @return module name
 */
public String getModuleName(Channel channel) {
String key = channel.remoteAddress().toString();
return getModuleName(key);
}
/**
 * Get the module name
 *
 * @param remoteKey remote unique identifier
 * @return module name
 */
public String getModuleName(String remoteKey) {
AppInfo appInfo = appNames.get(remoteKey);
// Key change from the framework's version: return labelName (unique per instance) instead of appName,
// so both join() and remoteKeys() identify the exact participant
return appInfo == null ? null : appInfo.getLabelName();
}
public List<AppInfo> appInfos() {
return new ArrayList<>(appNames.values());
}
}
5. Summary
My writing ability is limited. If any explanation in this article is wrong, please point it out, thank you! Next, I plan to analyze the TX-LCN framework further, understand its internals, and deepen my understanding of distributed transactions.