Package net.fortytwo.twitlogic.persistence

Source Code of net.fortytwo.twitlogic.persistence.TweetPersister

package net.fortytwo.twitlogic.persistence;

import net.fortytwo.twitlogic.TwitLogic;
import net.fortytwo.twitlogic.flow.Handler;
import net.fortytwo.twitlogic.model.Dollartag;
import net.fortytwo.twitlogic.model.Hashtag;
import net.fortytwo.twitlogic.model.Person;
import net.fortytwo.twitlogic.model.PlainLiteral;
import net.fortytwo.twitlogic.model.Resource;
import net.fortytwo.twitlogic.model.Triple;
import net.fortytwo.twitlogic.model.Tweet;
import net.fortytwo.twitlogic.model.TypedLiteral;
import net.fortytwo.twitlogic.model.URIReference;
import net.fortytwo.twitlogic.model.User;
import net.fortytwo.twitlogic.persistence.beans.Feature;
import net.fortytwo.twitlogic.persistence.beans.MicroblogPost;
import net.fortytwo.twitlogic.persistence.beans.Point;
import net.fortytwo.twitlogic.persistence.beans.SpatialThing;
import net.fortytwo.twitlogic.services.twitter.HandlerException;
import net.fortytwo.twitlogic.services.twitter.TwitterClient;
import net.fortytwo.twitlogic.services.twitter.TwitterClientException;
import net.fortytwo.twitlogic.util.properties.PropertyException;
import org.openrdf.concepts.owl.Thing;
import org.openrdf.model.Literal;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
import org.openrdf.sail.SailException;

