Skip to content

Commit

Permalink
don't create columns until we know the type
Browse files Browse the repository at this point in the history
  • Loading branch information
snopoke committed Jun 30, 2015
1 parent e4052c0 commit 72760ef
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 21 deletions.
58 changes: 40 additions & 18 deletions commcare_export/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def best_type_for(self, val):
else:
# We do not have a name for "bottom" in SQL aka the type whose least upper bound
# with any other type is the other type.
return None
return self.sqlalchemy.UnicodeText()

def compatible(self, source_type, dest_type):
"""
Expand Down Expand Up @@ -287,48 +287,68 @@ def least_upper_bound(self, source_type, dest_type):
# FIXME: Don't be so silly
return self.sqlalchemy.UnicodeText()

def get_id_column(self):
return self.sqlalchemy.Column(
'id',
self.sqlalchemy.Unicode(self.MAX_VARCHAR_LEN),
primary_key=True
)

def make_table_compatible(self, table_name, row_dict):
# FIXME: This does lots of redundant checks in a tight loop. Stop doing that.

ctx = self.alembic.migration.MigrationContext.configure(self.connection)
op = self.alembic.operations.Operations(ctx)

if not table_name in self.metadata.tables:
id_column = self.sqlalchemy.Column(
'id',
self.sqlalchemy.Unicode(self.MAX_VARCHAR_LEN),
primary_key=True
)
op.create_table(table_name, id_column)
def get_columns():
return [self.get_id_column()] + [
self.sqlalchemy.Column(name, self.best_type_for(val), nullable=True)
for name, val in row_dict.items() if val is not None and name != 'id'
]

if self.strict_types:
create_sql = self.sqlalchemy.schema.CreateTable(self.sqlalchemy.Table(
table_name,
self.sqlalchemy.MetaData(),
*get_columns()
)).compile(self.connection.engine)
logger.warn("Table '{table_name}' does not exist. Creating table with:\n{schema}".format(
table_name=table_name,
schema=create_sql
))
empty_cols = [name for name, val in row_dict.items() if val is None]
if empty_cols:
logger.warn("This schema does not include the following columns since we are unable "
"to determine the column type at this stage: {}".format(empty_cols))
op.create_table(table_name, *get_columns())
self.metadata.clear()
self.metadata.reflect()
return

def get_cols():
return {c.name: c for c in self.table(table_name).columns}

columns = get_cols()

for column, val in row_dict.items():
if val is None:
continue

ty = self.best_type_for(val)
if not column in columns:
# If we are creating the column, a None crashes things even though it is the "empty" type
# but SQL does not have such a type. So we have to guess a liberal type for future use.
ty = ty or self.sqlalchemy.UnicodeText()
logger.warn("Adding column '{}.{} {}'".format(table_name, column, ty))
op.add_column(table_name, self.sqlalchemy.Column(column, ty, nullable=True))
self.metadata.clear()
self.metadata.reflect()
columns = get_cols()
else:
if val is None:
if self.strict_types:
# don't bother checking compatibility since we're not going to change anything
continue

current_ty = columns[column].type

if not self.compatible(ty, current_ty):
new_type = self.least_upper_bound(ty, current_ty)
if self.strict_types:
logger.warn('Type mismatch detected for column %s (%s != %s) '
'but strict types in use.', columns[column], current_ty, new_type)
continue
if self.is_sqllite:
logger.warn('Type mismatch detected for column %s (%s != %s) '
'but sqlite does not support changing column types', columns[column], current_ty, new_type)
Expand All @@ -340,10 +360,12 @@ def get_cols():
columns = get_cols()

def upsert(self, table, row_dict):

# For atomicity "insert, catch, update" is slightly better than "select, insert or update".
# The latter may crash, while the former may overwrite data (which should be fine if whatever is
# racing against this is importing from the same source... if not you are busted anyhow

# strip out values that are None since the column may not exist yet
row_dict = {col: val for col, val in row_dict.items() if val is not None}
try:
insert = table.insert().values(**row_dict)
self.connection.execute(insert)
Expand Down
7 changes: 4 additions & 3 deletions tests/test_writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,14 @@ def SqlWriter_upsert_tests(self, connection):
'name': 'foo_upsert',
'headings': ['id', 'a', 'b', 'c'],
'rows': [
['zing', 3, None, 5] # The None is allowed only in string fields as it defaults the col to text
['zing', 3, None, 5]
]
})

result = dict([(row['id'], row) for row in connection.execute('SELECT id, a, b, c FROM foo_upsert')])
# don't select column 'b' since it hasn't been created yet
result = dict([(row['id'], row) for row in connection.execute('SELECT id, a, c FROM foo_upsert')])
assert len(result) == 1
assert dict(result['zing']) == {'id': 'zing', 'a': 3, 'b': None, 'c': 5}
assert dict(result['zing']) == {'id': 'zing', 'a': 3, 'c': 5}

with writer:
writer.write_table({
Expand Down

0 comments on commit 72760ef

Please sign in to comment.