001/** 002 * Copyright 2014 Tampere University of Technology, Pori Department 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package service.tut.pori.twitterjazz; 017 018import java.io.IOException; 019import java.util.Arrays; 020import java.util.EnumSet; 021import java.util.Iterator; 022import java.util.List; 023 024import org.apache.commons.lang3.StringUtils; 025import org.apache.http.client.methods.HttpPost; 026import org.apache.http.entity.StringEntity; 027import org.apache.http.impl.client.BasicResponseHandler; 028import org.apache.http.impl.client.CloseableHttpClient; 029import org.apache.http.impl.client.HttpClients; 030import org.apache.log4j.Logger; 031import org.quartz.JobDataMap; 032import org.quartz.JobExecutionContext; 033 034import service.tut.pori.contentanalysis.AnalysisBackend; 035import service.tut.pori.contentanalysis.AnalysisBackend.Capability; 036import service.tut.pori.contentanalysis.AsyncTask; 037import service.tut.pori.contentanalysis.BackendStatus; 038import service.tut.pori.contentanalysis.BackendStatusList; 039import service.tut.pori.contentanalysis.CAContentCore.ServiceType; 040import service.tut.pori.contentanalysis.Definitions; 041import service.tut.pori.contentanalysis.Photo; 042import service.tut.pori.contentanalysis.CAContentCore.Visibility; 043import service.tut.pori.contentanalysis.CAContentCore; 044import service.tut.pori.contentanalysis.PhotoDAO; 045import service.tut.pori.contentanalysis.PhotoList; 046import service.tut.pori.contentanalysis.MediaObject; 047import service.tut.pori.contentanalysis.MediaObjectDAO; 048import service.tut.pori.contentanalysis.MediaObjectList; 049import service.tut.pori.contentstorage.TwitterPhotoStorage; 050import core.tut.pori.context.ServiceInitializer; 051import core.tut.pori.users.UserIdentity; 052import core.tut.pori.utils.XMLFormatter; 053 054/** 055 * An implementation of ASyncTask, meant for executing a Twitter summarization task. 056 * 057 * Requires a valid taskId for execution, provided in a JobExecutionContext. 058 */ 059public class TwitterSummarizationTask extends AsyncTask { 060 private static final Double DEFAULT_TAG_CONFIDENCE = 1.0; 061 private static final Integer DEFAULT_TAG_RANK = 1; 062 private static final Logger LOGGER = Logger.getLogger(TwitterSummarizationTask.class); 063 064 @Override 065 public void execute(JobExecutionContext context) { 066 try{ 067 LOGGER.debug("Executing task..."); 068 JobDataMap data = context.getMergedJobDataMap(); 069 070 Long taskId = getTaskId(data); 071 if(taskId == null){ 072 LOGGER.debug("No taskId."); 073 return; 074 } 075 076 TwitterTaskDAO taskDAO = ServiceInitializer.getDAOHandler().getSQLDAO(TwitterTaskDAO.class); 077 BackendStatusList backends = taskDAO.getBackendStatus(taskId, TaskStatus.NOT_STARTED); 078 if(BackendStatusList.isEmpty(backends)){ 079 LOGGER.warn("No analysis back-ends available for taskId: "+taskId+" with status "+TaskStatus.NOT_STARTED.name()); 080 return; 081 } 082 083 TwitterSummarizationTaskDetails details = taskDAO.getTask(null, null, null, taskId); // no need to retrieve per back-end as the details are the same for each back-end 084 if(details == null){ 085 LOGGER.warn("Task not found, id: "+taskId); 086 return; 087 } 088 089 UserIdentity userId = details.getUserId(); 090 String screenName = details.getScreenName(); 091 if(details.isSynchronize()){ 092 synchronize(backends, screenName, taskId, userId); 093 }else{ 094 LOGGER.warn("Synchronization disabled by parameter."); 095 } 096 097 if(details.isSummarize()){ 098 summarize(backends, details, screenName, taskId); 099 }else{ 100 LOGGER.warn("Summarization disabled by parameter."); 101 } 102 } catch(Throwable ex){ // catch all exceptions to prevent re-scheduling on error 103 LOGGER.error(ex, ex); 104 } 105 } 106 107 /** 108 * 109 * @param backends 110 * @param details 111 * @param screenName 112 * @param taskId 113 */ 114 private void summarize(BackendStatusList backends, TwitterSummarizationTaskDetails details, String screenName, Long taskId){ 115 LOGGER.debug("Summarizing..."); 116 117 backends = BackendStatusList.getBackendStatusList(backends.getBackendStatuses(EnumSet.of(Capability.TWITTER_SUMMARIZATION))); // filter back-ends with summarization capability 118 if(BackendStatusList.isEmpty(backends)){ 119 LOGGER.warn("Aborting summarization... no back-end given with capability "+Capability.TWITTER_SUMMARIZATION.name()+" for task, id: "+taskId); 120 return; 121 } 122 123 TwitterExtractor e = TwitterExtractor.getExtractor(details.getUserId()); 124 if(e == null){ 125 LOGGER.error("Failed to create extractor for the given user."); 126 return; 127 } 128 129 TwitterProfile p = null; 130 if(StringUtils.isBlank(screenName)){ 131 LOGGER.debug("No screen name, retrieving authenticated user's profile."); 132 p = e.getProfile(details.getContentTypes()); 133 if(p == null){ 134 LOGGER.error("Failed to retrieve profile for the given user from Twitter."); 135 return; 136 } 137 }else{ 138 LOGGER.debug("Retrieving profile for user with screen name: "+screenName); 139 List<TwitterProfile> profiles = e.getProfiles(details.getContentTypes(), new String[]{screenName}); 140 if(profiles == null){ 141 LOGGER.error("Failed to retrieve profile for the screen name "+screenName); 142 return; 143 } 144 p = profiles.get(0); 145 } 146 details.setProfile(p); 147 148 try (CloseableHttpClient client = HttpClients.createDefault()) { 149 BasicResponseHandler h = new BasicResponseHandler(); 150 for(BackendStatus backendStatus : backends.getBackendStatuses()){ 151 AnalysisBackend end = backendStatus.getBackend(); 152 try { 153 Integer backendId = end.getBackendId(); 154 String url = end.getAnalysisUri()+Definitions.METHOD_ADD_TASK; 155 LOGGER.debug("Task, id: "+taskId+", back-end id: "+backendId+". Executing POST "+url); 156 HttpPost taskRequest = new HttpPost(url); 157 details.setBackendId(backendId); 158 taskRequest.setHeader("Content-Type", "text/xml; charset=UTF-8"); 159 taskRequest.setEntity(new StringEntity((new XMLFormatter()).toString(details), core.tut.pori.http.Definitions.ENCODING_UTF8)); 160 161 LOGGER.debug("Backend with id: "+backendId+" responded "+client.execute(taskRequest,h)); 162 } catch (IOException ex) { 163 LOGGER.warn(ex, ex); 164 } 165 } 166 } catch (IOException ex) { 167 LOGGER.error(ex, ex); 168 } 169 } 170 171 /** 172 * @param backends 173 * @param screenName 174 * @param taskId 175 * @param userId 176 */ 177 private void synchronize(BackendStatusList backends, String screenName, Long taskId, UserIdentity userId){ 178 LOGGER.debug("Synchronizing..."); 179 backends = BackendStatusList.getBackendStatusList(backends.getBackendStatuses(EnumSet.of(Capability.PHOTO_ANALYSIS))); // filter back-ends with analysis capability; 180 if(BackendStatusList.isEmpty(backends)){ 181 LOGGER.warn("Aborting synchronization... no back-end given with capability "+Capability.PHOTO_ANALYSIS.name()+" for task, id: "+taskId); 182 return; 183 } 184 185 TwitterPhotoStorage twitterStorage = new TwitterPhotoStorage(); 186 twitterStorage.setBackends(backends); 187 188 if(StringUtils.isBlank(screenName)){ 189 LOGGER.debug("Synchronizing without screen names."); 190 twitterStorage.synchronizeAccount(userId); // there is no need to wait for the analysis tasks to complete 191 }else{ 192 LOGGER.debug("Synchronizing with screen names."); 193 twitterStorage.synchronizeAccount(userId, Arrays.asList(screenName)); // there is no need to wait for the analysis tasks to complete 194 } // else 195 } 196 197 /** 198 * Process the response. After this method has finished, the response will not contain non-existent photos (if any were present). 199 * 200 * @param response 201 * @throws IllegalArgumentException on bad data 202 */ 203 public static void taskFinished(TwitterTaskResponse response) throws IllegalArgumentException { 204 Integer backendId = response.getBackendId(); 205 if(backendId == null){ 206 throw new IllegalArgumentException("Invalid backendId."); 207 } 208 Long taskId = response.getTaskId(); 209 if(taskId == null){ 210 throw new IllegalArgumentException("Invalid taskId."); 211 } 212 213 TwitterTaskDAO taskDAO = ServiceInitializer.getDAOHandler().getSQLDAO(TwitterTaskDAO.class); 214 BackendStatus backendStatus = taskDAO.getBackendStatus(backendId, taskId); 215 if(backendStatus == null){ 216 LOGGER.warn("Backend, id: "+backendId+" returned results for task, not given to the backend. TaskId: "+taskId); 217 throw new IllegalArgumentException("This task is not given for backend, id: "+backendId); 218 } 219 220 TaskStatus status = response.getStatus(); 221 if(status == null){ 222 LOGGER.warn("Task status not available."); 223 status = TaskStatus.UNKNOWN; 224 } 225 backendStatus.setStatus(status); 226 227 try{ 228 PhotoList photoList = response.getPhotoList(); 229 if(PhotoList.isEmpty(photoList)){ 230 LOGGER.debug("No photo list returned by backend, id: "+backendId+", task, id: "+taskId); 231 }else{ // create media objects and associate 232 PhotoDAO pdao = ServiceInitializer.getDAOHandler().getSolrDAO(PhotoDAO.class); 233 List<String> foundGUIDs = PhotoList.getGUIDs(pdao.getPhotos(null, photoList.getGUIDs(), null, null, null)); 234 if(foundGUIDs == null){ 235 LOGGER.warn("None of the photos exist, will not process media objects for backend, id: "+backendId+" for task, id: "+taskId); 236 photoList = null; // prevents generation of feedback task for invalid content 237 }else{ 238 List<Photo> photos = photoList.getPhotos(); 239 LOGGER.debug("New media objects for photos, photo count: "+photos.size()+", backend, id: "+backendId); 240 for(Iterator<Photo> iter = photos.iterator(); iter.hasNext();){ 241 Photo photo = iter.next(); 242 MediaObjectList mediaObjects = photo.getMediaObjects(); 243 if(MediaObjectList.isEmpty(mediaObjects)){ 244 LOGGER.warn("Ignored empty media object list for backend, id: "+backendId+" for task, id: "+taskId+", photo, GUID: "+photo.getGUID()); 245 iter.remove(); 246 }else if(!foundGUIDs.contains(photo.getGUID())){ 247 LOGGER.warn("Ignored non-existing photo for backend, id: "+backendId+" for task, id: "+taskId+", photo, GUID: "+photo.getGUID()); 248 iter.remove(); // remove to prevent association 249 }else if(!validate(mediaObjects, backendId, photo.getOwnerUserId()) || !insertOrUpdate(mediaObjects)){ 250 backendStatus.setStatus(TaskStatus.ERROR); 251 throw new IllegalArgumentException("Invalid object list returned by backend, id: "+backendId+" for task, id: "+taskId); 252 } 253 } // for 254 pdao.associate(photoList); 255 } // else 256 } 257 258 MediaObjectList objects = response.getMediaObjects(); 259 if(MediaObjectList.isEmpty(objects)){ 260 LOGGER.debug("No media object list returned by backend, id: "+backendId+" for task, id: "+taskId); 261 }else if(!validate(objects, backendId, null) || !insertOrUpdate(objects)){ 262 backendStatus.setStatus(TaskStatus.ERROR); 263 throw new IllegalArgumentException("Invalid object list returned by backend, id: "+backendId+" for task, id: "+taskId); 264 }else{ 265 LOGGER.debug("New media objects: "+objects.getMediaObjects().size()+" backend, id: "+backendId); 266 } 267 268 CAContentCore.scheduleBackendFeedback(backendId, photoList, taskId); 269 } finally { 270 taskDAO.updateTaskStatus(backendStatus, taskId); 271 ServiceInitializer.getEventHandler().publishEvent(new AsyncTaskEvent(backendId, TwitterSummarizationTask.class, status, taskId, TaskType.TWITTER_PROFILE_SUMMARIZATION)); 272 } 273 } 274 275 /** 276 * 277 * @param mediaObjects non-null, non-empty validated object list 278 * @return true on success 279 */ 280 private static boolean insertOrUpdate(MediaObjectList mediaObjects){ 281 MediaObjectList updates = new MediaObjectList(); 282 MediaObjectList inserts = new MediaObjectList(); 283 284 for(MediaObject o : mediaObjects.getMediaObjects()){ 285 if(StringUtils.isBlank(o.getMediaObjectId())){ // no media object id 286 inserts.addMediaObject(o); 287 }else{ 288 updates.addMediaObject(o); 289 } 290 } 291 292 PhotoDAO photoDAO = ServiceInitializer.getDAOHandler().getSolrDAO(PhotoDAO.class); 293 if(MediaObjectList.isEmpty(inserts)){ 294 LOGGER.debug("Nothing to insert."); 295 }else if(!photoDAO.insert(inserts)){ 296 LOGGER.warn("Failed to insert media objects."); 297 return false; 298 } 299 300 if(MediaObjectList.isEmpty(updates)){ 301 LOGGER.debug("Nothing to update."); 302 }else if(!photoDAO.update(updates)){ 303 LOGGER.warn("Failed to update media objects."); 304 return false; 305 } 306 return true; 307 } 308 309 /** 310 * Validate the given list of media objects, if confidence is missing, this method will automatically set it to default, rank will also be set to 0 311 * 312 * This also set the correct serviceType and visibility (private, if not given), and resolves mediaObjectIds 313 * 314 * @param mediaObjects non-empty and non-null list of objects 315 * @param backendId non-null id 316 * @param userId if null, the check will be ignored 317 * @return true if the given parameters were valid 318 */ 319 private static boolean validate(MediaObjectList mediaObjects, Integer backendId, UserIdentity userId){ 320 MediaObjectDAO vdao = ServiceInitializer.getDAOHandler().getSolrDAO(MediaObjectDAO.class); 321 vdao.resolveObjectIds(mediaObjects); 322 for(MediaObject object : mediaObjects.getMediaObjects()){ 323 if(backendId != object.getBackendId()){ 324 LOGGER.warn("Backend id mismatch."); 325 return false; 326 }else if(userId != null && !UserIdentity.equals(object.getOwnerUserId(), userId)){ 327 LOGGER.warn("Media objects user identity does not match the given user identity."); 328 return false; 329 } 330 Integer rank = object.getRank(); 331 if(rank == null){ 332 object.setRank(DEFAULT_TAG_RANK); 333 } 334 Double confidence = object.getConfidence(); 335 if(confidence == null){ 336 object.setConfidence(DEFAULT_TAG_CONFIDENCE); 337 } 338 object.setServiceType(ServiceType.TWITTER_JAZZ); 339 if(object.getVisibility() == null){ 340 object.setVisibility(Visibility.PRIVATE); 341 } 342 } 343 return true; 344 } 345}