FileParse logo
Sign up

Paperless-ngx FileParse Integration

Overview

This integration allows Paperless-ngx to automatically process consumed documents through FileParse. By using a post-consume script, documents are sent to FileParse for high-quality parsing into Markdown, with the results written back to a custom field in Paperless-ngx.

Paperless consumes document
       ↓
post_consume.py executes
       ↓
Uploads to FileParse
       ↓
Polls until parsing complete
       ↓
Writes parsed markdown to Paperless custom field

Prerequisites

  • A FileParse account with an API key.
  • A Paperless-ngx deployment (Docker recommended).
  • Python 3.8+ available within the Paperless container.
  • Network connectivity from your Paperless container to FileParse.

Setup

  1. Create a FileParse account and generate an API key from the settings page.
  2. Obtain a Paperless API token from your Paperless-ngx profile page.
  3. Mount the post_consume.py script into your Paperless container.
volumes:
  - ./integrations/paperless-ngx/post_consume.py:/usr/src/paperless/scripts/post_consume.py:ro
  1. Set the required environment variables in your docker-compose.env or docker-compose.yml file.
PAPERLESS_POST_CONSUME_SCRIPT=/usr/src/paperless/scripts/post_consume.py
FILEPARSE_URL=https://fileparse.example.com
FILEPARSE_API_KEY=<your-api-key>
PAPERLESS_URL=http://paperless:8000
PAPERLESS_TOKEN=<your-token>
  1. Ensure the PAPERLESS_POST_CONSUME_SCRIPT environment variable points to the correct path of the script.
  2. Restart your Paperless container to apply the changes.
  3. Test the integration by uploading a sample document to Paperless.

Configuration

