/**
 * Copyright 2014 Tampere University of Technology, Pori Department
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package service.tut.pori.cawebsocket;

import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.log4j.Logger;
import org.springframework.context.ApplicationListener;

import com.google.common.collect.Sets;

import service.tut.pori.contentanalysis.AbstractTaskDetails;
import service.tut.pori.contentanalysis.Definitions;
import service.tut.pori.contentanalysis.PhotoTaskDAO;
import service.tut.pori.contentanalysis.AsyncTask.AsyncTaskEvent;
import service.tut.pori.contentanalysis.AsyncTask.TaskType;
import core.tut.pori.context.ServiceInitializer;
import core.tut.pori.http.parameters.DataGroups;
import core.tut.pori.users.UserAuthority;
import core.tut.pori.users.UserIdentity;

/**
 * Core methods for CAWebSocketService.
 *
 * Keeps one {@link Registration} per user id in a static map and forwards task status
 * notifications for completed tasks to every registration whose filters match.
 */
public final class CAWebSocketCore {
	private static final Logger LOGGER = Logger.getLogger(CAWebSocketCore.class);
	private static final ConcurrentHashMap<Long, Registration> REGISTERED_USERS = new ConcurrentHashMap<>(); // user-id, registration map

	/**
	 * Static utility class, not to be instantiated.
	 */
	private CAWebSocketCore(){
		// nothing needed
	}

	/**
	 * Registers the given user as a listener for finished tasks.
	 *
	 * The following rules are applied to the given registration:
	 * <ul>
	 *  <li>If the registration is null, a default registration filtered by the authenticated user's id is created.</li>
	 *  <li>If the registration requests anonymous tasks and the user does not have the back-end role, the socket is closed with the forbidden close reason and nothing is registered.</li>
	 *  <li>If the user id filter is null or empty, it is replaced by a filter containing only the authenticated user's id.</li>
	 *  <li>If the user does not have the back-end role and the user id filter contains anything but the authenticated user's id, the socket is closed with the forbidden close reason and nothing is registered.</li>
	 * </ul>
	 *
	 * An accepted registration replaces any previous registration stored for the same user id.
	 *
	 * @param authenticatedUser the user opening the listener, must not be null
	 * @param registration the requested filters, or null to use the defaults
	 */
	public static void taskFinishedRegistered(UserIdentity authenticatedUser, Registration registration){
		Long userId = authenticatedUser.getUserId();
		if(registration == null){
			LOGGER.debug("No registration details, using defaults for user, id: "+userId);
			registration = new Registration();
			registration.setUserIds(Sets.newHashSet(userId));
		}else{
			boolean extendedPermissions = UserIdentity.hasAuthority(UserAuthority.AUTHORITY_ROLE_BACKEND, authenticatedUser);
			Set<Long> userIdFilter = registration.getUserIds();
			if(registration.isListenAnonymousTasks() && !extendedPermissions){ // only users with extended permissions may listen for anonymous tasks
				LOGGER.warn("User, id: "+userId+" attempted to listen for anonymous tasks without appropriate permissions.");
				closeForbidden(authenticatedUser);
				return;
			}

			if(userIdFilter == null){
				LOGGER.debug("No user id filter, adding the authenticated user as a listener.");
				registration.setUserIds(Sets.newHashSet(userId));
			}else if(userIdFilter.isEmpty()){
				LOGGER.debug("Empty user id filter, adding the authenticated user as a listener.");
				registration.setUserIds(Sets.newHashSet(userId));
			}else if(!extendedPermissions && !containsOnly(userIdFilter, userId)){ // without extended permissions the filter may only name the authenticated user
				LOGGER.warn("Bad user id filter for user, id: "+userId);
				closeForbidden(authenticatedUser);
				return;
			}

			LOGGER.debug("Registering listener for user, id: "+userId);
		}
		REGISTERED_USERS.put(userId, registration);
	}

	/**
	 * Removes the registration stored for the given user, if any.
	 *
	 * @param authenticatedUser the user whose listener is removed, must not be null
	 */
	public static void taskFinishedUnregistered(UserIdentity authenticatedUser){
		Long userId = authenticatedUser.getUserId();
		LOGGER.debug("Unregistering listener for user, id: "+userId);
		REGISTERED_USERS.remove(userId);
	}

