Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support parameter passing to foreign server #293

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 19 additions & 21 deletions connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,6 @@
/* Length of host */
#define HOST_LEN 256

/*
* Connection cache hash table entry
*
* The lookup key in this hash table is the foreign server OID plus the user
* mapping OID. (We use just one connection per user per foreign server,
* so that we can ensure all scans use the same snapshot during a query.)
*/
typedef struct ConnCacheKey
{
Oid serverid; /* OID of foreign server */
Oid userid; /* OID of local user whose mapping we use */
} ConnCacheKey;

typedef struct ConnCacheEntry
{
ConnCacheKey key; /* hash key (must be first) */
MYSQL *conn; /* connection to foreign server, or NULL */
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
} ConnCacheEntry;

/*
* Connection cache (initialized on first use)
Expand All @@ -55,6 +34,25 @@ static HTAB *ConnectionHash = NULL;

static void mysql_inval_callback(Datum arg, int cacheid, uint32 hashvalue);

ConnCacheEntry *get_conn_cache_entry(ForeignServer *server, UserMapping *user, mysql_opt *opt) {
bool found;
ConnCacheEntry *entry;
ConnCacheKey key;

if (ConnectionHash == NULL) {
return NULL;
}

key.serverid = server->serverid;
key.userid = user->userid;

entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
if (!found)
return NULL;

return entry;
}

/*
* mysql_get_connection:
* Get a connection which can be used to execute queries on the remote
Expand Down
2 changes: 2 additions & 0 deletions deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,8 @@ foreign_expr_walker(Node *node, foreign_glob_cxt *glob_cxt,
else if (inner_cxt.state == FDW_COLLATE_SAFE &&
collation == inner_cxt.collation)
state = FDW_COLLATE_SAFE;
else if (collation == DEFAULT_COLLATION_OID)
state = FDW_COLLATE_NONE;
else
state = FDW_COLLATE_UNSAFE;
}
Expand Down
117 changes: 77 additions & 40 deletions mysql_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#endif
#include "commands/defrem.h"
#include "commands/explain.h"
#include "common/hashfn.h"
#include "catalog/heap.h"
#include "catalog/pg_type.h"
#include "foreign/fdwapi.h"
Expand Down Expand Up @@ -585,6 +586,8 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags)
TupleTableSlot *tupleSlot = node->ss.ss_ScanTupleSlot;
TupleDesc tupleDescriptor = tupleSlot->tts_tupleDescriptor;
MYSQL *conn;
ConnCacheEntry *cache_entry;
StmtCacheEntry *stmt_cache_entry;
RangeTblEntry *rte;
MySQLFdwExecState *festate;
EState *estate = node->ss.ps.state;
Expand All @@ -602,6 +605,8 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags)
int rtindex;
List *fdw_private = fsplan->fdw_private;
char sql_mode[255];
bool prep_stmt_found;
uint32 query_hash;

/*
* We'll save private state in node->fdw_state.
Expand Down Expand Up @@ -693,37 +698,71 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags)
"mysql_fdw temporary data",
ALLOCSET_DEFAULT_SIZES);

if (wait_timeout > 0)
{
/* Set the session timeout in seconds */
sprintf(timeout, "SET wait_timeout = %d", wait_timeout);
mysql_query(festate->conn, timeout);
cache_entry = get_conn_cache_entry(server, user, options);
if (cache_entry == NULL) {
elog(ERROR, "beginForeignScan, no valid cache entry from connection cache!");
}

