LEDネームバッジの制御方法

趣味TECH祭2019(夏)にて、LEDネームバッジを購入したので、取り扱い方法についてメモを残してみたり。

同類品が幾つか存在している様なのですが、自分はこちらのプログラムで制御出来ました。

Led-Bedge-44×11

https://github.com/jnweiger/led-name-badge-ls32

44×11というのは、LEDのサイズが横44、縦11ドットという意味です。

動作確認はUbuntu 16.04 LTS上のPython3で行いました。

ライブラリの導入は、README.mdに記載されている内容そのままです。

sudo pip install pyhidapi
sudo pip install pillow
sudo apt-get install libhidapi-hidraw0
sudo ln -s /usr/lib/x86_64-linux-gnu/libhidapi-hidraw.so.0 /usr/local/lib/

基本的な使用法は、以下の様になります。

# 例)
sudo python3 ./led-badge-11x44.py "TEST"

送信が完了するとネームバッジの電源が落ちますので、ケーブルの抜き差しをして再度電源を入れてやります。

標準状態では半角英数しか利用できませんが、PNG画像を指定することで任意の二値画像を表示させることが出来ます。

今回は値段を表示させたかったので、以下の様な画像を用意しました。白が点灯する場所となります。

# 例)
sudo python3 ./led-badge-11x44.py ":image.png:"

フォルダ内の重複画像を取り除きたい(2)

NNC Utilsで指定可能なハッシュ方法の説明を追記しておきます。

Hash方法

Average hash

画像を縮小した後に全画素の平均値を取得します。縮小画像の画素が求めた平均値よりも明るい(1)か暗い(0)かを元にハッシュ値を生成します。

画像の全体的な輝度や若干の色調補正が加わっていても、同じ画像と見なすことが出来ますが、縮小した結果が似たような画像だと適切な区別ができなくなります。

Perceptual hash

画像を縮小した後にDCT(離散コサイン変換)をかけます。求めた結果から低周波成分を取り出し、中央値を取得します。高周波成分を取り除くという事は、画像からノイズ(小さな差異)を取り除くという事になります。

求めた低周波成分領域から中央値よりも大きい(1)か小さい(0)かを元にハッシュ値を求めます。

一旦周波数に変換すると、直接の画素的な位置情報が失われてしまいますが、全体的に似ている(同じ様な周波数を含んでいるか)かどうかを区別する際に使用します。

Perceptual hash (simple)

Perceptual hashと同じですが、DCTの計算を一回、中央値を使わずに平均値を使用しています。

Difference hash

画像を縮小した後に、左右の画素同士を比較して、どちらが明るいかを求めます。

Average hashは画素全体の輝度から平均を求めるため、細かな差異が埋もれてしまう場合がありますが、こちらを使用するとうまく拾い出せる場合があります。

画像の一部分部分だけが異なっているものを区別したい場合に有効ですが、比率が異なる画像を違うものとみなしてしまう場合や、ノイズを有効な画素と見なしてしまう場合があります。

ある画像と同じ画像をJPEGで圧縮して劣化させた画像を同じものと見なしたい場合には良い結果が得られません。

Difference hash (vertical)

Difference hashと同じですが、画像比較を上下で行います。

Wavelet hash

画像を縮小した後に輝度情報を0.0〜1.0の範囲に変換し、wavelet変換(haar)を行います。変換結果から低周波成分の結果を取得し、中央値を求めます。

求めた低周波成分領域から中央値よりも大きい(1)か小さい(0)かを元にハッシュ値を求めます。

Perceptual hashと似たような処理ですが、waveletはDCTに比べてより適切な分解を行うことが出来るため、Perceptualよりも適切な区別が期待できます。

imagehashが使用できるアルゴリズムで最も重い処理となります。

.

ニューラルネットワークで要素抽出

Autoencoder

ニューラルネットワークは画像を認識させる際に、元情報よりも低次元で表現するようにネットワークを構成することが出来ます。

そのような低次元で表現された空間には、学習に使用した情報の要素が詰まっているとも言えます。

要素が詰まっているのですから、その情報を抜き出して再び画像にすれば、ニューラルネットワークが思っている物の片鱗を見ることが出来る…かもしれません。

技法的にはオートエンコーダと呼ばれるものなのですが、画像認識に使用しているヴィヴィオ達の画像を使用して計算させてみました。

Autoencoderを通したヴィヴィオ達

結果は以下のような感じです。

   

多分、リリカルなのはVividをよく知っている人であれば、誰か答えられるのではないかと思います。

今回は100次元で表現してみましたが、多分もっと少ない要素で表現出来そうで、ヴィヴィオ達は見た目だけでもかなり分類しやすそうに感じます。(…ってプロの方がそうなる様にデザインされているわけですから、そりゃそうなりますね。)

次元数を下げるほどぼやけた画像になっていき、極端に下げると「黄色っぽい」「みどりっぽい」だけの画像になります。

学習結果の応用

この学習は、

  • 「ヴィヴィオの絵」 + 「ヴィヴィオ」
  • 「アインハルトの絵」 + 「アインハルト」
  • 「リオの絵」 + 「リオ」
  • 「コロナの絵」 + 「コロナ」

という入力で学習させましたので、以下の様な入力を受け付けます。

  • 「ヴィヴィオの絵」 + 「アインハルト」
  • 「写真」 + 「ヴィヴィオ」

これを使えば、入力された画像をヴィヴィオっぽくしたり、アインハルトっぽくしたりする事が出来そうです。

試しに自分の絵を使って実行してみたものがこちら。

  •  =  + ヴィヴィオ
  •  =  + アインハルト
  •  =  + リオ
  •  =  + コロナ

…心の目で見ればなっているような、なっていないような。

今回やったことを図にすると、以下の様になります。

 学習させます。

 学習結果とヒントを使い、絵を加工。

Azure上でnnablaを動かしてみました

現在使用しているAzure環境にnnablaをインストールして、ウェブアプリを作成してみました。

nnablaはSONY Neural Network Consoleでも使用されているもので、比較的簡単にニューラルネットワークを利用出来るのですけど、学習した結果をどう使うかがなかったので、サンプルを兼ねて作ってみました。

VE Detect

https://www.mizunagi-works.com/gadgets/ve_detect/index.html

 しょっぱなから誤判定するデータを与えていたりして

言い訳っぽいのですけど、これは学習データが少ないのと「アインハルトかヴィヴィオのどちらか」を判定させるように学習させた結果となります。

例えばおにぎりの画像を与えたときでも必ずどちらかという判定します。(どちらでもないという回答を出すようには学習させていません)

