001/**
002 * Copyright 2014 Tampere University of Technology, Pori Department
003 * 
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * 
008 *   http://www.apache.org/licenses/LICENSE-2.0
009 * 
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package service.tut.pori.contentanalysis.video;
017
018import java.io.IOException;
019import java.io.InputStream;
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.EnumSet;
023import java.util.Iterator;
024import java.util.LinkedHashSet;
025import java.util.List;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.Future;
030
031import org.apache.commons.lang3.ArrayUtils;
032import org.apache.commons.lang3.StringUtils;
033import org.apache.http.HttpEntity;
034import org.apache.http.client.methods.CloseableHttpResponse;
035import org.apache.http.client.methods.HttpPost;
036import org.apache.http.config.SocketConfig;
037import org.apache.http.impl.client.CloseableHttpClient;
038import org.apache.http.impl.client.HttpClientBuilder;
039import org.apache.http.impl.client.HttpClients;
040import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
041import org.apache.http.util.EntityUtils;
042import org.apache.log4j.Logger;
043
044import service.tut.pori.contentanalysis.AnalysisBackend;
045import service.tut.pori.contentanalysis.AnalysisBackend.Capability;
046import service.tut.pori.contentanalysis.BackendDAO;
047import service.tut.pori.contentanalysis.CAContentCore.ServiceType;
048import service.tut.pori.contentanalysis.Definitions;
049import service.tut.pori.contentanalysis.PhotoParameters.AnalysisType;
050import core.tut.pori.context.ServiceInitializer;
051import core.tut.pori.http.Response;
052import core.tut.pori.http.parameters.DataGroups;
053import core.tut.pori.http.parameters.Limits;
054import core.tut.pori.users.UserIdentity;
055import core.tut.pori.utils.XMLFormatter;
056
057/**
058 * Search task used to query back-ends for results. 
059 * 
060 * This class is not asynchronous and will block for the duration of execution, also this task cannot be submitted to system schedulers.
061 * 
062 */
063public class VideoSearchTask{
064  /** Back-end capability required for search tasks */
065  public static final Capability SEARCH_CAPABILITY = Capability.VIDEO_SEARCH;
066  private static final int CONNECTION_SOCKET_TIME_OUT = 10000;  // connection socket time out in milliseconds
067  private static final XMLFormatter FORMATTER = new XMLFormatter();
068  private static final Logger LOGGER = Logger.getLogger(VideoSearchTask.class);
069  private EnumSet<AnalysisType> _analysisTypes = null;
070  private UserIdentity _authenticatedUser = null;
071  private DataGroups _dataGroups = null;
072  private String _guid = null;
073  private Limits _limits = null;
074  private EnumSet<ServiceType> _serviceTypeFilter = null;
075  private long[] _userIdFilter = null;
076  
077  /**
078   * Execute this task
079   * 
080   * @return results of the search or null if no results
081   */
082  public VideoList execute(){
083    List<AnalysisBackend> backends = ServiceInitializer.getDAOHandler().getSQLDAO(BackendDAO.class).getBackends(SEARCH_CAPABILITY);
084    if(backends == null){
085      LOGGER.warn("No backends with capability "+SEARCH_CAPABILITY.name());
086      return null;
087    }
088    
089    try (PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); CloseableHttpClient client =  HttpClientBuilder.create().setConnectionManager(cm).setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(CONNECTION_SOCKET_TIME_OUT).build()).build()){
090      ArrayList<Searcher> searchers = new ArrayList<>(backends.size());
091      String baseParameterString = generateParameterString(); // create the default parameter string
092      for(AnalysisBackend end : backends){
093        searchers.add(new Searcher(end, baseParameterString));
094      }
095      
096      List<Future<VideoList>> results = ServiceInitializer.getExecutorHandler().getExecutor().invokeAll(searchers);
097      List<VideoList> videoLists = new ArrayList<>(searchers.size());
098      for(Future<VideoList> result : results){
099        if(result.isDone() && !result.isCancelled()){
100          try {
101            VideoList r = result.get();
102            if(!VideoList.isEmpty(r)){
103              videoLists.add(r);
104            }
105          } catch (ExecutionException ex) {
106            LOGGER.warn(ex, ex);
107          }
108        }else{
109          LOGGER.debug("Ignoring unsuccessful result.");
110        } // for
111      }
112      if(videoLists.isEmpty()){
113        LOGGER.debug("No search results.");
114      }else{
115        VideoList videos = combineResults(_authenticatedUser, _dataGroups, _limits, videoLists);
116        return removeTargetVideo(videos);
117      }
118    } catch (IOException | InterruptedException ex) {
119      LOGGER.error(ex, ex);
120    }
121    return null;
122  }
123  
124  /**
125   * Combine the given lists of videos. Non-existent and duplicate videos and videos which the user cannot access will be automatically removed.
126   * 
127   * @param authenticatedUser
128   * @param dataGroups
129   * @param limits note that if there is a limit for maximum number of results, the videos for the combined list will be selected randomly from the given video lists
130   * @param videoLists the video lists, note that only GUIDs have any meaning, all other details will be retrieved from the database based on the given data groups.
131   * @return the given video lists combined into a single list, removing duplicates or null if no content or null collection was passed
132   */
133  private VideoList combineResults(UserIdentity authenticatedUser, DataGroups dataGroups, Limits limits, Collection<VideoList> videoLists){
134    if(videoLists == null || videoLists.isEmpty()){
135      return null;
136    }
137    Set<String> guids = new LinkedHashSet<>();
138    for(VideoList videoList : videoLists){  // collect all unique GUIDs. Note: this will prioritize the FASTER back-end, and does not guarantee that MORE APPLICABLE results appear first. This is because we have no easy (defined) way to decide which results are better than others.
139      for(Video video : videoList.getVideos()){
140        guids.add(video.getGUID());
141      }
142    }
143    
144    if(!StringUtils.isBlank(_guid)){ // if search target is given, make sure it does not appear in the results
145      guids.remove(_guid);
146    }
147    
148    VideoList results = ServiceInitializer.getDAOHandler().getSolrDAO(VideoDAO.class).search(authenticatedUser, dataGroups, guids, limits, null, null, null); // the back-ends are not guaranteed to return results the user has permissions to view, so re-search everything just in case
149    if(VideoList.isEmpty(results)){
150      LOGGER.debug("No videos found.");
151      return null;
152    }
153    
154    List<Video> sorted = new ArrayList<>(guids.size());
155    for(String guid : guids){ // sort the results back to the original order
156      Video video = results.getVideo(guid);
157      if(video == null){
158        LOGGER.warn("Ignored video with non-existing GUID: "+guid);
159      }else{
160        sorted.add(video);
161      }
162    }
163    results.setVideos(sorted);
164    return results;
165  }
166  
167  /**
168   * Removes the video with the given search term (GUID), if GUID based search was performed.
169   * 
170   * @param videos the list from which the reference video is to be removed.
171   * @return the list with the reference video removed or null if nothing was left after removal
172   */
173  private VideoList removeTargetVideo(VideoList videos){
174    if(VideoList.isEmpty(videos)){
175      LOGGER.debug("Empty video list.");
176      return null;
177    }
178    if(_guid != null){
179      LOGGER.debug("Removing target video, if present in the search results...");
180      for(Iterator<Video> iter = videos.getVideos().iterator();iter.hasNext();){
181        if(iter.next().getGUID().equals(_guid)){
182          iter.remove();
183        }
184      }
185    } // if
186    
187    if(VideoList.isEmpty(videos)){
188      LOGGER.debug("No videos were left in the list, returning null..");
189      return null;
190    }else{
191      return videos;
192    }
193  }
194  
195  /**
196   * Create new search task using the GUID as a reference, the GUID is assumed to exist, no database lookup is going to be done to confirm it
197   * 
198   * @param authenticatedUser optional authenticated user
199   * @param analysisTypes 
200   * @param dataGroups optional data groups
201   * @param guid
202   * @param limits optional limits
203   * @param serviceTypeFilter optional service type filter
204   * @param userIdFilter optional user id filter, if given only the content for the specific users will searched for
205   * @return new task
206   * @throws IllegalArgumentException on bad GUID
207   */
208  public static VideoSearchTask getTaskByGUID(UserIdentity authenticatedUser, EnumSet<AnalysisType> analysisTypes, DataGroups dataGroups, String guid, Limits limits, EnumSet<ServiceType> serviceTypeFilter, long[] userIdFilter) throws IllegalArgumentException{
209    if(StringUtils.isBlank(guid)){
210      throw new IllegalArgumentException("GUID was null or empty.");
211    }
212    VideoSearchTask task = new VideoSearchTask(authenticatedUser, analysisTypes, dataGroups, limits, serviceTypeFilter, userIdFilter);
213    task._guid = guid;
214    return task;
215  }
216  
217  /**
218   * 
219   * @param authenticatedUser
220   * @param analysisTypes 
221   * @param dataGroups
222   * @param limits
223   * @param serviceTypeFilter
224   * @param userIdFilter
225   */
226  private VideoSearchTask(UserIdentity authenticatedUser, EnumSet<AnalysisType> analysisTypes, DataGroups dataGroups, Limits limits, EnumSet<ServiceType> serviceTypeFilter, long[] userIdFilter){
227    if(UserIdentity.isValid(authenticatedUser)){
228      _authenticatedUser = authenticatedUser;
229      LOGGER.debug("Searching with authenticated user, id: "+authenticatedUser.getUserId());
230    }else{
231      LOGGER.debug("No authenticated user.");
232    }
233    
234    if(!DataGroups.isEmpty(dataGroups)){
235      _dataGroups = dataGroups;
236    }
237    
238    _limits = limits;
239    
240    if(serviceTypeFilter != null && !serviceTypeFilter.isEmpty()){
241      LOGGER.debug("Using service type filter...");
242      _serviceTypeFilter = serviceTypeFilter;
243    }
244    
245    if(!ArrayUtils.isEmpty(userIdFilter)){
246      LOGGER.debug("Using user id filter...");
247      _userIdFilter = userIdFilter;
248    }
249    
250    if(analysisTypes != null && !analysisTypes.isEmpty()){
251      LOGGER.debug("Analysis types specified...");
252      _analysisTypes = analysisTypes;
253    }
254  }
255  
256  /**
257   * Helper method for creating the default parameter string: filters, limits & data groups
258   * 
259   * @return parameter string or null if none
260   */
261  private String generateParameterString(){
262    StringBuilder sb = new StringBuilder();
263    if(_userIdFilter != null){
264      sb.append(core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAMS+service.tut.pori.users.Definitions.PARAMETER_USER_ID+core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUE_SEPARATOR);
265      core.tut.pori.utils.StringUtils.append(sb, _userIdFilter, core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUES);
266    }
267    
268    if(_serviceTypeFilter != null){
269      sb.append(core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAMS+service.tut.pori.contentanalysis.Definitions.PARAMETER_SERVICE_ID+core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUE_SEPARATOR);
270      core.tut.pori.utils.StringUtils.append(sb, ServiceType.toIdArray(_serviceTypeFilter), core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUES);
271    }
272    
273    if(_limits != null){
274      sb.append(core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAMS+Limits.PARAMETER_DEFAULT_NAME+core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUE_SEPARATOR);
275      sb.append(_limits.toLimitString());     
276    }
277    
278    if(_analysisTypes != null){
279      sb.append(core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAMS+Definitions.PARAMETER_ANALYSIS_TYPE+core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUE_SEPARATOR);
280      Iterator<AnalysisType> iter = _analysisTypes.iterator();
281      sb.append(iter.next().toAnalysisTypeString());
282      while(iter.hasNext()){
283        sb.append(core.tut.pori.http.Definitions.SEPARATOR_URI_QUERY_PARAM_VALUES);
284        sb.append(iter.next().toAnalysisTypeString());
285      }
286    }
287    
288    if(sb.length() < 1){
289      return null;
290    }else{
291      return sb.toString();
292    }
293  }
294
295  /**
296   * Internal class, which executes the search queries to back-ends.
297   *
298   */
299  private class Searcher implements Callable<VideoList>{
300    private AnalysisBackend _backend = null;
301    private String _baseParameterString = null;
302    
303    /**
304     * 
305     * @param end
306     * @param baseParameterString 
307     */
308    public Searcher(AnalysisBackend end, String baseParameterString){
309      _backend = end;
310      _baseParameterString = baseParameterString;
311    }
312
313    @Override
314    public VideoList call() throws Exception {
315      StringBuilder uri = new StringBuilder(_backend.getAnalysisUri());
316      uri.append(Definitions.METHOD_SEARCH_SIMILAR_BY_ID+"?"+service.tut.pori.contentanalysis.Definitions.PARAMETER_GUID+"=");
317      uri.append(_guid);
318      
319      if(_baseParameterString != null){
320        uri.append(_baseParameterString);
321      } 
322      String url = uri.toString();
323      LOGGER.debug("Calling URL: "+url);
324      try (CloseableHttpClient client = HttpClients.createDefault(); CloseableHttpResponse response = client.execute(new HttpPost(url))) {
325        HttpEntity entity = response.getEntity();
326        try (InputStream content = entity.getContent()) {
327          Response r = FORMATTER.toResponse(content, VideoList.class);
328          if(r == null){
329            LOGGER.warn("No results returned by backend, id: "+_backend.getBackendId());
330          }else{
331            return (VideoList) r.getResponseData();
332          }
333        } finally {
334          EntityUtils.consume(entity);
335        } // try content
336      } // try response
337      return null;
338    }
339    
340  } //  class Searcher
341}