if (interactive_timeout > 0)
{
/* Set the session timeout in seconds */
sprintf(timeout, "SET interactive_timeout = %d", interactive_timeout);
mysql_query(festate->conn, timeout);
if (cache_entry->PrepStmtCache == NULL) {


HASHCTL ctl;
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(uint32);
ctl.entrysize = sizeof(StmtCacheEntry);

ctl.hcxt = CacheMemoryContext;
cache_entry->PrepStmtCache = hash_create("mysql_fdw per-connection statement cache", 200,
&ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}

snprintf(sql_mode, sizeof(sql_mode), "SET sql_mode = '%s'",
options->sql_mode);
if (mysql_query(festate->conn, sql_mode) != 0)
mysql_error_print(festate->conn);
query_hash = string_hash(festate->query, strlen(festate->query));
stmt_cache_entry = hash_search(cache_entry->PrepStmtCache, &query_hash, HASH_ENTER, &prep_stmt_found);
elog(WARNING, "'%s'", festate->query);
if (!prep_stmt_found) {
elog(WARNING, "No cache entry found: BE SURE YOURE CACHE PLAN CONFIG IS GENERIC NOT AUTO");
stmt_cache_entry->key = query_hash;

/* Initialize the MySQL statement */
festate->stmt = mysql_stmt_init(festate->conn);
if (festate->stmt == NULL)
ereport(ERROR,
(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
errmsg("failed to initialize the mysql query: \n%s",
mysql_error(festate->conn))));
if (wait_timeout > 0)
{
/* Set the session timeout in seconds */
sprintf(timeout, "SET wait_timeout = %d", wait_timeout);
mysql_query(festate->conn, timeout);
}

/* Prepare MySQL statement */
if (mysql_stmt_prepare(festate->stmt, festate->query,
strlen(festate->query)) != 0)
mysql_stmt_error_print(festate, "failed to prepare the MySQL query");
if (interactive_timeout > 0)
{
/* Set the session timeout in seconds */
sprintf(timeout, "SET interactive_timeout = %d", interactive_timeout);
mysql_query(festate->conn, timeout);
}

snprintf(sql_mode, sizeof(sql_mode), "SET sql_mode = '%s'",
options->sql_mode);

if (mysql_query(festate->conn, sql_mode) != 0)
mysql_error_print(festate->conn);

/* Initialize the MySQL statement */
festate->stmt = mysql_stmt_init(festate->conn);
if (festate->stmt == NULL)
ereport(ERROR,
(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
errmsg("failed to initialize the mysql query: \n%s",
mysql_error(festate->conn))));



/* Prepare MySQL statement */
if (mysql_stmt_prepare(festate->stmt, festate->query,
strlen(festate->query)) != 0)
mysql_stmt_error_print(festate, "failed to prepare the MySQL query");

stmt_cache_entry->stmt = festate->stmt;
} else {
festate->stmt = stmt_cache_entry->stmt;
}

/* Prepare for output conversion of parameters used in remote query. */
numParams = list_length(fsplan->fdw_exprs);
Expand Down Expand Up @@ -789,7 +828,7 @@ mysqlBeginForeignScan(ForeignScanState *node, int eflags)

/* Bind the results pointers for the prepare statements */
if (mysql_stmt_bind_result(festate->stmt, festate->table->mysql_bind) != 0)
mysql_stmt_error_print(festate, "failed to bind the MySQL query");
mysql_stmt_error_print(festate, "failed to bind the MySQL result");
}

/*
Expand Down Expand Up @@ -867,9 +906,6 @@ mysqlIterateForeignScan(ForeignScanState *node)
#else
ExecStoreTuple(tup, tupleSlot, InvalidBuffer, false);
#endif
else
mysql_stmt_close(festate->stmt);

/*
* Release locally palloc'd space and values of pass-by-reference
* datums, as well.
Expand Down Expand Up @@ -976,12 +1012,6 @@ mysqlEndForeignScan(ForeignScanState *node)
mysql_free_result(festate->table->mysql_res);
festate->table->mysql_res = NULL;
}

if (festate->stmt)
{
mysql_stmt_close(festate->stmt);
festate->stmt = NULL;
}
}

/*
Expand Down Expand Up @@ -1051,11 +1081,15 @@ mysqlGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel,
pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
&attrs_used);

/*
* Identify which baserestrictinfo clauses can be sent to the remote
* server and which can't.
*/
foreach(lc, baserel->baserestrictinfo)
{
RestrictInfo *ri = (RestrictInfo *) lfirst(lc);

if (mysql_is_foreign_expr(root, baserel, ri->clause, false))
if (mysql_is_foreign_expr(root, baserel, ri->clause, true))
fpinfo->remote_conds = lappend(fpinfo->remote_conds, ri);
else
fpinfo->local_conds = lappend(fpinfo->local_conds, ri);
Expand Down Expand Up @@ -1233,26 +1267,29 @@ mysqlGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel,
{
Cost startup_cost;
Cost total_cost;
Path *path;

/* Estimate costs */
mysqlEstimateCosts(root, baserel, &startup_cost, &total_cost,
foreigntableid);

/* Create a ForeignPath node and add it as only possible path */
add_path(baserel, (Path *)
create_foreignscan_path(root, baserel,
path = (Path *)create_foreignscan_path(root, baserel,
NULL, /* default pathtarget */
baserel->rows,
startup_cost,
total_cost,
NIL, /* no pathkeys */
baserel->lateral_relids,
NULL, /* no extra plan */
NULL)); /* no fdw_private data */
NULL); /* no fdw_private data */
add_path(baserel, path);


/* Add paths with pathkeys */
mysql_add_paths_with_pathkeys(root, baserel, NULL, startup_cost,
total_cost);

}


Expand Down Expand Up @@ -2643,7 +2680,7 @@ bind_stmt_params_and_exec(ForeignScanState *node)

/* Bind the results pointers for the prepare statements */
if (mysql_stmt_bind_result(festate->stmt, festate->table->mysql_bind) != 0)
mysql_stmt_error_print(festate, "failed to bind the MySQL query");
mysql_stmt_error_print(festate, "failed to bind the MySQL results");

MemoryContextSwitchTo(oldcontext);
}
Expand Down
32 changes: 32 additions & 0 deletions mysql_fdw.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,36 @@ typedef struct MySQLColumn
} MySQLColumn;


/*
* Connection cache hash table entry
*
* The lookup key in this hash table is the foreign server OID plus the user
* mapping OID. (We use just one connection per user per foreign server,
* so that we can ensure all scans use the same snapshot during a query.)
*/
typedef struct ConnCacheKey
{
Oid serverid; /* OID of foreign server */
Oid userid; /* OID of local user whose mapping we use */
} ConnCacheKey;

typedef struct ConnCacheEntry
{
ConnCacheKey key; /* hash key (must be first) */
MYSQL *conn; /* connection to foreign server, or NULL */
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
HTAB *PrepStmtCache;

} ConnCacheEntry;

typedef struct StmtCacheEntry
{
uint32 key;
MYSQL_STMT *stmt;
} StmtCacheEntry;

extern int ((mysql_options) (MYSQL *mysql, enum mysql_option option,
const void *arg));
extern int ((mysql_stmt_prepare) (MYSQL_STMT *stmt, const char *query,
Expand Down Expand Up @@ -352,4 +382,6 @@ MYSQL *mysql_fdw_connect(mysql_opt *opt);
void mysql_cleanup_connection(void);
void mysql_release_connection(MYSQL *conn);

ConnCacheEntry *get_conn_cache_entry(ForeignServer *server, UserMapping *user, mysql_opt *opt);

#endif /* MYSQL_FDW_H */