Azureへの導入

ニューラルネットワークというと、強力なGPUを使用する負荷の高いアプリケーションに感じられるかもしれません。

確かに計算量は多いのですが、処理が重たいのは学習をする時で、学習済み結果を利用するのはそんなに重くありません。

(今回作成したニューラルネットワークの構成が単純というのもあります)

自分が使用しているAzure上のVMはA1 Basicでかなり非力(下から2番目)なのですが、学習結果を利用するにあたってはそこまで負荷はありませんでした。

SONY NNCによるネットワークの作成と学習

こんなネットワークにしてみました。

10,000回ほど学習させました。

プログラムの作成

学習が完了すると、学習結果が保存されたフォルダにparameters.h5というファイルが生成されています。

学習済みモデルを利用するにはこのh5ファイルと、NNCからエクスポートしたPythonスクリプトが必要となります。

スクリプトはちょっと長めですのでGitHubに置いておきました。

bottleを使用していますので、ローカルでこっそり楽しむことが可能です。

.

Neural Network Libraryで遊んでみる

SONYのNeural Network Libraryを使ってみてのメモとか。

NNLで画像を読み込みたい場合

モデルに画像を読み込ませるには、nnabla.utils.data_source_loaderモジュールで読み込ませる事が出来ます。

# -*- coding: utf-8 -*-
import PIL.Image
import nnabla.utils.data_source_loader as nn_dsl

X_SIZE = 28
Y_SIZE = 28

# shapeに指定したサイズにリサイズされて読み込まれます。
# paddingはされません。
np_img = nn_dsl.load_image(
    "testdata0.png",
    shape=(3, X_SIZE, Y_SIZE),
    normalize=False
)

# 読み込んだ画像のshapeのままだと、Pillowで利用出来ませんので
# Pillowで活用したい場合は以下のようにします。

np_pil = np_img.transpose(1, 2, 0)
imp_pil = PIL.Image.fromarray(np_pil)
img_pil.save("testdata1.png")

Pillowで読み込んだ画像をNNLで扱える構造にしたい場合

numpyモジュールを使います。

# -*- coding: utf-8 -*-

import numpy as np
import PIL.Image

X_SIZE = 28
Y_SIZE = 28

img_pil = PIL.Image.open(pathname)
img_pil.thumbnail((X_SIZE, Y_SIZE))

img_target = PIL.Image.new("RGB", (X_SIZE, Y_SIZE))

x_src, y_src = o_image.size
x_pos = (X_SIZE - x_src) >> 1
y_pos = (Y_SIZE - y_src) >> 1

img_target.paste(img_pil, (x_pos, y_pos))

data = img_target.getdata()

np_img = np.array(data, dtype=np.float32)
np_img = np_img.reshape((X_SIZE, Y_SIZE, 3))
np_img = np_img.transpose(2, 0, 1)

# normalizeする場合は、 np_img /= 255.0

 

ISO-2022-JP-MSからCP932への変換

使用する機械はあまりなさそうでうけど、ISO-2022-JP-MS(JIS)からCP932(SHIFT-JIS)へ変換するコードを、以下のサイトに掲載されているプログラムを移植してみました。

http://www.geocities.jp/hoku_hoshi/TIPPRG/tipprg03.html

プログラム本体

# -*- coding: utf-8 -*-
"""
"""
# ------------------------------------------------------------------ import(s)
import sys
import array

# --------------------------------------------------------------- exception(s)
class CConvertError(Exception):
    """
    想定範囲外のJISコードエスケープを検出
    """
    pass

# ---------------------------------------------------------------- function(s)
# ============================================================================
# 0x1B([ESC]), 0x24($), 0x40(@)
# 0x1B([ESC]), 0x24($), 0x42(B)
# 0x1B([ESC]), 0x26(&), 0x40(@)
# 0x1B([ESC]), 0x28((), 0x42(B)
# 0x1B([ESC]), 0x28((), 0x4A(J)
# 0x1B([ESC]), 0x28((), 0x49(I)
def jis_to_sjis(jis_buffer):
    """
    与えられたバイト配列をJISコード文字列とみなして、SHIFT-JISへの変換を行います。

    args:
        jis_buffer (str):
            バイト配列

    returns:
        str: SHIFT-JIS文字列を戻します。
    """

    ary_src = array.array("B", jis_buffer)
    ary_dst = array.array("B")

    str_result = []

    n = 0
    kanji_mode = False
    while n < len(ary_src):
        c0 = ary_src[n]
        n += 1
        if c0 == 0x1B:
            c1 = ary_src[n]
            n += 1
            c2 = ary_src[n]
            n += 1
            if c1 == 0x24 and c2 in (0x40, 0x42):
                kanji_mode = True
            elif c1 == 0x26 and c2 == 0x40:
                kanji_mode = True
            elif c1 == 0x28 and c2 in (0x42, 0x49, 0x4A):
                kanji_mode = False
            else:
                raise CConvertError()
            continue

        if kanji_mode is True:
            c1 = ary_src[n]
            n += 1

            if c0 < 0x5F:
                irow_offset = 0x70
            else:
                irow_offset = 0xB0

            if c0 & 0x01:
                if c1 > 0x5F:
                    icell_offset = 0x20
                else:
                    icell_offset = 0x1F
            else:
                icell_offset = 0x7E

            ary_dst.append(((c0 + 1) >> 1) + irow_offset)
            ary_dst.append(c1 + icell_offset)

        else:
            ary_dst.append(c0)

    return ary_dst.tostring()


if __name__ == "__main__":

    with open("testdata.jis", "rb") as h_reader, open("testdata.conv", "wb") as h_writer:
        code_jis = h_reader.read()
        code_sjis = jis_to_sjis(code_jis)
        h_writer.write(code_sjis)
        print("cp932", code_sjis.decode("cp932"))

# [EOF]

テストデータ

ABCDE
あいうえお
FGHIJ
①②③④⑤
KLMNO
㈱㈲δ㌔㌧

テスト方法

上記の様なテストデータをUTF-8で作成して、iconvやnkfを使用してjis形式のファイルに変換します。

iconvの場合

iconv -f UTF-8 -t ISO-2022-JP-MS testdata.utf8 > testdata.jis
iconv -f UTF-8 -t CP932 > testdata.utf8 > testdata.cp932

nkfの場合

nkf -Wj testdata.utf8 > testdata.jis
nkf -Ws testdata.utf8 > testdata.cp932

.

Neural Network Consoleでキャラ判定

SONYが公開しているNeural Network Console を使用して、キャラクター判定をさせてみました。

