import
warnings
import
openpyxl
import
datetime
HEADER_ROW
=
True
START_COL
=
1
END_COL
=
-
1
SHEET_NAME
=
""
DATE_FORMAT
=
"%Y-%m-%d %H-%M-%S"
TIME_FORMAT
=
"%H-%M-%S"
DEFAULT_DATE
=
datetime.datetime(
1970
,
1
,
1
)
STRIP_STRINGS
=
True
def
str_to_int(string:
str
)
-
>
int
:
value
=
None
change_code
=
0
try
: value
=
int
(string)
except
ValueError: change_code
=
1
return
value, change_code
def
str_to_float(string:
str
)
-
>
float
:
value
=
None
change_code
=
0
try
: value
=
float
(string)
except
ValueError: change_code
=
1
return
value, change_code
def
str_to_datetime(string:
str
)
-
> datetime.datetime:
value
=
None
change_code
=
0
try
: value
=
datetime.datetime.strptime(value, DATE_FORMAT)
except
ValueError: change_code
=
1
return
value, change_code
def
str_to_time(string:
str
)
-
> datetime.time:
value
=
None
change_code
=
0
try
: value
=
datetime.datetime.strptime(value, TIME_FORMAT).time()
except
ValueError: change_code
=
1
return
value, change_code
TYPE_CONVERSIONS
=
{(
str
,
int
):
lambda
x: str_to_int(x),
(
str
,
float
):
lambda
x: str_to_float(x),
(
str
, datetime.time):
lambda
x: str_to_time(x),
(
str
, datetime.datetime):
lambda
x: str_to_datetime(x),
(
str
,
bool
):
lambda
x: (
bool
(x),
0
),
(
int
,
str
):
lambda
x: (
str
(x),
0
),
(
int
,
float
):
lambda
x: (
float
(x),
0
),
(
int
, datetime.time):
lambda
_: (
None
,
2
),
(
int
, datetime.datetime):
lambda
_: (
None
,
2
),
(
int
,
bool
):
lambda
x: (
bool
(x),
0
),
(
float
,
int
):
lambda
x: (
None
,
1
),
(
float
, datetime.time):
lambda
_: (
None
,
1
),
(
float
, datetime.datetime):
lambda
_: (
None
,
1
),
(
float
,
bool
):
lambda
x: (
bool
(x),
0
),
(
float
,
str
):
lambda
x: (
str
(x),
0
),
(
bool
,
str
):
lambda
x: (
str
(x),
0
),
(
bool
,
int
):
lambda
x: (
int
(x),
0
),
(
bool
,
float
):
lambda
x: (
float
(x),
0
),
(
bool
, datetime.time):
lambda
_: (
None
,
2
),
(
bool
, datetime.datetime):
lambda
_: (
None
,
2
),
(datetime.time,
str
):
lambda
x: (x.strftime(TIME_FORMAT),
0
),
(datetime.time,
int
):
lambda
_: (
None
,
2
),
(datetime.time,
float
):
lambda
_: (
None
,
2
),
(datetime.time,
bool
):
lambda
x: (
None
,
2
),
(datetime.time, datetime.datetime):
lambda
x: ((DEFAULT_DATE
+
datetime.timedelta(hours
=
x.hour,
minutes
=
x.minute,
seconds
=
x.seconds)),
0
),
(datetime.datetime,
str
):
lambda
x: (x.strftime(DATE_FORMAT),
0
),
(datetime.datetime,
int
):
lambda
_: (
None
,
2
),
(datetime.datetime,
float
):
lambda
_: (
None
,
2
),
(datetime.datetime,
bool
):
lambda
x: (
None
,
2
),
(datetime.datetime, datetime.time):
lambda
x: (x.time(),
0
),
}
async
def
run(params, in_tables, out_tables, in_files, out_files, misc):
conf_colnames
=
in_tables[
0
].columns
conf_table
=
await in_tables[
0
].rows()
conf_row
=
[]
if
conf_table: conf_row
=
conf_table[
0
]
conf_header_row
=
HEADER_ROW
conf_sheetname
=
SHEET_NAME
conf_strip
=
STRIP_STRINGS
if
"First row is header"
in
conf_colnames :
conf_header_row
=
conf_row[conf_colnames[
"First row is header"
]]
if
"Sheetname"
in
conf_colnames :
conf_sheetname
=
conf_row[conf_colnames[
"Sheetname"
]]
if
"Strip strings"
in
conf_colnames :
conf_strip
=
conf_row[conf_colnames[
"Strip strings"
]]
out_colnames
=
{}
out_types
=
{}
out_rows
=
[]
err_rows
=
[]
err_schema
=
((
"Row nr"
,
int
),
(
"Column nr"
,
int
),
(
"Change code"
,
int
),
(
"Original value"
,
str
),
(
"Converted value"
,
str
),
(
"Value type"
,
str
),
(
"Column type"
,
str
),
(
"Info"
,
str
),)
err_colnames
=
{name: index
for
index, (name,_)
in
enumerate
(err_schema)}
err_types
=
{index: datatype
for
index, (_,datatype)
in
enumerate
(err_schema)}
excel_file
=
in_files[
0
]
excel_bytereader
=
await excel_file.bytereader()
start_row
=
1
with warnings.catch_warnings(record
=
True
):
workbook
=
openpyxl.load_workbook(filename
=
excel_bytereader, data_only
=
True
)
sheetname
=
workbook.sheetnames[
0
]
if
conf_sheetname: sheetname
=
conf_sheetname
sheet
=
workbook[sheetname]
if
conf_header_row:
header_name_duplicates
=
{}
for
col_nr, header_cell
in
enumerate
(sheet[
1
]):
if
header_cell.value
is
None
:
break
header_name
=
str
(header_cell.value)
if
header_name
in
out_colnames:
count
=
header_name_duplicates.get(header_name,
0
)
count
+
=
1
header_name_duplicates[header_name]
=
count
header_name
=
f
"{header_name}_{count}"
out_colnames[header_name]
=
col_nr
start_row
+
=
1
for
col_nr, cell
in
enumerate
(sheet[start_row]):
if
cell.value !
=
None
:
out_types[col_nr]
=
type
(cell.value)
else
: out_types[col_nr]
=
str
for
row_nr, row
in
enumerate
(sheet.iter_rows(min_row
=
start_row,
max_col
=
len
(out_colnames),
values_only
=
True
)):
out_row
=
[]
for
col_nr, value
in
enumerate
(row):
converted_value
=
value
column_type
=
out_types[col_nr]
value_type
=
type
(converted_value)
if
type
(converted_value)
=
=
str
:
if
conf_strip
and
converted_value !
=
converted_value.strip:
err_rows.append([row_nr, col_nr,
-
1
,
str
(value),
str
(converted_value),
str
(value_type),
str
(column_type),
"String value stripped"
])
converted_value
=
converted_value.strip()
if
not
converted_value: converted_value
=
None
if
converted_value
is
not
None
and
value_type !
=
column_type:
if
(value_type, column_type)
in
TYPE_CONVERSIONS:
converted_value, change_code
=
TYPE_CONVERSIONS[value_type, column_type](converted_value)
else
: converted_value, change_code
=
None
,
2
if
change_code
=
=
0
: err_rows.append([row_nr, col_nr, change_code,
str
(value),
str
(converted_value),
str
(value_type),
str
(column_type),
"Value converted"
])
if
change_code
=
=
1
:
out_types[col_nr]
=
value_type
err_rows.append([row_nr, col_nr, change_code,
str
(value),
str
(converted_value),
str
(value_type),
str
(column_type),
"Column type changed"
])
if
change_code
=
=
2
: err_rows.append([row_nr, col_nr, change_code,
str
(value),
str
(converted_value),
str
(value_type),
str
(column_type),
"Value set to None, column type remains"
])
out_row.append(converted_value)
out_rows.append(out_row)
workbook.close()
print
(
len
(err_rows),
"conversions (strip, value or column type)"
)
out_tables[
0
].columns
=
out_colnames
out_tables[
0
].types
=
out_types
out_tables[
0
].rows
=
out_rows
out_tables[
1
].columns
=
err_colnames
out_tables[
1
].types
=
err_types
for
row
in
err_rows:
await out_tables[
1
].send_row(row)