001/** 002 * Copyright 2014 Tampere University of Technology, Pori Department 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package service.tut.pori.contentanalysis; 017 018import java.io.IOException; 019import java.util.Collection; 020import java.util.Iterator; 021import java.util.Set; 022 023import javax.xml.bind.annotation.XmlEnum; 024import javax.xml.bind.annotation.XmlEnumValue; 025 026import org.apache.commons.lang3.StringUtils; 027import org.apache.http.client.methods.HttpPost; 028import org.apache.http.entity.StringEntity; 029import org.apache.http.impl.client.BasicResponseHandler; 030import org.apache.http.impl.client.CloseableHttpClient; 031import org.apache.http.impl.client.HttpClients; 032import org.apache.log4j.Logger; 033import org.quartz.Job; 034import org.quartz.JobDataMap; 035import org.springframework.context.ApplicationEvent; 036 037import service.tut.pori.contentanalysis.AnalysisBackend.Capability; 038import core.tut.pori.utils.XMLFormatter; 039 040/** 041 * Base class for tasks. There is a default implementation to call addTask, which should suffice in many cases. 042 */ 043public abstract class AsyncTask implements Job{ 044 /** Job data key, content type is long */ 045 private static final String JOB_DATA_TASK_ID = "taskId"; 046 private static final Logger LOGGER = Logger.getLogger(AsyncTask.class); 047 private static final String TASKSTATUS_COMPLETED = "COMPLETED"; 048 private static final String TASKSTATUS_ERROR = "ERROR"; 049 private static final String TASKSTATUS_EXECUTING = "EXECUTING"; 050 private static final String TASKSTATUS_NOT_STARTED = "NOT_STARTED"; 051 private static final String TASKSTATUS_PENDING = "PENDING"; 052 private static final String TASKSTATUS_UNKNOWN = "UNKNOWN"; 053 private static final String TASKTYPE_ANALYSIS = "ANALYSIS"; 054 private static final String TASK_TYPE_BACKEND_FEEDBACK = "BACKEND_FEEDBACK"; 055 private static final String TASKTYPE_FACEBOOK_PROFILE_SUMMARIZATION = "FACEBOOK_PROFILE_SUMMARIZATION"; 056 private static final String TASK_TYPE_FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK = "FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK"; 057 private static final String TASK_TYPE_TWITTER_PROFILE_SUMMARIZATION = "TWITTER_PROFILE_SUMMARIZATION"; 058 private static final String TASKTYPE_FEEDBACK = "FEEDBACK"; 059 private static final String TASKTYPE_SEARCH = "SEARCH"; 060 private static final String TASKTYPE_UNDEFINED = "UNDEFINED"; 061 062 /** 063 * The status of the task. 064 */ 065 @XmlEnum 066 public enum TaskStatus{ 067 /** unknown or unspecified task status */ 068 @XmlEnumValue(value = TASKSTATUS_UNKNOWN) 069 UNKNOWN(0), 070 /** task has been created, but back-ends have not yet started to process it, or the the task has not been delivered to back-ends */ 071 @XmlEnumValue(value = TASKSTATUS_NOT_STARTED) 072 NOT_STARTED(1), 073 /** task has been delivered to back-end, but the analysis has not yet started */ 074 @XmlEnumValue(value = TASKSTATUS_PENDING) 075 PENDING(2), 076 /** the task is being executed */ 077 @XmlEnumValue(value = TASKSTATUS_EXECUTING) 078 EXECUTING(3), 079 /** task has completed */ 080 @XmlEnumValue(value = TASKSTATUS_COMPLETED) 081 COMPLETED(4), 082 /** an error condition has prevented to execution of the task */ 083 @XmlEnumValue(value = TASKSTATUS_ERROR) 084 ERROR(5); 085 086 private int _value; 087 088 089 /** 090 * 091 * @param value 092 */ 093 private TaskStatus(int value){ 094 _value = value; 095 } 096 097 098 /** 099 * 100 * @return TaskStatus as integer 101 */ 102 public int toInt(){ 103 return _value; 104 } 105 106 /** 107 * 108 * @param value 109 * @return the value converted to TaskStatus 110 * @throws IllegalArgumentException on bad input 111 */ 112 public static TaskStatus fromInt(int value) throws IllegalArgumentException{ 113 for(TaskStatus s : TaskStatus.values()){ 114 if(s._value == value){ 115 return s; 116 } 117 } 118 throw new IllegalArgumentException("Bad "+TaskStatus.class.toString()+" : "+value); 119 } 120 121 /** 122 * 123 * @param statusList 124 * @return the combined status for the list of status codes or null if null or empty list was passed 125 */ 126 public static TaskStatus getCombinedTaskStatus(Collection<TaskStatus> statusList) { 127 if(statusList == null || statusList.size() < 1){ 128 return null; 129 } 130 TaskStatus status = UNKNOWN; 131 for(Iterator<TaskStatus> iter = statusList.iterator();iter.hasNext();){ 132 switch(iter.next()){ 133 case COMPLETED: 134 if(status == NOT_STARTED){ 135 status = PENDING; 136 }else if(status!=PENDING && status!=EXECUTING && status!=COMPLETED){ 137 status = COMPLETED; 138 } 139 break; 140 case ERROR: 141 return ERROR; 142 case EXECUTING: 143 status = EXECUTING; 144 break; 145 case NOT_STARTED: 146 if(status != PENDING && status != EXECUTING) 147 status = NOT_STARTED; 148 break; 149 case PENDING: 150 status = PENDING; 151 break; 152 case UNKNOWN: 153 LOGGER.debug("Unknown Task Status detected."); 154 break; 155 default: 156 LOGGER.error("Unhandled "+TaskStatus.class.toString()); 157 break; 158 } 159 } 160 return status; 161 } 162 } // enum TaskStatus 163 164 165 /** 166 * The type of the task. 167 * 168 * New task types cannot be defined by the services. 169 * If new task type is required one option is to use {@link service.tut.pori.contentanalysis.AsyncTask.TaskType#UNDEFINED} and add the proper task type as a metadata for the task. 170 */ 171 @XmlEnum 172 public enum TaskType{ 173 /** task type is unknown or undefined */ 174 @XmlEnumValue(value = TASKTYPE_UNDEFINED) 175 UNDEFINED(0), 176 /** media analysis task */ 177 @XmlEnumValue(value = TASKTYPE_ANALYSIS) 178 ANALYSIS(1), 179 /** search task */ 180 @XmlEnumValue(value = TASKTYPE_SEARCH) 181 SEARCH(2), 182 /** Feedback task. This can either be direct or indirect user feedback (e.g. deleted content) */ 183 @XmlEnumValue(value = TASKTYPE_FEEDBACK) 184 FEEDBACK(3), // user feedback 185 /** facebook user profile summarization */ 186 @XmlEnumValue(value = TASKTYPE_FACEBOOK_PROFILE_SUMMARIZATION) 187 FACEBOOK_PROFILE_SUMMARIZATION(4), 188 /** user feedback for facebook profile summarization */ 189 @XmlEnumValue(value = TASK_TYPE_FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK) 190 FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK(5), 191 /** twitter user profile summarization */ 192 @XmlEnumValue(value = TASK_TYPE_TWITTER_PROFILE_SUMMARIZATION) 193 TWITTER_PROFILE_SUMMARIZATION(6), 194 /** feedback generated based on results received from back-ends. This task is always targeted to other back-ends. */ 195 @XmlEnumValue(value = TASK_TYPE_BACKEND_FEEDBACK) 196 BACKEND_FEEDBACK(7); 197 198 private int _value; 199 200 201 /** 202 * 203 * @param value 204 */ 205 private TaskType(int value){ 206 _value = value; 207 } 208 209 210 /** 211 * 212 * @return TaskType as integer 213 */ 214 public int toInt(){ 215 return _value; 216 } 217 218 /** 219 * 220 * @param value 221 * @return the value converted to TaskType 222 * @throws IllegalArgumentException on bad input 223 */ 224 public static TaskType fromInt(int value) throws IllegalArgumentException{ 225 for(TaskType s : TaskType.values()){ 226 if(s._value == value){ 227 return s; 228 } 229 } 230 throw new IllegalArgumentException("Bad "+TaskType.class.toString()+" : "+value); 231 } 232 233 /** 234 * 235 * @param value 236 * @return the value converted to TaskType 237 * @throws IllegalArgumentException on bad input 238 */ 239 public static TaskType fromString(String value) throws IllegalArgumentException { 240 if(!StringUtils.isBlank(value)){ 241 switch(value.toUpperCase()){ 242 case TASKTYPE_ANALYSIS: 243 return TaskType.ANALYSIS; 244 case TASK_TYPE_BACKEND_FEEDBACK: 245 return TaskType.BACKEND_FEEDBACK; 246 case TASKTYPE_FACEBOOK_PROFILE_SUMMARIZATION: 247 return FACEBOOK_PROFILE_SUMMARIZATION; 248 case TASK_TYPE_FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK: 249 return FACEBOOK_PROFILE_SUMMARIZATION_FEEDBACK; 250 case TASK_TYPE_TWITTER_PROFILE_SUMMARIZATION: 251 return TWITTER_PROFILE_SUMMARIZATION; 252 case TASKTYPE_FEEDBACK: 253 return FEEDBACK; 254 case TASKTYPE_SEARCH: 255 return SEARCH; 256 case TASKTYPE_UNDEFINED: 257 return UNDEFINED; 258 default: 259 break; 260 } 261 } 262 throw new IllegalArgumentException("Bad "+TaskType.class.toString()+" : "+value); 263 } 264 } // enum TaskType 265 266 /** 267 * Default implementation for addTask. 268 * 269 * This will: 270 * - use the default TaskDAO to retrieve the back-ends associated with the task 271 * - use the default TaskDAO to retrieve the task details for the task 272 * - call each backend's addTask method with default parameters 273 * - use the default TaskDAO to update the back-end details for the associated back-ends 274 * 275 * @param requiredCapabilities capabilities required for the participating back-ends, all back-ends for the task not having ALL of the capabilities are ignored. If null no check is performed. 276 * @param taskDAO used to resolve task details with taskDAO.getTask(backendId, dataGroups, limits, taskId) with limit parameter null 277 * @param taskId 278 */ 279 protected void executeAddTask(Set<Capability> requiredCapabilities, TaskDAO taskDAO, Long taskId) { 280 try{ 281 BackendStatusList backends = taskDAO.getBackendStatus(taskId, TaskStatus.NOT_STARTED); 282 if(BackendStatusList.isEmpty(backends)){ 283 LOGGER.warn("No analysis back-ends available for taskId: "+taskId+" with status "+TaskStatus.NOT_STARTED.name()); 284 return; 285 } 286 287 if(requiredCapabilities != null && !requiredCapabilities.isEmpty()){ 288 backends = BackendStatusList.getBackendStatusList(backends.getBackendStatuses(requiredCapabilities)); // filter back-ends 289 if(BackendStatusList.isEmpty(backends)){ 290 LOGGER.warn("Aborting execute... no back-end given with required capabilities for task, id: "+taskId); 291 return; 292 } 293 }else{ 294 LOGGER.debug("Ignoring capability check..."); 295 } 296 297 try (CloseableHttpClient client = HttpClients.createDefault()) { 298 BasicResponseHandler h = new BasicResponseHandler(); 299 for(BackendStatus status : backends.getBackendStatuses()){ 300 AnalysisBackend end = status.getBackend(); 301 Integer backendId = end.getBackendId(); 302 AbstractTaskDetails details = taskDAO.getTask(backendId, end.getDefaultTaskDataGroups(), null, taskId); 303 if(details == null){ 304 LOGGER.warn("Task, id: "+taskId+" does not exist for backend, id: "+backendId); 305 continue; 306 } 307 308 if(details.getUserId() == null && !end.hasCapability(Capability.ANONYMOUS_TASK)){ 309 LOGGER.warn("backendId: "+backendId+" cannot process anonymous tasks. Ignoring backend..."); 310 continue; 311 } 312 313 try { 314 String url = end.getAnalysisUri()+Definitions.METHOD_ADD_TASK; 315 LOGGER.debug("Task, id: "+taskId+" of type "+details.getTaskType().name()+", back-end id: "+backendId+". Sending "+Definitions.METHOD_ADD_TASK+" to URL: "+url); 316 HttpPost taskRequest = new HttpPost(url); 317 details.setBackendId(backendId); 318 taskRequest.setHeader("Content-Type", "text/xml; charset=UTF-8"); 319 taskRequest.setEntity(new StringEntity((new XMLFormatter()).toString(details), core.tut.pori.http.Definitions.ENCODING_UTF8)); 320 321 LOGGER.debug("Backend with id: "+backendId+" responded "+client.execute(taskRequest,h)); 322 323 status.setStatus(TaskStatus.EXECUTING); //updates the status of the task for this back-end 324 } catch (IOException ex) { 325 LOGGER.warn(ex, ex); 326 } 327 } // for 328 } catch (IOException ex) { 329 LOGGER.error(ex, ex); 330 } 331 taskDAO.updateTaskStatus(backends, taskId); 332 }catch (Throwable ex) { // catch all exceptions to prevent re-scheduling on error 333 LOGGER.error(ex, ex); 334 } 335 } // run 336 337 /** 338 * 339 * @param map non-null map 340 * @param taskId if null or empty, previous value will be removed 341 */ 342 public static void setTaskId(JobDataMap map, Long taskId){ 343 if(taskId == null){ 344 map.remove(JOB_DATA_TASK_ID); 345 }else{ 346 map.put(JOB_DATA_TASK_ID, taskId); 347 } 348 } 349 350 /** 351 * 352 * @param map 353 * @return the taskId or null if not found or the map was null 354 */ 355 public static Long getTaskId(JobDataMap map){ 356 if(map == null || !map.containsKey(JOB_DATA_TASK_ID)){ 357 return null; 358 }else{ 359 return map.getLong(JOB_DATA_TASK_ID); 360 } 361 } 362 363 /** 364 * An application event used to notify listeners about progress or change in status of an ASyncTask execution. 365 * 366 */ 367 public static class AsyncTaskEvent extends ApplicationEvent{ 368 /** serial version id */ 369 private static final long serialVersionUID = 2342360048902381597L; 370 private Integer _backendId = null; 371 private Class<?> _source = null; 372 private TaskStatus _status = null; 373 private Long _taskId = null; 374 private TaskType _taskType = null; 375 376 /** 377 * 378 * @param source 379 */ 380 public AsyncTaskEvent(Class<?> source) { 381 super(source); 382 _source = source; 383 } 384 385 /** 386 * 387 * @param source 388 * @param status 389 * @param taskId 390 * @param taskType 391 */ 392 public AsyncTaskEvent(Class<?> source, TaskStatus status, Long taskId, TaskType taskType) { 393 super(source); 394 _status = status; 395 _taskId = taskId; 396 _taskType = taskType; 397 } 398 399 /** 400 * 401 * @param backendId 402 * @param source 403 * @param status 404 * @param taskId 405 * @param taskType 406 */ 407 public AsyncTaskEvent(Integer backendId, Class<?> source, TaskStatus status, Long taskId, TaskType taskType) { 408 super(source); 409 _backendId = backendId; 410 _status = status; 411 _taskId = taskId; 412 _taskType = taskType; 413 } 414 415 /** 416 * @return the backendId 417 */ 418 public Integer getBackendId() { 419 return _backendId; 420 } 421 422 @Override 423 public Class<?> getSource() { 424 return _source; 425 } 426 427 /** 428 * @return the status 429 */ 430 public TaskStatus getStatus() { 431 return _status; 432 } 433 434 /** 435 * @return the taskId 436 */ 437 public Long getTaskId() { 438 return _taskId; 439 } 440 441 /** 442 * @return the taskType 443 */ 444 public TaskType getTaskType() { 445 return _taskType; 446 } 447 } // class AsyncTaskEvent 448}