#!/usr/bin/env python
# coding: utf-8

"""Xtracta package"""

__version__ = "0.2"

# In[1]:


import json
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Union  # type hints, checked with mypy

import numpy as np
import pandas as pd
import requests
import xmltodict  # type: ignore
from dotenv import find_dotenv, load_dotenv
from IPython.display import display  # used by obfuscate_excel when run in a notebook
from jinja2 import Template
from PIL import Image, ImageSequence  # type: ignore
from pandas.api.types import (
    is_datetime64_any_dtype,
    is_numeric_dtype,
    is_integer_dtype,
    is_object_dtype,
    is_string_dtype,
)


# ## Set up test environment

# In[ ]:


number_of_rows = 100

load_dotenv(find_dotenv())
api_key = os.getenv("XTRACTA_API_KEY")
database_id = os.getenv("XTRACTA_DATABASE_ID")
header_workflow = os.getenv("XTRACTA_HEADER_ID")
line_workflow = os.getenv("XTRACTA_LINE_ID")


def delete_files(folder):
    """Delete every file under folder and return the list of deleted paths."""
    file_list = list(folder.glob("**/*"))
    for file in file_list:
        file.unlink()
    return file_list


def move_files(samples_folder, file_to_copy, destination_folder):
    """Copy file_to_copy from samples_folder into destination_folder."""
    (destination_folder / file_to_copy).write_bytes(
        (samples_folder / file_to_copy).read_bytes()
    )
    return file_to_copy


def convert_to_numeric_and_date(df, dayfirst=True):
    """Convert object/string columns to numeric or datetime dtypes where possible."""
    for column in df.columns:
        if is_object_dtype(df[column]) or is_string_dtype(df[column]):
            try:
                df[column] = pd.to_numeric(df[column], downcast="integer")
            except Exception:
                try:
                    # Strip currency formatting before retrying the numeric conversion
                    df[column] = df[column].str.replace("$", "", regex=False)
                    df[column] = df[column].str.replace(",", "", regex=False)
                    df[column] = pd.to_numeric(df[column])
                except Exception:
                    try:
                        df[column] = pd.to_datetime(df[column], dayfirst=dayfirst)
                    except Exception:
                        pass
    return df


def random_dates(start, end, seed=1, replace=True, number_of_rows=100):
    """Return number_of_rows random dates sampled between start and end."""
    dates = pd.date_range(start, end).to_series()
    return dates.sample(number_of_rows, replace=replace, random_state=seed).index


def dataframe_obfuscator(df, number_of_rows=100):
    """Replace every column with random values that match the column's dtype."""
    for column in df.columns:
        if is_datetime64_any_dtype(df[column]):
            df[column] = random_dates(
                min(df[column]), max(df[column]), seed=1, number_of_rows=number_of_rows
            )
        elif is_integer_dtype(df[column]):
            df[column] = df[column].fillna(0)
            if min(df[column]) < max(df[column]):
                df[column] = np.random.randint(
                    min(df[column]), max(df[column]), size=number_of_rows
                )
            else:
                df[column] = min(df[column])
        elif is_numeric_dtype(df[column]):
            df[column] = df[column].fillna(0)
            df[column] = np.random.uniform(
                min(df[column]), max(df[column]), size=number_of_rows
            )
        else:
            df[column] = "random text"
    return df


def obfuscate_csv(data_file, dayfirst=True, number_of_rows=100):
    """Overwrite a CSV file with an obfuscated copy of its first number_of_rows rows."""
    df = pd.read_csv(data_file, nrows=number_of_rows)
    df = convert_to_numeric_and_date(df, dayfirst=dayfirst)
    df = dataframe_obfuscator(df, number_of_rows=number_of_rows)
    df.to_csv(data_file, header=True, index=False)
    return df


def obfuscate_excel(data_file, dayfirst=True, number_of_rows=100):
    """Overwrite an Excel file with an obfuscated copy of its first number_of_rows rows."""
    df = pd.read_excel(data_file, nrows=number_of_rows)
    display(df.head())
    df = convert_to_numeric_and_date(df, dayfirst=dayfirst)
    df = dataframe_obfuscator(df, number_of_rows=number_of_rows)
    df.to_excel(data_file, header=True, index=False)
    return df
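# A minimal usage sketch for the test-environment helpers above. The folder and
# file names are hypothetical and assume the sample file already exists on disk.

# In[ ]:


if __name__ == "__main__":
    samples_folder = Path("samples")  # hypothetical source folder
    working_folder = Path("working")  # hypothetical scratch folder
    delete_files(working_folder)
    moved = move_files(samples_folder, "invoice_sample.csv", working_folder)
    obfuscate_csv(working_folder / moved, dayfirst=True, number_of_rows=50)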
# ## Functions for interacting with Xtracta's API

# ### Upload file
#
# Uploads a PDF or image file for extraction.
# The classifier field is used if you want to assign a specific classifier to the
# document rather than letting Xtracta make its own classification decision.

# In[ ]:


def upload_file(api_key, workflow_id, filename, classifier=""):
    """Upload a PDF or image file to a workflow and return the new document_id."""
    # The classifier value is passed to the API through the field_data parameter
    classifier_xml = f"{classifier}"
    upload_url = "https://api-app.xtracta.com/v1/documents/upload"
    data = {
        "api_key": api_key,
        "workflow_id": workflow_id,
        "field_data": classifier_xml,
    }
    with open(filename, mode="rb") as userfile:
        files = {"userfile": userfile}
        r = requests.post(upload_url, data=data, files=files)
    if r.status_code != 200:
        print(r.status_code)
        return r.text
    response = xmltodict.parse(r.text)
    return response["xml"]["document_id"]


# In[ ]:


def get_document(api_key: str, document_id: str):
    """Retrieve the full XML document from Xtracta and convert it to a dict."""
    documents_url = "https://api-app.xtracta.com/v1/documents"
    data = {"api_key": api_key, "document_id": document_id}
    try:
        r = requests.post(documents_url, data=data)
        response = xmltodict.parse(r.text)
        return response
    except Exception as e:
        return e.args


# In[ ]:


def get_xtracta_status(
    api_key: str,
    workflow_id: str,
    status: str,
    api_download_status: str = "active",
    detailed: int = 0,
    documents_order: str = "asc",
) -> list:
    """Returns a list of all Xtracta documents with a particular status"""
    documents_url = "https://api-app.xtracta.com/v1/documents"
    data = {
        "api_key": api_key,
        "workflow_id": workflow_id,
        "document_status": status,
        "api_download_status": api_download_status,
        "items_per_page": 1000,
        "detailed": detailed,
        "documents_order": documents_order,
    }
    try:
        r = requests.post(documents_url, data=data)
        response = xmltodict.parse(r.text)
    except Exception as e:
        return [str(e)]
    try:
        response_content = response["documents_response"]["document"]
        if isinstance(response_content, list):
            return response_content
        return [response_content]
    except Exception as e:
        if isinstance(e, KeyError):
            return [f"No {status} documents in queue"]
        return [e]
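# A minimal sketch (not part of the package) of how the upload and status helpers
# might be combined. The API key and workflow id come from the setup cell above;
# the filename is hypothetical.

# In[ ]:


if __name__ == "__main__":
    document_id = upload_file(api_key, header_workflow, "invoice_sample.pdf")
    print(f"Uploaded document {document_id}")
    # Poll the workflow for documents that have reached output status
    for doc in get_xtracta_status(api_key, header_workflow, "output"):
        print(doc)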
# In[ ]:


# def find_documents_to_skip(api_key, header_workflow):
#     """You only want to process documents that have data in the document body.
#     This function finds documents that are not in this state"""
#     status_to_skip = ['reject', 'preprocessing', 'output-in-progress']
#     items_to_skip = []
#     for status in status_to_skip:
#         queue = get_xtracta_status(api_key, header_workflow, status)
#         for item in queue:
#             if item != f'No {status} documents in queue':
#                 items_to_skip.append(item['document_id'])
#     return items_to_skip


# ## Build the output dictionary from Xtracta data

# In[ ]:


def create_output(document: Dict[Any, Any]) -> Dict[Any, Any]:
    """Returns a dictionary with document_id, status and version as top level values
    and remaining fields as key value pairs in a header section"""
    output = {}
    header_dict = document["documents_response"]["document"]["field_data"]["field"]
    header = transform_dict(header_dict)
    output["document_id"] = document["documents_response"]["document"]["document_id"]
    output["status"] = document["documents_response"]["document"]["document_status"]
    output["version"] = document["documents_response"]["document"]["@revision"]
    output["header"] = header
    return output


def transform_dict(start_dict):
    """Flatten a list of {field_name, field_value} dicts into a plain mapping."""
    end_dict = {}
    for item in start_dict:
        end_dict[item["field_name"]] = item["field_value"]
    return end_dict


# In[ ]:


def get_documents_wo_json(folder):
    """Return the stems of PDFs in folder that do not yet have a matching JSON file."""
    json_files = [file.stem for file in folder.glob("*.json")]
    pdfs = [pdf.stem for pdf in folder.glob("*.pdf")]
    return list(set(pdfs) - set(json_files))


# In[ ]:


def open_document_ui(api_key: str, document_id: str) -> str:
    """Opens the Xtracta UI to fix and train documents"""
    documents_url = "https://api-app.xtracta.com/v1/documents/ui"
    data = {
        "api_key": api_key,
        "document_id": int(document_id),
        "buttons": "output,archive",
        "no_lockout": 1,
        "expire": 86400,
    }
    r = requests.post(documents_url, data=data)
    response = xmltodict.parse(r.text)
    return response["documents_response"]["url"]


# In[ ]:


def update_document(
    api_key: str, document_id: str, delete: int = 0, api_download_status: str = "active"
) -> Dict[str, str]:
    """Updates document on Xtracta"""
    documents_url = "https://api-app.xtracta.com/v1/documents/update"
    data = {
        "api_key": api_key,
        "document_id": int(document_id),
        "delete": delete,
        "api_download_status": api_download_status,
    }
    r = requests.post(documents_url, data=data)
    response = xmltodict.parse(r.text)
    return response["documents_response"]


# In[ ]:


def get_lines(document):
    """Return the invoice line items as a list of dicts, one per row."""
    lines_dict = document["documents_response"]["document"]["field_data"]["field_set"][
        "row"
    ]
    lines = []
    # xmltodict returns a list when there are multiple rows and a dict for a single row
    if isinstance(lines_dict, list):
        for line_dict in lines_dict:
            lines.append(transform_dict(line_dict["field"]))
    else:
        lines.append(transform_dict(lines_dict["field"]))
    return lines


# ## Write JSON files

# In[ ]:


def write_json_simple(filename, output):
    """Write output to filename as pretty-printed JSON."""
    with open(f"{filename}", "w") as f:
        f.write(json.dumps(output, indent=4))
    return filename


# In[ ]:


def write_json(old_name, new_name, dp, output):
    """Write output as JSON to new_name, diverting to the duplicates folder dp when
    new_name already exists; old_name is removed once a copy is on disk."""
    print(old_name, new_name, dp)
    duplicate_name = dp / new_name.name
    if new_name.exists():
        with open(f"{duplicate_name}", "w") as f:
            f.write(json.dumps(output, indent=4))
    else:
        with open(f"{new_name}", "w") as f:
            f.write(json.dumps(output, indent=4))
    if new_name.exists():
        old_name.unlink()
        return new_name
    return "File not saved!"
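# A minimal end-to-end sketch (not part of the package): fetch a document, build
# the header and line structures, and write them to disk. The document_id and
# output folder are hypothetical.

# In[ ]:


if __name__ == "__main__":
    document = get_document(api_key, "123456789")  # hypothetical document_id
    if isinstance(document, dict):
        output = create_output(document)
        output["lines"] = get_lines(document)
        write_json_simple(Path("working") / f"{output['document_id']}.json", output)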
# ## Build output once in output status

# In[ ]:


def build_out_output(filename, document, output):
    """Enrich the output dict with the new filename, email date and image URLs once
    the document has reached output status."""
    output["stem"] = filename.stem
    output["new_filename"] = (
        f"{output['header']['supplier_id']}-{output['header']['invoice_number']}"
    )
    output["header"]["emaildate"] = get_email_date(filename)
    output["document_url"] = document["documents_response"]["document"]["document_url"]
    output["image_urls"] = get_image_urls(
        document["documents_response"]["document"]["image_url"]
    )
    return output


def get_email_date(filename):
    """Extract a YYYY-MM-DD email date from a filename stem that starts with YYYYMMDD."""
    year = filename.stem[:4]
    month = filename.stem[4:6]
    day = filename.stem[6:8]
    return f"{year}-{month}-{day}"


def get_image_urls(image_urls):
    """Always return the image URLs as a list, even when only one is supplied."""
    if not isinstance(image_urls, list):
        image_urls = [image_urls]
    return image_urls


# ## Pull company name and location from filename

# In[ ]:


def add_company_location(output):
    """Parse the company and location from the [<<company-location>>] marker in the
    email subject."""
    company_extract = re.compile(r".*\[<<(.*)>>\].*")
    company_location = company_extract.match(output["header"]["email_subject"])[1]
    try:
        output["company"], output["location"] = company_location.split("-")
    except ValueError:
        output["company"] = company_location
        output["location"] = "NA"
    return output


# ## Moving files in the file system

# ## Formatting XML for upload into Xtracta's database
#
# Take a list of dicts and format it for uploading to Xtracta's database API

# In[ ]:


def update_database_data(api_key, database_id, out, refresh):
    """Push a block of XML rows to an Xtracta database."""
    documents_url = "https://api-app.xtracta.com/v1/databases/data_add"
    data = {
        "api_key": api_key,
        "database_id": int(database_id),
        "data": out,
        "refresh": refresh,
    }
    r = requests.post(documents_url, data=data)
    response = xmltodict.parse(r.text)
    return response


# In[ ]:


def build_xml_data(supplier_data_dict):
    """Convert a list of purchase-order dicts into the column/row XML expected by the
    Xtracta database API."""
    xml_rows = []
    for row in supplier_data_dict:
        po = {
            "column": [
                {"@id": "55261", "#text": f"{row['po_number']}"},
                {"@id": "55264", "#text": f"{row['supplier_number']}"},
                {"@id": "60223", "#text": f"{row['line_number']}"},
                {"@id": "58133", "#text": f"{row['abn']}"},
                {"@id": "58134", "#text": f"{row['bsb']}"},
                {"@id": "58135", "#text": f"{row['bank_account']}"},
                {"@id": "58242", "#text": f"{row['supplier_name']}"},
            ]
        }
        xml_rows.append(po)
    xml_data = {"xml": {"row": xml_rows}}
    return xmltodict.unparse(xml_data, pretty=True)
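# A minimal sketch (not part of the package) of the row dicts build_xml_data
# expects; the keys come from the function above, the values are invented purely
# for illustration.

# In[ ]:


if __name__ == "__main__":
    sample_rows = [
        {
            "po_number": "PO-1001",
            "supplier_number": "SUP-042",
            "line_number": "1",
            "abn": "11 222 333 444",
            "bsb": "062-000",
            "bank_account": "12345678",
            "supplier_name": "Example Supplies Pty Ltd",
        }
    ]
    xml_payload = build_xml_data(sample_rows)
    print(xml_payload)
    # update_database_data(api_key, database_id, xml_payload, refresh=1)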

# ## Build HTML file for handling rejections

# In[ ]:


def create_html_section(data, html_template):
    """Render the Jinja2 template with the supplied data."""
    template = Template(html_template)
    html = template.render(data=data)
    return html


reject_queue_html_template = """
<p>Total number of rejects in queue: {{data.reject_count}}</p>

<h3>{{data.output.header.supplier}}</h3>

<p>Invoice number: {{data.output.header.invoice_number}}</p>

<p>Net: {{"$%.2f"|format(data.output.header.net_total|float)}}</p>

<p>GST: {{"$%.2f"|format(data.output.header.gst_total|float)}}</p>

<p>Total: {{"$%.2f"|format(data.output.header.gross_total|float)}}</p>

<table>
    <tr><th>Field</th><th>Message</th></tr>
    {% for message in data.messages %}
    <tr><td>{{message.field}}</td><td>{{message.message}}</td></tr>
    {% endfor %}
</table>

<p><a href="{{data.review_link}}">Review invoice</a></p>

<p><img src="{{data.invoice_image}}" alt="Invoice Image"></p>
"""


def get_reject_html(api_key, workflow_id, status, html_template):
    """Build the HTML summary for the first rejected document in the queue."""
    queue = get_xtracta_status(api_key, workflow_id, status)
    reject_count = len(queue)
    document_id = queue[0]["document_id"]
    reasons = queue[0]["rejection"]["reason"]
    messages = get_reject_info(api_key, document_id, reasons)
    document = get_document(api_key, document_id)
    image_url = document["documents_response"]["document"]["image_url"][0]
    output = create_output(document)
    review_link = open_document_ui(api_key, document_id)
    data = {
        "output": output,
        "reject_count": reject_count,
        "review_link": review_link,
        "invoice_image": image_url,
        "messages": messages,
    }
    html = create_html_section(data=data, html_template=html_template)
    return html


# In[ ]:


def get_reject_info(api_key, document_id, reasons):
    """Map each rejection reason onto the name of the field it is linked to."""
    messages = []
    document = get_document(api_key, document_id)
    field_ids = get_field_ids(document)
    # xmltodict returns a dict for a single reason and a list for several
    if not isinstance(reasons, list):
        reasons = [reasons]
    for sub_item in reasons:
        field_id = sub_item["linked_field"]["field_id"]
        message = sub_item["message"]
        messages.append({"field": field_ids[field_id], "message": message})
    return messages


def get_field_ids(document):
    """Return a mapping of field_id to field_name for every header field."""
    field_ids = {}
    fields = document["documents_response"]["document"]["field_data"]["field"]
    for field in fields:
        field_ids[field["field_id"]] = field["field_name"]
    return field_ids


# ## Build code
#
# The remaining cells build the package and publish it so it can be installed with pip.