Commit inicial

This commit is contained in:
2024-08-05 10:46:20 -06:00
parent eb5f92e19c
commit 8f6b1f129e
32 changed files with 2101 additions and 0 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

181
app.py Normal file
View File

@@ -0,0 +1,181 @@
import psycopg2
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import os
from flask import Flask, request, jsonify
from waitress import serve
import pandas as pd
from io import StringIO
# Set options for the Chromium browser
chrome_options = Options()
chrome_options.add_argument("--headless") # Optional: Run Chromium in headless mode
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
# Specify the path to the Chromium driver
service = Service('/usr/bin/chromedriver')
driver = None
def extract(username: str, password: str):
url_credentials = f'https://{username}:{password}@sgu.ulsa.edu.mx/psulsa/alumnos/consultainformacionalumnos/consultainformacion.aspx'
url = 'https://sgu.ulsa.edu.mx/psulsa/alumnos/consultainformacionalumnos/consultainformacion.aspx'
username_integer = int(username[2:])
def insert_alumno_extraccion(datos_html: str, materias_html: str, historial_html: str = 'error', materias_actuales_html: str = 'error'):
try:
conn = psycopg2.connect(
dbname=os.getenv("DBNAME"),
user=os.getenv("DBUSER"),
password=os.getenv("DBPASSWORD"),
host=os.getenv("DBHOST"),
port=os.getenv("DBPORT")
)
cur = conn.cursor()
insert_query = """
INSERT INTO public.alumno_extraccion ("Usuario_claveULSA", datos_html, materias_html, historial_html, materias_actuales_html) VALUES (%s, TRIM(%s), TRIM(%s), TRIM(%s)::JSONB, TRIM(%s))
ON CONFLICT ("Usuario_claveULSA") DO UPDATE SET datos_html = EXCLUDED.datos_html, materias_html = EXCLUDED.materias_html, error_message = NULL, registrado = DEFAULT;
"""
cur.execute(insert_query, (username_integer, datos_html, materias_html, historial_html, materias_actuales_html))
conn.commit()
return cur.query.decode('utf-8')
except psycopg2.ProgrammingError as e:
print(f"Error de sintaxis: {e}")
except psycopg2.IntegrityError as e:
print(f"Error de integridad: {e}")
except Exception as e:
print(f"Error: {e}")
finally:
cur.close()
conn.close()
def update_alumno_extraccion_error(error: str):
try:
conn = psycopg2.connect(
dbname=os.getenv("DBNAME"),
user=os.getenv("DBUSER"),
password=os.getenv("DBPASSWORD"),
host=os.getenv("DBHOST"),
port=os.getenv("DBPORT")
)
cur = conn.cursor()
update_query = """
INSERT INTO public.alumno_extraccion ("Usuario_claveULSA", error_message) VALUES (%s, %s)
ON CONFLICT ("Usuario_claveULSA") DO UPDATE SET error_message = EXCLUDED.error_message,
materias_html = DEFAULT, registrado = DEFAULT;
"""
cur.execute(update_query, (username_integer, error[:255]))
conn.commit()
print("Data updated successfully")
except psycopg2.ProgrammingError as e:
print(f"Error de sintaxis: {e}")
finally:
cur.close()
conn.close()
try:
driver.get(url_credentials)
driver.get(url)
# si no existe el elemento, ctl00_contenedor_control
datos_html = driver.find_element(By.ID, 'ctl00_contenedor_control').get_attribute('innerHTML')
elemento = WebDriverWait(driver, 3.5).until(
EC.presence_of_element_located((By.ID, 'ctl00_contenedor_HistorialAlumno1_lblBtnSeccionHAcademico'))
)
elemento.click()
# Get the HTML content of the materias element
materias_html = driver.find_element(By.ID, 'ctl00_contenedor_HistorialAlumno1_divHAcademico').get_attribute('innerHTML')
historial_html = driver.find_element(By.ID, 'ctl00_contenedor_HistorialAlumno1_gvMaterias').get_attribute('innerHTML')
# materias_actuales_html = driver.find_element(By.ID, 'ctl00_contenedor_HistorialAlumno1_div13').get_attribute('innerHTML')
historial_html_io = StringIO(f"<table>{historial_html}</table>")
# Read the HTML table into a DataFrame
df = pd.read_html(historial_html_io)[0]
# Convert the DataFrame to JSON
json_result = df[df['GRUPO'] != 'Promedio:'].to_json(orient='records')
# Connect to PostgreSQL database
query = insert_alumno_extraccion(datos_html, materias_html, json_result)
print("Data extracted successfully")
return json_result
except NoSuchElementException as e:
update_alumno_extraccion_error(str(e))
def se_puede_extraer():
try:
conn = conn = psycopg2.connect(
dbname=os.getenv("DBNAME"),
user=os.getenv("DBUSER"),
password=os.getenv("DBPASSWORD"),
host=os.getenv("DBHOST"),
port=os.getenv("DBPORT")
)
cursor = conn.cursor()
# SELECCIONAR ULTIMA planeacion
query = """
SELECT 1
FROM alumno_extraccion_fecha
WHERE CURRENT_DATE BETWEEN fecha_inicio AND fecha_fin
ORDER BY CREATED_AT DESC
LIMIT 1;
"""
# Ejecuta la consulta
cursor.execute(query)
result = cursor.fetchone()
# Verifica si se obtuvo algún resultado
exists = result is not None
# Cierra el cursor y la conexión
cursor.close()
conn.close()
return exists
except Exception as e:
print(f"Error: {e}")
return False
app = Flask(__name__)
@app.route('/calificaciones', methods=['POST'])
def main():
global driver
# Initialize the WebDriver
driver = webdriver.Chrome(service=service, options=chrome_options)
username = request.form.get('clave')
password = request.form.get('password')
if se_puede_extraer():
query = extract(username, password)
# Close the session
driver.quit()
return jsonify({"message": "Data extracted successfully"})
if __name__ == '__main__':
serve(app, host='0.0.0.0', port=5000)

