##############################################################################
# Parte del libro Introducción a la programación con Python
# Autor: Nilo Ney Coutinho Menezes
# Editora Novatec (c) 2015 - ISBN 978-85-7522-250-8
# Primera edición - Mayo/2016
# Sitio: http://www.librodepython.com
#
# Archivo: lista\capítulo 11\11.30 - Lista parcial – Clase DBAgenda.py
# Descripción: Lista parcial – Clase DBAgenda
##############################################################################
BANCO = """
create table tipos(id integer primary key autoincrement,
descripción text);
create table nombres(id integer primary key autoincrement,
nombre text);
create table teléfonos(id integer primary key autoincrement,
id_nombre integer,
número text,
id_tipo integer);
insert into tipos(descripción) values ("Celular");
insert into tipos(descripción) values ("Fijo");
insert into tipos(descripción) values ("Fax");
insert into tipos(descripción) values ("Casa");
insert into tipos(descripción) values ("Trabajo");
"""
class DBAgenda:
def __init__(self, banco):
self.tiposTeléfono = DBTiposTeléfono()
self.banco = banco
nuevo = not os.path.isfile(banco)
self.conexión = sqlite3.connect(banco)
self.conexión.row_factory = sqlite3.Row
if nuevo:
self.crea_banco()
self.cargaTipos()
def cargaTipos(self):
for tipo in self.conexión.execute("select * from tipos"):
id_ = tipo["id"]
descripción = tipo["descripción"]
self.tiposTeléfono.adiciona(DBTipoTeléfono(id_, descripción))
def crea_banco(self):
self.conexión.executescript(BANCO)
def investigaciónNombre(self, nombre):
if not isinstance(nombre, DBNome):
raise TypeError("nombre debe ser del tipo DBNome")
encontrado = self.conexión.execute("""select count(*)
from nombres where nombre = ?""",
(nombre.nombre,)).fetchone()
if(encontrado[0] > 0):
return self.carga_por_nombre(nombre)
else:
return None
def carga_por_id(self, id):
consulta = self.conexión.execute(
"select * from nombres where id = ?", (id,))
return self.carga(consulta.fetchone())
def carga_por_nombre(self, nombre):
consulta = self.conexión.execute(
"select * from nombres where nombre = ?", (nombre.nombre,))
return self.carga(consulta.fetchone())
def carga(self, consulta):
if consulta is None:
return None
nuevo = DBDadoAgenda(DBNome(consulta["nombre"], consulta["id"]))
for teléfono in self.conexión.execute(
"select * from teléfonos where id_nombre = ?",
(nuevo.nombre.id,)):
ntel = DBTeléfono(teléfono["número"], None,
teléfono["id"], teléfono["id_nombre"])
for tipo in self.tiposTeléfono:
if tipo.id == teléfono["id_tipo"]:
ntel.tipo = tipo
break
nuevo.teléfonos.adiciona(ntel)
return nuevo
def lista(self):
consulta = self.conexión.execute("select * from nombres order by nombre")
for registro in consulta:
yield self.carga(registro)
def nuevo(self, registro):
try:
cur = self.conexión.cursor()
cur.execute("insert into nombres(nombre) values (?)",
(str(registro.nombre),))
registro.nombre.id = cur.lastrowid
for teléfono in registro.teléfonos:
cur.execute("""insert into teléfonos(número,
id_tipo, id_nombre) values (?,?,?)""",
(teléfono.número, teléfono.tipo.id,
registro.nombre.id))
teléfono.id = cur.lastrowid
self.conexión.commit()
except:
self.conexión.rollback()
raise
finally:
cur.close()
def actualiza(self, registro):
try:
cur = self.conexión.cursor()
cur.execute("update nombres set nombre=? where id = ?",
(str(registro.nombre), registro.nombre.id))
for teléfono in registro.teléfonos:
if teléfono.id is None:
cur.execute("""insert into teléfonos(número,
id_tipo, id_nombre)
values (?,?,?)""",
(teléfono.número, teléfono.tipo.id,
registro.nombre.id))
teléfono.id = cur.lastrowid
else:
cur.execute("""update teléfonos set número=?,
id_tipo=?, id_nombre=?
where id = ?""",
(teléfono.número, teléfono.tipo.id,
registro.nombre.id, teléfono.id))
for borrado in registro.teléfonos.borrados:
cur.execute("delete from teléfonos where id = ?", (borrado,))
self.conexión.commit()
registro.teléfonos.limpia()
except:
self.conexión.rollback()
raise
finally:
cur.close()
def borra(self, registro):
try:
cur = self.conexión.cursor()
cur.execute("delete from teléfonos where id_nombre = ?",
(registro.nombre.id,))
cur.execute("delete from nombres where id = ?",
(registro.nombre.id,))
self.conexión.commit()
except:
self.conexión.rollback()
raise
finally:
cur.close()