Commit 25276e14 authored by marioromera

Tokenize texts

parent 481c4c95
import requests
from bs4 import BeautifulSoup
import datetime
import os
from os import walk
from datetime import timedelta, date
import io
from pdf_to_txt import convert_pdf_to_txt
boe_url = 'https://boe.es'
boe_api_sumario = boe_url + '/diario_boe/xml.php?id=BOE-S-'
today_year = datetime.datetime.now().strftime("%Y")
today_month = datetime.datetime.now().strftime("%m")
today_day = datetime.datetime.now().strftime("%d")
print("Today is " + today_year + today_month + today_day)
def daterange(start, end):
    for n in range(int((end - start).days)):
        yield start + timedelta(n)
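# e.g. list(daterange(date(2020, 1, 25), date(2020, 1, 28)))
# -> [date(2020, 1, 25), date(2020, 1, 26), date(2020, 1, 27)]  (end date excluded)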
start_date = date(2020, 1, 25)
end_date = date(2020, 5, 28)
print(f"Searching from: {start_date}, to: {end_date}")
downloads_folder = "./downloaded_files"
if not os.path.exists(downloads_folder):
    os.makedirs(downloads_folder)
summaries_folder = downloads_folder + "/summaries"
if not os.path.exists(summaries_folder):
    os.makedirs(summaries_folder)
texts_folder = downloads_folder + "/texts"
if not os.path.exists(texts_folder):
    os.makedirs(texts_folder)
def get_file_by_url(url):
    print("Getting " + url)
    file = requests.get(url)
    print("Got it!")
    return file
for single_date in daterange(start_date, end_date):
    date_str = single_date.strftime("%Y%m%d")
    summary_filepath = summaries_folder + "/summary-" + date_str + ".xml"
    if os.path.exists(summary_filepath):
        print(f"Already gotten {summary_filepath}")
        continue
    print("Starting to get: " + date_str)
    summary_content = get_file_by_url(boe_api_sumario + date_str).text
    if len(summary_content):
        print(f"Creating summary file {summary_filepath}")
        with open(summary_filepath, "w+") as f:
            f.write(summary_content)
for (dirpath, dirnames, filenames) in walk(summaries_folder):
    for filename in filenames:
        filepath = summaries_folder + "/" + filename
        print("opening " + filepath)
        with open(filepath, "r") as summary:
            parsed_summary = BeautifulSoup(summary, 'xml')
        error = parsed_summary.find("error")
        if error:
            print("File has error", error.contents)
            continue
        summary_pdfs_tags = parsed_summary.find_all("urlPdf")
        for tag in summary_pdfs_tags:
            file_date = filename.replace("summary-", "").replace(".xml", "")
            txt_name = tag.text[len("/txts/boe/dias/01/01/01/01"):].replace(".pdf", ".txt")
            txt_filepath = f"{texts_folder}/{file_date}-{txt_name}"
            if os.path.exists(txt_filepath):
                print(f"Already gotten {txt_filepath}")
                continue
            # tag.text points at a PDF, so the response body is the raw PDF bytes
            pdf_bytes = get_file_by_url(boe_url + tag.text).content
            if not len(pdf_bytes):
                print(f"Nothing found {boe_url + tag.text}")
                continue
            print("Creating txt file " + txt_filepath)
            with io.open(txt_filepath, "w", encoding="utf-8") as f:
                text = convert_pdf_to_txt(io.BytesIO(pdf_bytes))
                f.write(text)
print("Finished downloading files")
import os
import spacy
import io
from spacy.lang.es import Spanish
import textacy
from textacy.vsm import Vectorizer
from textacy import preprocessing
texts_folder_path = "./downloaded_files/texts"
spacy_stopwords = spacy.lang.es.stop_words.STOP_WORDS
nlp = Spanish()
def clean_text(txt):
    clean = preprocessing.replace_urls(preprocessing.normalize_whitespace(preprocessing.remove_punctuation(txt)))
    # keep tokens that are neither stopwords nor too short
    clean = [tok for tok in clean.split(" ") if tok not in spacy_stopwords and len(tok) > 2]
    return " ".join(clean)
docs = []
for (dirpath, dirnames, filenames) in os.walk(texts_folder_path):
    for filename in filenames:
        print(f"Opening {filename}")
        with io.open(f"{texts_folder_path}/{filename}", encoding="utf-8") as file:
            text = "".join(line.rstrip('\n') for line in file)
        print("Cleaning text")
        preprocessed = clean_text(text)
        print("Making spacy doc")
        # filenames are prefixed with the YYYYMMDD date of the source summary
        date = filename[:len("YYYYMMDD")]
        doc = textacy.make_spacy_doc((preprocessed, {"date": date}), "es")
        print(filename, doc._.preview)
        print("Adding to docs!")
        docs.append(doc)
print(f"Docs: {len(docs)}")
def get_tokenize_docs(documents):
    return [
        d._.to_terms_list(ngrams=1, named_entities=True, as_strings=True, filter_stops=False,
                          normalize='lower')
        for d in documents]
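# each entry is a lazy stream of term strings (lower-cased unigrams plus named
# entities), e.g. ["decreto", "ministerio de sanidad", ...] once materialised
# with list(); the exact terms depend on the spaCy model used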
def find(needle, haystack):
    if needle == haystack:
        return []
    # Strings are iterable, too
    if isinstance(haystack, str) and len(haystack) <= 1:
        return None
    try:
        for i, e in enumerate(haystack):
            r = find(needle, e)
            if r is not None:
                r.insert(0, i)
                return r
    except TypeError:
        pass
    return None
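# returns the index path to the first match in arbitrarily nested iterables,
# e.g. find("b", ["a", ["c", "b"]]) -> [1, 1]; find("z", ["a"]) -> None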
searched_word = "paz".lower()
print("Finding needle in the haystack")
found_index = find(searched_word, get_tokenize_docs(docs))
print(f"{searched_word} index", found_index)
tf_vectorizer = Vectorizer(apply_idf=False, norm=None)
tf_doc_term_matrix = tf_vectorizer.fit_transform(get_tokenize_docs(docs))
print("tf", tf_doc_term_matrix.toarray())
# note: configured identically to the tf vectorizer above, so this also
# produces raw term counts rather than inverse document frequencies
idf_vectorizer = Vectorizer(apply_idf=False, norm=None)
idf_doc_term_matrix = idf_vectorizer.fit_transform(get_tokenize_docs(docs))
print("idf", idf_doc_term_matrix.toarray())
tfidf_vectorizer = Vectorizer(apply_idf=True, norm=None, idf_type='standard')
tfidf_doc_term_matrix = tfidf_vectorizer.fit_transform(get_tokenize_docs(docs))
print("tfidf", tfidf_doc_term_matrix.toarray())