Skip to content

paperscraper.get_dumps

paperscraper.get_dumps

arxiv

Dump arxiv data in JSONL format.

arxiv(start_date: Optional[str] = None, end_date: Optional[str] = None, save_path: str = save_path)

Fetches papers from arXiv based on time range, i.e., start_date and end_date. If the start_date and end_date are not provided, fetches papers from the earliest possible date to the current date. The fetched papers are stored in JSONL format.

Parameters:

Name Type Description Default
start_date str

Start date in format YYYY-MM-DD. Defaults to None.

None
end_date str

End date in format YYYY-MM-DD. Defaults to None.

None
save_path str

Path to save the JSONL dump. Defaults to save_path.

save_path
Source code in paperscraper/get_dumps/arxiv.py
def arxiv(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    save_path: str = save_path,
):
    """
    Fetches papers from arXiv based on time range, i.e., start_date and end_date.
    If the start_date and end_date are not provided, fetches papers from the earliest
    possible date to the current date. The fetched papers are stored in JSONL format.

    Args:
        start_date (str, optional): Start date in format YYYY-MM-DD. Defaults to None,
            i.e., the earliest arXiv submissions (1991-01-01).
        end_date (str, optional): End date in format YYYY-MM-DD. Defaults to None,
            i.e., today.
        save_path (str, optional): Path to save the JSONL dump. Defaults to save_path.

    Raises:
        ValueError: If start_date is later than end_date.
    """
    # Fall back to the widest possible range when no dates are given.
    EARLIEST_START = "1991-01-01"
    if start_date is None:
        start_date = EARLIEST_START
    if end_date is None:
        end_date = datetime.today().strftime("%Y-%m-%d")

    # Convert dates to datetime objects for comparison and day-wise iteration.
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    end_date = datetime.strptime(end_date, "%Y-%m-%d")

    if start_date > end_date:
        raise ValueError(
            f"start_date {start_date} cannot be later than end_date {end_date}"
        )

    # Open file for writing results. Using tqdm as a context manager ensures
    # the progress bar is closed even on errors (the original leaked it).
    with open(save_path, "w") as fp, tqdm(
        total=(end_date - start_date).days + 1
    ) as progress_bar:
        current_date = start_date
        while current_date <= end_date:
            next_date = current_date + timedelta(days=1)
            progress_bar.set_description(
                f"Fetching {current_date.strftime('%Y-%m-%d')}"
            )

            # Query one day at a time to keep each API result page small.
            query = f"submittedDate:[{current_date.strftime('%Y%m%d0000')} TO {next_date.strftime('%Y%m%d0000')}]"
            try:
                papers = get_arxiv_papers_api(
                    query=query,
                    fields=["title", "authors", "date", "abstract", "journal", "doi"],
                    verbose=False,
                )
                if not papers.empty:
                    for paper in papers.to_dict(orient="records"):
                        fp.write(json.dumps(paper) + "\n")
            except Exception as e:
                # Best-effort: report the failing day and continue with the next.
                print(f"Arxiv scraping error: {current_date.strftime('%Y-%m-%d')}: {e}")
            current_date = next_date
            progress_bar.update(1)

biorxiv

Dump bioRxiv data in JSONL format.

biorxiv(start_date: Optional[str] = None, end_date: Optional[str] = None, save_path: str = save_path, max_retries: int = 10)

Fetches papers from biorxiv based on time range, i.e., start_date and end_date. If the start_date and end_date are not provided, papers will be fetched from biorxiv from the launch date of biorxiv until the current date. The fetched papers will be stored in jsonl format in save_path.

Parameters:

Name Type Description Default
start_date str

begin date expressed as YYYY-MM-DD. Defaults to None, i.e., earliest possible.

None
end_date str

end date expressed as YYYY-MM-DD. Defaults to None, i.e., today.

None
save_path str

Path where the dump is stored. Defaults to save_path.

save_path
max_retries int

Number of retries when API shows connection issues. Defaults to 10.

10
Source code in paperscraper/get_dumps/biorxiv.py
def biorxiv(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    save_path: str = save_path,
    max_retries: int = 10,
):
    """Dump bioRxiv papers for a given time range into a JSONL file.

    When start_date/end_date are omitted, the dump covers everything from
    bioRxiv's launch date up to today. Records are written as newline-separated
    JSON objects with no trailing newline.

    Args:
        start_date (str, optional): begin date expressed as YYYY-MM-DD.
            Defaults to None, i.e., earliest possible.
        end_date (str, optional): end date expressed as YYYY-MM-DD.
            Defaults to None, i.e., today.
        save_path (str, optional): Path where the dump is stored.
            Defaults to save_path.
        max_retries (int, optional): Number of retries when API shows connection issues.
            Defaults to 10.
    """
    client = BioRxivApi(max_retries=max_retries)

    # Stream papers straight to disk; separators go *before* every record
    # except the first, so the file never ends with a newline.
    with open(save_path, "w") as fp:
        wrote_any = False
        for paper in tqdm(client.get_papers(start_date=start_date, end_date=end_date)):
            if wrote_any:
                fp.write(os.linesep)
            fp.write(json.dumps(paper))
            wrote_any = True

chemrxiv

Dump chemRxiv data in JSONL format.

chemrxiv(start_date: Optional[str] = None, end_date: Optional[str] = None, save_path: str = SAVE_PATH) -> None

Fetches papers from chemrxiv based on time range, i.e., start_date and end_date. If the start_date and end_date are not provided, papers will be fetched from chemrxiv from the launch date of chemrxiv until the current date. The fetched papers will be stored in jsonl format in save_path.

Parameters:

Name Type Description Default
start_date str

begin date expressed as YYYY-MM-DD. Defaults to None, i.e., earliest possible.

None
end_date str

end date expressed as YYYY-MM-DD. Defaults to None, i.e., today.

None
save_path str

Path where the dump is stored. Defaults to SAVE_PATH.

SAVE_PATH
Source code in paperscraper/get_dumps/chemrxiv.py
def chemrxiv(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    save_path: str = SAVE_PATH,
) -> None:
    """Fetches papers from chemrxiv based on time range, i.e., start_date and end_date.
    If the start_date and end_date are not provided, papers will be fetched from chemrxiv
    from the launch date of chemrxiv until the current date. The fetched papers will be
    stored in jsonl format in save_path.

    Args:
        start_date (str, optional): begin date expressed as YYYY-MM-DD.
            Defaults to None, i.e., earliest possible.
        end_date (str, optional): end date expressed as YYYY-MM-DD.
            Defaults to None, i.e., today.
        save_path (str, optional): Path where the dump is stored.
            Defaults to SAVE_PATH. If left at the default while a custom date
            range is given, a range-specific file name is derived instead.
    """

    # A custom date range combined with the default path gets a range-specific
    # file name so successive ranged dumps do not overwrite each other.
    if save_path == SAVE_PATH and (start_date is not None or end_date is not None):
        start_part = start_date or "2017-01-01"
        end_part = end_date or today
        save_path = os.path.join(save_folder, f"chemrxiv_{start_part}_{end_part}.jsonl")

    # create API client
    api = ChemrxivAPI(start_date, end_date)
    try:
        # Download the data
        download_full(save_folder, api)
        # Convert to JSONL format.
        parse_dump(save_folder, save_path)
    except PermissionError:
        # NOTE(review): this relies on the OpenEngage client mapping HTTP 403
        # to PermissionError — confirm in chemrxiv_api before changing.
        logger.warning(
            "ChemRxiv OpenEngage API is blocked (403). Falling back to Crossref."
        )
        # Crossref fallback mirrors the same dump schema and output path.
        crossref_start = start_date or "2017-01-01"
        crossref_end = end_date or today
        crossref_api = CrossrefChemrxivAPI(crossref_start, crossref_end)
        download_full_crossref(save_folder, crossref_api)
        parse_dump_crossref(save_folder, save_path)

medrxiv

Dump medrxiv data in JSONL format.

medrxiv(start_date: Optional[str] = None, end_date: Optional[str] = None, save_path: str = save_path, max_retries: int = 10)

Fetches papers from medrxiv based on time range, i.e., start_date and end_date. If the start_date and end_date are not provided, then papers will be fetched from medrxiv starting from the launch date of medrxiv until current date. The fetched papers will be stored in jsonl format in save_path.

Parameters:

Name Type Description Default
start_date str

begin date expressed as YYYY-MM-DD. Defaults to None, i.e., earliest possible.

