001/**
002 * Copyright 2014 Tampere University of Technology, Pori Department
003 * 
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * 
008 *   http://www.apache.org/licenses/LICENSE-2.0
009 * 
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package core.tut.pori.websocket;
017
018import java.io.IOException;
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.List;
022import java.util.Map;
023
024import javax.websocket.CloseReason;
025import javax.websocket.Session;
026
027import org.apache.log4j.Logger;
028
029import core.tut.pori.users.UserIdentity;
030
031/**
032 * 
033 * An abstract class for handling simple text/broadcast to users.
034 * 
035 * Sub-classing this class will automatically add an instance to the WebSocketHandler, retrievable through ServiceInitializer.
036 * 
037 * 
038 */
039public abstract class SocketService {
040  private static final Logger LOGGER = Logger.getLogger(SocketService.class);
041  private Map<Long, List<Session>> AUTHORIZED_USERS = new HashMap<>(); // userId, sessions map
042  private List<Session> UNAUTHORIZED_USERS = new ArrayList<>();
043  
044  /**
045   * The default implementation will simply print the exception to log and close the session.
046   * 
047   * @param session
048   * @param throwable 
049   */
050  public void onError(Session session, Throwable throwable){
051    LOGGER.debug(throwable.toString());
052    onClose(session, null);
053  }
054  
055  /**
056   * Close all sessions belonging to the given user identity. 
057   * Note that calling this method may or may not cause disconnected() to be called depending on whether the user has active sessions or not.
058   * 
059   * @param authorizedUser
060   * @param closeReason
061   */
062  public void close(UserIdentity authorizedUser, CloseReason closeReason){
063    Long userId = authorizedUser.getUserId();
064    synchronized (AUTHORIZED_USERS) {
065      List<Session> sessions = AUTHORIZED_USERS.get(userId);
066      if(sessions == null){
067        LOGGER.debug("No sessions for user, id: "+userId);
068        return;
069      }
070      for(Session session : sessions){
071        try {
072          session.close(closeReason);
073        } catch (IOException ex) {
074          LOGGER.error(ex, ex); // simply log the exception, the session will be automatically removed in onClose
075        }
076      } // for
077    } // synchronized
078  }
079  
080  /**
081   * 
082   * @param session
083   * @return true on successful accept, false on rejection. Upon rejection the session will be automatically closed.
084   */
085  public boolean accept(Session session){
086    UserIdentity userIdentity = (UserIdentity) session.getUserPrincipal();
087    if(userIdentity == null){
088      if(accept()){
089        synchronized (UNAUTHORIZED_USERS) {
090          UNAUTHORIZED_USERS.add(session);
091        }
092        LOGGER.debug("Added new unauthorized session.");
093        return true;
094      }else{
095        return false;
096      }
097    }else if(accept(userIdentity)){
098      Long userId = userIdentity.getUserId();
099      synchronized (AUTHORIZED_USERS) {
100        List<Session> sessions = AUTHORIZED_USERS.get(userId);
101        if(sessions == null){
102          AUTHORIZED_USERS.put(userId, (sessions = new ArrayList<>()));
103        }
104        sessions.add(session);
105      }
106      LOGGER.debug("Added new authorized session for user, id: "+userId);
107      return true;
108    }else{
109      return false;
110    }
111  }
112  
113  /**
114   * 
115   * @param session
116   * @param closeReason
117   */
118  public void onClose(Session session, CloseReason closeReason){
119    LOGGER.debug("Closing session, reason: "+(closeReason == null ? "unknown" : closeReason.getCloseCode()+" "+closeReason.getReasonPhrase()));
120    
121    UserIdentity userIdentity = (UserIdentity) session.getUserPrincipal();
122    if(userIdentity == null){
123      synchronized (UNAUTHORIZED_USERS) {
124        if(!UNAUTHORIZED_USERS.remove(session)){
125          LOGGER.warn("Failed to remove session for an unauthorized user.");
126        }
127      }
128      disconnected();
129    }else{
130      Long userId = userIdentity.getUserId();
131      synchronized (AUTHORIZED_USERS) {
132        List<Session> sessions = AUTHORIZED_USERS.get(userId);
133        if(sessions == null){
134          LOGGER.warn("No known sessions for user, id: "+userId);
135        }else if(!sessions.remove(session)){ // the sessions list may not always have the closed session. This may happen e.g. if the connection was outright rejected or broken whilst establishing the connection
136          LOGGER.warn("Failed to remove closed session for user, id: "+userId);
137        }
138        
139        if(sessions.isEmpty()){
140          AUTHORIZED_USERS.remove(userId);
141        }
142      } // synchronized
143      disconnected(userIdentity);
144    } // else
145  }
146  
147  /**
148   * 
149   * @param session
150   * @param message
151   */
152  public void received(Session session, String message){
153    UserIdentity userIdentity = (UserIdentity) session.getUserPrincipal();
154    if(userIdentity == null){
155      received(message);
156    }else{
157      received(userIdentity, message);
158    }
159  }
160  
161  /**
162   * Send message to the given authorized user
163   * 
164   * @param authenticatedUser
165   * @param message
166   * @return true if the message was successfully sent to at least one of the user's sessions
167   */
168  public boolean send(UserIdentity authenticatedUser, String message){
169    Long userId = authenticatedUser.getUserId();
170    boolean retval = false;
171    synchronized (AUTHORIZED_USERS) {
172      List<Session> sessions = AUTHORIZED_USERS.get(userId);
173      if(sessions == null){
174        LOGGER.warn("The user has no valid sessions.");
175      }else{
176        for(Session session : sessions){
177          try {
178            session.getBasicRemote().sendText(message);
179            retval = true;
180          } catch (IOException ex) {
181            LOGGER.warn(ex, ex); // simply print the message, if the connection is broken, a call to onClose should follow
182          }
183        } // for
184      } // else
185    }
186    return retval;
187  }
188  
189  /**
190   * Send message to all unauthorized users
191   * 
192   * @param message
193   */
194  public void send(String message){
195    synchronized (UNAUTHORIZED_USERS) {
196      for(Session session : UNAUTHORIZED_USERS){
197        try {
198          session.getBasicRemote().sendText(message);
199        } catch (IOException ex) {
200          LOGGER.warn(ex, ex); // simply print the message, if the connection is broken, a call to onClose should follow
201        }
202      }
203    }
204  }
205  
206  /**
207   * Send message to all connected users.
208   * 
209   * @param message
210   */
211  public void broadcast(String message){
212    LOGGER.debug("Sending message to all authorized users.");
213    synchronized (AUTHORIZED_USERS) {
214      for(List<Session> sessionList : AUTHORIZED_USERS.values()){
215        for(Session session : sessionList){
216          try {
217            session.getBasicRemote().sendText(message);
218          } catch (IOException ex) {
219            LOGGER.warn(ex, ex); // simply print the message, if the connection is broken, a call to onClose should follow
220          }
221        } // for
222      } // for
223    } // synchronized
224    
225    LOGGER.debug("Sending message to all unauthorized users.");
226    send(message);
227  }
228  
229  /**
230   * 
231   * @param authenticatedUser
232   * @return true if the given user has active sessions. Note that in this case "active" is loosely defined, it is possible that the session has timed out, but the socket service has not yet registered the drop.
233   */
234  public boolean hasSessions(UserIdentity authenticatedUser){
235    synchronized (AUTHORIZED_USERS) {
236      return (AUTHORIZED_USERS.get(authenticatedUser.getUserId()) != null);
237    }
238  }
239  
240  /**
241   * 
242   * @param authenticatedUser
243   * @return true if the connection for the given user should be accepted, otherwise, the connection will be closed
244   */
245  public abstract boolean accept(UserIdentity authenticatedUser);
246
247  /**
248   * 
249   * @return true if an anonymous connection should be accepted, otherwise, the connection will be closed
250   */
251  public abstract boolean accept();
252
253  /**
254   * 
255   * @param authenticatedUser
256   * @param message
257   */
258  public abstract void received(UserIdentity authenticatedUser, String message);
259
260  /**
261   * Message received for an anonymous user.
262   * 
263   * @param message
264   */
265  public abstract void received(String message);
266
267  /**
268   * The user has been disconnected.
269   * 
270   * @param authenticatedUser
271   */
272  public abstract void disconnected(UserIdentity authenticatedUser);
273
274  /**
275   * An anonymous user has been disconnected.
276   */
277  public abstract void disconnected();
278  
279  /**
280   * Note: the name is checked on initialization, changing the name run-time has no effect.
281   * 
282   * @return the end point name
283   */
284  public abstract String getEndPointName();
285}