#!/usr/bin/env python3 """ Python replacement for COLMAP's crashing exhaustive_matcher on Apple Silicon. Reads SIFT features from the COLMAP SQLite database, matches them with OpenCV BFMatcher (Lowe ratio test), verifies with RANSAC, and writes matches + two_view_geometries back to the database. COLMAP's mapper reads two_view_geometries — no need to re-run any COLMAP matcher binary after this script. """ import argparse import sqlite3 import numpy as np import cv2 import sys from pathlib import Path MIN_INLIERS = 15 # reject pairs with fewer verified matches RATIO_TEST = 0.75 # Lowe's ratio threshold RANSAC_ERROR = 4.0 # max reprojection error in pixels for RANSAC # COLMAP 4.x pair_id formula: kMaxNumImages * min(id1,id2) + max(id1,id2) KMAX = 2_147_483_647 def pair_id(id1: int, id2: int) -> int: lo, hi = (id1, id2) if id1 < id2 else (id2, id1) return KMAX * lo + hi def read_images(cur): cur.execute("SELECT image_id, name FROM images ORDER BY name") return cur.fetchall() # [(image_id, name), ...] def load_all(cur, image_ids): descs, kpts = {}, {} for iid in image_ids: cur.execute("SELECT rows, cols, data FROM descriptors WHERE image_id=?", (iid,)) r = cur.fetchone() if r: descs[iid] = np.frombuffer(r[2], dtype=np.uint8).reshape(r[0], r[1]) else: descs[iid] = np.zeros((0, 128), dtype=np.uint8) cur.execute("SELECT rows, cols, data FROM keypoints WHERE image_id=?", (iid,)) r = cur.fetchone() if r: kp = np.frombuffer(r[2], dtype=np.float32).reshape(r[0], r[1]) kpts[iid] = kp[:, :2] # x, y else: kpts[iid] = np.zeros((0, 2), dtype=np.float32) return descs, kpts def match_pair(desc1, desc2, kp1, kp2): if len(desc1) < 8 or len(desc2) < 8: return None, None bf = cv2.BFMatcher(cv2.NORM_L2) raw = bf.knnMatch(desc1.astype(np.float32), desc2.astype(np.float32), k=2) good = [] for m_pair in raw: if len(m_pair) == 2: m, n = m_pair if m.distance < RATIO_TEST * n.distance: good.append((m.queryIdx, m.trainIdx)) if len(good) < MIN_INLIERS: return None, None arr = np.array(good, dtype=np.uint32) pts1 = kp1[arr[:, 0]] pts2 = kp2[arr[:, 1]] F, mask = cv2.findFundamentalMat( pts1, pts2, cv2.FM_RANSAC, ransacReprojThreshold=RANSAC_ERROR, confidence=0.9999, maxIters=2000, ) if F is None or mask is None: return None, None inliers = arr[mask.ravel().astype(bool)] if len(inliers) < MIN_INLIERS: return None, None return inliers, F def write_pair(cur, pid, inliers, F): blob = inliers.astype(np.uint32).tobytes() zeros9 = np.zeros(9, dtype=np.float64).tobytes() zeros4 = np.zeros(4, dtype=np.float64).tobytes() zeros3 = np.zeros(3, dtype=np.float64).tobytes() F_blob = F.flatten().astype(np.float64).tobytes() cur.execute( "INSERT OR REPLACE INTO matches (pair_id, rows, cols, data) VALUES (?,?,?,?)", (pid, len(inliers), 2, blob), ) cur.execute( "INSERT OR REPLACE INTO two_view_geometries " "(pair_id, rows, cols, data, config, F, E, H, qvec, tvec) " "VALUES (?,?,?,?,?,?,?,?,?,?)", (pid, len(inliers), 2, blob, 3, # UNCALIBRATED — uses F matrix F_blob, zeros9, zeros9, zeros4, zeros3), ) def sequential_pairs(ids, overlap): pairs = [] n = len(ids) for i in range(n): for j in range(i + 1, min(i + overlap + 1, n)): pairs.append((ids[i], ids[j])) return pairs def main(): p = argparse.ArgumentParser() p.add_argument("--db", default="my_scene/database.db") p.add_argument("--overlap", type=int, default=50) args = p.parse_args() db_path = args.db db = sqlite3.connect(db_path) db.execute("PRAGMA journal_mode=WAL") cur = db.cursor() images = read_images(cur) ids = [r[0] for r in images] print(f"Images: {len(ids)}") print("Loading descriptors & keypoints into memory…") descs, kpts = load_all(cur, ids) total_feats = sum(len(d) for d in descs.values()) print(f"Loaded {total_feats:,} keypoints total") overlap = args.overlap pairs = sequential_pairs(ids, overlap) print(f"Pairs to match: {len(pairs)} (sequential overlap={overlap})") matched = skipped = 0 for i, (id1, id2) in enumerate(pairs): if i % 200 == 0: pct = 100 * i / len(pairs) print(f" [{i}/{len(pairs)} {pct:.0f}%] matched={matched}", flush=True) inliers, F = match_pair(descs[id1], descs[id2], kpts[id1], kpts[id2]) if inliers is not None: write_pair(cur, pair_id(id1, id2), inliers, F) matched += 1 else: skipped += 1 if i % 500 == 0: db.commit() db.commit() db.close() print(f"\nDone. {matched} pairs matched, {skipped} below threshold.") print(f"Now run: colmap mapper --database_path {db_path} " f"--image_path my_scene/images --output_path my_scene/sparse") if __name__ == "__main__": main()