NNCはWindows用なのですが、学習させたモデルの利用はMac(Anaconda + Python 3.6)でも動作しました。

ヴィヴィオとアインハルトのみですので適当に答えても50%の確率なので、あまり面白みはないけど…

Neural Network Console

https://dl.sony.com/ja/

SONYが公開しているニューラルネットワークを作成するためのGUI環境です。

今回は以下のようなネットワークを作成してみました。

 ネットワーク構成

 学習状況(2000Epoch)

画像データはBingの画像検索を使用してヴィヴィオとアインハルトの画像を収集しました。

画像については以下のルールで生成しています。

  • 変身前か変身後は関係なくキャラクター毎に30枚程度を用意
  • 描かれているキャラクターは一人だけ(背景はあってもなくてもよい)
  • 画像サイズは64×64に縮小(パディングは0(黒)で行う)

画像はかなり少ないのですが2000Epoch程学習をさせて、評価を行います。

評価方法

NNC上でも評価出来るのですが、生成したモデルをPythonやC++から使用することが出来ます。

今回は以下のようなコードを生成しました。

# -*- coding: utf-8 -*-
import sys

import nnabla as nn
import nnabla.utils.data_source_loader as U_dsl
import nnabla.functions as F
import nnabla.parametric_functions as PF

# Neural Network Console から出力した Python コード
# ---- ここから ----
def network(x, y, test=False):
    # Input -> 3,64,64
    # Convolution -> 16,31,31
    with nn.parameter_scope('Convolution'):
        h = PF.convolution(x, 16, (3,3), (0,0), (2,2))
    # ReLU
    h = F.relu(h, True)
    # MaxPooling -> 16,16,16
    h = F.max_pooling(h, (2,2), (2,2))
    # Dropout_2
    if not test:
        h = F.dropout(h, 0.2)
    # Convolution_2 -> 27,7,7
    with nn.parameter_scope('Convolution_2'):
        h = PF.convolution(h, 27, (3,3), (0,0), (2,2))
    # ReLU_5
    h = F.relu(h, True)
    # Convolution_3 -> 16,3,3
    with nn.parameter_scope('Convolution_3'):
        h = PF.convolution(h, 16, (3,3), (0,0), (2,2))
    # ReLU_3
    h = F.relu(h, True)
    # Dropout
    if not test:
        h = F.dropout(h)
    # Affine -> 20
    with nn.parameter_scope('Affine'):
        h = PF.affine(h, (20,))
    # ReLU_4
    h = F.relu(h, True)
    # Affine_2 -> 10
    with nn.parameter_scope('Affine_2'):
        h = PF.affine(h, (10,))
    # ReLU_2
    h = F.relu(h, True)
    # Affine_4 -> 1
    with nn.parameter_scope('Affine_4'):
        h = PF.affine(h, (1,))
    # SquaredError
    # ここはコメントアウト
    # h = F.squared_error(h, y)

    return h
# ---- ここまで ---- 

def main():

    nn.clear_parameters()
    nn.load_parameters("parameters.h5")

    x = nn.Variable(shape=(1, 3, 64, 64))
    y = network(x, None)

    x.d = U_dsl.load_image(sys.argv[1], shape=(3, 64, 64), normalize=True)

    y.forward()

    print(y.d)

if __name__ == "__main__":
    main()

# [EOF]

学習結果はフォルダにHDF5形式で格納されていますので、そのまま使用することが出来ます。

ソースコードもNNCが生成したものをそのまま使用可能となっています。

評価に使用するデータ

画像は自分が描いた2枚の画像を使用してみました。

 ヴィヴィオ( vivio = 1 )

 アインハルト( einhald = 0 )

実行結果

 0.978

 0.042

0に近いほどアインハルト、1に近いほどヴィヴィオという判定が出るように学習させましたので、どうやら正しく判定してくれたようです。

自分以外の絵でも試してみましたが、正答率は80%ぐらいでした。

.

文書の類似度判定(3)

データベースに文書を保存するまでが出来たので、やっと本題の類似度判定を。

データベースから文書を取り出して学習と判定を行う

# -*- coding: utf-8 -*-
# ------------------------------------------------------------------ import(s)
import sys
import os
import sqlite3
import gensim.models.doc2vec
import gensim.models.doc2vec


# ------------------------------------------------------------------- const(s)
# ------------------------------------------------------------------- class(s)
# ---------------------------------------------------------------- function(s)
def main():

    list_document = []

    o_conn = sqlite3.connect("dccol.db")
    o_cursor = o_conn.cursor()

    o_cursor.execute(
        """
        SELECT
            id_doc
        ,   word_pos
        ,   word_surface
        ,   word_feature
        FROM
            doc_collection
        WHERE
            LENGTH(word_surface) > 1
            AND
            word_feature = '名詞'
        ORDER BY
            id_doc, word_pos
        """
    )

    dict_document = {}

    for r in o_cursor.fetchall():
        id_doc = r[0]
        word_pos = r[1]
        word_surface = r[2]
        word_feature = r[3]

        if id_doc not in dict_document:
            dict_document[id_doc] = []
        dict_document[id_doc].append(word_surface.encode("utf-8"))

    list_tagged_document = []

    for id_doc, list_word in dict_document.items():

        with open(id_doc + ".wakati", "w") as h_writer:
            h_writer.write(" ".join(list_word))

        o_doc = gensim.models.doc2vec.TaggedDocument(
            words=list_word,
            tags=[id_doc]
        )

        list_tagged_document.append(o_doc)

    dvec_model = gensim.models.doc2vec.Doc2Vec(
        documents=list_tagged_document,
        min_count=1
    )

    list_word = ["ヴィヴィオ", "アインハルト","コロナ","リオ"]
    for similar_word in list_word:
        print u"指定した単語に関連するもの", similar_word.decode("utf-8")
        for r in dvec_model.wv.most_similar(positive=[similar_word]):
            print "\t", r[0].decode("utf-8"), r[1]
        print

    similar_doc = os.path.join("doc_source", "doc1.txt")
    print u"指定した文書に近いもの", similar_doc
    for r in dvec_model.docvecs.most_similar(positive=[similar_doc]):
        print "\t", r[0], r[1]
    print

if __name__ == "__main__":
    main()

# [EOF]

やっていることは、データベースから二文字以上の名詞を取り出して学習用データを生成して、与えたキーワード、文書に似ている(近いベクトルのもの)を取得しています。

実行すると以下のような結果が表示されます。