VariableRequiredDefaultDescription
FILEPARSE_URLYes-Your FileParse URL (e.g. https://fileparse.example.com)
FILEPARSE_API_KEYYes-FileParse API key from your settings page
PAPERLESS_URLYes-Your Paperless-ngx URL (e.g. http://paperless:8000)
PAPERLESS_TOKENYes-Paperless API token from your profile page
FILEPARSE_TIMEOUTNo300Max seconds to wait for parsing
FILEPARSE_POLL_INTERVALNo5Seconds between poll requests
FILEPARSE_CLEANUPNotrueDelete from FileParse after success
FILEPARSE_CUSTOM_FIELD_NAMENo"Parsed Content"Paperless custom field name

How It Works

The integration uses Paperless-ngx's post-consume script functionality. These scripts run synchronously during the document consumption process.

  • The script receives DOCUMENT_ID, DOCUMENT_SOURCE_PATH, and DOCUMENT_FILE_NAME from Paperless.
  • It uploads the document via a multipart POST request to /api/v1/documents.
  • The script then polls GET /api/v1/documents/{id} until the status is either completed or failed.
  • If the specified custom field is missing in Paperless, the script automatically creates it via the Paperless API.
  • Once parsing is complete, the script updates the document in Paperless via PATCH /api/documents/{id}/.
  • If FILEPARSE_CLEANUP is set to true, the document is deleted from FileParse after successful processing.

Troubleshooting

  • Script not executing: Check file permissions (ensure it's executable), verify the path in PAPERLESS_POST_CONSUME_SCRIPT, and check the Paperless logs for errors.
  • Authentication failed: Double-check that FILEPARSE_API_KEY and PAPERLESS_TOKEN are correct and have the necessary permissions.
  • Timeout: If parsing is taking too long (large documents or high server load), increase FILEPARSE_TIMEOUT.
  • Quota exceeded: You've hit your FileParse storage limit. Delete old documents to free up space.
  • Custom field not appearing: Verify that the Paperless user associated with the token has permission to manage custom fields and update documents.

Limitations

  • Blocking script: The post-consume script is synchronous, which means it will slow down the document consumption process in Paperless.
  • Network requirements: Your Paperless container must be able to reach FileParse over the network.
  • File size limits: FileParse's standard limits apply (50MB per file, 1GB total quota).
  • Supported types: Only PDF, PNG, and JPEG files are processed by this integration.
  • Polling-based: The script repeatedly checks FileParse for results, which adds some overhead during document consumption.

post_consume.py

#!/usr/bin/env python3
"""
Paperless-ngx post-consume hook for FileParse integration.

This script is executed by Paperless-ngx after document consumption.
It uploads the document to FileParse for parsing, polls for completion,
and writes the parsed content back to Paperless as a custom field.
"""

import json
import logging
import os
import sys
import time
import urllib.request
from typing import Optional

# Configure logging
logging.basicConfig(
	level=logging.INFO,
	format="[FileParse] %(levelname)s: %(message)s",
	stream=sys.stderr,
)
logger = logging.getLogger(__name__)

# Paperless environment variables (set by Paperless-ngx)
DOCUMENT_ID = os.getenv("DOCUMENT_ID")
DOCUMENT_SOURCE_PATH = os.getenv("DOCUMENT_SOURCE_PATH")
DOCUMENT_FILE_NAME = os.getenv("DOCUMENT_FILE_NAME")
DOCUMENT_ARCHIVE_PATH = os.getenv("DOCUMENT_ARCHIVE_PATH")
DOCUMENT_DOWNLOAD_URL = os.getenv("DOCUMENT_DOWNLOAD_URL")
DOCUMENT_ORIGINAL_FILENAME = os.getenv("DOCUMENT_ORIGINAL_FILENAME")
DOCUMENT_OWNER = os.getenv("DOCUMENT_OWNER")
DOCUMENT_CORRESPONDENT = os.getenv("DOCUMENT_CORRESPONDENT")
DOCUMENT_TAGS = os.getenv("DOCUMENT_TAGS")
TASK_ID = os.getenv("TASK_ID")

# FileParse configuration
FILEPARSE_URL = os.getenv("FILEPARSE_URL")
FILEPARSE_API_KEY = os.getenv("FILEPARSE_API_KEY")
FILEPARSE_TIMEOUT = int(os.getenv("FILEPARSE_TIMEOUT", "300"))
FILEPARSE_POLL_INTERVAL = int(os.getenv("FILEPARSE_POLL_INTERVAL", "5"))
FILEPARSE_CLEANUP = os.getenv("FILEPARSE_CLEANUP", "true").lower() == "true"
FILEPARSE_CUSTOM_FIELD_NAME = os.getenv("FILEPARSE_CUSTOM_FIELD_NAME", "Parsed Content")

# Paperless configuration
PAPERLESS_URL = os.getenv("PAPERLESS_URL")
PAPERLESS_TOKEN = os.getenv("PAPERLESS_TOKEN")


def _error_status_code(exc: Exception) -> Optional[int]:
	status_code = getattr(exc, "code", None)
	if isinstance(status_code, int):
		return status_code
	return None


def _error_body(exc: Exception) -> str:
	read_func = getattr(exc, "read", None)
	if not callable(read_func):
		return ""
	try:
		data = read_func()
		if isinstance(data, bytes):
			return data.decode("utf-8", errors="replace")
		if isinstance(data, str):
			return data
	except Exception:
		return "<unable to read error body>"
	return ""


def upload_to_fileparse() -> Optional[str]:
	"""
	Upload the document to FileParse.

	Returns:
		Document ID from FileParse response, or None on failure.
	"""
	if not DOCUMENT_SOURCE_PATH:
		logger.error("Missing required environment variable: DOCUMENT_SOURCE_PATH")
		return None
	if not FILEPARSE_URL:
		logger.error("Missing required environment variable: FILEPARSE_URL")
		return None

	source_path_lower = DOCUMENT_SOURCE_PATH.lower()
	if not source_path_lower.endswith((".pdf", ".png", ".jpg", ".jpeg")):
		logger.info(
			"Skipping unsupported file type for path: %s",
			DOCUMENT_SOURCE_PATH,
		)
		return None

	file_name = DOCUMENT_FILE_NAME or DOCUMENT_ORIGINAL_FILENAME
	if not file_name:
		file_name = os.path.basename(DOCUMENT_SOURCE_PATH)

	mime_type = "application/octet-stream"
	if source_path_lower.endswith(".pdf"):
		mime_type = "application/pdf"
	elif source_path_lower.endswith(".png"):
		mime_type = "image/png"
	elif source_path_lower.endswith((".jpg", ".jpeg")):
		mime_type = "image/jpeg"

	try:
		with open(DOCUMENT_SOURCE_PATH, "rb") as file_handle:
			file_content = file_handle.read()
	except OSError as exc:
		logger.error("Failed to read source file: %s", exc)
		return None

	logger.info("Uploading document to FileParse")
	boundary = "----FileParseBoundary{}".format(int(time.time() * 1000))

	body_prefix = (
		"--{}\r\n"
		"Content-Disposition: form-data; name=\"file\"; filename=\"{}\"\r\n"
		"Content-Type: {}\r\n\r\n"
	).format(boundary, file_name, mime_type).encode("utf-8")
	body_suffix = "\r\n--{}--\r\n".format(boundary).encode("utf-8")
	body = body_prefix + file_content + body_suffix

	upload_url = "{}/api/v1/documents".format(FILEPARSE_URL.rstrip("/"))
	request = urllib.request.Request(
		upload_url,
		data=body,
		method="POST",
		headers={
			"X-API-Key": FILEPARSE_API_KEY or "",
			"Content-Type": "multipart/form-data; boundary={}".format(boundary),
		},
	)

	try:
		with urllib.request.urlopen(request) as response:
			response_body = response.read().decode("utf-8")
			response_json = json.loads(response_body)
			document_id = response_json.get("id")
			if not isinstance(document_id, str) or not document_id:
				logger.error("FileParse upload response missing document ID")
				return None
			return document_id
	except Exception as exc:
		status_code = _error_status_code(exc)
		if status_code is not None:
			body_text = _error_body(exc)
			logger.error("FileParse upload failed with status %s: %s", status_code, body_text)
			if status_code in (401, 403):
				logger.error("Authentication failed. Check FILEPARSE_API_KEY.")
			if status_code == 413:
				logger.error("FileParse storage quota exceeded.")
		else:
			logger.error(
				"Failed to connect to FileParse upload endpoint: %s. Check FILEPARSE_URL and network connectivity.",
				exc,
			)
		return None


def poll_until_done(fileparse_doc_id: str) -> Optional[str]:
	"""
	Poll FileParse until document parsing is complete.

	Args:
		fileparse_doc_id: Document ID returned from FileParse upload.

	Returns:
		Parsed markdown content, or None on timeout/failure.
	"""
	if not FILEPARSE_URL:
		logger.error("Missing required environment variable: FILEPARSE_URL")
		return None

	poll_url = "{}/api/v1/documents/{}".format(
		FILEPARSE_URL.rstrip("/"),
		fileparse_doc_id,
	)
	start_time = time.time()
	poll_count = 0

	while True:
		elapsed = int(time.time() - start_time)
		if elapsed >= FILEPARSE_TIMEOUT:
			logger.error("Parsing timed out after %ss. The document may still be processing.", FILEPARSE_TIMEOUT)
			return None

		poll_count += 1
		if poll_count % 5 == 0:
			logger.info(
				"Polling FileParse status (attempt %s, elapsed %ss)",
				poll_count,
				elapsed,
			)

		request = urllib.request.Request(
			poll_url,
			headers={"X-API-Key": FILEPARSE_API_KEY or ""},
		)

		try:
			with urllib.request.urlopen(request) as response:
				response_body = response.read().decode("utf-8")
				response_json = json.loads(response_body)
		except Exception as exc:
			status_code = _error_status_code(exc)
			if status_code is not None:
				body_text = _error_body(exc)
				logger.error("FileParse poll failed with status %s: %s", status_code, body_text)
				if status_code in (401, 403):
					logger.error("Authentication failed. Check FILEPARSE_API_KEY.")
			else:
				logger.error(
					"Failed to connect to FileParse polling endpoint: %s. Check FILEPARSE_URL and network connectivity.",
					exc,
				)
			return None

		status = response_json.get("status")
		if status == "completed":
			markdown = response_json.get("markdown")
			if isinstance(markdown, str):
				return markdown
			logger.error("FileParse completed response missing markdown content")
			return None

		if status == "failed":
			error_data = response_json.get("error")
			if isinstance(error_data, dict):
				message = error_data.get("message")
				if isinstance(message, str) and message:
					logger.error("FileParse parsing failed: %s", message)
				else:
					logger.error("FileParse parsing failed")
			else:
				logger.error("FileParse parsing failed")
			return None

		time.sleep(FILEPARSE_POLL_INTERVAL)


def ensure_custom_field() -> Optional[int]:
	"""
	Ensure the custom field exists in Paperless.

	Returns:
		Custom field ID, or None on failure.
	"""
	if not PAPERLESS_URL:
		logger.error("Missing required environment variable: PAPERLESS_URL")
		return None

	list_url = "{}/api/custom_fields/".format(PAPERLESS_URL.rstrip("/"))
	headers = {
		"Authorization": "Token {}".format(PAPERLESS_TOKEN or ""),
	}

	list_request = urllib.request.Request(list_url, headers=headers)
	try:
		with urllib.request.urlopen(list_request) as response:
			response_body = response.read().decode("utf-8")
			response_json = json.loads(response_body)
	except Exception as exc:
		status_code = _error_status_code(exc)
		if status_code is not None:
			body_text = _error_body(exc)
			logger.error("Paperless custom field list failed with status %s: %s", status_code, body_text)
			if status_code in (401, 403):
				logger.error("Authentication failed. Check PAPERLESS_TOKEN.")
		else:
			logger.error(
				"Failed to connect to Paperless custom fields endpoint: %s. Check PAPERLESS_URL and network connectivity.",
				exc,
			)
		return None

	results = response_json.get("results") if isinstance(response_json, dict) else None
	if not isinstance(results, list):
		logger.error("Unexpected Paperless custom fields response format")
		return None

	for field in results:
		if not isinstance(field, dict):
			continue
		name = field.get("name")
		field_id = field.get("id")
		data_type = field.get("data_type")
		if name == FILEPARSE_CUSTOM_FIELD_NAME:
			if data_type != "string":
				logger.warning(
					"Custom field '%s' exists but has data_type '%s' (expected 'string')",
					FILEPARSE_CUSTOM_FIELD_NAME,
					data_type,
				)
			if isinstance(field_id, int):
				return field_id
			logger.error("Custom field '%s' has invalid ID", FILEPARSE_CUSTOM_FIELD_NAME)
			return None

	create_body = json.dumps(
		{
			"name": FILEPARSE_CUSTOM_FIELD_NAME,
			"data_type": "string",
		}
	).encode("utf-8")
	create_request = urllib.request.Request(
		list_url,
		data=create_body,
		method="POST",
		headers={
			"Authorization": "Token {}".format(PAPERLESS_TOKEN or ""),
			"Content-Type": "application/json",
		},
	)

	try:
		with urllib.request.urlopen(create_request) as response:
			response_body = response.read().decode("utf-8")
			response_json = json.loads(response_body)
	except Exception as exc:
		status_code = _error_status_code(exc)
		if status_code is not None:
			body_text = _error_body(exc)
			logger.error(
				"Paperless custom field creation failed with status %s: %s",
				status_code,
				body_text,
			)
			if status_code in (401, 403):
				logger.error("Authentication failed. Check PAPERLESS_TOKEN.")
		else:
			logger.error(
				"Failed to connect to Paperless custom field creation endpoint: %s. Check PAPERLESS_URL and network connectivity.",
				exc,
			)
		return None

	field_id = response_json.get("id") if isinstance(response_json, dict) else None
	if isinstance(field_id, int):
		logger.info("Created Paperless custom field '%s'", FILEPARSE_CUSTOM_FIELD_NAME)
		return field_id

	logger.error("Paperless custom field creation response missing valid field ID")
	return None


def write_to_paperless(custom_field_id: int, parsed_content: str) -> bool:
	"""
	Write parsed content to Paperless document custom field.

	Args:
		custom_field_id: ID of the custom field.
		parsed_content: Markdown content to write.

	Returns:
		True on success, False on failure.
	"""
	if not PAPERLESS_URL:
		logger.error("Missing required environment variable: PAPERLESS_URL")
		return False

	update_url = "{}/api/documents/{}/".format(
		PAPERLESS_URL.rstrip("/"),
		DOCUMENT_ID,
	)
	body = json.dumps(
		{
			"custom_fields": [
				{
					"field": custom_field_id,
					"value": parsed_content,
				}
			],
		}
	).encode("utf-8")

	request = urllib.request.Request(
		update_url,
		data=body,
		method="PATCH",
		headers={
			"Authorization": "Token {}".format(PAPERLESS_TOKEN or ""),
			"Content-Type": "application/json",
		},
	)

	try:
		with urllib.request.urlopen(request) as response:
			if response.status == 200:
				return True
			response_body = response.read().decode("utf-8", errors="replace")
			logger.error(
				"Paperless document update failed with status %s: %s",
				response.status,
				response_body,
			)
			return False
	except Exception as exc:
		status_code = _error_status_code(exc)
		if status_code is not None:
			body_text = _error_body(exc)
			logger.error("Paperless document update failed with status %s: %s", status_code, body_text)
			if status_code in (401, 403):
				logger.error("Authentication failed. Check PAPERLESS_TOKEN.")
		else:
			logger.error(
				"Failed to connect to Paperless document endpoint: %s. Check PAPERLESS_URL and network connectivity.",
				exc,
			)
		return False


def cleanup_fileparse_document(fileparse_doc_id: str) -> bool:
	"""
	Delete the document from FileParse after successful parsing.

	Args:
		fileparse_doc_id: Document ID to delete.

	Returns:
		True on success, False on failure.
	"""
	if not FILEPARSE_URL:
		logger.warning("Missing required environment variable: FILEPARSE_URL")
		return False

	delete_url = "{}/api/v1/documents/{}".format(
		FILEPARSE_URL.rstrip("/"),
		fileparse_doc_id,
	)
	request = urllib.request.Request(
		delete_url,
		method="DELETE",
		headers={"X-API-Key": FILEPARSE_API_KEY or ""},
	)

	try:
		with urllib.request.urlopen(request) as response:
			if response.status == 200:
				return True
			response_body = response.read().decode("utf-8", errors="replace")
			logger.warning(
				"FileParse cleanup failed with status %s: %s",
				response.status,
				response_body,
			)
			return False
	except Exception as exc:
		status_code = _error_status_code(exc)
		if status_code is not None:
			body_text = _error_body(exc)
			logger.warning("FileParse cleanup failed with status %s: %s", status_code, body_text)
			if status_code in (401, 403):
				logger.warning("Authentication failed. Check FILEPARSE_API_KEY.")
		else:
			logger.warning(
				"Failed to connect to FileParse cleanup endpoint: %s. Check FILEPARSE_URL and network connectivity.",
				exc,
			)
		return False


def main() -> int:
	"""
	Main entry point for the post-consume hook.

	Returns:
		Exit code (0 for success, 1 for failure).
	"""
	try:
		# Validate required environment variables
		missing_vars = []
		if not FILEPARSE_URL:
			missing_vars.append("FILEPARSE_URL")
		if not FILEPARSE_API_KEY:
			missing_vars.append("FILEPARSE_API_KEY")
		if not PAPERLESS_URL:
			missing_vars.append("PAPERLESS_URL")
		if not PAPERLESS_TOKEN:
			missing_vars.append("PAPERLESS_TOKEN")
		if missing_vars:
			logger.error("Missing required environment variable(s): %s", ", ".join(missing_vars))
			return 1

		missing_doc_vars = []
		if not DOCUMENT_ID:
			missing_doc_vars.append("DOCUMENT_ID")
		if not DOCUMENT_SOURCE_PATH:
			missing_doc_vars.append("DOCUMENT_SOURCE_PATH")
		if missing_doc_vars:
			logger.error(
				"Missing Paperless document environment variable(s): %s",
				", ".join(missing_doc_vars),
			)
			return 1

		source_path = DOCUMENT_SOURCE_PATH or ""

		# Upload to FileParse
		fileparse_doc_id = upload_to_fileparse()
		if not fileparse_doc_id:
			if source_path.lower().endswith((".pdf", ".png", ".jpg", ".jpeg")):
				logger.error("Failed to upload document to FileParse")
				return 1
			logger.info("No processing required for unsupported file type")
			return 0

		logger.info("Uploaded to FileParse: %s", fileparse_doc_id)

		# Poll for completion
		parsed_content = poll_until_done(fileparse_doc_id)
		if parsed_content is None:
			logger.error("Failed to retrieve parsed content from FileParse")
			return 1

		logger.info("Document parsing completed")

		# Ensure custom field exists
		custom_field_id = ensure_custom_field()
		if custom_field_id is None:
			logger.error("Failed to ensure custom field exists")
			return 1

		# Write to Paperless
		if not write_to_paperless(custom_field_id, parsed_content):
			logger.error("Failed to write parsed content to Paperless")
			return 1

		logger.info("Parsed content written to Paperless")

		# Cleanup FileParse document if enabled
		if FILEPARSE_CLEANUP:
			if not cleanup_fileparse_document(fileparse_doc_id):
				logger.warning("Failed to cleanup FileParse document (non-fatal)")

		logger.info("Post-consume hook completed successfully")
		return 0

	except Exception as exc:
		logger.error("Unexpected error: %s", exc, exc_info=True)
		return 1


if __name__ == "__main__":
	sys.exit(main())