001/**
002 * Copyright 2014 Tampere University of Technology, Pori Department
003 * 
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * 
008 *   http://www.apache.org/licenses/LICENSE-2.0
009 * 
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package service.tut.pori.twitterjazz;
017
018import java.io.IOException;
019import java.util.Arrays;
020import java.util.EnumSet;
021import java.util.Iterator;
022import java.util.List;
023
024import org.apache.commons.lang3.StringUtils;
025import org.apache.http.client.methods.HttpPost;
026import org.apache.http.entity.StringEntity;
027import org.apache.http.impl.client.BasicResponseHandler;
028import org.apache.http.impl.client.CloseableHttpClient;
029import org.apache.http.impl.client.HttpClients;
030import org.apache.log4j.Logger;
031import org.quartz.JobDataMap;
032import org.quartz.JobExecutionContext;
033
034import service.tut.pori.contentanalysis.AnalysisBackend;
035import service.tut.pori.contentanalysis.AnalysisBackend.Capability;
036import service.tut.pori.contentanalysis.AsyncTask;
037import service.tut.pori.contentanalysis.BackendStatus;
038import service.tut.pori.contentanalysis.BackendStatusList;
039import service.tut.pori.contentanalysis.CAContentCore.ServiceType;
040import service.tut.pori.contentanalysis.Definitions;
041import service.tut.pori.contentanalysis.Photo;
042import service.tut.pori.contentanalysis.CAContentCore.Visibility;
043import service.tut.pori.contentanalysis.CAContentCore;
044import service.tut.pori.contentanalysis.PhotoDAO;
045import service.tut.pori.contentanalysis.PhotoList;
046import service.tut.pori.contentanalysis.MediaObject;
047import service.tut.pori.contentanalysis.MediaObjectDAO;
048import service.tut.pori.contentanalysis.MediaObjectList;
049import service.tut.pori.contentstorage.TwitterPhotoStorage;
050import core.tut.pori.context.ServiceInitializer;
051import core.tut.pori.users.UserIdentity;
052import core.tut.pori.utils.XMLFormatter;
053
054/**
055 * An implementation of ASyncTask, meant for executing a Twitter summarization task.
056 * 
057 * Requires a valid taskId for execution, provided in a JobExecutionContext.
058 */
059public class TwitterSummarizationTask extends AsyncTask {
060  private static final Double DEFAULT_TAG_CONFIDENCE = 1.0;
061  private static final Integer DEFAULT_TAG_RANK = 1;
062  private static final Logger LOGGER = Logger.getLogger(TwitterSummarizationTask.class);
063
064  @Override
065  public void execute(JobExecutionContext context) {
066    try{
067      LOGGER.debug("Executing task...");
068      JobDataMap data = context.getMergedJobDataMap();
069
070      Long taskId = getTaskId(data);
071      if(taskId == null){
072        LOGGER.debug("No taskId.");
073        return;
074      }
075      
076      TwitterTaskDAO taskDAO = ServiceInitializer.getDAOHandler().getSQLDAO(TwitterTaskDAO.class);
077      BackendStatusList backends = taskDAO.getBackendStatus(taskId, TaskStatus.NOT_STARTED);
078      if(BackendStatusList.isEmpty(backends)){
079        LOGGER.warn("No analysis back-ends available for taskId: "+taskId+" with status "+TaskStatus.NOT_STARTED.name());
080        return;
081      }
082
083      TwitterSummarizationTaskDetails details = taskDAO.getTask(null, null, null, taskId); // no need to retrieve per back-end as the details are the same for each back-end
084      if(details == null){
085        LOGGER.warn("Task not found, id: "+taskId);
086        return;
087      }
088
089      UserIdentity userId = details.getUserId();
090      String screenName = details.getScreenName();
091      if(details.isSynchronize()){
092        synchronize(backends, screenName, taskId, userId);
093      }else{
094        LOGGER.warn("Synchronization disabled by parameter.");
095      }
096
097      if(details.isSummarize()){
098        summarize(backends, details, screenName, taskId);
099      }else{
100        LOGGER.warn("Summarization disabled by parameter.");
101      }
102    } catch(Throwable ex){  // catch all exceptions to prevent re-scheduling on error
103      LOGGER.error(ex, ex);
104    }
105  }
106  
107  /**
108   * 
109   * @param backends
110   * @param details
111   * @param screenName
112   * @param taskId
113   */
114  private void summarize(BackendStatusList backends, TwitterSummarizationTaskDetails details, String screenName, Long taskId){
115    LOGGER.debug("Summarizing...");
116    
117    backends = BackendStatusList.getBackendStatusList(backends.getBackendStatuses(EnumSet.of(Capability.TWITTER_SUMMARIZATION))); // filter back-ends with summarization capability
118    if(BackendStatusList.isEmpty(backends)){
119      LOGGER.warn("Aborting summarization... no back-end given with capability "+Capability.TWITTER_SUMMARIZATION.name()+" for task, id: "+taskId);
120      return;
121    }
122    
123    TwitterExtractor e = TwitterExtractor.getExtractor(details.getUserId());
124    if(e == null){
125      LOGGER.error("Failed to create extractor for the given user.");
126      return;
127    }
128    
129    TwitterProfile p = null;
130    if(StringUtils.isBlank(screenName)){
131      LOGGER.debug("No screen name, retrieving authenticated user's profile.");
132      p = e.getProfile(details.getContentTypes());
133      if(p == null){
134        LOGGER.error("Failed to retrieve profile for the given user from Twitter.");
135        return;
136      }
137    }else{
138      LOGGER.debug("Retrieving profile for user with screen name: "+screenName);
139      List<TwitterProfile> profiles = e.getProfiles(details.getContentTypes(), new String[]{screenName});
140      if(profiles == null){
141        LOGGER.error("Failed to retrieve profile for the screen name "+screenName);
142        return;
143      }
144      p = profiles.get(0);
145    }
146    details.setProfile(p);
147
148    try (CloseableHttpClient client = HttpClients.createDefault()) {
149      BasicResponseHandler h = new BasicResponseHandler();
150      for(BackendStatus backendStatus : backends.getBackendStatuses()){
151        AnalysisBackend end = backendStatus.getBackend();
152        try {
153          Integer backendId = end.getBackendId();
154          String url = end.getAnalysisUri()+Definitions.METHOD_ADD_TASK;
155          LOGGER.debug("Task, id: "+taskId+", back-end id: "+backendId+". Executing POST "+url);
156          HttpPost taskRequest = new HttpPost(url);
157          details.setBackendId(backendId);
158          taskRequest.setHeader("Content-Type", "text/xml; charset=UTF-8");
159          taskRequest.setEntity(new StringEntity((new XMLFormatter()).toString(details), core.tut.pori.http.Definitions.ENCODING_UTF8));        
160
161          LOGGER.debug("Backend with id: "+backendId+" responded "+client.execute(taskRequest,h));
162        } catch (IOException ex) {
163          LOGGER.warn(ex, ex);
164        }
165      }
166    } catch (IOException ex) {
167      LOGGER.error(ex, ex);
168    }
169  }
170  
171  /**
172   * @param backends 
173   * @param screenName 
174   * @param taskId 
175   * @param userId 
176   */
177  private void synchronize(BackendStatusList backends, String screenName, Long taskId, UserIdentity userId){
178    LOGGER.debug("Synchronizing...");
179    backends = BackendStatusList.getBackendStatusList(backends.getBackendStatuses(EnumSet.of(Capability.PHOTO_ANALYSIS))); // filter back-ends with analysis capability;
180    if(BackendStatusList.isEmpty(backends)){
181      LOGGER.warn("Aborting synchronization... no back-end given with capability "+Capability.PHOTO_ANALYSIS.name()+" for task, id: "+taskId);
182      return;
183    }
184    
185    TwitterPhotoStorage twitterStorage = new TwitterPhotoStorage();
186    twitterStorage.setBackends(backends);
187    
188    if(StringUtils.isBlank(screenName)){
189      LOGGER.debug("Synchronizing without screen names.");
190      twitterStorage.synchronizeAccount(userId);  // there is no need to wait for the analysis tasks to complete
191    }else{
192      LOGGER.debug("Synchronizing with screen names.");
193      twitterStorage.synchronizeAccount(userId, Arrays.asList(screenName)); // there is no need to wait for the analysis tasks to complete
194    } // else
195  }
196
197  /**
198   * Process the response. After this method has finished, the response will not contain non-existent photos (if any were present).
199   * 
200   * @param response
201   * @throws IllegalArgumentException on bad data
202   */
203  public static void taskFinished(TwitterTaskResponse response) throws IllegalArgumentException {
204    Integer backendId = response.getBackendId();
205    if(backendId == null){
206      throw new IllegalArgumentException("Invalid backendId.");
207    }
208    Long taskId = response.getTaskId();
209    if(taskId == null){
210      throw new IllegalArgumentException("Invalid taskId.");
211    }
212
213    TwitterTaskDAO taskDAO = ServiceInitializer.getDAOHandler().getSQLDAO(TwitterTaskDAO.class);
214    BackendStatus backendStatus = taskDAO.getBackendStatus(backendId, taskId);
215    if(backendStatus == null){
216      LOGGER.warn("Backend, id: "+backendId+" returned results for task, not given to the backend. TaskId: "+taskId);
217      throw new IllegalArgumentException("This task is not given for backend, id: "+backendId);
218    }
219    
220    TaskStatus status = response.getStatus();
221    if(status == null){
222      LOGGER.warn("Task status not available.");
223      status = TaskStatus.UNKNOWN;
224    }
225    backendStatus.setStatus(status);
226
227    try{
228      PhotoList photoList = response.getPhotoList();
229      if(PhotoList.isEmpty(photoList)){
230        LOGGER.debug("No photo list returned by backend, id: "+backendId+", task, id: "+taskId);
231      }else{ // create media objects and associate
232        PhotoDAO pdao = ServiceInitializer.getDAOHandler().getSolrDAO(PhotoDAO.class);
233        List<String> foundGUIDs = PhotoList.getGUIDs(pdao.getPhotos(null, photoList.getGUIDs(), null, null, null));
234        if(foundGUIDs == null){
235          LOGGER.warn("None of the photos exist, will not process media objects for backend, id: "+backendId+" for task, id: "+taskId);
236          photoList = null; // prevents generation of feedback task for invalid content
237        }else{    
238          List<Photo> photos = photoList.getPhotos();
239          LOGGER.debug("New media objects for photos, photo count: "+photos.size()+", backend, id: "+backendId);
240          for(Iterator<Photo> iter = photos.iterator(); iter.hasNext();){
241            Photo photo = iter.next();
242            MediaObjectList mediaObjects = photo.getMediaObjects();
243            if(MediaObjectList.isEmpty(mediaObjects)){
244              LOGGER.warn("Ignored empty media object list for backend, id: "+backendId+" for task, id: "+taskId+", photo, GUID: "+photo.getGUID());
245              iter.remove();
246            }else if(!foundGUIDs.contains(photo.getGUID())){
247              LOGGER.warn("Ignored non-existing photo for backend, id: "+backendId+" for task, id: "+taskId+", photo, GUID: "+photo.getGUID());
248              iter.remove(); // remove to prevent association
249            }else if(!validate(mediaObjects, backendId, photo.getOwnerUserId()) || !insertOrUpdate(mediaObjects)){
250              backendStatus.setStatus(TaskStatus.ERROR);
251              throw new IllegalArgumentException("Invalid object list returned by backend, id: "+backendId+" for task, id: "+taskId);
252            }
253          } // for
254          pdao.associate(photoList);
255        } // else
256      }
257
258      MediaObjectList objects = response.getMediaObjects();
259      if(MediaObjectList.isEmpty(objects)){
260        LOGGER.debug("No media object list returned by backend, id: "+backendId+" for task, id: "+taskId);
261      }else if(!validate(objects, backendId, null) || !insertOrUpdate(objects)){
262        backendStatus.setStatus(TaskStatus.ERROR);
263        throw new IllegalArgumentException("Invalid object list returned by backend, id: "+backendId+" for task, id: "+taskId);
264      }else{
265        LOGGER.debug("New media objects: "+objects.getMediaObjects().size()+" backend, id: "+backendId);
266      }
267      
268      CAContentCore.scheduleBackendFeedback(backendId, photoList, taskId);
269    } finally {
270      taskDAO.updateTaskStatus(backendStatus, taskId);
271      ServiceInitializer.getEventHandler().publishEvent(new AsyncTaskEvent(backendId, TwitterSummarizationTask.class, status, taskId, TaskType.TWITTER_PROFILE_SUMMARIZATION));
272    }
273  }
274  
275  /**
276   * 
277   * @param mediaObjects non-null, non-empty validated object list
278   * @return true on success
279   */
280  private static boolean insertOrUpdate(MediaObjectList mediaObjects){
281    MediaObjectList updates = new MediaObjectList();
282    MediaObjectList inserts = new MediaObjectList();
283
284    for(MediaObject o : mediaObjects.getMediaObjects()){
285      if(StringUtils.isBlank(o.getMediaObjectId())){ // no media object id
286        inserts.addMediaObject(o);
287      }else{
288        updates.addMediaObject(o);
289      }
290    }
291    
292    PhotoDAO photoDAO = ServiceInitializer.getDAOHandler().getSolrDAO(PhotoDAO.class);
293    if(MediaObjectList.isEmpty(inserts)){
294      LOGGER.debug("Nothing to insert.");
295    }else if(!photoDAO.insert(inserts)){
296      LOGGER.warn("Failed to insert media objects.");
297      return false;
298    }
299
300    if(MediaObjectList.isEmpty(updates)){
301      LOGGER.debug("Nothing to update.");
302    }else if(!photoDAO.update(updates)){
303      LOGGER.warn("Failed to update media objects.");
304      return false;
305    }
306    return true;
307  }
308
309  /**
310   * Validate the given list of media objects, if confidence is missing, this method will automatically set it to default, rank will also be set to 0
311   * 
312   * This also set the correct serviceType and visibility (private, if not given), and resolves mediaObjectIds
313   * 
314   * @param mediaObjects non-empty and non-null list of objects
315   * @param backendId non-null id
316   * @param userId if null, the check will be ignored
317   * @return true if the given parameters were valid
318   */
319  private static boolean validate(MediaObjectList mediaObjects, Integer backendId, UserIdentity userId){
320    MediaObjectDAO vdao = ServiceInitializer.getDAOHandler().getSolrDAO(MediaObjectDAO.class);
321    vdao.resolveObjectIds(mediaObjects);
322    for(MediaObject object : mediaObjects.getMediaObjects()){
323      if(backendId != object.getBackendId()){
324        LOGGER.warn("Backend id mismatch.");
325        return false;
326      }else if(userId != null && !UserIdentity.equals(object.getOwnerUserId(), userId)){
327        LOGGER.warn("Media objects user identity does not match the given user identity.");
328        return false;
329      }
330      Integer rank = object.getRank();
331      if(rank == null){
332        object.setRank(DEFAULT_TAG_RANK);
333      }
334      Double confidence = object.getConfidence();
335      if(confidence == null){
336        object.setConfidence(DEFAULT_TAG_CONFIDENCE);
337      }
338      object.setServiceType(ServiceType.TWITTER_JAZZ);
339      if(object.getVisibility() == null){
340        object.setVisibility(Visibility.PRIVATE);
341      }
342    }
343    return true;
344  }
345}