郵便局配布の郵便番号データをJSONに加工

郵便局のウェブサイトでは郵便番号CSVが公開されており、ダウンロードして利用する事で郵便番号から住所や事業者を取得する事が出来ます。これらのデータについて、郵便局は著作権を主張せず再配布も自由との事ですので、データベース用のテストデータとして活用してみるのに良さそうです。

郵便番号CSVは名前の通りCSV形式ではありますが、一つの住所が複数の行(レコード)に跨って記述されている場合があり、そのままではやや扱いづらいものがあります。

というわけで、

郵便番号7桁 = 1〜n個の住所データ

というJSON形式に変換するスクリプトを書いてみました。

ついでに郵便番号CSV(住所・カナ表記)と郵便番号CSV(住所・ローマ字表記)を結合させています。

# こんな感じでの使用を想定しています。
$ python conv_zipcode_jp.py KEN_ALL.CSV KEN_ALL_ROME.CSV

# -*- coding: utf-8 -*-
# ------------------------------------------------------------------ import(s)
import sys
import re
import csv
import json
import collections
import unicodedata


# ------------------------------------------------------------------- class(s)
class CZipcodeRecord(object):
    """One row of the kana postal-code CSV joined with its romanized entry.

    Attributes set by ``__init__``:
        jiscode, zipcode_5, zipcode_7: columns 0, 1 and 2 of the row.
        address: ``CAddress`` built from columns 6-8.
        address_kana: ``CAddress`` built from columns 3-5, each passed
            through ``norm_process``.
        address_rome: ``CAddress`` filled from the romanized table, part by
            part, wherever the corresponding part of ``address`` matches.
        flag_attrib, flag_update: concatenated flag columns.
    """

    class CAddress(object):
        """Prefecture / municipality / area triple of a single address."""

        prefecture = ""
        municipality = ""
        area = ""

        # Full-width and ASCII parentheses.  Written as escapes so that the
        # full-width characters cannot be silently folded to ASCII when the
        # source text is copied or normalized.
        _OPENERS = (u"\uff08", u"(")
        _CLOSERS = (u"\uff09", u")")

        def __str__(self):
            return "|".join([self.prefecture, self.municipality, self.area])

        def get_list(self):
            """Return the address parts as a list, dropping empty trailing parts.

            The prefecture is always included; the municipality is included
            when it or the area is non-empty; the area only when non-empty.
            """
            if self.area:
                return [self.prefecture, self.municipality, self.area]
            if self.municipality:
                return [self.prefecture, self.municipality]
            return [self.prefecture]

        def _find_first(self, tokens, start=0):
            """Return the lowest index >= start where any token occurs in area.

            Returns -1 when none of the tokens is present.  Tokens are
            matched as text when ``area`` is text and as UTF-8 bytes
            otherwise, so both representations are supported.
            """
            area = self.area
            is_text = isinstance(area, type(u""))
            hits = []
            for token in tokens:
                needle = token if is_text else token.encode("utf-8")
                pos = area.find(needle, start)
                if pos >= 0:
                    hits.append(pos)
            return min(hits) if hits else -1

        def _has_closed_group(self):
            """Return True when an opening parenthesis is later closed."""
            opened = self._find_first(self._OPENERS)
            return opened >= 0 and self._find_first(self._CLOSERS, opened) >= 0

        def is_splitted_be(self):
            """Return True when area opens a parenthesis that it never closes.

            Such an area is the beginning of a description that continues
            in a following row.
            """
            has_opener = self._find_first(self._OPENERS) >= 0
            return has_opener and not self._has_closed_group()

        def is_splitted_en(self):
            """Return True when area closes a parenthesis it did not open.

            Such an area is the end of a description that began in an
            earlier row.
            """
            has_closer = self._find_first(self._CLOSERS) >= 0
            return has_closer and not self._has_closed_group()

    def __init__(self, src_record, dict_zipcode_roma):
        """Build a record from one CSV row.

        Args:
            src_record: sequence of cp932-encoded byte strings; columns 0-8
                are indexed directly, columns 9-14 are read by slicing.
            dict_zipcode_roma: mapping from 7-digit postal code to a list
                whose items 1-3 are the kanji prefecture, municipality and
                area, and whose items 4-6 are their romanized forms.
        """
        # Work in UTF-8 byte strings from here on.
        r = [v.decode("cp932").encode("utf-8") for v in src_record]

        self.jiscode = r[0]
        self.zipcode_5 = r[1]
        self.zipcode_7 = r[2]

        self.address = self.CAddress()
        self.address.prefecture = r[6]
        self.address.municipality = r[7]
        self.address.area = r[8]

        self.address_kana = self.CAddress()
        self.address_kana.prefecture = norm_process(r[3])
        self.address_kana.municipality = norm_process(r[4])
        self.address_kana.area = norm_process(r[5])

        self.address_rome = self.CAddress()

        self.flag_attrib = "".join(r[9:13])
        # NOTE(review): column 12 is part of both flag_attrib and
        # flag_update; confirm whether r[13:15] was intended here.
        self.flag_update = "".join(r[12:15])

        # A postal code missing from the romanized table simply leaves
        # address_rome empty.
        try:
            rome = dict_zipcode_roma[self.zipcode_7]
        except KeyError:
            pass
        else:
            # Copy each romanized part only when its kanji counterpart is
            # identical to the one in this row.
            if self.address.prefecture == rome[1]:
                self.address_rome.prefecture = rome[4]
            if self.address.municipality == rome[2]:
                self.address_rome.municipality = rome[5]
            if self.address.area == rome[3]:
                self.address_rome.area = rome[6]

        # An area that starts with this fixed text is blanked in all three
        # representations (presumably a placeholder rather than a real area
        # name -- confirm against the data specification).
        if self.address.area.startswith("以下に掲載がない場合"):
            self.address.area = ""
            self.address_kana.area = ""
            self.address_rome.area = ""

    def key(self):
        """Return the grouping key of the record: the 7-digit postal code."""
        return self.zipcode_7