None
end_date str

end date expressed as YYYY-MM-DD. Defaults to None, i.e., today.

None
save_path str

Path where the dump is stored. Defaults to save_path.

save_path
max_retries int

Number of retries when API shows connection issues. Defaults to 10.

10
Source code in paperscraper/get_dumps/medrxiv.py
def medrxiv(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    save_path: str = save_path,
    max_retries: int = 10,
):
    """Dump medRxiv papers for a given time range into a JSONL file.

    When start_date/end_date are omitted, the dump covers everything from
    medRxiv's launch date up to today. Records are written as newline-separated
    JSON objects with no trailing newline.

    Args:
        start_date (str, optional): begin date expressed as YYYY-MM-DD.
            Defaults to None, i.e., earliest possible.
        end_date (str, optional): end date expressed as YYYY-MM-DD.
            Defaults to None, i.e., today.
        save_path (str, optional): Path where the dump is stored.
            Defaults to save_path.
        max_retries (int, optional): Number of retries when API shows connection issues.
            Defaults to 10.
    """
    client = MedRxivApi(max_retries=max_retries)

    # Stream papers straight to disk; separators go *before* every record
    # except the first, so the file never ends with a newline.
    with open(save_path, "w") as fp:
        wrote_any = False
        for paper in tqdm(client.get_papers(start_date=start_date, end_date=end_date)):
            if wrote_any:
                fp.write(os.linesep)
            fp.write(json.dumps(paper))
            wrote_any = True

utils

chemrxiv

CrossrefChemrxivAPI

Fetch ChemRxiv metadata from Crossref.

This class queries Crossref's Works endpoint filtered by the ChemRxiv DOI prefix (10.26434) and date range. Results are fetched using cursor-based pagination.

Source code in paperscraper/get_dumps/utils/chemrxiv/crossref_api.py
class CrossrefChemrxivAPI:
    """Fetch ChemRxiv metadata from Crossref.

    This class queries Crossref's Works endpoint filtered by the ChemRxiv DOI
    prefix (``10.26434``) and date range. Results are fetched using cursor-based
    pagination.
    """

    # Crossref Works endpoint and the DOI prefix that identifies ChemRxiv.
    base_url = "https://api.crossref.org/works"
    chemrxiv_prefix = "10.26434"

    def __init__(
        self,
        start_date: str,
        end_date: str,
        page_size: int = 1000,
        max_retries: int = 10,
        mailto: Optional[str] = None,
        request_delay_seconds: float = 0.35,
    ):
        """Initialize the Crossref fallback client.

        Args:
            start_date: Start of the posted-date range (YYYY-MM-DD).
            end_date: End of the posted-date range (YYYY-MM-DD).
            page_size: Number of results per page (Crossref max is 1000).
            max_retries: Max retries for transient HTTP status codes.
            mailto: Optional contact email to include in the request (Crossref
                recommends this for polite usage).
            request_delay_seconds: Delay between page requests. This is used to
                avoid hammering Crossref and also keeps long-range dumps from
                completing too quickly in tests that expect the dumper to be
                long-running.
        """
        self.start_date = start_date
        self.end_date = end_date
        # Clamp into Crossref's supported range [1, 1000].
        self.page_size = min(max(1, page_size), 1000)
        self.max_retries = max_retries
        self.mailto = mailto
        # Negative delays make no sense; treat them as zero.
        self.request_delay_seconds = max(0.0, request_delay_seconds)

    def iter_items(self) -> Generator[Dict, None, None]:
        """Iterate over raw Crossref work items for the configured date range.

        Yields:
            A dict for each work item as returned by Crossref's Works API.

        Raises:
            requests.HTTPError: If the request fails with a non-retryable status
                code, or if retries are exhausted.
        """
        # "*" is Crossref's sentinel value that opens a deep-paging cursor.
        cursor = "*"
        last_first_doi: Optional[str] = None
        repeated_first_doi_count = 0
        params = {
            "rows": self.page_size,
            "cursor": cursor,
            "filter": ",".join(
                [
                    f"prefix:{self.chemrxiv_prefix}",
                    "type:posted-content",
                    f"from-posted-date:{self.start_date}",
                    f"until-posted-date:{self.end_date}",
                ]
            ),
        }
        if self.mailto:
            params["mailto"] = self.mailto

        while True:
            params["cursor"] = cursor
            data = self._request(params=params)
            message = data.get("message", {}) or {}
            items = message.get("items", []) or []
            for item in items:
                yield item

            # An empty page or a missing cursor means the result set is done.
            next_cursor = message.get("next-cursor")
            if not items or not next_cursor:
                break
            cursor = next_cursor

            # Crossref's cursor token may remain stable while the server-side
            # iterator advances. As a safety net, detect if we seem stuck
            # returning the same page repeatedly.
            first_doi = (items[0].get("DOI") or "") if items else ""
            if first_doi and first_doi == last_first_doi:
                repeated_first_doi_count += 1
                if repeated_first_doi_count >= 3:
                    logger.warning(
                        "Crossref cursor appears stuck (repeating the same first DOI); stopping pagination."
                    )
                    break
            else:
                repeated_first_doi_count = 0
                last_first_doi = first_doi

            # Avoid hammering Crossref in tight loops (and keep the default
            # dump long-running for large ranges).
            if self.request_delay_seconds:
                sleep(self.request_delay_seconds)

    def _request(self, params: Dict) -> Dict:
        """Send a single request to Crossref with basic retry/backoff logic.

        Args:
            params: Query parameters to send to the Crossref Works endpoint.

        Returns:
            Parsed JSON response as a dict.

        Raises:
            requests.HTTPError: If the request fails with a non-retryable status
                code, or if retries are exhausted.
        """
        # Statuses worth retrying: rate limiting + transient server errors.
        transient_status = {429, 500, 502, 503, 504}
        backoff = 0.2

        headers = {
            "Accept": "application/json",
            "User-Agent": "paperscraper (Crossref fallback)",
        }

        # NOTE(review): if max_retries <= 0 the loop body never runs and this
        # implicitly returns None — confirm callers always pass max_retries >= 1.
        for attempt in range(self.max_retries):
            resp = requests.get(self.base_url, params=params, headers=headers, timeout=30)
            if resp.status_code in transient_status:
                logger.warning(
                    f"Crossref returned {resp.status_code} (attempt {attempt + 1}/{self.max_retries}); "
                    f"retrying in {backoff:.1f}s"
                )
                # On the final attempt, surface the HTTP error instead of sleeping.
                if attempt + 1 == self.max_retries:
                    resp.raise_for_status()
                sleep(backoff)
                # Exponential backoff, capped at one minute.
                backoff = min(60.0, backoff * 2)
                continue
            resp.raise_for_status()
            return resp.json()
__init__(start_date: str, end_date: str, page_size: int = 1000, max_retries: int = 10, mailto: Optional[str] = None, request_delay_seconds: float = 0.35)

Initialize the Crossref fallback client.

Parameters:

Name Type Description Default
start_date str

Start of the posted-date range (YYYY-MM-DD).

required
end_date str

End of the posted-date range (YYYY-MM-DD).

required
page_size int

Number of results per page (Crossref max is 1000).

1000
max_retries int

Max retries for transient HTTP status codes.

10
mailto Optional[str]

Optional contact email to include in the request (Crossref recommends this for polite usage).

None
request_delay_seconds float

Delay between page requests. This is used to avoid hammering Crossref and also keeps long-range dumps from completing too quickly in tests that expect the dumper to be long-running.

0.35
Source code in paperscraper/get_dumps/utils/chemrxiv/crossref_api.py
def __init__(
    self,
    start_date: str,
    end_date: str,
    page_size: int = 1000,
    max_retries: int = 10,
    mailto: Optional[str] = None,
    request_delay_seconds: float = 0.35,
):
    """Set up the Crossref fallback client.

    Args:
        start_date: Start of the posted-date range (YYYY-MM-DD).
        end_date: End of the posted-date range (YYYY-MM-DD).
        page_size: Results per page; clamped to Crossref's limit of 1000.
        max_retries: Max retries for transient HTTP status codes.
        mailto: Optional contact email (Crossref recommends this for polite
            usage).
        request_delay_seconds: Pause between page requests, to avoid hammering
            Crossref; negative values are treated as zero.
    """
    self.start_date = start_date
    self.end_date = end_date

    # Clamp the page size into Crossref's supported range [1, 1000].
    rows = page_size
    if rows < 1:
        rows = 1
    elif rows > 1000:
        rows = 1000
    self.page_size = rows

    self.max_retries = max_retries
    self.mailto = mailto

    # A negative delay makes no sense; fall back to "no delay".
    self.request_delay_seconds = (
        request_delay_seconds if request_delay_seconds > 0.0 else 0.0
    )
