仙豆のレシピ

ちょっとしたことでも書いていく姿勢で

Polars の write_database() を INSERT OR IGNORE にする

polars.DataFrame.write_database で DataFrame を DB に書き込めるが、内部で実行されるのは INSERT 文なので既存テーブルに追加する際などにこれを INSERT OR IGNORE にしたくなることがある。これを実現する方法をメモ。

環境

DB は SQLitewrite_database()engine="sqlalchemy" を想定。 polars==0.18.15, pandas==1.5.3 で確認。

背景

PRIMARY KEY 制約で重複が禁止されているテーブルに重複した値を持つ行を追加しようとするとエラーになる。 このような場合に「重複しているものは無視して追加」を実現する SQL 文として INSERT OR IGNORE がある。 これを polars の write_database() でも実現したい。

import sqlite3

import polars as pl

DB_NAME="tmp.db"
TABLE_NAME="test_tbl"

conn = sqlite3.connect(DB_NAME)
cur = conn.cursor()

# `id` の重複を許さないテーブルを作成
cur.execute(f"CREATE TABLE {TABLE_NAME} (id INT, value STR, PRIMARY KEY(id))")
# データ挿入
cur.execute(f"INSERT INTO {TABLE_NAME} (id, abc) VALUES(111, 'old')")

# 追加データを挿入、`id=111` が重複しているのでエラー
# sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: test_tbl.id
df = pl.DataFrame({"id": [111, 222], "value": ["new", "abc"]})
df.write_database(TABLE_NAME, f"sqlite:///{DB_NAME}", if_exists="append")

方法

pandas.io.sql.SQLTable._execute_insert を置き換える:

from pandas.io.sql import SQLTable
def _execute_insert_or_ignore(self, conn, keys: list[str], data_iter) -> int:
    data = [dict(zip(keys, row)) for row in data_iter]
    result = conn.execute(self.table.insert().prefix_with("OR IGNORE"), data)
    return result.rowcount
SQLTable._execute_insert = _execute_insert_or_ignore

変更点は self.table.insert()self.table.insert().prefix_with("OR IGNORE") にしたのみ1

こうすると前述のコード実行後テーブルは以下のようになる( id=111 の値(old)は変わらずに id=222 が挿入される)。

┌─────┬───────┐
│ id  ┆ value │
│ --- ┆ ---   │
│ i64 ┆ str   │
╞═════╪═══════╡
│ 111 ┆ old   │
│ 222 ┆ abc   │
└─────┴───────┘

原理としては、 write_database(engine="sqlalchemy") は内部で to_pandas() して pandas.DataFrame.to_sql を実行している(該当部分)。 なので to_sql() の内部で SQLAlchemy を実行する部分を置き換えている。

代替案

上記の方法だとすべての write_table()INSERT OR IGNORE になってしまう。もちろん

orig = SQLTable._execute_insert
SQLTable._execute_insert = _execute_insert_or_ignore
df.write_table()
SQLTable._execute_insert = orig

のような泥臭い方法も考えられるが面倒。 正攻法としては write_table() の引数に method を追加して to_sql()にこの method を渡すように修正、実行時に method=_execute_insert_or_ignore とすることで IGNORE するかどうか切り替える、みたいな感じかなと思うがengine=adbc のときとの整合性とかが面倒かもしれない。

別の方法としては stack overflow にあるように一旦(新規の)別テーブルに write_database() してこのテーブルから目的のテーブルに INSERT OR IGNORE するというのがあるらしい。 INSERT が2回走ることになるけど場合によってはこれもアリかもしれない。


  1. MySQL なら prefix_with("IGNORE") にすればよいらしい( stack overflow