import javax.xml.namespace.QName;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* @author Joshua Shinavier (http://fortytwo.net).
*/
public class TweetPersister implements Handler<Tweet> {
    private static final Logger LOGGER = TwitLogic.getLogger(TweetPersister.class);

    private final TweetStoreConnection storeConnection;
    private final ValueFactory valueFactory;
    private final PersistenceContext persistenceContext;
    private final PlacePersistenceHelper placeHelper;

    /**
     * @param store  the knowledge base
     * @param client an optional Twitter client for building the Places hierarchy
     * @throws TweetStoreException if something goes awry
     */
    public TweetPersister(final TweetStore store,
                          final TwitterClient client) throws TweetStoreException, TwitterClientException {
        this.storeConnection = store.createConnection();
        this.valueFactory = store.getSail().getValueFactory();
        try {
            this.persistenceContext = new PersistenceContext(storeConnection);
        } catch (PropertyException e) {
            throw new TweetStoreException(e);
        }
        this.placeHelper = null == client
                ? null
                : new PlacePersistenceHelper(persistenceContext, client);
    }

    public void close() throws TweetStoreException {
        storeConnection.close();
    }

    public boolean isOpen() {
        return true;
    }

    // Note: synchronization may be necessary for use with Twitter4j
    public synchronized void handle(final Tweet tweet) throws HandlerException {
        LOGGER.fine(tweet.describe());

        storeConnection.getElmoManager().getTransaction().begin();
        boolean finished = false;

        try {
            // begin Elmo operations

            if (null != placeHelper) {
                // Since Elmo is not thread-safe, Elmo operations to be carried out by
                // placeHelper are queued until they can be executed here, in the main
                // transaction.
                placeHelper.flush();
            }

            boolean hasAnnotations = 0 < tweet.getAnnotations().size();

            MicroblogPost currentMicroblogPost = persistenceContext.persist(tweet, hasAnnotations);

            if (null != tweet.getUser()) {
                persistenceContext.persist(tweet.getUser());
            } else {
                LOGGER.fine("null user for tweet: " + tweet);
            }

            if (null != tweet.getGeo()) {
                Point p = persistenceContext.persist(tweet.getGeo());

                Set<SpatialThing> s = currentMicroblogPost.getLocation();
                s.add(p);
                currentMicroblogPost.setLocation(s);
            }

            if (null != tweet.getPlace()) {
                Feature f = persistenceContext.persist(tweet.getPlace());
                if (null != placeHelper) {
                    try {
                        placeHelper.submit(tweet.getPlace(), f);
                    } catch (TwitterClientException e) {
                        throw new HandlerException(e);
                    }
                }

                Set<SpatialThing> s = currentMicroblogPost.getLocation();
                s.add(f);
                currentMicroblogPost.setLocation(s);
            }

            // end Elmo operations

            try {
                storeConnection.commit();
            } catch (TweetStoreException e) {
                throw new HandlerException(e);
            }

            // Note: we assume that Twitter and any other services which supply these posts will not allow a cycle
            // of replies and/or retweets.
            // Note: these tweets are persisted in their own transactions.
            if (null != tweet.getInReplyToTweet()) {
                this.handle(tweet.getInReplyToTweet());
            }
            if (null != tweet.getRetweetOf()) {
                this.handle(tweet.getRetweetOf());
            }

            //System.out.println("    ...ending transaction");

            // Note: these Sail operations are performed outside of the Elmo transaction.  If they were to be
            // carried out inside the transaction, apparently Sesame would kill the thread without throwing
            // an exception or logging an error.
            if (hasAnnotations) {
                for (Triple triple : tweet.getAnnotations()) {
                    System.out.println("\t (" + triple.getWeight() + ")\t" + triple);
                    Statement st;

                    try {
                        st = toRDF(triple, uriOf(currentMicroblogPost.getEmbedsKnowledge()));
                    } catch (TwitterClientException e) {
                        throw new HandlerException(e);
                    }

                    if (null != st) {
                        // FIXME: creating a statement and then breaking it into parts is wasty
                        try {
                            //System.out.println("subject: " + st.getSubject());
                            //System.out.println("predicate: " + st.getPredicate());
                            //System.out.println("object: " + st.getObject());
                            //System.out.println("context: " + st.getContext());
                            storeConnection.getSailConnection()
                                    .addStatement(st.getSubject(), st.getPredicate(), st.getObject(), st.getContext());
                        } catch (SailException e) {
                            throw new HandlerException(e);
                        }
                    }
                }

                try {
                    storeConnection.getSailConnection().commit();
                } catch (SailException e) {
                    throw new HandlerException(e);
                }
            }

            finished = true;
        } catch (HandlerException e) {
            throw e;
        } catch (Throwable t) {
            throw new HandlerException(t);
        } finally {
            if (!finished) {
                try {
                    LOGGER.info("rolling back failed transaction");
                    storeConnection.rollback();
                } catch (TweetStoreException e) {
                    throw new HandlerException(e);
                }
            }
        }
    }

    private Statement toRDF(final Triple triple,
                            final org.openrdf.model.Resource graph) throws TwitterClientException {
        Value subject = toRDF(triple.getSubject());
        Value predicate = toRDF(triple.getPredicate());
        Value object = toRDF(triple.getObject());

        if (!(subject instanceof org.openrdf.model.Resource)) {
            LOGGER.warning("triple subject is not a subject resource: " + triple.getSubject());
            return null;
        } else if (!(predicate instanceof URI)) {
            LOGGER.warning("triple predicate is not a predicate resource: " + triple.getPredicate());
            return null;
        } else {
            return valueFactory.createStatement((org.openrdf.model.Resource) subject, (URI) predicate, object, graph);
        }
    }

    private Value toRDF(final Resource resource) throws TwitterClientException {
        switch (resource.getType()) {
            case DOLLARTAG:
                return valueOf((Dollartag) resource);
            case HASHTAG:
                return valueOf((Hashtag) resource);
            case PLAIN_LITERAL:
                return valueOf((PlainLiteral) resource);
            case TWEET:
                return valueOf((Tweet) resource);
            case TYPED_LITERAL:
                return valueOf((TypedLiteral) resource);
            case URI_REFERENCE:
                return valueOf((URIReference) resource);
            case USER:
                return valueOf((User) resource);
            case PERSON:
                return valueOf((Person) resource);
            default:
                throw new IllegalStateException("unhandled resource type: " + resource.getType());
        }
    }

    private URI valueOf(final Dollartag tag) {
        return uriOf(persistenceContext.persist(tag));
    }

    private URI valueOf(final Hashtag hashtag) {
        return uriOf(persistenceContext.persist(hashtag));
    }

    private Literal valueOf(final PlainLiteral literal) {
        return valueFactory.createLiteral(literal.getLabel());
    }

    private URI valueOf(final Tweet tweet) {
        return uriOf(persistenceContext.persist(tweet, 0 < tweet.getAnnotations().size()));
    }

    private Literal valueOf(final TypedLiteral literal) {
        return valueFactory.createLiteral(literal.getLabel(),
                valueFactory.createURI(literal.getDatatype()));
    }

    private URI valueOf(final URIReference uri) throws TwitterClientException {
        //String nonRedirecting = httpClient.resolve301Redirection(uri.getValue());
        //return valueFactory.createURI(nonRedirecting);

        // TODO: consider bringing 301 resolution back.
        return valueFactory.createURI(uri.getValue());
    }

    private URI valueOf(final User user) throws TwitterClientException {
        return uriOf(persistenceContext.persist(user));
    }

    private URI valueOf(final Person person) throws TwitterClientException {
        return uriOf(persistenceContext.persist(person));
    }

    private URI uriOf(final Thing thing) {
        QName q = thing.getQName();
        return valueFactory.createURI(q.getNamespaceURI() + q.getLocalPart());
    }

}
TOP

Related Classes of net.fortytwo.twitlogic.persistence.TweetPersister

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc., which is owned by ORACLE Inc. Contact coftware#gmail.com.