2010年4月26日月曜日

Twitter Streaming APIをJavaで試してみる


2011年10月05日追記:
Streaming APIがHTTPS対応となったので,若干変更が必要です.
詳しくは,こちらのバージョンをご覧ください.

2010年12月24日追記:
Twitter4jを使ったバージョンを作ったので,そちらを使った方が良いと思われます.


TwitterのStreamingAPIにがぜん興味が出てきたので,Javaで実装してみた.
package twitter;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.Authenticator;
import java.net.HttpURLConnection;
import java.net.PasswordAuthentication;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * StreamAPIから送られてくるStreamデータをひたすら取得し続けるクラス
 * @author tori
 *
 */
public class TwitterStreamReader {


 /**
  * 使い方の例
  * @param args
  * @throws UnsupportedEncodingException 
  */
 public static void main(String[] args) throws UnsupportedEncodingException {
  
  TwitterStreamReader tsr = new TwitterStreamReader("Twitter-UserName", "Password");
  tsr.start();
  
  while(true){
   List<String> tweetList = tsr.readStoredList();
   List<String> errList = tsr.readErrLogList();
   
   for(String text:tweetList){
    System.out.println(text);
   }
   
   for(String err:errList){
    System.err.println(err);
   }
  }
 }
 
 /**
  * sampleのタイムライン取得のためのURL
  */
 private static final String DEFAULT_API_URL = "http://stream.twitter.com/1/statuses/sample.json";

 String userName;
 String password;

 /**
  * 強制終了させられたかどうか
  */
 boolean isForceStop;

 /**
  * 取得しているかどうか
  */
 boolean isRunning;
 
 /**
  * Synchlonizedされたリスト
  */
 List<String> dataList;

 /**
  * Synchlonizedされたリスト
  */
 List<String> errLogList;

 /**
  * apiのURL
  */
 String apiUrl = DEFAULT_API_URL;

 /**
  * ストリームを読み続けるためのスレッド
  */
 private Thread streamReadThread;
 
 public TwitterStreamReader(String userName, String password) {
  super();
  this.userName = userName;
  this.password = password;

  dataList = Collections.synchronizedList(new ArrayList<String>());
  errLogList = Collections.synchronizedList(new ArrayList<String>());
  
 }

 /**
  * 読み込みを開始する
  */
 public void start(){
  StreamReader streamReader = new StreamReader();
  
  streamReadThread = new Thread(streamReader);
  streamReadThread.start();
 }

 /**
  * 取得を停止する<br>
  */
 public void stop() {
  this.isForceStop = true;
 }
 
 /**
  * データ取得中かどうかを返す
  * @return
  */
 public boolean isRunning(){
  return streamReadThread.isAlive();
 }
 
 /**
  * 取得済みのTweetデータのリストを取得する<br>
  * これまでに読み込んだデータは削除される
  * @return 是までに読み込んだデータ
  */
 public List<String> readStoredList(){
  List<String> storedList;
  synchronized (dataList) {
   storedList = new ArrayList<String>(dataList);
   dataList.clear();
  }

  return storedList;
 }
 
 /**
  * エラーログのリストを取得する<br>
  * これまでに読み込んだデータは削除される
  * @return 是までに読み込んだデータ
  */
 public List<String> readErrLogList(){
  List<String> storedList;
  synchronized (errLogList) {
   storedList = new ArrayList<String>(errLogList);
   errLogList.clear();
  }
  return storedList;
 }

 /**
  * 強制停止信号を送ったかどうか
  * @return the isForceStop
  */
 public boolean isForceStop() {
  return isForceStop;
 }
 
 /**
  * データ読み込み用クラス
  * @author tori
  *
  */
 class StreamReader implements Runnable{
  @Override
  public void run() {
   isForceStop = false;
   while(!isForceStop){
    InputStreamReader isr = null;
    BufferedReader br = null;
    try{
     URL connectUrl = new URL(apiUrl);
     HttpURLConnection con = (HttpURLConnection)connectUrl.openConnection();
     
     con.setRequestMethod("GET");
     con.setDoOutput(true);
     con.setInstanceFollowRedirects(true); 

     Authenticator auth = new Authenticator(){
      public PasswordAuthentication getPasswordAuthentication(){
       return new PasswordAuthentication(userName, password.toCharArray());
      }
     };
     Authenticator.setDefault(auth);
     

     isr = new InputStreamReader(con.getInputStream(), "UTF8");
     br = new BufferedReader(isr);
     while(!isForceStop){
      String tweet = br.readLine();
      synchronized (dataList) {
       dataList.add(tweet);
      }
     }
    }catch(Exception e){
     StringBuffer buf = new StringBuffer();
     buf.append(e.getClass().getName()+"\n");
     for(StackTraceElement ste:e.getStackTrace()){
      buf.append(String.format("\tat %s.%s(%s:%d)\n", ste.getClassName(), ste.getMethodName(), ste.getFileName(), ste.getLineNumber()));
     }
     synchronized (errLogList) {
      errLogList.add(buf.toString());
     }
    }finally{
     try{
      br.close();
     }catch(Exception e){
     }
     try{
      isr.close();
     }catch(Exception e){
     }
    }
   }
  }
 }
}


参考にしたのは,
この辺.

どんどん取ってくるためにThread化したけど,使いたい人は好きに作り直せばいいと思う.

0 件のコメント: