import warnings
import openpyxl
import datetime
 
# Default for the "First row is header" configuration column read in run().
HEADER_ROW = True
# NOTE(review): START_COL / END_COL are not referenced by the code visible in
# this file; presumably reserved for limiting the imported column range.
START_COL = 1
END_COL = -1

# Default for the "Sheetname" configuration column; an empty name makes run()
# fall back to the first sheet of the workbook.
SHEET_NAME = ""

# strftime/strptime patterns used when converting between str and
# datetime.datetime / datetime.time values.
DATE_FORMAT = "%Y-%m-%d %H-%M-%S"
TIME_FORMAT = "%H-%M-%S"
# Date part used when a datetime.time is promoted to a datetime.datetime.
DEFAULT_DATE = datetime.datetime(year=1970, month=1, day=1)
# Default for the "Strip strings" configuration column read in run().
STRIP_STRINGS = True
 
 
def str_to_int(string: str) -> "tuple[int | None, int]":
    """Try to parse *string* as an integer.

    Returns:
        A ``(value, change_code)`` pair: ``(int(string), 0)`` when parsing
        succeeds, ``(None, 1)`` when ``int()`` raises ``ValueError``.
    """
    try:
        return int(string), 0
    except ValueError:
        return None, 1
 
 
def str_to_float(string: str) -> "tuple[float | None, int]":
    """Try to parse *string* as a float.

    Returns:
        A ``(value, change_code)`` pair: ``(float(string), 0)`` when parsing
        succeeds, ``(None, 1)`` when ``float()`` raises ``ValueError``.
    """
    try:
        return float(string), 0
    except ValueError:
        return None, 1
 
 
def str_to_datetime(string: str) -> "tuple[datetime.datetime | None, int]":
    """Try to parse *string* as a datetime using ``DATE_FORMAT``.

    Returns:
        A ``(value, change_code)`` pair: ``(parsed datetime, 0)`` when
        *string* matches ``DATE_FORMAT``, ``(None, 1)`` when ``strptime``
        raises ``ValueError``.
    """
    try:
        # Parse the argument itself; the previous code passed the local
        # placeholder (always None), which made strptime raise TypeError.
        return datetime.datetime.strptime(string, DATE_FORMAT), 0
    except ValueError:
        return None, 1
 
 
def str_to_time(string: str) -> "tuple[datetime.time | None, int]":
    """Try to parse *string* as a time of day using ``TIME_FORMAT``.

    Returns:
        A ``(value, change_code)`` pair: ``(parsed time, 0)`` when *string*
        matches ``TIME_FORMAT``, ``(None, 1)`` when ``strptime`` raises
        ``ValueError``.
    """
    try:
        # Parse the argument itself; the previous code passed the local
        # placeholder (always None), which made strptime raise TypeError.
        return datetime.datetime.strptime(string, TIME_FORMAT).time(), 0
    except ValueError:
        return None, 1
 
 
# Conversion table keyed by (type of the cell value, current column type).
# Each converter takes the cell value and returns (converted value, action),
# with action in (0, 1, 2):
# 0: value converted, column type change not necessary
#    (conversion possible in both directions)
# 1: value not converted, column type can be changed
# 2: value set to None, column type cannot be changed
# TODO: implement conversion of string constants "true", "True", etc.
#
# The str_to_* helpers stay wrapped in lambdas so that they (like
# TIME_FORMAT, DATE_FORMAT and DEFAULT_DATE) are looked up at call time.
TYPE_CONVERSIONS = {
    (str, int): lambda x: str_to_int(x),
    (str, float): lambda x: str_to_float(x),
    (str, datetime.time): lambda x: str_to_time(x),
    (str, datetime.datetime): lambda x: str_to_datetime(x),
    (str, bool): lambda x: (bool(x), 0),
    (int, str): lambda x: (str(x), 0),
    (int, float): lambda x: (float(x), 0),
    (int, datetime.time): lambda _: (None, 2),
    (int, datetime.datetime): lambda _: (None, 2),
    (int, bool): lambda x: (bool(x), 0),
    (float, int): lambda _: (None, 1),
    (float, datetime.time): lambda _: (None, 1),
    (float, datetime.datetime): lambda _: (None, 1),
    (float, bool): lambda x: (bool(x), 0),
    (float, str): lambda x: (str(x), 0),
    (bool, str): lambda x: (str(x), 0),
    (bool, int): lambda x: (int(x), 0),
    (bool, float): lambda x: (float(x), 0),
    (bool, datetime.time): lambda _: (None, 2),
    (bool, datetime.datetime): lambda _: (None, 2),
    (datetime.time, str): lambda x: (x.strftime(TIME_FORMAT), 0),
    (datetime.time, int): lambda _: (None, 2),
    (datetime.time, float): lambda _: (None, 2),
    (datetime.time, bool): lambda _: (None, 2),
    # datetime.time exposes `second` (singular); `seconds` only exists on
    # timedelta and raised AttributeError here.
    (datetime.time, datetime.datetime): lambda x: (
        DEFAULT_DATE + datetime.timedelta(hours=x.hour,
                                          minutes=x.minute,
                                          seconds=x.second),
        0,
    ),
    (datetime.datetime, str): lambda x: (x.strftime(DATE_FORMAT), 0),
    (datetime.datetime, int): lambda _: (None, 2),
    (datetime.datetime, float): lambda _: (None, 2),
    (datetime.datetime, bool): lambda _: (None, 2),
    (datetime.datetime, datetime.time): lambda x: (x.time(), 0),
}
 
 
# Log text for each change code produced by the TYPE_CONVERSIONS converters.
_CHANGE_INFO = {
    0: "Value converted",
    1: "Column type changed",
    2: "Value set to None, column type remains",
}


