001/** 002 * Copyright 2014 Tampere University of Technology, Pori Department 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package core.tut.pori.websocket; 017 018import java.io.IOException; 019import java.util.ArrayList; 020import java.util.HashMap; 021import java.util.List; 022import java.util.Map; 023 024import javax.websocket.CloseReason; 025import javax.websocket.Session; 026 027import org.apache.log4j.Logger; 028 029import core.tut.pori.users.UserIdentity; 030 031/** 032 * 033 * An abstract class for handling simple text/broadcast to users. 034 * 035 * Sub-classing this class will automatically add an instance to the WebSocketHandler, retrievable through ServiceInitializer. 036 * 037 * 038 */ 039public abstract class SocketService { 040 private static final Logger LOGGER = Logger.getLogger(SocketService.class); 041 private Map<Long, List<Session>> AUTHORIZED_USERS = new HashMap<>(); // userId, sessions map 042 private List<Session> UNAUTHORIZED_USERS = new ArrayList<>(); 043 044 /** 045 * The default implementation will simply print the exception to log and close the session. 046 * 047 * @param session 048 * @param throwable 049 */ 050 public void onError(Session session, Throwable throwable){ 051 LOGGER.debug(throwable.toString()); 052 onClose(session, null); 053 } 054 055 /** 056 * Close all sessions belonging to the given user identity. 057 * Note that calling this method may or may not cause disconnected() to be called depending on whether the user has active sessions or not. 058 * 059 * @param authorizedUser 060 * @param closeReason 061 */ 062 public void close(UserIdentity authorizedUser, CloseReason closeReason){ 063 Long userId = authorizedUser.getUserId(); 064 synchronized (AUTHORIZED_USERS) { 065 List<Session> sessions = AUTHORIZED_USERS.get(userId); 066 if(sessions == null){ 067 LOGGER.debug("No sessions for user, id: "+userId); 068 return; 069 } 070 for(Session session : sessions){ 071 try { 072 session.close(closeReason); 073 } catch (IOException ex) { 074 LOGGER.error(ex, ex); // simply log the exception, the session will be automatically removed in onClose 075 } 076 } // for 077 } // synchronized 078 } 079 080 /** 081 * 082 * @param session 083 * @return true on successful accept, false on rejection. Upon rejection the session will be automatically closed. 084 */ 085 public boolean accept(Session session){ 086 UserIdentity userIdentity = (UserIdentity) session.getUserPrincipal(); 087 if(userIdentity == null){ 088 if(accept()){ 089 synchronized (UNAUTHORIZED_USERS) { 090 UNAUTHORIZED_USERS.add(session); 091 } 092 LOGGER.debug("Added new unauthorized session."); 093 return true; 094 }else{ 095 return false; 096 } 097 }else if(accept(userIdentity)){ 098 Long userId = userIdentity.getUserId(); 099 synchronized (AUTHORIZED_USERS) { 100 List<Session> sessions = AUTHORIZED_USERS.get(userId); 101 if(sessions == null){ 102 AUTHORIZED_USERS.put(userId, (sessions = new ArrayList<>())); 103 } 104 sessions.add(session); 105 } 106 LOGGER.debug("Added new authorized session for user, id: "+userId); 107 return true; 108 }else{ 109 return false; 110 } 111 } 112 113 /** 114 * 115 * @param session 116 * @param closeReason 117 */ 118 public void onClose(Session session, CloseReason closeReason){ 119 LOGGER.debug("Closing session, reason: "+(closeReason == null ? "unknown" : closeReason.getCloseCode()+" "+closeReason.getReasonPhrase())); 120 121 UserIdentity userIdentity = (UserIdentity) session.getUserPrincipal(); 122 if(userIdentity == null){ 123 synchronized (UNAUTHORIZED_USERS) { 124 if(!UNAUTHORIZED_USERS.remove(session)){ 125 LOGGER.warn("Failed to remove session for an unauthorized user."); 126 } 127 } 128 disconnected(); 129 }else{ 130 Long userId = userIdentity.getUserId(); 131 synchronized (AUTHORIZED_USERS) { 132 List<Session> sessions = AUTHORIZED_USERS.get(userId); 133 if(sessions == null){ 134 LOGGER.warn("No known sessions for user, id: "+userId); 135 }else if(!sessions.remove(session)){ // the sessions list may not always have the closed session. This may happen e.g. if the connection was outright rejected or broken whilst establishing the connection 136 LOGGER.warn("Failed to remove closed session for user, id: "+userId); 137 } 138 139 if(sessions.isEmpty()){ 140 AUTHORIZED_USERS.remove(userId); 141 } 142 } // synchronized 143 disconnected(userIdentity); 144 } // else 145 } 146 147 /** 148 * 149 * @param session 150 * @param message 151 */ 152 public void received(Session session, String message){ 153 UserIdentity userIdentity = (UserIdentity) session.getUserPrincipal(); 154 if(userIdentity == null){ 155 received(message); 156 }else{ 157 received(userIdentity, message); 158 } 159 } 160 161 /** 162 * Send message to the given authorized user 163 * 164 * @param authenticatedUser 165 * @param message 166 * @return true if the message was successfully sent to at least one of the user's sessions 167 */ 168 public boolean send(UserIdentity authenticatedUser, String message){ 169 Long userId = authenticatedUser.getUserId(); 170 boolean retval = false; 171 synchronized (AUTHORIZED_USERS) { 172 List<Session> sessions = AUTHORIZED_USERS.get(userId); 173 if(sessions == null){ 174 LOGGER.warn("The user has no valid sessions."); 175 }else{ 176 for(Session session : sessions){ 177 try { 178 session.getBasicRemote().sendText(message); 179 retval = true; 180 } catch (IOException ex) { 181 LOGGER.warn(ex, ex); // simply print the message, if the connection is broken, a call to onClose should follow 182 } 183 } // for 184 } // else 185 } 186 return retval; 187 } 188 189 /** 190 * Send message to all unauthorized users 191 * 192 * @param message 193 */ 194 public void send(String message){ 195 synchronized (UNAUTHORIZED_USERS) { 196 for(Session session : UNAUTHORIZED_USERS){ 197 try { 198 session.getBasicRemote().sendText(message); 199 } catch (IOException ex) { 200 LOGGER.warn(ex, ex); // simply print the message, if the connection is broken, a call to onClose should follow 201 } 202 } 203 } 204 } 205 206 /** 207 * Send message to all connected users. 208 * 209 * @param message 210 */ 211 public void broadcast(String message){ 212 LOGGER.debug("Sending message to all authorized users."); 213 synchronized (AUTHORIZED_USERS) { 214 for(List<Session> sessionList : AUTHORIZED_USERS.values()){ 215 for(Session session : sessionList){ 216 try { 217 session.getBasicRemote().sendText(message); 218 } catch (IOException ex) { 219 LOGGER.warn(ex, ex); // simply print the message, if the connection is broken, a call to onClose should follow 220 } 221 } // for 222 } // for 223 } // synchronized 224 225 LOGGER.debug("Sending message to all unauthorized users."); 226 send(message); 227 } 228 229 /** 230 * 231 * @param authenticatedUser 232 * @return true if the given user has active sessions. Note that in this case "active" is loosely defined, it is possible that the session has timed out, but the socket service has not yet registered the drop. 233 */ 234 public boolean hasSessions(UserIdentity authenticatedUser){ 235 synchronized (AUTHORIZED_USERS) { 236 return (AUTHORIZED_USERS.get(authenticatedUser.getUserId()) != null); 237 } 238 } 239 240 /** 241 * 242 * @param authenticatedUser 243 * @return true if the connection for the given user should be accepted, otherwise, the connection will be closed 244 */ 245 public abstract boolean accept(UserIdentity authenticatedUser); 246 247 /** 248 * 249 * @return true if an anonymous connection should be accepted, otherwise, the connection will be closed 250 */ 251 public abstract boolean accept(); 252 253 /** 254 * 255 * @param authenticatedUser 256 * @param message 257 */ 258 public abstract void received(UserIdentity authenticatedUser, String message); 259 260 /** 261 * Message received for an anonymous user. 262 * 263 * @param message 264 */ 265 public abstract void received(String message); 266 267 /** 268 * The user has been disconnected. 269 * 270 * @param authenticatedUser 271 */ 272 public abstract void disconnected(UserIdentity authenticatedUser); 273 274 /** 275 * An anonymous user has been disconnected. 276 */ 277 public abstract void disconnected(); 278 279 /** 280 * Note: the name is checked on initialization, changing the name run-time has no effect. 281 * 282 * @return the end point name 283 */ 284 public abstract String getEndPointName(); 285}