194
extraccion_html.c Normal file
View File

@@ -0,0 +1,194 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <libpq-fe.h>
#include <libxml/HTMLparser.h>
#include <libxml/xpath.h>
// Define the alumno struct
struct alumno {
char apellido_paterno[100];
char apellido_materno[100];
char curp[20];
char clave_carrera[2];
char plan[2];
char clave[6];
char nombre[100];
char correo[100];
char estatus;
char telefono[11];
int semestre;
char sexo;
};
// Function to check for PostgreSQL connection errors
void check_conn_status(PGconn *conn) {
if (PQstatus(conn) != CONNECTION_OK) {
fprintf(stderr, "Connection to database failed: %s", PQerrorMessage(conn));
PQfinish(conn);
exit(EXIT_FAILURE);
}
}
// Function to check for PostgreSQL query execution errors
void check_exec_status(PGresult *res, PGconn *conn) {
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
fprintf(stderr, "Query failed: %s", PQerrorMessage(conn));
PQclear(res);
PQfinish(conn);
exit(EXIT_FAILURE);
}
}
// Function to extract content from an HTML element by ID
char* get_element_content_by_id(htmlDocPtr doc, const char *id) {
xmlChar xpath[100];
snprintf((char *)xpath, sizeof(xpath), "//*[@id='%s']", id);
xmlXPathContextPtr xpathCtx = xmlXPathNewContext(doc);
xmlXPathObjectPtr xpathObj = xmlXPathEvalExpression(xpath, xpathCtx);
if (xpathObj == NULL || xmlXPathNodeSetIsEmpty(xpathObj->nodesetval)) {
xmlXPathFreeObject(xpathObj);
xmlXPathFreeContext(xpathCtx);
return NULL;
}
xmlNodePtr node = xpathObj->nodesetval->nodeTab[0];
xmlChar *content = xmlNodeGetContent(node);
xmlXPathFreeObject(xpathObj);
xmlXPathFreeContext(xpathCtx);
return (char *)content;
}
// Function to parse HTML content using libxml2 and populate the alumno struct
void parse_html(const char *html, struct alumno *alum) {
htmlDocPtr doc = htmlReadMemory(html, strlen(html), NULL, NULL, HTML_PARSE_NOERROR | HTML_PARSE_NOWARNING);
if (doc == NULL) {
fprintf(stderr, "Failed to parse HTML\n");
return;
}
char *content;
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblApPatAlumnoHP");
if (content) {
strncpy(alum->apellido_paterno, content, 100);
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblApMatAlumnoHP");
if (content) {
strncpy(alum->apellido_materno, content, 100);
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblCURPAlumnoHP");
if (content) {
strncpy(alum->curp, content, 20);
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_Header1_lblCveCarrera");
if (content) {
strncpy(alum->clave_carrera, content, 2);
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_Header1_lblAlupla");
if (content) {
strncpy(alum->plan, content, 4);
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_Header1_lblCveUlsa");
if (content) {
strncpy(alum->clave, content, 7);
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblNombreAlumnoHP");
if (content) {
strncpy(alum->nombre, content, 100);
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblCorreoAlumnoHP");
if (content) {
strncpy(alum->correo, content, 100);
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_Header1_lblStat");
if (content) {
alum->estatus = content[0];
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblTelefonoAlumnoHP");
if (content) {
strncpy(alum->telefono, content, 11);
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_Header1_lblSem");
if (content) {
alum->semestre = atoi(content);
xmlFree(content);
}
content = get_element_content_by_id(doc, "ctl00_contenedor_HistorialAlumno1_lblSexoAlumnoHP");
if (content) {
alum->sexo = content[0];
xmlFree(content);
}
xmlFreeDoc(doc);
}
int main() {
// PostgreSQL connection parameters
const char *conninfo = "dbname=sgi user=postgres password=h3rcul3s#$ hostaddr=200.13.89.8 port=5432";
PGconn *conn = PQconnectdb(conninfo);
// Check connection status
check_conn_status(conn);
// Execute SQL query to retrieve HTML content
PGresult *res = PQexec(conn, "SELECT datos_html FROM public.alumno_extraccion WHERE error_message IS NULL");
check_exec_status(res, conn);
// Process each row
int rows = PQntuples(res);
for (int i = 0; i < rows; i++) {
char *html_content = PQgetvalue(res, i, 0);
// printf("HTML Content: %s\n", html_content);
struct alumno alum;
memset(&alum, 0, sizeof(alum)); // Initialize the struct to zero
parse_html(html_content, &alum);
printf("Apellido Paterno: %s\n", alum.apellido_paterno);
printf("Apellido Materno: %s\n", alum.apellido_materno);
printf("CURP: %s\n", alum.curp);
printf("Clave Carrera: %s\n", alum.clave_carrera);
printf("Plan: %s\n", alum.plan);
printf("Clave: %s\n", alum.clave);
printf("Nombre: %s\n", alum.nombre);
printf("Correo: %s\n", alum.correo);
printf("Estatus: %c\n", alum.estatus);
printf("Telefono: %s\n", alum.telefono);
printf("Semestre: %d\n", alum.semestre);
printf("Sexo: %c\n", alum.sexo);
}
// Clean up
PQclear(res);
PQfinish(conn);
return 0;
}

1067
ian_tabla.html Normal file

File diff suppressed because one or more lines are too long

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

17
lib/argument_parser.py Normal file
View File

@@ -0,0 +1,17 @@
import argparse
import getpass
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("clave", help="Clave ULSA argument")
args = parser.parse_args()
clave = args.clave
if not clave.startswith('al') or not clave[2:].isdigit() or len(clave) != 8:
raise ValueError("Clave no válida. Debe comenzar con 'al' y tener 6 dígitos.")
contraseña = getpass.getpass("Contraseña: ")
if not clave or not contraseña:
raise ValueError("Clave y/o contraseña no válidos")
return clave, contraseña

53
lib/data_processing.py Normal file
View File

@@ -0,0 +1,53 @@
from bs4 import BeautifulSoup
def process_html(html_doc):
"""
Procesa el HTML de la página y extrae la información relevante
como materias, estados del servicio social, etc.
"""
soup = BeautifulSoup(html_doc, 'lxml')
table = soup.find('table', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_gvMaterias'})
actualmente_cursadas = soup.find('table', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_gvMatOrdinario'})
if table is None:
raise Exception("Tabla no encontrada en la página")
elif actualmente_cursadas is None:
raise Exception("Tabla de materias actualmente cursadas no encontrada en la página")
materias_sgu = []
headers = [header.text for header in table.find_all('th')]
for row in table.find_all('tr'):
cols = row.find_all('td')
if cols and not any(col.text == "Promedio:" for col in cols):
materias_sgu.append({ headers[i]: col.text for i, col in enumerate(cols) if not is_cell_empty(col.text) })
materias_actualmente_cursadas = []
headers = [header.text for header in actualmente_cursadas.find_all('th')]
for row in actualmente_cursadas.find_all('tr'):
cols = row.find_all('td')
if cols and not any(col.text == "No hay Datos" for col in cols):
materias_actualmente_cursadas.append({ headers[i]: col.text for i, col in enumerate(cols) if not is_cell_empty(col.text) })
periodo_actual = soup.find('span', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_Header1_lblPeriodo'})
grupo_actual = soup.find('span', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_Header1_lblgru'})
return clean_materias_sgu(materias_sgu), clean_materias_sgu(materias_actualmente_cursadas), f"'{periodo_actual.text}'", f"'{grupo_actual.text}'"
def is_cell_empty(cell_content):
"""
Verifica si el contenido de una celda está vacío o contiene solo espacios.
"""
return not cell_content.strip() or cell_content == u'\xa0'
def clean_materias_sgu(materias_sgu):
"""
Limpia y ajusta las materias obtenidas del SGU para su procesamiento posterior.
"""
for materia in materias_sgu:
if '\xa0' in materia:
materia['SEMESTRE'] = materia.pop('\xa0')
return materias_sgu
# Puedes agregar más funciones según sea necesario para procesar otros aspectos del HTML

View File

@@ -0,0 +1,47 @@
import psycopg2
from psycopg2.extras import DictCursor
def connect_to_database(dbname="sgi", user="postgres", password="h3rcul3s#$", host="200.13.89.8", port="5432"):
"""
Establece una conexión a la base de datos y la retorna.
"""
try:
connection = psycopg2.connect(
dbname=dbname,
user=user,
password=password,
host=host,
port=port
)
return connection
except psycopg2.Error as e:
print(f"No se pudo conectar a la base de datos: {e}")
exit()
def query_all(sql):
with connect_to_database() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute(sql)
return cur.fetchall()
def query_single(sql):
with connect_to_database() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute(sql)
return cur.fetchone() # Returns a dictionary-like object
def execute_query(sql):
with connect_to_database() as conn:
with conn.cursor() as cur:
cur.execute(sql)
conn.commit() # Commit to save the insert operation
def log(message, status, error_message=None, clave=None, no_insertadas=[]):
with connect_to_database(dbname="adcfi", user="postgres", password="Ultr4p0d3r0s0##", host="200.13.89.42", port="5432") as conn:
with conn.cursor() as cur:
cur.execute(f"""
INSERT INTO calificaciones.calificaciones_log (response_status, error_message, user_ip, additional_info)
VALUES ({status}, {f"'{error_message}'" if error_message else 'NULL'}, '200.13.89.42', '{{"clave": "{clave}", "message": "{message}", "no insertadas": "[{', '.join(no_insertada for no_insertada in no_insertadas)}]"}}')
""")
conn.commit()

99
lib/funciones.py Normal file
View File

@@ -0,0 +1,99 @@
from database_operations import query_all, query_single, execute_query
def actualizar_servicio(clave, Alumno_serviciosocial):
# Actualizar servicio social
execute_query(f'''
UPDATE "Alumno" SET "Alumno_serviciosocial" = {Alumno_serviciosocial} WHERE "Usuario_claveULSA" = {int(clave[2:])}
''')
def get_periodos(materias_sgu):
return query_all(f'''SELECT * FROM "Periodo" WHERE "Periodo_shortname" IN ({",".join(f"'{materia['PERIODO']}'" for materia in materias_sgu)})''')
def get_materias(materias_sgu):
return query_all('SELECT * FROM "Materia"')
def get_tipo_calificaciones(materias_sgu):
return query_all(f'''SELECT * FROM "TipoCalificacion"''')
def get_alumno(clave):
return query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {clave[2:]}')
def get_grupo(grupo, carrera_id):
return query_single(f'''SELECT "Grupo_id" FROM "Grupo_view"
WHERE REGEXP_REPLACE("Grupo_desc", '[^\d]', '', 'g') = '{grupo}' AND (("Carrera_id" = {carrera_id}) OR "Carrera_esComun")
''')
def insert_materia(clave, materia_id, periodo_id, grupo_id):
execute_query(f'''INSERT INTO public."Alumno_Materia"("Usuario_claveULSA", "Materia_id", "Periodo_id", "Grupo_id")
VALUES ({int(clave[2:])}, {materia_id}, {periodo_id}, {grupo_id})
ON CONFLICT ("Usuario_claveULSA", "Materia_id", "Periodo_id") DO NOTHING;
''')
def insert_calificaciones(calificaciones):
execute_query(f'''
insert into "Alumno_Materia_Calificacion"
("Usuario_claveULSA", "Materia_id", "Periodo_id", "TipoCalificacion_id", "Calificacion_calif", "Calificacion_fecha", "Calificacion_comentario")
values {','.join(calificaciones)}
on conflict ("Usuario_claveULSA", "Materia_id", "Periodo_id", "TipoCalificacion_id")
DO UPDATE SET "Calificacion_calif" = EXCLUDED."Calificacion_calif", "Calificacion_comentario" = EXCLUDED."Calificacion_comentario";
''')
#alumno structure {apellido_paterno: str, apellido_materno: str, curp: str, clave_carrera: int, plan: int, clave: int, servicio_social: bool, nombre: str, correo: str}
def insert_alumno(alumno):
alumno_base = query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {alumno["clave"]}')
if alumno_base:
return alumno_base
usuario_base = query_single(f"""SELECT * FROM "Usuario" WHERE "Usuario_curp" = '{alumno["curp"]}'""")
plan_estudio_base = query_single(f"""SELECT * FROM "PlanEstudio_view" WHERE "PlanEstudio_desc" LIKE '%{alumno["plan"]}' AND "Carrera_clave" = {alumno["clave_carrera"]}""")
if usuario_base:
execute_query(f'''
INSERT INTO public."Alumno"("Usuario_claveULSA", "Usuario_id", "PlanEstudio_id", "Alumno_fecha_ingreso", "Alumno_generacion", "Alumno_serviciosocial")
VALUES ({alumno["clave"]}, {usuario_base["Usuario_id"]}, {plan_estudio_base["PlanEstudio_id"]}, '{alumno['fecha_ingreso']}', '{alumno['fecha_ingreso']}', {alumno["servicio_social"]})
ON CONFLICT ("Usuario_claveULSA") DO NOTHING;
''')
execute_query(f'''
INSERT INTO "Alumno_SubEstadoAlumno" ("SubEstadoAlumno_id", "Usuario_claveULSA", "SEA_fecha", "SEA_actual")
VALUES (3, {alumno["clave"]},'{alumno['fecha_ingreso']}', true)
''')
alumno_base = query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {alumno["clave"]}')
return alumno_base
usuario_base = query_single(f'''
INSERT INTO public."Usuario"("Usuario_nombre", "Usuario_apellidos", "Usuario_curp")
VALUES ('{alumno["nombre"]}', '{alumno["apellido_paterno"]} {alumno["apellido_materno"]}', '{alumno["curp"]}') RETURNING "Usuario_id"
''')
execute_query(f'''
INSERT INTO public."Alumno"("Usuario_claveULSA", "Usuario_id", "PlanEstudio_id", "Alumno_fecha_ingreso", "Alumno_generacion", "Alumno_serviciosocial")
VALUES ({alumno["clave"]}, {usuario_base["Usuario_id"]}, {plan_estudio_base["PlanEstudio_id"]}, '{alumno['fecha_ingreso']}', '{alumno['fecha_ingreso']}', {alumno["servicio_social"]})
ON CONFLICT ("Usuario_claveULSA") DO NOTHING;
''')
alumno_base = query_single(f'SELECT "Carrera_id", "PlanEstudio_id" FROM "Alumno_view" WHERE "Usuario_claveULSA" = {alumno["clave"]}')
return alumno_base
# insert actualmente_cursadas
def insert_actualmente_cursadas(clave, actualmente_cursadas, periodo_actual, grupo_actual):
grupo_base = query_single(f'SELECT "Grupo_id" FROM "Grupo" WHERE "Grupo_desc" = {grupo_actual}')
periodo_base = query_single(f'SELECT "Periodo_id" FROM "Periodo" WHERE "Periodo_shortname" = {periodo_actual}')
# only where materia["Clave"] is set
for materia in filter(lambda materia: "Clave" in materia, actualmente_cursadas):
# to varchar
materia_clave = f"'{materia['Clave']}'"
materia_base = query_single(f'SELECT "Materia_id" FROM "Materia" WHERE {materia_clave} = ANY("Materia_claves")')
if materia_base:
execute_query(f'''
INSERT INTO public."Alumno_Materia"("Usuario_claveULSA", "Materia_id", "Periodo_id", "Grupo_id")
VALUES ({clave[2:]}, {materia_base["Materia_id"]}, {periodo_base["Periodo_id"]}, {grupo_base["Grupo_id"]})
ON CONFLICT ("Usuario_claveULSA", "Materia_id", "Periodo_id") DO NOTHING;
''')
def insert_datos(alumno):
execute_query(f'''
INSERT INTO public."pos_ultima_extraccion"("Usuario_claveULSA", "telefono", "correo", "estatus", "promedio", "sexo", "semestre")
VALUES ({alumno["clave"]}, '{alumno["telefono"]}', '{alumno["correo"]}', '{alumno["estatus"]}', {alumno["promedio"]} , '{alumno["sexo"]}', {alumno["semestre"]})
ON CONFLICT ("Usuario_claveULSA") DO UPDATE SET "telefono" = EXCLUDED."telefono", "correo" = EXCLUDED."correo", "estatus" = EXCLUDED."estatus", "promedio" = EXCLUDED."promedio",
"actualizacion" = now();
''')

35
lib/log.py Normal file
View File

@@ -0,0 +1,35 @@
import psycopg2
from psycopg2.extras import DictCursor
def connect_to_database(dbname="sgi", user="postgres", password="sys4lci", host="200.13.89.27", port="5432"):
"""
Establece una conexión a la base de datos y la retorna.
"""
connection = psycopg2.connect(
dbname=dbname,
user=user,
password=password,
host=host,
port=port
)
return connection
def query_all(sql):
with connect_to_database() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute(sql)
return cur.fetchall()
def query_single(sql):
with connect_to_database() as conn:
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute(sql)
return cur.fetchone() # Returns a dictionary-like object
def execute_query(sql):
with connect_to_database() as conn:
with conn.cursor() as cur:
cur.execute(sql)
conn.commit() # Commit to save the insert operation

16
lib/selenium_setup.py Normal file
View File

@@ -0,0 +1,16 @@
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
def configure_selenium():
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--window-size=1920x1080")
PATH = "/usr/bin/chromedriver"
service = Service(PATH)
driver = webdriver.Chrome(service=service, options=options)
return driver

54
lib/web_navigation.py Normal file
View File

@@ -0,0 +1,54 @@
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
def navigate_to_url(driver, url, clave, contraseña):
formatted_url = f"https://{clave}:{contraseña}@{url}"
driver.get(formatted_url)
driver.get(f'https://{url}')
# If dentro del código no existe un elemento con el id ctl00_lnkHome
if not WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "ctl00_lnkHome"))
):
raise Exception("No se pudo iniciar sesión.")
# wait until it appears this element.id = ctl00_contenedor_HistorialAlumno1_lblApPatAlumnoHP and get it
def wait_for_element(driver, element_id):
return WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, element_id))
)
servicio_social = wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblSS")
alumno = {
"apellido_paterno": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblApPatAlumnoHP").text,
"apellido_materno": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblApMatAlumnoHP").text,
"curp": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblCURPAlumnoHP").text,
"clave_carrera": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblCveCarrera").text),
"plan": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblAlupla").text),
"clave": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblCveUlsa").text),
"servicio_social": servicio_social.text == "Realizado",
"nombre":wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblNombreAlumnoHP").text,
"correo": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblCorreoAlumnoHP").text,
'estatus' : wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblStat").text,
"telefono": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblTelefonoAlumnoHP").text,
"semestre": int(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_Header1_lblSem").text),
"sexo": wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblSexoAlumnoHP").text,
}
click_element(driver, element_id="ctl00_contenedor_HistorialAlumno1_lblBtnSeccionHAcademico")
historial_academico = driver.page_source
alumno['promedio'] = float(wait_for_element(driver, "ctl00_contenedor_HistorialAlumno1_lblPromedioAlumnoHA").text)
return alumno, historial_academico
def click_element(driver, element_id):
elemento = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, element_id))
)
elemento.click()

16
logs.py Normal file
View File

@@ -0,0 +1,16 @@
import psycopg2
from psycopg2.extras import DictCursor
conn = psycopg2.connect(
dbname='adcfi',
user='postgres',
password='Ultr4p0d3r0s0##',
host='200.13.89.42',
port='5432'
)
with conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute("SELECT * FROM calificaciones.calificaciones_log ORDER BY log_timestamp DESC LIMIT 10")
for row in cur:
print(row)

105
main.py Normal file
View File

@@ -0,0 +1,105 @@
import sys
from pathlib import Path
from flask import Flask, request, jsonify
from waitress import serve
app = Flask(__name__)
# Agregar el directorio lib al path de Python
lib_path = Path(__file__).parent / 'lib'
sys.path.append(str(lib_path))
# Ahora puedes importar los módulos desde el directorio lib
from selenium_setup import configure_selenium
from argument_parser import parse_arguments
from web_navigation import navigate_to_url, click_element
from data_processing import process_html
from database_operations import log
from funciones import *
def main(clave, contraseña):
# Configurar Selenium y navegar a la URL
driver = configure_selenium()
url = "sgu.ulsa.edu.mx/psulsa/alumnos/consultainformacionalumnos/consultainformacion.aspx"
datos_alumno, historial_academico = navigate_to_url(driver, url, clave, contraseña)
materias_sgu, actualmente_cursadas, periodo_actual, grupo_actual = process_html(historial_academico)
insert_actualmente_cursadas(clave, actualmente_cursadas, periodo_actual, grupo_actual)
# Obtener datos de la base
periodos_base = get_periodos(materias_sgu)
# obtener la fecha mínima del arreglo de diccionarios de periodos en su campo "Periodo_fecha_inicio"
fecha_mínima = min(periodos_base, key=lambda x: x['Periodo_fecha_inicial'])['Periodo_fecha_inicial']
# déjalo en el primer día del mes
datos_alumno['fecha_ingreso'] = fecha_mínima.replace(day=1)
materias_base = get_materias(materias_sgu)
tipo_calificaciones_base = get_tipo_calificaciones(materias_sgu)
alumno_base = insert_alumno(datos_alumno)
# Actualizar servicio social
actualizar_servicio(clave, datos_alumno['servicio_social'])
#insert datos
insert_datos(datos_alumno)
calificaciones = []
no_insertadas = []
for materia_sgu in materias_sgu:
materia_base = next((materia_base for materia_base in materias_base if materia_sgu['Cve ULSA'] in materia_base['Materia_claves'] or (materia_sgu['Cve SEP'] in materia_base["Materia_claves"] and alumno_base['PlanEstudio_id'] == materia_base['PlanEstudio_id'])), None)
periodo_base = next((periodo_base for periodo_base in periodos_base if periodo_base['Periodo_shortname'] == materia_sgu['PERIODO']), None)
tipo_calificacion_base = next((calificacion_base for calificacion_base in tipo_calificaciones_base if calificacion_base['TipoCalificacion_desc_corta'] == materia_sgu['EXAMEN']), None)
if 'GRUPO' in materia_sgu.keys():
grupo = get_grupo(materia_sgu['GRUPO'], alumno_base['Carrera_id'])
else:
grupo = None
if materia_base and periodo_base and tipo_calificacion_base:
calificaciones.append(f"({clave[2:]}, {materia_base['Materia_id']}, {periodo_base['Periodo_id']}, {tipo_calificacion_base['TipoCalificacion_id']}, {materia_sgu['CALIF']}, CURRENT_DATE, 'SGU')")
if not materia_base or not periodo_base or not tipo_calificacion_base or not grupo:
no_insertadas.append(f'''Materia: {materia_base['Materia_id'] if materia_base else materia_sgu['Cve ULSA']} - Periodo_base: {periodo_base['Periodo_id'] if periodo_base else materia_sgu['PERIODO']} Tipo_calificacion_base: {tipo_calificacion_base['TipoCalificacion_id'] if tipo_calificacion_base else materia_sgu['EXAMEN']} Grupo: {grupo['Grupo_id'] if grupo else materia_sgu['GRUPO'] if 'GRUPO' in materia_sgu.keys() else 'None' } ''')
continue
insert_materia(clave, materia_base['Materia_id'], periodo_base['Periodo_id'], grupo['Grupo_id'])
if not calificaciones or len(calificaciones) == 0:
raise Exception("No hay calificaciones para insertar o actualizar.")
# Insertar calificaciones
insert_calificaciones(calificaciones)
return clave, no_insertadas
@app.route('/calificaciones', methods=['POST'])
def calificaciones():
try:
# Obtener la clave y la contraseña de la solicitud POST
clave = request.form.get('clave')
contraseña = request.form.get('contraseña')
# Verificar si la clave y la contraseña existen
if clave is None or contraseña is None:
return "Error: La clave y/o contraseña no fueron proporcionadas en la solicitud."
# Procesar los datos (aquí llamamos a la función main)
clave, no_insertadas = main(clave, contraseña)
# Registro de éxito
log("Proceso terminado con éxito.", 200, None, clave, no_insertadas)
# Retornar respuesta exitosa como JSON
return jsonify({"mensaje": "Proceso terminado con éxito.", "clave": clave, "no_insertadas": no_insertadas, "success": True})
except Exception as e:
# remove all ' from the error message
e = e.replace("'", "")
log(str(e), 500, e, None)
# Retornar mensaje de error como JSON
return jsonify({"mensaje": str(e), "success": False})
if __name__ == "__main__":
serve(app, host='0.0.0.0', port=5000)

BIN
proceso Normal file

Binary file not shown.

217
sgu.py Normal file
View File

@@ -0,0 +1,217 @@
import numpy as np
from bs4 import BeautifulSoup
import psycopg2
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import argparse
import getpass
try:
# Configuración de Selenium
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-gpu")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--window-size=1920x1080")
PATH = "/usr/bin/chromedriver"
service = Service(PATH)
driver = webdriver.Chrome(service=service, options=options)
except Exception as e:
print(f"Error configurando el navegador: {e}")
exit()
try:
# Parseo de argumentos
parser = argparse.ArgumentParser()
parser.add_argument("clave", help="Clave ULSA argument")
args = parser.parse_args()
clave = args.clave
# Solicitar la contraseña de manera segura
contraseña = getpass.getpass("Contraseña: ")
if not clave or not contraseña:
raise ValueError("Clave y/o contraseña no válidos")
except Exception as e:
print(f"Error en los argumentos: {e}")
driver.quit()
exit()
try:
# Navegar a la URL
url = "sgu.ulsa.edu.mx/psulsa/alumnos/consultainformacionalumnos/consultainformacion.aspx"
formatted_url = f"https://{clave}:{contraseña}@{url}"
driver.get(formatted_url)
driver.get(f'https://{url}')
except Exception as e:
print(f"Error navegando a la URL: {e}")
driver.quit()
exit()
try:
# If dentro del código existe un (case insensitive) Unauthorized
if "Unauthorized" in driver.page_source:
raise Exception("Credenciales inválidas")
elemento = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "ctl00_contenedor_HistorialAlumno1_lblBtnSeccionHAcademico"))
)
elemento.click()
except Exception as e:
print(f"Error interactuando con la página: {e}")
driver.quit()
exit()
try:
# Procesamiento de HTML con BeautifulSoup
html_doc = driver.page_source
soup = BeautifulSoup(html_doc, 'lxml')
table = soup.find('table', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_gvMaterias'})
if table is None:
raise Exception("Tabla no encontrada en la página")
materias_sgu = []
headers = [header.text for header in table.find_all('th')]
def is_cell_empty(cell_content):
return not cell_content.strip() or cell_content == u'\xa0'
for row in table.find_all('tr'):
cols = row.find_all('td')
if cols and not any(is_cell_empty(col.text) for col in cols):
materias_sgu.append({headers[i]: col.text for i, col in enumerate(cols)})
for materia in materias_sgu:
materia['SEMESTRE'] = materia.pop('\xa0')
servicio_social = soup.find('span', attrs={'id': 'ctl00_contenedor_HistorialAlumno1_Header1_lblSS'}).text
# puede ser Realizado, No Realizado
if servicio_social == "No Realizado":
Alumno_serviciosocial = False
elif servicio_social == "Realizado":
Alumno_serviciosocial = True
except Exception as e:
print(f"Error procesando HTML: {e}")
exit()
finally:
driver.quit()
# Conexión a la base de datos y operaciones
conexion = psycopg2.connect(
dbname="sgi",
user="postgres",
password="sys4lci",
host="200.13.89.27",
port="5432"
)
# Ejecutar la consulta para actualizar public."Alumno"."Alumno_serviciosocial"
try:
with conexion.cursor() as cursor:
cursor.execute(f'UPDATE public."Alumno" SET "Alumno_serviciosocial" = {Alumno_serviciosocial} WHERE "Usuario_claveULSA" = \'{clave[2:]}\'')
conexion.commit()
except Exception as e:
print(f"Error al actualizar el servicio social: {e}")
if conexion:
conexion.rollback()
try:
# Ejecutar la consulta para obtener los periodos
with conexion.cursor() as cursor:
cursor.execute('SELECT * FROM "Periodo"')
Periodos = cursor.fetchall()
except Exception as e:
print(f"Error obteniendo los periodos de la base de datos: {e}")
try:
# Ejecutar la consulta para obtener los tipos de calificación
with conexion.cursor() as cursor:
cursor.execute('SELECT * FROM "TipoCalificacion"')
TiposCalificacion = cursor.fetchall()
except Exception as e:
print(f"Error obteniendo los tipos de calificación de la base de datos: {e}")
try:
# Ejecutar la consulta para obtener las materias base
with conexion.cursor() as cursor:
cursor.execute('SELECT * FROM "Materia"')
materias_base = cursor.fetchall()
except Exception as e:
print(f"Error obteniendo las materias de la base de datos: {e}")
try:
# Procesar y preparar las calificaciones para inserción/actualización
calificaciones = []
with conexion.cursor() as cursor:
cursor.execute(f'''
SELECT "Carrera_id", "PlanEstudio_id"
FROM "Alumno_view"
WHERE "Usuario_claveULSA" = {clave[2:]}
''')
Alumno_base = cursor.fetchone()
for materia_sgu in materias_sgu:
materia_base = next((materia_base for materia_base in materias_base if (materia_sgu['Cve ULSA'] in materia_base[-1] or materia_sgu['Cve SEP'] in materia_base[-1]) and (Alumno_base[1] == materia_base[0])), None)
if not materia_base:
continue
Periodo_base = next((Periodo_base for Periodo_base in Periodos if Periodo_base[-2] == materia_sgu['PERIODO']), None)
if not Periodo_base:
continue
Calificacion_base = next((Calificacion_base for Calificacion_base in TiposCalificacion if Calificacion_base[-2] == materia_sgu['EXAMEN']), None)
if not Calificacion_base:
raise Exception(f"No se encontró el tipo de calificación {materia_sgu['EXAMEN']} en la base de datos")
# buscar en la base de datos el grupo WHERE Grupo_desc = materia_sgu['GRUPO'] and if is not in the base insert it (note: semestre when printed in the console is '\xa0': '5')
with conexion.cursor() as cursor:
cursor.execute(f'''
SELECT "Grupo_id"
FROM "Grupo_view"
WHERE REGEXP_REPLACE("Grupo_desc", '[^\d]', '', 'g') = '{materia_sgu["GRUPO"]}' AND (("Carrera_id" = {Alumno_base[0]}) OR "Carrera_esComun")
''')
Grupo = cursor.fetchone()
if Grupo:
cursor.execute(f'''
INSERT INTO public."Alumno_Materia"("Usuario_claveULSA", "Materia_id", "Periodo_id", "Grupo_id")
VALUES ({clave[2:]}, {materia_base[0]}, {Periodo_base[0]}, {Grupo[0]})
ON CONFLICT ("Usuario_claveULSA", "Materia_id", "Periodo_id") DO NOTHING
;
''')
conexion.commit()
if not materia_base or not Periodo_base or not Calificacion_base:
print(f"No se encontraron coincidencias para la materia {materia_sgu['Cve ULSA']} o el periodo {materia_sgu['PERIODO']} o el tipo de calificación {materia_sgu['EXAMEN']}")
continue # Saltar esta iteración si alguna coincidencia falla
calificaciones.append(f"({clave[2:]}, {materia_base[0]}, {Periodo_base[0]}, {Calificacion_base[0]}, {materia_sgu['CALIF']}, CURRENT_DATE, 'SGU')")
if not calificaciones or len(calificaciones) == 0:
raise Exception("No hay calificaciones para insertar o actualizar.")
# Inserción/actualización de calificaciones en la base de datos
with conexion.cursor() as cursor:
cursor.execute(f'''
insert into "Alumno_Materia_Calificacion"
("Usuario_claveULSA", "Materia_id", "Periodo_id", "TipoCalificacion_id", "Calificacion_calif", "Calificacion_fecha", "Calificacion_comentario")
values
{','.join(calificaciones)}
on conflict ("Usuario_claveULSA", "Materia_id", "Periodo_id", "TipoCalificacion_id")
DO UPDATE SET "Calificacion_calif" = EXCLUDED."Calificacion_calif", "Calificacion_comentario" = EXCLUDED."Calificacion_comentario";
''')
conexion.commit()
except Exception as e:
print(f"Error al insertar/actualizar las calificaciones: {e} Stack: {e.__traceback__}")
# print the whole stack
if conexion:
conexion.rollback() # Revertir cambios en caso de error
conexion.close()