Revert "Borrado de archivos extra"

This reverts commit 06feae543a.
This commit is contained in:
root
2024-09-10 10:38:52 -06:00
parent 9b1d56951b
commit 86f39ea31c
30 changed files with 853 additions and 0 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

17
lib/argument_parser.py Normal file
View File

@@ -0,0 +1,17 @@
import argparse
import getpass
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("clave", help="Clave ULSA argument")
args = parser.parse_args()
clave = args.clave
if not clave.startswith('al') or not clave[2:].isdigit() or len(clave) != 8:
raise ValueError("Clave no válida. Debe comenzar con 'al' y tener 6 dígitos.")
contraseña = getpass.getpass("Contraseña: ")
if not clave or not contraseña:
raise ValueError("Clave y/o contraseña no válidos")
return clave, contraseña

53
lib/data_processing.py Normal file
View File

@@ -0,0 +1,53 @@
from bs4 import BeautifulSoup
def process_html(html_doc):
"""
Procesa el HTML de la página y extrae la información relevante
como materias, estados del servicio social, etc.
"""
soup = BeautifulSoup(html_doc, 'lxml')
table = soup.find('table', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_gvMaterias'})
actualmente_cursadas = soup.find('table', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_gvMatOrdinario'})
if table is None:
raise Exception("Tabla no encontrada en la página")
elif actualmente_cursadas is None:
raise Exception("Tabla de materias actualmente cursadas no encontrada en la página")
materias_sgu = []
headers = [header.text for header in table.find_all('th')]
for row in table.find_all('tr'):
cols = row.find_all('td')
if cols and not any(col.text == "Promedio:" for col in cols):
materias_sgu.append({ headers[i]: col.text for i, col in enumerate(cols) if not is_cell_empty(col.text) })
materias_actualmente_cursadas = []
headers = [header.text for header in actualmente_cursadas.find_all('th')]
for row in actualmente_cursadas.find_all('tr'):
cols = row.find_all('td')
if cols and not any(col.text == "No hay Datos" for col in cols):
materias_actualmente_cursadas.append({ headers[i]: col.text for i, col in enumerate(cols) if not is_cell_empty(col.text) })
periodo_actual = soup.find('span', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_Header1_lblPeriodo'})
grupo_actual = soup.find('span', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_Header1_lblgru'})
return clean_materias_sgu(materias_sgu), clean_materias_sgu(materias_actualmente_cursadas), f"'{periodo_actual.text}'", f"'{grupo_actual.text}'"
def is_cell_empty(cell_content):
"""
Verifica si el contenido de una celda está vacío o contiene solo espacios.
"""
return not cell_content.strip() or cell_content == u'\xa0'
def clean_materias_sgu(materias_sgu):
"""
Limpia y ajusta las materias obtenidas del SGU para su procesamiento posterior.
"""
for materia in materias_sgu:
if '\xa0' in materia:
materia['SEMESTRE'] = materia.pop('\xa0')
return materias_sgu
# Puedes agregar más funciones según sea necesario para procesar otros aspectos del HTML

View File

@@ -0,0 +1,47 @@
import psycopg2
from psycopg2.extras import DictCursor
def connect_to_database(dbname="sgi", user="postgres", password="h3rcul3s#$", host="200.13.89.8", port="5432"):
"""
Establece una conexión a la base de datos y la retorna.
"""
try:
connection = psycopg2.connect(
dbname=dbname,
user=user,
password=password,
host=host,
port=port
)
return connection
except psycopg2.Error as e:
print(f"No se pudo conectar a la base de datos: {e}")
exit()
def query_all(sql):
with connect_to_database() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute(sql)
return cur.fetchall()
def query_single(sql):
with connect_to_database() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute(sql)
return cur.fetchone() # Returns a dictionary-like object
def execute_query(sql):
with connect_to_database() as conn:
with conn.cursor() as cur:
cur.execute(sql)
conn.commit() # Commit to save the insert operation
def log(message, status, error_message=None, clave=None, no_insertadas=[]):
with connect_to_database(dbname="adcfi", user="postgres", password="Ultr4p0d3r0s0##", host="200.13.89.42", port="5432") as conn:
with conn.cursor() as cur:
cur.execute(f"""
INSERT INTO calificaciones.calificaciones_log (response_status, error_message, user_ip, additional_info)
VALUES ({status}, {f"'{error_message}'" if error_message else 'NULL'}, '200.13.89.42', '{{"clave": "{clave}", "message": "{message}", "no insertadas": "[{', '.join(no_insertada for no_insertada in no_insertadas)}]"}}')
""")
conn.commit()

99
lib/funciones.py Normal file
View File

@@ -0,0 +1,99 @@
from database_operations import query_all, query_single, execute_query
def actualizar_servicio(clave, Alumno_serviciosocial):
# Actualizar servicio social
execute_query(f'''
UPDATE "Alumno" SET "Alumno_serviciosocial" = {Alumno_serviciosocial} WHERE "Usuario_claveULSA" = {int(clave[2:])}
''')
def get_periodos(materias_sgu):
return query_all(f'''SELECT * FROM "Periodo" WHERE "Periodo_shortname" IN ({",".join(f"'{materia['PERIODO']}'" for materia in materias_sgu)})''')
def get_materias(materias_sgu):
return query_all('SELECT * FROM "Materia"')
def get_tipo_calificaciones(materias_sgu):
return query_all(f'''SELECT * FROM "TipoCalificacion"''')
def get_alumno(clave):
return query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {clave[2:]}')
def get_grupo(grupo, carrera_id):
return query_single(f'''SELECT "Grupo_id" FROM "Grupo_view"
WHERE REGEXP_REPLACE("Grupo_desc", '[^\d]', '', 'g') = '{grupo}' AND (("Carrera_id" = {carrera_id}) OR "Carrera_esComun")
''')
def insert_materia(clave, materia_id, periodo_id, grupo_id):
execute_query(f'''INSERT INTO public."Alumno_Materia"("Usuario_claveULSA", "Materia_id", "Periodo_id", "Grupo_id")
VALUES ({int(clave[2:])}, {materia_id}, {periodo_id}, {grupo_id})
ON CONFLICT ("Usuario_claveULSA", "Materia_id", "Periodo_id") DO NOTHING;
''')
def insert_calificaciones(calificaciones):
execute_query(f'''
insert into "Alumno_Materia_Calificacion"
("Usuario_claveULSA", "Materia_id", "Periodo_id", "TipoCalificacion_id", "Calificacion_calif", "Calificacion_fecha", "Calificacion_comentario")
values {','.join(calificaciones)}
on conflict ("Usuario_claveULSA", "Materia_id", "Periodo_id", "TipoCalificacion_id")
DO UPDATE SET "Calificacion_calif" = EXCLUDED."Calificacion_calif", "Calificacion_comentario" = EXCLUDED."Calificacion_comentario";
''')
#alumno structure {apellido_paterno: str, apellido_materno: str, curp: str, clave_carrera: int, plan: int, clave: int, servicio_social: bool, nombre: str, correo: str}
def insert_alumno(alumno):
alumno_base = query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {alumno["clave"]}')
if alumno_base:
return alumno_base
usuario_base = query_single(f"""SELECT * FROM "Usuario" WHERE "Usuario_curp" = '{alumno["curp"]}'""")
plan_estudio_base = query_single(f"""SELECT * FROM "PlanEstudio_view" WHERE "PlanEstudio_desc" LIKE '%{alumno["plan"]}' AND "Carrera_clave" = {alumno["clave_carrera"]}""")
if usuario_base:
execute_query(f'''
INSERT INTO public."Alumno"("Usuario_claveULSA", "Usuario_id", "PlanEstudio_id", "Alumno_fecha_ingreso", "Alumno_generacion", "Alumno_serviciosocial")
VALUES ({alumno["clave"]}, {usuario_base["Usuario_id"]}, {plan_estudio_base["PlanEstudio_id"]}, '{alumno['fecha_ingreso']}', '{alumno['fecha_ingreso']}', {alumno["servicio_social"]})
ON CONFLICT ("Usuario_claveULSA") DO NOTHING;
''')
execute_query(f'''
INSERT INTO "Alumno_SubEstadoAlumno" ("SubEstadoAlumno_id", "Usuario_claveULSA", "SEA_fecha", "SEA_actual")
VALUES (3, {alumno["clave"]},'{alumno['fecha_ingreso']}', true)
''')
alumno_base = query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {alumno["clave"]}')
return alumno_base
usuario_base = query_single(f'''
INSERT INTO public."Usuario"("Usuario_nombre", "Usuario_apellidos", "Usuario_curp")
VALUES ('{alumno["nombre"]}', '{alumno["apellido_paterno"]} {alumno["apellido_materno"]}', '{alumno["curp"]}') RETURNING "Usuario_id"
''')
execute_query(f'''
INSERT INTO public."Alumno"("Usuario_claveULSA", "Usuario_id", "PlanEstudio_id", "Alumno_fecha_ingreso", "Alumno_generacion", "Alumno_serviciosocial")
VALUES ({alumno["clave"]}, {usuario_base["Usuario_id"]}, {plan_estudio_base["PlanEstudio_id"]}, '{alumno['fecha_ingreso']}', '{alumno['fecha_ingreso']}', {alumno["servicio_social"]})
ON CONFLICT ("Usuario_claveULSA") DO NOTHING;
''')
alumno_base = query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {alumno["clave"]}')
return alumno_base
# insert actualmente_cursadas
def insert_actualmente_cursadas(clave, actualmente_cursadas, periodo_actual, grupo_actual):
grupo_base = query_single(f'SELECT "Grupo_id" FROM "Grupo" WHERE "Grupo_desc" = {grupo_actual}')
periodo_base = query_single(f'SELECT "Periodo_id" FROM "Periodo" WHERE "Periodo_shortname" = {periodo_actual}')
# only where materia["Clave"] is set
for materia in filter(lambda materia: "Clave" in materia, actualmente_cursadas):
# to varchar
materia_clave = f"'{materia['Clave']}'"
materia_base = query_single(f'SELECT "Materia_id" FROM "Materia" WHERE {materia_clave} = ANY("Materia_claves")')
if materia_base:
execute_query(f'''
INSERT INTO public."Alumno_Materia"("Usuario_claveULSA", "Materia_id", "Periodo_id", "Grupo_id")
VALUES ({clave[2:]}, {materia_base["Materia_id"]}, {periodo_base["Periodo_id"]}, {grupo_base["Grupo_id"]})
ON CONFLICT ("Usuario_claveULSA", "Materia_id", "Periodo_id") DO NOTHING;
''')
def insert_datos(alumno):
execute_query(f'''
INSERT INTO public."pos_ultima_extraccion"("Usuario_claveULSA", "telefono", "correo", "estatus", "promedio", "sexo", "semestre")
VALUES ({alumno["clave"]}, '{alumno["telefono"]}', '{alumno["correo"]}', '{alumno["estatus"]}', {alumno["promedio"]} , '{alumno["sexo"]}', {alumno["semestre"]})
ON CONFLICT ("Usuario_claveULSA") DO UPDATE SET "telefono" = EXCLUDED."telefono", "correo" = EXCLUDED."correo", "estatus" = EXCLUDED."estatus", "promedio" = EXCLUDED."promedio",
"actualizacion" = now();
''')

35
lib/log.py Normal file
View File

@@ -0,0 +1,35 @@
import psycopg2
from psycopg2.extras import DictCursor
def connect_to_database(dbname="sgi", user="postgres", password="sys4lci", host="200.13.89.27", port="5432"):
"""
Establece una conexión a la base de datos y la retorna.
"""
connection = psycopg2.connect(
dbname=dbname,
user=user,
password=password,
host=host,
port=port
)
return connection
def query_all(sql):
with connect_to_database() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute(sql)
return cur.fetchall()
def query_single(sql):
with connect_to_database() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute(sql)
return cur.fetchone() # Returns a dictionary-like object
def execute_query(sql):
with connect_to_database() as conn:
with conn.cursor() as cur:
cur.execute(sql)
conn.commit() # Commit to save the insert operation

16
lib/selenium_setup.py Normal file
View File

@@ -0,0 +1,16 @@
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
def configure_selenium():
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--window-size=1920x1080")
PATH = "/usr/bin/chromedriver"
service = Service(PATH)
driver = webdriver.Chrome(service=service, options=options)
return driver

54
lib/web_navigation.py Normal file
View File

@@ -0,0 +1,54 @@
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
def navigate_to_url(driver, url, clave, contraseña):
formatted_url = f"https://{clave}:{contraseña}@{url}"
driver.get(formatted_url)
driver.get(f'https://{url}')
# If dentro del código no existe un elemento con el id ctl00_lnkHome
if not WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "ctl00_lnkHome"))
):
raise Exception("No se pudo iniciar sesión.")
# wait until it appears this element.id = ctl00_contenedor_HistorialAlumno1_lblApPatAlumnoHP and get it
def wait_for_element(driver, element_id):
return WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, element_id))
)
servicio_social = wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblSS")
alumno = {
"apellido_paterno": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblApPatAlumnoHP").text,
"apellido_materno": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblApMatAlumnoHP").text,
"curp": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblCURPAlumnoHP").text,
"clave_carrera": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblCveCarrera").text),
"plan": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblAlupla").text),
"clave": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblCveUlsa").text),
"servicio_social": servicio_social.text == "Realizado",
"nombre":wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblNombreAlumnoHP").text,
"correo": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblCorreoAlumnoHP").text,
'estatus' : wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblStat").text,
"telefono": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblTelefonoAlumnoHP").text,
"semestre": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblSem").text),
"sexo": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblSexoAlumnoHP").text,
}
click_element(driver, element_id="ctl00_contenedor_HistorialAlumno1_lblBtnSeccionHAcademico")
historial_academico = driver.page_source
alumno['promedio'] = float(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblPromedioAlumnoHA").text)
return alumno, historial_academico
def click_element(driver, element_id):
elemento = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, element_id))
)
elemento.click()