001/** 002 * Copyright 2014 Tampere University of Technology, Pori Department 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package service.tut.pori.contentanalysis.video; 017 018import java.io.IOException; 019import java.io.InputStream; 020import java.util.ArrayList; 021import java.util.Collection; 022import java.util.EnumSet; 023import java.util.Iterator; 024import java.util.LinkedHashSet; 025import java.util.List; 026import java.util.Set; 027import java.util.concurrent.Callable; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.Future; 030 031import org.apache.commons.lang3.ArrayUtils; 032import org.apache.commons.lang3.StringUtils; 033import org.apache.http.HttpEntity; 034import org.apache.http.client.methods.CloseableHttpResponse; 035import org.apache.http.client.methods.HttpPost; 036import org.apache.http.config.SocketConfig; 037import org.apache.http.impl.client.CloseableHttpClient; 038import org.apache.http.impl.client.HttpClientBuilder; 039import org.apache.http.impl.client.HttpClients; 040import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; 041import org.apache.http.util.EntityUtils; 042import org.apache.log4j.Logger; 043 044import service.tut.pori.contentanalysis.AnalysisBackend; 045import service.tut.pori.contentanalysis.AnalysisBackend.Capability; 046import service.tut.pori.contentanalysis.BackendDAO; 047import service.tut.pori.contentanalysis.CAContentCore.ServiceType; 048import service.tut.pori.contentanalysis.Definitions; 049import service.tut.pori.contentanalysis.PhotoParameters.AnalysisType; 050import core.tut.pori.context.ServiceInitializer; 051import core.tut.pori.http.Response; 052import core.tut.pori.http.parameters.DataGroups; 053import core.tut.pori.http.parameters.Limits; 054import core.tut.pori.users.UserIdentity; 055import core.tut.pori.utils.XMLFormatter; 056 057/** 058 * Search task used to query back-ends for results. 059 * 060 * This class is not asynchronous and will block for the duration of execution, also this task cannot be submitted to system schedulers. 061 * 062 */ 063public class VideoSearchTask{ 064 /** Back-end capability required for search tasks */ 065 public static final Capability SEARCH_CAPABILITY = Capability.VIDEO_SEARCH; 066 private static final int CONNECTION_SOCKET_TIME_OUT = 10000; // connection socket time out in milliseconds 067 private static final XMLFormatter FORMATTER = new XMLFormatter(); 068 private static final Logger LOGGER = Logger.getLogger(VideoSearchTask.class); 069 private EnumSet<AnalysisType> _analysisTypes = null; 070 private UserIdentity _authenticatedUser = null; 071 private DataGroups _dataGroups = null; 072 private String _guid = null; 073 private Limits _limits = null; 074 private EnumSet<ServiceType> _serviceTypeFilter = null; 075 private long[] _userIdFilter = null; 076 077 /** 078 * Execute this task 079 * 080 * @return results of the search or null if no results 081 */ 082 public VideoList execute(){ 083 List<AnalysisBackend> backends = ServiceInitializer.getDAOHandler().getSQLDAO(BackendDAO.class).getBackends(SEARCH_CAPABILITY); 084 if(backends == null){ 085 LOGGER.warn("No backends with capability "+SEARCH_CAPABILITY.name()); 086 return null; 087 } 088 089 try (PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); CloseableHttpClient client = HttpClientBuilder.create().setConnectionManager(cm).setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(CONNECTION_SOCKET_TIME_OUT).build()).build()){ 090 ArrayList<Searcher> searchers = new ArrayList<>(backends.size()); 091 String baseParameterString = generateParameterString(); // create the default parameter string 092 for(AnalysisBackend end : backends){ 093 searchers.add(new Searcher(end, baseParameterString)); 094 } 095 096 List<Future<VideoList>> results = ServiceInitializer.getExecutorHandler().getExecutor().invokeAll(searchers); 097 List<VideoList> videoLists = new ArrayList<>(searchers.size()); 098 for(Future<VideoList> result : results){ 099 if(result.isDone() && !result.isCancelled()){ 100 try { 101 VideoList r = result.get(); 102 if(!VideoList.isEmpty(r)){ 103 videoLists.add(r); 104 } 105 } catch (ExecutionException ex) { 106 LOGGER.warn(ex, ex); 107 } 108 }else{ 109 LOGGER.debug("Ignoring unsuccessful result."); 110 } // for 111 } 112 if(videoLists.isEmpty()){ 113 LOGGER.debug("No search results."); 114 }else{ 115 VideoList videos = combineResults(_authenticatedUser, _dataGroups, _limits, videoLists); 116 return removeTargetVideo(videos); 117 } 118 } catch (IOException | InterruptedException ex) { 119 LOGGER.error(ex, ex); 120 } 121 return null; 122 } 123 124 /** 125 * Combine the given lists of videos. Non-existent and duplicate videos and videos which the user cannot access will be automatically removed. 126 * 127 * @param authenticatedUser 128 * @param dataGroups 129 * @param limits note that if there is a limit for maximum number of results, the videos for the combined list will be selected randomly from the given video lists 130 * @param videoLists the video lists, note that only GUIDs have any meaning, all other details will be retrieved from the database based on the given data groups. 131 * @return the given video lists combined into a single list, removing duplicates or null if no content or null collection was passed 132 */ 133 private VideoList combineResults(UserIdentity authenticatedUser, DataGroups dataGroups, Limits limits, Collection<VideoList> videoLists){ 134 if(videoLists == null || videoLists.isEmpty()){ 135 return null; 136 } 137 Set<String> guids = new LinkedHashSet<>(); 138 for(VideoList videoList : videoLists){ // collect all unique GUIDs. Note: this will prioritize the FASTER back-end, and does not guarantee that MORE APPLICABLE results appear first. This is because we have no easy (defined) way to decide which results are better than others. 139 for(Video video : videoList.getVideos()){ 140 guids.add(video.getGUID()); 141 } 142 } 143 144 if(!StringUtils.isBlank(_guid)){ // if search target is given, make sure it does not appear in the results 145 guids.remove(_guid); 146 } 147 148 VideoList results = ServiceInitializer.getDAOHandler().getSolrDAO(VideoDAO.class).search(authenticatedUser, dataGroups, guids, limits, null, null, null); // the back-ends are not guaranteed to return results the user has permissions to view, so re-search everything just in case 149 if(VideoList.isEmpty(results)){ 150 LOGGER.debug("No videos found."); 151 return null; 152 } 153 154 List<Video> sorted = new ArrayList<>(guids.size()); 155 for(String guid : guids){ // sort the results back to the original order 156 Video video = results.getVideo(guid); 157 if(video == null){ 158 LOGGER.warn("Ignored video with non-existing GUID: "+guid); 159 }else{ 160 sorted.add(video); 161 } 162 } 163 results.setVideos(sorted); 164 return results; 165 } 166 167 /** 168 * Removes the video with the given search term (GUID), if GUID based search was performed. 169 * 170 * @param videos the list from which the reference video is to be removed. 171 * @return the list with the reference video removed or null if nothing was left after removal 172 */ 173 private VideoList removeTargetVideo(VideoList videos){ 174 if(VideoList.isEmpty(videos)){ 175 LOGGER.debug("Empty video list."); 176 return null; 177 } 178 if(_guid != null){ 179 LOGGER.debug("Removing target video, if present in the search results..."); 180 for(Iterator<Video> iter = videos.getVideos().iterator();iter.hasNext();){ 181 if(iter.next().getGUID().equals(_guid)){ 182 iter.remove(); 183 } 184 } 185 } // if 186 187 if(VideoList.isEmpty(videos)){ 188 LOGGER.debug("No videos were left in the list, returning null.."); 189 return null; 190 }else{ 191 return videos; 192 } 193 } 194 195 /** 196 * Create new search task using the GUID as a reference, the GUID is assumed to exist, no database lookup is going to be done to confirm it 197 * 198 * @param authenticatedUser optional authenticated user 199 * @param analysisTypes 200 * @param dataGroups optional data groups 201 * @param guid 202 * @param limits optional limits 203 * @param serviceTypeFilter optional service type filter 204 * @param userIdFilter optional user id filter, if given only the content for the specific users will searched for 205 * @return new task 206 * @throws IllegalArgumentException on bad GUID 207 */ 208 public static VideoSearchTask getTaskByGUID(UserIdentity authenticatedUser, EnumSet<AnalysisType> analysisTypes, DataGroups dataGroups, String guid, Limits limits, EnumSet<ServiceType> serviceTypeFilter, long[] userIdFilter) throws IllegalArgumentException{ 209 if(StringUtils.isBlank(guid)){ 210 throw new IllegalArgumentException("GUID was null or empty."); 211 } 212 VideoSearchTask task = new VideoSearchTask(authenticatedUser, analysisTypes, dataGroups, limits, serviceTypeFilter, userIdFilter); 213 task._guid = guid; 214 return task; 215 } 216 217 /** 218 * 219 * @param authenticatedUser 220 * @param analysisTypes 221 * @param dataGroups 222 * @param limits 223 * @param serviceTypeFilter 224 * @param userIdFilter 225 */ 226 private VideoSearchTask(UserIdentity authenticatedUser, EnumSet<AnalysisType> analysisTypes, DataGroups dataGroups, Limits limits, EnumSet<ServiceType> serviceTypeFilter, long[] userIdFilter){ 227 if(UserIdentity.isValid(authenticatedUser)){ 228 _authenticatedUser = authenticatedUser; 229 LOGGER.debug("Searching with authenticated user, id: "+authenticatedUser.getUserId()); 230 }else{ 231 LOGGER.debug("No authenticated user."); 232 } 233 234 if(!DataGroups.isEmpty(dataGroups)){ 235 _dataGroups = dataGroups; 236 } 237 238 _limits = limits; 239 240 if(serviceTypeFilter != null && !serviceTypeFilter.isEmpty()){ 241 LOGGER.debug("Using service type filter..."); 242 _serviceTypeFilter = serviceTypeFilter; 243 } 244 245 if(!ArrayUtils.isEmpty(userIdFilter)){ 246 LOGGER.debug("Using user id filter..."); 247 _userIdFilter = userIdFilter; 248 } 249 250 if(analysisTypes != null && !analysisTypes.isEmpty()){ 251 LOGGER.debug("Analysis types specified..."); 252 _analysisTypes = analysisTypes; 253 } 254 } 255 256 /** 257 * Helper method for creating the default parameter string: filters, limits & data groups 258 * 259 * @return parameter string or null if none 260 */ 261 private String generateParameterString(){ 262 StringBuilder sb = new StringBuilder(); 263 if(_userIdFilter != null){ 264 sb.append(core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAMS+service.tut.pori.users.Definitions.PARAMETER_USER_ID+core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUE_SEPARATOR); 265 core.tut.pori.utils.StringUtils.append(sb, _userIdFilter, core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUES); 266 } 267 268 if(_serviceTypeFilter != null){ 269 sb.append(core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAMS+service.tut.pori.contentanalysis.Definitions.PARAMETER_SERVICE_ID+core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUE_SEPARATOR); 270 core.tut.pori.utils.StringUtils.append(sb, ServiceType.toIdArray(_serviceTypeFilter), core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUES); 271 } 272 273 if(_limits != null){ 274 sb.append(core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAMS+Limits.PARAMETER_DEFAULT_NAME+core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUE_SEPARATOR); 275 sb.append(_limits.toLimitString()); 276 } 277 278 if(_analysisTypes != null){ 279 sb.append(core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAMS+Definitions.PARAMETER_ANALYSIS_TYPE+core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUE_SEPARATOR); 280 Iterator<AnalysisType> iter = _analysisTypes.iterator(); 281 sb.append(iter.next().toAnalysisTypeString()); 282 while(iter.hasNext()){ 283 sb.append(core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUES); 284 sb.append(iter.next().toAnalysisTypeString()); 285 } 286 } 287 288 if(sb.length() < 1){ 289 return null; 290 }else{ 291 return sb.toString(); 292 } 293 } 294 295 /** 296 * Internal class, which executes the search queries to back-ends. 297 * 298 */ 299 private class Searcher implements Callable<VideoList>{ 300 private AnalysisBackend _backend = null; 301 private String _baseParameterString = null; 302 303 /** 304 * 305 * @param end 306 * @param baseParameterString 307 */ 308 public Searcher(AnalysisBackend end, String baseParameterString){ 309 _backend = end; 310 _baseParameterString = baseParameterString; 311 } 312 313 @Override 314 public VideoList call() throws Exception { 315 StringBuilder uri = new StringBuilder(_backend.getAnalysisUri()); 316 uri.append(Definitions.METHOD_SEARCH_SIMILAR_BY_ID+"?"+service.tut.pori.contentanalysis.Definitions.PARAMETER_GUID+"="); 317 uri.append(_guid); 318 319 if(_baseParameterString != null){ 320 uri.append(_baseParameterString); 321 } 322 String url = uri.toString(); 323 LOGGER.debug("Calling URL: "+url); 324 try (CloseableHttpClient client = HttpClients.createDefault(); CloseableHttpResponse response = client.execute(new HttpPost(url))) { 325 HttpEntity entity = response.getEntity(); 326 try (InputStream content = entity.getContent()) { 327 Response r = FORMATTER.toResponse(content, VideoList.class); 328 if(r == null){ 329 LOGGER.warn("No results returned by backend, id: "+_backend.getBackendId()); 330 }else{ 331 return (VideoList) r.getResponseData(); 332 } 333 } finally { 334 EntityUtils.consume(entity); 335 } // try content 336 } // try response 337 return null; 338 } 339 340 } // class Searcher 341}