iter_items() -> Generator[Dict, None, None]

Iterate over raw Crossref work items for the configured date range.

Yields:

Type Description
Dict

A dict for each work item as returned by Crossref's Works API.

Raises:

Type Description
HTTPError

If the request fails with a non-retryable status code, or if retries are exhausted.

Source code in paperscraper/get_dumps/utils/chemrxiv/crossref_api.py
def iter_items(self) -> Generator[Dict, None, None]:
    """Iterate over raw Crossref work items for the configured date range.

    Yields:
        A dict for each work item as returned by Crossref's Works API.

    Raises:
        requests.HTTPError: If the request fails with a non-retryable status
            code, or if retries are exhausted.
    """
    # "*" is Crossref's sentinel value that opens a deep-paging cursor.
    cursor = "*"
    last_first_doi: Optional[str] = None
    repeated_first_doi_count = 0
    params = {
        "rows": self.page_size,
        "cursor": cursor,
        "filter": ",".join(
            [
                f"prefix:{self.chemrxiv_prefix}",
                "type:posted-content",
                f"from-posted-date:{self.start_date}",
                f"until-posted-date:{self.end_date}",
            ]
        ),
    }
    if self.mailto:
        params["mailto"] = self.mailto

    while True:
        params["cursor"] = cursor
        data = self._request(params=params)
        message = data.get("message", {}) or {}
        items = message.get("items", []) or []
        for item in items:
            yield item

        # An empty page or a missing cursor means the result set is done.
        next_cursor = message.get("next-cursor")
        if not items or not next_cursor:
            break
        cursor = next_cursor

        # Crossref's cursor token may remain stable while the server-side
        # iterator advances. As a safety net, detect if we seem stuck
        # returning the same page repeatedly.
        first_doi = (items[0].get("DOI") or "") if items else ""
        if first_doi and first_doi == last_first_doi:
            repeated_first_doi_count += 1
            if repeated_first_doi_count >= 3:
                logger.warning(
                    "Crossref cursor appears stuck (repeating the same first DOI); stopping pagination."
                )
                break
        else:
            repeated_first_doi_count = 0
            last_first_doi = first_doi

        # Avoid hammering Crossref in tight loops (and keep the default
        # dump long-running for large ranges).
        if self.request_delay_seconds:
            sleep(self.request_delay_seconds)
crossref_item_to_paper(item: Dict) -> Dict

Convert a Crossref work item into the ChemRxiv dump schema.

Parameters:

Name Type Description Default
item Dict

A single work item dict from Crossref's Works API.

required

Returns:

Type Description
Dict

A dict compatible with the JSONL dump schema used for ChemRxiv in this

Dict

package.

Source code in paperscraper/get_dumps/utils/chemrxiv/crossref_api.py
def crossref_item_to_paper(item: Dict) -> Dict:
    """Convert a Crossref work item into the ChemRxiv dump schema.

    Args:
        item: A single work item dict from Crossref's Works API.

    Returns:
        A dict compatible with the JSONL dump schema used for ChemRxiv in this
        package.
    """
    title_list: List[str] = item.get("title") or []
    title = title_list[0] if title_list else ""

    doi = item.get("DOI") or ""

    # Join "given family" per author; authors missing both parts are skipped.
    authors = []
    for a in item.get("author") or []:
        given = (a.get("given") or "").strip()
        family = (a.get("family") or "").strip()
        full = " ".join([p for p in [given, family] if p])
        if full:
            authors.append(full)
    authors_str = "; ".join(authors)

    # Prefer the preprint's "posted" date; fall back to "issued".
    date_parts = (item.get("posted") or {}).get("date-parts") or []
    if not date_parts:
        date_parts = (item.get("issued") or {}).get("date-parts") or []
    date = ""
    if date_parts and date_parts[0]:
        parts = date_parts[0]
        year = parts[0]
        # Crossref may emit partial dates such as [[2021]] or even [[None]];
        # guard against None before formatting (the original raised TypeError).
        if year is not None:
            month = parts[1] if len(parts) > 1 and parts[1] is not None else 1
            day = parts[2] if len(parts) > 2 and parts[2] is not None else 1
            date = f"{year:04d}-{month:02d}-{day:02d}"

    # "is-preprint-of" links the preprint to its published version, if any.
    published_doi = "N.A."
    published_url = "N.A."
    rel = item.get("relation") or {}
    is_preprint_of = rel.get("is-preprint-of") or []
    if is_preprint_of:
        candidate = is_preprint_of[0].get("id")
        if candidate:
            published_doi = candidate
            published_url = f"https://doi.org/{candidate}"

    license_str = "N.A."
    licenses = item.get("license") or []
    if licenses:
        license_str = licenses[0].get("URL") or license_str

    return {
        "title": title,
        "doi": doi,
        "published_doi": published_doi,
        "published_url": published_url,
        "authors": authors_str,
        "abstract": "",
        "date": date,
        "journal": "chemRxiv",
        "categories": "",
        "metrics": {},
        "license": license_str,
        "url": (item.get("resource") or {}).get("primary", {}).get("URL") or "",
    }
get_author(author_list: List[Dict]) -> str

Parse ChemRxiv dump entry to extract author list

Parameters:

Name Type Description Default
author_list list

List of dicts, one per author.

required

Returns:

Name Type Description
str str

;-concatenated author list.

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_author(author_list: List[Dict]) -> str:
    """Parse ChemRxiv dump entry to extract author list.

    Args:
        author_list (list): List of dicts, one per author.

    Returns:
        str: ;-concatenated author list ("firstName lastName" per author).
    """
    names = []
    for author in author_list:
        # Tolerate entries missing a name part instead of raising KeyError.
        parts = [author.get("firstName", ""), author.get("lastName", "")]
        names.append(" ".join(p for p in parts if p))
    return "; ".join(names)
get_categories(category_list: List[Dict]) -> str

Parse ChemRxiv dump entry to extract the categories of the paper

Parameters:

Name Type Description Default
category_list list

List of dicts, one per category.

required

Returns:

Name Type Description
str str

;-concatenated category list.

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_categories(category_list: List[Dict]) -> str:
    """Parse ChemRxiv dump entry to extract the categories of the paper.

    Args:
        category_list (list): List of dicts, one per category.

    Returns:
        str: ;-concatenated category list.
    """
    # Pull out each category's "name" field and join with "; ".
    names = (category["name"] for category in category_list)
    return "; ".join(names)
get_date(datestring: str) -> str

Get the date of a chemrxiv dump entry.

Parameters:

Name Type Description Default
datestring str

String in the format: 2021-10-15T05:12:32.356Z

required

Returns:

Name Type Description
str str

Date in the format: YYYY-MM-DD.

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_date(datestring: str) -> str:
    """Get the date of a chemrxiv dump entry.

    Args:
        datestring: String in the format: 2021-10-15T05:12:32.356Z

    Returns:
        str: Date in the format: YYYY-MM-DD.
    """
    # Everything before the first "T" is the calendar date; strings without
    # a "T" are returned unchanged.
    date_part, _, _ = datestring.partition("T")
    return date_part
get_metrics(metrics_list: List[Dict]) -> Dict

Parse ChemRxiv dump entry to extract the access metrics of the paper.

Parameters:

Name Type Description Default
metrics_list List[Dict]

A list of single-keyed, dictionaries each containing key and value for exactly one metric.

required

Returns:

Name Type Description
Dict Dict

A flattened dictionary with all metrics and a timestamp

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_metrics(metrics_list: List[Dict]) -> Dict:
    """
    Parse ChemRxiv dump entry to extract the access metrics of the paper.

    Args:
        metrics_list (List[Dict]): A list of single-keyed dictionaries, each
            containing key and value for exactly one metric.

    Returns:
        Dict: A flattened dictionary with all metrics and a timestamp.
    """
    metric_dict = {m["description"]: m["value"] for m in metrics_list}

    # This assumes that the .jsonl is constructed at roughly the same date
    # where this entry was obtained from the API
    metric_dict.update({"timestamp": today})
    # BUG FIX: the original fell off the end and implicitly returned None,
    # contradicting the documented Dict return value.
    return metric_dict