# ============================================================================
def norm_process(src_text):
    """Return ``src_text`` in Unicode NFKC form as a UTF-8 byte string.

    Args:
        src_text: UTF-8 encoded byte string; already-decoded text is
            accepted as well.

    Returns:
        The NFKC-normalized text, encoded as UTF-8.
    """
    if isinstance(src_text, bytes):
        src_text = src_text.decode("utf-8")
    # NFKC already folds full-width digits and parentheses to their ASCII
    # forms, so no per-character replacement is needed afterwards.
    return unicodedata.normalize("NFKC", src_text).encode("utf-8")


# ============================================================================
def build_rome_dic(rome_pathname):
    """Load the romanized postal-code CSV into an ordered lookup table.

    Every field is decoded from cp932, stripped of ASCII and ideographic
    (U+3000) spaces, and re-encoded as UTF-8.  The ideographic space must
    go as well: otherwise kanji names that contain one can never compare
    equal to the space-free names they are matched against.

    Rows are kept only when at least one field from column 4 onwards
    contains an upper-case ASCII letter, a digit or a parenthesis.

    Args:
        rome_pathname: path of the cp932-encoded CSV file.

    Returns:
        ``collections.OrderedDict`` mapping column 0 of each kept row to
        the full list of its cleaned fields (UTF-8 byte strings).  When a
        key occurs more than once, the last row wins.
    """
    # Applied to decoded text, hence a text pattern.
    pattern_rome = re.compile(u"[A-Z0-9()]")
    dict_result = collections.OrderedDict()

    if bytes is str:
        # Python 2: the csv module works on byte strings.
        h_rfile = open(rome_pathname, "r")
    else:
        # Python 3: the csv module needs decoded text.
        h_rfile = open(rome_pathname, "r", encoding="cp932", newline="")

    with h_rfile:
        for row in csv.reader(h_rfile):
            fields = []
            for value in row:
                if isinstance(value, bytes):
                    value = value.decode("cp932")
                fields.append(value.replace(u" ", u"").replace(u"\u3000", u""))
            if any(pattern_rome.search(v) for v in fields[4:]):
                record = [v.encode("utf-8") for v in fields]
                dict_result[record[0]] = record
    return dict_result


# ============================================================================
def main():
    """Convert the postal-code CSV files named on the command line to JSON.

    ``sys.argv[1]`` is the kana CSV and ``sys.argv[2]`` the romanized CSV.
    Rows are grouped by 7-digit postal code.  An area description that is
    split over several rows (an opening parenthesis without its closing
    one) is merged back into the record that opened it.

    The result is printed to stdout in this layout::

        {
            "0000000": {
                "country": "JP",
                "address_list": [
                    {"ja": [], "ja_kana": [], "ja_rome": []}
                ]
            }
        }

    Exits with a message on stderr and a non-zero status when arguments
    are missing or when the parentheses of a split area are inconsistent.
    """
    if len(sys.argv) < 3:
        sys.exit("usage: python %s KEN_ALL.CSV KEN_ALL_ROME.CSV" % sys.argv[0])

    dict_zip_roma = build_rome_dic(sys.argv[2])
    dict_zip = collections.OrderedDict()

    # True while the most recently stored record still has an unclosed
    # parenthesis, i.e. the next row of the same postal code continues it.
    awaiting_rest = False
    with open(sys.argv[1], "r") as h_rfile:
        for r in csv.reader(h_rfile):
            o = CZipcodeRecord(r, dict_zip_roma)
            list_o = dict_zip.setdefault(o.key(), [])

            if list_o and awaiting_rest:
                # Continuation row: append its area text to the record
                # that opened the parenthesis.
                head = list_o[-1]
                head.address.area += o.address.area
                head.address_kana.area += o.address_kana.area
                head.address_rome.area += o.address_rome.area
                if o.address.is_splitted_be():
                    # A second opening parenthesis inside an open one.
                    sys.exit("addres parser error")
                if o.address.is_splitted_en():
                    awaiting_rest = False
            else:
                # A closing parenthesis with nothing open is only treated
                # as an error for a postal code that already has records.
                if list_o and o.address.is_splitted_en():
                    sys.exit("addres parser error")
                list_o.append(o)
                # A new postal code always starts from a clean state.
                awaiting_rest = bool(o.address.is_splitted_be())

    dictExport = {}
    for k, list_o in dict_zip.items():
        dictExport[k] = {
            "country": "JP",
            "address_list": [
                {
                    "ja": o.address.get_list(),
                    "ja_kana": o.address_kana.get_list(),
                    "ja_rome": o.address_rome.get_list()
                }
                for o in list_o
            ]
        }

    # Single-argument call form: valid as a statement on Python 2 and as a
    # function call on Python 3.
    print(json.dumps(dictExport, indent=True))


if __name__ == "__main__":
    main()



# -----------------------------------------------------------------------[EOF]