Lista 11-30: Lista parcial – Clase DBAgenda

##############################################################################
# Parte del libro Introducción a la programación con Python
# Autor: Nilo Ney Coutinho Menezes
# Editora Novatec (c) 2015 - ISBN 978-85-7522-250-8
# Primera edición - Mayo/2016
# Sitio: http://www.librodepython.com
#
# Archivo: lista\capítulo 11\11.30 - Lista parcial – Clase DBAgenda.py
# Descripción: Lista parcial – Clase DBAgenda
##############################################################################

BANCO = """
create table tipos(id integer primary key autoincrement,
                                     descripción text);
create table nombres(id integer primary key autoincrement,
                                         nombre text);
create table teléfonos(id integer primary key autoincrement,
                                             id_nombre integer,
                                             número text,
                                             id_tipo integer);
insert into tipos(descripción) values ("Celular");
insert into tipos(descripción) values ("Fijo");
insert into tipos(descripción) values ("Fax");
insert into tipos(descripción) values ("Casa");
insert into tipos(descripción) values ("Trabajo");
"""


class DBAgenda:
    def __init__(self, banco):
        self.tiposTeléfono = DBTiposTeléfono()
        self.banco = banco
        nuevo = not os.path.isfile(banco)
        self.conexión = sqlite3.connect(banco)
        self.conexión.row_factory = sqlite3.Row
        if nuevo:
            self.crea_banco()
        self.cargaTipos()

    def cargaTipos(self):
        for tipo in self.conexión.execute("select * from tipos"):
            id_ = tipo["id"]
            descripción = tipo["descripción"]
            self.tiposTeléfono.adiciona(DBTipoTeléfono(id_, descripción))

    def crea_banco(self):
        self.conexión.executescript(BANCO)

    def investigaciónNombre(self, nombre):
        if not isinstance(nombre, DBNome):
            raise TypeError("nombre debe ser del tipo DBNome")
        encontrado = self.conexión.execute("""select count(*)
                                              from nombres where nombre = ?""",
                                           (nombre.nombre,)).fetchone()
        if(encontrado[0] > 0):
            return self.carga_por_nombre(nombre)
        else:
            return None

    def carga_por_id(self, id):
        consulta = self.conexión.execute(
            "select * from nombres where id = ?", (id,))
        return self.carga(consulta.fetchone())

    def carga_por_nombre(self, nombre):
        consulta = self.conexión.execute(
            "select * from nombres where nombre = ?", (nombre.nombre,))
        return self.carga(consulta.fetchone())

    def carga(self, consulta):
        if consulta is None:
            return None
        nuevo = DBDadoAgenda(DBNome(consulta["nombre"], consulta["id"]))
        for teléfono in self.conexión.execute(
                "select * from teléfonos where id_nombre = ?",
                (nuevo.nombre.id,)):
            ntel = DBTeléfono(teléfono["número"], None,
                              teléfono["id"], teléfono["id_nombre"])
        for tipo in self.tiposTeléfono:
            if tipo.id == teléfono["id_tipo"]:
                ntel.tipo = tipo
                break
            nuevo.teléfonos.adiciona(ntel)
        return nuevo

    def lista(self):
        consulta = self.conexión.execute("select * from nombres order by nombre")
        for registro in consulta:
            yield self.carga(registro)

    def nuevo(self, registro):
        try:
            cur = self.conexión.cursor()
            cur.execute("insert into nombres(nombre) values (?)",
                        (str(registro.nombre),))
            registro.nombre.id = cur.lastrowid
            for teléfono in registro.teléfonos:
                cur.execute("""insert into teléfonos(número,
                                                     id_tipo, id_nombre) values (?,?,?)""",
                            (teléfono.número, teléfono.tipo.id,
                             registro.nombre.id))
                teléfono.id = cur.lastrowid
            self.conexión.commit()
        except:
            self.conexión.rollback()
            raise
        finally:
            cur.close()

    def actualiza(self, registro):
        try:
            cur = self.conexión.cursor()
            cur.execute("update nombres set nombre=? where id = ?",
                        (str(registro.nombre), registro.nombre.id))
            for teléfono in registro.teléfonos:
                if teléfono.id is None:
                    cur.execute("""insert into teléfonos(número,
                                                         id_tipo, id_nombre)
                                                         values (?,?,?)""",
                                (teléfono.número, teléfono.tipo.id,
                                 registro.nombre.id))
                    teléfono.id = cur.lastrowid
                else:
                    cur.execute("""update teléfonos set número=?,
                                                        id_tipo=?, id_nombre=?
                                                    where id = ?""",
                                (teléfono.número, teléfono.tipo.id,
                                 registro.nombre.id, teléfono.id))
            for borrado in registro.teléfonos.borrados:
                cur.execute("delete from teléfonos where id = ?", (borrado,))
            self.conexión.commit()
            registro.teléfonos.limpia()
        except:
            self.conexión.rollback()
            raise
        finally:
            cur.close()

    def borra(self, registro):
        try:
            cur = self.conexión.cursor()
            cur.execute("delete from teléfonos where id_nombre = ?",
                        (registro.nombre.id,))
            cur.execute("delete from nombres where id = ?",
                        (registro.nombre.id,))
            self.conexión.commit()
        except:
            self.conexión.rollback()
            raise
        finally:
            cur.close()
Haga clic aquí para bajar el archivo