12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
- from typing import Dict, Any
- from pandasql import PandaSQL
class TunedPandaSQL(PandaSQL):
    """PandaSQL variant that can run several keyed queries over one connection."""

    def multiquery(self, queries: Dict[Any, str], env=None) -> Dict[Any, Any]:
        """
        Execute several SQL queries over a single database connection.

        Every table mentioned in any of the queries and present in ``env`` is
        written to the database from its dataframe once, before the first
        query is executed.

        :param queries: Mapping of an arbitrary key to an SQL query string; the queries
                        can reference pandas dataframes as SQL tables.
        :param env: Variables environment - a dict mapping table names to pandas dataframes.
                    If not specified use local and global variables of the caller.
        :return: Dict with the same keys (and order) as ``queries``; each value is what
                 ``read_sql`` returned for that query, or None if the query returns nothing.
        :raises PandaSQLException: If the database reports an error for one of the queries.
        """
        from pandasql.sqldf import (
            DatabaseError,
            PandaSQLException,
            ResourceClosedError,
            extract_table_names,
            get_outer_frame_variables,
            read_sql,
            write_table,
        )

        if env is None:
            import inspect

            # pandasql's helper selects the nearest stack frame that is not in
            # its own source file; when called from this method that is this
            # method's frame rather than the caller's, so read the calling
            # frame directly instead.
            frame = inspect.currentframe()
            caller = frame.f_back if frame is not None else None
            if caller is None:
                # Frame introspection is unavailable on this interpreter.
                env = get_outer_frame_variables()
            else:
                env = dict(caller.f_globals)
                env.update(caller.f_locals)  # locals shadow globals
            # Drop frame references promptly to avoid reference cycles.
            del frame, caller

        with self.conn as conn:
            table_names = {
                name
                for query in queries.values()
                for name in extract_table_names(query)
            }
            for table_name in table_names:
                if table_name not in env:
                    # don't raise error because the table may be already in the database
                    continue
                if self.persist and table_name in self.loaded_tables:
                    # table was loaded before using the same instance, don't do it again
                    continue
                write_table(env[table_name], table_name, conn)
                # Record the table only after a successful write, so a failed
                # load is retried on the next call instead of being skipped.
                self.loaded_tables.add(table_name)

            results = {}
            for key, query in queries.items():
                try:
                    results[key] = read_sql(query, conn)
                except DatabaseError as ex:
                    raise PandaSQLException(ex) from ex
                except ResourceClosedError:
                    # query returns nothing
                    results[key] = None

        return results
|