parse_dump(source_path: str, target_path: str) -> None

Parses the dump as generated by the chemrXiv API and this repo: https://github.com/cthoyt/chemrxiv-summarize into a format that is equal to that of biorXiv and medRxiv.

NOTE: This is a lazy parser trying to store all data in memory.

Parameters:

Name Type Description Default
source_path str

Path to the source dump

required
target_path str

Path where the JSONL output is written

required
Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def parse_dump(source_path: str, target_path: str) -> None:
    """
    Parses the dump as generated by the chemrXiv API and this repo:
    https://github.com/cthoyt/chemrxiv-summarize
    into a format that is equal to that of biorXiv and medRxiv.

    NOTE: This is a lazy parser trying to store all data in memory.
    NOTE: Source .json files are deleted as they are consumed.

    Args:
        source_path: Path to the source dump (directory of per-paper .json files).
        target_path: Path where the JSONL output file is written.
    """

    dump = []
    # Read source dump
    for file_name in tqdm(os.listdir(source_path)):
        if not file_name.endswith(".json"):
            continue
        filepath = os.path.join(source_path, file_name)
        # Unusable payloads (empty or invalid JSON) are deleted and skipped.
        if os.path.getsize(filepath) == 0:
            logger.warning(f"Empty chemRxiv dump file {filepath}; skipping.")
            os.remove(filepath)
            continue
        try:
            with open(filepath, "r") as f:
                source_paper = json.load(f)
        except JSONDecodeError as exc:
            logger.warning(f"Invalid JSON in chemRxiv dump file {filepath}: {exc}")
            os.remove(filepath)
            continue

        # Map the OpenEngage payload onto the biorxiv/medrxiv-style schema.
        target_paper = {
            "title": source_paper["title"],
            "doi": source_paper["doi"],
            # "vor" presumably carries the published version-of-record info
            # (vorDoi/url keys); it is falsy for unpublished preprints.
            "published_doi": (
                source_paper["vor"]["vorDoi"] if source_paper["vor"] else "N.A."
            ),
            "published_url": (
                source_paper["vor"]["url"] if source_paper["vor"] else "N.A."
            ),
            "authors": get_author(source_paper["authors"]),
            "abstract": source_paper["abstract"],
            "date": get_date(source_paper["statusDate"]),
            "journal": "chemRxiv",
            "categories": get_categories(source_paper["categories"]),
            "metrics": get_metrics(source_paper["metrics"]),
            "license": source_paper["license"]["name"],
        }
        dump.append(target_paper)
        os.remove(filepath)
    # Write dump
    with open(target_path, "w") as f:
        for idx, target_paper in enumerate(dump):
            if idx > 0:
                f.write(os.linesep)
            f.write(json.dumps(target_paper))
    logger.info("Done, shutting down")
download_full_crossref(save_dir: str, api: Optional[CrossrefChemrxivAPI] = None) -> None

Download ChemRxiv records via Crossref into per-item JSON payloads.

This mirrors the behavior of the OpenEngage backend by storing one JSON payload per record in save_dir. The payloads are raw Crossref work items.

Parameters:

Name Type Description Default
save_dir str

Directory where per-item payloads are stored.

required
api Optional[CrossrefChemrxivAPI]

Crossref API client. If None, uses the widest possible date range.

None
Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def download_full_crossref(
    save_dir: str, api: Optional[CrossrefChemrxivAPI] = None
) -> None:
    """Download ChemRxiv records via Crossref into per-item JSON payloads.

    This mirrors the behavior of the OpenEngage backend by storing one JSON
    payload per record in ``save_dir``. The payloads are raw Crossref work
    items.

    Args:
        save_dir: Directory where per-item payloads are stored.
        api: Crossref API client. If None, uses the widest possible date range.
    """
    if api is None:
        api = CrossrefChemrxivAPI(start_date="2017-01-01", end_date=today)
    os.makedirs(save_dir, exist_ok=True)

    for item in tqdm(api.iter_items()):
        # Skip records without a DOI; the DOI doubles as the file name.
        doi = (item.get("DOI") or "").strip().lower()
        if not doi:
            continue

        target = os.path.join(save_dir, f"{doi.replace('/', '_')}.json")
        if os.path.exists(target):
            # Already downloaded in a previous (possibly interrupted) run.
            continue

        # Write atomically: dump to a scratch file, then rename into place so
        # a crash never leaves a half-written payload behind.
        scratch = f"{target}.tmp"
        try:
            with open(scratch, "w") as handle:
                json.dump(item, handle, indent=2)
            os.replace(scratch, target)
        finally:
            if os.path.exists(scratch):
                os.remove(scratch)
parse_dump_crossref(source_path: str, target_path: str) -> None

Parse Crossref payloads into the ChemRxiv JSONL dump format.

Parameters:

Name Type Description Default
source_path str

Directory containing per-item Crossref JSON payloads.

required
target_path str

JSONL output path.

required
Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def parse_dump_crossref(source_path: str, target_path: str) -> None:
    """Parse Crossref payloads into the ChemRxiv JSONL dump format.

    Args:
        source_path: Directory containing per-item Crossref JSON payloads.
        target_path: JSONL output path.
    """
    papers: List[Dict] = []

    for file_name in tqdm(os.listdir(source_path)):
        if not file_name.endswith(".json"):
            continue
        filepath = os.path.join(source_path, file_name)

        # Unusable payloads (empty or corrupted) are deleted and skipped.
        if os.path.getsize(filepath) == 0:
            logger.warning(f"Empty Crossref payload file {filepath}; skipping.")
            os.remove(filepath)
            continue
        try:
            with open(filepath, "r") as f:
                raw_item = json.load(f)
        except JSONDecodeError as exc:
            logger.warning(f"Invalid JSON in Crossref payload file {filepath}: {exc}")
            os.remove(filepath)
            continue

        papers.append(crossref_item_to_paper(raw_item))
        # Payload files are consumed (deleted) once converted.
        os.remove(filepath)

    # Newline-separated JSON objects, with no trailing newline.
    with open(target_path, "w") as f:
        for idx, paper in enumerate(papers):
            if idx > 0:
                f.write(os.linesep)
            f.write(json.dumps(paper))
    logger.info("Done, shutting down")
chemrxiv_api
ChemrxivAPI

