PostgreSQLでUPSERT(MERGE、INSERT ... ON DUPLICATE UPDATE)するには?

MySQLではINSERT ... ON DUPLICATE UPDATEと呼ばれ、標準ではMERGE操作の一部としてサポートされています。

PostgreSQLでは(pg 9.5以前では)直接サポートされていませんが、どのようにすればよいのでしょうか?以下のように考えてください:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

タプル(2, 'Joe'), (3, 'Alan')をupsert&quotしたいとします:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

これが、upsertについて議論するときに人々が話していることである。重要なことは、どのようなアプローチであっても、明示的なロッキングを使用するか、あるいは結果として発生するレースコンディションを防御することで、同じテーブル*上で複数のトランザクションが動作していても安全でなければならないということです。

このトピックは、https://stackoverflow.com/q/1109061/398670 で広範囲に議論されていますが、MySQL 構文の代替案に関するもので、時間の経過とともに無関係な詳細がかなり増えています。確定的な答えに取り組んでいます。

これらのテクニックは、"insert if not exists, otherwise do nothing""、つまり、"insert ... on duplicate key ignore" "にも有効です。

質問へのコメント (6)
ソリューション

9.5以降

PostgreSQL 9.5以降では、INSERT ... ON CONFLICT UPDATE (および ON CONFLICT DO NOTHING )がサポートされています。 ON DUPLICATE KEY UPDATE`との比較簡単な説明。 使用法はマニュアル - 特に構文図のconflict_action節と説明テキストを参照してください。 以下に示す9.4以前の解決策とは異なり、この機能は複数の競合する行に対して動作し、排他ロックや再試行ループを必要としません。 この機能を追加したコミットはこちらで、この機能の開発に関する議論はこちらです。

*もしあなたが9.5を使っていて後方互換性を必要としないのであれば、今読むのを止めても構いません。

9.4以上です:

PostgreSQLには組み込みのUPSERT(またはMERGE)機能がありません。 この記事ではこの問題について詳しく説明しています。 一般的には、2つのオプションから選択しなければならない:

  • 個々の挿入/更新操作をリトライ・ループで実行する。
  • テーブルをロックし、一括マージする。

    個々の行のリトライループ

    多くの接続が同時に挿入を行おうとする場合、再試行ループで個々の行の挿入を行うのが合理的です。 PostgreSQLのドキュメントには、これをデータベース内のループで実行する便利なプロシジャがあります。ほとんどの単純な解決策とは異なり、更新の失敗や挿入の競合を防ぎます。このプロシージャはREAD COMMITTEDモードでのみ動作し、トランザクション内で行うことがこのプロシージャのみである場合にのみ安全です。この関数は、トリガやセカンダリユニークキーがユニーク違反の原因となる場合、正しく動作しません。 この方法は非常に非効率的です。現実的であればいつでも、作業をキューに入れ、代わりに以下に説明するような一括アップサートを行うべきである。 この問題に対する多くの解決策はロールバックを考慮していないため、不完全な更新になってしまう。2つのトランザクションが互いに競合し、一方はINSERTに成功し、もう一方は重複キーエラーになり、代わりにUPDATEを行う。UPDATEINSERTがロールバックするかコミットするのを待ってブロックする。そのため、UPDATE`がコミットしても、実際には期待したアップサートは行われていません。結果の行数をチェックし、必要に応じて再試行する必要がある。 また、SELECTレースを考慮していない解決策もあります。明らかで単純な方法を試すと

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

を試した場合、2つ同時に実行するといくつかの失敗モードがあります。1つは、すでに説明した更新再チェックの問題です。もう1つは、両方が同時にUPDATEを実行し、0行にマッチして続行する場合です。このテストはINSERTの前に行われます。どちらもゼロ行を得たので、どちらもINSERTを行う。一方は重複キーエラーで失敗する。 これが、再試行ループが必要な理由である。巧妙なSQLを使えば、重複キーエラーや更新の失敗を防げると思うかもしれませんが、そうではありません。行数をチェックするか、重複キーエラーを処理して(選択したアプローチによりますが)再試行する必要があります。 このために独自のソリューションを開発しないでください。メッセージキューイングと同様、それはおそらく間違っています。