指定した単語に関連するもの ヴィヴィオ
        スタイル 0.404163241386
        状態 0.396886348724
        ため 0.391772806644
        アインハルト 0.378925800323
        リンネ・ベルリネッタ 0.374079525471
        結果 0.3656296134
        意味合い 0.356572329998
        デバイス 0.350234866142
        雪辱 0.344531387091
        兵器 0.343862652779

学習させた文書は、Pixiv百科事典からキャラクター名の項目をテキスト化したものを与えています。

ヴィヴィオに関連するものとして、

  • アインハルト
  • リンネ・ベルリネッタ

といったキーワードが出力されています。

与えた文書量が少なく、文書自体も短いためなんとなくうまくいっているようないないような。

文書の類似度判定(2)

まずはjanomeを使用して形態素解析を行います。

カスタム辞書を指定していますが、指定しなくても動作します。

janomeによる形態素解析

# -*- coding: utf-8 -*-
# http://www.mizunagi-works.com
# ------------------------------------------------------------------ import(s)
import sys
import os

import janome.tokenizer

# ------------------------------------------------------------------- const(s)
DB_FILENAME = "dccol.db"
CUSTOM_DIC = "./dic/nanoha_keyword.dic"
RAW_TEXT_DIR = "doc_source"


# ------------------------------------------------------------------- class(s)
# ---------------------------------------------------------------- function(s)
def main():

    o_tagger = janome.tokenizer.Tokenizer(CUSTOM_DIC)

    # 指定したフォルダ内の .txt 拡張子のファイルを読み込み
    for dirname, _, list_filename in os.walk(RAW_TEXT_DIR):
        for filename in list_filename:
            if os.path.splitext(filename)[1].lower() not in (".txt",):
                continue

            pathname = os.path.join(dirname, filename)

            with open(pathname, "r") as h_reader:

                # utf-8 to unicode
                raw_u_text = h_reader.read().decode("utf-8")

                for pos, r in enumerate(o_tagger.tokenize(raw_u_text)):
                    word_surface = r.surface
                    word_feature = r.part_of_speech.split(",")[0]

if __name__ == "__main__":
    main()

# [EOF]

word2vecを使用するには形態素解析したデータをもとに分かち書きを作成する必要があります。

毎回形態素解析をして生成しても良いですが、ここでは形態素解析をした結果をデータベースに一旦保存した後、必要な情報を取得できるようにしてみます。

janomeによる形態素解析とデータベースへの保存

前述のコードを修正してデータベース(ここではsqlite3)へ保存するようにしたもの。

# -*- coding: utf-8 -*-
# http://www.mizunagi-works.com
# ------------------------------------------------------------------ import(s)
import sys
import os
import sqlite3

import janome.tokenizer


# ------------------------------------------------------------------- const(s)
DB_FILENAME = "dccol.db"
CUSTOM_DIC = "./dic/nanoha_keyword.dic"
RAW_TEXT_DIR = "doc_source"


