001/**
002 * Copyright 2014 Tampere University of Technology, Pori Department
003 * 
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * 
008 *   http://www.apache.org/licenses/LICENSE-2.0
009 * 
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package service.tut.pori.contentanalysis;
017
018import java.io.IOException;
019import java.util.Collection;
020import java.util.Iterator;
021import java.util.Set;
022
023import javax.xml.bind.annotation.XmlEnum;
024import javax.xml.bind.annotation.XmlEnumValue;
025
026import org.apache.commons.lang3.StringUtils;
027import org.apache.http.client.methods.HttpPost;
028import org.apache.http.entity.StringEntity;
029import org.apache.http.impl.client.BasicResponseHandler;
030import org.apache.http.impl.client.CloseableHttpClient;
031import org.apache.http.impl.client.HttpClients;
032import org.apache.log4j.Logger;
033import org.quartz.Job;
034import org.quartz.JobDataMap;
035import org.springframework.context.ApplicationEvent;
036
037import service.tut.pori.contentanalysis.AnalysisBackend.Capability;
038import core.tut.pori.utils.XMLFormatter;
039
040/**
041 * Base class for tasks. There is a default implementation to call addTask, which should suffice in many cases.
042 */
043public abstract class AsyncTask implements Job{
044  /** Job data key, content type is long */
045  private static final String JOB_DATA_TASK_ID = "taskId";
046  private static final Logger LOGGER = Logger.getLogger(AsyncTask.class);
047  private static final String TASKSTATUS_COMPLETED = "COMPLETED";
048  private static final String TASKSTATUS_ERROR = "ERROR";
049  private static final String TASKSTATUS_EXECUTING = "EXECUTING";
050  private static final String TASKSTATUS_NOT_STARTED = "NOT_STARTED";
051  private static final String TASKSTATUS_PENDING = "PENDING";
052  private static final String TASKSTATUS_UNKNOWN = "UNKNOWN";
053  private static final String TASKTYPE_ANALYSIS = "ANALYSIS";
054  private static final String TASK_TYPE_BACKEND_FEEDBACK = "BACKEND_FEEDBACK";
055  private static final String TASKTYPE_FACEBOOK_PROFILE_SUMMARIZATION = "FACEBOOK_PROFILE_SUMMARIZATION";
056  private static final String TASK_TYPE_FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK = "FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK";
057  private static final String TASK_TYPE_TWITTER_PROFILE_SUMMARIZATION = "TWITTER_PROFILE_SUMMARIZATION";
058  private static final String TASKTYPE_FEEDBACK = "FEEDBACK";
059  private static final String TASKTYPE_SEARCH = "SEARCH";
060  private static final String TASKTYPE_UNDEFINED = "UNDEFINED";
061  
062  /**
063   * The status of the task.
064   */
065  @XmlEnum
066  public enum TaskStatus{
067    /** unknown or unspecified task status */
068    @XmlEnumValue(value = TASKSTATUS_UNKNOWN)
069    UNKNOWN(0),
070    /** task has been created, but back-ends have not yet started to process it, or the the task has not been delivered to back-ends */
071    @XmlEnumValue(value = TASKSTATUS_NOT_STARTED)
072    NOT_STARTED(1),
073    /** task has been delivered to back-end, but the analysis has not yet started */
074    @XmlEnumValue(value = TASKSTATUS_PENDING)
075    PENDING(2),
076    /** the task is being executed */
077    @XmlEnumValue(value = TASKSTATUS_EXECUTING)
078    EXECUTING(3),
079    /** task has completed */
080    @XmlEnumValue(value = TASKSTATUS_COMPLETED)
081    COMPLETED(4),
082    /** an error condition has prevented to execution of the task */
083    @XmlEnumValue(value = TASKSTATUS_ERROR)
084    ERROR(5);
085
086    private int _value;
087
088
089    /**
090     * 
091     * @param value
092     */
093    private TaskStatus(int value){
094      _value = value;
095    }
096
097
098    /**
099     * 
100     * @return TaskStatus as integer
101     */
102    public int toInt(){
103      return _value;
104    }
105
106    /**
107     * 
108     * @param value
109     * @return the value converted to TaskStatus
110     * @throws IllegalArgumentException on bad input
111     */
112    public static TaskStatus fromInt(int value) throws IllegalArgumentException{
113      for(TaskStatus s : TaskStatus.values()){
114        if(s._value == value){
115          return s;
116        }
117      }
118      throw new IllegalArgumentException("Bad "+TaskStatus.class.toString()+" : "+value);
119    }
120
121    /**
122     * 
123     * @param statusList
124     * @return the combined status for the list of status codes or null if null or empty list was passed
125     */
126    public static TaskStatus getCombinedTaskStatus(Collection<TaskStatus> statusList) {
127      if(statusList == null || statusList.size() < 1){
128        return null;
129      }
130      TaskStatus status = UNKNOWN;
131      for(Iterator<TaskStatus> iter = statusList.iterator();iter.hasNext();){
132        switch(iter.next()){
133          case COMPLETED:
134            if(status == NOT_STARTED){
135              status = PENDING;
136            }else if(status!=PENDING && status!=EXECUTING && status!=COMPLETED){
137              status = COMPLETED;
138            }
139            break;
140          case ERROR:
141            return ERROR;
142          case EXECUTING:
143            status = EXECUTING;
144            break;
145          case NOT_STARTED:
146            if(status != PENDING && status != EXECUTING)
147              status = NOT_STARTED;
148            break;
149          case PENDING:
150            status = PENDING;
151            break;
152          case UNKNOWN:
153            LOGGER.debug("Unknown Task Status detected.");
154            break;
155          default:
156            LOGGER.error("Unhandled "+TaskStatus.class.toString());
157            break;
158        }
159      }
160      return status;
161    }
162  }  // enum TaskStatus
163
164
165  /**
166   * The type of the task.
167   * 
168   * New task types cannot be defined by the services. 
169   * If new task type is required one option is to use {@link service.tut.pori.contentanalysis.AsyncTask.TaskType#UNDEFINED} and add the proper task type as a metadata for the task.
170   */
171  @XmlEnum
172  public enum TaskType{
173    /** task type is unknown or undefined */
174    @XmlEnumValue(value = TASKTYPE_UNDEFINED)
175    UNDEFINED(0),
176    /** media analysis task */
177    @XmlEnumValue(value = TASKTYPE_ANALYSIS)
178    ANALYSIS(1),
179    /** search task */
180    @XmlEnumValue(value = TASKTYPE_SEARCH)
181    SEARCH(2),
182    /** Feedback task. This can either be direct or indirect user feedback (e.g. deleted content) */
183    @XmlEnumValue(value = TASKTYPE_FEEDBACK)
184    FEEDBACK(3), // user feedback
185    /** facebook user profile summarization */
186    @XmlEnumValue(value = TASKTYPE_FACEBOOK_PROFILE_SUMMARIZATION)
187    FACEBOOK_PROFILE_SUMMARIZATION(4),
188    /** user feedback for facebook profile summarization */
189    @XmlEnumValue(value = TASK_TYPE_FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK)
190    FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK(5),
191    /** twitter user profile summarization */
192    @XmlEnumValue(value = TASK_TYPE_TWITTER_PROFILE_SUMMARIZATION)
193    TWITTER_PROFILE_SUMMARIZATION(6),
194    /** feedback generated based on results received from back-ends. This task is always targeted to other back-ends. */
195    @XmlEnumValue(value = TASK_TYPE_BACKEND_FEEDBACK)
196    BACKEND_FEEDBACK(7);
197
198    private int _value;
199
200
201    /**
202     * 
203     * @param value
204     */
205    private TaskType(int value){
206      _value = value;
207    }
208
209
210    /**
211     * 
212     * @return TaskType as integer
213     */
214    public int toInt(){
215      return _value;
216    }
217
218    /**
219     * 
220     * @param value
221     * @return the value converted to TaskType
222     * @throws IllegalArgumentException on bad input
223     */
224    public static TaskType fromInt(int value) throws IllegalArgumentException{
225      for(TaskType s : TaskType.values()){
226        if(s._value == value){
227          return s;
228        }
229      }
230      throw new IllegalArgumentException("Bad "+TaskType.class.toString()+" : "+value);
231    }
232
233    /**
234     * 
235     * @param value
236     * @return the value converted to TaskType
237     * @throws IllegalArgumentException on bad input
238     */
239    public static TaskType fromString(String value) throws IllegalArgumentException {
240      if(!StringUtils.isBlank(value)){
241        switch(value.toUpperCase()){
242          case TASKTYPE_ANALYSIS:
243            return TaskType.ANALYSIS;
244          case TASK_TYPE_BACKEND_FEEDBACK:
245            return TaskType.BACKEND_FEEDBACK;
246          case TASKTYPE_FACEBOOK_PROFILE_SUMMARIZATION:
247            return FACEBOOK_PROFILE_SUMMARIZATION;
248          case TASK_TYPE_FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK:
249            return FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK;
250          case TASK_TYPE_TWITTER_PROFILE_SUMMARIZATION:
251            return TWITTER_PROFILE_SUMMARIZATION;
252          case TASKTYPE_FEEDBACK:
253            return FEEDBACK;
254          case TASKTYPE_SEARCH:
255            return SEARCH;
256          case TASKTYPE_UNDEFINED:
257            return UNDEFINED;
258          default:
259            break;
260        }
261      }
262      throw new IllegalArgumentException("Bad "+TaskType.class.toString()+" : "+value);
263    }
264  } //  enum TaskType
265
266  /**
267   * Default implementation for addTask.
268   * 
269   * This will:
270   * - use the default TaskDAO to retrieve the back-ends associated with the task
271   * - use the default TaskDAO to retrieve the task details for the task
272   * - call each backend's addTask method with default parameters
273   * - use the default TaskDAO to update the back-end details for the associated back-ends
274   * 
275   * @param requiredCapabilities capabilities required for the participating back-ends, all back-ends for the task not having ALL of the capabilities are ignored. If null no check is performed.
276   * @param taskDAO used to resolve task details with taskDAO.getTask(backendId, dataGroups, limits, taskId) with limit parameter null
277   * @param taskId
278   */
279  protected void executeAddTask(Set<Capability> requiredCapabilities, TaskDAO taskDAO, Long taskId) {
280    try{
281      BackendStatusList backends = taskDAO.getBackendStatus(taskId, TaskStatus.NOT_STARTED);
282      if(BackendStatusList.isEmpty(backends)){
283        LOGGER.warn("No analysis back-ends available for taskId: "+taskId+" with status "+TaskStatus.NOT_STARTED.name());
284        return;
285      }
286      
287      if(requiredCapabilities != null && !requiredCapabilities.isEmpty()){
288        backends = BackendStatusList.getBackendStatusList(backends.getBackendStatuses(requiredCapabilities)); // filter back-ends
289        if(BackendStatusList.isEmpty(backends)){
290          LOGGER.warn("Aborting execute... no back-end given with required capabilities for task, id: "+taskId);
291          return;
292        }
293      }else{
294        LOGGER.debug("Ignoring capability check...");
295      }
296
297      try (CloseableHttpClient client = HttpClients.createDefault()) {
298        BasicResponseHandler h = new BasicResponseHandler();
299        for(BackendStatus status : backends.getBackendStatuses()){              
300          AnalysisBackend end = status.getBackend();
301          Integer backendId = end.getBackendId();
302          AbstractTaskDetails details = taskDAO.getTask(backendId, end.getDefaultTaskDataGroups(), null, taskId);
303          if(details == null){
304            LOGGER.warn("Task, id: "+taskId+" does not exist for backend, id: "+backendId);
305            continue;
306          }
307
308          if(details.getUserId() == null && !end.hasCapability(Capability.ANONYMOUS_TASK)){
309            LOGGER.warn("backendId: "+backendId+" cannot process anonymous tasks. Ignoring backend...");
310            continue;
311          }
312
313          try {
314            String url = end.getAnalysisUri()+Definitions.METHOD_ADD_TASK;
315            LOGGER.debug("Task, id: "+taskId+" of type "+details.getTaskType().name()+", back-end id: "+backendId+". Sending "+Definitions.METHOD_ADD_TASK+" to URL: "+url);
316            HttpPost taskRequest = new HttpPost(url);
317            details.setBackendId(backendId);
318            taskRequest.setHeader("Content-Type", "text/xml; charset=UTF-8");
319            taskRequest.setEntity(new StringEntity((new XMLFormatter()).toString(details), core.tut.pori.http.Definitions.ENCODING_UTF8));
320
321            LOGGER.debug("Backend with id: "+backendId+" responded "+client.execute(taskRequest,h));
322
323            status.setStatus(TaskStatus.EXECUTING); //updates the status of the task for this back-end
324          } catch (IOException ex) {
325            LOGGER.warn(ex, ex);
326          }
327        }  // for
328      } catch (IOException ex) {
329        LOGGER.error(ex, ex);
330      }
331      taskDAO.updateTaskStatus(backends, taskId);
332    }catch (Throwable ex) { // catch all exceptions to prevent re-scheduling on error
333      LOGGER.error(ex, ex);
334    }
335  }  // run
336
337  /**
338   * 
339   * @param map non-null map
340   * @param taskId if null or empty, previous value will be removed
341   */
342  public static void setTaskId(JobDataMap map, Long taskId){
343    if(taskId == null){
344      map.remove(JOB_DATA_TASK_ID);
345    }else{
346      map.put(JOB_DATA_TASK_ID, taskId);
347    }
348  }
349
350  /**
351   * 
352   * @param map
353   * @return the taskId or null if not found or the map was null
354   */
355  public static Long getTaskId(JobDataMap map){
356    if(map == null || !map.containsKey(JOB_DATA_TASK_ID)){
357      return null;
358    }else{
359      return map.getLong(JOB_DATA_TASK_ID);
360    }
361  }
362  
363  /**
364   * An application event used to notify listeners about progress or change in status of an ASyncTask execution.
365   *
366   */
367  public static class AsyncTaskEvent extends ApplicationEvent{
368    /** serial version id */
369    private static final long serialVersionUID = 2342360048902381597L;
370    private Integer _backendId = null;
371    private Class<?> _source = null;
372    private TaskStatus _status = null;
373    private Long _taskId = null;
374    private TaskType _taskType = null;
375
376    /**
377     * 
378     * @param source
379     */
380    public AsyncTaskEvent(Class<?> source) {
381      super(source);
382      _source = source;
383    }
384    
385    /**
386     * 
387     * @param source
388     * @param status
389     * @param taskId
390     * @param taskType
391     */
392    public AsyncTaskEvent(Class<?> source, TaskStatus status, Long taskId, TaskType taskType) {
393      super(source);
394      _status = status;
395      _taskId = taskId;
396      _taskType = taskType;
397    }
398    
399    /**
400     * 
401     * @param backendId
402     * @param source
403     * @param status
404     * @param taskId
405     * @param taskType
406     */
407    public AsyncTaskEvent(Integer backendId, Class<?> source, TaskStatus status, Long taskId, TaskType taskType) {
408      super(source);
409      _backendId = backendId;
410      _status = status;
411      _taskId = taskId;
412      _taskType = taskType;
413    }
414
415    /**
416     * @return the backendId
417     */
418    public Integer getBackendId() {
419      return _backendId;
420    }
421
422    @Override
423    public Class<?> getSource() {
424      return _source;
425    }
426
427    /**
428     * @return the status
429     */
430    public TaskStatus getStatus() {
431      return _status;
432    }
433
434    /**
435     * @return the taskId
436     */
437    public Long getTaskId() {
438      return _taskId;
439    }
440
441    /**
442     * @return the taskType
443     */
444    public TaskType getTaskType() {
445      return _taskType;
446    }
447  } // class AsyncTaskEvent
448}