Twitter Streaming API を使う上でのサンプルをメモ。
Streaming API は Search API 同様 OAuth 認証を使用していますので、Search API のサンプルとまとめてみました。
プログラムは、ふゆみけさんのウェブに記載されているものを参考にしました。
http://d.hatena.ne.jp/fuyumi3/20111205/1323087002
# -*- coding: utf-8 -*- import json import urllib2 import oauth2 TWITTER_CONSUMER_KEY = "********" TWITTER_CONSUMER_SECRET = "********" TWITTER_ACCESS_TOKEN_KEY = "********" TWITTER_ACCESS_TOKEN_SECRET = "********" # --------------------------------------------------------------------------- ## # class twitter_api(object): SEARCH = "https://api.twitter.com/1.1/search/tweets.json" STREAM_SAMPLE = "https://stream.twitter.com/1.1/statuses/sample.json" STREAM_FILTER = "https://stream.twitter.com/1.1/statuses/filter.json" # ----------------------------------------------------------------------- ## # def __init__(self, CONSUMER_KEY, CONSUMER_SEC, ACCESS_TOKEN_KEY, ACCESS_TOKEN_SEC): self.m_dictConsumer = { "key": CONSUMER_KEY, "secret": CONSUMER_SEC } self.m_dictToken = { "key": ACCESS_TOKEN_KEY, "secret": ACCESS_TOKEN_SEC } # ----------------------------------------------------------------------- ## # def build_request(self, strURL, dictParam): # 引数に key と secret を渡す oCConsumer = oauth2.Consumer(**self.m_dictConsumer) oCToken = oauth2.Token(**self.m_dictToken) oCOAuthRequest = oauth2.Request.from_consumer_and_token( oCConsumer, oCToken, http_url=strURL, parameters=dictParam ) oCOAuthRequest.sign_request( oauth2.SignatureMethod_HMAC_SHA1(), oCConsumer, oCToken ) oCRequest = urllib2.urlopen(oCOAuthRequest.to_url()) return(oCRequest) # =========================================================================== ## # def main(): oCTWApi = twitter_api( TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN_KEY, TWITTER_ACCESS_TOKEN_SECRET ) print "Twitter Search API" # Twitter Search API # https://dev.twitter.com/rest/reference/get/search/tweets oCReq = oCTWApi.build_request( twitter_api.SEARCH, { "q": urllib2.quote("book") } ) for r in oCReq: dictData = json.loads(r) print "----" for dictTweet in dictData["statuses"]: print dictTweet["user"]["id"], dictTweet["text"].encode("utf-8") print "Twitter Streaming API" # Twitter Stream API # https://dev.twitter.com/streaming/overview oCReq = oCTWApi.build_request( twitter_api.STREAM_FILTER, { "track": urllib2.quote("book") } ) for r in oCReq: dictData = json.loads(r) print "----" print dictData["user"]["id"], dictData["text"].encode("utf-8") if(__name__ == "__main__"): main()
上記のコードを実行するには、利用するPython環境にoauth2がインストールされている必要があります。
また、Twitter API を利用可能にしておく必要があります。