tj
2025-06-05 2d549a04870d1315868a7cf19952b64e8071e711
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package com.cloudroam.module.message;
 
import com.cloudroam.service.GroupService;
import com.cloudroam.model.UserDO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
 
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * @author 
 * @author 
 * websocket实现类
 */
@Slf4j
public class WsHandlerImpl implements WsHandler {
 
    private final AtomicInteger connectionCount = new AtomicInteger(0);
 
    private CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();
 
    @Autowired
    private GroupService groupService;
 
    @Override
    public void handleOpen(WebSocketSession session) {
        sessions.add(session);
        int cnt = connectionCount.incrementAndGet();
        log.info("a new connection opened,current online count:{}", cnt);
    }
 
    @Override
    public void handleClose(WebSocketSession session) {
        sessions.remove(session);
        int cnt = connectionCount.decrementAndGet();
        log.info("a connection closed,current online count:{}", cnt);
    }
 
    @Override
    public void handleMessage(WebSocketSession session, String message) {
        // 只处理前端传来的文本消息,并且直接丢弃了客户端传来的消息
    }
 
    @Override
    public void sendMessage(WebSocketSession session, String message) throws IOException {
        this.sendMessage(session, new TextMessage(message));
    }
 
    @Override
    public void sendMessage(Integer userId, TextMessage message) throws IOException {
        Optional<WebSocketSession> userSession = sessions.stream().filter(session -> {
            if (!session.isOpen()) {
                return false;
            }
            Map<String, Object> attributes = session.getAttributes();
            if (!attributes.containsKey(MessageConstant.USER_KEY)) {
                return false;
            }
            UserDO user = (UserDO) attributes.get(MessageConstant.USER_KEY);
            return user.getId().equals(userId);
        }).findFirst();
        if (userSession.isPresent()) {
            userSession.get().sendMessage(message);
        }
    }
 
    @Override
    public void sendMessage(Integer userId, String message) throws IOException {
        sendMessage(userId, new TextMessage(message));
    }
 
    @Override
    public void sendMessage(WebSocketSession session, TextMessage message) throws IOException {
        session.sendMessage(message);
    }
 
    @Override
    public void broadCast(String message) throws IOException {
        for (WebSocketSession session : sessions) {
            if (!session.isOpen()) {
                continue;
            }
            sendMessage(session, message);
        }
    }
 
    @Override
    public void broadCast(TextMessage message) throws IOException {
        for (WebSocketSession session : sessions) {
            if (!session.isOpen()) {
                continue;
            }
            session.sendMessage(message);
        }
    }
 
    @Override
    public void broadCastToGroup(Integer groupId, String message) throws IOException {
        this.broadCastToGroup(groupId, new TextMessage(message));
    }
 
    @Override
    public void broadCastToGroup(Integer groupId, TextMessage message) throws IOException {
        List<Integer> userIds = groupService.getGroupUserIds(groupId);
        for (WebSocketSession session : sessions) {
            if (!session.isOpen()) {
                continue;
            }
            Map<String, Object> attributes = session.getAttributes();
            if (!attributes.containsKey(MessageConstant.USER_KEY)) {
                continue;
            }
            UserDO user = (UserDO) attributes.get(MessageConstant.USER_KEY);
            boolean matched = userIds.stream().anyMatch(id -> id.equals(user.getId()));
            if (!matched) {
                continue;
            }
            session.sendMessage(message);
        }
    }
 
    @Override
    public void handleError(WebSocketSession session, Throwable error) {
        log.error("websocket error:{},session id: {}", error.getMessage(), session.getId());
        log.error("", error);
    }
 
    @Override
    public CopyOnWriteArraySet<WebSocketSession> getSessions() {
        return sessions;
    }
 
    @Override
    public int getConnectionCount() {
        return connectionCount.get();
    }
}