Handle OpenEngage API requests, using access. Adapted from https://github.com/fxcoudert/tools/blob/master/chemRxiv/chemRxiv.py.

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
class ChemrxivAPI:
    """Handle OpenEngage API requests, using access.
    Adapted from https://github.com/fxcoudert/tools/blob/master/chemRxiv/chemRxiv.py.
    """

    # Root of the public ChemRxiv OpenEngage API; all queries are resolved
    # relative to this URL via urljoin (see `query` / `query_generator`).
    base_primary = "https://chemrxiv.org/engage/chemrxiv/public-api/v1/"

    def __init__(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        page_size: Optional[int] = None,
        max_retries: int = 10,
    ):
        """
        Initialize API class.

        Args:
            start_date (Optional[str], optional): begin date expressed as YYYY-MM-DD.
                Defaults to None.
            end_date (Optional[str], optional): end date expressed as YYYY-MM-DD.
                Defaults to None.
            page_size (int, optional): The batch size used to fetch the records from chemrxiv.
            max_retries (int): Number of retries in case of error
        """

        self.page_size = page_size or 50
        self.max_retries = max_retries
        # NOTE(review): set but never read within this class — presumably used
        # by callers or subclasses; confirm before removing.
        self._origin_filter = None
        self._set_base(self.base_primary)

        # Begin Date and End Date of the search
        # `launch_dates` and `now_datetime` are module-level values (defined
        # outside this class); the range is clamped to [launch date, now].
        launch_date = launch_dates["chemrxiv"]
        launch_datetime = datetime.fromisoformat(launch_date)

        if start_date:
            start_datetime = datetime.fromisoformat(start_date)
            if start_datetime < launch_datetime:
                # Requested start predates ChemRxiv itself — clamp and warn.
                self.start_date = launch_date
                logger.warning(
                    f"Begin date {start_date} is before chemrxiv launch date. Will use {launch_date} instead."
                )
            else:
                self.start_date = start_date
        else:
            self.start_date = launch_date
        if end_date:
            end_datetime = datetime.fromisoformat(end_date)
            if end_datetime > now_datetime:
                # Future end dates are clamped to the current date.
                logger.warning(
                    f"End date {end_date} is in the future. Will use {now_datetime} instead."
                )
                self.end_date = now_datetime.strftime("%Y-%m-%d")
            else:
                self.end_date = end_date
        else:
            self.end_date = now_datetime.strftime("%Y-%m-%d")

    def request(self, url, method, params=None, parse_json: bool = False):
        """Send an API request to open Engage.

        Retries transient failures (network/decoding errors, 429/5xx status
        codes, malformed JSON) up to ``self.max_retries`` times with capped
        exponential backoff; non-transient 4xx responses raise immediately.
        """

        headers = {
            "Accept-Encoding": "identity",
            "Accept": "application/json",
            "User-Agent": "paperscraper",
        }
        # Exception classes that warrant a retry rather than an immediate raise.
        retryable = (
            ChunkedEncodingError,
            ContentDecodingError,
            DecodeError,
            ReadTimeout,
            ConnectionError,
        )
        # HTTP status codes treated as transient; any other 4xx raises below.
        transient_status = {429, 500, 502, 503, 504}
        # Backoff starts at 0.1s and doubles per attempt, capped at 60s.
        backoff = 0.1

        for attempt in range(self.max_retries):
            try:
                if method.casefold() == "get":
                    response = requests.get(
                        url, params=params, headers=headers, timeout=(5, 30)
                    )
                elif method.casefold() == "post":
                    response = requests.post(
                        url, json=params, headers=headers, timeout=(5, 30)
                    )
                else:
                    raise ConnectionError(f"Unknown method for query: {method}")
                if response.status_code in transient_status:
                    logger.warning(
                        f"{response.status_code} for {url} (attempt {attempt + 1}/{self.max_retries}); retrying in {backoff:.1f}s"
                    )
                    # Final attempt: surface the HTTP error instead of sleeping.
                    if attempt + 1 == self.max_retries:
                        response.raise_for_status()
                    sleep(backoff)
                    backoff = min(60.0, backoff * 2)
                    continue
                elif 400 <= response.status_code < 500:
                    # Non-transient client error: do not retry.
                    response.raise_for_status()
                if not parse_json:
                    return response

                try:
                    return response.json()
                except JSONDecodeError:
                    # A 2xx body that is not valid JSON is treated as transient.
                    logger.warning(
                        f"JSONDecodeError for {response.url} "
                        f"(attempt {attempt + 1}/{self.max_retries}); retrying in {backoff:.1f}s"
                    )
                    if attempt + 1 == self.max_retries:
                        raise
                    sleep(backoff)
                    backoff = min(60.0, backoff * 2)
                    continue

            except retryable as e:
                logger.warning(
                    f"{e.__class__.__name__} for {url} (attempt {attempt + 1}/{self.max_retries}); "
                    f"retrying in {backoff:.1f}s"
                )
                if attempt + 1 == self.max_retries:
                    raise
                sleep(backoff)
                backoff = min(60.0, backoff * 2)

    def query(self, query, method="get", params=None):
        """Perform a direct query.

        Resolves *query* against ``self.base`` and always parses the JSON body.
        """

        return self.request(
            urljoin(self.base, query), method, params=params, parse_json=True
        )

    def query_generator(
        self, query, method: str = "get", params: Optional[Dict] = None
    ):
        """Query for a list of items, with paging. Returns a generator."""

        start_datetime = datetime.fromisoformat(self.start_date)
        end_datetime = datetime.fromisoformat(self.end_date)

        # Split [start_date, end_date] into calendar-year windows, clipped to
        # the requested range, so each paged scan stays bounded.
        def year_windows():
            year = start_datetime.year
            while year <= end_datetime.year:
                year_start = datetime(year, 1, 1)
                year_end = datetime(year, 12, 31)
                win_start = max(start_datetime, year_start)
                win_end = min(end_datetime, year_end)
                yield win_start.strftime("%Y-%m-%d"), win_end.strftime("%Y-%m-%d")
                year += 1

        # Copy so the pagination keys added below never mutate the caller's dict.
        params = (params or {}).copy()

        for year_from, year_to in year_windows():
            logger.info(f"Starting to scrape data from {year_from} to {year_to}")
            page = 0
            while True:
                params.update(
                    {
                        "limit": self.page_size,
                        "skip": page * self.page_size,
                        "searchDateFrom": year_from,
                        "searchDateTo": year_to,
                    }
                )
                try:
                    data = self.request(
                        urljoin(self.base, query),
                        method,
                        params=params,
                        parse_json=True,
                    )
                except requests.HTTPError as e:
                    status = getattr(e.response, "status_code", None)
                    # A 403 on the items listing is treated as bot protection;
                    # callers may catch PermissionError and fall back to Crossref.
                    if status == 403 and query == "items":
                        raise PermissionError(
                            "ChemRxiv OpenEngage API returned 403 (likely Cloudflare / bot protection)."
                        ) from e
                    # Other HTTP errors abort this year window but continue
                    # with the next one.
                    logger.warning(
                        f"Stopping year window {year_from}..{year_to} at skip={page * self.page_size} "
                        f"due to HTTPError {status}"
                    )
                    break
                items = data.get("itemHits", [])
                if not items:
                    # Empty page: pagination for this window is exhausted.
                    break
                for item in items:
                    yield item
                page += 1

    def all_preprints(self):
        """Return a generator to all the chemRxiv articles."""
        return self.query_generator("items")

    def preprint(self, article_id):
        """Information on a given preprint.
        .. seealso:: https://docs.figshare.com/#public_article
        """
        return self.query(os.path.join("items", article_id))

    def number_of_preprints(self):
        # Total item count reported by the API for the unfiltered listing.
        return self.query("items")["totalCount"]

    def _set_base(self, base_url: str) -> None:
        # Record the API root used by `query` / `query_generator`.
        self.base = base_url
__init__(start_date: Optional[str] = None, end_date: Optional[str] = None, page_size: Optional[int] = None, max_retries: int = 10)

Initialize API class.

Parameters:

Name Type Description Default
start_date Optional[str]

begin date expressed as YYYY-MM-DD. Defaults to None.

None
end_date Optional[str]

end date expressed as YYYY-MM-DD. Defaults to None.

None
page_size int

The batch size used to fetch the records from chemrxiv.

None
max_retries int

Number of retries in case of error

10
Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def __init__(
    self,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    page_size: Optional[int] = None,
    max_retries: int = 10,
):
    """
    Initialize API class.

    Args:
        start_date (Optional[str], optional): begin date expressed as YYYY-MM-DD.
            Defaults to None.
        end_date (Optional[str], optional): end date expressed as YYYY-MM-DD.
            Defaults to None.
        page_size (int, optional): The batch size used to fetch the records from chemrxiv.
        max_retries (int): Number of retries in case of error
    """

    self.page_size = page_size or 50
    self.max_retries = max_retries
    self._origin_filter = None
    self._set_base(self.base_primary)

    # Begin Date and End Date of the search
    launch_date = launch_dates["chemrxiv"]
    launch_datetime = datetime.fromisoformat(launch_date)

    if start_date:
        start_datetime = datetime.fromisoformat(start_date)
        if start_datetime < launch_datetime:
            self.start_date = launch_date
            logger.warning(
                f"Begin date {start_date} is before chemrxiv launch date. Will use {launch_date} instead."
            )
        else:
            self.start_date = start_date
    else:
        self.start_date = launch_date
    if end_date:
        end_datetime = datetime.fromisoformat(end_date)
        if end_datetime > now_datetime:
            logger.warning(
                f"End date {end_date} is in the future. Will use {now_datetime} instead."
            )
            self.end_date = now_datetime.strftime("%Y-%m-%d")
        else:
            self.end_date = end_date
    else:
        self.end_date = now_datetime.strftime("%Y-%m-%d")
request(url, method, params=None, parse_json: bool = False)