	/**
	 * Checks whether the filter consists of exactly one element equal to the given user id.
	 * A null element in the filter never matches, so a client-supplied filter holding null
	 * is rejected by the caller instead of causing a NullPointerException.
	 *
	 * @param userIdFilter non-null filter to check
	 * @param userId the id the single element must equal
	 * @return true if the filter holds exactly one non-null element equal to userId
	 */
	private static boolean containsOnly(Set<Long> userIdFilter, Long userId){
		if(userIdFilter.size() != 1){
			return false;
		}
		Long only = userIdFilter.iterator().next();
		return only != null && only.equals(userId);
	}

	/**
	 * Closes the task finished socket of the given user with the forbidden close reason.
	 *
	 * @param authenticatedUser the user whose socket is closed
	 */
	private static void closeForbidden(UserIdentity authenticatedUser){
		ServiceInitializer.getWebSocketHandler().getSocketService(TaskFinishedService.class).close(authenticatedUser, core.tut.pori.websocket.Definitions.CLOSE_REASON_FORBIDDEN);
	}

	/**
	 * Listens for task events and notifies the registered users about completed tasks.
	 *
	 * Events with any other status than COMPLETED are ignored.
	 */
	@SuppressWarnings("unused") // instance created automatically
	private static class TaskFinishedListener implements ApplicationListener<AsyncTaskEvent>{

		/**
		 * Resolves the details of the completed task and sends a {@link TaskStatus} to every
		 * registered user whose registration matches the task's user, back-end, task id and task type.
		 *
		 * @param event the task event
		 */
		@Override
		public void onApplicationEvent(AsyncTaskEvent event) {
			if(event.getStatus() != service.tut.pori.contentanalysis.AsyncTask.TaskStatus.COMPLETED){ // ignore everything but completed
				return;
			}
			if(REGISTERED_USERS.isEmpty()){ // checked before the task look-up, there is nobody to notify
				LOGGER.debug("No listeners...");
				return;
			}

			Long taskId = event.getTaskId();
			Integer backendId = event.getBackendId();
			DataGroups dataGroups = new DataGroups(DataGroups.DATA_GROUP_BASIC, Definitions.DATA_GROUP_BACKEND_STATUS);
			AbstractTaskDetails details = ServiceInitializer.getDAOHandler().getSQLDAO(PhotoTaskDAO.class).getTask(backendId, dataGroups, null, taskId);
			if(details == null){
				LOGGER.warn("Received task finished for non-existing task, id: "+taskId+", for back-end, id: "+backendId);
				return;
			}

			LOGGER.debug("Received "+service.tut.pori.contentanalysis.AsyncTask.TaskStatus.COMPLETED.name()+", sending notifications...");
			Long taskUserId = details.getUserIdValue(); // null is treated as an anonymous task in the filter check below
			TaskType taskType = event.getTaskType();

			// the same status object is sent to every matching listener
			TaskStatus taskStatus = new TaskStatus();
			taskStatus.setTaskId(taskId);
			taskStatus.setTaskType(taskType);
			taskStatus.setBackendStatusList(details.getBackends());

			// named socketService so that it does not obscure the top-level package "service"
			TaskFinishedService socketService = ServiceInitializer.getWebSocketHandler().getSocketService(TaskFinishedService.class);
			// NOTE(review): a single mutable identity is reused for all recipients; this assumes send() does not retain the reference - confirm
			UserIdentity recipient = new UserIdentity();
			for(Entry<Long, Registration> entry : REGISTERED_USERS.entrySet()){
				Registration registration = entry.getValue();
				boolean userMatches = (taskUserId == null && registration.isListenAnonymousTasks()) || registration.hasUserId(taskUserId);
				if(userMatches && registration.hasBackendId(backendId) && registration.hasTaskId(taskId) && registration.hasTaskType(taskType)){
					recipient.setUserId(entry.getKey());
					socketService.send(recipient, taskStatus);
				}
			}
		}
	} // class TaskFinishedListener
}