Commit a1f50a50 authored by marioromera

Analyze text extracting tf, idf and tf-idf

parent 25276e14
@@ -8,24 +8,29 @@ import io
from pdf_to_txt import convert_pdf_to_txt
# Yields every date between start (inclusive) and end (exclusive)
def daterange(start, end):
    for n in range(int((end - start).days)):
        yield start + timedelta(n)


# Input dates
start_date = date(2020, 5, 25)
end_date = date(2020, 5, 28)
print(f"Searching from: {start_date}, to: {end_date}")

# Fixed urls to download content
boe_url = 'https://boe.es'
boe_api_sumario = boe_url + '/diario_borme/xml.php?id=BOE-S-'

downloads_folder = "./downloaded_files"
if not os.path.exists(downloads_folder):
    os.makedirs(downloads_folder)

summaries_folder = downloads_folder + "/summaries"
if not os.path.exists(summaries_folder):
    os.makedirs(summaries_folder)

texts_folder = downloads_folder + "/texts"
if not os.path.exists(texts_folder):
    os.makedirs(texts_folder)
@@ -33,55 +38,77 @@ if not os.path.exists(texts_folder):
def get_file_by_url(url):
    print("Getting " + url)
    file_request = requests.get(url)
    print("Got file request")
    return file_request
# Iterate over dates to download files
for single_date in daterange(start_date, end_date):
    # Adjust the date to the "api" format (YYYYMMDD)
    date = single_date.strftime("%Y%m%d")
    # Summary naming
    summary_filepath = summaries_folder + "/summary-" + date + ".xml"
    # TODO: check properly if it is the same
    if os.path.exists(summary_filepath):
        print(f"Already downloaded {summary_filepath}")
        continue
    print(f"Starting to get files from date: {date}")
    summary_content = get_file_by_url(boe_api_sumario + date).text
    if len(summary_content):
        print(f"Creating summary file {summary_filepath}")
        # Creates the file if it doesn't exist, otherwise overwrites it
        f = open(summary_filepath, "w+")
        f.write(summary_content)
        f.close()
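# A minimal sketch (for illustration only) of the request URL the loop above builds
# from boe_api_sumario and the configured start_date; example_summary_url is a
# throwaway name, not used by the rest of the script:
example_summary_url = boe_api_sumario + start_date.strftime("%Y%m%d")
print("Example summary url:", example_summary_url)  # .../xml.php?id=BOE-S-20200525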
for (dirpath, dirnames, filenames) in walk(summaries_folder):
    for filename in filenames:
        filepath = summaries_folder + "/" + filename
        print("Opening " + filepath)
        summary = open(filepath, "r")
        # Parse the doc to allow queries
        parsed_summary = BeautifulSoup(summary, 'xml')
        error = parsed_summary.find("error")
        # If the summary contains an error, skip it; most probably there was no publication that day
        if error:
            print("File has error", error.contents)
            continue
        # Find the document urls by collecting the xml urls
        summary_pdfs_tags = parsed_summary.find_all("urlXml")
        for tag in summary_pdfs_tags:
            summary_date = filename.replace("summary-", "").replace(".xml", "")
            # Strip the url prefix from the xml tag to name the document properly
            txt_name = tag.text[len("/diario_boe/xml.php?id="):] + ".txt"
            # Prefix the document name with the summary date
            txt_filepath = f"{texts_folder}/{summary_date}-{txt_name}"
            # TODO: check properly if it is the same
            if os.path.exists(txt_filepath):
                print(f"Already downloaded {txt_filepath}")
                continue
            xml_file = get_file_by_url(boe_url + tag.text).text
            parsed_xml_file = BeautifulSoup(xml_file, 'xml')
            error = parsed_xml_file.find("error")
            if error:
                print(f"Error found {boe_url + tag.text}")
                continue
            print("Creating txt file " + txt_filepath)
            with io.open(txt_filepath, "w", encoding="utf-8") as f:
                text = ""
                # Take all the <texto> entries and join them as the file content
                for el in parsed_xml_file.find_all('texto'):
                    text = text + el.get_text()
                f.write(text)
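# A minimal, self-contained sketch of the <texto> extraction used above, applied to a
# made-up inline XML snippet (sample_xml and sample_parsed are throwaway names for
# illustration, not BOE data):
from bs4 import BeautifulSoup
sample_xml = "<documento><texto>Primera parte. </texto><texto>Segunda parte.</texto></documento>"
sample_parsed = BeautifulSoup(sample_xml, 'xml')
print("".join(el.get_text() for el in sample_parsed.find_all('texto')))  # -> Primera parte. Segunda parte.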
......
import os
import spacy
import io
from spacy.lang.es import Spanish
import textacy
from textacy.vsm import Vectorizer
from textacy import preprocessing
import textacy.vsm
# Folder from where the files will be loaded
pdfs_folder_path = "./downloaded_files/texts_to_debug"

# spaCy stop words for Spanish
spacy_stopwords = spacy.lang.es.stop_words.STOP_WORDS
# spaCy pipeline with the Spanish language loaded
nlp = Spanish()
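# A quick sketch of what the blank Spanish pipeline provides: its tokenizer splits raw
# text into tokens that carry lexical attributes such as is_stop (sample text made up):
print([tok.text for tok in nlp.tokenizer("Boletín Oficial del Estado")])
# -> ['Boletín', 'Oficial', 'del', 'Estado']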
def clean_text(txt):
    # Strip punctuation and urls, normalize whitespace, then drop stop words and very short tokens
    clean = preprocessing.replace_urls(preprocessing.normalize_whitespace(preprocessing.remove_punctuation(txt)))
    clean = [tok.text for tok in nlp.tokenizer(clean) if not tok.is_stop and len(tok.text) > 2]
    return " ".join(clean)
corpus = []
# Iterate over the files in the folder
for (dirpath, dirnames, filenames) in os.walk(pdfs_folder_path):
    for filename in filenames:
        print(f"Opening {filename}")
        with open(f"{pdfs_folder_path}/{filename}", "r", encoding="utf-8") as file:
            text = file.read()
        print("Cleaning text")
        preprocessed = clean_text(text)
        print("Making spacy doc")
        # The filename starts with the date in YYYYMMDD format
        date = filename[:len("YYYYMMDD")]
        # Creates a spacy doc out of each text, attaching the date as metadata
        # TODO: Find out if it's possible to just use nlp and then add it
        doc = textacy.make_spacy_doc((preprocessed, {"date": date}), "es")
        # Shows an overview of the document
        print(filename, doc._.preview)
        print("Adding doc to corpus")
        corpus.append(doc)
print(f"All docs added: {len(corpus)}")
# Split documents into tokens (units of text, commonly known as words)
def get_tokenize_docs(documents):
    return [
        d._.to_terms_list(ngrams=1, entities=True, as_strings=True, filter_stops=False, normalize='lower')
        for d in documents]
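# A quick inspection sketch (assuming at least one document was loaded): wrap the terms
# of the first doc in list() so they can be sliced and printed whether they come back
# lazily or as a list.
if corpus:
    print("First 10 terms of the first doc:", list(get_tokenize_docs(corpus[:1])[0])[:10])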
# Input word to search for; in the future this could be a list
searched_word = "concurso".lower()
# Creates a matrix of all the terms in the documents: first create the vectorizer,
# then fit it on the tokenized documents to build the vocabulary
vectorizer = textacy.vsm.Vectorizer(apply_idf=False, norm=None)
doc_term_matrix = vectorizer.fit_transform(get_tokenize_docs(corpus))
# -1 means not found
searched_word_index = -1
# Look up the index of the term in the vectorizer's vocabulary
try:
    searched_word_index = vectorizer.terms_list.index(searched_word)
except ValueError:
    print("Word not found")
# Word is in the corpus
if searched_word_index != -1:
    print("vocab", searched_word_index)
    tf = textacy.vsm.matrix_utils.get_term_freqs(doc_term_matrix, type_="log")[searched_word_index]
    print("tf", tf)
    idf = textacy.vsm.matrix_utils.get_inverse_doc_freqs(doc_term_matrix)[searched_word_index]
    print("idf", idf)
    print("tf-idf", tf * idf)