ロック付き一括アップサート

新しいデータセットを古い既存のデータセットにマージするような一括アップサートを行いたいことがあります。これは個々の行のアップサートに比べて非常に効率的であり、実用的であればいつでも行うべきである。 この場合、一般的には以下のような処理を行います:

  • TEMPORARYテーブルをCREATE` する。
  • 新しいデータをテンポラリテーブルに COPY または一括挿入する。
  • ターゲットテーブルを IN EXCLUSIVE MODELOCK する。これにより、他のトランザクションは SELECT することはできるが、テーブルに変更を加えることはできない。
  • 既存のレコードの UPDATE ...FROM を実行する;
  • ターゲットテーブルにまだ存在しない行の INSERT を行う;
  • COMMIT を行い、ロックを解放する。 例えば、質問で与えられた例では、多値の INSERT を使用して temp テーブルにデータを入力します:
BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

関連する読み物

解説 (17)

PostgreSQLの9.5以前のバージョンでの単一挿入の問題に対する別の解決策を提供しようとしています。そのアイデアは、まず挿入を行い、レコードが既に存在する場合は更新を行うというものです:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

この解決策は、テーブルの行の削除がない場合のみ適用できることに注意してください。

この解決策の効率については知りませんが、十分に合理的だと思われます。

解説 (7)

insertの例をいくつか示します。 ... 紛争について。 ...( pg 9.5 + ):

-紛争時に挿入-何もしないダミー(id、name、size)値(1、 'new_name'、3)に挿入します。 紛争については何もしません。

-競合時に挿入-更新を行い、で競合ターゲットを指定します。 ダミー(id、name、size)値(1、 'new_name'、3)に挿入します。 紛争(id)について。 設定名= 'new_name'を更新し、サイズ= 3;を更新します。

-競合時に挿入-更新を行い、制約名を介して競合ターゲットを指定します。 ダミー(id、name、size)値(1、 'new_name'、3)に挿入します。 制約dummy_pkeyの競合について。 設定名= 'new_name'を更新し、サイズ= 4;を更新します。

解説 (0)

PostgresのSQLAlchemy upsert> = 9.5。

上記の大きな投稿はPostgresバージョンの多くの異なるSQLアプローチをカバーしているため(質問のように9.5以外)、Postgres 9.5を使用している場合は、SQLAlchemyでそれを行う方法を追加したいと思います。 独自のupsertを実装する代わりに、SQLAlchemyの機能(SQLAlchemy 1.1で追加された)を使用することもできます。 個人的には、可能であればこれらを使用することをお勧めします。 利便性のためだけでなく、PostgreSQLが発生する可能性のあるレース条件を処理できるためです。

昨日私が与えた別の回答からのクロス投稿(https://stackoverflow.com/a/44395983/2156909)。

SQLAlchemyは、 on_conflict_do_update()on_conflict_do_nothing()の2つのメソッドで ON CONFLICTをサポートしています。

ドキュメントからのコピー:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight = conflict#insert-on-conflict-upsert。

解説 (2)
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

Postgresql 9.3 でテスト済み

解説 (2)

この質問が閉じられたので、SQLAlchemyを使用してどのように実行するかについてここに投稿します。 再帰を介して、レース条件と検証エラーに対抗するために、バルクインサートまたは更新を再試行します。

まず輸入。

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

これで、いくつかのヘルパー機能が機能します。

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)

def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

そして最後にupsert関数。

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record {}'.format(items[0]))

使用方法は次のとおりです。

>>> data = [
...     {'id': 1, 'text': 'updated post1'}, 
...     {'id': 5, 'text': 'updated post5'}, 
...     {'id': 1000, 'text': 'new post1000'}]
... 
>>> upsert(data)

これが[bulk_save_objects][3]を超える利点は、挿入時に関係、エラーチェックなどを処理できることです(バルク操作とは異なります)。

[3]:http://docs.sqlalchemy.org/en/latest/orm/session_api.html?highlight = bulk_save_objects#sqlalchemy.orm.session.Session.bulk_save_objects。

解説 (4)