def _unique_header_name(name, taken, duplicate_counts):
    """Return *name*, suffixed with ``_<n>`` until it is not in *taken*."""
    if name not in taken:
        return name
    count = duplicate_counts.get(name, 0)
    candidate = name
    # Keep counting so a generated name never overwrites an existing column
    # (e.g. headers "A", "A_1", "A").
    while candidate in taken:
        count += 1
        candidate = f"{name}_{count}"
    duplicate_counts[name] = count
    return candidate


async def run(params, in_tables, out_tables, in_files, out_files, misc):
    """Read one Excel sheet into ``out_tables[0]`` and log changes.

    Configuration is taken from the first row of ``in_tables[0]`` (optional
    columns "First row is header", "Sheetname", "Strip strings"); missing
    columns, or an empty configuration table, fall back to the module
    defaults. The workbook is read from ``in_files[0]``.

    Column types are initialised from the first data row (``str`` for empty
    cells). Every later value of a different type is passed through
    ``TYPE_CONVERSIONS``; depending on the returned change code the value is
    converted (0), kept while the column type switches to the value's type
    (1), or replaced by ``None`` (2). Each strip/conversion is written as one
    row to ``out_tables[1]``.

    ``params``, ``out_files`` and ``misc`` are not used by this function.
    """
    conf_colnames = in_tables[0].columns
    conf_table = await in_tables[0].rows()
    conf_row = conf_table[0] if conf_table else None

    def conf_value(name, default):
        # Fall back to the default when there is no configuration row at all,
        # instead of indexing into an empty row.
        if conf_row is not None and name in conf_colnames:
            return conf_row[conf_colnames[name]]
        return default

    conf_header_row = conf_value("First row is header", HEADER_ROW)
    conf_sheetname = conf_value("Sheetname", SHEET_NAME)
    conf_strip = conf_value("Strip strings", STRIP_STRINGS)

    out_colnames = {}
    out_types = {}
    out_rows = []

    err_rows = []
    err_schema = (
        ("Row nr", int),
        ("Column nr", int),
        ("Change code", int),
        ("Original value", str),
        ("Converted value", str),
        ("Value type", str),
        ("Column type", str),
        ("Info", str),
    )
    err_colnames = {name: index for index, (name, _) in enumerate(err_schema)}
    err_types = {index: datatype for index, (_, datatype) in enumerate(err_schema)}

    excel_file = in_files[0]
    excel_bytereader = await excel_file.bytereader()
    start_row = 1

    # Record (i.e. swallow) warnings raised while loading the workbook.
    with warnings.catch_warnings(record=True):
        workbook = openpyxl.load_workbook(filename=excel_bytereader, data_only=True)

    try:
        sheet = workbook[conf_sheetname or workbook.sheetnames[0]]

        # Set column names.
        if conf_header_row:
            duplicate_counts = {}
            for col_nr, header_cell in enumerate(sheet[1]):
                # Assume that the first None header indicates the end of data.
                if header_cell.value is None:
                    break
                header_name = _unique_header_name(
                    str(header_cell.value), out_colnames, duplicate_counts
                )
                out_colnames[header_name] = col_nr
            start_row += 1
        # NOTE(review): without a header row no column names are produced and
        # max_col below is 0 - confirm how openpyxl and the output table
        # handle that case.

        # Initialize types from the first data row.
        for col_nr, cell in enumerate(sheet[start_row]):
            out_types[col_nr] = type(cell.value) if cell.value is not None else str

        # max_col already limits the columns to the header columns here.
        data_rows = sheet.iter_rows(
            min_row=start_row, max_col=len(out_colnames), values_only=True
        )
        for row_nr, row in enumerate(data_rows):
            out_row = []
            for col_nr, value in enumerate(row):
                converted_value = value
                column_type = out_types[col_nr]
                value_type = type(value)

                if value_type is str:
                    if conf_strip:
                        stripped = value.strip()
                        # Log only when stripping actually changed the value.
                        if stripped != value:
                            err_rows.append([row_nr, col_nr, -1,
                                             str(value), str(stripped),
                                             str(value_type), str(column_type),
                                             "String value stripped"])
                        converted_value = stripped
                    # Empty strings are treated as missing values.
                    if not converted_value:
                        converted_value = None

                if converted_value is not None and value_type != column_type:
                    source_value = converted_value
                    convert = TYPE_CONVERSIONS.get((value_type, column_type))
                    if convert is None:
                        converted_value, change_code = None, 2
                    else:
                        converted_value, change_code = convert(source_value)

                    if change_code == 1:
                        # "Value not converted": keep the value as it is and
                        # switch the column to the value's type from here on.
                        converted_value = source_value
                        out_types[col_nr] = value_type

                    info = _CHANGE_INFO.get(change_code)
                    if info is not None:
                        err_rows.append([row_nr, col_nr, change_code,
                                         str(value), str(converted_value),
                                         str(value_type), str(column_type),
                                         info])

                out_row.append(converted_value)

            out_rows.append(out_row)
    finally:
        # Release the workbook even if reading or converting fails.
        workbook.close()

    print(len(err_rows), "conversions (strip, value or column type)")
    out_tables[0].columns = out_colnames
    out_tables[0].types = out_types
    out_tables[0].rows = out_rows
    out_tables[1].columns = err_colnames
    out_tables[1].types = err_types
    for row in err_rows:
        await out_tables[1].send_row(row)