Recipe — bulk RFQ triage

Process 50–500 CAD files at once, rank by unit cost, fan out to suppliers.

Your inbox just got hit with a 200-part RFQ. Manually quoting each takes a week. ARCNM triages the whole batch in minutes, not days.

By the end you'll have:

  1. Uploaded the whole batch in parallel.
  2. Ranked the parts by unit cost.
  3. Flagged the expensive / failed parts that need human review.
  4. Pre-routed the rest to your supplier fleet.

Architecture

A bounded-concurrency fan-out, then poll each calc to completion:

   inbox/*.step ─→ upload-and-quote (lite tier) ─→ poll GET /{id} ─→ rank + route

Use the lite extraction tier for triage — it is the fastest path. Re-quote the parts you'll actually send with the full tier so the audit blob is as thorough as possible.


Step 1 — Fan out the upload + queue

Each file is sent as a multipart POST to /api/v1/parts/calculations/upload-and-quote, which responds with 202 and a calc id; the calc is then polled until it reaches a terminal status. Bound the concurrency so you stay under your rate limits.

import asyncio, os
from pathlib import Path
import httpx

BASE = "https://api.arcnm.io/api/v1"
HEADERS = {"X-API-Key": os.environ["ARCNM_API_KEY"]}
ENV_ID = os.environ["ENV_ID"]
CONCURRENCY = 16

async def poll(client: httpx.AsyncClient, calc_id: str, timeout_s: int = 180) -> dict:
    loop = asyncio.get_running_loop()            # we are always inside a coroutine here
    deadline = loop.time() + timeout_s
    while loop.time() < deadline:
        r = await client.get(f"{BASE}/parts/calculations/{calc_id}", headers=HEADERS)
        r.raise_for_status()
        calc = r.json()
        if calc["status"] in ("succeeded", "failed", "cancelled", "timed_out"):
            return calc
        await asyncio.sleep(2)
    raise TimeoutError(calc_id)

async def quote_one(client, sem, path: Path) -> dict:
    async with sem:
        with open(path, "rb") as fh:
            r = await client.post(
                f"{BASE}/parts/calculations/upload-and-quote",
                headers=HEADERS,
                data={
                    "costing_environment_id": ENV_ID,
                    "part_number": path.stem,
                    "lot_size": 10,                 # triage volume
                    "extraction_tier": "lite",
                },
                files={"cad_file": (path.name, fh, "application/step")},
            )
        r.raise_for_status()
        return await poll(client, r.json()["id"])

async def main():
    files = list(Path("./rfq/").glob("*.step"))
    async with httpx.AsyncClient(timeout=30) as client:
        sem = asyncio.Semaphore(CONCURRENCY)
        results = await asyncio.gather(
            *[quote_one(client, sem, p) for p in files],
            return_exceptions=True,
        )
        for path, result in zip(files, results):
            cost = result.get("unit_cost") if isinstance(result, dict) else str(result)
            print(path.stem, "→", cost)

asyncio.run(main())

The same fan-out in TypeScript:

import pLimit from "p-limit"
import { readFile, readdir } from "node:fs/promises"

const BASE = "https://api.arcnm.io/api/v1"
const HEADERS = { "X-API-Key": process.env.ARCNM_API_KEY! }
const ENV_ID = process.env.ENV_ID!
const limit = pLimit(16)

async function poll(calcId: string, timeoutMs = 180_000): Promise<any> {
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    const r = await fetch(`${BASE}/parts/calculations/${calcId}`, { headers: HEADERS })
    if (!r.ok) throw new Error(`${r.status}`)
    const calc = await r.json()
    if (["succeeded", "failed", "cancelled", "timed_out"].includes(calc.status)) return calc
    await new Promise(res => setTimeout(res, 2_000))
  }
  throw new Error(`timeout ${calcId}`)
}

const files = (await readdir("./rfq")).filter(f => f.endsWith(".step"))

// allSettled mirrors return_exceptions=True in the Python version:
// one failed upload does not abort the rest of the batch.
const results = await Promise.allSettled(
  files.map(name => limit(async () => {
    const form = new FormData()
    form.set("costing_environment_id", ENV_ID)
    form.set("part_number", name.replace(/\.[^.]*$/, ""))
    form.set("lot_size", "10")
    form.set("extraction_tier", "lite")
    form.set("cad_file", new Blob([await readFile(`./rfq/${name}`)]), name)
    const r = await fetch(`${BASE}/parts/calculations/upload-and-quote`,
      { method: "POST", headers: HEADERS, body: form })
    if (!r.ok) throw new Error(`${r.status} ${await r.text()}`)
    return poll((await r.json()).id)
  })),
)

results.forEach((res, i) =>
  console.log(files[i], "→", res.status === "fulfilled" ? res.value.unit_cost : String(res.reason)))

With curl and xargs:

# One upload per file in parallel; print "<file> → <calc_id>".
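# Poll GET /api/v1/parts/calculations/{id} afterwards until each id reaches a terminal status.
# ARCNM_API_KEY and ENV_ID must be exported so the child shells can see them.
# The file name is passed as "$1" rather than spliced into the script, so spaces and quotes are safe.
printf '%s\0' ./rfq/*.step | xargs -0 -P 16 -I {} bash -c '
  f="$1"
  RESP=$(curl -s -X POST https://api.arcnm.io/api/v1/parts/calculations/upload-and-quote \
    -H "X-API-Key: $ARCNM_API_KEY" \
    -F "costing_environment_id=$ENV_ID" \
    -F "part_number=$(basename "$f" .step)" \
    -F "cad_file=@$f;type=application/step" \
    -F "lot_size=10" \
    -F "extraction_tier=lite")
  echo "$f → $(echo "$RESP" | jq -r .id)"
' _ {}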

Concurrency 16 is a reasonable default. Watch for 429 responses and retry them with exponential backoff plus jitter (the API doesn't emit a Retry-After header, so you have to pick the delay yourself). See Rate limits.
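
A minimal sketch of such a retry policy, assuming 429 is the only status you want to retry and that send is any zero-argument coroutine function returning an httpx-style response with a status_code attribute. The names backoff_delay and with_backoff, and the base and cap values, are illustrative and not part of the ARCNM API.

```python
import asyncio
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt)] seconds."""
    return random.uniform(0, min(cap, base * 2 ** attempt))

async def with_backoff(send, max_attempts: int = 6, base: float = 0.5):
    """Call send() until it returns a non-429 response or attempts run out."""
    for attempt in range(max_attempts):
        resp = await send()
        if resp.status_code != 429:
            return resp
        # Rate limited: sleep a randomized, exponentially growing delay.
        await asyncio.sleep(backoff_delay(attempt, base=base))
    return resp  # still 429 after max_attempts; the caller decides what to do
```

To use it in quote_one, wrap the POST in a small coroutine function and pass that in. Each retry has to rebuild the request, including rewinding or re-opening the file, because the multipart body is consumed on the first attempt.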


Step 2 — Rank by unit cost

After the calcs land, sort the succeeded ones by unit_cost and collect the failures separately:

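# files and results are the lists built in main() in Step 1
# (return them from main(), or run this code inside it).
succeeded = [(p, r) for p, r in zip(files, results)
             if isinstance(r, dict) and r["status"] == "succeeded"]
failed    = [(p, r) for p, r in zip(files, results)
             if not (isinstance(r, dict) and r["status"] == "succeeded")]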

succeeded.sort(key=lambda pr: pr[1]["unit_cost"])

print(f"{len(succeeded)} priced, {len(failed)} need attention\n")
for path, calc in succeeded[:10]:
    print(f"  {calc['unit_cost']:>8.2f} {calc['currency']}  {path.stem}")

You can also pull the batch back later with GET /api/v1/parts/calculations?limit=200&status_filter=succeeded ({ "items": [...], "count": N }). The list projection includes id, unit_cost, currency, lot_size, and status per row.
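
As a rough sketch of ranking one such page, the function below sorts the rows of a list response by unit_cost. The sample body is made up for illustration; only the { "items": [...], "count": N } envelope and the five row fields come from the description above, and rank_page is a hypothetical helper name.

```python
def rank_page(page: dict) -> list[dict]:
    """Sort the rows of one list response by unit_cost, cheapest first."""
    rows = [row for row in page["items"] if row.get("unit_cost") is not None]
    return sorted(rows, key=lambda row: row["unit_cost"])

# Hypothetical response body, shaped like the list projection described above.
sample = {
    "items": [
        {"id": "calc_b", "unit_cost": 41.5, "currency": "EUR", "lot_size": 10, "status": "succeeded"},
        {"id": "calc_a", "unit_cost": 12.0, "currency": "EUR", "lot_size": 10, "status": "succeeded"},
    ],
    "count": 2,
}

for row in rank_page(sample):
    print(f"{row['unit_cost']:>8.2f} {row['currency']}  {row['id']}")
```

The fields listed for the projection do not include a part number, so keep your own calc id to file mapping from Step 1 if you plan to rank from the list endpoint.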


Step 3 — Flag parts for human review

Route the failures and the top-of-range prices to a human estimator. A failed calc carries an error object; a high relative price often signals an unusual part:

def needs_review(calc: dict, median_cost: float) -> bool:
    if calc["status"] != "succeeded":
        return True                              # extraction / pricing failed
    return calc["unit_cost"] > 3 * median_cost   # cost outlier vs the batch

import statistics

costs = [c["unit_cost"] for _, c in succeeded]
median = statistics.median(costs) if costs else 0.0   # true median, also for even-sized batches

for path, calc in zip(files, results):
    if isinstance(calc, dict) and needs_review(calc, median):
        send_to_estimator(path, calc)
    elif not isinstance(calc, dict):
        send_to_estimator(path, {"error": str(calc)})

A failed status usually means one of:

  • The CAD or drawing could not be parsed (error.details.retryable tells you whether a re-run is worth trying).
  • The detected discipline isn't one the engine prices yet (unsupported_discipline).
  • The material reference didn't resolve (material_not_resolved).

These are exactly the cases a human should look at.
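
One way to turn those cases into a label for the estimator's queue is sketched below. It assumes the error code sits under error["code"], which the text above does not confirm; only error.details.retryable and the two code strings are taken from the list. review_reason and its return labels are hypothetical.

```python
def review_reason(calc: dict) -> str:
    """Return a short label an estimator can act on for a failed calc."""
    error = calc.get("error") or {}
    code = error.get("code")  # ASSUMPTION: the error code lives under "code"
    if code in ("unsupported_discipline", "material_not_resolved"):
        return code
    # Parse failures: a retryable one is worth re-running before a human looks at it.
    if (error.get("details") or {}).get("retryable"):
        return "retry_then_review"
    return "manual_review"
```

Passing the label along with the calc to send_to_estimator lets the estimator see at a glance whether to re-run the part, fix the material reference, or quote it by hand.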


Step 4 — Re-quote winners with the full tier

For the quotes you'll actually send to the customer, re-quote at the full extraction tier so the PDF carries the most thorough audit blob. Re-use the existing revision via the JSON /quote endpoint so you don't re-upload:

finalists = succeeded[:10]   # cheapest 10 from Step 2

async def requote(finalists) -> list:
    finals = []
    async with httpx.AsyncClient(timeout=30) as client:
        for path, triage in finalists:
            r = await client.post(
                f"{BASE}/parts/calculations/quote",   # BASE, HEADERS, ENV_ID from Step 1
                headers=HEADERS,
                json={
                    "part_revision_id": triage["part_revision_id"],
                    "costing_environment_id": ENV_ID,
                    "lot_size": triage["lot_size"],
                    "extraction_tier": "full",       # upgrade
                },
            )
            r.raise_for_status()
            # poll helper from Step 1; it is async and needs the client
            finals.append((path, await poll(client, r.json()["id"])))
    return finals

finals = asyncio.run(requote(finalists))

Step 5 — Send finalists to suppliers

Map parts to your supplier pool and fan out. The supplier sees only your arcanum_calc_id — keep the audit blob server-side:

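import asyncio
import httpx

SUPPLIERS = {
    "shop_a": "https://supplier-a.example.com/rfq",
    "shop_b": "https://supplier-b.example.com/rfq",
}

async def send_to_supplier(client, url, parts):
    payload = [{"part_number": p.stem, "arcanum_calc_id": c["id"]} for p, c in parts]
    r = await client.post(url, json=payload)
    r.raise_for_status()                 # surface supplier-side rejections

async def send_all(finalists):
    # "async with" is only valid inside a coroutine, so wrap the fan-out in one.
    async with httpx.AsyncClient(timeout=30) as client:
        await asyncio.gather(*[
            send_to_supplier(client, url, finalists)
            for url in SUPPLIERS.values()
        ])

asyncio.run(send_all(finalists))         # (path, calc) pairs from Step 4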
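
The fan-out above posts every finalist to every supplier. If you would rather split the parts across the pool, a round-robin assignment is one simple option. This is a sketch of one possible policy, not something ARCNM prescribes; assign_round_robin is a hypothetical helper.

```python
from itertools import cycle

def assign_round_robin(parts: list, supplier_urls: list[str]) -> dict[str, list]:
    """Deal parts out to suppliers one at a time, like cards."""
    batches: dict[str, list] = {url: [] for url in supplier_urls}
    for part, url in zip(parts, cycle(supplier_urls)):
        batches[url].append(part)
    return batches
```

Each (url, batch) pair from the returned dict can then be passed to send_to_supplier in place of the full finalist list.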

If a supplier needs to see the price, fetch it from GET /api/v1/parts/calculations/{id} with a parts:read key and pass on just the unit_cost and currency; the audit blob stays with you.
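
A small allow-list projection keeps that boundary explicit. The sketch below assumes the calc dict is the JSON body returned by the GET call and copies only the three fields named in this recipe; supplier_view is a hypothetical helper name.

```python
def supplier_view(calc: dict) -> dict:
    """Copy only the fields a supplier is allowed to see; drop everything else."""
    return {
        "arcanum_calc_id": calc["id"],
        "unit_cost": calc["unit_cost"],
        "currency": calc["currency"],
    }
```

Building the payload from an allow-list rather than deleting known-sensitive keys means any field added to the calc response later stays private by default.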


Cost

Each run places a wallet hold (cost + margin) when it is enqueued; the lite tier is cheaper than full. Failed requests and idempotent replays don't bill. For current per-run rates and your live spend, see the pricing page and read your usage with a wallet:read-scoped key.


See also