티스토리 뷰

[sqlite3 table : export, import ]


db 파일의 table 을 csv 로 export, import 해보자


[csv <-> db]


csv 파일이름은 table 이름


첫째줄은 테이블의 필드 이름들


둘째줄부터 필드의 값들이 들어간다.



[커맨드]


import test.db


위의 커맨드를 입력하면  

현재 root.py 가 실행되는 폴더의 모든 csv 파일을 test.db 로 생성 하자

(위의 포멧 참조)


show test.db


위 커맨드로 test.db 의 테이블과 필드들을 출력한다.


export test.db


입력하면 test.db 의 모든 테이블과 내용을 csv 파일로 export 하자

1개의 테이블단 1개의 csv 파일로 생성한다.

현재 폴더에 생성한다.

(위의 포멧 참조)



[예외처리]


export/show 하려는 파일이 없을때 

파일이름 is not found 출력하면서 예외처리를 해준다.


커맨드가 잘못 되었을때 

invalid command! 라고 출력해준다.








root.py

import os
from dbmanager import *
from csvmanager import *


def context_manager(export=False):
def decorator(func):
def wrapper(*args, **kwargs):
db_file = kwargs['db_file']
if not export or db_file in [file for file in os.listdir(os.getcwd())]:
with sqlite3.connect(db_file) as con:
return func(*args, con=con, **kwargs)
print("{} is not found".format(db_file))
return wrapper
return decorator


@context_manager()
def import_csv(csv, db_file=None, con=None):
field = read(csv)
table = csv.replace(".csv", "")
print("create {} from {}".format(table, csv))
delete_tables(con, [table])
create_table(con, table, *list(map(lambda x: str(x) + " text", field[0])))
insert_fields(con, table, field[1:])
con.commit()


def import_all(db_file=None):
files = [file for file in os.listdir(os.getcwd()) if '.csv' in file]
for file in files:
import_csv(file, db_file=db_file)
print("import {} files".format(len(files)))


@context_manager(export=True)
def export_all(db_file=None, con=None):
tables = get_tables(con)
for table in tables:
file = "./{}.csv".format(table)
write(file, select_table(con, table, key=True))
print("write {} from {} table".format(file, table))
print("export {} tables".format(len(list(tables))))


@context_manager(export=True)
def show_db(db_file=None, con=None):
print("\n".join([(str(table) + " " + str(select_table(con, table, key=True))) for table in get_tables(con)]))


def operator(cmd, size=2, tks=('import', 'export', 'show'), fs=(import_all, export_all, show_db)):
cmd = cmd.split(" ")
if len(cmd) != size or cmd[0] not in tks or '.db' not in cmd[1]:
print("invalid command")
else:
fs[tks.index(cmd[0])](db_file=cmd[1])


def main():
while True:
operator(input(""))


if __name__ == "__main__":
main()


csvmanager.py

def handler(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except FileNotFoundError as e:
print(e)
except Exception as e:
print(e)
return wrapper


@handler
def write(path, data, encoding='euc-kr'):
with open(path, 'w', encoding=encoding) as f:
f.writelines("\n".join([",".join(list(map(str, row))) for row in data]))


@handler
def read(path, encoding='euc-kr'):
with open(path, 'r', encoding=encoding) as f:
return [[item if '\n' not in item else item.replace('\n', '') for item in line.split(',')]for line in f.readlines()]




dbmanager.py

import sqlite3


def handler(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except sqlite3.OperationalError as e:
print(e)
except Exception as e:
print(e)
return wrapper


@handler
def create_table(con, table, *args):
cur = con.cursor()
fields = ("{} text," * len(args)).format(*args)[:-1]
cur.execute("create table {}({})".format(table, fields))


@handler
def insert_fields(con, table, field_list):
cur = con.cursor()
cur.executemany("insert into {} values({})".format(table, ",".join(['?']*len(field_list[0]))), field_list)


@handler
def find_table(con, table):
cur = con.cursor()
cur.execute("select name from sqlite_master where type='table' and name='{}'".format(table))
table = [tb[0] for tb in cur]
return table[0] if table else []


@handler
def delete_tables(con, tables):
cur = con.cursor()
for table in tables:
cur.execute("drop table {}".format(table))


@handler
def select_table(con, table, key=False):
cur = con.cursor()
cur.execute("select * from {}".format(table))
values = [row for row in cur]
return [tuple([fd[0] for fd in cur.description])] + values if key else values


@handler
def get_tables(con):
cur = con.cursor()
cur.execute("select name from sqlite_master where type='table'")
return tuple([table[0] for table in cur])


댓글