Send an API request to open Engage.

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def request(self, url, method, params=None, parse_json: bool = False):
    """Send an API request to open Engage."""

    headers = {
        "Accept-Encoding": "identity",
        "Accept": "application/json",
        "User-Agent": "paperscraper",
    }
    retryable = (
        ChunkedEncodingError,
        ContentDecodingError,
        DecodeError,
        ReadTimeout,
        ConnectionError,
    )
    transient_status = {429, 500, 502, 503, 504}
    backoff = 0.1

    for attempt in range(self.max_retries):
        try:
            if method.casefold() == "get":
                response = requests.get(
                    url, params=params, headers=headers, timeout=(5, 30)
                )
            elif method.casefold() == "post":
                response = requests.post(
                    url, json=params, headers=headers, timeout=(5, 30)
                )
            else:
                raise ConnectionError(f"Unknown method for query: {method}")
            if response.status_code in transient_status:
                logger.warning(
                    f"{response.status_code} for {url} (attempt {attempt + 1}/{self.max_retries}); retrying in {backoff:.1f}s"
                )
                if attempt + 1 == self.max_retries:
                    response.raise_for_status()
                sleep(backoff)
                backoff = min(60.0, backoff * 2)
                continue
            elif 400 <= response.status_code < 500:
                response.raise_for_status()
            if not parse_json:
                return response

            try:
                return response.json()
            except JSONDecodeError:
                logger.warning(
                    f"JSONDecodeError for {response.url} "
                    f"(attempt {attempt + 1}/{self.max_retries}); retrying in {backoff:.1f}s"
                )
                if attempt + 1 == self.max_retries:
                    raise
                sleep(backoff)
                backoff = min(60.0, backoff * 2)
                continue

        except retryable as e:
            logger.warning(
                f"{e.__class__.__name__} for {url} (attempt {attempt + 1}/{self.max_retries}); "
                f"retrying in {backoff:.1f}s"
            )
            if attempt + 1 == self.max_retries:
                raise
            sleep(backoff)
            backoff = min(60.0, backoff * 2)
query(query, method='get', params=None)

Perform a direct query.

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def query(self, query, method="get", params=None):
    """Perform a direct query."""

    return self.request(
        urljoin(self.base, query), method, params=params, parse_json=True
    )
query_generator(query, method: str = 'get', params: Optional[Dict] = None)

Query for a list of items, with paging. Returns a generator.

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def query_generator(
    self, query, method: str = "get", params: Optional[Dict] = None
):
    """Query for a list of items, with paging. Returns a generator."""

    start_datetime = datetime.fromisoformat(self.start_date)
    end_datetime = datetime.fromisoformat(self.end_date)

    def year_windows():
        year = start_datetime.year
        while year <= end_datetime.year:
            year_start = datetime(year, 1, 1)
            year_end = datetime(year, 12, 31)
            win_start = max(start_datetime, year_start)
            win_end = min(end_datetime, year_end)
            yield win_start.strftime("%Y-%m-%d"), win_end.strftime("%Y-%m-%d")
            year += 1

    params = (params or {}).copy()

    for year_from, year_to in year_windows():
        logger.info(f"Starting to scrape data from {year_from} to {year_to}")
        page = 0
        while True:
            params.update(
                {
                    "limit": self.page_size,
                    "skip": page * self.page_size,
                    "searchDateFrom": year_from,
                    "searchDateTo": year_to,
                }
            )
            try:
                data = self.request(
                    urljoin(self.base, query),
                    method,
                    params=params,
                    parse_json=True,
                )
            except requests.HTTPError as e:
                status = getattr(e.response, "status_code", None)
                if status == 403 and query == "items":
                    raise PermissionError(
                        "ChemRxiv OpenEngage API returned 403 (likely Cloudflare / bot protection)."
                    ) from e
                logger.warning(
                    f"Stopping year window {year_from}..{year_to} at skip={page * self.page_size} "
                    f"due to HTTPError {status}"
                )
                break
            items = data.get("itemHits", [])
            if not items:
                break
            for item in items:
                yield item
            page += 1
all_preprints()

Return a generator to all the chemRxiv articles.

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def all_preprints(self):
    """Return a generator to all the chemRxiv articles."""
    return self.query_generator("items")
preprint(article_id)

Information on a given preprint. .. seealso:: https://docs.figshare.com/#public_article

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def preprint(self, article_id):
    """Information on a given preprint.
    .. seealso:: https://docs.figshare.com/#public_article
    """
    return self.query(os.path.join("items", article_id))
crossref_api

Crossref-based fallback for ChemRxiv dumps.

ChemRxiv's primary OpenEngage API can be blocked by Cloudflare (HTTP 403) in some environments. This module provides a fallback based on Crossref's public API using the ChemRxiv DOI prefix (10.26434).

NOTE

Crossref does not expose ChemRxiv abstracts, categories, or usage metrics. Those fields are therefore left empty in the converted dump format.

CrossrefChemrxivAPI

Fetch ChemRxiv metadata from Crossref.

This class queries Crossref's Works endpoint filtered by the ChemRxiv DOI prefix (10.26434) and date range. Results are fetched using cursor-based pagination.

Source code in paperscraper/get_dumps/utils/chemrxiv/crossref_api.py
class CrossrefChemrxivAPI:
    """Fetch ChemRxiv metadata from Crossref.

    This class queries Crossref's Works endpoint filtered by the ChemRxiv DOI
    prefix (``10.26434``) and date range. Results are fetched using cursor-based
    pagination.
    """

    base_url = "https://api.crossref.org/works"
    # DOI prefix registered to ChemRxiv; used in the `filter` query parameter.
    chemrxiv_prefix = "10.26434"

    def __init__(
        self,
        start_date: str,
        end_date: str,
        page_size: int = 1000,
        max_retries: int = 10,
        mailto: Optional[str] = None,
        request_delay_seconds: float = 0.35,
    ):
        """Initialize the Crossref fallback client.

        Args:
            start_date: Start of the posted-date range (YYYY-MM-DD).
            end_date: End of the posted-date range (YYYY-MM-DD).
            page_size: Number of results per page (Crossref max is 1000).
            max_retries: Max retries for transient HTTP status codes.
            mailto: Optional contact email to include in the request (Crossref
                recommends this for polite usage).
            request_delay_seconds: Delay between page requests. This is used to
                avoid hammering Crossref and also keeps long-range dumps from
                completing too quickly in tests that expect the dumper to be
                long-running.
        """
        self.start_date = start_date
        self.end_date = end_date
        # Clamp to Crossref's allowed range [1, 1000].
        self.page_size = min(max(1, page_size), 1000)
        self.max_retries = max_retries
        self.mailto = mailto
        # Negative delays are treated as "no delay".
        self.request_delay_seconds = max(0.0, request_delay_seconds)

    def iter_items(self) -> Generator[Dict, None, None]:
        """Iterate over raw Crossref work items for the configured date range.

        Yields:
            A dict for each work item as returned by Crossref's Works API.

        Raises:
            requests.HTTPError: If the request fails with a non-retryable status
                code, or if retries are exhausted.
        """
        # "*" starts a fresh deep-paging cursor on Crossref's side.
        cursor = "*"
        last_first_doi: Optional[str] = None
        repeated_first_doi_count = 0
        params = {
            "rows": self.page_size,
            "cursor": cursor,
            "filter": ",".join(
                [
                    f"prefix:{self.chemrxiv_prefix}",
                    "type:posted-content",
                    f"from-posted-date:{self.start_date}",
                    f"until-posted-date:{self.end_date}",
                ]
            ),
        }
        if self.mailto:
            params["mailto"] = self.mailto

        while True:
            params["cursor"] = cursor
            data = self._request(params=params)
            message = data.get("message", {}) or {}
            items = message.get("items", []) or []
            for item in items:
                yield item

            next_cursor = message.get("next-cursor")
            if not items or not next_cursor:
                # Empty page or missing cursor: pagination is finished.
                break
            cursor = next_cursor

            # Crossref's cursor token may remain stable while the server-side
            # iterator advances. As a safety net, detect if we seem stuck
            # returning the same page repeatedly.
            first_doi = (items[0].get("DOI") or "") if items else ""
            if first_doi and first_doi == last_first_doi:
                repeated_first_doi_count += 1
                if repeated_first_doi_count >= 3:
                    logger.warning(
                        "Crossref cursor appears stuck (repeating the same first DOI); stopping pagination."
                    )
                    break
            else:
                repeated_first_doi_count = 0
                last_first_doi = first_doi

            # Avoid hammering Crossref in tight loops (and keep the default
            # dump long-running for large ranges).
            if self.request_delay_seconds:
                sleep(self.request_delay_seconds)

    def _request(self, params: Dict) -> Dict:
        """Send a single request to Crossref with basic retry/backoff logic.

        Args:
            params: Query parameters to send to the Crossref Works endpoint.

        Returns:
            Parsed JSON response as a dict.

        Raises:
            requests.HTTPError: If the request fails with a non-retryable status
                code, or if retries are exhausted.
        """
        # Status codes retried with capped exponential backoff (start 0.2s).
        transient_status = {429, 500, 502, 503, 504}
        backoff = 0.2

        headers = {
            "Accept": "application/json",
            "User-Agent": "paperscraper (Crossref fallback)",
        }

        # NOTE(review): if max_retries <= 0, the loop body never runs and this
        # method implicitly returns None — confirm callers never pass 0.
        for attempt in range(self.max_retries):
            resp = requests.get(self.base_url, params=params, headers=headers, timeout=30)
            if resp.status_code in transient_status:
                logger.warning(
                    f"Crossref returned {resp.status_code} (attempt {attempt + 1}/{self.max_retries}); "
                    f"retrying in {backoff:.1f}s"
                )
                # Final attempt: raise the HTTP error instead of sleeping.
                if attempt + 1 == self.max_retries:
                    resp.raise_for_status()
                sleep(backoff)
                backoff = min(60.0, backoff * 2)
                continue
            resp.raise_for_status()
            return resp.json()
