This integration allows Paperless-ngx to automatically process consumed documents through FileParse. By using a post-consume script, documents are sent to FileParse for high-quality parsing into Markdown, with the results written back to a custom field in Paperless-ngx.
Paperless consumes document
↓
post_consume.py executes
↓
Uploads to FileParse
↓
Polls until parsing complete
↓
Writes parsed markdown to Paperless custom field
Mount the post_consume.py script into your Paperless container:
volumes:
- ./integrations/paperless-ngx/post_consume.py:/usr/src/paperless/scripts/post_consume.py:ro
Add the following environment variables to your docker-compose.env or docker-compose.yml file:
PAPERLESS_POST_CONSUME_SCRIPT=/usr/src/paperless/scripts/post_consume.py
FILEPARSE_URL=https://fileparse.example.com
FILEPARSE_API_KEY=<your-api-key>
PAPERLESS_URL=http://paperless:8000
PAPERLESS_TOKEN=<your-token>
Ensure the PAPERLESS_POST_CONSUME_SCRIPT environment variable points to the correct path of the script.

| Variable | Required | Default | Description |
|---|---|---|---|
| FILEPARSE_URL | Yes | - | Your FileParse URL (e.g. https://fileparse.example.com) |
| FILEPARSE_API_KEY | Yes | - | FileParse API key from your settings page |
| PAPERLESS_URL | Yes | - | Your Paperless-ngx URL (e.g. http://paperless:8000) |
| PAPERLESS_TOKEN | Yes | - | Paperless API token from your profile page |
| FILEPARSE_TIMEOUT | No | 300 | Max seconds to wait for parsing |
| FILEPARSE_POLL_INTERVAL | No | 5 | Seconds between poll requests |
| FILEPARSE_CLEANUP | No | true | Delete from FileParse after success |
| FILEPARSE_CUSTOM_FIELD_NAME | No | "Parsed Content" | Paperless custom field name |
The integration uses Paperless-ngx's post-consume script functionality. These scripts run synchronously during the document consumption process.
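1. The script reads DOCUMENT_ID, DOCUMENT_SOURCE_PATH, and DOCUMENT_FILE_NAME from Paperless.
2. It uploads the document to FileParse via POST /api/v1/documents.
3. It polls GET /api/v1/documents/{id} until the status is either completed or failed.
4. It writes the parsed markdown to the Paperless custom field via PATCH /api/documents/{id}/.
5. If FILEPARSE_CLEANUP is set to true, the document is deleted from FileParse after successful processing.

Troubleshooting:
- Script not running: verify PAPERLESS_POST_CONSUME_SCRIPT, and check the Paperless logs for errors.
- Authentication errors: ensure FILEPARSE_API_KEY and PAPERLESS_TOKEN are correct and have the necessary permissions.
- Parsing timeouts: increase FILEPARSE_TIMEOUT.

#!/usr/bin/env python3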
"""
Paperless-ngx post-consume hook for FileParse integration.
This script is executed by Paperless-ngx after document consumption.
It uploads the document to FileParse for parsing, polls for completion,
and writes the parsed content back to Paperless as a custom field.
"""
import json
import logging
import os
import sys
import time
import urllib.request
from typing import Optional
# Send INFO-and-above records to stderr, each prefixed with "[FileParse]".
logging.basicConfig(
    stream=sys.stderr,
    format="[FileParse] %(levelname)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def _env_int(name: str, default: int, minimum: int = 1) -> int:
    """Read an integer setting from the environment.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset, blank, not an
            integer, or below ``minimum``.
        minimum: Smallest accepted value.

    Returns:
        The parsed integer, or ``default`` when the value is unusable.
    """
    raw_value = os.getenv(name)
    # An unset variable and an empty one (e.g. ``NAME=`` in an env file) both
    # mean "use the default".
    if raw_value is None or not raw_value.strip():
        return default
    try:
        parsed = int(raw_value.strip())
    except ValueError:
        # Fall back instead of crashing at import time with a traceback.
        logging.getLogger(__name__).warning(
            "Invalid integer for %s: %r; using default %s",
            name,
            raw_value,
            default,
        )
        return default
    if parsed < minimum:
        logging.getLogger(__name__).warning(
            "Value for %s must be at least %s (got %s); using default %s",
            name,
            minimum,
            parsed,
            default,
        )
        return default
    return parsed


# Paperless environment variables (set by Paperless-ngx)
DOCUMENT_ID = os.getenv("DOCUMENT_ID")
DOCUMENT_SOURCE_PATH = os.getenv("DOCUMENT_SOURCE_PATH")
DOCUMENT_FILE_NAME = os.getenv("DOCUMENT_FILE_NAME")
DOCUMENT_ARCHIVE_PATH = os.getenv("DOCUMENT_ARCHIVE_PATH")
DOCUMENT_DOWNLOAD_URL = os.getenv("DOCUMENT_DOWNLOAD_URL")
DOCUMENT_ORIGINAL_FILENAME = os.getenv("DOCUMENT_ORIGINAL_FILENAME")
DOCUMENT_OWNER = os.getenv("DOCUMENT_OWNER")
DOCUMENT_CORRESPONDENT = os.getenv("DOCUMENT_CORRESPONDENT")
DOCUMENT_TAGS = os.getenv("DOCUMENT_TAGS")
TASK_ID = os.getenv("TASK_ID")

# FileParse configuration
FILEPARSE_URL = os.getenv("FILEPARSE_URL")
FILEPARSE_API_KEY = os.getenv("FILEPARSE_API_KEY")
# Max seconds to wait for parsing, and seconds between poll requests.
FILEPARSE_TIMEOUT = _env_int("FILEPARSE_TIMEOUT", 300)
FILEPARSE_POLL_INTERVAL = _env_int("FILEPARSE_POLL_INTERVAL", 5)
# Only the literal value "true" (any case, surrounding whitespace ignored)
# enables cleanup.
FILEPARSE_CLEANUP = os.getenv("FILEPARSE_CLEANUP", "true").strip().lower() == "true"
FILEPARSE_CUSTOM_FIELD_NAME = os.getenv("FILEPARSE_CUSTOM_FIELD_NAME", "Parsed Content")

# Paperless configuration
PAPERLESS_URL = os.getenv("PAPERLESS_URL")
PAPERLESS_TOKEN = os.getenv("PAPERLESS_TOKEN")
def _error_status_code(exc: Exception) -> Optional[int]:
    """Return the integer ``code`` attribute of *exc*, or None if it has none."""
    candidate = getattr(exc, "code", None)
    return candidate if isinstance(candidate, int) else None
def _error_body(exc: Exception) -> str:
    """Return the text obtained from ``exc.read()``, if the exception has one.

    Returns an empty string when *exc* has no callable ``read`` attribute or
    when ``read()`` yields something other than ``bytes``/``str``, and a
    placeholder message when reading raises.
    """
    reader = getattr(exc, "read", None)
    if callable(reader):
        try:
            payload = reader()
            if isinstance(payload, str):
                return payload
            if isinstance(payload, bytes):
                # Undecodable bytes are replaced rather than raising.
                return payload.decode("utf-8", errors="replace")
        except Exception:
            return "<unable to read error body>"
    return ""
def _escape_multipart_filename(file_name: str) -> str:
    """Escape characters that would break a quoted multipart filename.

    Double quotes and line breaks are percent-encoded so that a document name
    cannot end the quoted ``filename="..."`` value early or add header lines.
    """
    return (
        file_name.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")
    )


def upload_to_fileparse(request_timeout: Optional[float] = None) -> Optional[str]:
    """
    Upload the document to FileParse.

    Args:
        request_timeout: Socket timeout in seconds for the upload request.
            Defaults to FILEPARSE_TIMEOUT (at least one second).

    Returns:
        Document ID from FileParse response, or None on failure or when the
        source file has an unsupported extension.
    """
    if not DOCUMENT_SOURCE_PATH:
        logger.error("Missing required environment variable: DOCUMENT_SOURCE_PATH")
        return None
    if not FILEPARSE_URL:
        logger.error("Missing required environment variable: FILEPARSE_URL")
        return None

    source_path_lower = DOCUMENT_SOURCE_PATH.lower()
    if not source_path_lower.endswith((".pdf", ".png", ".jpg", ".jpeg")):
        logger.info(
            "Skipping unsupported file type for path: %s",
            DOCUMENT_SOURCE_PATH,
        )
        return None

    file_name = DOCUMENT_FILE_NAME or DOCUMENT_ORIGINAL_FILENAME
    if not file_name:
        file_name = os.path.basename(DOCUMENT_SOURCE_PATH)

    mime_type = "application/octet-stream"
    if source_path_lower.endswith(".pdf"):
        mime_type = "application/pdf"
    elif source_path_lower.endswith(".png"):
        mime_type = "image/png"
    elif source_path_lower.endswith((".jpg", ".jpeg")):
        mime_type = "image/jpeg"

    try:
        with open(DOCUMENT_SOURCE_PATH, "rb") as file_handle:
            file_content = file_handle.read()
    except OSError as exc:
        logger.error("Failed to read source file: %s", exc)
        return None

    logger.info("Uploading document to FileParse")
    boundary = "----FileParseBoundary{}".format(int(time.time() * 1000))
    body_prefix = (
        "--{}\r\n"
        "Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n"
        "Content-Type: {}\r\n\r\n"
    ).format(boundary, _escape_multipart_filename(file_name), mime_type).encode("utf-8")
    body_suffix = "\r\n--{}--\r\n".format(boundary).encode("utf-8")
    body = body_prefix + file_content + body_suffix

    upload_url = "{}/api/v1/documents".format(FILEPARSE_URL.rstrip("/"))
    request = urllib.request.Request(
        upload_url,
        data=body,
        method="POST",
        headers={
            "X-API-Key": FILEPARSE_API_KEY or "",
            "Content-Type": "multipart/form-data; boundary={}".format(boundary),
        },
    )

    # Without a timeout a stalled connection would block the hook indefinitely.
    if request_timeout is None:
        request_timeout = max(FILEPARSE_TIMEOUT, 1)

    try:
        with urllib.request.urlopen(request, timeout=request_timeout) as response:
            response_body = response.read().decode("utf-8", errors="replace")
    except Exception as exc:
        status_code = _error_status_code(exc)
        if status_code is not None:
            body_text = _error_body(exc)
            logger.error("FileParse upload failed with status %s: %s", status_code, body_text)
            if status_code in (401, 403):
                logger.error("Authentication failed. Check FILEPARSE_API_KEY.")
            if status_code == 413:
                logger.error("FileParse storage quota exceeded.")
        else:
            logger.error(
                "Failed to connect to FileParse upload endpoint: %s. Check FILEPARSE_URL and network connectivity.",
                exc,
            )
        return None

    # Parse separately so a malformed reply is not reported as a connection error.
    try:
        response_json = json.loads(response_body)
    except ValueError:
        logger.error("FileParse upload response was not valid JSON")
        return None

    document_id = response_json.get("id") if isinstance(response_json, dict) else None
    if not isinstance(document_id, str) or not document_id:
        logger.error("FileParse upload response missing document ID")
        return None
    return document_id
def poll_until_done(fileparse_doc_id: str) -> Optional[str]:
    """
    Poll FileParse until document parsing is complete.

    Polling stops at the first "completed" or "failed" status, at the first
    request or response error, or once FILEPARSE_TIMEOUT seconds have elapsed.

    Args:
        fileparse_doc_id: Document ID returned from FileParse upload.

    Returns:
        Parsed markdown content, or None on timeout/failure.
    """
    if not FILEPARSE_URL:
        logger.error("Missing required environment variable: FILEPARSE_URL")
        return None

    poll_url = "{}/api/v1/documents/{}".format(
        FILEPARSE_URL.rstrip("/"),
        fileparse_doc_id,
    )
    # A monotonic clock keeps the deadline stable if the wall clock is adjusted.
    started_at = time.monotonic()
    poll_count = 0

    while True:
        elapsed = time.monotonic() - started_at
        if elapsed >= FILEPARSE_TIMEOUT:
            logger.error("Parsing timed out after %ss. The document may still be processing.", FILEPARSE_TIMEOUT)
            return None

        poll_count += 1
        if poll_count % 5 == 0:
            logger.info(
                "Polling FileParse status (attempt %s, elapsed %ss)",
                poll_count,
                int(elapsed),
            )

        request = urllib.request.Request(
            poll_url,
            headers={"X-API-Key": FILEPARSE_API_KEY or ""},
        )
        # Bound each request by the remaining budget so one stalled poll
        # cannot outlast the overall timeout by much.
        request_timeout = max(1.0, FILEPARSE_TIMEOUT - elapsed)
        try:
            with urllib.request.urlopen(request, timeout=request_timeout) as response:
                response_body = response.read().decode("utf-8", errors="replace")
        except Exception as exc:
            status_code = _error_status_code(exc)
            if status_code is not None:
                body_text = _error_body(exc)
                logger.error("FileParse poll failed with status %s: %s", status_code, body_text)
                if status_code in (401, 403):
                    logger.error("Authentication failed. Check FILEPARSE_API_KEY.")
            else:
                logger.error(
                    "Failed to connect to FileParse polling endpoint: %s. Check FILEPARSE_URL and network connectivity.",
                    exc,
                )
            return None

        try:
            response_json = json.loads(response_body)
        except ValueError:
            logger.error("FileParse poll response was not valid JSON")
            return None
        if not isinstance(response_json, dict):
            logger.error("Unexpected FileParse poll response format")
            return None

        status = response_json.get("status")
        if status == "completed":
            markdown = response_json.get("markdown")
            if isinstance(markdown, str):
                return markdown
            logger.error("FileParse completed response missing markdown content")
            return None

        if status == "failed":
            error_data = response_json.get("error")
            message = error_data.get("message") if isinstance(error_data, dict) else None
            if isinstance(message, str) and message:
                logger.error("FileParse parsing failed: %s", message)
            else:
                logger.error("FileParse parsing failed")
            return None

        # Any other status means "still processing": wait, but never past the
        # deadline, and never pass a negative value to time.sleep().
        remaining = FILEPARSE_TIMEOUT - (time.monotonic() - started_at)
        pause = min(FILEPARSE_POLL_INTERVAL, remaining)
        if pause > 0:
            time.sleep(pause)
def _paperless_json_request(request, timeout, failure_label, endpoint_label) -> tuple:
    """Send *request* to Paperless and decode the JSON reply.

    Args:
        request: Prepared ``urllib.request.Request``.
        timeout: Socket timeout in seconds.
        failure_label: Operation name used in HTTP-status error messages.
        endpoint_label: Endpoint name used in connection error messages.

    Returns:
        ``(True, payload)`` on success, or ``(False, None)`` after the failure
        has been logged.
    """
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            response_body = response.read().decode("utf-8", errors="replace")
    except Exception as exc:
        status_code = _error_status_code(exc)
        if status_code is not None:
            logger.error(
                "Paperless %s failed with status %s: %s",
                failure_label,
                status_code,
                _error_body(exc),
            )
            if status_code in (401, 403):
                logger.error("Authentication failed. Check PAPERLESS_TOKEN.")
        else:
            logger.error(
                "Failed to connect to Paperless %s endpoint: %s. Check PAPERLESS_URL and network connectivity.",
                endpoint_label,
                exc,
            )
        return False, None
    try:
        return True, json.loads(response_body)
    except ValueError:
        logger.error("Paperless %s response was not valid JSON", failure_label)
        return False, None


def ensure_custom_field(request_timeout: float = 30.0) -> Optional[int]:
    """
    Ensure the custom field exists in Paperless.

    Looks for a field named FILEPARSE_CUSTOM_FIELD_NAME across all pages of
    the custom field list and creates it when no page contains it.

    Args:
        request_timeout: Socket timeout in seconds for each Paperless request.

    Returns:
        Custom field ID, or None on failure.
    """
    if not PAPERLESS_URL:
        logger.error("Missing required environment variable: PAPERLESS_URL")
        return None

    list_url = "{}/api/custom_fields/".format(PAPERLESS_URL.rstrip("/"))
    headers = {
        "Authorization": "Token {}".format(PAPERLESS_TOKEN or ""),
    }

    # The list response carries "results" plus a "next" link; stopping after
    # the first page would miss an existing field and attempt a duplicate.
    # Page URLs are built here rather than taken from "next".
    max_pages = 1000
    for page_number in range(1, max_pages + 1):
        list_request = urllib.request.Request(
            "{}?page={}".format(list_url, page_number),
            headers=headers,
        )
        succeeded, response_json = _paperless_json_request(
            list_request,
            request_timeout,
            "custom field list",
            "custom fields",
        )
        if not succeeded:
            return None

        results = response_json.get("results") if isinstance(response_json, dict) else None
        if not isinstance(results, list):
            logger.error("Unexpected Paperless custom fields response format")
            return None

        for field in results:
            if not isinstance(field, dict):
                continue
            if field.get("name") != FILEPARSE_CUSTOM_FIELD_NAME:
                continue
            data_type = field.get("data_type")
            if data_type != "string":
                logger.warning(
                    "Custom field '%s' exists but has data_type '%s' (expected 'string')",
                    FILEPARSE_CUSTOM_FIELD_NAME,
                    data_type,
                )
            field_id = field.get("id")
            if isinstance(field_id, int):
                return field_id
            logger.error("Custom field '%s' has invalid ID", FILEPARSE_CUSTOM_FIELD_NAME)
            return None

        if not response_json.get("next"):
            break
    else:
        # Guard against a server that never stops advertising a next page.
        logger.error("Paperless custom field list did not end after %s pages", max_pages)
        return None

    # NOTE(review): Paperless may cap the length of "string" custom field
    # values; confirm that full parsed markdown fits, or whether a longer
    # text field type is required.
    create_body = json.dumps(
        {
            "name": FILEPARSE_CUSTOM_FIELD_NAME,
            "data_type": "string",
        }
    ).encode("utf-8")
    create_request = urllib.request.Request(
        list_url,
        data=create_body,
        method="POST",
        headers={
            "Authorization": "Token {}".format(PAPERLESS_TOKEN or ""),
            "Content-Type": "application/json",
        },
    )
    succeeded, response_json = _paperless_json_request(
        create_request,
        request_timeout,
        "custom field creation",
        "custom field creation",
    )
    if not succeeded:
        return None

    field_id = response_json.get("id") if isinstance(response_json, dict) else None
    if isinstance(field_id, int):
        logger.info("Created Paperless custom field '%s'", FILEPARSE_CUSTOM_FIELD_NAME)
        return field_id
    logger.error("Paperless custom field creation response missing valid field ID")
    return None
def _merge_custom_field_values(existing, custom_field_id: int, parsed_content: str) -> list:
    """Build the ``custom_fields`` payload for a document update.

    Keeps every well-formed entry of *existing* that belongs to another field
    and appends the entry for *custom_field_id* with *parsed_content*.
    """
    merged = []
    for entry in existing if isinstance(existing, list) else []:
        if not isinstance(entry, dict):
            continue
        if entry.get("field") == custom_field_id:
            continue
        merged.append({"field": entry.get("field"), "value": entry.get("value")})
    merged.append({"field": custom_field_id, "value": parsed_content})
    return merged


def write_to_paperless(
    custom_field_id: int,
    parsed_content: str,
    request_timeout: float = 30.0,
) -> bool:
    """
    Write parsed content to Paperless document custom field.

    The document's current custom fields are read first and sent back together
    with the new value, so values held in other custom fields are not dropped.

    Args:
        custom_field_id: ID of the custom field.
        parsed_content: Markdown content to write.
        request_timeout: Socket timeout in seconds for each Paperless request.

    Returns:
        True on success, False on failure.
    """
    if not PAPERLESS_URL:
        logger.error("Missing required environment variable: PAPERLESS_URL")
        return False

    update_url = "{}/api/documents/{}/".format(
        PAPERLESS_URL.rstrip("/"),
        DOCUMENT_ID,
    )
    auth_value = "Token {}".format(PAPERLESS_TOKEN or "")

    # NOTE(review): Paperless-ngx appears to treat a PATCHed "custom_fields"
    # list as the complete set for the document; confirm against the REST API
    # documentation for your version. Sending the existing entries back is
    # harmless either way.
    fetch_request = urllib.request.Request(
        update_url,
        headers={"Authorization": auth_value},
    )
    try:
        with urllib.request.urlopen(fetch_request, timeout=request_timeout) as response:
            document_body = response.read().decode("utf-8", errors="replace")
    except Exception as exc:
        status_code = _error_status_code(exc)
        if status_code is not None:
            body_text = _error_body(exc)
            logger.error("Paperless document fetch failed with status %s: %s", status_code, body_text)
            if status_code in (401, 403):
                logger.error("Authentication failed. Check PAPERLESS_TOKEN.")
        else:
            logger.error(
                "Failed to connect to Paperless document endpoint: %s. Check PAPERLESS_URL and network connectivity.",
                exc,
            )
        return False

    try:
        document_json = json.loads(document_body)
    except ValueError:
        logger.error("Paperless document response was not valid JSON")
        return False
    existing_fields = (
        document_json.get("custom_fields") if isinstance(document_json, dict) else None
    )

    body = json.dumps(
        {
            "custom_fields": _merge_custom_field_values(
                existing_fields,
                custom_field_id,
                parsed_content,
            ),
        }
    ).encode("utf-8")
    request = urllib.request.Request(
        update_url,
        data=body,
        method="PATCH",
        headers={
            "Authorization": auth_value,
            "Content-Type": "application/json",
        },
    )

    try:
        with urllib.request.urlopen(request, timeout=request_timeout) as response:
            # Any 2xx reply means the update was accepted.
            if 200 <= response.status < 300:
                return True
            response_body = response.read().decode("utf-8", errors="replace")
            logger.error(
                "Paperless document update failed with status %s: %s",
                response.status,
                response_body,
            )
            return False
    except Exception as exc:
        status_code = _error_status_code(exc)
        if status_code is not None:
            body_text = _error_body(exc)
            logger.error("Paperless document update failed with status %s: %s", status_code, body_text)
            if status_code in (401, 403):
                logger.error("Authentication failed. Check PAPERLESS_TOKEN.")
        else:
            logger.error(
                "Failed to connect to Paperless document endpoint: %s. Check PAPERLESS_URL and network connectivity.",
                exc,
            )
        return False
def cleanup_fileparse_document(fileparse_doc_id: str, request_timeout: float = 30.0) -> bool:
    """
    Delete the document from FileParse after successful parsing.

    Failures are logged at warning level only; the caller decides whether
    they matter.

    Args:
        fileparse_doc_id: Document ID to delete.
        request_timeout: Socket timeout in seconds for the delete request.

    Returns:
        True on success, False on failure.
    """
    if not FILEPARSE_URL:
        logger.warning("Missing required environment variable: FILEPARSE_URL")
        return False

    delete_url = "{}/api/v1/documents/{}".format(
        FILEPARSE_URL.rstrip("/"),
        fileparse_doc_id,
    )
    request = urllib.request.Request(
        delete_url,
        method="DELETE",
        headers={"X-API-Key": FILEPARSE_API_KEY or ""},
    )

    try:
        with urllib.request.urlopen(request, timeout=request_timeout) as response:
            # A successful DELETE is often answered with 204 No Content, so
            # accept the whole 2xx range rather than only 200.
            if 200 <= response.status < 300:
                return True
            response_body = response.read().decode("utf-8", errors="replace")
            logger.warning(
                "FileParse cleanup failed with status %s: %s",
                response.status,
                response_body,
            )
            return False
    except Exception as exc:
        status_code = _error_status_code(exc)
        if status_code is not None:
            body_text = _error_body(exc)
            logger.warning("FileParse cleanup failed with status %s: %s", status_code, body_text)
            if status_code in (401, 403):
                logger.warning("Authentication failed. Check FILEPARSE_API_KEY.")
        else:
            logger.warning(
                "Failed to connect to FileParse cleanup endpoint: %s. Check FILEPARSE_URL and network connectivity.",
                exc,
            )
        return False
def main() -> int:
    """
    Run the post-consume hook: validate settings, upload, poll, write back,
    and optionally clean up.

    Returns:
        Exit code (0 for success or for a skipped unsupported file type,
        1 for failure).
    """
    try:
        # Service settings that must be present before any work starts.
        service_settings = (
            ("FILEPARSE_URL", FILEPARSE_URL),
            ("FILEPARSE_API_KEY", FILEPARSE_API_KEY),
            ("PAPERLESS_URL", PAPERLESS_URL),
            ("PAPERLESS_TOKEN", PAPERLESS_TOKEN),
        )
        missing_vars = [name for name, value in service_settings if not value]
        if missing_vars:
            logger.error("Missing required environment variable(s): %s", ", ".join(missing_vars))
            return 1

        # Per-document values supplied by Paperless.
        document_settings = (
            ("DOCUMENT_ID", DOCUMENT_ID),
            ("DOCUMENT_SOURCE_PATH", DOCUMENT_SOURCE_PATH),
        )
        missing_doc_vars = [name for name, value in document_settings if not value]
        if missing_doc_vars:
            logger.error(
                "Missing Paperless document environment variable(s): %s",
                ", ".join(missing_doc_vars),
            )
            return 1

        # Step 1: upload. An empty result is a failure only for supported types.
        fileparse_doc_id = upload_to_fileparse()
        if not fileparse_doc_id:
            is_supported = (DOCUMENT_SOURCE_PATH or "").lower().endswith(
                (".pdf", ".png", ".jpg", ".jpeg")
            )
            if not is_supported:
                logger.info("No processing required for unsupported file type")
                return 0
            logger.error("Failed to upload document to FileParse")
            return 1
        logger.info("Uploaded to FileParse: %s", fileparse_doc_id)

        # Step 2: wait for the parse result.
        parsed_content = poll_until_done(fileparse_doc_id)
        if parsed_content is None:
            logger.error("Failed to retrieve parsed content from FileParse")
            return 1
        logger.info("Document parsing completed")

        # Step 3: look up (or create) the target custom field.
        custom_field_id = ensure_custom_field()
        if custom_field_id is None:
            logger.error("Failed to ensure custom field exists")
            return 1

        # Step 4: store the markdown on the Paperless document.
        stored = write_to_paperless(custom_field_id, parsed_content)
        if not stored:
            logger.error("Failed to write parsed content to Paperless")
            return 1
        logger.info("Parsed content written to Paperless")

        # Step 5: optional cleanup; a failure here does not fail the hook.
        if FILEPARSE_CLEANUP and not cleanup_fileparse_document(fileparse_doc_id):
            logger.warning("Failed to cleanup FileParse document (non-fatal)")

        logger.info("Post-consume hook completed successfully")
        return 0
    except Exception as exc:
        logger.error("Unexpected error: %s", exc, exc_info=True)
        return 1
# When executed as a script, exit with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())