# ------------------------------------------------------------------- class(s)
# ---------------------------------------------------------------- function(s)
def main():

    o_tagger = janome.tokenizer.Tokenizer(CUSTOM_DIC)

    o_conn = sqlite3.connect(DB_FILENAME)
    o_cursor = o_conn.cursor()

    # テーブル生成
    o_cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS doc_collection
        (
            id_doc text
        ,   word_pos int
        ,   word_surface text
        ,   word_feature text
        );
        """
    )

    # 指定したフォルダ内の .txt 拡張子のファイルを読み込んでDB化
    for dirname, _, list_filename in os.walk(RAW_TEXT_DIR):
        for filename in list_filename:
            if os.path.splitext(filename)[1].lower() not in (".txt",):
                continue

            pathname = os.path.join(dirname, filename)

            with open(pathname, "r") as h_reader:

                # utf-8 to unicode
                raw_u_text = h_reader.read().decode("utf-8")

                # 指定したドキュメントを削除
                o_cursor.execute(
                    """
                    DELETE FROM
                        doc_collection
                    WHERE
                        id_doc = ?;
                    """,
                    (pathname,)
                )

                for pos, r in enumerate(o_tagger.tokenize(raw_u_text)):

                    word_surface = r.surface
                    word_feature = r.part_of_speech.split(",")[0]

                    # 指定したドキュメントに単語を追加
                    o_cursor.execute(
                        """
                        INSERT INTO doc_collection
                        (
                            id_doc, word_pos, word_surface, word_feature
                        ) VALUES (?, ?, ?, ?);
                        """,
                        (pathname, pos, word_surface, word_feature,)
                    )

    o_conn.commit()

if __name__ == "__main__":
    main()

# [EOF]

.

potraceGUI for Win32を公開しました

GitHubで公開しているpotrace_guiのバイナリパッケージを作成してみました。

セットアップ等はありませんので、zipファイルを展開して適当な場所に配置してください。(もしかしたらデスクトップに展開した場合はちゃんと動作しないかも)

ダウンロード場所を自サイトからGitHubに変更しました。

Download

PotraceGUI for WIn32

potraceGUI-0.2.0.win-amd64.zip

.

(ウイルスチェックはVirusBarrierを使用しています)

アーカイブ内に以下の物が同梱済です。

potrace – Transforming bitmaps into vector graphics

http://potrace.sourceforge.net/

potrace_guiの説明

コマンドラインツールであるpotraceに簡易GUIを追加したものです。簡易的ですがプレビュー機能を搭載していますので、コマンドライン上でトライ&エラーを何度も繰り返すよりは楽…かもしれません。

potrace_gui使用方法

アプリケーションの起動

potrace_gui.exeを起動すると以下の様なWindowが表示されます。コマンドプロンプトが一緒に立ち上がってきますが無視してください。

 起動してすぐの状態

ファイルの読み込み

左上ツールバーのOpen()をクリックすると、ファイル選択ダイアログが表示されますので、変換したいビットマップ画像を指定して下さい。

しょぼいですがBMPファイルしか扱えませんので、元画像はビットマップファイルに変換しておいてください。カラー画像も読み込めますが、あらかじめモノクロ化しておいた方がより良い出力結果が得られます。

 レヴィとコロナちゃん

読み込みが完了すると、右側にプレビューが表示されます。プレビュー画面はスクロールさせたりドラッグする事で領域を切り替えることが出来ます。

 これ(BMP画像に変換したもの)を変換しました。

ファイルの変換

読み込んだ状態のままだと、やや線が細いため黒の部分をより多く抽出させてみます。

左側メニューにある「-k, –blacklevel」という項目を、0.5から0.85に変更してから、refresh(をクリックするか、Ctrl+Rでプレビューが更新されます。AutoRefresh()を押しておくと何らかのパラメータが変更される度に自動更新されます。(ファイルサイズが大きいときには動作が重くなるのでご注意を)

ファイルの保存

パラメータの調整が終わったら、BACKEND TYPESから出力形式を選んだ後にSave As..をクリックして下さい。

ファイル保存ダイアログが表示されますので、任意の場所を選んで保存して下さい。

各項目の説明

ほとんどの場合、変更すべきパラメーターは-k, –bkacklevelの値を変更するだけだと思いますが、以下にパラメータの説明を記載します。

-z, –turnpolicy

ビットマップ画像をどのようにパスに変換するかのルールを設定します。デフォルト設定値はminorityです。

.

-t, –turdsize

 

指定した値以下の曲線を除去します。曲線部の認識方法は、turnpolicyの設定に依存します。0以上の値が設定可能で、デフォルト設定値は2です。

.

-a, –alphamax

 

検出されたコーナーをどの程度滑らかに繋げるかを設定します。0.0が曲線無し、1.3334が全て曲線となります。デフォルト設定値は1.0です。

.

-n, –longcurve

なるべく少ない分割数でカーブを割り当てるようにします。

.

-O, –opttolerance

 

正確さを犠牲にする代わりに制御点の数を減らす事で単純化を図ります。0以上の値が設定可能で、デフォルト設定値は0.2です。

.

-u, –unit

 

出力時の1ピクセル当たりの内部分割サイズを設定します。値を大きくする事でより細かい精度で出力されます。1以上の値が設定可能で、デフォルト設定値は10です。

.

-k, –blacklevel

 

グレイスケール画像を与えた場合、黒と認識する閾値を指定します。指定範囲は0.0〜1.0で、1.0に近づくほど黒とみなされる領域が増えます。デフォルト設定値は0.5です。

.

-i, –invert

白黒を反転します。

.

-P, –pagesize

出力画像サイズを指定します。

–tight

有効にすると周囲の余白(線画認識されなかった領域)が削除されます。

Download

PotraceGUI for WIn32

potraceGUI-0.2.0.win-amd64.zip

.

(ウイルスチェックはVirusBarrierを使用しています)

Splinterによるブラウザ制御(2)

Windows版でSplinter + firefoxを使用すると、CERTIFICATE_VERIFY_FAILEDが出てしまう問題に無理矢理対応。

SSLでの接続時に以下の場所でエラーになっているため、確認処理を無視するように変更。

    def _create_connection(self):
        self._parse_url()
        if self.scheme == 'https':
            self.conn = http_client.HTTPSConnection(self.host, self.port)
        else:
            self.conn = http_client.HTTPConnection(self.host, self.port)
        self.conn.putrequest('GET', self.path)
        self.conn.putheader('User-agent', 'python/splinter')
        if self.auth:
            self.conn.putheader("Authorization", "Basic %s" % self.auth)
        self.conn.endheaders()

こんな感じに…

    def _create_connection(self):
        self._parse_url()
        if self.scheme == 'https':
            import ssl
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
            self.conn = http_client.HTTPSConnection(
                self.host,
                self.port,
                context=ssl_context, check_hostname=False
            )
        else:
            self.conn = http_client.HTTPConnection(self.host, self.port)
        self.conn.putrequest('GET', self.path)
        self.conn.putheader('User-agent', 'python/splinter')
        if self.auth:
            self.conn.putheader("Authorization", "Basic %s" % self.auth)
        self.conn.endheaders()

エラーは出なくなるけど危険です。

splinterによるブラウザ制御(1)

ウェブアプリケーションの開発をする際に、実際のブラウザで動作確認を行う、といった場面があるかと思います。

単純なHTTPの通信テストであればスクリプトでもなんとかなる場合もありますが、javascriptを使用している場合、javascriptの動作結果や挙動についても検証する必要があります。

こういったテストを行う場合、大抵は外部からブラウザを制御して行います。

有名どころとしてはSeleniumだと思いますが、今回はあえてsplinterを使用してみました。

インストールは pip から。

> pip install splinter

ちなみに標準ではfirefoxを使用しますので、 firefoxのインストールをしておく必要があります。

簡単な使用方法

# -*- coding: utf-8 -*-
import splinter

BROWSER = "firefox"
URL = "http://mizuvm01.cloudapp.net/wp"
oCBrowser = splinter.Browser(BROWSER)
oCBrowser.visit(URL)

こんな感じに記述します。

上の例では、firefoxを開き http://mizuvm01.cloudapp.net/wp を開きます。

ページ遷移後はhtmlやタグやIDの検索をする事が出来ます。

単純な表示だけでは面白くないので、うちのサイトから画像を取得するプログラムなんぞを作ってみます。

やることは以下の三つです。

  • 水凪工房のウェブサイトを開く。
  • 「魔法少女リリカルなのは」を検索する。
  • 表示されている画像をダウンロードする。

画像のダウンロードについてはsplinterで処理するのは困難なため、requestsモジュールを追加します。

> pip install requests

インストール出来たところで、早速こんな感じのコードを書きます。

# -*- coding: utf-8 -*-
import sys
import splinter
import re
import requests

URL = "http://mizuvm01.cloudapp.net/wp"
# ユーザーエージェントはサンプルを兼ねて明示的に指定しています。
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.10; rv:35.0) Gecko/20100101 Firefox/35.0"

def main():

    oCBrowser = splinter.Browser("firefox", user_agent=USER_AGENT)
    oCBrowser.visit(URL)

    oCBrowser.type("s", u"魔法少女リリカルなのは\n")

    oCReP = re.compile("(" + URL + "/[0-9]{4}/[0-9]{2}/[0-9]{2}/.*)")
    oCReI = re.compile("(" + URL + "/wp/wp-content/uploads/[0-9]{4}/[0-9]{2}/.*)")

    listHref = [r["href"] for r in oCBrowser.find_by_tag("a")]

    for strHref in listHref:
        oCResult = oCReP.search(strHref)
        if(oCResult is None):
            continue

        strBlogPage = oCResult.group(1)

        oCBrowser.visit(strBlogPage)

        listImage = oCBrowser.find_by_tag("a")

        for o in listImage:
            try:
                s = oCReI.search(o["href"])
            except:
                s = None

            if(s is None):
                continue

            strImageUrl = s.group(1)

            # ユーザーエージェントはsplinterで制御しているブラウザと同じにする。
            # リファラーを現在ページからとする。
            # クッキーの値をブラウザから取得して、requestsに反映させる。
            dictCookie = {}
            dictHeader = {
                "User-Agent": USER_AGENT,
                "Referer": strBlogPage
            }

            for k, v in oCBrowser.cookies.all().items():
                dictCookie[k] = v

            oCRes = requests.get(
                strImageUrl,
                headers=dictHeader,
                cookies=dictCookie,
                stream=True
            )

            # requestsが戻す結果のrawをファイルに書き出す事で実際に保存可能です。
            print strImageUrl.split("/")[-1], len(oCRes.raw.read())

    oCBrowser.quit()

if(__name__ == "__main__"):
    main()

かなりいい加減なコードですが、こんな感じとなります。

 

Twitter API とか

Twitter Streaming API を使う上でのサンプルをメモ。

Streaming API は Search API 同様 OAuth 認証を使用していますので、Search API のサンプルとまとめてみました。

プログラムは、ふゆみけさんのウェブに記載されているものを参考にしました。

http://d.hatena.ne.jp/fuyumi3/20111205/1323087002

# -*- coding: utf-8 -*-
import json
import urllib2
import oauth2

TWITTER_CONSUMER_KEY = "********"
TWITTER_CONSUMER_SECRET = "********"
TWITTER_ACCESS_TOKEN_KEY = "********"
TWITTER_ACCESS_TOKEN_SECRET = "********"


# ---------------------------------------------------------------------------
##
#
class twitter_api(object):

    SEARCH = "https://api.twitter.com/1.1/search/tweets.json"
    STREAM_SAMPLE = "https://stream.twitter.com/1.1/statuses/sample.json"
    STREAM_FILTER = "https://stream.twitter.com/1.1/statuses/filter.json"

    # -----------------------------------------------------------------------
    ##
    #
    def __init__(self, CONSUMER_KEY, CONSUMER_SEC, ACCESS_TOKEN_KEY, ACCESS_TOKEN_SEC):
        self.m_dictConsumer = {
            "key": CONSUMER_KEY,
            "secret": CONSUMER_SEC
        }
        self.m_dictToken = {
            "key": ACCESS_TOKEN_KEY,
            "secret": ACCESS_TOKEN_SEC
        }

    # -----------------------------------------------------------------------
    ##
    #
    def build_request(self, strURL, dictParam):

        # 引数に key と secret を渡す
        oCConsumer = oauth2.Consumer(**self.m_dictConsumer)
        oCToken = oauth2.Token(**self.m_dictToken)

        oCOAuthRequest = oauth2.Request.from_consumer_and_token(
            oCConsumer,
            oCToken,
            http_url=strURL,
            parameters=dictParam
        )
        oCOAuthRequest.sign_request(
            oauth2.SignatureMethod_HMAC_SHA1(),
            oCConsumer,
            oCToken
        )

        oCRequest = urllib2.urlopen(oCOAuthRequest.to_url())

        return(oCRequest)


# ===========================================================================
##
# 
def main():

    oCTWApi = twitter_api(
        TWITTER_CONSUMER_KEY,
        TWITTER_CONSUMER_SECRET,
        TWITTER_ACCESS_TOKEN_KEY,
        TWITTER_ACCESS_TOKEN_SECRET
    )

    print "Twitter Search API"
    # Twitter Search API
    # https://dev.twitter.com/rest/reference/get/search/tweets

    oCReq = oCTWApi.build_request(
        twitter_api.SEARCH,
        {
            "q": urllib2.quote("book")
        }
    )

    for r in oCReq:
        dictData = json.loads(r)
        print "----"
        for dictTweet in dictData["statuses"]:
            print dictTweet["user"]["id"], dictTweet["text"].encode("utf-8")

    print "Twitter Streaming API"
    # Twitter Stream API
    # https://dev.twitter.com/streaming/overview

    oCReq = oCTWApi.build_request(
        twitter_api.STREAM_FILTER,
        {
            "track": urllib2.quote("book")
        }
    )

    for r in oCReq:
        dictData = json.loads(r)
        print "----"
        print dictData["user"]["id"], dictData["text"].encode("utf-8")


if(__name__ == "__main__"):
    main()

上記のコードを実行するには、利用するPython環境にoauth2がインストールされている必要があります。

また、Twitter API を利用可能にしておく必要があります。

watershedによる領域分割

少し前の記事でGmicのcolorizeが便利そうでしたので、Gmic内のスクリプトを読みながらopencvで似たようなことをやってみました。

Gmicはsourceに含まれているgmic_def.gmicに処理手順が書かれているため、同じことができるんじゃないか…と思ったのですが、なかなか難しいかも。

pyopencv2を使ってwatershedを中途半端に実装してみた例

gmic_def.gmic内のスクリプトによると、__x_colorize内でpotential mapというのを生成し、それを領域分割時に参照しているようなので、まずはここの処理と同等のものを作ってみることにしました。

# Compute potential function.
__x_colorize :
  -if $1 # Potential for lineart.
    -b[-1] 0.05% -n[-1] 0,1 --b[-1] 0.5% -pow[-2] 10 -n[-2] 0,1 -n[-1] 0.3,1 -min[-2,-1]
  -else  # Potential for generic grayscale image.
    -gradient_norm[-1] -n[-1] 0,255 -normalize_local[-1] 3,3 -*[-1] -1 -n[-1] 0,255
    -b[-1] 0.05% -n[-1] 0,1 -sqr[-1] --b[-1] 0.5% -n[-2,-1] 0,1 -min[-2,-1]
  -endif
  -nm[-1] potential

ラインアート処理部なので、

  1. [Layer1] blur 0.05%
  2. [Layer1] normalize 0.0 〜 1.0
  3. [Layer1] Layer1に対して blur 0.5% したものを[Layer2]に出力
  4. [Layer1] pow(べき乗計算) 10
  5. [Layer1] normalize 0.0 〜 1.0
  6. [Layer2] normalize 0.3 〜 1.0
  7. [Layer1] と [Layer2] を min 合成

といった処理をこんな感じに読みかえ。

oCImage = cv2.imread(strFilename)

nH, nW = oCImage.shape[0:2]

oCImageGray = cv2.cvtColor(oCImage, cv2.COLOR_BGR2GRAY) / 256.0

# oCImageBlur1 = cv2.GaussianBlur(oCImageGray, self.make_gaussian_param(oCImage, 0.0005), 0)
oCImageBlur1 = cv2.GaussianBlur(oCImageGray, (0, 0), 0.5, 0.5)
oCImageNormal1 = cv2.normalize(oCImageBlur1, None, 0.0, 1.0, cv2.cv.CV_MINMAX)
cv2.imshow("oCImageNormal1", oCImageNormal1)

# oCImageBlur2 = cv2.GaussianBlur(oCImageNormal1, self.make_gaussian_param(oCImage, 0.005), 0)
oCImageBlur2 = cv2.GaussianBlur(oCImageGray, (0, 0), 5, 5)
oCImageNormal2 = cv2.normalize(oCImageBlur2, None, 0.0, 1.0, cv2.cv.CV_MINMAX)
cv2.imshow("oCImageNormal2", oCImageNormal2)

oCImagePow = cv2.pow(oCImageNormal1, 10)
oCImageNormal1 = cv2.normalize(oCImagePow, None, 0.0, 1.0, cv2.cv.CV_MINMAX)
oCImageNormal2 = cv2.normalize(oCImageNormal2, None, 0.3, 1.0, cv2.cv.CV_MINMAX)

oCImagePotential = cv2.min(oCImageNormal1, oCImageNormal2)

ガウシアンフィルタのパラメータを理解していないため、数値は適当です。

 plotしてみる。

 watershed…だめかも。

 potential map

 

塗り分け処理を行っている部分も移植しないとあんまり意味がなかったり。

OpenCVを直接使用するのは初めてだったけど、Python用のライブラリも含まれているのでありがたかった。

MacOSX用のMakefileの生成にはCMakeが必要なのですが、以下のオプションのチェックを外しておきました。

  • BUILD_SHARED_LIBS
  • BUILD_TESTS
  • WITH_1394
  • WITH_FFMPEG

参考:http://blogs.wcode.org/2014/10/howto-install-build-and-use-opencv-macosx-10-10/

Render color map, Manage replace color mode, Render base image, Render viewの部分が理解できないと絶望的かも。

potrace を GUI 化(2)

だいぶ形になってきたので、サンプル画像らしきものを掲載してみます。

元々コマンドラインで使用していたので、作っておきながら使うときはあまりないような気がしますけど、Qtのアプリケーションをどの様に作っていけば良いのかの練習にはなりました。

 アリサちゃん

GUI のアプリケーション部分は気の利いた処理も何もなく、potrace 用のコマンドライン引数名そのままですので、判りづらいです。

 名称はコマンドライン引数名そのまま

同じ様な GUI フレームワークとしては wxWidgets があるのですけど、そちらよりも GUI 部分が組みやすい感じがしました。

問題になりそうな部分としては Qt のライセンス形式かもしれません。(特に、Python で作成する場合は、 Python + Qt + PyQt が必要になるため)

 

potrace を GUI 化(1)

Qt を使用して GUI アプリケーションを作成

せっかく Qt を導入したので、 Qt を使用したアプリケーションを作成してみました。

potrace に指定可能なパラメータ全てをそのまま貼り付けてあるため、 potrace を知らないと何を言っているのかわからない設定項目になってしまっていますけど…

potrace の出力を QGraphicsView に出力しているだけなのですが、適当に作った割にはそれっぽい動作をしてくれている感じです。

実行には Python3 と PyQt5, potrace が必要になるのですが、作成中のソースコードを Github にアップロードしておきました。

https://github.com/MizunagiKB/potrace_gui

 

Windows Azure の モバイルサービス

モバイル サービスについて

Windows Azure で利用できるサービスの一つに モバイル サービス というのがあります。

モバイルサービスと書かれてしまうとなんなのか判りづらいのですが、

  • REST 経由で使用可能なデータベース(20MBytes の場合は無料)
  • 認証機能
  • モバイルへのプッシュ通知
  • スケジュール

といったサービスをセットで提供してくれるものとなります。

ちなみにウェブサービスを提供するものなので、ウェブサイト用のスペースは提供されません。(ウェブサイトが必要な場合は、別途 web sites や virtual machines の契約が必要です)

モバイルサービスを開始すると、以下の様な画面が表示されてサンプルコードや組み込み例を提供してくれます。

azure_mobileservice

提供してくれるのはありがたいのですが、 Python 用のサンプルコードがありませんでした。

というわけでメモがてら記述しておきます。

REST insert / select / update / delete

# -*- coding: utf-8 -*-
import sys
import httplib
import urllib
import json

API_HOST = "xxxxxxxx.azure-mobile.net"
API_CODE = "xxxxxxxx"


def insert(table_name, id):

    oCHttp = httplib.HTTPSConnection(API_HOST)
    
    jsonData = json.dumps({"id": id, "text": "ABCD"})
    
    dictHead = {
        "Accept": "application/json",
        "X-ZUMO-APPLICATION": API_CODE,
        "Content-Type": "application/json",
        "Host": API_HOST,
        "Content-Length": len(jsonData)
    }
    
    
    oCHttp.request("POST", "/tables/" + table_name, jsonData, dictHead)
    oCRes = oCHttp.getresponse()

    print oCRes.status
    print oCRes.read()


def select(table_name):

    oCHttp = httplib.HTTPSConnection(API_HOST)

    dictHead = {
        "Accept": "application/json",
        "X-ZUMO-APPLICATION": API_CODE,
        "Host": API_HOST
    }

    urlParam = urllib.urlencode(
        {
            "$filter": "text eq 'ABCD'"
        }
    )

    oCHttp.request("GET", "/tables/" + table_name + "?" + urlParam, None, dictHead)
    oCRes = oCHttp.getresponse()
    
    print oCRes.status
    print oCRes.read()


def update(table_name, id):

    oCHttp = httplib.HTTPSConnection(API_HOST)
    
    
    jsonData = json.dumps({"text": "EFGH"})
    
    dictHead = {
        "Accept": "application/json",
        "X-ZUMO-APPLICATION": API_CODE,
        "Content-Type": "application/json",
        "Host": API_HOST,
        "Content-Length": len(jsonData)
    }

    oCHttp.request("PATCH", "/tables/" + table_name + "/" + id, jsonData, dictHead)
    oCRes = oCHttp.getresponse()

    print oCRes.status
    print oCRes.read()


def delete(table_name, id):

    oCHttp = httplib.HTTPSConnection(API_HOST)

    dictHead = {
        "X-ZUMO-APPLICATION": API_CODE,
        "Host": API_HOST
    }

    oCHttp.request("DELETE", "/tables/" + table_name + "/" + id, None, dictHead)
    oCRes = oCHttp.getresponse()
    
    print oCRes.status
    print oCRes.read()


insert("test_table", "1")
select("test_table")
update("test_table", "1")
delete("test_table", "1")

 ■

どうという事のないコードですが、適当に記述するとこのような感じです。

Azure モバイル サービスの REST API リファレンス

http://msdn.microsoft.com/library/jj710108.aspx

上に記載したコードには select に関して単純なフィルターしか指定していませんが、色々な条件を指定する事が出来ます。

$filter の指定方法は、OData の URL Conventions に記載されています。

 

PyQt5 で GUI アプリケーション

Python には昔から Tk という GUI ライブラリが付属しているのですが、あまりかっこよくありません…

勿論、目的が達成出来れば問題ないのですが、やっぱり見た目がかっこいい方が開発していて楽しいかも?ということで、 MacOSX 上に PyQt5 の環境を構築してみることにしました。

インストール作業

Qt のインストール

http://qt-project.org

PyQt5 は Qt をラッピングしたモジュールなので、まず Qt を用意しなければいけません。

Qt には Online Installer が用意してありますので、それを使用してインストールします。

そのうち指定しなくても良くなるかと思いますが、MacOSX 10.9 上で build をしている場合は、環境変数にあらかじめ 10.8 という値を設定しておきます。

export MACOSX_DEPLOYMENT_TARGET=10.8

Python 3.4 のインストール

https://www.python.org

標準の MacOSX には Python3 がありませんので、 Python3[1. この記事を書いている時点では 3.4 が最新バージョンでした。]をインストールする必要があります。

…の前に Marvericks 環境に必要なツールをインストールしておきます。

xcode-select --install

インストールが終わったら、

./configure
または
./configure --prefix=INSTALL_PATH
make
make install

インストール先を好きな場所にしたい場合は、 ./configure オプションの –prefix を使用する事でインストール先を任意の場所に変更することが出来ます。

sip のインストール

sip は C/C++ 用のモジュール(dll や so)を Python から呼び出せるようにするためのモジュールです。 PyQt5 は Qt を呼び出すために sip を使用するので、事前にインストールしておく必要があります。

python3 configure.py
make
make install

PyQt5 のインストール

ここまで完了したらやっと PyQt5 のインストールを始める事が出来ます。

PyQt5 の build には Qt 用の make ツールである qmake と、事前にインストールした sip が必要となります。

インストール場所を以下の様に指定します。

python3 configure.py --qmake <QMAKE PATH> --sip <SIP PATH>
make
make install

PyQt5 のビルドにはかなりの時間がかかります。

ビルド後の確認方法は、 PyQt5 に添付している example を実行することで試せます。

python3 example/qtdemo/qtdemo.py

とすることで、各種デモを呼び出せるメニューアプリケーションが起動出来ます。

メニューアプリケーション自体が PyQt5 製になっています。

pyqt_demo こんな感じ。

おまけ

cx_freeze を導入しておくと、アプリケーションの配布に便利です。

http://cx-freeze.sourceforge.net

bottle.py の egg 化

bottle.py の stable(0.12.7) は、 setup.py 内の指定が

from distutils.core import setup

となっているため、

> python setup.py bdist_egg

といった感じで egg ファイルを生成する事が出来ません。(もっとも、 bottle.py は1ファイルだけで構成されているため、 egg 化するまでもないのですが…)

egg 化したくてしょうがない場合は、前述の場所を以下の様に書き換える事で egg ファイルを生成する事が出来ます。

from setuptools import setup

 

Twisted を使用した proxy

Twisted を使用した Proxy のサンプルについての記録。

オリジナルは GitHub Gist で公開されている tcp-proxy.py となります。

Proxy を実現するには、 squid や Delegate 、 HAProxy といったものがありますが、 tcp-proxy.py は Twisted を使用した Python スクリプトとなります。

設定ファイルやコマンドライン引数などはなく、待ち受けと転送先を変更するにはそれぞれ 57 行目と 75 行目を書き換えるだけです。

#!/usr/bin/env python
# coding: utf-8
# http://musta.sh/2012-03-04/twisted-tcp-proxy.html

import sys

from twisted.internet import defer
from twisted.internet import protocol
from twisted.internet import reactor
from twisted.python import log

class ProxyClientProtocol(protocol.Protocol):
    def connectionMade(self):
        log.msg("Client: connected to peer")
        self.cli_queue = self.factory.cli_queue
        self.cli_queue.get().addCallback(self.serverDataReceived)

    def serverDataReceived(self, chunk):
        if chunk is False:
            self.cli_queue = None
            log.msg("Client: disconnecting from peer")
            self.factory.continueTrying = False
            self.transport.loseConnection()
        elif self.cli_queue:
            log.msg("Client: writing %d bytes to peer" % len(chunk))
            self.transport.write(chunk)
            self.cli_queue.get().addCallback(self.serverDataReceived)
        else:
            self.factory.cli_queue.put(chunk)

    def dataReceived(self, chunk):
        log.msg("Client: %d bytes received from peer" % len(chunk))
        self.factory.srv_queue.put(chunk)

    def connectionLost(self, why):
        if self.cli_queue:
            self.cli_queue = None
            log.msg("Client: peer disconnected unexpectedly")


class ProxyClientFactory(protocol.ReconnectingClientFactory):
    maxDelay = 10
    continueTrying = True
    protocol = ProxyClientProtocol

    def __init__(self, srv_queue, cli_queue):
        self.srv_queue = srv_queue
        self.cli_queue = cli_queue

class ProxyServer(protocol.Protocol):
    def connectionMade(self):
        self.srv_queue = defer.DeferredQueue()
        self.cli_queue = defer.DeferredQueue()
        self.srv_queue.get().addCallback(self.clientDataReceived)

        factory = ProxyClientFactory(self.srv_queue, self.cli_queue)
        reactor.connectTCP("127.0.0.1", 6666, factory)

    def clientDataReceived(self, chunk):
        log.msg("Server: writing %d bytes to original client" % len(chunk))
        self.transport.write(chunk)
        self.srv_queue.get().addCallback(self.clientDataReceived)

    def dataReceived(self, chunk):
        log.msg("Server: %d bytes received" % len(chunk))
        self.cli_queue.put(chunk)

    def connectionLost(self, why):
        self.cli_queue.put(False)

if __name__ == "__main__":
    log.startLogging(sys.stdout)
    factory = protocol.Factory()
    factory.protocol = ProxyServer
    reactor.listenTCP(9999, factory, interface="0.0.0.0")
    reactor.run()

Twisted の特徴は、イベント駆動と Defferred オブジェクトである事でしょうか。この二つを使用する事でポーリング処理が不要になるのと、処理を時系列に記述する事が出来るという利点があります。

自分は Deffered を理解するのに結構時間がかかってしまいました。