from typing import Self
import mysql.connector
from mysql.connector import errorcode
import datetime
import csv
import pandas as pd
import os
from dotenv import load_dotenv
load_dotenv()
class DataBase:
def __init__(self):
self.username = os.getenv("")
self.password = os.getenv("")
self.endpoint_url = os.getenv("DB_ENDPOINT")
self.db = os.getenv("DB_NAME")
self.cnx = None
self.cursor = None
def connect(self):
try:
self.cnx = mysql.connector.connect(
user=self.username,
password=self.password,
host=self.endpoint_url,
database=self.db,
)
self.cursor = self.cnx.cursor()
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
raise ValueError("Invalid username or password") from err
elif err.errno == errorcode.ER_BAD_DB_ERROR:
raise ValueError("Database does not exist") from err
else:
raise err
def close(self):
if self.cursor:
self.cursor.close()
if self.cnx:
self.cnx.close()
def execute_query(self, query, params=None):
if not self.cursor:
raise ValueError("Database not connected")
try:
self.cursor.execute(query, params)
except mysql.connector.Error as err:
raise err
def get_total_record_count(self):
query = "SELECT COUNT(*) FROM sensordata"
self.execute_query(query)
return self.cursor.fetchone()[0]
def get_filtered_record_count(self, sensor, start_date_str, end_date_str):
query = "SELECT COUNT(*) FROM sensordata WHERE 1=1 "
params = []
if sensor:
query += "AND SensorID = %s "
params.append(sensor)
if start_date_str and end_date_str:
query += "AND timestamp BETWEEN %s AND %s "
params.extend([start_date_str, end_date_str])
self.execute_query(query, params)
return self.cursor.fetchone()[0]
def fetch_sensor_data(self, sensor=None, start=0, page_size=10, start_date=None, end_date=None):
datetime_format = "%Y-%m-%d %H:%M:%S"
if start_date:
start_date = start_date.strip()
try:
start_date = datetime.datetime.strptime(start_date, datetime_format)
except ValueError as e:
raise ValueError(f"Start date is not in the correct format: {e}")
if end_date:
end_date = end_date.strip()
try:
end_date = datetime.datetime.strptime(end_date, datetime_format)
except ValueError as e:
raise ValueError(f"End date is not in the correct format: {e}")
query = "SELECT * FROM sensordata WHERE 1=1"
params = []
if sensor:
query += " AND SensorID = %s"
params.append(sensor)
if start_date and end_date:
query += " AND timestamp BETWEEN %s AND %s"
params.extend([start_date, end_date])
query += " ORDER BY timestamp ASC LIMIT %s OFFSET %s"
params.extend([page_size, start])
self.execute_query(query, params)
rows = self.cursor.fetchall()
column_names = [desc[0] for desc in self.cursor.description]
data = [dict(zip(column_names, row)) for row in rows]
return data
def export_to_csv(self, file_name, data):
file_loc = os.path.join(os.getcwd(), file_name)
with open(file_loc, mode="w", newline="") as sensorfile:
fieldnames = [
"SensorID",
"location",
"temp",
"humidity",
"pm10",
"pm25",
"pm100",
"particles03",
"timestamp",
]
sensor_write = csv.DictWriter(sensorfile, fieldnames=fieldnames)
sensor_write.writeheader()
for row in data:
sensor_write.writerow(row)
if __name__ == "__main__":
db_info = DataBase()
try:
db_info.connect()
data = db_info.fetch_sensor_data(
sensor="PMTL_sensor1",
start_date="2023-11-25 23:10:00",
end_date="2023-12-03 05:30:00",
start=0,
page_size=100)
db_info.export_to_csv("PMTL_sensor1.csv", data)
except Exception as e:
print(f"An error occurred: {e}")
finally:
db_info.close()
in the previous code I keep getting the error: An error occurred: strptime() takes exactly 2 arguments (1 given). the purpose of this code is download data from a private server
The problem is I am not understanding what I should be changing.
by giving it two inputs, the date and the format
Have you tried to debug the code, to see if
start_date
andend_date
is set? And which line you get the error?