diff --git a/src/psycstore/plugin_psycstore_postgres.c b/src/psycstore/plugin_psycstore_postgres.c index 2dcafd4..360d0eb 100644 --- a/src/psycstore/plugin_psycstore_postgres.c +++ b/src/psycstore/plugin_psycstore_postgres.c @@ -70,7 +70,7 @@ struct Plugin /** * Native Postgres database handle. */ - PGconn *dbh; + struct GNUNET_PQ_Context *dbh; enum Transactions transaction; @@ -160,201 +160,185 @@ database_setup (struct Plugin *plugin) GNUNET_PQ_EXECUTE_STATEMENT_END }; + /* Prepare statements */ + struct GNUNET_PQ_PreparedStatement ps[] = { + GNUNET_PQ_make_prepare ("transaction_begin", + "BEGIN", 0), + GNUNET_PQ_make_prepare ("transaction_commit", + "COMMIT", 0), + GNUNET_PQ_make_prepare ("transaction_rollback", + "ROLLBACK", 0), + GNUNET_PQ_make_prepare ("insert_channel_key", + "INSERT INTO channels (pub_key) VALUES ($1)" + " ON CONFLICT DO NOTHING", 1), + GNUNET_PQ_make_prepare ("insert_slave_key", + "INSERT INTO slaves (pub_key) VALUES ($1)" + " ON CONFLICT DO NOTHING", 1), + GNUNET_PQ_make_prepare ("insert_membership", + "INSERT INTO membership\n" + " (channel_id, slave_id, did_join, announced_at,\n" + " effective_since, group_generation)\n" + "VALUES (get_chan_id($1),\n" + " get_slave_id($2),\n" + " $3, $4, $5, $6)", 6), + GNUNET_PQ_make_prepare ("select_membership", + "SELECT did_join FROM membership\n" + "WHERE channel_id = get_chan_id($1)\n" + " AND slave_id = get_slave_id($2)\n" + " AND effective_since <= $3 AND did_join = 1\n" + "ORDER BY announced_at DESC LIMIT 1", 3), + GNUNET_PQ_make_prepare ("insert_fragment", + "INSERT INTO messages\n" + " (channel_id, hop_counter, signature, purpose,\n" + " fragment_id, fragment_offset, message_id,\n" + " group_generation, multicast_flags, psycstore_flags, data)\n" + "VALUES (get_chan_id($1),\n" + " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)" + "ON CONFLICT DO NOTHING", 11), + GNUNET_PQ_make_prepare ("update_message_flags", + "UPDATE messages\n" + "SET psycstore_flags = psycstore_flags | $1\n" + "WHERE channel_id = get_chan_id($2) \n" + " AND message_id = $3 AND fragment_offset = 0", 3), + GNUNET_PQ_make_prepare ("select_fragments", + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = get_chan_id($1) \n" + " AND $2 <= fragment_id AND fragment_id <= $3", 3), + /** @todo select_messages: add method_prefix filter */ + GNUNET_PQ_make_prepare ("select_messages", + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = get_chan_id($1) \n" + " AND $2 <= message_id AND message_id <= $3\n" + "LIMIT $4;", 4), + /** @todo select_latest_messages: add method_prefix filter */ + GNUNET_PQ_make_prepare ("select_latest_fragments", + "SELECT rev.hop_counter AS hop_counter,\n" + " rev.signature AS signature,\n" + " rev.purpose AS purpose,\n" + " rev.fragment_id AS fragment_id,\n" + " rev.fragment_offset AS fragment_offset,\n" + " rev.message_id AS message_id,\n" + " rev.group_generation AS group_generation,\n" + " rev.multicast_flags AS multicast_flags,\n" + " rev.psycstore_flags AS psycstore_flags,\n" + " rev.data AS data\n" + " FROM\n" + " (SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data \n" + " FROM messages\n" + " WHERE channel_id = get_chan_id($1) \n" + " ORDER BY fragment_id DESC\n" + " LIMIT $2) AS rev\n" + " ORDER BY rev.fragment_id;", 2), + GNUNET_PQ_make_prepare ("select_latest_messages", + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = get_chan_id($1)\n" + " AND message_id IN\n" + " (SELECT message_id\n" + " FROM messages\n" + " WHERE channel_id = get_chan_id($2) \n" + " GROUP BY message_id\n" + " ORDER BY message_id\n" + " DESC LIMIT $3)\n" + "ORDER BY fragment_id", 3), + GNUNET_PQ_make_prepare ("select_message_fragment", + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = get_chan_id($1) \n" + " AND message_id = $2 AND fragment_offset = $3", 3), + GNUNET_PQ_make_prepare ("select_counters_message", + "SELECT fragment_id, message_id, group_generation\n" + "FROM messages\n" + "WHERE channel_id = get_chan_id($1)\n" + "ORDER BY fragment_id DESC LIMIT 1", 1), + GNUNET_PQ_make_prepare ("select_counters_state", + "SELECT max_state_message_id\n" + "FROM channels\n" + "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1), + GNUNET_PQ_make_prepare ("update_max_state_message_id", + "UPDATE channels\n" + "SET max_state_message_id = $1\n" + "WHERE pub_key = $2", 2), + + GNUNET_PQ_make_prepare ("update_state_hash_message_id", + "UPDATE channels\n" + "SET state_hash_message_id = $1\n" + "WHERE pub_key = $2", 2), + GNUNET_PQ_make_prepare ("insert_state_current", + "INSERT INTO state\n" + " (channel_id, name, value_current, value_signed)\n" + "SELECT new.channel_id, new.name,\n" + " new.value_current, old.value_signed\n" + "FROM (SELECT get_chan_id($1) AS channel_id,\n" + " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n" + "LEFT JOIN (SELECT channel_id, name, value_signed\n" + " FROM state) AS old\n" + "ON new.channel_id = old.channel_id AND new.name = old.name\n" + "ON CONFLICT (channel_id, name)\n" + " DO UPDATE SET value_current = EXCLUDED.value_current,\n" + " value_signed = EXCLUDED.value_signed", 3), + GNUNET_PQ_make_prepare ("delete_state_empty", + "DELETE FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n" + " AND (value_current IS NULL OR length(value_current) = 0)\n" + " AND (value_signed IS NULL OR length(value_signed) = 0)", 1), + GNUNET_PQ_make_prepare ("update_state_signed", + "UPDATE state\n" + "SET value_signed = value_current\n" + "WHERE channel_id = get_chan_id($1) ", 1), + GNUNET_PQ_make_prepare ("delete_state", + "DELETE FROM state\n" + "WHERE channel_id = get_chan_id($1) ", 1), + GNUNET_PQ_make_prepare ("insert_state_sync", + "INSERT INTO state_sync (channel_id, name, value)\n" + "VALUES (get_chan_id($1), $2, $3)", 3), + GNUNET_PQ_make_prepare ("insert_state_from_sync", + "INSERT INTO state\n" + " (channel_id, name, value_current, value_signed)\n" + "SELECT channel_id, name, value, value\n" + "FROM state_sync\n" + "WHERE channel_id = get_chan_id($1)", 1), + GNUNET_PQ_make_prepare ("delete_state_sync", + "DELETE FROM state_sync\n" + "WHERE channel_id = get_chan_id($1)", 1), + GNUNET_PQ_make_prepare ("select_state_one", + "SELECT value_current\n" + "FROM state\n" + "WHERE channel_id = get_chan_id($1)\n" + " AND name = $2", 2), + GNUNET_PQ_make_prepare ("select_state_prefix", + "SELECT name, value_current\n" + "FROM state\n" + "WHERE channel_id = get_chan_id($1)\n" + " AND (name = $2 OR substr(name, 1, $3) = $4)", 4), + GNUNET_PQ_make_prepare ("select_state_signed", + "SELECT name, value_signed\n" + "FROM state\n" + "WHERE channel_id = get_chan_id($1)\n" + " AND value_signed IS NOT NULL", 1), + GNUNET_PQ_PREPARED_STATEMENT_END + }; + /* Open database and precompile statements */ plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->cfg, - "psycstore-postgres"); + "psycstore-postgres", + es, + ps); + if (NULL == plugin->dbh) return GNUNET_SYSERR; - if (GNUNET_OK != - GNUNET_PQ_exec_statements (plugin->dbh, - es)) - { - PQfinish (plugin->dbh); - plugin->dbh = NULL; - return GNUNET_SYSERR; - } - - /* Prepare statements */ - { - struct GNUNET_PQ_PreparedStatement ps[] = { - GNUNET_PQ_make_prepare ("transaction_begin", - "BEGIN", 0), - GNUNET_PQ_make_prepare ("transaction_commit", - "COMMIT", 0), - GNUNET_PQ_make_prepare ("transaction_rollback", - "ROLLBACK", 0), - GNUNET_PQ_make_prepare ("insert_channel_key", - "INSERT INTO channels (pub_key) VALUES ($1)" - " ON CONFLICT DO NOTHING", 1), - GNUNET_PQ_make_prepare ("insert_slave_key", - "INSERT INTO slaves (pub_key) VALUES ($1)" - " ON CONFLICT DO NOTHING", 1), - GNUNET_PQ_make_prepare ("insert_membership", - "INSERT INTO membership\n" - " (channel_id, slave_id, did_join, announced_at,\n" - " effective_since, group_generation)\n" - "VALUES (get_chan_id($1),\n" - " get_slave_id($2),\n" - " $3, $4, $5, $6)", 6), - GNUNET_PQ_make_prepare ("select_membership", - "SELECT did_join FROM membership\n" - "WHERE channel_id = get_chan_id($1)\n" - " AND slave_id = get_slave_id($2)\n" - " AND effective_since <= $3 AND did_join = 1\n" - "ORDER BY announced_at DESC LIMIT 1", 3), - GNUNET_PQ_make_prepare ("insert_fragment", - "INSERT INTO messages\n" - " (channel_id, hop_counter, signature, purpose,\n" - " fragment_id, fragment_offset, message_id,\n" - " group_generation, multicast_flags, psycstore_flags, data)\n" - "VALUES (get_chan_id($1),\n" - " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)" - "ON CONFLICT DO NOTHING", 11), - GNUNET_PQ_make_prepare ("update_message_flags", - "UPDATE messages\n" - "SET psycstore_flags = psycstore_flags | $1\n" - "WHERE channel_id = get_chan_id($2) \n" - " AND message_id = $3 AND fragment_offset = 0", 3), - GNUNET_PQ_make_prepare ("select_fragments", - "SELECT hop_counter, signature, purpose, fragment_id,\n" - " fragment_offset, message_id, group_generation,\n" - " multicast_flags, psycstore_flags, data\n" - "FROM messages\n" - "WHERE channel_id = get_chan_id($1) \n" - " AND $2 <= fragment_id AND fragment_id <= $3", 3), - /** @todo select_messages: add method_prefix filter */ - GNUNET_PQ_make_prepare ("select_messages", - "SELECT hop_counter, signature, purpose, fragment_id,\n" - " fragment_offset, message_id, group_generation,\n" - " multicast_flags, psycstore_flags, data\n" - "FROM messages\n" - "WHERE channel_id = get_chan_id($1) \n" - " AND $2 <= message_id AND message_id <= $3\n" - "LIMIT $4;", 4), - /** @todo select_latest_messages: add method_prefix filter */ - GNUNET_PQ_make_prepare ("select_latest_fragments", - "SELECT rev.hop_counter AS hop_counter,\n" - " rev.signature AS signature,\n" - " rev.purpose AS purpose,\n" - " rev.fragment_id AS fragment_id,\n" - " rev.fragment_offset AS fragment_offset,\n" - " rev.message_id AS message_id,\n" - " rev.group_generation AS group_generation,\n" - " rev.multicast_flags AS multicast_flags,\n" - " rev.psycstore_flags AS psycstore_flags,\n" - " rev.data AS data\n" - " FROM\n" - " (SELECT hop_counter, signature, purpose, fragment_id,\n" - " fragment_offset, message_id, group_generation,\n" - " multicast_flags, psycstore_flags, data \n" - " FROM messages\n" - " WHERE channel_id = get_chan_id($1) \n" - " ORDER BY fragment_id DESC\n" - " LIMIT $2) AS rev\n" - " ORDER BY rev.fragment_id;", 2), - GNUNET_PQ_make_prepare ("select_latest_messages", - "SELECT hop_counter, signature, purpose, fragment_id,\n" - " fragment_offset, message_id, group_generation,\n" - " multicast_flags, psycstore_flags, data\n" - "FROM messages\n" - "WHERE channel_id = get_chan_id($1)\n" - " AND message_id IN\n" - " (SELECT message_id\n" - " FROM messages\n" - " WHERE channel_id = get_chan_id($2) \n" - " GROUP BY message_id\n" - " ORDER BY message_id\n" - " DESC LIMIT $3)\n" - "ORDER BY fragment_id", 3), - GNUNET_PQ_make_prepare ("select_message_fragment", - "SELECT hop_counter, signature, purpose, fragment_id,\n" - " fragment_offset, message_id, group_generation,\n" - " multicast_flags, psycstore_flags, data\n" - "FROM messages\n" - "WHERE channel_id = get_chan_id($1) \n" - " AND message_id = $2 AND fragment_offset = $3", 3), - GNUNET_PQ_make_prepare ("select_counters_message", - "SELECT fragment_id, message_id, group_generation\n" - "FROM messages\n" - "WHERE channel_id = get_chan_id($1)\n" - "ORDER BY fragment_id DESC LIMIT 1", 1), - GNUNET_PQ_make_prepare ("select_counters_state", - "SELECT max_state_message_id\n" - "FROM channels\n" - "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1), - GNUNET_PQ_make_prepare ("update_max_state_message_id", - "UPDATE channels\n" - "SET max_state_message_id = $1\n" - "WHERE pub_key = $2", 2), - - GNUNET_PQ_make_prepare ("update_state_hash_message_id", - "UPDATE channels\n" - "SET state_hash_message_id = $1\n" - "WHERE pub_key = $2", 2), - GNUNET_PQ_make_prepare ("insert_state_current", - "INSERT INTO state\n" - " (channel_id, name, value_current, value_signed)\n" - "SELECT new.channel_id, new.name,\n" - " new.value_current, old.value_signed\n" - "FROM (SELECT get_chan_id($1) AS channel_id,\n" - " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n" - "LEFT JOIN (SELECT channel_id, name, value_signed\n" - " FROM state) AS old\n" - "ON new.channel_id = old.channel_id AND new.name = old.name\n" - "ON CONFLICT (channel_id, name)\n" - " DO UPDATE SET value_current = EXCLUDED.value_current,\n" - " value_signed = EXCLUDED.value_signed", 3), - GNUNET_PQ_make_prepare ("delete_state_empty", - "DELETE FROM state\n" - "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n" - " AND (value_current IS NULL OR length(value_current) = 0)\n" - " AND (value_signed IS NULL OR length(value_signed) = 0)", 1), - GNUNET_PQ_make_prepare ("update_state_signed", - "UPDATE state\n" - "SET value_signed = value_current\n" - "WHERE channel_id = get_chan_id($1) ", 1), - GNUNET_PQ_make_prepare ("delete_state", - "DELETE FROM state\n" - "WHERE channel_id = get_chan_id($1) ", 1), - GNUNET_PQ_make_prepare ("insert_state_sync", - "INSERT INTO state_sync (channel_id, name, value)\n" - "VALUES (get_chan_id($1), $2, $3)", 3), - GNUNET_PQ_make_prepare ("insert_state_from_sync", - "INSERT INTO state\n" - " (channel_id, name, value_current, value_signed)\n" - "SELECT channel_id, name, value, value\n" - "FROM state_sync\n" - "WHERE channel_id = get_chan_id($1)", 1), - GNUNET_PQ_make_prepare ("delete_state_sync", - "DELETE FROM state_sync\n" - "WHERE channel_id = get_chan_id($1)", 1), - GNUNET_PQ_make_prepare ("select_state_one", - "SELECT value_current\n" - "FROM state\n" - "WHERE channel_id = get_chan_id($1)\n" - " AND name = $2", 2), - GNUNET_PQ_make_prepare ("select_state_prefix", - "SELECT name, value_current\n" - "FROM state\n" - "WHERE channel_id = get_chan_id($1)\n" - " AND (name = $2 OR substr(name, 1, $3) = $4)", 4), - GNUNET_PQ_make_prepare ("select_state_signed", - "SELECT name, value_signed\n" - "FROM state\n" - "WHERE channel_id = get_chan_id($1)\n" - " AND value_signed IS NOT NULL", 1), - GNUNET_PQ_PREPARED_STATEMENT_END - }; - - if (GNUNET_OK != - GNUNET_PQ_prepare_statements (plugin->dbh, - ps)) - { - PQfinish (plugin->dbh); - plugin->dbh = NULL; - return GNUNET_SYSERR; - } - } return GNUNET_OK; } @@ -368,7 +352,7 @@ database_setup (struct Plugin *plugin) static void database_shutdown (struct Plugin *plugin) { - PQfinish (plugin->dbh); + GNUNET_PQ_disconnect (plugin->dbh); plugin->dbh = NULL; }