Commit 376b8064 by 段启岩

加入socket.io

parent 63b24893
package cn.meteor.beyondclouds.modules.im.server;
import cn.meteor.beyondclouds.core.redis.TokenManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author meteor
*/
//@ServerEndpoint("/im")
//@Component
@Slf4j
public class ImServer {
private static final Map<String, Session> SESSION_MAP = new HashMap<>();
private static TokenManager tokenManager;
private static final Pattern TOKEN_PATTERN = Pattern.compile("token=([^&]+)");
private String userId;
@Autowired
public void setTokenManager(TokenManager tokenManager) {
ImServer.tokenManager = tokenManager;
}
/**
* 新用户连接
* @param session
* @throws IOException
*/
@OnOpen
public void onOpen(Session session) throws IOException {
String token = parseToken(session.getQueryString());
log.info("new websocket connection, token is {}", token);
// 若没有携带token, 则直接关闭webscoket连接
if (null == token) {
session.close();
return;
}
// 校验token
String userId = tokenManager.getUserId(token);
if (null == userId) {
session.close();
return;
}
this.userId = userId;
// 将session存入内存
SESSION_MAP.put(userId, session);
log.info("user {} connected to the im server", userId);
}
/**
* 用户断开连接
*/
@OnClose
public void onClose() {
SESSION_MAP.remove(userId);
log.info("user {} disconnected from the im server", userId);
}
private String parseToken(String queryString) {
if (!StringUtils.isEmpty(queryString)) {
Matcher matcher = TOKEN_PATTERN.matcher(queryString);
if (matcher.find()) {
return matcher.group(1);
}
}
return null;
}
public static void sendMessage(String userId, String message) throws IOException {
Session session = SESSION_MAP.get(userId);
if (null != session) {
session.getBasicRemote().sendText(message);
}
}
}
......@@ -11,5 +11,5 @@ public interface SocketIOService {
void stop();
// 推送信息
void pushMessageToUser(String userId, String message);
void pushMessage(String userId, String message);
}
package cn.meteor.beyondclouds.modules.im.server;
import cn.meteor.beyondclouds.core.redis.TokenManager;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import org.apache.commons.lang3.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Slf4j
public class SocketIOServiceImpl implements SocketIOService {
// 用来存已连接的客户端
private static Map<String, SocketIOClient> clientMap = new ConcurrentHashMap<>();
/**
* 用来存已连接的客户端
*/
private static final Map<String, SocketIOClient> CLIENT_MAP = new ConcurrentHashMap<>();
@Autowired
private SocketIOServer socketIOServer;
private TokenManager tokenManager;
@Autowired
public void setTokenManager(TokenManager tokenManager) {
this.tokenManager = tokenManager;
}
@Autowired
public void setSocketIOServer(SocketIOServer socketIOServer) {
this.socketIOServer = socketIOServer;
}
/**
* Spring IoC容器创建之后,在加载SocketIOServiceImpl Bean之后启动
*
* @throws Exception
*/
@PostConstruct
......@@ -33,7 +46,6 @@ public class SocketIOServiceImpl implements SocketIOService {
/**
* Spring IoC容器在销毁SocketIOServiceImpl Bean之前关闭,避免重启项目服务端口占用问题
*
* @throws Exception
*/
@PreDestroy
......@@ -45,30 +57,42 @@ public class SocketIOServiceImpl implements SocketIOService {
public void start() throws Exception {
// 监听客户端连接
socketIOServer.addConnectListener(client -> {
client.sendEvent(PUSH_EVENT, "hello, 这是服务器发来的消息!");
});
// 监听客户端断开连接
socketIOServer.addDisconnectListener(client -> {
String loginUserNum = getParamsByClient(client);
if (loginUserNum != null) {
clientMap.remove(loginUserNum);
System.out.println("断开连接: " + loginUserNum);
System.out.println("断开连接: " + client.getSessionId());
String token = client.getHandshakeData().getSingleUrlParam("token");
log.info("new websocket connection, token is {}", token);
// 若没有携带token, 则直接关闭webscoket连接
if (null == token) {
client.disconnect();
return;
}
});
// 处理自定义的事件,与连接监听类似
socketIOServer.addEventListener("text", Object.class, (client, data, ackSender) -> {
// TODO do something
client.getHandshakeData();
System.out.println( " 客户端:************ " + data);
// 校验token
String userId = tokenManager.getUserId(token);
if (null == userId) {
client.disconnect();
return;
}
// 将session存入内存
CLIENT_MAP.put(userId, client);
log.info("user {} connected to the im server", userId);
client.sendEvent(PUSH_EVENT, "hello, 欢迎连接服务器!");
});
// 监听客户端断开连接
socketIOServer.addDisconnectListener(client -> {
String token = client.getHandshakeData().getSingleUrlParam("token");
if (null != token) {
String userId = tokenManager.getUserId(token);
if (null != userId) {
CLIENT_MAP.remove(userId);
}
}
});
socketIOServer.start();
System.out.println("START.");
System.out.println("SOCKET SERVER START.");
}
......@@ -82,37 +106,11 @@ public class SocketIOServiceImpl implements SocketIOService {
}
@Override
public void pushMessageToUser(String userId, String message) {
if (StringUtils.isNotBlank(userId)) {
SocketIOClient client = clientMap.get(userId);
if (client != null) {
client.sendEvent(PUSH_EVENT, message);
}
}
}
/**
* 此方法为获取client连接中的参数,可根据需求更改
* @param client
* @return
*/
private String getParamsByClient(SocketIOClient client) {
// 从请求的连接中拿出参数(这里的loginUserNum必须是唯一标识)
Map<String, List<String>> params = client.getHandshakeData().getUrlParams();
List<String> list = params.get("loginUserNum");
if (list != null && list.size() > 0) {
return list.get(0);
public void pushMessage(String userId, String message) {
SocketIOClient client = CLIENT_MAP.get(userId);
if (null != client) {
client.sendEvent(PUSH_EVENT, message);
}
return null;
}
public static Map<String, SocketIOClient> getClientMap() {
return clientMap;
}
public static void setClientMap(Map<String, SocketIOClient> clientMap) {
SocketIOServiceImpl.clientMap = clientMap;
}
}
......@@ -7,7 +7,7 @@ import cn.meteor.beyondclouds.modules.blog.entity.Blog;
import cn.meteor.beyondclouds.modules.blog.entity.BlogComment;
import cn.meteor.beyondclouds.modules.blog.service.IBlogCommentService;
import cn.meteor.beyondclouds.modules.blog.service.IBlogService;
import cn.meteor.beyondclouds.modules.im.server.ImServer;
import cn.meteor.beyondclouds.modules.im.server.SocketIOService;
import cn.meteor.beyondclouds.modules.message.entity.Message;
import cn.meteor.beyondclouds.modules.message.enums.MessageType;
import cn.meteor.beyondclouds.modules.message.service.IMessageService;
......@@ -49,6 +49,7 @@ public class MessageListener implements DataItemChangeListener {
private IPostCommentService postCommentService;
private IQuestionReplyService questionReplyService;
private IQuestionService questionService;
private SocketIOService socketIOService;
@Autowired
......@@ -57,6 +58,11 @@ public class MessageListener implements DataItemChangeListener {
}
@Autowired
public void setSocketIOService(SocketIOService socketIOService) {
this.socketIOService = socketIOService;
}
@Autowired
public void setUserFollowService(IUserFollowService userFollowService) {
this.userFollowService = userFollowService;
}
......@@ -247,7 +253,7 @@ public class MessageListener implements DataItemChangeListener {
message.setMsgType(msgType);
message.setMscContent(msg);
messageService.save(message);
ImServer.sendMessage(toUserId, msg);
socketIOService.pushMessage(toUserId, msg);
}
......@@ -288,7 +294,7 @@ public class MessageListener implements DataItemChangeListener {
message.setMsgType(msgType);
message.setMscContent(msg);
messageService.save(message);
ImServer.sendMessage(toUserId, msg);
socketIOService.pushMessage(toUserId, msg);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment