001/**
002 * Copyright 2014 Tampere University of Technology, Pori Department
003 * 
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * 
008 *   http://www.apache.org/licenses/LICENSE-2.0
009 * 
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package service.tut.pori.cawebsocket;
017
018import java.util.Map.Entry;
019import java.util.Set;
020import java.util.concurrent.ConcurrentHashMap;
021
022import org.apache.log4j.Logger;
023import org.springframework.context.ApplicationListener;
024
025import com.google.common.collect.Sets;
026
027import service.tut.pori.contentanalysis.AbstractTaskDetails;
028import service.tut.pori.contentanalysis.Definitions;
029import service.tut.pori.contentanalysis.PhotoTaskDAO;
030import service.tut.pori.contentanalysis.AsyncTask.AsyncTaskEvent;
031import service.tut.pori.contentanalysis.AsyncTask.TaskType;
032import core.tut.pori.context.ServiceInitializer;
033import core.tut.pori.http.parameters.DataGroups;
034import core.tut.pori.users.UserAuthority;
035import core.tut.pori.users.UserIdentity;
036
037/**
038 * Core methods for CAWebSocketService
039 * 
040 */
041public final class CAWebSocketCore {
042  private static final Logger LOGGER = Logger.getLogger(CAWebSocketCore.class);
043  private static final ConcurrentHashMap<Long, Registration> REGISTERED_USERS = new ConcurrentHashMap<>(); // user-id, registration map
044  
045  /**
046   * 
047   */
048  private CAWebSocketCore(){
049    // nothing needed
050  }
051  
052  /**
053   * Registers the given user as a listener for finished tasks. 
054   * If the registration object has no user id filter and anonymous task listener is disabled this method will automatically add the authenticatedUser as a filter.
055   * 
056   * @param authenticatedUser
057   * @param registration
058   */
059  public static void taskFinishedRegistered(UserIdentity authenticatedUser, Registration registration){
060    Long userId = authenticatedUser.getUserId();
061    if(registration == null){
062      LOGGER.debug("No registration details, using defaults for user, id: "+userId);
063      registration = new Registration();
064      registration.setUserIds(Sets.newHashSet(userId));
065    }else{
066      boolean extendedPermissions = UserIdentity.hasAuthority(UserAuthority.AUTHORITY_ROLE_BACKEND, authenticatedUser);
067      Set<Long> userIdFilter = registration.getUserIds();
068      if(registration.isListenAnonymousTasks() && !extendedPermissions){ // only allow user with extended permissions to listen for anonymous tasks. If anonymous listening is enabled (and permissions match), the user id filter can be given or omitted.
069        LOGGER.warn("User, id: "+userId+" attempted to listen for anonymous tasks without appropriate permissions.");
070        ServiceInitializer.getWebSocketHandler().getSocketService(TaskFinishedService.class).close(authenticatedUser, core.tut.pori.websocket.Definitions.CLOSE_REASON_FORBIDDEN);
071        return;
072      }else if(userIdFilter != null){
073        if(userIdFilter.isEmpty()){ 
074          LOGGER.debug("Empty user id filter, adding the authenticated user as a listener.");
075          registration.setUserIds(Sets.newHashSet(userId));
076        }else if(!extendedPermissions && (userIdFilter.size() != 1 || !userIdFilter.iterator().next().equals(userId))){ // check that there are no other user ids in addition to the authenticatedUser if the user has no extended permissions
077          LOGGER.warn("Bad user id filter for user, id: "+userId);
078          ServiceInitializer.getWebSocketHandler().getSocketService(TaskFinishedService.class).close(authenticatedUser, core.tut.pori.websocket.Definitions.CLOSE_REASON_FORBIDDEN);
079          return;
080        }
081      }else{ // userIdFilter == null
082        LOGGER.debug("No user id filter, adding the authenticated user as a listener.");
083        registration.setUserIds(Sets.newHashSet(userId));
084      }
085      
086      LOGGER.debug("Registering listener for user, id: "+userId);
087    }
088    REGISTERED_USERS.put(userId, registration);
089  }
090  
091  /**
092   * 
093   * @param authenticatedUser
094   */
095  public static void taskFinishedUnregistered(UserIdentity authenticatedUser){
096    Long userId = authenticatedUser.getUserId();
097    LOGGER.debug("Unregistering listener for user, id: "+userId);
098    REGISTERED_USERS.remove(userId);
099  }
100  
101  /**
102   * Listens for TaskStatus messages from completed tasks
103   *
104   */
105  @SuppressWarnings("unused") // instance created automatically
106  private static class TaskFinishedListener implements ApplicationListener<AsyncTaskEvent>{
107
108    @Override
109    public void onApplicationEvent(AsyncTaskEvent event) {
110      if(event.getStatus() != service.tut.pori.contentanalysis.AsyncTask.TaskStatus.COMPLETED){ // ignore everything but completed
111        return;
112      }
113      if(REGISTERED_USERS.isEmpty()){
114        LOGGER.debug("No listeners...");
115        return;
116      }
117      
118      Long taskId = event.getTaskId();
119      Integer backendId = event.getBackendId();
120      AbstractTaskDetails details = ServiceInitializer.getDAOHandler().getSQLDAO(PhotoTaskDAO.class).getTask(backendId, new DataGroups(DataGroups.DATA_GROUP_BASIC, Definitions.DATA_GROUP_BACKEND_STATUS), null, taskId);
121      if(details == null){
122        LOGGER.warn("Received task finished for non-existing task, id: "+taskId+", for back-end, id: "+backendId);
123        return;
124      }
125      
126      LOGGER.debug("Received "+service.tut.pori.contentanalysis.AsyncTask.TaskStatus.COMPLETED.name()+", sending notifications...");
127      Long taskUserId = details.getUserIdValue();
128      TaskFinishedService service = ServiceInitializer.getWebSocketHandler().getSocketService(TaskFinishedService.class);
129      UserIdentity userId = new UserIdentity();
130      TaskType taskType = event.getTaskType();
131      TaskStatus taskStatus = new TaskStatus();
132      taskStatus.setTaskId(taskId);
133      taskStatus.setTaskType(taskType);
134      taskStatus.setBackendStatusList(details.getBackends());
135      for(Entry<Long, Registration> e : REGISTERED_USERS.entrySet()){
136        Registration registration = e.getValue();
137        if(((taskUserId == null && registration.isListenAnonymousTasks()) || registration.hasUserId(taskUserId)) && registration.hasBackendId(backendId) && registration.hasTaskId(taskId) && registration.hasTaskType(taskType)){
138          userId.setUserId(e.getKey());
139          service.send(userId, taskStatus);
140        }
141      }
142    }
143  } // class TaskFinishedListener
144}