__init__(start_date: str, end_date: str, page_size: int = 1000, max_retries: int = 10, mailto: Optional[str] = None, request_delay_seconds: float = 0.35)

Initialize the Crossref fallback client.

Parameters:

Name Type Description Default
start_date str

Start of the posted-date range (YYYY-MM-DD).

required
end_date str

End of the posted-date range (YYYY-MM-DD).

required
page_size int

Number of results per page (Crossref max is 1000).

1000
max_retries int

Max retries for transient HTTP status codes.

10
mailto Optional[str]

Optional contact email to include in the request (Crossref recommends this for polite usage).

None
request_delay_seconds float

Delay between page requests. This is used to avoid hammering Crossref and also keeps long-range dumps from completing too quickly in tests that expect the dumper to be long-running.

0.35
Source code in paperscraper/get_dumps/utils/chemrxiv/crossref_api.py
def __init__(
    self,
    start_date: str,
    end_date: str,
    page_size: int = 1000,
    max_retries: int = 10,
    mailto: Optional[str] = None,
    request_delay_seconds: float = 0.35,
):
    """Initialize the Crossref fallback client.

    Args:
        start_date: Start of the posted-date range (YYYY-MM-DD).
        end_date: End of the posted-date range (YYYY-MM-DD).
        page_size: Number of results per page (Crossref max is 1000).
        max_retries: Max retries for transient HTTP status codes.
        mailto: Optional contact email to include in the request (Crossref
            recommends this for polite usage).
        request_delay_seconds: Delay between page requests. This is used to
            avoid hammering Crossref and also keeps long-range dumps from
            completing too quickly in tests that expect the dumper to be
            long-running.
    """
    self.start_date = start_date
    self.end_date = end_date
    self.page_size = min(max(1, page_size), 1000)
    self.max_retries = max_retries
    self.mailto = mailto
    self.request_delay_seconds = max(0.0, request_delay_seconds)
iter_items() -> Generator[Dict, None, None]

Iterate over raw Crossref work items for the configured date range.

Yields:

Type Description
Dict

A dict for each work item as returned by Crossref's Works API.

Raises:

Type Description
HTTPError

If the request fails with a non-retryable status code, or if retries are exhausted.

Source code in paperscraper/get_dumps/utils/chemrxiv/crossref_api.py
def iter_items(self) -> Generator[Dict, None, None]:
    """Iterate over raw Crossref work items for the configured date range.

    Yields:
        A dict for each work item as returned by Crossref's Works API.

    Raises:
        requests.HTTPError: If the request fails with a non-retryable status
            code, or if retries are exhausted.
    """
    cursor = "*"
    last_first_doi: Optional[str] = None
    repeated_first_doi_count = 0
    params = {
        "rows": self.page_size,
        "cursor": cursor,
        "filter": ",".join(
            [
                f"prefix:{self.chemrxiv_prefix}",
                "type:posted-content",
                f"from-posted-date:{self.start_date}",
                f"until-posted-date:{self.end_date}",
            ]
        ),
    }
    if self.mailto:
        params["mailto"] = self.mailto

    while True:
        params["cursor"] = cursor
        data = self._request(params=params)
        message = data.get("message", {}) or {}
        items = message.get("items", []) or []
        for item in items:
            yield item

        next_cursor = message.get("next-cursor")
        if not items or not next_cursor:
            break
        cursor = next_cursor

        # Crossref's cursor token may remain stable while the server-side
        # iterator advances. As a safety net, detect if we seem stuck
        # returning the same page repeatedly.
        first_doi = (items[0].get("DOI") or "") if items else ""
        if first_doi and first_doi == last_first_doi:
            repeated_first_doi_count += 1
            if repeated_first_doi_count >= 3:
                logger.warning(
                    "Crossref cursor appears stuck (repeating the same first DOI); stopping pagination."
                )
                break
        else:
            repeated_first_doi_count = 0
            last_first_doi = first_doi

        # Avoid hammering Crossref in tight loops (and keep the default
        # dump long-running for large ranges).
        if self.request_delay_seconds:
            sleep(self.request_delay_seconds)
crossref_item_to_paper(item: Dict) -> Dict

Convert a Crossref work item into the ChemRxiv dump schema.

Parameters:

Name Type Description Default
item Dict

A single work item dict from Crossref's Works API.

required

Returns:

Type Description
Dict

A dict compatible with the JSONL dump schema used for ChemRxiv in this

Dict

package.

Source code in paperscraper/get_dumps/utils/chemrxiv/crossref_api.py
def crossref_item_to_paper(item: Dict) -> Dict:
    """Convert a Crossref work item into the ChemRxiv dump schema.

    Crossref does not expose abstracts, categories, or metrics for ChemRxiv,
    so those fields are emitted empty; missing optional fields fall back to
    "" or "N.A.".

    Args:
        item: A single work item dict from Crossref's Works API.

    Returns:
        A dict compatible with the JSONL dump schema used for ChemRxiv in this
        package.
    """
    titles = item.get("title") or []

    # Authors: "Given Family", skipping authors with neither name part.
    names = []
    for person in item.get("author") or []:
        pieces = [
            (person.get("given") or "").strip(),
            (person.get("family") or "").strip(),
        ]
        full_name = " ".join(piece for piece in pieces if piece)
        if full_name:
            names.append(full_name)

    # Prefer the posted date; fall back to the issued date. Missing month/day
    # components default to 1.
    parts_list = (item.get("posted") or {}).get("date-parts") or []
    if not parts_list:
        parts_list = (item.get("issued") or {}).get("date-parts") or []
    if parts_list and parts_list[0]:
        year, month, day = (list(parts_list[0]) + [1, 1])[:3]
        date = f"{year:04d}-{month:02d}-{day:02d}"
    else:
        date = ""

    # If Crossref links the preprint to a published version, record its DOI.
    published_doi = "N.A."
    published_url = "N.A."
    preprint_links = (item.get("relation") or {}).get("is-preprint-of") or []
    if preprint_links:
        linked_doi = preprint_links[0].get("id")
        if linked_doi:
            published_doi = linked_doi
            published_url = f"https://doi.org/{linked_doi}"

    license_entries = item.get("license") or []
    license_str = "N.A."
    if license_entries:
        license_str = license_entries[0].get("URL") or license_str

    return {
        "title": titles[0] if titles else "",
        "doi": item.get("DOI") or "",
        "published_doi": published_doi,
        "published_url": published_url,
        "authors": "; ".join(names),
        "abstract": "",
        "date": date,
        "journal": "chemRxiv",
        "categories": "",
        "metrics": {},
        "license": license_str,
        "url": (item.get("resource") or {}).get("primary", {}).get("URL") or "",
    }
utils

Misc utils to download chemRxiv dump

get_author(author_list: List[Dict]) -> str

Parse ChemRxiv dump entry to extract author list

Parameters:

Name Type Description Default
author_list list

List of dicts, one per author.

required

Returns:

Name Type Description
str str

;-concatenated author list.

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_author(author_list: List[Dict]) -> str:
    """Build the ";"-separated author string for a ChemRxiv dump entry.

    Args:
        author_list (list): List of dicts, one per author; each dict must have
            "firstName" and "lastName" keys.

    Returns:
        str: ;-concatenated author list.
    """
    names = (f"{entry['firstName']} {entry['lastName']}" for entry in author_list)
    return "; ".join(names)
get_categories(category_list: List[Dict]) -> str

Parse ChemRxiv dump entry to extract the categories of the paper

Parameters:

Name Type Description Default
category_list list

List of dicts, one per category.

required

Returns:

Name Type Description
str str

;-concatenated category list.

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_categories(category_list: List[Dict]) -> str:
    """Build the ";"-separated category string for a ChemRxiv dump entry.

    Args:
        category_list (list): List of dicts, one per category; each dict must
            have a "name" key.

    Returns:
        str: ;-concatenated category list.
    """
    return "; ".join(entry["name"] for entry in category_list)
get_date(datestring: str) -> str

Get the date of a chemrxiv dump entry.

Parameters:

Name Type Description Default
datestring str

String in the format: 2021-10-15T05:12:32.356Z

required

Returns:

Name Type Description
str str

Date in the format: YYYY-MM-DD.

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_date(datestring: str) -> str:
    """Get the date of a chemrxiv dump entry.

    Args:
        datestring: String in the format: 2021-10-15T05:12:32.356Z

    Returns:
        str: Date in the format: YYYY-MM-DD.
    """
    # Everything before the first "T" is the calendar date; strings without a
    # "T" are returned unchanged.
    date, _, _ = datestring.partition("T")
    return date
get_metrics(metrics_list: List[Dict]) -> Dict

Parse ChemRxiv dump entry to extract the access metrics of the paper.

Parameters:

Name Type Description Default
metrics_list List[Dict]

A list of single-keyed dictionaries, each containing key and value for exactly one metric.

required

Returns:

Name Type Description
Dict Dict

A flattened dictionary with all metrics and a timestamp

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_metrics(metrics_list: List[Dict]) -> Dict:
    """
    Parse ChemRxiv dump entry to extract the access metrics of the paper.

    Args:
        metrics_list (List[Dict]): A list of single-keyed dictionaries, each
            containing key and value for exactly one metric.

    Returns:
        Dict: A flattened dictionary with all metrics and a timestamp
    """
    metric_dict = {m["description"]: m["value"] for m in metrics_list}

    # This assumes that the .jsonl is constructed at roughly the same date
    # where this entry was obtained from the API
    metric_dict.update({"timestamp": today})
    # Bug fix: the flattened metrics were built but never returned, so callers
    # (e.g. the dump parser's "metrics" field) received None instead of a Dict.
    return metric_dict
parse_dump(source_path: str, target_path: str) -> None

Parses the dump as generated by the chemrXiv API and this repo: https://github.com/cthoyt/chemrxiv-summarize into a format that is equal to that of biorXiv and medRxiv.

NOTE: This is a lazy parser trying to store all data in memory.

Parameters:

Name Type Description Default
source_path str

Path to the source dump

required
Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def parse_dump(source_path: str, target_path: str) -> None:
    """
    Parses the dump as generated by the chemrXiv API and this repo:
    https://github.com/cthoyt/chemrxiv-summarize
    into a format that is equal to that of biorXiv and medRxiv.

    NOTE: This is a lazy parser trying to store all data in memory.

    NOTE(review): every consumed source file is deleted (including empty or
    invalid ones), so a run is destructive on `source_path`.

    Args:
        source_path: Path to the source dump (directory of per-paper .json files).
        target_path: Path to the JSONL file where the parsed dump is written.
    """

    dump = []
    # Read source dump
    for file_name in tqdm(os.listdir(source_path)):
        if not file_name.endswith(".json"):
            continue
        filepath = os.path.join(source_path, file_name)
        # Zero-byte files (e.g. from an interrupted download) are removed and skipped.
        if os.path.getsize(filepath) == 0:
            logger.warning(f"Empty chemRxiv dump file {filepath}; skipping.")
            os.remove(filepath)
            continue
        try:
            with open(filepath, "r") as f:
                source_paper = json.load(f)
        except JSONDecodeError as exc:
            logger.warning(f"Invalid JSON in chemRxiv dump file {filepath}: {exc}")
            os.remove(filepath)
            continue

        target_paper = {
            "title": source_paper["title"],
            "doi": source_paper["doi"],
            # "vor" is only set once a version of record exists; "N.A." otherwise.
            "published_doi": (
                source_paper["vor"]["vorDoi"] if source_paper["vor"] else "N.A."
            ),
            "published_url": (
                source_paper["vor"]["url"] if source_paper["vor"] else "N.A."
            ),
            "authors": get_author(source_paper["authors"]),
            "abstract": source_paper["abstract"],
            "date": get_date(source_paper["statusDate"]),
            "journal": "chemRxiv",
            "categories": get_categories(source_paper["categories"]),
            "metrics": get_metrics(source_paper["metrics"]),
            "license": source_paper["license"]["name"],
        }
        dump.append(target_paper)
        # Source file is consumed; remove it so reruns only process new papers.
        os.remove(filepath)
    # Write dump
    with open(target_path, "w") as f:
        for idx, target_paper in enumerate(dump):
            if idx > 0:
                f.write(os.linesep)
            f.write(json.dumps(target_paper))
    logger.info("Done, shutting down")
download_full_crossref(save_dir: str, api: Optional[CrossrefChemrxivAPI] = None) -> None

Download ChemRxiv records via Crossref into per-item JSON payloads.

This mirrors the behavior of the OpenEngage backend by storing one JSON payload per record in save_dir. The payloads are raw Crossref work items.

Parameters:

Name Type Description Default
save_dir str

Directory where per-item payloads are stored.

required
api Optional[CrossrefChemrxivAPI]

Crossref API client. If None, uses the widest possible date range.

None
Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def download_full_crossref(
    save_dir: str, api: Optional[CrossrefChemrxivAPI] = None
) -> None:
    """Download ChemRxiv records via Crossref into per-item JSON payloads.

    Each Crossref work item is stored as one raw JSON payload inside
    ``save_dir``, mirroring the behavior of the OpenEngage backend.

    Args:
        save_dir: Directory where per-item payloads are stored.
        api: Crossref API client. If None, uses the widest possible date range.
    """
    if api is None:
        api = CrossrefChemrxivAPI(start_date="2017-01-01", end_date=today)
    os.makedirs(save_dir, exist_ok=True)

    for record in tqdm(api.iter_items()):
        raw_doi = record.get("DOI") or ""
        doi = raw_doi.strip().lower()
        if not doi:
            # Without a DOI there is no stable filename; skip the record.
            continue
        target = os.path.join(save_dir, f"{doi.replace('/', '_')}.json")
        if os.path.exists(target):
            # Already fetched in a previous run; keep downloads resumable.
            continue

        # Write to a scratch file and atomically move it into place so an
        # interrupted run never leaves a truncated payload behind.
        scratch = f"{target}.tmp"
        try:
            with open(scratch, "w") as handle:
                json.dump(record, handle, indent=2)
            os.replace(scratch, target)
        finally:
            if os.path.exists(scratch):
                os.remove(scratch)
parse_dump_crossref(source_path: str, target_path: str) -> None

Parse Crossref payloads into the ChemRxiv JSONL dump format.

Parameters:

Name Type Description Default
source_path str

Directory containing per-item Crossref JSON payloads.

required
target_path str

JSONL output path.

required
Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def parse_dump_crossref(source_path: str, target_path: str) -> None:
    """Parse Crossref payloads into the ChemRxiv JSONL dump format.

    Args:
        source_path: Directory containing per-item Crossref JSON payloads.
        target_path: JSONL output path.
    """
    papers: List[Dict] = []

    for entry in tqdm(os.listdir(source_path)):
        if not entry.endswith(".json"):
            continue
        payload_path = os.path.join(source_path, entry)
        # Zero-byte payloads are removed and skipped.
        if os.path.getsize(payload_path) == 0:
            logger.warning(f"Empty Crossref payload file {payload_path}; skipping.")
            os.remove(payload_path)
            continue
        try:
            with open(payload_path, "r") as handle:
                raw_item = json.load(handle)
        except JSONDecodeError as exc:
            logger.warning(f"Invalid JSON in Crossref payload file {payload_path}: {exc}")
            os.remove(payload_path)
            continue

        papers.append(crossref_item_to_paper(raw_item))
        # Payload consumed; drop it so reruns only see fresh downloads.
        os.remove(payload_path)

    # Emit one JSON object per line, with no trailing line separator.
    with open(target_path, "w") as handle:
        for position, paper in enumerate(papers):
            if position > 0:
                handle.write(os.linesep)
            handle.write(json.dumps(paper))
    logger.info("Done, shutting down")