API Reference

paperscraper

Initialize the module.

dump_queries(keywords: List[List[Union[str, List[str]]]], dump_root: str) -> None

Performs a keyword search on all available servers and dumps the results.

Parameters:

Name Type Description Default
keywords List[List[Union[str, List[str]]]]

List of lists of keywords. Each second-level list is considered a separate query. Within each query, each item (whether str or List[str]) is AND-separated. If an item is itself a list, its strings are treated as synonyms (OR-separated).

required
dump_root str

Path to root for dumping.

required
Source code in paperscraper/__init__.py
def dump_queries(keywords: List[List[Union[str, List[str]]]], dump_root: str) -> None:
    """Performs keyword search on all available servers and dump the results.

    Args:
        keywords (List[List[Union[str, List[str]]]]): List of lists of keywords
            Each second-level list is considered a separate query. Within each
            query, each item (whether str or List[str]) are considered AND
            separated. If an item is again a list, strs are considered synonyms
            (OR separated).
        dump_root (str): Path to root for dumping.
    """

    for idx, keyword in enumerate(keywords):
        for db, f in QUERY_FN_DICT.items():
            logger.info(f" Keyword {idx + 1}/{len(keywords)}, DB: {db}")
            filename = get_filename_from_query(keyword)
            os.makedirs(os.path.join(dump_root, db), exist_ok=True)
            f(keyword, output_filepath=os.path.join(dump_root, db, filename))
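
Example usage (a minimal sketch; the keyword lists and the "dumps" directory are illustrative):

from paperscraper import dump_queries

queries = [
    # query 1: (covid-19 OR sars-cov-2) AND vaccine
    [["covid-19", "sars-cov-2"], "vaccine"],
    # query 2: machine learning AND drug discovery
    ["machine learning", "drug discovery"],
]
dump_queries(queries, dump_root="dumps")  # writes one JSONL file per query and per server under dumps/<db>/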

arxiv

XRXivQuery

Query class.

Source code in paperscraper/xrxiv/xrxiv_query.py
class XRXivQuery:
    """Query class."""

    def __init__(
        self,
        dump_filepath: str,
        fields: List[str] = ["title", "doi", "authors", "abstract", "date", "journal"],
    ):
        """
        Initialize the query class.

        Args:
            dump_filepath (str): filepath to the dump to be queried.
            fields (List[str], optional): fields to be contained in the dump per paper.
                Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].
        """
        self.dump_filepath = dump_filepath
        self.fields = fields
        self.errored = False

        try:
            self.df = pd.read_json(self.dump_filepath, lines=True)
            self.df["date"] = [date.strftime("%Y-%m-%d") for date in self.df["date"]]
        except ValueError as e:
            logger.warning(f"Problem in reading file {dump_filepath}: {e} - Skipping!")
            self.errored = True
        except KeyError as e:
            logger.warning(f"Key {e} missing in file from {dump_filepath} - Skipping!")
            self.errored = True

    def search_keywords(
        self,
        keywords: List[Union[str, List[str]]],
        fields: List[str] = None,
        output_filepath: str = None,
    ) -> pd.DataFrame:
        """
        Search for papers in the dump using keywords.

        Args:
            keywords (List[str, List[str]]): Items will be AND separated. If items
                are lists themselves, they will be OR separated.
            fields (List[str], optional): fields to be used in the query search.
                Defaults to None, a.k.a. search in all fields excluding date.
            output_filepath (str, optional): optional output filepath where to store
                the hits in JSONL format. Defaults to None, a.k.a., no export to a file.

        Returns:
            pd.DataFrame: A dataframe with one paper per row.
        """
        if fields is None:
            fields = self.fields
        fields = [field for field in fields if field != "date"]
        hits_per_field = []
        for field in fields:
            field_data = self.df[field].str.lower()
            hits_per_keyword = []
            for keyword in keywords:
                if isinstance(keyword, list):
                    query = "|".join([_.lower() for _ in keyword])
                else:
                    query = keyword.lower()
                hits_per_keyword.append(field_data.str.contains(query))
            if len(hits_per_keyword):
                keyword_hits = hits_per_keyword[0]
                for single_keyword_hits in hits_per_keyword[1:]:
                    keyword_hits &= single_keyword_hits
                hits_per_field.append(keyword_hits)
        if len(hits_per_field):
            hits = hits_per_field[0]
            for single_hits in hits_per_field[1:]:
                hits |= single_hits
        if output_filepath is not None:
            self.df[hits].to_json(output_filepath, orient="records", lines=True)
        return self.df[hits]
__init__(dump_filepath: str, fields: List[str] = ['title', 'doi', 'authors', 'abstract', 'date', 'journal'])

Initialize the query class.

Parameters:

Name Type Description Default
dump_filepath str

filepath to the dump to be queried.

required
fields List[str]

fields to be contained in the dump per paper. Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].

['title', 'doi', 'authors', 'abstract', 'date', 'journal']
Source code in paperscraper/xrxiv/xrxiv_query.py
def __init__(
    self,
    dump_filepath: str,
    fields: List[str] = ["title", "doi", "authors", "abstract", "date", "journal"],
):
    """
    Initialize the query class.

    Args:
        dump_filepath (str): filepath to the dump to be queried.
        fields (List[str], optional): fields to be contained in the dump per paper.
            Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].
    """
    self.dump_filepath = dump_filepath
    self.fields = fields
    self.errored = False

    try:
        self.df = pd.read_json(self.dump_filepath, lines=True)
        self.df["date"] = [date.strftime("%Y-%m-%d") for date in self.df["date"]]
    except ValueError as e:
        logger.warning(f"Problem in reading file {dump_filepath}: {e} - Skipping!")
        self.errored = True
    except KeyError as e:
        logger.warning(f"Key {e} missing in file from {dump_filepath} - Skipping!")
        self.errored = True
search_keywords(keywords: List[Union[str, List[str]]], fields: List[str] = None, output_filepath: str = None) -> pd.DataFrame

Search for papers in the dump using keywords.

Parameters:

Name Type Description Default
keywords List[Union[str, List[str]]]

Items will be AND separated. If items are lists themselves, they will be OR separated.

required
fields List[str]

fields to be used in the query search. Defaults to None, a.k.a. search in all fields excluding date.

None
output_filepath str

optional output filepath where to store the hits in JSONL format. Defaults to None, a.k.a., no export to a file.

None

Returns:

Type Description
DataFrame

pd.DataFrame: A dataframe with one paper per row.

Source code in paperscraper/xrxiv/xrxiv_query.py
def search_keywords(
    self,
    keywords: List[Union[str, List[str]]],
    fields: List[str] = None,
    output_filepath: str = None,
) -> pd.DataFrame:
    """
    Search for papers in the dump using keywords.

    Args:
        keywords (List[str, List[str]]): Items will be AND separated. If items
            are lists themselves, they will be OR separated.
        fields (List[str], optional): fields to be used in the query search.
            Defaults to None, a.k.a. search in all fields excluding date.
        output_filepath (str, optional): optional output filepath where to store
            the hits in JSONL format. Defaults to None, a.k.a., no export to a file.

    Returns:
        pd.DataFrame: A dataframe with one paper per row.
    """
    if fields is None:
        fields = self.fields
    fields = [field for field in fields if field != "date"]
    hits_per_field = []
    for field in fields:
        field_data = self.df[field].str.lower()
        hits_per_keyword = []
        for keyword in keywords:
            if isinstance(keyword, list):
                query = "|".join([_.lower() for _ in keyword])
            else:
                query = keyword.lower()
            hits_per_keyword.append(field_data.str.contains(query))
        if len(hits_per_keyword):
            keyword_hits = hits_per_keyword[0]
            for single_keyword_hits in hits_per_keyword[1:]:
                keyword_hits &= single_keyword_hits
            hits_per_field.append(keyword_hits)
    if len(hits_per_field):
        hits = hits_per_field[0]
        for single_hits in hits_per_field[1:]:
            hits |= single_hits
    if output_filepath is not None:
        self.df[hits].to_json(output_filepath, orient="records", lines=True)
    return self.df[hits]
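
Example usage (a minimal sketch; the dump path is hypothetical and must point to an existing JSONL dump):

from paperscraper.xrxiv.xrxiv_query import XRXivQuery

querier = XRXivQuery("server_dumps/biorxiv.jsonl")  # hypothetical local dump
if not querier.errored:
    hits = querier.search_keywords(
        [["covid-19", "sars-cov-2"], "vaccine"],  # (covid-19 OR sars-cov-2) AND vaccine
        output_filepath="covid_vaccine_hits.jsonl",
    )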

dump_papers(papers: pd.DataFrame, filepath: str) -> None

Receives a pd.DataFrame with one paper per row and dumps it into a .jsonl file with one paper per line.

Parameters:

Name Type Description Default
papers DataFrame

A dataframe of paper metadata, one paper per row.

required
filepath str

Path to dump the papers, has to end with .jsonl.

required
Source code in paperscraper/utils.py
def dump_papers(papers: pd.DataFrame, filepath: str) -> None:
    """
    Receives a pd.DataFrame, one paper per row and dumps it into a .jsonl
    file with one paper per line.

    Args:
        papers (pd.DataFrame): A dataframe of paper metadata, one paper per row.
        filepath (str): Path to dump the papers, has to end with `.jsonl`.
    """
    if not isinstance(filepath, str):
        raise TypeError(f"filepath must be a string, not {type(filepath)}")
    if not filepath.endswith(".jsonl"):
        raise ValueError("Please provide a filepath with .jsonl extension")

    if isinstance(papers, List) and all([isinstance(p, Dict) for p in papers]):
        papers = pd.DataFrame(papers)
        logger.warning(
            "Preferably pass a pd.DataFrame, not a list of dictionaries. "
            "Passing a list is a legacy functionality that might become deprecated."
        )

    if not isinstance(papers, pd.DataFrame):
        raise TypeError(f"papers must be a pd.DataFrame, not {type(papers)}")

    paper_list = list(papers.T.to_dict().values())

    with open(filepath, "w") as f:
        for paper in paper_list:
            f.write(json.dumps(paper) + "\n")
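
Example usage (a minimal sketch with toy records; any DataFrame with one paper per row works):

import pandas as pd
from paperscraper.utils import dump_papers

papers = pd.DataFrame(
    [
        {"title": "Paper A", "doi": "10.1000/a", "authors": ["Jane Doe"]},   # placeholder metadata
        {"title": "Paper B", "doi": "10.1000/b", "authors": ["John Smith"]},
    ]
)
dump_papers(papers, "papers.jsonl")  # one JSON object per line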

get_query_from_keywords(keywords: List[Union[str, List[str]]], start_date: str = 'None', end_date: str = 'None') -> str

Receives a list of keywords and returns the query for the arxiv API.

Parameters:

Name Type Description Default
keywords List[Union[str, List[str]]]

Items will be AND separated. If items are lists themselves, they will be OR separated.

required
start_date str

Start date for the search. Needs to be in format: YYYY-MM-DD, e.g. '2020-07-20'. Defaults to 'None', i.e. no specific dates are used.

'None'
end_date str

End date for the search. Same notation as start_date.

'None'

Returns:

Name Type Description
str str

Query to send to the arXiv API.

Source code in paperscraper/arxiv/utils.py
def get_query_from_keywords(
    keywords: List[Union[str, List[str]]],
    start_date: str = "None",
    end_date: str = "None",
) -> str:
    """Receives a list of keywords and returns the query for the arxiv API.

    Args:
        keywords (List[str, List[str]]): Items will be AND separated. If items
            are lists themselves, they will be OR separated.
        start_date (str): Start date for the search. Needs to be in format:
            YYYY-MM-DD, e.g. '2020-07-20'. Defaults to 'None', i.e. no specific
            dates are used.
        end_date (str): End date for the search. Same notation as start_date.

    Returns:
        str: query to enter to arxiv API.
    """

    query = ""
    for i, key in enumerate(keywords):
        if isinstance(key, str):
            query += f"all:{key} AND "
        elif isinstance(key, list):
            inter = "".join([f"all:{syn} OR " for syn in key])
            query += finalize_disjunction(inter)

    query = finalize_conjunction(query)
    if start_date == "None" and end_date == "None":
        return query
    elif start_date == "None":
        start_date = EARLIEST_START
    elif end_date == "None":
        end_date = datetime.now().strftime("%Y-%m-%d")

    start = format_date(start_date)
    end = format_date(end_date)
    date_filter = f" AND submittedDate:[{start} TO {end}]"
    return query + date_filter
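
Example usage (a sketch; the exact wrapping of the AND/OR clauses is handled by the finalize_* helpers, while the date filter is appended verbatim as shown above):

from paperscraper.arxiv.utils import get_query_from_keywords

query = get_query_from_keywords(
    [["lstm", "transformer"], "protein"],  # (lstm OR transformer) AND protein
    start_date="2020-07-20",
    end_date="2021-07-20",
)
# query ends with: ' AND submittedDate:[202007200000 TO 202107200000]'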

get_arxiv_papers_local(keywords: List[Union[str, List[str]]], fields: List[str] = None, output_filepath: str = None) -> pd.DataFrame

Search for papers in the dump using keywords.

Parameters:

Name Type Description Default
keywords List[Union[str, List[str]]]

Items will be AND separated. If items are lists themselves, they will be OR separated.

required
fields List[str]

fields to be used in the query search. Defaults to None, a.k.a. search in all fields excluding date.

None
output_filepath str

optional output filepath where to store the hits in JSONL format. Defaults to None, a.k.a., no export to a file.

None

Returns:

Type Description
DataFrame

pd.DataFrame: A dataframe with one paper per row.

Source code in paperscraper/arxiv/arxiv.py
def get_arxiv_papers_local(
    keywords: List[Union[str, List[str]]],
    fields: List[str] = None,
    output_filepath: str = None,
) -> pd.DataFrame:
    """
    Search for papers in the dump using keywords.

    Args:
        keywords: Items will be AND separated. If items
            are lists themselves, they will be OR separated.
        fields: fields to be used in the query search.
            Defaults to None, a.k.a. search in all fields excluding date.
        output_filepath: optional output filepath where to store the hits in JSONL format.
            Defaults to None, a.k.a., no export to a file.

    Returns:
        pd.DataFrame: A dataframe with one paper per row.
    """
    search_local_arxiv()
    if ARXIV_QUERIER is None:
        raise ValueError(
            "Could not find local arxiv dump. Use `backend=api` or download dump via `paperscraper.get_dumps.arxiv"
        )
    return ARXIV_QUERIER(
        keywords=keywords, fields=fields, output_filepath=output_filepath
    )
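
Example usage (a sketch; this only works once a local arXiv dump has been downloaded, e.g. via paperscraper.get_dumps.arxiv):

from paperscraper.arxiv.arxiv import get_arxiv_papers_local

df = get_arxiv_papers_local(
    [["lstm", "transformer"], "protein"],
    output_filepath="arxiv_local_hits.jsonl",
)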

get_arxiv_papers_api(query: str, fields: List = ['title', 'authors', 'date', 'abstract', 'journal', 'doi'], max_results: int = 99999, client_options: Dict = {'num_retries': 10}, search_options: Dict = dict(), verbose: bool = True) -> pd.DataFrame

Performs an arXiv API request for a given query and returns a list of papers with the desired fields.

Parameters:

Name Type Description Default
query str

Query to arxiv API. Needs to match the arxiv API notation.

required
fields List

List of strings with fields to keep in output.

['title', 'authors', 'date', 'abstract', 'journal', 'doi']
max_results int

Maximal number of results, defaults to 99999.

99999
client_options Dict

Optional arguments for arxiv.Client. E.g.: page_size (int), delay_seconds (int), num_retries (int). NOTE: Decreasing 'num_retries' will speed up processing but might result in more frequent 'UnexpectedEmptyPageErrors'.

{'num_retries': 10}
search_options Dict

Optional arguments for arxiv.Search. E.g.: id_list (List), sort_by, or sort_order.

dict()

Returns:

Type Description
DataFrame

pd.DataFrame: One row per paper.

Source code in paperscraper/arxiv/arxiv.py
def get_arxiv_papers_api(
    query: str,
    fields: List = ["title", "authors", "date", "abstract", "journal", "doi"],
    max_results: int = 99999,
    client_options: Dict = {"num_retries": 10},
    search_options: Dict = dict(),
    verbose: bool = True,
) -> pd.DataFrame:
    """
    Performs arxiv API request of a given query and returns list of papers with
    fields as desired.

    Args:
        query: Query to arxiv API. Needs to match the arxiv API notation.
        fields: List of strings with fields to keep in output.
        max_results: Maximal number of results, defaults to 99999.
        client_options: Optional arguments for `arxiv.Client`. E.g.:
            page_size (int), delay_seconds (int), num_retries (int).
            NOTE: Decreasing 'num_retries' will speed up processing but might
            result in more frequent 'UnexpectedEmptyPageErrors'.
        search_options: Optional arguments for `arxiv.Search`. E.g.:
            id_list (List), sort_by, or sort_order.

    Returns:
        pd.DataFrame: One row per paper.

    """
    client = arxiv.Client(**client_options)
    search = arxiv.Search(query=query, max_results=max_results, **search_options)
    results = client.results(search)

    processed = pd.DataFrame(
        [
            {
                arxiv_field_mapper.get(key, key): process_fields.get(
                    arxiv_field_mapper.get(key, key), lambda x: x
                )(value)
                for key, value in vars(paper).items()
                if arxiv_field_mapper.get(key, key) in fields and key != "doi"
            }
            for paper in tqdm(results, desc=f"Processing {query}", disable=not verbose)
        ]
    )
    return processed
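
Example usage (a minimal sketch; the query string follows the arXiv API notation and max_results is kept small on purpose):

from paperscraper.arxiv.arxiv import get_arxiv_papers_api

df = get_arxiv_papers_api(
    query='all:"deep learning" AND all:chemistry',
    fields=["title", "authors", "date", "abstract"],
    max_results=50,
)
print(df.head())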

get_and_dump_arxiv_papers(keywords: List[Union[str, List[str]]], output_filepath: str, fields: List = ['title', 'authors', 'date', 'abstract', 'journal', 'doi'], start_date: str = 'None', end_date: str = 'None', backend: Literal['api', 'local', 'infer'] = 'api', *args, **kwargs)

Combines get_arxiv_papers and dump_papers.

Parameters:

Name Type Description Default
keywords List[Union[str, List[str]]]

List of keywords for arxiv search. The outer list level will be considered as AND separated keys, the inner level as OR separated.

required
output_filepath str

Path where the dump will be saved.

required
fields List

List of strings with fields to keep in output. Defaults to ['title', 'authors', 'date', 'abstract', 'journal', 'doi'].

['title', 'authors', 'date', 'abstract', 'journal', 'doi']
start_date str

Start date for the search. Needs to be in format: YYYY-MM-DD, e.g. '2020-07-20'. Defaults to 'None', i.e. no specific dates are used.

'None'
end_date str

End date for the search. Same notation as start_date.

'None'
backend Literal['api', 'local', 'infer']

If api, the arXiv API is queried. If local, the local arXiv dump is queried (has to be downloaded beforehand). If infer, the local dump is used if it exists; otherwise the API is queried. Defaults to api since it is faster.

'api'
Source code in paperscraper/arxiv/arxiv.py
def get_and_dump_arxiv_papers(
    keywords: List[Union[str, List[str]]],
    output_filepath: str,
    fields: List = ["title", "authors", "date", "abstract", "journal", "doi"],
    start_date: str = "None",
    end_date: str = "None",
    backend: Literal["api", "local", "infer"] = "api",
    *args,
    **kwargs,
):
    """
    Combines get_arxiv_papers and dump_papers.

    Args:
        keywords: List of keywords for arxiv search.
            The outer list level will be considered as AND separated keys, the
            inner level as OR separated.
        output_filepath: Path where the dump will be saved.
        fields: List of strings with fields to keep in output.
            Defaults to ['title', 'authors', 'date', 'abstract',
            'journal', 'doi'].
        start_date: Start date for the search. Needs to be in format:
            YYYY-MM-DD, e.g. '2020-07-20'. Defaults to 'None', i.e. no specific
            dates are used.
        end_date: End date for the search. Same notation as start_date.
        backend: If `api`, the arXiv API is queried. If `local` the local arXiv dump
            is queried (has to be downloaded before). If `infer` the local dump will
            be used if exists, otherwise API will be queried. Defaults to `api`
            since it is faster.
        *args, **kwargs are additional arguments for `get_arxiv_papers`.
    """
    # Translate keywords into query.
    query = get_query_from_keywords(keywords, start_date=start_date, end_date=end_date)

    if backend not in {"api", "local", "infer"}:
        raise ValueError(
            f"Invalid backend: {backend}. Must be one of ['api', 'local', 'infer']"
        )
    elif backend == "infer":
        backend = infer_backend()

    if backend == "api":
        papers = get_arxiv_papers_api(query, fields, *args, **kwargs)
    elif backend == "local":
        papers = get_arxiv_papers_local(keywords, fields, *args, **kwargs)
    dump_papers(papers, output_filepath)
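
Example usage (a sketch of the combined search-and-dump workflow; keywords, dates and the output path are illustrative):

from paperscraper.arxiv.arxiv import get_and_dump_arxiv_papers

get_and_dump_arxiv_papers(
    keywords=[["covid-19", "sars-cov-2"], "vaccine"],
    output_filepath="covid_vaccine_arxiv.jsonl",
    start_date="2020-01-01",
    end_date="2021-01-01",
    backend="api",
)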

utils

format_date(date_str: str) -> str

Converts a date in YYYY-MM-DD format to arXiv's YYYYMMDDTTTT format.

Source code in paperscraper/arxiv/utils.py
def format_date(date_str: str) -> str:
    """Converts a date in YYYY-MM-DD format to arXiv's YYYYMMDDTTTT format."""
    date_obj = datetime.strptime(date_str, "%Y-%m-%d")
    return date_obj.strftime("%Y%m%d0000")
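
Example:

from paperscraper.arxiv.utils import format_date

format_date("2020-07-20")  # returns '202007200000'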

async_utils

optional_async(func: Callable[..., Awaitable[T]]) -> Callable[..., Union[T, Awaitable[T]]]

Allows an async function to be called from sync code (blocks until done) or from within an async context (returns a coroutine to await).

Source code in paperscraper/async_utils.py
def optional_async(
    func: Callable[..., Awaitable[T]],
) -> Callable[..., Union[T, Awaitable[T]]]:
    """
    Allows an async function to be called from sync code (blocks until done)
    or from within an async context (returns a coroutine to await).
    """

    @wraps(func)
    def wrapper(*args, **kwargs) -> Union[T, Awaitable[T]]:
        coro = func(*args, **kwargs)
        try:
            # If we're already in an asyncio loop, hand back the coroutine:
            asyncio.get_running_loop()
            return coro  # caller must await it
        except RuntimeError:
            # Otherwise, schedule on the background loop and block
            future = asyncio.run_coroutine_threadsafe(coro, _background_loop)
            return future.result()

    return wrapper
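
Example usage (a minimal sketch; fetch_square is a hypothetical coroutine defined only for illustration):

import asyncio
from paperscraper.async_utils import optional_async

@optional_async
async def fetch_square(x: int) -> int:
    await asyncio.sleep(0.1)  # stand-in for real async I/O
    return x * x

print(fetch_square(3))  # sync context: blocks until done, prints 9

async def main():
    print(await fetch_square(4))  # async context: coroutine must be awaited

asyncio.run(main())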

retry_with_exponential_backoff(*, max_retries: int = 5, base_delay: float = 1.0) -> Callable[[F], F]

Decorator factory that retries an async def on HTTP 429, with exponential backoff.

Parameters:

Name Type Description Default
max_retries int

how many times to retry before giving up.

5
base_delay float

initial delay in seconds; each subsequent delay doubles the previous one.

1.0
Usage:

@retry_with_exponential_backoff(max_retries=3, base_delay=0.5)
async def fetch_data(...):
    ...
Source code in paperscraper/async_utils.py
def retry_with_exponential_backoff(
    *, max_retries: int = 5, base_delay: float = 1.0
) -> Callable[[F], F]:
    """
    Decorator factory that retries an `async def` on HTTP 429, with exponential backoff.

    Args:
        max_retries: how many times to retry before giving up.
        base_delay: initial delay in seconds; each subsequent delay doubles the previous one.

    Usage:

        @retry_with_exponential_backoff(max_retries=3, base_delay=0.5)
        async def fetch_data(...):
            ...

    """

    def decorator(func: F) -> F:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            delay = base_delay
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except httpx.HTTPStatusError as e:
                    # only retry on 429
                    status = e.response.status_code if e.response is not None else None
                    if status != 429 or attempt == max_retries - 1:
                        raise
                # backoff
                await asyncio.sleep(delay)
                delay *= 2
            # in theory we never reach here

        return wrapper

    return decorator

citations

citations

get_citations_by_doi(doi: str) -> int

Get the number of citations of a paper according to Semantic Scholar.

Parameters:

Name Type Description Default
doi str

the DOI of the paper.

required

Returns:

Type Description
int

The number of citations

Source code in paperscraper/citations/citations.py
def get_citations_by_doi(doi: str) -> int:
    """
    Get the number of citations of a paper according to semantic scholar.

    Args:
        doi: the DOI of the paper.

    Returns:
        The number of citations
    """

    try:
        paper = sch.get_paper(doi)
        citations = len(paper["citations"])
    except SemanticScholarException.ObjectNotFoundException:
        logger.warning(f"Could not find paper {doi}, assuming 0 citation.")
        citations = 0
    except ConnectionRefusedError as e:
        logger.warning(f"Waiting for 10 sec since {doi} gave: {e}")
        sleep(10)
        citations = len(sch.get_paper(doi)["citations"])
    finally:
        return citations
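
Example usage (a sketch; the DOI is one of those used in the test suite further below):

from paperscraper.citations.citations import get_citations_by_doi

n = get_citations_by_doi("10.1038/s41586-023-06600-9")
print(f"Citations: {n}")
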
get_citations_from_title(title: str) -> int

Parameters:

Name Type Description Default
title str

Title of paper to be searched on Scholar.

required

Raises:

Type Description
TypeError

If something other than str is passed.

Returns:

Name Type Description
int int

Number of citations of paper.

Source code in paperscraper/citations/citations.py
def get_citations_from_title(title: str) -> int:
    """
    Args:
        title (str): Title of paper to be searched on Scholar.

    Raises:
        TypeError: If sth else than str is passed.

    Returns:
        int: Number of citations of paper.
    """

    if not isinstance(title, str):
        raise TypeError(f"Pass str not {type(title)}")

    # Search for exact match
    title = '"' + title.strip() + '"'

    matches = scholarly.search_pubs(title)
    counts = list(map(lambda p: int(p["num_citations"]), matches))
    if len(counts) == 0:
        logger.warning(f"Found no match for {title}.")
        return 0
    if len(counts) > 1:
        logger.warning(f"Found {len(counts)} matches for {title}, returning first one.")
    return counts[0]
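
Example usage (a sketch; the title is a placeholder and the lookup goes through scholarly, which may rate-limit requests):

from paperscraper.citations.citations import get_citations_from_title

n = get_citations_from_title("Some paper title to look up")  # placeholder title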

entity

core
Entity

An abstract entity class with a set of utilities shared by the objects that perform self-linking analyses, such as Paper and Researcher.

Source code in paperscraper/citations/entity/core.py
class Entity:
    """
    An abstract entity class with a set of utilities shared by the objects that perform
    self-linking analyses, such as Paper and Researcher.
    """

    @abstractmethod
    def self_references(self):
        """
        Has to be implemented by the child class. Performs a self-referencing analysis
        for the object.
        """
        ...

    @abstractmethod
    def self_citations(self):
        """
        Has to be implemented by the child class. Performs a self-citation analysis
        for the object.
        """
        ...

    @abstractmethod
    def get_result(self):
        """
        Has to be implemented by the child class. Provides the result of the analysis.
        """
        ...
self_references() abstractmethod

Has to be implemented by the child class. Performs a self-referencing analysis for the object.

Source code in paperscraper/citations/entity/core.py
@abstractmethod
def self_references(self):
    """
    Has to be implemented by the child class. Performs a self-referencing analysis
    for the object.
    """
    ...
self_citations() abstractmethod

Has to be implemented by the child class. Performs a self-citation analysis for the object.

Source code in paperscraper/citations/entity/core.py
@abstractmethod
def self_citations(self):
    """
    Has to be implemented by the child class. Performs a self-citation analysis
    for the object.
    """
    ...
get_result() abstractmethod

Has to be implemented by the child class. Provides the result of the analysis.

Source code in paperscraper/citations/entity/core.py
@abstractmethod
def get_result(self):
    """
    Has to be implemented by the child class. Provides the result of the analysis.
    """
    ...
paper
Paper

Bases: Entity

Source code in paperscraper/citations/entity/paper.py
class Paper(Entity):
    title: str = ""
    doi: str = ""
    authors: List[str] = []

    def __init__(self, input: str, mode: ModeType = "infer"):
        """
        Set up a Paper object for analysis.

        Args:
            input: Paper identifier. This can be the title, DOI or semantic scholar ID
                of the paper.
            mode: The format in which the ID was provided. Defaults to "infer".

        Raises:
            ValueError: If unknown mode is given.
        """
        if mode not in MODES:
            raise ValueError(f"Unknown mode {mode} chose from {MODES}.")

        input = input.strip()
        self.input = input
        if mode == "infer":
            mode = determine_paper_input_type(input)

        if mode == "doi":
            self.doi = input
        elif mode == "title":
            self.doi = get_doi_from_title(input)
        elif mode == "ssid":
            self.doi = get_doi_from_ssid(input)

        if self.doi is not None:
            out = get_title_and_id_from_doi(self.doi)
            if out is not None:
                self.title = out["title"]
                self.ssid = out["ssid"]

    def self_references(self):
        """
        Extracts the self references of a paper, for each author.
        """
        if isinstance(self.doi, str):
            self.ref_result: ReferenceResult = self_references_paper(self.doi)

    def self_citations(self):
        """
        Extracts the self citations of a paper, for each author.
        """
        if isinstance(self.doi, str):
            self.citation_result: CitationResult = self_citations_paper(self.doi)

    def get_result(self) -> Optional[PaperResult]:
        """
        Provides the result of the analysis.

        Returns: PaperResult if available.
        """
        if not hasattr(self, "ref_result"):
            logger.warning(
                f"Can't get result since no referencing result for {self.input} exists. Run `.self_references` first."
            )
            return
        elif not hasattr(self, "citation_result"):
            logger.warning(
                f"Can't get result since no citation result for {self.input} exists. Run `.self_citations` first."
            )
            return
        ref_result = self.ref_result.model_dump()
        ref_result.pop("ssid", None)
        return PaperResult(
            title=self.title, **ref_result, **self.citation_result.model_dump()
        )
__init__(input: str, mode: ModeType = 'infer')

Set up a Paper object for analysis.

Parameters:

Name Type Description Default
input str

Paper identifier. This can be the title, DOI or semantic scholar ID of the paper.

required
mode ModeType

The format in which the ID was provided. Defaults to "infer".

'infer'

Raises:

Type Description
ValueError

If unknown mode is given.

Source code in paperscraper/citations/entity/paper.py
def __init__(self, input: str, mode: ModeType = "infer"):
    """
    Set up a Paper object for analysis.

    Args:
        input: Paper identifier. This can be the title, DOI or semantic scholar ID
            of the paper.
        mode: The format in which the ID was provided. Defaults to "infer".

    Raises:
        ValueError: If unknown mode is given.
    """
    if mode not in MODES:
        raise ValueError(f"Unknown mode {mode} chose from {MODES}.")

    input = input.strip()
    self.input = input
    if mode == "infer":
        mode = determine_paper_input_type(input)

    if mode == "doi":
        self.doi = input
    elif mode == "title":
        self.doi = get_doi_from_title(input)
    elif mode == "ssid":
        self.doi = get_doi_from_ssid(input)

    if self.doi is not None:
        out = get_title_and_id_from_doi(self.doi)
        if out is not None:
            self.title = out["title"]
            self.ssid = out["ssid"]
self_references()

Extracts the self references of a paper, for each author.

Source code in paperscraper/citations/entity/paper.py
def self_references(self):
    """
    Extracts the self references of a paper, for each author.
    """
    if isinstance(self.doi, str):
        self.ref_result: ReferenceResult = self_references_paper(self.doi)
self_citations()

Extracts the self citations of a paper, for each author.

Source code in paperscraper/citations/entity/paper.py
def self_citations(self):
    """
    Extracts the self citations of a paper, for each author.
    """
    if isinstance(self.doi, str):
        self.citation_result: CitationResult = self_citations_paper(self.doi)
get_result() -> Optional[PaperResult]

Provides the result of the analysis.

Returns: PaperResult if available.

Source code in paperscraper/citations/entity/paper.py
def get_result(self) -> Optional[PaperResult]:
    """
    Provides the result of the analysis.

    Returns: PaperResult if available.
    """
    if not hasattr(self, "ref_result"):
        logger.warning(
            f"Can't get result since no referencing result for {self.input} exists. Run `.self_references` first."
        )
        return
    elif not hasattr(self, "citation_result"):
        logger.warning(
            f"Can't get result since no citation result for {self.input} exists. Run `.self_citations` first."
        )
        return
    ref_result = self.ref_result.model_dump()
    ref_result.pop("ssid", None)
    return PaperResult(
        title=self.title, **ref_result, **self.citation_result.model_dump()
    )
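
Example usage (a sketch of a full paper-level analysis; the DOI is taken from the test fixtures below):

from paperscraper.citations.entity.paper import Paper

paper = Paper("10.1016/j.neunet.2014.09.003")  # input type is inferred as a DOI
paper.self_references()
paper.self_citations()
result = paper.get_result()  # PaperResult, or None if a step above was skipped
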
researcher
Researcher

Bases: Entity

Source code in paperscraper/citations/entity/researcher.py
class Researcher(Entity):
    name: str
    ssid: int
    orcid: Optional[str] = None

    def __init__(self, input: str, mode: ModeType = "infer"):
        """
        Construct researcher object for self citation/reference analysis.

        Args:
            input: A researcher to search for.
            mode: This can be a `name` `orcid` (ORCID iD) or `ssaid` (Semantic Scholar Author ID).
                Defaults to "infer".

        Raises:
            ValueError: Unknown mode
        """
        if mode not in MODES:
            raise ValueError(f"Unknown mode {mode} chose from {MODES}.")

        input = input.strip()
        if mode == "infer":
            if input.isdigit():
                mode = "ssaid"
            elif (
                input.count("-") == 3
                and len(input) == 19
                and all([x.isdigit() for x in input.split("-")])
            ):
                mode = "orcid"
            else:
                mode = "author"

        if mode == "ssaid":
            self.author = sch.get_author(input)
            self.ssid = input
        elif mode == "orcid":
            self.author = orcid_to_author_name(input)
            self.orcid = input
            self.ssid = author_name_to_ssaid(input)
        elif mode == "author":
            self.author = input
            self.ssid = author_name_to_ssaid(input)

        # TODO: Skip over erratum / corrigendum
        self.ssids = get_papers_for_author(self.ssid)

    def self_references(self):
        """
        Sifts through all papers of a researcher and extracts the self references.
        """
        # TODO: Asynchronous call to self_references
        print("Going through SSIDs", self.ssids)

        # TODO: Aggregate results

    def self_citations(self):
        """
        Sifts through all papers of a researcher and finds how often they are self-cited.
        """
        ...

    def get_result(self) -> ResearcherResult:
        """
        Provides the result of the analysis.
        """
        ...
__init__(input: str, mode: ModeType = 'infer')

Construct researcher object for self citation/reference analysis.

Parameters:

Name Type Description Default
input str

A researcher to search for.

required
mode ModeType

This can be a name, an orcid (ORCID iD) or an ssaid (Semantic Scholar Author ID). Defaults to "infer".

'infer'

Raises:

Type Description
ValueError

Unknown mode

Source code in paperscraper/citations/entity/researcher.py
def __init__(self, input: str, mode: ModeType = "infer"):
    """
    Construct researcher object for self citation/reference analysis.

    Args:
        input: A researcher to search for.
        mode: This can be a `name` `orcid` (ORCID iD) or `ssaid` (Semantic Scholar Author ID).
            Defaults to "infer".

    Raises:
        ValueError: Unknown mode
    """
    if mode not in MODES:
        raise ValueError(f"Unknown mode {mode} chose from {MODES}.")

    input = input.strip()
    if mode == "infer":
        if input.isdigit():
            mode = "ssaid"
        elif (
            input.count("-") == 3
            and len(input) == 19
            and all([x.isdigit() for x in input.split("-")])
        ):
            mode = "orcid"
        else:
            mode = "author"

    if mode == "ssaid":
        self.author = sch.get_author(input)
        self.ssid = input
    elif mode == "orcid":
        self.author = orcid_to_author_name(input)
        self.orcid = input
        self.ssid = author_name_to_ssaid(input)
    elif mode == "author":
        self.author = input
        self.ssid = author_name_to_ssaid(input)

    # TODO: Skip over erratum / corrigendum
    self.ssids = get_papers_for_author(self.ssid)
self_references()

Sifts through all papers of a researcher and extracts the self references.

Source code in paperscraper/citations/entity/researcher.py
def self_references(self):
    """
    Sifts through all papers of a researcher and extracts the self references.
    """
    # TODO: Asynchronous call to self_references
    print("Going through SSIDs", self.ssids)
self_citations()

Sifts through all papers of a researcher and finds how often they are self-cited.

Source code in paperscraper/citations/entity/researcher.py
def self_citations(self):
    """
    Sifts through all papers of a researcher and finds how often they are self-cited.
    """
    ...
get_result() -> ResearcherResult

Provides the result of the analysis.

Source code in paperscraper/citations/entity/researcher.py
def get_result(self) -> ResearcherResult:
    """
    Provides the result of the analysis.
    """
    ...

orcid

orcid_to_author_name(orcid_id: str) -> Optional[str]

Given an ORCID ID (as a string, e.g. '0000-0002-1825-0097'), returns the full name of the author from the ORCID public API.

Source code in paperscraper/citations/orcid.py
def orcid_to_author_name(orcid_id: str) -> Optional[str]:
    """
    Given an ORCID ID (as a string, e.g. '0000-0002-1825-0097'),
    returns the full name of the author from the ORCID public API.
    """

    headers = {"Accept": "application/json"}
    response = requests.get(f"{BASE_URL}{orcid_id}/person", headers=headers)
    if response.status_code == 200:
        data = response.json()
        given = data.get("name", {}).get("given-names", {}).get("value", "")
        family = data.get("name", {}).get("family-name", {}).get("value", "")
        full_name = f"{given} {family}".strip()
        return full_name
    logger.error(
        f"Error fetching ORCID data ({orcid_id}): {response.status_code} {response.text}"
    )
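
Example usage (a sketch using the ORCID iD from the docstring above):

from paperscraper.citations.orcid import orcid_to_author_name

name = orcid_to_author_name("0000-0002-1825-0097")  # full name, or None on error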

self_citations

self_citations_paper(inputs: Union[str, List[str]], verbose: bool = False) -> Union[CitationResult, List[CitationResult]] async

Analyze self-citations for one or more papers by DOI or Semantic Scholar ID.

Parameters:

Name Type Description Default
inputs Union[str, List[str]]

A single DOI/SSID string or a list of them.

required
verbose bool

If True, logs detailed information for each paper.

False

Returns:

Type Description
Union[CitationResult, List[CitationResult]]

A single CitationResult if a string was passed, else a list of CitationResults.

Source code in paperscraper/citations/self_citations.py
@optional_async
@retry_with_exponential_backoff(max_retries=4, base_delay=1.0)
async def self_citations_paper(
    inputs: Union[str, List[str]], verbose: bool = False
) -> Union[CitationResult, List[CitationResult]]:
    """
    Analyze self-citations for one or more papers by DOI or Semantic Scholar ID.

    Args:
        inputs: A single DOI/SSID string or a list of them.
        verbose: If True, logs detailed information for each paper.

    Returns:
        A single CitationResult if a string was passed, else a list of CitationResults.
    """
    single_input = isinstance(inputs, str)
    identifiers = [inputs] if single_input else list(inputs)

    async with httpx.AsyncClient(timeout=httpx.Timeout(20)) as client:
        tasks = [_process_single(client, ident) for ident in identifiers]
        results = await asyncio.gather(*tasks)

    if verbose:
        for res in results:
            logger.info(
                f'Self-citations in "{res.ssid}": N={res.num_citations}, Score={res.citation_score}%'
            )
            for author, pct in res.self_citations.items():
                logger.info(f"  {author}: {pct}%")

    return results[0] if single_input else results
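
Example usage (a sketch showing the single and batch forms; thanks to optional_async the call blocks in plain scripts and must be awaited inside async code):

from paperscraper.citations.self_citations import self_citations_paper

single = self_citations_paper("10.1038/s41586-023-06600-9", verbose=True)
batch = self_citations_paper(
    ["10.1038/s41586-023-06600-9", "10.1016/j.neunet.2014.09.003"]
)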

self_references

self_references_paper(inputs: Union[str, List[str]], verbose: bool = False) -> Union[ReferenceResult, List[ReferenceResult]] async

Analyze self-references for one or more papers by DOI or Semantic Scholar ID.

Parameters:

Name Type Description Default
inputs Union[str, List[str]]

A single DOI/SSID string or a list of them.

required
verbose bool

If True, logs detailed information for each paper.

False

Returns:

Type Description
Union[ReferenceResult, List[ReferenceResult]]

A single ReferenceResult if a string was passed, else a list of ReferenceResults.

Raises:

Type Description
ValueError

If no references are found for a given identifier.

Source code in paperscraper/citations/self_references.py
@optional_async
@retry_with_exponential_backoff(max_retries=4, base_delay=1.0)
async def self_references_paper(
    inputs: Union[str, List[str]], verbose: bool = False
) -> Union[ReferenceResult, List[ReferenceResult]]:
    """
    Analyze self-references for one or more papers by DOI or Semantic Scholar ID.

    Args:
        inputs: A single DOI/SSID string or a list of them.
        verbose: If True, logs detailed information for each paper.

    Returns:
        A single ReferenceResult if a string was passed, else a list of ReferenceResults.

    Raises:
        ValueError: If no references are found for a given identifier.
    """
    single_input = isinstance(inputs, str)
    identifiers = [inputs] if single_input else list(inputs)

    async with httpx.AsyncClient(timeout=httpx.Timeout(20)) as client:
        tasks = [_process_single_reference(client, ident) for ident in identifiers]
        results = await asyncio.gather(*tasks)

    if verbose:
        for res in results:
            logger.info(
                f'Self-references in "{res.ssid}": N={res.num_references}, '
                f"Score={res.reference_score}%"
            )
            for author, pct in res.self_references.items():
                logger.info(f"  {author}: {pct}% self-reference")

    return results[0] if single_input else results

tests

test_self_references
TestSelfReferences
Source code in paperscraper/citations/tests/test_self_references.py
class TestSelfReferences:
    @pytest.fixture
    def dois(self):
        return [
            "10.1038/s41586-023-06600-9",
            "10.1016/j.neunet.2014.09.003",
        ]

    def test_single_doi(self, dois):
        result = self_references_paper(dois[0])
        assert isinstance(result, ReferenceResult)
        assert isinstance(result.num_references, int)
        assert result.num_references > 0
        assert isinstance(result.ssid, str)
        assert isinstance(result.reference_score, float)
        assert result.reference_score > 0
        assert isinstance(result.self_references, Dict)
        for author, self_cites in result.self_references.items():
            assert isinstance(author, str)
            assert isinstance(self_cites, float)
            assert self_cites >= 0 and self_cites <= 100

    def test_multiple_dois(self, dois):
        results = self_references_paper(dois[1:])
        assert isinstance(results, list)
        assert len(results) == len(dois[1:])
        for ref_result in results:
            assert isinstance(ref_result, ReferenceResult)
            assert isinstance(ref_result.ssid, str)
            assert isinstance(ref_result.num_references, int)
            assert ref_result.num_references > 0
            assert ref_result.reference_score > 0
            assert isinstance(ref_result.reference_score, float)
            for author, self_cites in ref_result.self_references.items():
                assert isinstance(author, str)
                assert isinstance(self_cites, float)
                assert self_cites >= 0 and self_cites <= 100

    def test_compare_async_and_sync_performance(self, dois):
        """
        Compares the execution time of asynchronous and synchronous `self_references`
        for a list of DOIs.
        """

        start_time = time.perf_counter()
        async_results = self_references_paper(dois)
        async_duration = time.perf_counter() - start_time

        # Measure synchronous execution time (three independent calls)
        start_time = time.perf_counter()
        sync_results = [self_references_paper(doi) for doi in dois]

        sync_duration = time.perf_counter() - start_time

        print(f"Asynchronous execution time (batch): {async_duration:.2f} seconds")
        print(
            f"Synchronous execution time (independent calls): {sync_duration:.2f} seconds"
        )
        for a, s in zip(async_results, sync_results):
            assert a == s, f"{a} vs {s}"

        # Assert that async execution (batch) is faster or at least not slower
        assert 0.9 * async_duration <= sync_duration, (
            f"Async execution ({async_duration:.2f}s) is slower than sync execution "
            f"({sync_duration:.2f}s)"
        )
test_compare_async_and_sync_performance(dois)

Compares the execution time of asynchronous and synchronous self_references for a list of DOIs.

Source code in paperscraper/citations/tests/test_self_references.py
def test_compare_async_and_sync_performance(self, dois):
    """
    Compares the execution time of asynchronous and synchronous `self_references`
    for a list of DOIs.
    """

    start_time = time.perf_counter()
    async_results = self_references_paper(dois)
    async_duration = time.perf_counter() - start_time

    # Measure synchronous execution time (three independent calls)
    start_time = time.perf_counter()
    sync_results = [self_references_paper(doi) for doi in dois]

    sync_duration = time.perf_counter() - start_time

    print(f"Asynchronous execution time (batch): {async_duration:.2f} seconds")
    print(
        f"Synchronous execution time (independent calls): {sync_duration:.2f} seconds"
    )
    for a, s in zip(async_results, sync_results):
        assert a == s, f"{a} vs {s}"

    # Assert that async execution (batch) is faster or at least not slower
    assert 0.9 * async_duration <= sync_duration, (
        f"Async execution ({async_duration:.2f}s) is slower than sync execution "
        f"({sync_duration:.2f}s)"
    )
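For orientation, a minimal call sketch outside the test suite (the import path is an assumption inferred from the test module's location, and the DOI is illustrative, not taken from this page):

from paperscraper.citations.self_references import self_references_paper  # assumed import path

result = self_references_paper("10.48550/arXiv.2207.03928")  # single DOI -> single ReferenceResult
print(result.ssid, result.num_references, result.reference_score)
for author, pct in result.self_references.items():
    print(f"{author}: {pct:.1f}% of their references are self-references")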

utils

get_doi_from_title(title: str) -> Optional[str]

Searches the DOI of a paper based on the paper title

Parameters:

Name Type Description Default
title str

Paper title

required

Returns:

Type Description
Optional[str]

DOI according to semantic scholar API

Source code in paperscraper/citations/utils.py
def get_doi_from_title(title: str) -> Optional[str]:
    """
    Searches the DOI of a paper based on the paper title

    Args:
        title: Paper title

    Returns:
        DOI according to semantic scholar API
    """
    response = requests.get(
        PAPER_URL + "search",
        params={"query": title, "fields": "externalIds", "limit": 1},
    )
    data = response.json()

    if data.get("data"):
        paper = data["data"][0]
        doi = paper.get("externalIds", {}).get("DOI")
        if doi:
            return doi
    logger.warning(f"Did not find DOI for title={title}")
get_doi_from_ssid(ssid: str, max_retries: int = 10) -> Optional[str]

Given a Semantic Scholar paper ID, returns the corresponding DOI if available.

Parameters:

Name Type Description Default
ssid str

The paper ID on Semantic Scholar.

required

Returns:

Type Description
Optional[str]

str or None: The DOI of the paper, or None if not found or in case of an error.

Source code in paperscraper/citations/utils.py
def get_doi_from_ssid(ssid: str, max_retries: int = 10) -> Optional[str]:
    """
    Given a Semantic Scholar paper ID, returns the corresponding DOI if available.

    Parameters:
      ssid (str): The paper ID on Semantic Scholar.

    Returns:
      str or None: The DOI of the paper, or None if not found or in case of an error.
    """
    logger.warning(
        "Semantic Scholar API is easily overloaded when passing SS IDs, provide DOIs to improve throughput."
    )
    attempts = 0
    for attempt in tqdm(
        range(1, max_retries + 1), desc=f"Fetching DOI for {ssid}", unit="attempt"
    ):
        # Make the GET request to Semantic Scholar.
        response = requests.get(
            f"{PAPER_URL}{ssid}", params={"fields": "externalIds", "limit": 1}
        )

        # If successful, try to extract and return the DOI.
        if response.status_code == 200:
            data = response.json()
            doi = data.get("externalIds", {}).get("DOI")
            return doi
        attempts += 1
        sleep(10)
    logger.warning(
        f"Did not find DOI for paper ID {ssid}. Code={response.status_code}, text={response.text}"
    )
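Usage sketch (the paper ID is illustrative; note the built-in retries and sleeps, so unresolved IDs can take a while):

from paperscraper.citations.utils import get_doi_from_ssid

doi = get_doi_from_ssid("649def34f8be52c8b66281af98ae884c09aef38b")
print(doi)  # DOI string if resolvable, otherwise None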
get_title_and_id_from_doi(doi: str) -> Dict[str, Any]

Given a DOI, retrieves the paper's title and semantic scholar paper ID.

Parameters:

Name Type Description Default
doi str

The DOI of the paper (e.g., "10.18653/v1/N18-3011").

required

Returns:

Type Description
Dict[str, Any]

dict or None: A dictionary with keys 'title' and 'ssid'.

Source code in paperscraper/citations/utils.py
def get_title_and_id_from_doi(doi: str) -> Dict[str, Any]:
    """
    Given a DOI, retrieves the paper's title and semantic scholar paper ID.

    Parameters:
        doi (str): The DOI of the paper (e.g., "10.18653/v1/N18-3011").

    Returns:
        dict or None: A dictionary with keys 'title' and 'ssid'.
    """

    # Send the GET request to Semantic Scholar
    response = requests.get(f"{PAPER_URL}DOI:{doi}")
    if response.status_code == 200:
        data = response.json()
        return {"title": data.get("title"), "ssid": data.get("paperId")}
    logger.warning(
        f"Could not get authors & semantic scholar ID for DOI={doi}, {response.status_code}: {response.text}"
    )
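Usage sketch, reusing the DOI from the docstring:

from paperscraper.citations.utils import get_title_and_id_from_doi

info = get_title_and_id_from_doi("10.18653/v1/N18-3011")
if info:  # None is returned on request errors
    print(info["title"], info["ssid"])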
author_name_to_ssaid(author_name: str) -> str

Given an author name, returns the Semantic Scholar author ID.

Parameters:

Name Type Description Default
author_name str

The full name of the author.

required

Returns:

Type Description
str

str or None: The Semantic Scholar author ID or None if no author is found.

Source code in paperscraper/citations/utils.py
def author_name_to_ssaid(author_name: str) -> str:
    """
    Given an author name, returns the Semantic Scholar author ID.

    Parameters:
        author_name (str): The full name of the author.

    Returns:
        str or None: The Semantic Scholar author ID or None if no author is found.
    """

    response = requests.get(
        AUTHOR_URL, params={"query": author_name, "fields": "name", "limit": 1}
    )
    if response.status_code == 200:
        data = response.json()
        authors = data.get("data", [])
        if authors:
            # Return the Semantic Scholar author ID from the first result.
            return authors[0].get("authorId")

    logger.error(
        f"Error in retrieving name from SS Author ID: {response.status_code} - {response.text}"
    )
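Usage sketch (the author name is illustrative):

from paperscraper.citations.utils import author_name_to_ssaid

author_id = author_name_to_ssaid("Yoshua Bengio")
print(author_id)  # author ID of the top search hit, or None on errors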
determine_paper_input_type(input: str) -> Literal['ssid', 'doi', 'title']

Determines the intended input type by the user if not explicitly given (infer).

Parameters:

Name Type Description Default
input str

Either a DOI or a semantic scholar paper ID or an author name.

required

Returns:

Type Description
Literal['ssid', 'doi', 'title']

The input type

Source code in paperscraper/citations/utils.py
def determine_paper_input_type(input: str) -> Literal["ssid", "doi", "title"]:
    """
    Determines the intended input type by the user if not explicitly given (`infer`).

    Args:
        input: Either a DOI or a semantic scholar paper ID or an author name.

    Returns:
        The input type
    """
    if len(input) > 15 and " " not in input and (input.isalnum() and input.islower()):
        mode = "ssid"
    elif len(re.findall(DOI_PATTERN, input, re.IGNORECASE)) == 1:
        mode = "doi"
    else:
        logger.info(
            f"Assuming `{input}` is a paper title, since it seems neither a DOI nor a paper ID"
        )
        mode = "title"
    return mode
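A few illustrative inputs and the types the heuristic assigns to them:

from paperscraper.citations.utils import determine_paper_input_type

print(determine_paper_input_type("10.18653/v1/N18-3011"))  # "doi"
print(determine_paper_input_type("649def34f8be52c8b66281af98ae884c09aef38b"))  # "ssid" (long, lowercase, alphanumeric)
print(determine_paper_input_type("Attention Is All You Need"))  # "title" (fallback)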
get_papers_for_author(ss_author_id: str) -> List[str] async

Given a Semantic Scholar author ID, returns a list of all Semantic Scholar paper IDs for that author.

Parameters:

Name Type Description Default
ss_author_id str

The Semantic Scholar author ID (e.g., "1741101").

required

Returns:

Type Description
List[str]

A list of paper IDs (as strings) authored by the given author.

Source code in paperscraper/citations/utils.py
async def get_papers_for_author(ss_author_id: str) -> List[str]:
    """
    Given a Semantic Scholar author ID, returns a list of all Semantic Scholar paper IDs for that author.

    Args:
        ss_author_id (str): The Semantic Scholar author ID (e.g., "1741101").

    Returns:
        A list of paper IDs (as strings) authored by the given author.
    """
    papers = []
    offset = 0
    limit = 100

    async with httpx.AsyncClient() as client:
        while True:
            response = await client.get(
                f"https://api.semanticscholar.org/graph/v1/author/{ss_author_id}/papers",
                params={"fields": "paperId", "offset": offset, "limit": limit},
            )
            response.raise_for_status()
            data = response.json()
            page = data.get("data", [])

            # Extract paper IDs from the current page.
            for paper in page:
                if "paperId" in paper:
                    papers.append(paper["paperId"])

            # If fewer papers were returned than the limit, we've reached the end.
            if len(page) < limit:
                break

            offset += limit

    return papers
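Since this is a coroutine, it has to be awaited or run through asyncio; a sketch using the author ID from the docstring:

import asyncio

from paperscraper.citations.utils import get_papers_for_author

paper_ids = asyncio.run(get_papers_for_author("1741101"))
print(len(paper_ids), paper_ids[:3])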
find_matching(first: List[Dict[str, str]], second: List[Dict[str, str]]) -> List[str]

Ingests two sets of authors and returns a list of those that match (either based on name or on author ID).

Parameters:

Name Type Description Default
first List[Dict[str, str]]

First set of authors given as list of dict with two keys (authorID and name).

required
second List[Dict[str, str]]

Second set of authors given as list of dict with two same keys.

required

Returns:

Type Description
List[str]

List of names of authors in first list where a match was found.

Source code in paperscraper/citations/utils.py
def find_matching(
    first: List[Dict[str, str]], second: List[Dict[str, str]]
) -> List[str]:
    """
    Ingests two sets of authors and returns a list of those that match (either based on name
        or on author ID).

    Args:
        first: First set of authors given as list of dict with two keys (`authorID` and `name`).
        second: Second set of authors given as list of dict with two same keys.

    Returns:
        List of names of authors in first list where a match was found.
    """
    # Check which author IDs overlap
    second_names = set(map(lambda x: x["authorId"], second))
    overlap_ids = {f["name"] for f in first if f["authorId"] in second_names}

    overlap_names = {
        f["name"]
        for f in first
        if f["authorId"] not in overlap_ids
        and any([check_overlap(f["name"], s["name"]) for s in second])
    }
    return list(overlap_ids | overlap_names)
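A self-contained sketch with made-up author records:

from paperscraper.citations.utils import find_matching

first = [
    {"authorId": "1", "name": "Jane A. Doe"},
    {"authorId": "2", "name": "John Smith"},
]
second = [{"authorId": "3", "name": "Jane Doe"}]
print(find_matching(first, second))  # ['Jane A. Doe'] -- matched by name despite different author IDs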
check_overlap(n1: str, n2: str) -> bool

Check whether two author names are identical. TODO: This can be made more robust

Parameters:

Name Type Description Default
n1 str

first name

required
n2 str

second name

required

Returns:

Name Type Description
bool bool

Whether names are identical.

Source code in paperscraper/citations/utils.py
def check_overlap(n1: str, n2: str) -> bool:
    """
    Check whether two author names are identical.
    TODO: This can be made more robust

    Args:
        n1: first name
        n2: second name

    Returns:
        bool: Whether names are identical.
    """
    # remove initials and check for name intersection
    s1 = {w for w in clean_name(n1).split()}
    s2 = {w for w in clean_name(n2).split()}
    return len(s2) > 0 and len(s1 | s2) == len(s1)
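Two illustrative calls:

from paperscraper.citations.utils import check_overlap

print(check_overlap("Jane A. Doe", "Jane Doe"))  # True: every token of the second name occurs in the first
print(check_overlap("Jane Doe", "John Smith"))   # False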
clean_name(s: str) -> str

Clean up a str by removing special characters.

Parameters:

Name Type Description Default
s str

Input possibly containing special symbols

required

Returns:

Type Description
str

Homogenized string.

Source code in paperscraper/citations/utils.py
def clean_name(s: str) -> str:
    """
    Clean up a str by removing special characters.

    Args:
        s: Input possibly containing special symbols

    Returns:
        Homogenized string.
    """
    return "".join(ch for ch in unidecode(s) if ch.isalpha() or ch.isspace()).lower()

get_dumps

arxiv

Dump arxiv data in JSONL format.

arxiv(start_date: Optional[str] = None, end_date: Optional[str] = None, save_path: str = save_path)

Fetches papers from arXiv based on time range, i.e., start_date and end_date. If the start_date and end_date are not provided, fetches papers from the earliest possible date to the current date. The fetched papers are stored in JSONL format.

Parameters:

Name Type Description Default
start_date str

Start date in format YYYY-MM-DD. Defaults to None.

None
end_date str

End date in format YYYY-MM-DD. Defaults to None.

None
save_path str

Path to save the JSONL dump. Defaults to save_path.

save_path
Source code in paperscraper/get_dumps/arxiv.py
def arxiv(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    save_path: str = save_path,
):
    """
    Fetches papers from arXiv based on time range, i.e., start_date and end_date.
    If the start_date and end_date are not provided, fetches papers from the earliest
    possible date to the current date. The fetched papers are stored in JSONL format.

    Args:
        start_date (str, optional): Start date in format YYYY-MM-DD. Defaults to None.
        end_date (str, optional): End date in format YYYY-MM-DD. Defaults to None.
        save_path (str, optional): Path to save the JSONL dump. Defaults to save_path.
    """
    # Set default dates
    EARLIEST_START = "1991-01-01"
    if start_date is None:
        start_date = EARLIEST_START
    if end_date is None:
        end_date = datetime.today().strftime("%Y-%m-%d")

    # Convert dates to datetime objects
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    end_date = datetime.strptime(end_date, "%Y-%m-%d")

    if start_date > end_date:
        raise ValueError(
            f"start_date {start_date} cannot be later than end_date {end_date}"
        )

    # Open file for writing results
    with open(save_path, "w") as fp:
        progress_bar = tqdm(total=(end_date - start_date).days + 1)

        current_date = start_date
        while current_date <= end_date:
            next_date = current_date + timedelta(days=1)
            progress_bar.set_description(
                f"Fetching {current_date.strftime('%Y-%m-%d')}"
            )

            # Format dates for query
            query = f"submittedDate:[{current_date.strftime('%Y%m%d0000')} TO {next_date.strftime('%Y%m%d0000')}]"
            try:
                papers = get_arxiv_papers_api(
                    query=query,
                    fields=["title", "authors", "date", "abstract", "journal", "doi"],
                    verbose=False,
                )
                if not papers.empty:
                    for paper in papers.to_dict(orient="records"):
                        fp.write(json.dumps(paper) + "\n")
            except Exception as e:
                print(f"Arxiv scraping error: {current_date.strftime('%Y-%m-%d')}: {e}")
            current_date = next_date
            progress_bar.update(1)
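Usage sketch for a small dump (dates and filename are illustrative; arxiv is assumed to be importable from paperscraper.get_dumps like the other dump functions, otherwise import it from paperscraper.get_dumps.arxiv):

from paperscraper.get_dumps import arxiv  # assumed re-export

# One week of arXiv metadata, written as JSONL
arxiv(start_date="2024-01-01", end_date="2024-01-07", save_path="arxiv_2024-01-01_2024-01-07.jsonl")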

biorxiv

Dump bioRxiv data in JSONL format.

biorxiv(start_date: Optional[str] = None, end_date: Optional[str] = None, save_path: str = save_path, max_retries: int = 10)

Fetches papers from biorxiv based on time range, i.e., start_date and end_date. If the start_date and end_date are not provided, papers will be fetched from biorxiv from the launch date of biorxiv until the current date. The fetched papers will be stored in jsonl format in save_path.

Parameters:

Name Type Description Default
start_date str

begin date expressed as YYYY-MM-DD. Defaults to None, i.e., earliest possible.

None
end_date str

end date expressed as YYYY-MM-DD. Defaults to None, i.e., today.

None
save_path str

Path where the dump is stored. Defaults to save_path.

save_path
max_retries int

Number of retries when API shows connection issues. Defaults to 10.

10
Source code in paperscraper/get_dumps/biorxiv.py
def biorxiv(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    save_path: str = save_path,
    max_retries: int = 10,
):
    """Fetches papers from biorxiv based on time range, i.e., start_date and end_date.
    If the start_date and end_date are not provided, papers will be fetched from biorxiv
    from the launch date of biorxiv until the current date. The fetched papers will be
    stored in jsonl format in save_path.

    Args:
        start_date (str, optional): begin date expressed as YYYY-MM-DD.
            Defaults to None, i.e., earliest possible.
        end_date (str, optional): end date expressed as YYYY-MM-DD.
            Defaults to None, i.e., today.
        save_path (str, optional): Path where the dump is stored.
            Defaults to save_path.
        max_retries (int, optional): Number of retries when API shows connection issues.
            Defaults to 10.
    """
    # create API client
    api = BioRxivApi(max_retries=max_retries)

    # dump all papers
    with open(save_path, "w") as fp:
        for index, paper in enumerate(
            tqdm(api.get_papers(start_date=start_date, end_date=end_date))
        ):
            if index > 0:
                fp.write(os.linesep)
            fp.write(json.dumps(paper))

chemrxiv

Dump chemRxiv data in JSONL format.

chemrxiv(start_date: Optional[str] = None, end_date: Optional[str] = None, save_path: str = save_path) -> None

Fetches papers from chemrxiv based on time range, i.e., start_date and end_date. If the start_date and end_date are not provided, papers will be fetched from chemrxiv from the launch date of chemrxiv until the current date. The fetched papers will be stored in jsonl format in save_path.

Parameters:

Name Type Description Default
start_date str

begin date expressed as YYYY-MM-DD. Defaults to None, i.e., earliest possible.

None
end_date str

end date expressed as YYYY-MM-DD. Defaults to None, i.e., today.

None
save_path str

Path where the dump is stored. Defaults to save_path.

save_path
Source code in paperscraper/get_dumps/chemrxiv.py
def chemrxiv(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    save_path: str = save_path,
) -> None:
    """Fetches papers from bichemrxiv based on time range, i.e., start_date and end_date.
    If the start_date and end_date are not provided, papers will be fetched from chemrxiv
    from the launch date of chemrxiv until the current date. The fetched papers will be
    stored in jsonl format in save_path.

    Args:
        start_date (str, optional): begin date expressed as YYYY-MM-DD.
            Defaults to None, i.e., earliest possible.
        end_date (str, optional): end date expressed as YYYY-MM-DD.
            Defaults to None, i.e., today.
        save_path (str, optional): Path where the dump is stored.
            Defaults to save_path.
    """

    # create API client
    api = ChemrxivAPI(start_date, end_date)
    # Download the data
    download_full(save_folder, api)
    # Convert to JSONL format.
    parse_dump(save_folder, save_path)

medrxiv

Dump medrxiv data in JSONL format.

medrxiv(start_date: Optional[str] = None, end_date: Optional[str] = None, save_path: str = save_path, max_retries: int = 10)

Fetches papers from medrxiv based on time range, i.e., start_date and end_date. If the start_date and end_date are not provided, then papers will be fetched from medrxiv starting from the launch date of medrxiv until current date. The fetched papers will be stored in jsonl format in save_path.

Parameters:

Name Type Description Default
start_date str

begin date expressed as YYYY-MM-DD. Defaults to None, i.e., earliest possible.

None
end_date str

end date expressed as YYYY-MM-DD. Defaults to None, i.e., today.

None
save_path str

Path where the dump is stored. Defaults to save_path.

save_path
max_retries int

Number of retries when API shows connection issues. Defaults to 10.

10
Source code in paperscraper/get_dumps/medrxiv.py
def medrxiv(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    save_path: str = save_path,
    max_retries: int = 10,
):
    """Fetches papers from medrxiv based on time range, i.e., start_date and end_date.
    If the start_date and end_date are not provided, then papers will be fetched from
    medrxiv starting from the launch date of medrxiv until current date. The fetched
    papers will be stored in jsonl format in save_path.

    Args:
        start_date (str, optional): begin date expressed as YYYY-MM-DD.
            Defaults to None, i.e., earliest possible.
        end_date (str, optional): end date expressed as YYYY-MM-DD.
            Defaults to None, i.e., today.
        save_path (str, optional): Path where the dump is stored.
            Defaults to save_path.
        max_retries (int, optional): Number of retries when API shows connection issues.
            Defaults to 10.
    """
    # create API client
    api = MedRxivApi(max_retries=max_retries)
    # dump all papers
    with open(save_path, "w") as fp:
        for index, paper in enumerate(
            tqdm(api.get_papers(start_date=start_date, end_date=end_date))
        ):
            if index > 0:
                fp.write(os.linesep)
            fp.write(json.dumps(paper))
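The three preprint-server dumpers share the same calling pattern; a sketch with an illustrative date range (omit the dates to dump the full archives):

from paperscraper.get_dumps import biorxiv, chemrxiv, medrxiv

biorxiv(start_date="2024-01-01", end_date="2024-01-07", save_path="biorxiv_sample.jsonl")
medrxiv(start_date="2024-01-01", end_date="2024-01-07", save_path="medrxiv_sample.jsonl")
chemrxiv(start_date="2024-01-01", end_date="2024-01-07", save_path="chemrxiv_sample.jsonl")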

utils

chemrxiv
chemrxiv_api
ChemrxivAPI

Handle OpenEngage API requests, using access. Adapted from https://github.com/fxcoudert/tools/blob/master/chemRxiv/chemRxiv.py.

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
class ChemrxivAPI:
    """Handle OpenEngage API requests, using access.
    Adapted from https://github.com/fxcoudert/tools/blob/master/chemRxiv/chemRxiv.py.
    """

    base = "https://chemrxiv.org/engage/chemrxiv/public-api/v1/"

    def __init__(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        page_size: Optional[int] = None,
        max_retries: int = 10,
    ):
        """
        Initialize API class.

        Args:
            start_date (Optional[str], optional): begin date expressed as YYYY-MM-DD.
                Defaults to None.
            end_date (Optional[str], optional): end date expressed as YYYY-MM-DD.
                Defaults to None.
            page_size (int, optional): The batch size used to fetch the records from chemrxiv.
            max_retries (int): Number of retries in case of error
        """

        self.page_size = page_size or 50
        self.max_retries = max_retries

        # Begin Date and End Date of the search
        launch_date = launch_dates["chemrxiv"]
        launch_datetime = datetime.fromisoformat(launch_date)

        if start_date:
            start_datetime = datetime.fromisoformat(start_date)
            if start_datetime < launch_datetime:
                self.start_date = launch_date
                logger.warning(
                    f"Begin date {start_date} is before chemrxiv launch date. Will use {launch_date} instead."
                )
            else:
                self.start_date = start_date
        else:
            self.start_date = launch_date
        if end_date:
            end_datetime = datetime.fromisoformat(end_date)
            if end_datetime > now_datetime:
                logger.warning(
                    f"End date {end_date} is in the future. Will use {now_datetime} instead."
                )
                self.end_date = now_datetime.strftime("%Y-%m-%d")
            else:
                self.end_date = end_date
        else:
            self.end_date = now_datetime.strftime("%Y-%m-%d")

    def request(self, url, method, params=None):
        """Send an API request to open Engage."""

        for attempt in range(self.max_retries):
            try:
                if method.casefold() == "get":
                    return requests.get(url, params=params, timeout=10)
                elif method.casefold() == "post":
                    return requests.post(url, json=params, timeout=10)
                else:
                    raise ConnectionError(f"Unknown method for query: {method}")
            except ChunkedEncodingError as e:
                logger.warning(f"ChunkedEncodingError occurred for {url}: {e}")
                if attempt + 1 == self.max_retries:
                    raise e
                time.sleep(3)

    def query(self, query, method="get", params=None):
        """Perform a direct query."""

        r = self.request(urljoin(self.base, query), method, params=params)
        r.raise_for_status()
        return r.json()

    def query_generator(self, query, method: str = "get", params: Dict = {}):
        """Query for a list of items, with paging. Returns a generator."""

        try:
            total = self.number_of_preprints()
        except Exception:
            total = float("inf")   # fallback if that call fails

        page = 0
        while True:
            params.update(
                {
                    "limit": self.page_size,
                    "skip": page * self.page_size,
                    "searchDateFrom": self.start_date,
                    "searchDateTo": self.end_date,
                }
            )
            if page * self.page_size > total:
                break
            r = self.request(urljoin(self.base, query), method, params=params)
            if r.status_code == 400:
                raise ValueError(r.json()["message"])
            r.raise_for_status()
            r = r.json()
            r = r["itemHits"]

            # If we have no more results, bail out
            if len(r) == 0:
                return

            yield from r
            page += 1

    def all_preprints(self):
        """Return a generator to all the chemRxiv articles."""
        return self.query_generator("items")

    def preprint(self, article_id):
        """Information on a given preprint.
        .. seealso:: https://docs.figshare.com/#public_article
        """
        return self.query(os.path.join("items", article_id))

    def number_of_preprints(self):
        return self.query("items")["totalCount"]
__init__(start_date: Optional[str] = None, end_date: Optional[str] = None, page_size: Optional[int] = None, max_retries: int = 10)

Initialize API class.

Parameters:

Name Type Description Default
start_date Optional[str]

begin date expressed as YYYY-MM-DD. Defaults to None.

None
end_date Optional[str]

end date expressed as YYYY-MM-DD. Defaults to None.

None
page_size int

The batch size used to fetch the records from chemrxiv.

None
max_retries int

Number of retries in case of error

10
Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def __init__(
    self,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    page_size: Optional[int] = None,
    max_retries: int = 10,
):
    """
    Initialize API class.

    Args:
        start_date (Optional[str], optional): begin date expressed as YYYY-MM-DD.
            Defaults to None.
        end_date (Optional[str], optional): end date expressed as YYYY-MM-DD.
            Defaults to None.
        page_size (int, optional): The batch size used to fetch the records from chemrxiv.
        max_retries (int): Number of retries in case of error
    """

    self.page_size = page_size or 50
    self.max_retries = max_retries

    # Begin Date and End Date of the search
    launch_date = launch_dates["chemrxiv"]
    launch_datetime = datetime.fromisoformat(launch_date)

    if start_date:
        start_datetime = datetime.fromisoformat(start_date)
        if start_datetime < launch_datetime:
            self.start_date = launch_date
            logger.warning(
                f"Begin date {start_date} is before chemrxiv launch date. Will use {launch_date} instead."
            )
        else:
            self.start_date = start_date
    else:
        self.start_date = launch_date
    if end_date:
        end_datetime = datetime.fromisoformat(end_date)
        if end_datetime > now_datetime:
            logger.warning(
                f"End date {end_date} is in the future. Will use {now_datetime} instead."
            )
            self.end_date = now_datetime.strftime("%Y-%m-%d")
        else:
            self.end_date = end_date
    else:
        self.end_date = now_datetime.strftime("%Y-%m-%d")
request(url, method, params=None)

Send an API request to open Engage.

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def request(self, url, method, params=None):
    """Send an API request to open Engage."""

    for attempt in range(self.max_retries):
        try:
            if method.casefold() == "get":
                return requests.get(url, params=params, timeout=10)
            elif method.casefold() == "post":
                return requests.post(url, json=params, timeout=10)
            else:
                raise ConnectionError(f"Unknown method for query: {method}")
        except ChunkedEncodingError as e:
            logger.warning(f"ChunkedEncodingError occurred for {url}: {e}")
            if attempt + 1 == self.max_retries:
                raise e
            time.sleep(3)
query(query, method='get', params=None)

Perform a direct query.

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def query(self, query, method="get", params=None):
    """Perform a direct query."""

    r = self.request(urljoin(self.base, query), method, params=params)
    r.raise_for_status()
    return r.json()
query_generator(query, method: str = 'get', params: Dict = {})

Query for a list of items, with paging. Returns a generator.

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def query_generator(self, query, method: str = "get", params: Dict = {}):
    """Query for a list of items, with paging. Returns a generator."""

    try:
        total = self.number_of_preprints()
    except Exception:
        total = float("inf")   # fallback if that call fails

    page = 0
    while True:
        params.update(
            {
                "limit": self.page_size,
                "skip": page * self.page_size,
                "searchDateFrom": self.start_date,
                "searchDateTo": self.end_date,
            }
        )
        if page * self.page_size > total:
            break
        r = self.request(urljoin(self.base, query), method, params=params)
        if r.status_code == 400:
            raise ValueError(r.json()["message"])
        r.raise_for_status()
        r = r.json()
        r = r["itemHits"]

        # If we have no more results, bail out
        if len(r) == 0:
            return

        yield from r
        page += 1
all_preprints()

Return a generator to all the chemRxiv articles.

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def all_preprints(self):
    """Return a generator to all the chemRxiv articles."""
    return self.query_generator("items")
preprint(article_id)

Information on a given preprint. See also: https://docs.figshare.com/#public_article

Source code in paperscraper/get_dumps/utils/chemrxiv/chemrxiv_api.py
def preprint(self, article_id):
    """Information on a given preprint.
    .. seealso:: https://docs.figshare.com/#public_article
    """
    return self.query(os.path.join("items", article_id))
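A minimal sketch of driving the client directly (dates are illustrative; the structure of each hit is whatever the Engage API returns, so inspect it before relying on a schema):

from paperscraper.get_dumps.utils.chemrxiv.chemrxiv_api import ChemrxivAPI

api = ChemrxivAPI(start_date="2024-01-01", end_date="2024-01-31")
print(api.number_of_preprints())       # total count reported by the API
first_hit = next(api.all_preprints())  # first raw item hit within the date range
print(type(first_hit))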
utils

Misc utils to download chemRxiv dump

get_author(author_list: List[Dict]) -> str

Parse ChemRxiv dump entry to extract author list

Parameters:

Name Type Description Default
author_list list

List of dicts, one per author.

required

Returns:

Name Type Description
str str

;-concatenated author list.

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_author(author_list: List[Dict]) -> str:
    """Parse ChemRxiv dump entry to extract author list

    Args:
        author_list (list): List of dicts, one per author.

    Returns:
        str: ;-concatenated author list.
    """

    return "; ".join([" ".join([a["firstName"], a["lastName"]]) for a in author_list])
get_categories(category_list: List[Dict]) -> str

Parse ChemRxiv dump entry to extract the categories of the paper

Parameters:

Name Type Description Default
category_list list

List of dicts, one per category.

required

Returns:

Name Type Description
str str

;-concatenated category list.

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_categories(category_list: List[Dict]) -> str:
    """Parse ChemRxiv dump entry to extract the categories of the paper

    Args:
        category_list (list): List of dicts, one per category.

    Returns:
        str: ;-concatenated category list.
    """

    return "; ".join([a["name"] for a in category_list])
get_date(datestring: str) -> str

Get the date of a chemrxiv dump entry.

Parameters:

Name Type Description Default
datestring str

String in the format: 2021-10-15T05:12:32.356Z

required

Returns:

Name Type Description
str str

Date in the format: YYYY-MM-DD.

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_date(datestring: str) -> str:
    """Get the date of a chemrxiv dump enry.

    Args:
        datestring: String in the format: 2021-10-15T05:12:32.356Z

    Returns:
        str: Date in the format: YYYY-MM-DD.
    """
    return datestring.split("T")[0]
get_metrics(metrics_list: List[Dict]) -> Dict

Parse ChemRxiv dump entry to extract the access metrics of the paper.

Parameters:

Name Type Description Default
metrics_list List[Dict]

A list of single-keyed dictionaries, each containing the key and value for exactly one metric.

required

Returns:

Name Type Description
Dict Dict

A flattened dictionary with all metrics and a timestamp

Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def get_metrics(metrics_list: List[Dict]) -> Dict:
    """
    Parse ChemRxiv dump entry to extract the access metrics of the paper.

    Args:
        metrics_list (List[Dict]): A list of single-keyed dictionaries, each
            containing the key and value for exactly one metric.

    Returns:
        Dict: A flattened dictionary with all metrics and a timestamp
    """
    metric_dict = {m["description"]: m["value"] for m in metrics_list}

    # This assumes that the .jsonl is constructed at roughly the same date
    # as when this entry was obtained from the API
    metric_dict.update({"timestamp": today})
    return metric_dict
parse_dump(source_path: str, target_path: str) -> None

Parses the dump as generated by the chemrXiv API and this repo: https://github.com/cthoyt/chemrxiv-summarize into a format that is equal to that of biorXiv and medRxiv.

NOTE: This is a lazy parser trying to store all data in memory.

Parameters:

Name Type Description Default
source_path str

Path to the source dump

required
target_path str

Path to the target dump (JSONL file written by this parser)

required
Source code in paperscraper/get_dumps/utils/chemrxiv/utils.py
def parse_dump(source_path: str, target_path: str) -> None:
    """
    Parses the dump as generated by the chemrXiv API and this repo:
    https://github.com/cthoyt/chemrxiv-summarize
    into a format that is equal to that of biorXiv and medRxiv.

    NOTE: This is a lazy parser trying to store all data in memory.

    Args:
        source_path: Path to the source dump
        target_path: Path to the target dump (JSONL file written by this parser)
    """

    dump = []
    # Read source dump
    for file_name in tqdm(os.listdir(source_path)):
        if not file_name.endswith(".json"):
            continue
        filepath = os.path.join(source_path, file_name)
        with open(filepath, "r") as f:
            source_paper = json.load(f)

        target_paper = {
            "title": source_paper["title"],
            "doi": source_paper["doi"],
            "published_doi": (
                source_paper["vor"]["vorDoi"] if source_paper["vor"] else "N.A."
            ),
            "published_url": (
                source_paper["vor"]["url"] if source_paper["vor"] else "N.A."
            ),
            "authors": get_author(source_paper["authors"]),
            "abstract": source_paper["abstract"],
            "date": get_date(source_paper["statusDate"]),
            "journal": "chemRxiv",
            "categories": get_categories(source_paper["categories"]),
            "metrics": get_metrics(source_paper["metrics"]),
            "license": source_paper["license"]["name"],
        }
        dump.append(target_paper)
        os.remove(filepath)
    # Write dump
    with open(target_path, "w") as f:
        for idx, target_paper in enumerate(dump):
            if idx > 0:
                f.write(os.linesep)
            f.write(json.dumps(target_paper))
    logger.info("Done, shutting down")

impact

Impactor

Source code in paperscraper/impact.py
class Impactor:
    def __init__(self):
        """
        Initialize the Impactor class with an instance of the Factor class.
        This allows access to the database of journal impact factors.
        """
        self.fa = Factor()
        self.all_journals = self.fa.search("%")
        self.metadata = pd.DataFrame(self.all_journals, dtype=str)
        logger.info(f"Loaded metadata for {len(self.metadata)} journals")

    def search(
        self,
        query: str,
        threshold: int = 100,
        sort_by: Optional[str] = None,
        min_impact: float = 0.0,
        max_impact: float = float("inf"),
        return_all: bool = False,
    ) -> List[Dict[str, Any]]:
        """
        Search for journals matching the given query with an optional fuzziness
            level and sorting.

        Args:
            query: The journal name or abbreviation to search for.
            threshold: The threshold for fuzzy matching. If set to 100, exact matching
                is performed. If set below 100, fuzzy matching is used. Defaults to 100.
            sort_by: Criterion for sorting results, one of 'impact', 'journal' and 'score'.
            min_impact: Minimum impact factor for journals to be considered, defaults to 0.
            max_impact: Maximum impact factor for journals to be considered, defaults to infinity.
            return_all: If True, returns all columns of the DataFrame for each match.

        Returns:
            List[dict]: A list of dictionaries containing the journal information.

        """
        # Validation of parameters
        if not isinstance(query, str) or not isinstance(threshold, int):
            raise TypeError(
                f"Query must be a str and threshold must be an int, not {type(query)} and {type(threshold)}"
            )
        if threshold < 0 or threshold > 100:
            raise ValueError(
                f"Fuzziness threshold must be between 0 and 100, not {threshold}"
            )

        if str.isdigit(query) and threshold >= 100:
            # When querying with NLM ID, exact matching does not work since impact_factor
            # strips off leading zeros, so we use fuzzy matching instead
            threshold = 99

        # Define a function to calculate fuzziness score
        def calculate_fuzziness_score(row):
            return max(fuzz.partial_ratio(query, str(value)) for value in row.values)

        # Search with or without fuzzy matching
        if threshold >= 100:
            matched_df = self.metadata[
                self.metadata.apply(
                    lambda x: query.lower() in x.astype(str).str.lower().values, axis=1
                )
            ].copy()
            # Exact matches get a default score of 100
            matched_df["score"] = 100
        else:
            matched_df = self.metadata[
                self.metadata.apply(
                    lambda x: calculate_fuzziness_score(x) >= threshold, axis=1
                )
            ].copy()
            matched_df["score"] = matched_df.apply(calculate_fuzziness_score, axis=1)

        # Sorting based on the specified criterion
        if sort_by == "score":
            matched_df = matched_df.sort_values(by="score", ascending=False)
        elif sort_by == "journal":
            matched_df = matched_df.sort_values(by="journal")
        elif sort_by == "impact":
            matched_df = matched_df.sort_values(by="factor", ascending=False)

        matched_df["factor"] = pd.to_numeric(matched_df["factor"])
        matched_df = matched_df[
            (matched_df["factor"] >= min_impact) & (matched_df["factor"] <= max_impact)
        ]

        # Prepare the final result
        results = [
            (
                row.to_dict()
                if return_all
                else {
                    "journal": row["journal"],
                    "factor": row["factor"],
                    "score": row["score"],
                }
            )
            for _, row in matched_df.iterrows()
        ]

        return results
__init__()

Initialize the Impactor class with an instance of the Factor class. This allows access to the database of journal impact factors.

Source code in paperscraper/impact.py
def __init__(self):
    """
    Initialize the Impactor class with an instance of the Factor class.
    This allows access to the database of journal impact factors.
    """
    self.fa = Factor()
    self.all_journals = self.fa.search("%")
    self.metadata = pd.DataFrame(self.all_journals, dtype=str)
    logger.info(f"Loaded metadata for {len(self.metadata)} journals")
search(query: str, threshold: int = 100, sort_by: Optional[str] = None, min_impact: float = 0.0, max_impact: float = float('inf'), return_all: bool = False) -> List[Dict[str, Any]]

Search for journals matching the given query with an optional fuzziness level and sorting.

Parameters:

Name Type Description Default
query str

The journal name or abbreviation to search for.

required
threshold int

The threshold for fuzzy matching. If set to 100, exact matching is performed. If set below 100, fuzzy matching is used. Defaults to 100.

100
sort_by Optional[str]

Criterion for sorting results, one of 'impact', 'journal' and 'score'.

None
min_impact float

Minimum impact factor for journals to be considered, defaults to 0.

0.0
max_impact float

Maximum impact factor for journals to be considered, defaults to infinity.

float('inf')
return_all bool

If True, returns all columns of the DataFrame for each match.

False

Returns:

Type Description
List[Dict[str, Any]]

List[dict]: A list of dictionaries containing the journal information.

Source code in paperscraper/impact.py
def search(
    self,
    query: str,
    threshold: int = 100,
    sort_by: Optional[str] = None,
    min_impact: float = 0.0,
    max_impact: float = float("inf"),
    return_all: bool = False,
) -> List[Dict[str, Any]]:
    """
    Search for journals matching the given query with an optional fuzziness
        level and sorting.

    Args:
        query: The journal name or abbreviation to search for.
        threshold: The threshold for fuzzy matching. If set to 100, exact matching
            is performed. If set below 100, fuzzy matching is used. Defaults to 100.
        sort_by: Criterion for sorting results, one of 'impact', 'journal' and 'score'.
        min_impact: Minimum impact factor for journals to be considered, defaults to 0.
        max_impact: Maximum impact factor for journals to be considered, defaults to infinity.
        return_all: If True, returns all columns of the DataFrame for each match.

    Returns:
        List[dict]: A list of dictionaries containing the journal information.

    """
    # Validation of parameters
    if not isinstance(query, str) or not isinstance(threshold, int):
        raise TypeError(
            f"Query must be a str and threshold must be an int, not {type(query)} and {type(threshold)}"
        )
    if threshold < 0 or threshold > 100:
        raise ValueError(
            f"Fuzziness threshold must be between 0 and 100, not {threshold}"
        )

    if str.isdigit(query) and threshold >= 100:
        # When querying with NLM ID, exact matching does not work since impact_factor
        # strips off leading zeros, so we use fuzzy matching instead
        threshold = 99

    # Define a function to calculate fuzziness score
    def calculate_fuzziness_score(row):
        return max(fuzz.partial_ratio(query, str(value)) for value in row.values)

    # Search with or without fuzzy matching
    if threshold >= 100:
        matched_df = self.metadata[
            self.metadata.apply(
                lambda x: query.lower() in x.astype(str).str.lower().values, axis=1
            )
        ].copy()
        # Exact matches get a default score of 100
        matched_df["score"] = 100
    else:
        matched_df = self.metadata[
            self.metadata.apply(
                lambda x: calculate_fuzziness_score(x) >= threshold, axis=1
            )
        ].copy()
        matched_df["score"] = matched_df.apply(calculate_fuzziness_score, axis=1)

    # Sorting based on the specified criterion
    if sort_by == "score":
        matched_df = matched_df.sort_values(by="score", ascending=False)
    elif sort_by == "journal":
        matched_df = matched_df.sort_values(by="journal")
    elif sort_by == "impact":
        matched_df = matched_df.sort_values(by="factor", ascending=False)

    matched_df["factor"] = pd.to_numeric(matched_df["factor"])
    matched_df = matched_df[
        (matched_df["factor"] >= min_impact) & (matched_df["factor"] <= max_impact)
    ]

    # Prepare the final result
    results = [
        (
            row.to_dict()
            if return_all
            else {
                "journal": row["journal"],
                "factor": row["factor"],
                "score": row["score"],
            }
        )
        for _, row in matched_df.iterrows()
    ]

    return results
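Usage sketch (the journal query is illustrative; the exact factors and scores depend on the bundled impact-factor database):

from paperscraper.impact import Impactor

impactor = Impactor()
for hit in impactor.search("Nature Comm", threshold=85, sort_by="impact"):
    print(hit["journal"], hit["factor"], hit["score"])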

pdf

fallbacks

Functionalities to scrape PDF files of publications.

fallback_wiley_api(paper_metadata: Dict[str, Any], output_path: Path, api_keys: Dict[str, str], max_attempts: int = 2) -> bool

Attempt to download the PDF via the Wiley TDM API (popular publisher which blocks standard scraping attempts; API access free for academic users).

This function uses the WILEY_TDM_API_TOKEN environment variable to authenticate with the Wiley TDM API and attempts to download the PDF for the given paper. See https://onlinelibrary.wiley.com/library-info/resources/text-and-datamining for a description on how to get your WILEY_TDM_API_TOKEN.

Parameters:

Name Type Description Default
paper_metadata dict

Dictionary containing paper metadata. Must include the 'doi' key.

required
output_path Path

A pathlib.Path object representing the path where the PDF will be saved.

required
api_keys dict

Preloaded API keys.

required
max_attempts int

The maximum number of attempts to retry API call.

2

Returns:

Name Type Description
bool bool

True if the PDF file was successfully downloaded, False otherwise.

Source code in paperscraper/pdf/fallbacks.py
def fallback_wiley_api(
    paper_metadata: Dict[str, Any],
    output_path: Path,
    api_keys: Dict[str, str],
    max_attempts: int = 2,
) -> bool:
    """
    Attempt to download the PDF via the Wiley TDM API (popular publisher which blocks standard scraping attempts; API access free for academic users).

    This function uses the WILEY_TDM_API_TOKEN environment variable to authenticate
    with the Wiley TDM API and attempts to download the PDF for the given paper.
    See https://onlinelibrary.wiley.com/library-info/resources/text-and-datamining for a description on how to get your WILEY_TDM_API_TOKEN.

    Args:
        paper_metadata (dict): Dictionary containing paper metadata. Must include the 'doi' key.
        output_path (Path): A pathlib.Path object representing the path where the PDF will be saved.
        api_keys (dict): Preloaded API keys.
        max_attempts (int): The maximum number of attempts to retry API call.

    Returns:
        bool: True if the PDF file was successfully downloaded, False otherwise.
    """

    WILEY_TDM_API_TOKEN = api_keys.get("WILEY_TDM_API_TOKEN")
    encoded_doi = paper_metadata["doi"].replace("/", "%2F")
    api_url = f"https://api.wiley.com/onlinelibrary/tdm/v1/articles/{encoded_doi}"
    headers = {"Wiley-TDM-Client-Token": WILEY_TDM_API_TOKEN}

    attempt = 0
    success = False

    while attempt < max_attempts:
        try:
            api_response = requests.get(
                api_url, headers=headers, allow_redirects=True, timeout=60
            )
            api_response.raise_for_status()
            if api_response.content[:4] != b"%PDF":
                logger.warning(
                    f"API returned content that is not a valid PDF for {paper_metadata['doi']}."
                )
            else:
                with open(output_path.with_suffix(".pdf"), "wb+") as f:
                    f.write(api_response.content)
                logger.info(
                    f"Successfully downloaded PDF via Wiley API for {paper_metadata['doi']}."
                )
                success = True
                break
        except Exception as e2:
            if attempt < max_attempts - 1:
                logger.info("Waiting 20 seconds before retrying...")
                time.sleep(20)
            logger.error(
                f"Could not download via Wiley API (attempt {attempt + 1}/{max_attempts}): {e2}"
            )

        attempt += 1

    # **Mandatory delay of 10 seconds to comply with Wiley API rate limits**
    logger.info(
        "Waiting 10 seconds before next request to comply with Wiley API rate limits..."
    )
    time.sleep(10)
    return success
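Usage sketch (DOI, output stem and token are placeholders; on success the PDF is written next to the stem as paper.pdf):

from pathlib import Path

from paperscraper.pdf.fallbacks import fallback_wiley_api

ok = fallback_wiley_api(
    paper_metadata={"doi": "10.1002/anie.202300000"},  # placeholder Wiley DOI
    output_path=Path("paper"),
    api_keys={"WILEY_TDM_API_TOKEN": "<your token>"},
)
print(ok)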
fallback_bioc_pmc(doi: str, output_path: Path) -> bool

Attempt to download the XML via the BioC-PMC fallback.

This function first converts a given DOI to a PMCID using the NCBI ID Converter API. If a PMCID is found, it constructs the corresponding PMC XML URL and attempts to download the full-text XML.

PubMed Central® (PMC) is a free full-text archive of biomedical and life sciences journal literature at the U.S. National Institutes of Health's National Library of Medicine (NIH/NLM).

Parameters:

Name Type Description Default
doi str

The DOI of the paper to retrieve.

required
output_path Path

A pathlib.Path object representing the path where the XML file will be saved.

required

Returns:

Name Type Description
bool bool

True if the XML file was successfully downloaded, False otherwise.

Source code in paperscraper/pdf/fallbacks.py
def fallback_bioc_pmc(doi: str, output_path: Path) -> bool:
    """
    Attempt to download the XML via the BioC-PMC fallback.

    This function first converts a given DOI to a PMCID using the NCBI ID Converter API.
    If a PMCID is found, it constructs the corresponding PMC XML URL and attempts to
    download the full-text XML.

    PubMed Central® (PMC) is a free full-text archive of biomedical and life sciences
    journal literature at the U.S. National Institutes of Health's National Library of Medicine (NIH/NLM).

    Args:
        doi (str): The DOI of the paper to retrieve.
        output_path (Path): A pathlib.Path object representing the path where the XML file will be saved.

    Returns:
        bool: True if the XML file was successfully downloaded, False otherwise.
    """
    ncbi_tool = "paperscraper"
    ncbi_email = "your_email@example.com"

    converter_url = "https://www.ncbi.nlm.nih.gov/pmc/utils/idconv/v1.0/"
    params = {
        "tool": ncbi_tool,
        "email": ncbi_email,
        "ids": doi,
        "idtype": "doi",
        "format": "json",
    }
    try:
        conv_response = requests.get(converter_url, params=params, timeout=60)
        conv_response.raise_for_status()
        data = conv_response.json()
        records = data.get("records", [])
        if not records or "pmcid" not in records[0]:
            logger.warning(
                f"No PMCID available for DOI {doi}. Fallback via PMC therefore not possible."
            )
            return False
        pmcid = records[0]["pmcid"]
        logger.info(f"Converted DOI {doi} to PMCID {pmcid}.")
    except Exception as conv_err:
        logger.error(f"Error during DOI to PMCID conversion: {conv_err}")
        return False

    # Construct PMC XML URL
    xml_url = f"https://www.ncbi.nlm.nih.gov/research/bionlp/RESTful/pmcoa.cgi/BioC_xml/{pmcid}/unicode"
    logger.info(f"Attempting to download XML from BioC-PMC URL: {xml_url}")
    try:
        xml_response = requests.get(xml_url, timeout=60)
        xml_response.raise_for_status()
        xml_path = output_path.with_suffix(".xml")
        # check for xml error:
        if xml_response.content.startswith(
            b"[Error] : No result can be found. <BR><HR><B> - https://www.ncbi.nlm.nih.gov/research/bionlp/RESTful/"
        ):
            logger.warning(f"No XML found for DOI {doi} at BioC-PMC URL {xml_url}.")
            return False
        with open(xml_path, "wb+") as f:
            f.write(xml_response.content)
        logger.info(f"Successfully downloaded XML for DOI {doi} to {xml_path}.")
        return True
    except Exception as xml_err:
        logger.error(f"Failed to download XML from BioC-PMC URL {xml_url}: {xml_err}")
        return False
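Usage sketch (the DOI is illustrative; on success the full text lands in paper.xml):

from pathlib import Path

from paperscraper.pdf.fallbacks import fallback_bioc_pmc

ok = fallback_bioc_pmc("10.1371/journal.pcbi.1004668", Path("paper"))  # illustrative open-access DOI
print(ok)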
fallback_elsevier_api(paper_metadata: Dict[str, Any], output_path: Path, api_keys: Dict[str, str]) -> bool

Attempt to download the full text via the Elsevier TDM API. For more information, see: https://www.elsevier.com/about/policies-and-standards/text-and-data-mining (Requires an institutional subscription and an API key provided in the api_keys dictionary under the key "ELSEVIER_TDM_API_KEY".)

Parameters:

Name Type Description Default
paper_metadata Dict[str, Any]

Dictionary containing paper metadata. Must include the 'doi' key.

required
output_path Path

A pathlib.Path object representing the path where the XML file will be saved.

required
api_keys Dict[str, str]

A dictionary containing API keys. Must include the key "ELSEVIER_TDM_API_KEY".

required

Returns:

Name Type Description
bool bool

True if the XML file was successfully downloaded, False otherwise.

Source code in paperscraper/pdf/fallbacks.py
def fallback_elsevier_api(
    paper_metadata: Dict[str, Any], output_path: Path, api_keys: Dict[str, str]
) -> bool:
    """
    Attempt to download the full text via the Elsevier TDM API.
    For more information, see:
    https://www.elsevier.com/about/policies-and-standards/text-and-data-mining
    (Requires an institutional subscription and an API key provided in the api_keys dictionary under the key "ELSEVIER_TDM_API_KEY".)

    Args:
        paper_metadata (Dict[str, Any]): Dictionary containing paper metadata. Must include the 'doi' key.
        output_path (Path): A pathlib.Path object representing the path where the XML file will be saved.
        api_keys (Dict[str, str]): A dictionary containing API keys. Must include the key "ELSEVIER_TDM_API_KEY".

    Returns:
        bool: True if the XML file was successfully downloaded, False otherwise.
    """
    elsevier_api_key = api_keys.get("ELSEVIER_TDM_API_KEY")
    doi = paper_metadata["doi"]
    api_url = f"https://api.elsevier.com/content/article/doi/{doi}?apiKey={elsevier_api_key}&httpAccept=text%2Fxml"
    logger.info(f"Attempting download via Elsevier API (XML) for {doi}: {api_url}")
    headers = {"Accept": "application/xml"}
    try:
        response = requests.get(api_url, headers=headers, timeout=60)

        # Check for 401 error and look for APIKEY_INVALID in the response
        if response.status_code == 401:
            error_text = response.text
            if "APIKEY_INVALID" in error_text:
                logger.error("Invalid API key. Couldn't download via Elsevier XML API")
            else:
                logger.error("401 Unauthorized. Couldn't download via Elsevier XML API")
            return False

        response.raise_for_status()

        # Attempt to parse it with lxml to confirm it's valid XML
        try:
            etree.fromstring(response.content)
        except etree.XMLSyntaxError as e:
            logger.warning(f"Elsevier API returned invalid XML for {doi}: {e}")
            return False

        xml_path = output_path.with_suffix(".xml")
        with open(xml_path, "wb") as f:
            f.write(response.content)
        logger.info(
            f"Successfully downloaded XML via the Elsevier API for {doi} to {xml_path}"
        )
        return True
    except Exception as e:
        logger.error(f"Could not download via Elsevier XML API: {e}")
        return False
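A minimal sketch of calling this fallback directly, assuming a valid institutional key is available in the environment; the DOI is a placeholder:

from pathlib import Path

from paperscraper.pdf.fallbacks import fallback_elsevier_api
from paperscraper.pdf.utils import load_api_keys

api_keys = load_api_keys()  # picks up ELSEVIER_TDM_API_KEY from a .env file, if present
paper = {"doi": "10.1016/j.xxxx.2021.100001"}  # placeholder Elsevier DOI
fallback_elsevier_api(paper, Path("elsevier_paper"), api_keys)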
fallback_elife_xml(doi: str, output_path: Path) -> bool

Attempt to download the XML via the eLife XML repository on GitHub.

eLife provides open access to their XML files on GitHub, which can be used as a fallback. When multiple versions exist (revised papers), it takes the latest version (e.g., v3 instead of v1).

Parameters:

Name Type Description Default
doi str

The DOI of the eLife paper to download.

required
output_path Path

A pathlib.Path object representing the path where the XML file will be saved.

required

Returns:

Name Type Description
bool bool

True if the XML file was successfully downloaded, False otherwise.

Source code in paperscraper/pdf/fallbacks.py
def fallback_elife_xml(doi: str, output_path: Path) -> bool:
    """
    Attempt to download the XML via the eLife XML repository on GitHub.

    eLife provides open access to their XML files on GitHub, which can be used as a fallback.
    When multiple versions exist (revised papers), it takes the latest version (e.g., v3 instead of v1).

    Args:
        doi (str): The DOI of the eLife paper to download.
        output_path (Path): A pathlib.Path object representing the path where the XML file will be saved.

    Returns:
        bool: True if the XML file was successfully downloaded, False otherwise.
    """
    parts = doi.split("eLife.")
    if len(parts) < 2:
        logger.error(f"Unable to parse eLife DOI: {doi}")
        return False
    article_num = parts[1].strip()

    index = get_elife_xml_index()
    if article_num not in index:
        logger.warning(f"No eLife XML found for DOI {doi}.")
        return False
    candidate_files = index[article_num]
    latest_version, latest_download_url = max(candidate_files, key=lambda x: x[0])
    try:
        r = requests.get(latest_download_url, timeout=60)
        r.raise_for_status()
        latest_xml = r.content
    except Exception as e:
        logger.error(f"Error downloading file from {latest_download_url}: {e}")
        return False

    xml_path = output_path.with_suffix(".xml")
    with open(xml_path, "wb") as f:
        f.write(latest_xml)
    logger.info(
        f"Successfully downloaded XML via eLife API ({latest_version}) for DOI {doi} to {xml_path}."
    )
    return True
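A minimal sketch; eLife DOIs follow the pattern 10.7554/eLife.<article number> and the number below is a placeholder:

from pathlib import Path

from paperscraper.pdf.fallbacks import fallback_elife_xml

fallback_elife_xml("10.7554/eLife.12345", Path("elife_paper"))  # writes elife_paper.xml on success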
get_elife_xml_index() -> dict

Fetch the eLife XML index from GitHub and return it as a dictionary.

This function retrieves and caches the list of available eLife articles in XML format from the eLife GitHub repository. It ensures that the latest version of each article is accessible for downloading. The index is cached in memory to avoid repeated network requests when processing multiple eLife papers.

Returns:

Name Type Description
dict dict

A dictionary where keys are article numbers (as strings) and values are lists of tuples (version, download_url). Each list is sorted by version number.

Source code in paperscraper/pdf/fallbacks.py
def get_elife_xml_index() -> dict:
    """
    Fetch the eLife XML index from GitHub and return it as a dictionary.

    This function retrieves and caches the list of available eLife articles in XML format
    from the eLife GitHub repository. It ensures that the latest version of each article
    is accessible for downloading. The index is cached in memory to avoid repeated
    network requests when processing multiple eLife papers.

    Returns:
        dict: A dictionary where keys are article numbers (as strings) and values are
              lists of tuples (version, download_url). Each list is sorted by version number.
    """
    global ELIFE_XML_INDEX
    if ELIFE_XML_INDEX is None:
        logger.info("Fetching eLife XML index from GitHub using git tree API")
        ELIFE_XML_INDEX = {}
        # Use the git tree API to get the full repository tree.
        base_tree_url = "https://api.github.com/repos/elifesciences/elife-article-xml/git/trees/master?recursive=1"
        r = requests.get(base_tree_url, timeout=60)
        r.raise_for_status()
        tree_data = r.json()
        items = tree_data.get("tree", [])
        # Look for files in the 'articles' directory matching the pattern.
        pattern = r"articles/elife-(\d+)-v(\d+)\.xml"
        for item in items:
            path = item.get("path", "")
            match = re.match(pattern, path)
            if match:
                article_num_padded = match.group(1)
                version = int(match.group(2))
                # Construct the raw download URL.
                download_url = f"https://raw.githubusercontent.com/elifesciences/elife-article-xml/master/{path}"
                ELIFE_XML_INDEX.setdefault(article_num_padded, []).append(
                    (version, download_url)
                )
        # Sort each article's file list by version.
        for key in ELIFE_XML_INDEX:
            ELIFE_XML_INDEX[key].sort(key=lambda x: x[0])
    return ELIFE_XML_INDEX
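Because each value list is sorted by version, the newest revision of an article can be read off the last tuple. A small sketch (the article number is a placeholder):

from paperscraper.pdf.fallbacks import get_elife_xml_index

index = get_elife_xml_index()      # first call fetches the GitHub tree, later calls reuse the cache
versions = index.get("80729", [])  # placeholder article number
if versions:
    latest_version, latest_url = versions[-1]
    print(f"Latest revision: v{latest_version} at {latest_url}")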
month_folder(doi: str) -> str

Query bioRxiv API to get the posting date of a given DOI. Convert a date to the BioRxiv S3 folder name, rolling over if it's the month's last day. E.g., if date is the last day of April, treat as May_YYYY.

Parameters:

Name Type Description Default
doi str

The DOI for which to retrieve the date.

required

Returns:

Type Description
str

Month and year in format October_2019

Source code in paperscraper/pdf/fallbacks.py
def month_folder(doi: str) -> str:
    """
    Query bioRxiv API to get the posting date of a given DOI.
    Convert a date to the BioRxiv S3 folder name, rolling over if it's the month's last day.
    E.g., if date is the last day of April, treat as May_YYYY.

    Args:
        doi: The DOI for which to retrieve the date.

    Returns:
        Month and year in format `October_2019`
    """
    url = f"https://api.biorxiv.org/details/biorxiv/{doi}/na/json"
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    date_str = resp.json()["collection"][0]["date"]
    date = datetime.date.fromisoformat(date_str)

    # NOTE: bioRxiv papers posted on the last day of the month are archived the next day
    last_day = calendar.monthrange(date.year, date.month)[1]
    if date.day == last_day:
        date = date + datetime.timedelta(days=1)
    return date.strftime("%B_%Y")
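The rollover rule can be reproduced in isolation (without the API call) using the same date arithmetic as above:

import calendar
import datetime

date = datetime.date(2019, 4, 30)  # last day of April
last_day = calendar.monthrange(date.year, date.month)[1]
if date.day == last_day:
    date += datetime.timedelta(days=1)  # rolls over into the next month
print(date.strftime("%B_%Y"))  # -> May_2019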
list_meca_keys(s3_client: BaseClient, bucket: str, prefix: str) -> list

List all .meca object keys under a given prefix in a requester-pays bucket.

Parameters:

Name Type Description Default
s3_client BaseClient

S3 client to get the data from.

required
bucket str

bucket to get data from.

required
prefix str

prefix to get data from.

required

Returns:

Type Description
list

List of keys, one per existing .meca in the bucket.

Source code in paperscraper/pdf/fallbacks.py
def list_meca_keys(s3_client: BaseClient, bucket: str, prefix: str) -> list:
    """
    List all .meca object keys under a given prefix in a requester-pays bucket.

    Args:
        s3_client: S3 client to get the data from.
        bucket: bucket to get data from.
        prefix: prefix to get data from.

    Returns:
        List of keys, one per existing .meca in the bucket.
    """
    keys = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(
        Bucket=bucket, Prefix=prefix, RequestPayer="requester"
    ):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".meca"):
                keys.append(obj["Key"])
    return keys
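A minimal sketch of listing one month's MECA archives directly, assuming AWS credentials with requester-pays billing are configured in the environment:

import boto3

from paperscraper.pdf.fallbacks import list_meca_keys

s3 = boto3.client("s3", region_name="us-east-1")
keys = list_meca_keys(s3, bucket="biorxiv-src-monthly", prefix="Current_Content/October_2019/")
print(f"Found {len(keys)} .meca archives")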
find_meca_for_doi(s3_client: BaseClient, bucket: str, key: str, doi_token: str) -> bool

Efficiently inspect manifest.xml within a .meca zip by fetching only necessary bytes. Parse via ZipFile to read manifest.xml and match DOI token.

Parameters:

Name Type Description Default
s3_client BaseClient

S3 client to get the data from.

required
bucket str

bucket to get data from.

required
key str

Object key of the .meca archive to inspect.

required
doi_token str

the DOI that should be matched

required

Returns:

Type Description
bool

Whether or not the DOI could be matched

Source code in paperscraper/pdf/fallbacks.py
def find_meca_for_doi(
    s3_client: BaseClient, bucket: str, key: str, doi_token: str
) -> bool:
    """
    Efficiently inspect manifest.xml within a .meca zip by fetching only necessary bytes.
    Parse via ZipFile to read manifest.xml and match DOI token.

    Args:
        s3_client: S3 client to get the data from.
        bucket: bucket to get data from.
        key: object key of the .meca archive to inspect.
        doi_token: the DOI that should be matched

    Returns:
        Whether or not the DOI could be matched
    """
    try:
        head = s3_client.get_object(
            Bucket=bucket, Key=key, Range="bytes=0-4095", RequestPayer="requester"
        )["Body"].read()
        tail = s3_client.get_object(
            Bucket=bucket, Key=key, Range="bytes=-4096", RequestPayer="requester"
        )["Body"].read()
    except Exception:
        return False

    data = head + tail
    with zipfile.ZipFile(io.BytesIO(data)) as z:
        manifest = z.read("manifest.xml")

    # Extract the last part of the DOI (newer DOIs that contain date fail otherwise)
    doi_token = doi_token.split(".")[-1]
    return doi_token.encode("utf-8") in manifest.lower()
fallback_s3(doi: str, output_path: Union[str, Path], api_keys: dict, workers: int = 32) -> bool

Download a BioRxiv PDF via the requester-pays S3 bucket using range requests.

Parameters:

Name Type Description Default
doi str

The DOI for which to retrieve the PDF (e.g. '10.1101/798496').

required
output_path Union[str, Path]

Path where the PDF will be saved (with .pdf suffix added).

required
api_keys dict

Dict containing 'AWS_ACCESS_KEY_ID' and 'AWS_SECRET_ACCESS_KEY'.

required

Returns:

Type Description
bool

True if download succeeded, False otherwise.

Source code in paperscraper/pdf/fallbacks.py
def fallback_s3(
    doi: str, output_path: Union[str, Path], api_keys: dict, workers: int = 32
) -> bool:
    """
    Download a BioRxiv PDF via the requester-pays S3 bucket using range requests.

    Args:
        doi: The DOI for which to retrieve the PDF (e.g. '10.1101/798496').
        output_path: Path where the PDF will be saved (with .pdf suffix added).
        api_keys: Dict containing 'AWS_ACCESS_KEY_ID' and 'AWS_SECRET_ACCESS_KEY'.
        workers: Number of worker threads used to scan MECA archives in parallel. Defaults to 32.

    Returns:
        True if download succeeded, False otherwise.
    """

    s3 = boto3.client(
        "s3",
        aws_access_key_id=api_keys.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=api_keys.get("AWS_SECRET_ACCESS_KEY"),
        region_name="us-east-1",
    )
    bucket = "biorxiv-src-monthly"

    # Derive prefix from DOI date
    prefix = f"Current_Content/{month_folder(doi)}/"

    # List MECA archives in that month
    meca_keys = list_meca_keys(s3, bucket, prefix)
    if not meca_keys:
        return False

    token = doi.split("/")[-1].lower()
    target = None
    executor = ThreadPoolExecutor(max_workers=workers)
    futures = {
        executor.submit(find_meca_for_doi, s3, bucket, key, token): key
        for key in meca_keys
    }
    pbar = tqdm(
        total=len(futures),
        desc=f"Scanning in biorxiv with {workers} workers for {doi}…",
    )
    for future in as_completed(futures):
        key = futures[future]
        try:
            if future.result():
                target = key
                pbar.set_description(f"Success! Found target {doi} in {key}")
                # cancel pending futures to speed shutdown
                for fut in futures:
                    fut.cancel()
                break
        except Exception:
            pass
        finally:
            pbar.update(1)
    # shutdown without waiting for remaining threads
    executor.shutdown(wait=False)
    if target is None:
        logger.error(f"Could not find {doi} on biorxiv")
        return False

    # Download full MECA and extract PDF
    data = s3.get_object(Bucket=bucket, Key=target, RequestPayer="requester")[
        "Body"
    ].read()
    output_path = Path(output_path)
    with zipfile.ZipFile(io.BytesIO(data)) as z:
        for name in z.namelist():
            if name.lower().endswith(".pdf"):
                z.extract(name, path=output_path.parent)
                # Move file to desired location
                (output_path.parent / name).rename(output_path.with_suffix(".pdf"))
                return True
    return False
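A usage sketch with the DOI from the docstring; the AWS values are placeholders and the transfer is billed to the requester:

from paperscraper.pdf.fallbacks import fallback_s3

api_keys = {
    "AWS_ACCESS_KEY_ID": "<your-access-key-id>",
    "AWS_SECRET_ACCESS_KEY": "<your-secret-access-key>",
}
fallback_s3("10.1101/798496", "biorxiv_paper", api_keys, workers=16)  # writes biorxiv_paper.pdf on success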

pdf

Functionalities to scrape PDF files of publications.

save_pdf(paper_metadata: Dict[str, Any], filepath: Union[str, Path], save_metadata: bool = False, api_keys: Optional[Union[str, Dict[str, str]]] = None) -> None

Save a PDF file of a paper.

Parameters:

Name Type Description Default
paper_metadata Dict[str, Any]

A dictionary with the paper metadata. Must contain the doi key.

required
filepath Union[str, Path]

Path to the PDF file to be saved (with or without suffix).

required
save_metadata bool

A boolean indicating whether to save paper metadata as a separate json.

False
api_keys Optional[Union[str, Dict[str, str]]]

Either a dictionary containing API keys (if already loaded) or a string (path to API keys file). If None, will try to load from .env file and if unsuccessful, skip API-based fallbacks.

None
Source code in paperscraper/pdf/pdf.py
def save_pdf(
    paper_metadata: Dict[str, Any],
    filepath: Union[str, Path],
    save_metadata: bool = False,
    api_keys: Optional[Union[str, Dict[str, str]]] = None,
) -> None:
    """
    Save a PDF file of a paper.

    Args:
        paper_metadata: A dictionary with the paper metadata. Must contain the `doi` key.
        filepath: Path to the PDF file to be saved (with or without suffix).
        save_metadata: A boolean indicating whether to save paper metadata as a separate json.
        api_keys: Either a dictionary containing API keys (if already loaded) or a string (path to API keys file).
                  If None, will try to load from `.env` file and if unsuccessful, skip API-based fallbacks.
    """
    if not isinstance(paper_metadata, Dict):
        raise TypeError(f"paper_metadata must be a dict, not {type(paper_metadata)}.")
    if "doi" not in paper_metadata.keys():
        raise KeyError("paper_metadata must contain the key 'doi'.")
    if not isinstance(filepath, (str, Path)):
        raise TypeError(f"filepath must be a str or Path, not {type(filepath)}.")

    output_path = Path(filepath)

    if not Path(output_path).parent.exists():
        raise ValueError(f"The folder: {output_path} seems to not exist.")

    # load API keys from file if not already loaded as a dict in save_pdf_from_dump
    if not isinstance(api_keys, dict):
        api_keys = load_api_keys(api_keys)

    doi = paper_metadata["doi"]
    url = f"https://doi.org/{doi}"
    success = False
    try:
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        success = True
    except Exception as e:
        error = str(e)
        logger.warning(f"Could not download from: {url} - {e}. ")

    if not success and "biorxiv" in error:
        if (
            api_keys.get("AWS_ACCESS_KEY_ID") is None
            or api_keys.get("AWS_SECRET_ACCESS_KEY") is None
        ):
            logger.info(
                "BiorXiv PDFs can be downloaded from a S3 bucket with a requester-pay option. "
                "Consider setting `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` to use this option. "
                "Pricing is a few cent per GB, thus each request costs < 0.1 cents. "
                "For details see: https://www.biorxiv.org/tdm"
            )
        else:
            success = FALLBACKS["s3"](doi, output_path, api_keys)
            if success:
                return

    if not success:
        # always first try fallback to BioC-PMC (open access papers on PubMed Central)
        success = FALLBACKS["bioc_pmc"](doi, output_path)

        # if BioC-PMC fails, try other fallbacks
        if not success:
            # check for specific publishers
            if "elife" in error.lower():  # elife has an open XML repository on GitHub
                FALLBACKS["elife"](doi, output_path)
            elif (
                ("wiley" in error.lower())
                and api_keys
                and ("WILEY_TDM_API_TOKEN" in api_keys)
            ):
                FALLBACKS["wiley"](paper_metadata, output_path, api_keys)
        return

    soup = BeautifulSoup(response.text, features="lxml")
    meta_pdf = soup.find("meta", {"name": "citation_pdf_url"})
    if meta_pdf and meta_pdf.get("content"):
        pdf_url = meta_pdf.get("content")
        try:
            response = requests.get(pdf_url, timeout=60)
            response.raise_for_status()

            if response.content[:4] != b"%PDF":
                logger.warning(
                    f"The file from {url} does not appear to be a valid PDF."
                )
                success = FALLBACKS["bioc_pmc"](doi, output_path)
                if not success:
                    # Check for specific publishers
                    if "elife" in doi.lower():
                        logger.info("Attempting fallback to eLife XML repository")
                        FALLBACKS["elife"](doi, output_path)
                    elif api_keys and "WILEY_TDM_API_TOKEN" in api_keys:
                        FALLBACKS["wiley"](paper_metadata, output_path, api_keys)
                    elif api_keys and "ELSEVIER_TDM_API_KEY" in api_keys:
                        FALLBACKS["elsevier"](paper_metadata, output_path, api_keys)
            else:
                with open(output_path.with_suffix(".pdf"), "wb+") as f:
                    f.write(response.content)
        except Exception as e:
            logger.warning(f"Could not download {pdf_url}: {e}")
    else:  # if no citation_pdf_url meta tag found, try other fallbacks
        if "elife" in doi.lower():
            logger.info(
                "DOI contains eLife, attempting fallback to eLife XML repository on GitHub."
            )
            if not FALLBACKS["elife"](doi, output_path):
                logger.warning(
                    f"eLife XML fallback failed for {paper_metadata['doi']}."
                )
        elif (
            api_keys and "ELSEVIER_TDM_API_KEY" in api_keys
        ):  # elsevier journals can be accessed via the Elsevier TDM API (requires API key)
            FALLBACKS["elsevier"](paper_metadata, output_path, api_keys)
        else:
            logger.warning(
                f"Retrieval failed. No citation_pdf_url meta tag found for {url} and no applicable fallback mechanism available."
            )

    if not save_metadata:
        return

    metadata = {}
    # Extract title
    title_tag = soup.find("meta", {"name": "citation_title"})
    metadata["title"] = title_tag.get("content") if title_tag else "Title not found"

    # Extract authors
    authors = []
    for author_tag in soup.find_all("meta", {"name": "citation_author"}):
        if author_tag.get("content"):
            authors.append(author_tag["content"])
    metadata["authors"] = authors if authors else ["Author information not found"]

    # Extract abstract
    domain = tldextract.extract(url).domain
    abstract_keys = ABSTRACT_ATTRIBUTE.get(domain, DEFAULT_ATTRIBUTES)

    for key in abstract_keys:
        abstract_tag = soup.find("meta", {"name": key})
        if abstract_tag:
            raw_abstract = BeautifulSoup(
                abstract_tag.get("content", "None"), "html.parser"
            ).get_text(separator="\n")
            if raw_abstract.strip().startswith("Abstract"):
                raw_abstract = raw_abstract.strip()[8:]
            metadata["abstract"] = raw_abstract.strip()
            break

    if "abstract" not in metadata.keys():
        metadata["abstract"] = "Abstract not found"
        logger.warning(f"Could not find abstract for {url}")
    elif metadata["abstract"].endswith("..."):
        logger.warning(f"Abstract truncated from {url}")

    # Save metadata to JSON
    try:
        with open(output_path.with_suffix(".json"), "w", encoding="utf-8") as f:
            json.dump(metadata, f, ensure_ascii=False, indent=4)
    except Exception as e:
        logger.error(f"Failed to save metadata to {str(output_path)}: {e}")
save_pdf_from_dump(dump_path: str, pdf_path: str, key_to_save: str = 'doi', save_metadata: bool = False, api_keys: Optional[str] = None) -> None

Receives a path to a .jsonl dump with paper metadata and saves the PDF files of each paper.

Parameters:

Name Type Description Default
dump_path str

Path to a .jsonl file with paper metadata, one paper per line.

required
pdf_path str

Path to a folder where the files will be stored.

required
key_to_save str

Key in the paper metadata to use as filename. Has to be doi or title. Defaults to doi.

'doi'
save_metadata bool

A boolean indicating whether to save paper metadata as a separate json.

False
api_keys Optional[str]

Path to a file with API keys. If None, API-based fallbacks will be skipped.

None
Source code in paperscraper/pdf/pdf.py
def save_pdf_from_dump(
    dump_path: str,
    pdf_path: str,
    key_to_save: str = "doi",
    save_metadata: bool = False,
    api_keys: Optional[str] = None,
) -> None:
    """
    Receives a path to a `.jsonl` dump with paper metadata and saves the PDF files of
    each paper.

    Args:
        dump_path: Path to a `.jsonl` file with paper metadata, one paper per line.
        pdf_path: Path to a folder where the files will be stored.
        key_to_save: Key in the paper metadata to use as filename.
            Has to be `doi` or `title`. Defaults to `doi`.
        save_metadata: A boolean indicating whether to save paper metadata as a separate json.
        api_keys: Path to a file with API keys. If None, API-based fallbacks will be skipped.
    """

    if not isinstance(dump_path, str):
        raise TypeError(f"dump_path must be a string, not {type(dump_path)}.")
    if not dump_path.endswith(".jsonl"):
        raise ValueError("Please provide a dump_path with .jsonl extension.")

    if not isinstance(pdf_path, str):
        raise TypeError(f"pdf_path must be a string, not {type(pdf_path)}.")

    if not isinstance(key_to_save, str):
        raise TypeError(f"key_to_save must be a string, not {type(key_to_save)}.")
    if key_to_save not in ["doi", "title", "date"]:
        raise ValueError("key_to_save must be one of 'doi' or 'title'.")

    papers = load_jsonl(dump_path)

    if not isinstance(api_keys, dict):
        api_keys = load_api_keys(api_keys)

    pbar = tqdm(papers, total=len(papers), desc="Processing")
    for i, paper in enumerate(pbar):
        pbar.set_description(f"Processing paper {i + 1}/{len(papers)}")

        if "doi" not in paper.keys() or paper["doi"] is None:
            logger.warning(f"Skipping {paper['title']} since no DOI available.")
            continue
        filename = paper[key_to_save].replace("/", "_")
        pdf_file = Path(os.path.join(pdf_path, f"{filename}.pdf"))
        xml_file = pdf_file.with_suffix(".xml")
        if pdf_file.exists():
            logger.info(f"File {pdf_file} already exists. Skipping download.")
            continue
        if xml_file.exists():
            logger.info(f"File {xml_file} already exists. Skipping download.")
            continue
        output_path = str(pdf_file)
        save_pdf(paper, output_path, save_metadata=save_metadata, api_keys=api_keys)
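A minimal usage sketch; 'covid_19.jsonl' is the example dump name used in these docs and the pdfs/ folder must already exist:

from paperscraper.pdf import save_pdf_from_dump

save_pdf_from_dump("covid_19.jsonl", pdf_path="pdfs/", key_to_save="doi")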

utils

load_api_keys(filepath: Optional[str] = None) -> Dict[str, str]

Reads API keys from a file and returns them as a dictionary. The file should have each API key on a separate line in the format: KEY_NAME=API_KEY_VALUE

Example

WILEY_TDM_API_TOKEN=your_wiley_token_here
ELSEVIER_TDM_API_KEY=your_elsevier_key_here

Parameters:

Name Type Description Default
filepath Optional[str]

Optional path to the file containing API keys.

None

Returns:

Type Description
Dict[str, str]

Dict[str, str]: A dictionary where keys are API key names and values are their respective API keys.

Source code in paperscraper/pdf/utils.py
def load_api_keys(filepath: Optional[str] = None) -> Dict[str, str]:
    """
    Reads API keys from a file and returns them as a dictionary.
    The file should have each API key on a separate line in the format:
        KEY_NAME=API_KEY_VALUE

    Example:
        WILEY_TDM_API_TOKEN=your_wiley_token_here
        ELSEVIER_TDM_API_KEY=your_elsevier_key_here

    Args:
        filepath: Optional path to the file containing API keys.

    Returns:
        Dict[str, str]: A dictionary where keys are API key names and values are their respective API keys.
    """
    if filepath:
        load_dotenv(dotenv_path=filepath)
    else:
        load_dotenv(find_dotenv())

    return {
        "WILEY_TDM_API_TOKEN": os.getenv("WILEY_TDM_API_TOKEN"),
        "ELSEVIER_TDM_API_KEY": os.getenv("ELSEVIER_TDM_API_KEY"),
        "AWS_ACCESS_KEY_ID": os.getenv("AWS_ACCESS_KEY_ID"),
        "AWS_SECRET_ACCESS_KEY": os.getenv("AWS_SECRET_ACCESS_KEY"),
    }
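A small sketch of the expected file format and call (the key values are placeholders):

from paperscraper.pdf.utils import load_api_keys

# .env (or any dotenv-style file):
#   WILEY_TDM_API_TOKEN=your_wiley_token_here
#   ELSEVIER_TDM_API_KEY=your_elsevier_key_here
keys = load_api_keys(".env")         # or load_api_keys() to let python-dotenv locate the file
print(keys["ELSEVIER_TDM_API_KEY"])  # None when the variable is not set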

plotting

plot_comparison(data_dict: dict, keys: List[str], x_ticks: List[str] = ['2015', '2016', '2017', '2018', '2019', '2020'], show_preprint: bool = False, title_text: str = '', keyword_text: Optional[List[str]] = None, figpath: str = 'comparison_plot.pdf') -> None

Plot temporal evolution of number of papers per keyword

Parameters:

Name Type Description Default
data_dict dict

A dictionary with keywords as keys. Each value should be a dictionary itself, with keys for the different APIs. For example data_dict = { 'covid_19.jsonl': { 'pubmed': [0, 0, 0, 12345], 'arxiv': [0, 0, 0, 1234], ... } 'coronavirus.jsonl': 'pubmed': [234, 345, 456, 12345], 'arxiv': [123, 234, 345, 1234], ... } }

required
keys List[str]

List of keys which should be plotted. This has to be a subset of data_dict.keys().

required
x_ticks List[str]

List of strings to be used for the x-ticks. Should have same length as data_dict[key][database]. Defaults to ['2015', '2016', '2017', '2018', '2019', '2020'], meaning that papers are aggregated per year.

['2015', '2016', '2017', '2018', '2019', '2020']
show_preprint bool

Whether preprint servers are aggregated or not. Defaults to False.

False
title_text str

Title for the produced figure. Defaults to ''.

''
keyword_text Optional[List[str]]

Figure caption per keyword. Defaults to None, i.e. empty strings will be used.

None
figpath str

Name under which figure is saved. Relative or absolute paths can be given. Defaults to 'comparison_plot.pdf'.

'comparison_plot.pdf'

Raises:

Type Description
KeyError

If a database is missing in data_dict.

Source code in paperscraper/plotting.py
def plot_comparison(
    data_dict: dict,
    keys: List[str],
    x_ticks: List[str] = ["2015", "2016", "2017", "2018", "2019", "2020"],
    show_preprint: bool = False,
    title_text: str = "",
    keyword_text: Optional[List[str]] = None,
    figpath: str = "comparison_plot.pdf",
) -> None:
    """Plot temporal evolution of number of papers per keyword

    Args:
        data_dict: A dictionary with keywords as keys. Each value should be a
            dictionary itself, with keys for the different APIs. For example
            data_dict = {
                'covid_19.jsonl': {
                    'pubmed': [0, 0, 0, 12345],
                    'arxiv': [0, 0, 0, 1234],
                    ...
                },
                'coronavirus.jsonl': {
                    'pubmed': [234, 345, 456, 12345],
                    'arxiv': [123, 234, 345, 1234],
                    ...
                }
            }
        keys: List of keys which should be plotted. This has to be a subset of data_dict.keys().
        x_ticks: List of strings to be used for the x-ticks. Should have same length as
            data_dict[key][database]. Defaults to ['2015', '2016', '2017', '2018', '2019', '2020'],
            meaning that papers are aggregated per year.
        show_preprint: Whether preprint servers are aggregated or not.
            Defaults to False.
        title_text: Title for the produced figure. Defaults to ''.
        keyword_text: Figure caption per keyword. Defaults to None, i.e. empty strings will be used.
        figpath: Name under which figure is saved. Relative or absolute
            paths can be given. Defaults to 'comparison_plot.pdf'.

    Raises:
        KeyError: If a database is missing in data_dict.
    """

    sns.set_palette(sns.color_palette("colorblind", 10))
    plt.rcParams.update({"hatch.color": "w"})
    plt.rcParams["figure.facecolor"] = "white"
    plt.figure(figsize=(8, 5))

    arxiv, biorxiv, pubmed, medrxiv, chemrxiv, preprint = [], [], [], [], [], []

    for key in keys:
        try:
            arxiv.append(data_dict[key]["arxiv"])
            biorxiv.append(data_dict[key]["biorxiv"])
            medrxiv.append(data_dict[key]["medrxiv"])
            chemrxiv.append(data_dict[key]["chemrxiv"])
            pubmed.append(data_dict[key]["pubmed"])
        except KeyError:
            raise KeyError(
                f"Did not find all DBs for {key}, only found {data_dict[key].keys()}"
            )
        preprint.append(arxiv[-1] + biorxiv[-1] + medrxiv[-1] + chemrxiv[-1])

    ind = np.arange(len(arxiv[0]))  # the x locations for the groups
    width = [0.2] * len(ind)  # the width of the bars: can also be len(x) sequence
    if len(keys) == 2:
        pos = [-0.2, 0.2]
    elif len(keys) == 3:
        pos = [-0.3, 0.0, 0.3]

    plts = []
    legend_plts = []
    patterns = ("|||", "oo", "xx", "..", "**")
    if show_preprint:
        bars = [pubmed, preprint]
        legend_platform = ["PubMed", "Preprint"]
    else:
        bars = [pubmed, arxiv, biorxiv, chemrxiv, medrxiv]
        legend_platform = ["PubMed", "ArXiv", "BiorXiv", "ChemRxiv", "MedRxiv"]
    for idx in range(len(keys)):
        bottom = 0

        for bidx, b in enumerate(bars):
            if idx == 0:
                p = plt.bar(
                    ind + pos[idx],
                    b[idx],
                    width,
                    linewidth=1,
                    edgecolor="k",
                    bottom=bottom,
                )
            else:
                p = plt.bar(
                    ind + pos[idx],
                    b[idx],
                    width,
                    color=next(iter(plts[bidx])).get_facecolor(),
                    linewidth=1,
                    edgecolor="k",
                    bottom=bottom,
                )

            bottom += b[idx]
            plts.append(p)
        legend_plts.append(
            plt.bar(ind + pos[idx], np.zeros((len(ind),)), color="k", bottom=bottom)
        )

    plt.ylabel("Counts", size=15)
    plt.xlabel("Years", size=15)
    plt.title(f"Keywords: {title_text}", size=14)
    # Customize minor tick labels
    plt.xticks(ind, x_ticks, size=10)

    legend = plt.legend(
        legend_platform,
        prop={"size": 12},
        loc="upper left",
        title="Platform:",
        title_fontsize=13,
        ncol=1,
    )

    # Now set the hatches to not destroy legend

    for idx, stackbar in enumerate(plts):
        pidx = int(np.floor(idx / len(bars)))
        for bar in stackbar:
            bar.set_hatch(patterns[pidx])

    for idx, stackbar in enumerate(legend_plts):
        for bar in stackbar:
            bar.set_hatch(patterns[idx])

    if not keyword_text:
        keyword_text = [""] * len(keys)

    plt.legend(
        legend_plts,
        keyword_text,
        loc="upper center",
        prop={"size": 12},
        title="Keywords (X):",
        title_fontsize=13,
    )
    plt.gca().add_artist(legend)

    get_step_size = lambda x: round(x / 10, -math.floor(math.log10(x)) + 1)
    ymax = plt.gca().get_ylim()[1]
    step_size = np.clip(get_step_size(ymax), 5, 1000)
    y_steps = np.arange(0, ymax, step_size)

    for y_step in y_steps:
        plt.hlines(y_step, xmax=10, xmin=-1, color="black", linewidth=0.1)
    plt.xlim([-0.5, len(ind)])
    plt.ylim([0, ymax * 1.02])

    plt.tight_layout()
    plt.savefig(figpath)
    plt.show()
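A toy example with made-up counts; every keyword entry needs all five databases (pubmed, arxiv, biorxiv, medrxiv, chemrxiv) and one count per x-tick:

import numpy as np

from paperscraper.plotting import plot_comparison

data_dict = {
    "covid_19.jsonl": {
        "pubmed": np.array([0, 0, 0, 12345]),
        "arxiv": np.array([0, 0, 0, 1234]),
        "biorxiv": np.array([0, 0, 0, 999]),
        "medrxiv": np.array([0, 0, 0, 888]),
        "chemrxiv": np.array([0, 0, 0, 77]),
    },
    "coronavirus.jsonl": {
        "pubmed": np.array([234, 345, 456, 12345]),
        "arxiv": np.array([123, 234, 345, 1234]),
        "biorxiv": np.array([50, 60, 70, 800]),
        "medrxiv": np.array([10, 20, 30, 700]),
        "chemrxiv": np.array([1, 2, 3, 44]),
    },
}
plot_comparison(
    data_dict,
    keys=["covid_19.jsonl", "coronavirus.jsonl"],
    x_ticks=["2017", "2018", "2019", "2020"],
    show_preprint=True,
    title_text="COVID-19 vs. coronavirus",
    keyword_text=["covid 19", "coronavirus"],
    figpath="comparison_plot.pdf",
)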

plot_single(data_dict: dict, keys: str, x_ticks: List[str] = ['2015', '2016', '2017', '2018', '2019', '2020'], show_preprint: bool = False, title_text: str = '', figpath: str = 'comparison_plot.pdf', logscale: bool = False) -> None

Plot temporal evolution of number of papers per keyword

Parameters:

Name Type Description Default
data_dict dict

A dictionary with keywords as keys. Each value should be a dictionary itself, with keys for the different APIs. For example data_dict = { 'covid_19.jsonl': { 'pubmed': [0, 0, 0, 12345], 'arxiv': [0, 0, 0, 1234], ... } 'coronavirus.jsonl': 'pubmed': [234, 345, 456, 12345], 'arxiv': [123, 234, 345, 1234], ... } }

required
keys str

A key which should be plotted. This has to be a subset of data_dict.keys().

required
x_ticks List[str]

List of strings to be used for the x-ticks. Should have same length as data_dict[key][database]. Defaults to ['2015', '2016', '2017', '2018', '2019', '2020'], meaning that papers are aggregated per year.

['2015', '2016', '2017', '2018', '2019', '2020']
show_preprint bool

Whether preprint servers are aggregated or not. Defaults to False.

False
title_text str

Title for the produced figure. Defaults to ''.

''
figpath str

Name under which figure is saved. Relative or absolute paths can be given. Defaults to 'comparison_plot.pdf'.

'comparison_plot.pdf'
logscale bool

Whether y-axis is plotted on logscale. Defaults to False.

False

Raises:

Type Description
KeyError

If a database is missing in data_dict.

Source code in paperscraper/plotting.py
def plot_single(
    data_dict: dict,
    keys: str,
    x_ticks: List[str] = ["2015", "2016", "2017", "2018", "2019", "2020"],
    show_preprint: bool = False,
    title_text: str = "",
    figpath: str = "comparison_plot.pdf",
    logscale: bool = False,
) -> None:
    """Plot temporal evolution of number of papers per keyword

    Args:
        data_dict: A dictionary with keywords as keys. Each value should be a
            dictionary itself, with keys for the different APIs. For example
            data_dict = {
                'covid_19.jsonl': {
                    'pubmed': [0, 0, 0, 12345],
                    'arxiv': [0, 0, 0, 1234],
                    ...
                },
                'coronavirus.jsonl': {
                    'pubmed': [234, 345, 456, 12345],
                    'arxiv': [123, 234, 345, 1234],
                    ...
                }
            }
        keys: A key which should be plotted. This has to be a subset of data_dict.keys().
        x_ticks (List[str]): List of strings to be used for the x-ticks. Should have
            same length as data_dict[key][database]. Defaults to ['2015', '2016',
            '2017', '2018', '2019', '2020'], meaning that papers are aggregated per
            year.
        show_preprint: Whether preprint servers are aggregated or not.
            Defaults to False.
        title_text: Title for the produced figure. Defaults to ''.
        figpath (str, optional): Name under which figure is saved. Relative or absolute
            paths can be given. Defaults to 'comparison_plot.pdf'.
        logscale: Whether y-axis is plotted on logscale. Defaults to False.

    Raises:
        KeyError: If a database is missing in data_dict.
    """

    sns.set_palette(sns.color_palette("colorblind", 10))
    plt.rcParams.update({"hatch.color": "w"})
    plt.rcParams["figure.facecolor"] = "white"
    plt.figure(figsize=(8, 5))

    arxiv, biorxiv, pubmed, medrxiv, chemrxiv, preprint = [], [], [], [], [], []

    for key in keys:
        try:
            arxiv.append(data_dict[key]["arxiv"])
            biorxiv.append(data_dict[key]["biorxiv"])
            medrxiv.append(data_dict[key]["medrxiv"])
            chemrxiv.append(data_dict[key]["chemrxiv"])
            pubmed.append(data_dict[key]["pubmed"])
        except KeyError:
            raise KeyError(
                f"Did not find all DBs for {key}, only found {data_dict[key].keys()}"
            )
        preprint.append(arxiv[-1] + biorxiv[-1] + medrxiv[-1] + chemrxiv[-1])

    ind = np.arange(len(arxiv[0]))  # the x locations for the groups
    width = [0.75] * len(ind)  # the width of the bars: can also be len(x) sequence
    fnc = np.log10 if logscale else np.copy

    plts = []
    legend_plts = []
    if show_preprint:
        bars = [pubmed, preprint]
        legend_platform = ["PubMed", "Preprint"]
        if logscale:
            sums = np.array(pubmed) + np.array(preprint)
            logsums = np.log10(sums)
            bars = [pubmed * logsums / sums, preprint * logsums / sums]

    else:
        bars = [pubmed, arxiv, biorxiv, chemrxiv, medrxiv]
        legend_platform = ["PubMed", "ArXiv", "BiorXiv", "ChemRxiv", "MedRxiv"]
        if logscale:
            sums = (
                np.array(pubmed)
                + np.array(arxiv)
                + np.array(biorxiv)
                + np.array(chemrxiv)
                + np.array(medrxiv)
            )
            logsums = np.log10(sums)
            bars = [
                pubmed * logsums / sums,
                arxiv * logsums / sums,
                biorxiv * logsums / sums,
                chemrxiv * logsums / sums,
                medrxiv * logsums / sums,
            ]
    for idx in range(len(keys)):
        bottom = 0

        for bidx, b in enumerate(bars):
            if idx == 0:
                p = plt.bar(
                    ind,
                    b[idx],
                    width,
                    linewidth=1,
                    edgecolor="k",
                    bottom=bottom,
                )
            else:
                p = plt.bar(
                    ind,
                    b[idx],
                    width,
                    color=next(iter(plts[bidx])).get_facecolor(),
                    linewidth=1,
                    edgecolor="k",
                    bottom=bottom,
                )

            bottom += b[idx]
            plts.append(p)
        legend_plts.append(
            plt.bar(ind, np.zeros((len(ind),)), color="k", bottom=bottom)
        )

    (
        plt.ylabel("Counts", size=17)
        if not logscale
        else plt.ylabel("Counts (log scale)", size=17)
    )
    plt.xlabel("Years", size=17)
    plt.title(title_text, size=17)
    # Customize minor tick labels

    plt.xticks(ind, x_ticks, size=14)
    ymax = plt.gca().get_ylim()[1]
    if logscale:
        yticks = np.arange(1, ymax).astype(int)
        plt.yticks(yticks, np.power(10, yticks))

    plt.tick_params(axis="y", labelsize=17)

    plt.legend(
        legend_platform,
        prop={"size": 14},
        loc="upper left",
        title="Platform:",
        title_fontsize=17,
        ncol=1,
    )

    get_step_size = lambda x: round(x / 10, -math.floor(math.log10(x)) + 1)
    ymax = plt.gca().get_ylim()[1]

    for y_step in plt.yticks()[0]:
        plt.hlines(y_step, xmax=10, xmin=-1, color="black", linewidth=0.1)
    plt.xlim([-0.5, len(ind)])
    plt.ylim([0, ymax * 1.02])

    plt.tight_layout()
    plt.savefig(figpath)
    plt.show()

plot_venn_two(sizes: List[int], labels: List[str], figpath: str = 'venn_two.pdf', title: str = '', **kwargs) -> None

Plot a single Venn Diagram with two terms.

Parameters:

Name Type Description Default
sizes List[int]

List of ints of length 3. First two elements correspond to the labels, third one to the intersection.

required
labels List[str]

List of str of length 2, containing names of circles.

required
figpath str

Name under which figure is saved. Defaults to 'venn_two.pdf'. If an empty string is passed, the name is inferred from the labels.

'venn_two.pdf'
title str

Title of the plot. Defaults to '', i.e. it is inferred from labels.

''
**kwargs

Additional keyword arguments for venn2.

{}
Source code in paperscraper/plotting.py
def plot_venn_two(
    sizes: List[int],
    labels: List[str],
    figpath: str = "venn_two.pdf",
    title: str = "",
    **kwargs,
) -> None:
    """Plot a single Venn Diagram with two terms.

    Args:
        sizes (List[int]): List of ints of length 3. First two elements correspond to
            the labels, third one to the intersection.
        labels (List[str]): List of str of length 2, containing names of circles.
        figpath (str): Name under which figure is saved. Defaults to 'venn_two.pdf'.
            If an empty string is passed, the name is inferred from the labels.
        title (str): Title of the plot. Defaults to '', i.e. it is inferred from
            labels.
        **kwargs: Additional keyword arguments for venn2.
    """
    assert len(sizes) == 3, "Incorrect type/length of sizes"
    assert len(labels) == 2, "Incorrect type/length of labels"

    title = get_name(labels) if title == "" else title
    figname = title.lower().replace(" vs. ", "_") if figpath == "" else figpath
    venn2(subsets=sizes, set_labels=labels, alpha=0.6, **kwargs)
    venn2_circles(
        subsets=sizes, linestyle="solid", linewidth=0.6, color="grey", **kwargs
    )
    if kwargs.get("ax", False):
        kwargs["ax"].set_title(title, fontdict={"fontweight": "bold"}, size=15)
    else:
        plt.title(title, fontdict={"fontweight": "bold"}, size=15)
        plt.savefig(f"{figname}.pdf")

plot_venn_three(sizes: List[int], labels: List[str], figpath: str = '', title: str = '', **kwargs) -> None

Plot a single Venn Diagram with three terms.

Parameters:

Name Type Description Default
sizes List[int]

List of ints of length 7, one entry per region of the three-set Venn diagram (as expected by venn3).

required
labels List[str]

List of str of length 3, containing names of circles.

required
figpath str

Name under which figure is saved. Defaults to '', i.e. it is inferred from labels.

''
title str

Title of the plot. Defaults to '', i.e. it is inferred from labels.

''
**kwargs

Additional keyword arguments for venn3.

{}
Source code in paperscraper/plotting.py
def plot_venn_three(
    sizes: List[int], labels: List[str], figpath: str = "", title: str = "", **kwargs
) -> None:
    """Plot a single Venn Diagram with two terms.

    Args:
        sizes (List[int]): List of ints of length 3. First two elements correspond to
            the labels, third one to the intersection.
        labels (List[str]): List of str of length 2, containing names of circles.
        figpath (str): Name under which figure is saved. Defaults to '', i.e. it is
            inferred from labels.
        title (str): Title of the plot. Defaults to '', i.e. it is inferred from
            labels.
        **kwargs: Additional keyword arguments for venn3.
    """
    assert len(sizes) == 7, "Incorrect type/length of sizes"
    assert len(labels) == 3, "Incorrect type/length of labels"

    title = get_name(labels) if title == "" else title
    figname = title.lower().replace(" vs. ", "_") if figpath == "" else figpath

    venn3(subsets=sizes, set_labels=labels, alpha=0.6, **kwargs)
    venn3_circles(
        subsets=sizes, linestyle="solid", linewidth=0.6, color="grey", **kwargs
    )

    if kwargs.get("ax", False):
        kwargs["ax"].set_title(title, fontdict={"fontweight": "bold"}, size=15)
    else:
        plt.title(title, fontdict={"fontweight": "bold"}, size=15)
        plt.savefig(f"{figname}.pdf")

plot_multiple_venn(sizes: List[List[int]], labels: List[List[str]], figname: str, titles: List[str], suptitle: str = '', gridspec_kw: dict = {}, figsize: Iterable = (8, 4.5), **kwargs) -> None

Plots multiple Venn Diagrams next to each other

Parameters:

Name Type Description Default
sizes List[List[int]]

List of lists with sizes, one per Venn Diagram. Lengths of lists should be either 3 (plot_venn_two) or 7 (plot_venn_three).

required
labels List[List[str]]

List of Lists of str containing names of circles. Lengths of lists should be either 2 or 3.

required
figname str

Name under which figure is saved. If an empty string is passed, the name is inferred from the first title.

required
titles List[str]

Titles of subplots. Should have same length like labels and sizes.

required
suptitle str

Title of entire plot. Defaults to '', i.e. no title.

''
gridspec_kw dict

Additional keyword args for plt.subplots. Useful to adjust width of plots. E.g. gridspec_kw={'width_ratios': [1, 2]} will make the second Venn Diagram double as wide as first one.

{}
**kwargs

Additional keyword arguments for venn3.

{}
Source code in paperscraper/plotting.py
def plot_multiple_venn(
    sizes: List[List[int]],
    labels: List[List[str]],
    figname: str,
    titles: List[str],
    suptitle: str = "",
    gridspec_kw: dict = {},
    figsize: Iterable = (8, 4.5),
    **kwargs,
) -> None:
    """Plots multiple Venn Diagrams next to each other

    Args:
        sizes (List[List[int]]): List of lists with sizes, one per Venn Diagram.
            Lengths of lists should be either 3 (plot_venn_two) or 7
            (plot_venn_three).
        labels (List[List[str]]): List of Lists of str containing names of circles.
            Lengths of lists should be either 2 or 3.
        figname (str): Name under which figure is saved. If an empty string is
            passed, the name is inferred from the first title.
        titles (List[str]): Titles of subplots. Should have same length like labels
            and sizes.
        suptitle (str): Title of entire plot. Defaults to '', i.e. no title.
        gridspec_kw (dict): Additional keyword args for plt.subplots. Useful to
            adjust width of plots. E.g.
                gridspec_kw={'width_ratios': [1, 2]}
            will make the second Venn Diagram double as wide as first one.
        **kwargs: Additional keyword arguments for venn3.
    """

    assert len(sizes) == len(labels), "Length of labels & sizes dont match."
    assert len(sizes) == len(titles), "Length of titles & sizes dont match."
    assert len(sizes) > 1, "At least 2 items should be provided."
    assert all(list(map(lambda x: len(x) in [2, 3], labels))), "Wrong label sizes."
    assert all(list(map(lambda x: len(x) in [3, 7], sizes))), "Wrong label sizes."

    fig, axes = plt.subplots(1, len(sizes), gridspec_kw=gridspec_kw, figsize=figsize)
    plt.suptitle(suptitle, size=18, fontweight="bold")

    figname = titles[0].lower().replace(" vs. ", "_") if figname == "" else figname

    for idx, (size, label, title) in enumerate(zip(sizes, labels, titles)):
        if len(label) == 2:
            plot_venn_two(size, label, title=title, ax=axes[idx])
        elif len(label) == 3:
            plot_venn_three(size, label, title=title, ax=axes[idx])

    plt.savefig(f"{figname}.pdf")

postprocessing

aggregate_paper(data: List[Dict[str, str]], start_year: int = 2016, bins_per_year: int = 4, filtering: bool = False, filter_keys: List = list(), unwanted_keys: List = list(), return_filtered: bool = False, filter_abstract: bool = True, last_year: int = 2021)

Consumes a list of unstructured keyword results from a .jsonl and aggregates papers into several bins per year.

Parameters:

Name Type Description Default
data List[Dict[str, str]]

Content of a .jsonl file, i.e., a list of dictionaries, one per paper.

required
start_year int

First year of interest. Defaults to 2016.

2016
bins_per_year int

Defaults to 4 (quarterly aggregation).

4
filtering bool

Whether all papers in the .jsonl are treated as matches, or whether additional sanity checking for the keywords is performed on title/abstract. Defaults to False.

False
filter_keys list

List of str used for filtering. Only applies if filtering is True. Defaults to empty list.

list()
unwanted_keys list

List of str that must not occur in either title or abstract. Only applies if filtering is True.

list()
return_filtered bool

Whether the filtered matches are also returned. Only applies if filtering is True. Defaults to False.

False
filter_abstract bool

Whether the keyword is searched in the abstract or not. Defaults to True.

True
last_year int

Most recent year for the aggregation. Defaults to 2021. All newer entries are discarded.

2021

Returns:

Name Type Description
bins array

Vector of length (last_year - start_year + 1) x bins_per_year.

Source code in paperscraper/postprocessing.py
def aggregate_paper(
    data: List[Dict[str, str]],
    start_year: int = 2016,
    bins_per_year: int = 4,
    filtering: bool = False,
    filter_keys: List = list(),
    unwanted_keys: List = list(),
    return_filtered: bool = False,
    filter_abstract: bool = True,
    last_year: int = 2021,
):
    """Consumes a list of unstructured keyword results from a .jsonl and
    aggregates papers into several bins per year.

    Args:
        data (List[Dict[str,str]]): Content of a .jsonl file, i.e., a list of
            dictionaries, one per paper.
        start_year (int, optional): First year of interest. Defaults to 2016.
        bins_per_year (int, optional): Defaults to 4 (quarterly aggregation).
        filtering (bool, optional): Whether all papers in the .jsonl are treated
            as matches, or whether additional sanity checking for the keywords is
            performed on title/abstract. Defaults to False.
        filter_keys (list, optional): List of str used for filtering. Only
            applies if filtering is True. Defaults to empty list.
        unwanted_keys (list, optional): List of str that must not occur in either
            title or abstract. Only applies if filtering is True.
        return_filtered (bool, optional): Whether the filtered matches are also
            returned. Only applies if filtering is True. Defaults to False.
        filter_abstract (bool, optional): Whether the keyword is searched in the abstract
            or not. Defaults to True.
        last_year (int, optional): Most recent year for the aggregation. Defaults
            to 2021. All newer entries are discarded.

    Returns:
        bins (np.array): Vector of length (last_year - start_year + 1) x
            bins_per_year.
    """

    if not isinstance(data, list):
        raise ValueError(f"Expected list, received {type(data)}")
    if not isinstance(bins_per_year, int):
        raise ValueError(f"Expected int, received {type(bins_per_year)}")
    if 12 % bins_per_year != 0:
        raise ValueError(f"Can't split year into {bins_per_year} bins")

    num_years = last_year - start_year + 1
    bins = np.zeros((num_years * bins_per_year))

    if len(data) == 0:
        return bins if not return_filtered else (bins, [])

    # Remove duplicate entries (keep only the first one)
    df = pd.DataFrame(data).sort_values(by="date", ascending=True)
    data = df.drop_duplicates(subset="title", keep="first").to_dict("records")

    dates = [dd["date"] for dd in data]

    filtered = []
    for paper, date in zip(data, dates):
        year = int(date.split("-")[0])
        if year < start_year or year > last_year:
            continue

        # At least one synonym per keyword needs to be in either title or
        # abstract.
        if filtering and filter_keys != list():
            # Filter out papers with undesired terms
            unwanted = False
            for unwanted_key in unwanted_keys:
                if unwanted_key.lower() in paper["title"].lower():
                    unwanted = True
                if (
                    filter_abstract
                    and paper["abstract"] is not None
                    and unwanted_key.lower() in paper["abstract"].lower()
                ):
                    unwanted = True
            if unwanted:
                continue

            got_keys = []
            for key_term in filter_keys:
                got_key = False
                if not isinstance(key_term, list):
                    key_term = [key_term]
                for key in key_term:
                    if key.lower() in paper["title"].lower():
                        got_key = True
                    if (
                        filter_abstract
                        and paper["abstract"] is not None
                        and key.lower() in paper["abstract"].lower()
                    ):
                        got_key = True
                got_keys.append(got_key)

            if len(got_keys) != sum(got_keys):
                continue

        filtered.append(paper)

        if len(date.split("-")) < 2:
            logger.warning(
                f"Paper '{paper['title']}' has no month in date {date}; assigning a random month."
            )
            month = np.random.randint(1, 13)
        else:
            month = int(date.split("-")[1])

        year_bin = year - start_year
        month_bin = int(np.floor((month - 1) / (12 / bins_per_year)))
        bins[year_bin * bins_per_year + month_bin] += 1

    if return_filtered:
        return bins, filtered
    else:
        return bins
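A small sketch with two hand-written records instead of a real .jsonl dump; the filter keys are illustrative:

from paperscraper.postprocessing import aggregate_paper

papers = [
    {"title": "Paper A", "abstract": "Deep learning for protein design.", "date": "2019-03-01"},
    {"title": "Paper B", "abstract": "Transformers applied to chemistry.", "date": "2020-11-15"},
]
bins = aggregate_paper(
    papers,
    start_year=2016,
    last_year=2021,
    bins_per_year=4,
    filtering=True,
    filter_keys=[["deep learning", "transformers"]],
)
# bins has (2021 - 2016 + 1) * 4 = 24 quarterly entries; each matching paper increments one of them.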

pubmed

dump_papers(papers: pd.DataFrame, filepath: str) -> None

Receives a pd.DataFrame, one paper per row and dumps it into a .jsonl file with one paper per line.

Parameters:

Name Type Description Default
papers DataFrame

A dataframe of paper metadata, one paper per row.

required
filepath str

Path to dump the papers, has to end with .jsonl.

required
Source code in paperscraper/utils.py
def dump_papers(papers: pd.DataFrame, filepath: str) -> None:
    """
    Receives a pd.DataFrame, one paper per row and dumps it into a .jsonl
    file with one paper per line.

    Args:
        papers (pd.DataFrame): A dataframe of paper metadata, one paper per row.
        filepath (str): Path to dump the papers, has to end with `.jsonl`.
    """
    if not isinstance(filepath, str):
        raise TypeError(f"filepath must be a string, not {type(filepath)}")
    if not filepath.endswith(".jsonl"):
        raise ValueError("Please provide a filepath with .jsonl extension")

    if isinstance(papers, List) and all([isinstance(p, Dict) for p in papers]):
        papers = pd.DataFrame(papers)
        logger.warning(
            "Preferably pass a pd.DataFrame, not a list of dictionaries. "
            "Passing a list is a legacy functionality that might become deprecated."
        )

    if not isinstance(papers, pd.DataFrame):
        raise TypeError(f"papers must be a pd.DataFrame, not {type(papers)}")

    paper_list = list(papers.T.to_dict().values())

    with open(filepath, "w") as f:
        for paper in paper_list:
            f.write(json.dumps(paper) + "\n")
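
Example — a minimal sketch of dumping a small, made-up DataFrame (the two rows below are purely illustrative):

import pandas as pd

from paperscraper.utils import dump_papers

papers = pd.DataFrame(
    [
        {"title": "Paper A", "doi": "10.0000/example.1", "date": "2021-03-01"},
        {"title": "Paper B", "doi": "10.0000/example.2", "date": "2021-04-15"},
    ]
)
dump_papers(papers, "example_dump.jsonl")  # writes one JSON object per line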

get_emails(paper: PubMedArticle) -> List

Extracts author email addresses from PubMedArticle.

Parameters:

Name Type Description Default
paper PubMedArticle

An object of type PubMedArticle. Required to have an 'authors' field.

required

Returns:

Name Type Description
List List

A possibly empty list of emails associated to authors of the paper.

Source code in paperscraper/pubmed/utils.py
def get_emails(paper: PubMedArticle) -> List:
    """
    Extracts author email addresses from PubMedArticle.

    Args:
        paper (PubMedArticle): An object of type PubMedArticle. Requires to have
            an 'author' field.

    Returns:
        List: A possibly empty list of emails associated to authors of the paper.
    """

    emails = []
    for author in paper.authors:
        for v in author.values():
            if v is not None and "@" in v:
                parts = v.split("@")
                if len(parts) == 2:
                    # Found one email address
                    prefix = parts[0].split(" ")[-1]
                    postfix = parts[1]
                    mail = prefix + "@" + postfix
                    if not (postfix.endswith(".") or postfix.endswith(" ")):
                        emails.append(mail)
                    else:
                        emails.append(mail[:-1])
                else:
                    # Found multiple addresses
                    for idx, part in enumerate(parts):
                        try:
                            if idx == 0:
                                prefix = part.split(" ")[-1]
                            else:
                                postfix = part.split("\n")[0]

                                if postfix.endswith("."):
                                    postfix = postfix[:-1]
                                    mail = prefix + "@" + postfix
                                else:
                                    current_postfix = postfix.split(" ")[0]
                                    mail = prefix + "@" + current_postfix
                                    prefix = postfix.split(" ")[1]
                                emails.append(mail)
                        except IndexError:
                            warnings.warn(f"Mail could not be inferred from {part}.")

    return list(set(emails))
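
The sketch below illustrates the parsing behaviour with a hypothetical stand-in object instead of a real PubMedArticle; only the authors attribute is accessed, and the affiliation string is made up:

from types import SimpleNamespace

from paperscraper.pubmed.utils import get_emails

# Stand-in for a PubMedArticle: `authors` is a list of dicts, as in pymed.
paper = SimpleNamespace(
    authors=[
        {
            "lastname": "Doe",
            "firstname": "Jane",
            "affiliation": "Example University. Electronic address: jane.doe@example.org.",
        },
        {"lastname": "Roe", "firstname": "Richard", "affiliation": None},
    ]
)
print(get_emails(paper))  # ['jane.doe@example.org'] - the trailing dot is stripped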

get_query_from_keywords_and_date(keywords: List[Union[str, List]], start_date: str = 'None', end_date: str = 'None') -> str

Receives a list of keywords and returns the query for the pubmed API.

Parameters:

Name Type Description Default
keywords List[str, List[str]]

Items will be AND separated. If items are lists themselves, they will be OR separated.

required
start_date str

Start date for the search. Needs to be in format: YYYY/MM/DD, e.g. '2020/07/20'. Defaults to 'None', i.e. no specific dates are used.

'None'
end_date str

End date for the search. Same notation as start_date.

'None'
Note: If start_date and end_date are left as default, the function is identical to get_query_from_keywords.

Returns:

Name Type Description
str str

query to enter to pubmed API.

Source code in paperscraper/pubmed/utils.py
def get_query_from_keywords_and_date(
    keywords: List[Union[str, List]], start_date: str = "None", end_date: str = "None"
) -> str:
    """Receives a list of keywords and returns the query for the pubmed API.

    Args:
        keywords (List[str, List[str]]): Items will be AND separated. If items
            are lists themselves, they will be OR separated.
        start_date (str): Start date for the search. Needs to be in format:
            YYYY/MM/DD, e.g. '2020/07/20'. Defaults to 'None', i.e. no specific
            dates are used.
        end_date (str): End date for the search. Same notation as start_date.

    Note: If start_date and end_date are left as default, the function is
        identical to get_query_from_keywords.

    Returns:
        str: query to enter to pubmed API.
    """

    query = get_query_from_keywords(keywords)

    if start_date != "None" and end_date != "None":
        date = date_root.format(start_date, end_date)
    elif start_date != "None" and end_date == "None":
        date = date_root.format(start_date, "3000")
    elif start_date == "None" and end_date != "None":
        date = date_root.format("1000", end_date)
    else:
        return query

    return query + " AND " + date

get_pubmed_papers(query: str, fields: List = ['title', 'authors', 'date', 'abstract', 'journal', 'doi'], max_results: int = 9998, *args, **kwargs) -> pd.DataFrame

Performs a PubMed API request for a query and returns a list of papers with the desired fields.

Parameters:

Name Type Description Default
query str

Query to PubMed API. Needs to match PubMed API notation.

required
fields List

List of strings with fields to keep in output. NOTE: If 'emails' is passed, an attempt is made to extract author mail addresses.

['title', 'authors', 'date', 'abstract', 'journal', 'doi']
max_results int

Maximal number of results retrieved from the database. Defaults to 9998; higher values are likely to cause problems with the PubMed API, see: https://stackoverflow.com/questions/75353091/biopython-entrez-article-limit

9998
args

additional arguments for pubmed.query

()
kwargs

additional arguments for pubmed.query

{}

Returns:

Type Description
DataFrame

pd.DataFrame. One paper per row.

Source code in paperscraper/pubmed/pubmed.py
def get_pubmed_papers(
    query: str,
    fields: List = ["title", "authors", "date", "abstract", "journal", "doi"],
    max_results: int = 9998,
    *args,
    **kwargs,
) -> pd.DataFrame:
    """
    Performs PubMed API request of a query and returns list of papers with
    fields as desired.

    Args:
        query: Query to PubMed API. Needs to match PubMed API notation.
        fields: List of strings with fields to keep in output.
            NOTE: If 'emails' is passed, an attempt is made to extract author mail
            addresses.
        max_results: Maximal number of results retrieved from DB. Defaults
            to 9998, higher values likely raise problems due to PubMedAPI, see:
            https://stackoverflow.com/questions/75353091/biopython-entrez-article-limit
        args: additional arguments for pubmed.query
        kwargs: additional arguments for pubmed.query

    Returns:
        pd.DataFrame. One paper per row.

    """
    if max_results > 9998:
        logger.warning(
            f"\nmax_results cannot be larger than 9998, received {max_results}."
            "This will likely result in a JSONDecodeError. Considering lowering `max_results`.\n"
            "For PubMed, ESearch can only retrieve the first 9,999 records matching the query. "
            "To obtain more than 9,999 PubMed records, consider using EDirect that contains additional"
            "logic to batch PubMed search results automatically so that an arbitrary number can be retrieved"
        )
    raw = list(PUBMED.query(query, max_results=max_results, *args, **kwargs))

    get_mails = "emails" in fields
    if get_mails:
        fields.pop(fields.index("emails"))

    processed = [
        {
            pubmed_field_mapper.get(key, key): process_fields.get(
                pubmed_field_mapper.get(key, key), lambda x: x
            )(value)
            for key, value in paper.toDict().items()
            if pubmed_field_mapper.get(key, key) in fields
        }
        for paper in raw
    ]
    if get_mails:
        for idx, paper in enumerate(raw):
            processed[idx].update({"emails": get_emails(paper)})

    return pd.DataFrame(processed)
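
A minimal usage sketch, using the package-level import from the repository README (it performs a live PubMed request, so network access is needed; the query is only illustrative):

from paperscraper.pubmed import get_pubmed_papers

df = get_pubmed_papers(
    "(SARS-CoV-2) AND (vaccine)",
    fields=["title", "authors", "date", "doi"],
    max_results=20,
)
print(len(df), "papers retrieved")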

get_and_dump_pubmed_papers(keywords: List[Union[str, List[str]]], output_filepath: str, fields: List = ['title', 'authors', 'date', 'abstract', 'journal', 'doi'], start_date: str = 'None', end_date: str = 'None', *args, **kwargs) -> None

Combines get_pubmed_papers and dump_papers.

Parameters:

Name Type Description Default
keywords List[Union[str, List[str]]]

List of keywords to request pubmed API. The outer list level will be considered as AND separated keys. The inner level as OR separated.

required
output_filepath str

Path where the dump will be saved.

required
fields List

List of strings with fields to keep in output. Defaults to ['title', 'authors', 'date', 'abstract', 'journal', 'doi']. NOTE: If 'emails' is passed, an attempt is made to extract author mail addresses.

['title', 'authors', 'date', 'abstract', 'journal', 'doi']
start_date str

Start date for the search. Needs to be in format: YYYY/MM/DD, e.g. '2020/07/20'. Defaults to 'None', i.e. no specific dates are used.

'None'
end_date str

End date for the search. Same notation as start_date.

'None'
Source code in paperscraper/pubmed/pubmed.py
def get_and_dump_pubmed_papers(
    keywords: List[Union[str, List[str]]],
    output_filepath: str,
    fields: List = ["title", "authors", "date", "abstract", "journal", "doi"],
    start_date: str = "None",
    end_date: str = "None",
    *args,
    **kwargs,
) -> None:
    """
    Combines get_pubmed_papers and dump_papers.

    Args:
        keywords: List of keywords to request pubmed API.
            The outer list level will be considered as AND separated keys.
            The inner level as OR separated.
        output_filepath: Path where the dump will be saved.
        fields: List of strings with fields to keep in output.
            Defaults to ['title', 'authors', 'date', 'abstract',
            'journal', 'doi'].
            NOTE: If 'emails' is passed, an attempt is made to extract author mail
            addresses.
        start_date: Start date for the search. Needs to be in format:
            YYYY/MM/DD, e.g. '2020/07/20'. Defaults to 'None', i.e. no specific
            dates are used.
        end_date: End date for the search. Same notation as start_date.
    """
    # Translate keywords into query.
    query = get_query_from_keywords_and_date(
        keywords, start_date=start_date, end_date=end_date
    )
    papers = get_pubmed_papers(query, fields, *args, **kwargs)
    dump_papers(papers, output_filepath)
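
A minimal usage sketch with synonym lists (illustrative keywords and filename; the call issues a live PubMed request and writes the hits to a .jsonl file):

from paperscraper.pubmed import get_and_dump_pubmed_papers

covid = ["COVID-19", "SARS-CoV-2"]            # OR-separated synonyms
ai = ["deep learning", "machine learning"]    # OR-separated synonyms
get_and_dump_pubmed_papers(
    [covid, ai],                              # AND-separated groups
    output_filepath="covid_ai.jsonl",
    start_date="2020/01/01",
)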

utils

get_query_from_keywords(keywords: List[Union[str, List]]) -> str

Receives a list of keywords and returns the query for the pubmed API.

Parameters:

Name Type Description Default
keywords List[str, List[str]]

Items will be AND separated. If items are lists themselves, they will be OR separated.

required

Returns:

Name Type Description
str str

query to enter to pubmed API.

Source code in paperscraper/pubmed/utils.py
def get_query_from_keywords(keywords: List[Union[str, List]]) -> str:
    """Receives a list of keywords and returns the query for the pubmed API.

    Args:
        keywords (List[str, List[str]]): Items will be AND separated. If items
            are lists themselves, they will be OR separated.

    Returns:
        str: query to enter to pubmed API.
    """

    query = ""
    for i, key in enumerate(keywords):
        if isinstance(key, str):
            query += f"({key}) AND "
        elif isinstance(key, list):
            inter = "".join([f"({syn}) OR " for syn in key])
            query += finalize_disjunction(inter)

    query = finalize_conjunction(query)
    return query
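
Example — outer items are AND-combined, inner lists are OR-combined synonyms (the exact string depends on the module's query templates):

from paperscraper.pubmed.utils import get_query_from_keywords

query = get_query_from_keywords([["COVID-19", "SARS-CoV-2"], "imaging"])
print(query)  # roughly: ((COVID-19) OR (SARS-CoV-2)) AND (imaging)
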
get_query_from_keywords_and_date(keywords: List[Union[str, List]], start_date: str = 'None', end_date: str = 'None') -> str

Receives a list of keywords and returns the query for the pubmed API.

Parameters:

Name Type Description Default
keywords List[str, List[str]]

Items will be AND separated. If items are lists themselves, they will be OR separated.

required
start_date str

Start date for the search. Needs to be in format: YYYY/MM/DD, e.g. '2020/07/20'. Defaults to 'None', i.e. no specific dates are used.

'None'
end_date str

End date for the search. Same notation as start_date.

'None'
Note: If start_date and end_date are left as default, the function is identical to get_query_from_keywords.

Returns:

Name Type Description
str str

query to enter to pubmed API.

Source code in paperscraper/pubmed/utils.py
def get_query_from_keywords_and_date(
    keywords: List[Union[str, List]], start_date: str = "None", end_date: str = "None"
) -> str:
    """Receives a list of keywords and returns the query for the pubmed API.

    Args:
        keywords (List[str, List[str]]): Items will be AND separated. If items
            are lists themselves, they will be OR separated.
        start_date (str): Start date for the search. Needs to be in format:
            YYYY/MM/DD, e.g. '2020/07/20'. Defaults to 'None', i.e. no specific
            dates are used.
        end_date (str): End date for the search. Same notation as start_date.

    Note: If start_date and end_date are left as default, the function is
        identical to get_query_from_keywords.

    Returns:
        str: query to enter to pubmed API.
    """

    query = get_query_from_keywords(keywords)

    if start_date != "None" and end_date != "None":
        date = date_root.format(start_date, end_date)
    elif start_date != "None" and end_date == "None":
        date = date_root.format(start_date, "3000")
    elif start_date == "None" and end_date != "None":
        date = date_root.format("1000", end_date)
    else:
        return query

    return query + " AND " + date
get_emails(paper: PubMedArticle) -> List

Extracts author email addresses from PubMedArticle.

Parameters:

Name Type Description Default
paper PubMedArticle

An object of type PubMedArticle. Required to have an 'authors' field.

required

Returns:

Name Type Description
List List

A possibly empty list of emails associated to authors of the paper.

Source code in paperscraper/pubmed/utils.py
def get_emails(paper: PubMedArticle) -> List:
    """
    Extracts author email addresses from PubMedArticle.

    Args:
        paper (PubMedArticle): An object of type PubMedArticle. Requires to have
            an 'author' field.

    Returns:
        List: A possibly empty list of emails associated to authors of the paper.
    """

    emails = []
    for author in paper.authors:
        for v in author.values():
            if v is not None and "@" in v:
                parts = v.split("@")
                if len(parts) == 2:
                    # Found one email address
                    prefix = parts[0].split(" ")[-1]
                    postfix = parts[1]
                    mail = prefix + "@" + postfix
                    if not (postfix.endswith(".") or postfix.endswith(" ")):
                        emails.append(mail)
                    else:
                        emails.append(mail[:-1])
                else:
                    # Found multiple addresses
                    for idx, part in enumerate(parts):
                        try:
                            if idx == 0:
                                prefix = part.split(" ")[-1]
                            else:
                                postfix = part.split("\n")[0]

                                if postfix.endswith("."):
                                    postfix = postfix[:-1]
                                    mail = prefix + "@" + postfix
                                else:
                                    current_postfix = postfix.split(" ")[0]
                                    mail = prefix + "@" + current_postfix
                                    prefix = postfix.split(" ")[1]
                                emails.append(mail)
                        except IndexError:
                            warnings.warn(f"Mail could not be inferred from {part}.")

    return list(set(emails))

scholar

get_citations_from_title(title: str) -> int

Parameters:

Name Type Description Default
title str

Title of paper to be searched on Scholar.

required

Raises:

Type Description
TypeError

If anything other than a str is passed.

Returns:

Name Type Description
int int

Number of citations of paper.

Source code in paperscraper/citations/citations.py
def get_citations_from_title(title: str) -> int:
    """
    Args:
        title (str): Title of paper to be searched on Scholar.

    Raises:
        TypeError: If anything other than a str is passed.

    Returns:
        int: Number of citations of paper.
    """

    if not isinstance(title, str):
        raise TypeError(f"Pass str not {type(title)}")

    # Search for exact match
    title = '"' + title.strip() + '"'

    matches = scholarly.search_pubs(title)
    counts = list(map(lambda p: int(p["num_citations"]), matches))
    if len(counts) == 0:
        logger.warning(f"Found no match for {title}.")
        return 0
    if len(counts) > 1:
        logger.warning(f"Found {len(counts)} matches for {title}, returning first one.")
    return counts[0]
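
A minimal usage sketch, assuming the package-level import used in the repository README (the lookup goes through scholarly to Google Scholar, which may rate-limit automated requests; the title is only illustrative):

from paperscraper.citations import get_citations_from_title

n = get_citations_from_title("Attention is all you need")
print(n)  # integer citation count, 0 if no match was found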

dump_papers(papers: pd.DataFrame, filepath: str) -> None

Receives a pd.DataFrame with one paper per row and dumps it into a .jsonl file with one paper per line.

Parameters:

Name Type Description Default
papers DataFrame

A dataframe of paper metadata, one paper per row.

required
filepath str

Path to dump the papers, has to end with .jsonl.

required
Source code in paperscraper/utils.py
def dump_papers(papers: pd.DataFrame, filepath: str) -> None:
    """
    Receives a pd.DataFrame, one paper per row and dumps it into a .jsonl
    file with one paper per line.

    Args:
        papers (pd.DataFrame): A dataframe of paper metadata, one paper per row.
        filepath (str): Path to dump the papers, has to end with `.jsonl`.
    """
    if not isinstance(filepath, str):
        raise TypeError(f"filepath must be a string, not {type(filepath)}")
    if not filepath.endswith(".jsonl"):
        raise ValueError("Please provide a filepath with .jsonl extension")

    if isinstance(papers, List) and all([isinstance(p, Dict) for p in papers]):
        papers = pd.DataFrame(papers)
        logger.warning(
            "Preferably pass a pd.DataFrame, not a list of dictionaries. "
            "Passing a list is a legacy functionality that might become deprecated."
        )

    if not isinstance(papers, pd.DataFrame):
        raise TypeError(f"papers must be a pd.DataFrame, not {type(papers)}")

    paper_list = list(papers.T.to_dict().values())

    with open(filepath, "w") as f:
        for paper in paper_list:
            f.write(json.dumps(paper) + "\n")

get_scholar_papers(title: str, fields: List = ['title', 'authors', 'year', 'abstract', 'journal', 'citations'], *args, **kwargs) -> pd.DataFrame

Performs a Google Scholar search for a given title and returns a list of papers with the desired fields.

Parameters:

Name Type Description Default
title str

Query for Google Scholar. Should be a single search string; Boolean logic is not supported.

required
fields List

List of strings with fields to keep in output.

['title', 'authors', 'year', 'abstract', 'journal', 'citations']

Returns:

Type Description
DataFrame

pd.DataFrame. One paper per row.

Source code in paperscraper/scholar/scholar.py
def get_scholar_papers(
    title: str,
    fields: List = ["title", "authors", "year", "abstract", "journal", "citations"],
    *args,
    **kwargs,
) -> pd.DataFrame:
    """
    Performs Google Scholar API request of a given title and returns list of papers with
    fields as desired.

    Args:
        title: Query for Google Scholar. Should be a single search string (no Boolean logic).
        fields: List of strings with fields to keep in output.

    Returns:
        pd.DataFrame. One paper per row.

    """
    logger.info(
        "NOTE: Scholar API cannot be used with Boolean logic in keywords."
        "Query should be a single string to be entered in the Scholar search field."
    )
    if not isinstance(title, str):
        raise TypeError(f"Pass str not {type(title)}")

    matches = scholarly.search_pubs(title)

    processed = []
    for paper in matches:
        # Extracts title, author, year, journal, abstract
        entry = {
            scholar_field_mapper.get(key, key): process_fields.get(
                scholar_field_mapper.get(key, key), lambda x: x
            )(value)
            for key, value in paper["bib"].items()
            if scholar_field_mapper.get(key, key) in fields
        }

        entry["citations"] = paper["num_citations"]
        processed.append(entry)

    return pd.DataFrame(processed)
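
A minimal usage sketch (the title is illustrative; like all scholarly-based calls it may be rate-limited or blocked by Google Scholar):

from paperscraper.scholar import get_scholar_papers

df = get_scholar_papers("molecular generative models for drug discovery")
print(df.head())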

get_and_dump_scholar_papers(title: str, output_filepath: str, fields: List = ['title', 'authors', 'year', 'abstract', 'journal', 'citations']) -> None

Combines get_scholar_papers and dump_papers.

Parameters:

Name Type Description Default
title str

Paper to search for on Google Scholar.

required
output_filepath str

Path where the dump will be saved.

required
fields List

List of strings with fields to keep in output.

['title', 'authors', 'year', 'abstract', 'journal', 'citations']
Source code in paperscraper/scholar/scholar.py
def get_and_dump_scholar_papers(
    title: str,
    output_filepath: str,
    fields: List = ["title", "authors", "year", "abstract", "journal", "citations"],
) -> None:
    """
    Combines get_scholar_papers and dump_papers.

    Args:
        title: Paper to search for on Google Scholar.
        output_filepath: Path where the dump will be saved.
        fields: List of strings with fields to keep in output.
    """
    papers = get_scholar_papers(title, fields)
    dump_papers(papers, output_filepath)
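
Example (illustrative title and filename; the result is written as a .jsonl dump):

from paperscraper.scholar import get_and_dump_scholar_papers

get_and_dump_scholar_papers(
    "generative models for molecular design",
    output_filepath="scholar_results.jsonl",
)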


server_dumps

Folder for the metadata dumps from the bioRxiv, medRxiv and chemRxiv APIs. It contains no code but will be populated with your local .jsonl dump files.
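
A sketch of how the folder is typically populated, assuming the paperscraper.get_dumps entry points shown in the repository README (each call performs a full metadata download and can take a long time, bioRxiv in particular):

from paperscraper.get_dumps import biorxiv, chemrxiv, medrxiv

medrxiv()   # writes a dated .jsonl dump into server_dumps/
chemrxiv()
biorxiv()   # largest download by far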

tests

test_pdf

TestPDF
Source code in paperscraper/tests/test_pdf.py
class TestPDF:
    @pytest.fixture
    def paper_data(self):
        return {"doi": "10.48550/arXiv.2207.03928"}

    def test_basic_search(self):
        paper_data = {"doi": "10.48550/arXiv.2207.03928"}
        save_pdf(paper_data, filepath="gt4sd.pdf", save_metadata=True)
        assert os.path.exists("gt4sd.pdf")
        assert os.path.exists("gt4sd.json")
        os.remove("gt4sd.pdf")
        os.remove("gt4sd.json")

        # chemrxiv
        paper_data = {"doi": "10.26434/chemrxiv-2021-np7xj-v4"}
        save_pdf(paper_data, filepath="kinases.pdf", save_metadata=True)
        assert os.path.exists("kinases.pdf")
        assert os.path.exists("kinases.json")
        os.remove("kinases.pdf")
        os.remove("kinases.json")

        # biorxiv
        if os.path.exists("taskload.pdf"):
            os.remove("taskload.pdf")
        paper_data = {"doi": "10.1101/798496"}
        # NOTE: biorxiv is cloudflare controlled so standard scraping fails

        # Now try with S3 routine
        keys = load_api_keys("api_keys.txt")
        save_pdf(
            {"doi": "10.1101/786871"},
            filepath="taskload.pdf",
            save_metadata=False,
            api_keys=keys,
        )
        assert os.path.exists("taskload.pdf")
        os.remove("taskload.pdf")

        # Test S3 fallback explicitly
        FALLBACKS["s3"](doi="10.1101/786871", output_path="taskload.pdf", api_keys=keys)
        assert os.path.exists("taskload.pdf")
        os.remove("taskload.pdf")

        # Test S3 fallback with newer DOIs (including year/month/day)
        FALLBACKS["s3"](doi="10.1101/2023.10.09.561414", output_path="taskload.pdf", api_keys=keys)
        assert os.path.exists("taskload.pdf")
        os.remove("taskload.pdf")

        # medrxiv now also seems cloudflare-controlled. skipping test
        # paper_data = {"doi": "10.1101/2020.09.02.20187096"}
        # save_pdf(paper_data, filepath="covid_review.pdf", save_metadata=True)
        # assert os.path.exists("covid_review.pdf")
        # assert os.path.exists("covid_review.json")
        # os.remove("covid_review.pdf")
        # os.remove("covid_review.json")

        # journal with OA paper
        paper_data = {"doi": "10.1038/s42256-023-00639-z"}
        save_pdf(paper_data, filepath="regression_transformer", save_metadata=True)
        assert os.path.exists("regression_transformer.pdf")
        assert os.path.exists("regression_transformer.json")
        os.remove("regression_transformer.pdf")
        os.remove("regression_transformer.json")

        # book chapter with paywall
        paper_data = {"doi": "10.1007/978-981-97-4828-0_7"}
        save_pdf(paper_data, filepath="clm_chapter", save_metadata=True)
        assert not os.path.exists("clm_chapter.pdf")
        assert os.path.exists("clm_chapter.json")
        os.remove("clm_chapter.json")

        # journal without OA paper
        paper_data = {"doi": "10.1126/science.adk9587"}
        save_pdf(paper_data, filepath="color", save_metadata=True)
        assert not os.path.exists("color.pdf")
        assert not os.path.exists("color.json")

    def test_missing_doi(self):
        with pytest.raises(KeyError):
            paper_data = {"title": "Sample Paper"}
            save_pdf(paper_data, "sample_paper.pdf")

    def test_invalid_metadata_type(self):
        with pytest.raises(TypeError):
            save_pdf(paper_metadata="not_a_dict", filepath="output.pdf")

    def test_missing_doi_key(self):
        with pytest.raises(KeyError):
            save_pdf(paper_metadata={}, filepath="output.pdf")

    def test_invalid_filepath_type(self):
        with pytest.raises(TypeError):
            save_pdf(paper_metadata=self.paper_data, filepath=123)

    def test_incorrect_filepath_extension(self):
        with pytest.raises(TypeError):
            save_pdf(paper_metadata=self.paper_data, filepath="output.txt")

    def test_incorrect_filepath_type(self):
        with pytest.raises(TypeError):
            save_pdf(paper_metadata=list(self.paper_data), filepath="output.txt")

    def test_nonexistent_directory_in_filepath(self, paper_data):
        with pytest.raises(ValueError):
            save_pdf(paper_metadata=paper_data, filepath="/nonexistent/output.pdf")

    @patch("requests.get")
    def test_network_issues_on_doi_url_request(self, mock_get, paper_data):
        mock_get.side_effect = Exception("Network error")
        save_pdf(paper_metadata=paper_data, filepath="output.pdf")
        assert not os.path.exists("output.pdf")

    @patch("requests.get")
    def test_missing_pdf_url_in_meta_tags(self, mock_get, paper_data):
        response = MagicMock()
        response.text = "<html></html>"
        mock_get.return_value = response
        save_pdf(paper_metadata=paper_data, filepath="output.pdf")
        assert not os.path.exists("output.pdf")

    @patch("requests.get")
    def test_network_issues_on_pdf_url_request(self, mock_get, paper_data):
        response_doi = MagicMock()
        response_doi.text = (
            '<meta name="citation_pdf_url" content="http://valid.url/document.pdf">'
        )
        mock_get.side_effect = [response_doi, Exception("Network error")]
        save_pdf(paper_metadata=paper_data, filepath="output.pdf")
        assert not os.path.exists("output.pdf")

    def test_save_pdf_from_dump_wrong_type(self):
        with pytest.raises(TypeError):
            save_pdf_from_dump(-1, pdf_path=SAVE_PATH, key_to_save="doi")

    def test_save_pdf_from_dump_wrong_output_type(self):
        with pytest.raises(TypeError):
            save_pdf_from_dump(TEST_FILE_PATH, pdf_path=1, key_to_save="doi")

    def test_save_pdf_from_dump_wrong_suffix(self):
        with pytest.raises(ValueError):
            save_pdf_from_dump(
                TEST_FILE_PATH.replace("jsonl", "json"),
                pdf_path=SAVE_PATH,
                key_to_save="doi",
            )

    def test_save_pdf_from_dump_wrong_key(self):
        with pytest.raises(ValueError):
            save_pdf_from_dump(TEST_FILE_PATH, pdf_path=SAVE_PATH, key_to_save="doix")

    def test_save_pdf_from_dump_wrong_key_type(self):
        with pytest.raises(TypeError):
            save_pdf_from_dump(TEST_FILE_PATH, pdf_path=SAVE_PATH, key_to_save=["doix"])

    def test_save_pdf_from_dump(self):
        os.makedirs(SAVE_PATH, exist_ok=True)
        save_pdf_from_dump(TEST_FILE_PATH, pdf_path=SAVE_PATH, key_to_save="doi")
        shutil.rmtree(SAVE_PATH)

    def test_api_keys_none_pmc(self):
        """Test that save_pdf works properly even when no API keys are provided. Paper in PMC."""
        test_doi = {"doi": "10.1038/s41587-022-01613-7"}  # DOI known to be in PMC
        filename = SAVE_PATH + "_pmc"
        # Call function with no API keys
        save_pdf(test_doi, filepath=filename, api_keys=None)

        # Verify file was created - with .xml extension from PMC fallback
        assert os.path.exists(filename + ".xml"), (
            "XML file was not created via PMC fallback"
        )
        os.remove(filename + ".xml")

    def test_api_keys_none_oa(self):
        """Test that save_pdf works properly even when no API keys are provided. Paper available open-access."""
        test_doi = {"doi": "10.1038/s42256-023-00639-z"}  # DOI known to be OA
        filename = SAVE_PATH + "_oa"
        # Call function with no API keys
        save_pdf(test_doi, filepath=filename, api_keys=None)

        # Verify file was created - with .pdf extension for direct PDF download
        assert os.path.exists(filename + ".pdf"), (
            "PDF file was not created for OA content"
        )
        os.remove(filename + ".pdf")

    def test_api_key_file(self):
        test_doi = {"doi": "10.1002/smll.202309431"}  # Use a DOI from Wiley
        with open("tmp_keyfile.txt", "w") as f:
            f.write("WILEY_TDM_API_TOKEN=INVALID_TEST_KEY_123")
        save_pdf(test_doi, filepath=SAVE_PATH, api_keys="tmp_keyfile.txt")
        os.remove("tmp_keyfile.txt")

    def test_api_key_env(self):
        test_doi = {"doi": "10.1002/smll.202309431"}  # Use a DOI known to be in PMC
        with patch.dict(
            os.environ, {"WILEY_TDM_API_TOKEN": "ANOTHER_INVALID_TEST_KEY"}
        ):
            save_pdf(test_doi, filepath=SAVE_PATH, api_keys=None)

    @pytest.mark.skipif(
        os.getenv("INSTITUTIONAL_NETWORK") != "1",
        reason="Not in an institutional network",
    )
    def test_api_key_file_academic_network(self):
        test_doi = {"doi": "10.1002/smll.202309431"}  # Use a DOI from Wiley
        filename = SAVE_PATH + "_wiley"
        wiley_key_path = SAVE_PATH + "_wiley_key1"
        success = False
        try:
            with open(wiley_key_path, "w") as f:
                f.write("WILEY_TDM_API_TOKEN=INVALID_TEST_KEY_123")
            save_pdf(test_doi, filepath=filename, api_keys=wiley_key_path)
            # Verify file was created - with .pdf extension for Wiley content
            assert os.path.exists(filename + ".pdf"), (
                "PDF file was not created for Wiley content"
            )
            success = True
        finally:
            for file in [filename + ".pdf", wiley_key_path]:
                if os.path.exists(file):
                    os.remove(file)
            if not success:
                raise ValueError("PDF file was not created for Wiley content")

    @pytest.mark.skipif(
        os.getenv("INSTITUTIONAL_NETWORK") != "1",
        reason="Not in an institutional network",
    )
    def test_api_key_file_env_academic_network(self):
        test_doi = {"doi": "10.1002/smll.202309431"}  # Use a DOI from Wiley
        filename = SAVE_PATH + "_wiley"
        line = "WILEY_TDM_API_TOKEN=INVALID_TEST_KEY_123\n"
        # Append to .env file in the current directory
        with open(".env", "a") as f:
            f.write(line)

        try:
            save_pdf(test_doi, filepath=filename, api_keys=None)

            # Verify file was created - with .pdf extension for Wiley content
            assert os.path.exists(filename + ".pdf"), (
                "PDF file was not created for Wiley content"
            )
        finally:
            # Clean up
            if os.path.exists(filename + ".pdf"):
                os.remove(filename + ".pdf")
            with open(".env", "r") as f:
                lines = f.readlines()
            if lines and lines[-1] == line:
                with open(".env", "w") as f:
                    f.writelines(lines[:-1])

    def test_fallback_bioc_pmc_real_api(self):
        """Test the BioC-PMC fallback with a real API call."""
        test_doi = "10.1038/s41587-022-01613-7"  # Use a DOI known to be in PMC
        output_path = Path("test_bioc_pmc_output")
        try:
            result = FALLBACKS["bioc_pmc"](test_doi, output_path)
            assert result is True
            assert (output_path.with_suffix(".xml")).exists()
            with open(
                output_path.with_suffix(".xml"), "r"
            ) as f:  # Check if the file contains XML data
                content = f.read()
                assert "<" in content and ">" in content  # Basic XML check
                assert len(content) > 100  # Should have substantial content
        finally:
            if (output_path.with_suffix(".xml")).exists():
                os.remove(output_path.with_suffix(".xml"))

    def test_fallback_bioc_pmc_no_pmcid(self):
        """Test BioC-PMC fallback when no PMCID is available."""
        test_doi = "10.1002/smll.202309431"  # This DOI should not have a PMCID
        output_path = Path("test_bioc_pmc_no_pmcid")
        result = FALLBACKS["bioc_pmc"](test_doi, output_path)
        assert result is False
        assert not os.path.exists(output_path.with_suffix(".xml"))

    def test_fallback_elife_xml_real_api(self):
        """Test the eLife XML fallback with a real API call."""
        test_doi = "10.7554/eLife.100173"  # Use a DOI known to be in eLife
        output_path = Path("test_elife_xml_output")
        try:
            result = FALLBACKS["elife"](test_doi, output_path)
            assert result is True
            assert (output_path.with_suffix(".xml")).exists()
            with open(
                output_path.with_suffix(".xml"), "r"
            ) as f:  # Check if the file contains XML data
                content = f.read()
                assert "<" in content and ">" in content  # Basic XML check
                assert len(content) > 100  # Should have substantial content
        finally:
            if (output_path.with_suffix(".xml")).exists():
                os.remove(output_path.with_suffix(".xml"))

    def test_fallback_elife_nonexistent_article(self):
        """Test eLife XML fallback with a DOI that looks like eLife but doesn't exist."""
        test_doi = (
            "10.7554/eLife.00001"  # Article that doesn't exist in eLife repository
        )
        output_path = Path("test_elife_nonexistent")
        result = FALLBACKS["elife"](test_doi, output_path)
        # Assertions - should return False and not create a file
        assert result is False
        assert not os.path.exists(output_path.with_suffix(".xml"))

    @patch("requests.get")
    def test_fallback_wiley_api_mock(self, mock_get):
        """Test Wiley API fallback with mocked response."""
        mock_response = MagicMock()
        mock_response.content = b"%PDF-1.5 test content"
        mock_response.raise_for_status = MagicMock()
        mock_get.return_value = mock_response
        paper_metadata = {"doi": "10.1002/smll.202309431"}
        output_path = Path("test_wiley_output")
        api_keys = {"WILEY_TDM_API_TOKEN": "test_token"}
        try:
            FALLBACKS["wiley"](paper_metadata, output_path, api_keys)
            assert mock_get.called
            mock_get.assert_called_with(
                "https://api.wiley.com/onlinelibrary/tdm/v1/articles/10.1002%2Fsmll.202309431",
                headers={"Wiley-TDM-Client-Token": "test_token"},
                allow_redirects=True,
                timeout=60,
            )
            pdf_path = output_path.with_suffix(".pdf")
            assert os.path.exists(pdf_path)
            with open(pdf_path, "rb") as f:
                content = f.read()
                assert content == b"%PDF-1.5 test content"
        finally:
            if os.path.exists(output_path.with_suffix(".pdf")):
                os.remove(output_path.with_suffix(".pdf"))

    def test_fallback_wiley_api_returns_boolean(self):
        """Test that fallback_wiley_api properly returns a boolean value."""
        paper_metadata = {"doi": "10.1002/smll.202309431"}
        output_path = Path("test_wiley_output")
        api_keys = {"WILEY_TDM_API_TOKEN": "INVALID_TEST_KEY_123"}
        result = FALLBACKS["wiley"](paper_metadata, output_path, api_keys)
        # Check the result is a boolean
        # will be True if on university network and False otherwise
        assert isinstance(result, bool)
        if result and output_path.with_suffix(".pdf").exists():
            os.remove(output_path.with_suffix(".pdf"))

    @patch("requests.get")
    def test_fallback_elsevier_api_mock(self, mock_get):
        """Test Elsevier API fallback with mocked response."""
        mock_response = MagicMock()
        mock_response.content = b"<xml>Test content</xml>"
        mock_response.raise_for_status = MagicMock()
        mock_get.return_value = mock_response
        paper_metadata = {"doi": "10.1016/j.xops.2024.100504"}
        output_path = Path("test_elsevier_output")
        api_keys = {"ELSEVIER_TDM_API_KEY": "test_key"}
        try:
            FALLBACKS["elsevier"](paper_metadata, output_path, api_keys)
            assert mock_get.called
            mock_get.assert_called_with(
                "https://api.elsevier.com/content/article/doi/10.1016/j.xops.2024.100504?apiKey=test_key&httpAccept=text%2Fxml",
                headers={"Accept": "application/xml"},
                timeout=60,
            )
            xml_path = output_path.with_suffix(".xml")
            assert os.path.exists(xml_path)
            with open(xml_path, "rb") as f:
                content = f.read()
                assert content == b"<xml>Test content</xml>"
        finally:
            if os.path.exists(output_path.with_suffix(".xml")):
                os.remove(output_path.with_suffix(".xml"))

    def test_fallback_elsevier_api_invalid_key(self, caplog):
        """Test real Elsevier API connectivity by verifying invalid key response pattern."""
        caplog.set_level(logging.ERROR)
        paper_metadata = {"doi": "10.1016/j.xops.2024.100504"}
        output_path = Path("test_elsevier_invalid")
        api_keys = {"ELSEVIER_TDM_API_KEY": "INVALID_TEST_KEY_123"}
        result = FALLBACKS["elsevier"](paper_metadata, output_path, api_keys)
        # Should return False for invalid key
        assert result is False
        assert not output_path.with_suffix(".xml").exists()
        # Check for the specific APIKEY_INVALID error in the logs
        assert "invalid" in caplog.text.lower()

utils

dump_papers(papers: pd.DataFrame, filepath: str) -> None

Receives a pd.DataFrame with one paper per row and dumps it into a .jsonl file with one paper per line.

Parameters:

Name Type Description Default
papers DataFrame

A dataframe of paper metadata, one paper per row.

required
filepath str

Path to dump the papers, has to end with .jsonl.

required
Source code in paperscraper/utils.py
def dump_papers(papers: pd.DataFrame, filepath: str) -> None:
    """
    Receives a pd.DataFrame, one paper per row and dumps it into a .jsonl
    file with one paper per line.

    Args:
        papers (pd.DataFrame): A dataframe of paper metadata, one paper per row.
        filepath (str): Path to dump the papers, has to end with `.jsonl`.
    """
    if not isinstance(filepath, str):
        raise TypeError(f"filepath must be a string, not {type(filepath)}")
    if not filepath.endswith(".jsonl"):
        raise ValueError("Please provide a filepath with .jsonl extension")

    if isinstance(papers, List) and all([isinstance(p, Dict) for p in papers]):
        papers = pd.DataFrame(papers)
        logger.warning(
            "Preferably pass a pd.DataFrame, not a list of dictionaries. "
            "Passing a list is a legacy functionality that might become deprecated."
        )

    if not isinstance(papers, pd.DataFrame):
        raise TypeError(f"papers must be a pd.DataFrame, not {type(papers)}")

    paper_list = list(papers.T.to_dict().values())

    with open(filepath, "w") as f:
        for paper in paper_list:
            f.write(json.dumps(paper) + "\n")
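
For illustration, a minimal usage sketch; the DataFrame contents below are placeholders:

import pandas as pd

from paperscraper.utils import dump_papers

# Hypothetical metadata; any DataFrame with one paper per row works.
papers = pd.DataFrame(
    [
        {"title": "Paper A", "doi": "10.1000/a", "abstract": "..."},
        {"title": "Paper B", "doi": "10.1000/b", "abstract": "..."},
    ]
)

# The filepath must end with .jsonl, otherwise a ValueError is raised.
dump_papers(papers, "papers.jsonl")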

get_filename_from_query(query: List[str]) -> str

Convert a keyword query into a filename for dumping the papers.

Parameters:

query (list): List of strings with keywords. Required.

Returns:

str: Filename.

Source code in paperscraper/utils.py
def get_filename_from_query(query: List[str]) -> str:
    """Convert a keyword query into filenames to dump the paper.

    Args:
        query (list): List of string with keywords.

    Returns:
        str: Filename.
    """
    filename = "_".join([k if isinstance(k, str) else k[0] for k in query]) + ".jsonl"
    filename = filename.replace(" ", "").lower()
    return filename
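
A small sketch of the resulting naming convention (the query terms are placeholders); for synonym lists only the first entry contributes, spaces are removed, and the result is lower-cased:

from paperscraper.utils import get_filename_from_query

query = ["Machine Learning", ["COVID-19", "SARS-CoV-2"], "vaccine"]
print(get_filename_from_query(query))
# -> machinelearning_covid-19_vaccine.jsonl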

load_jsonl(filepath: str) -> List[Dict[str, str]]

Load data from a .jsonl file, i.e., a file with one dictionary per line.

Parameters:

filepath (str): Path to .jsonl file. Required.

Returns:

List[Dict[str, str]]: A list of dictionaries, one per paper.

Source code in paperscraper/utils.py
def load_jsonl(filepath: str) -> List[Dict[str, str]]:
    """
    Load data from a `.jsonl` file, i.e., a file with one dictionary per line.

    Args:
        filepath (str): Path to `.jsonl` file.

    Returns:
        List[Dict[str, str]]: A list of dictionaries, one per paper.
    """

    with open(filepath, "r") as f:
        data = [json.loads(line) for line in f.readlines()]
    return data
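
Combined with dump_papers, this gives a simple round trip (the filename is the one used in the sketch above):

from paperscraper.utils import load_jsonl

records = load_jsonl("papers.jsonl")  # one dict per paper
print(len(records), records[0]["title"])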

xrxiv

bioRxiv and medRxiv utilities.

xrxiv_api

API for bioRxiv and medRxiv.

XRXivApi

API class.

Source code in paperscraper/xrxiv/xrxiv_api.py
class XRXivApi:
    """API class."""

    def __init__(
        self,
        server: str,
        launch_date: str,
        api_base_url: str = "https://api.biorxiv.org",
        max_retries: int = 10,
    ):
        """
        Initialize API class.

        Args:
            server (str): name of the preprint server to access.
            launch_date (str): launch date expressed as YYYY-MM-DD.
            api_base_url (str, optional): Base url for the API. Defaults to 'api.biorxiv.org'.
            max_retries (int, optional): Maximal number of retries for a request before an
                error is raised. Defaults to 10.
        """
        self.server = server
        self.api_base_url = api_base_url
        self.launch_date = launch_date
        self.launch_datetime = datetime.fromisoformat(self.launch_date)
        self.get_papers_url = (
            "{}/details/{}".format(self.api_base_url, self.server)
            + "/{start_date}/{end_date}/{cursor}"
        )
        self.max_retries = max_retries

    @retry_multi()
    def call_api(self, start_date, end_date, cursor):
        try:
            json_response = requests.get(
                self.get_papers_url.format(
                    start_date=start_date, end_date=end_date, cursor=cursor
                ),
                timeout=10,
            ).json()
        except requests.exceptions.Timeout:
            logger.info("Timed out, will retry")
            return None

        return json_response

    def get_papers(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        fields: List[str] = ["title", "doi", "authors", "abstract", "date", "journal"],
        max_retries: int = 10,
    ) -> Generator:
        """
        Get paper metadata.

        Args:
            start_date (Optional[str]): begin date. Defaults to None, a.k.a. launch date.
            end_date (Optional[str]): end date. Defaults to None, a.k.a. today.
            fields (List[str], optional): fields to return per paper.
                Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].
            max_retries (int): Number of retries on connection failure. Defaults to 10.

        Yields:
            Generator: a generator of paper metadata (dict) with the desired fields.
        """
        try:
            now_datetime = datetime.now()
            if start_date:
                start_datetime = datetime.fromisoformat(start_date)
                if start_datetime < self.launch_datetime:
                    start_date = self.launch_date
            else:
                start_date = self.launch_date
            if end_date:
                end_datetime = datetime.fromisoformat(end_date)
                if end_datetime > now_datetime:
                    end_date = now_datetime.strftime("%Y-%m-%d")
            else:
                end_date = now_datetime.strftime("%Y-%m-%d")
            do_loop = True
            cursor = 0
            while do_loop:
                papers = []
                for attempt in range(max_retries):
                    try:
                        json_response = self.call_api(start_date, end_date, cursor)
                        do_loop = json_response["messages"][0]["status"] == "ok"
                        if do_loop:
                            cursor += json_response["messages"][0]["count"]
                            for paper in json_response["collection"]:
                                processed_paper = {
                                    field: paper.get(field, "") for field in fields
                                }
                                papers.append(processed_paper)

                        if do_loop:
                            yield from papers
                            break
                    except (ConnectionError, Timeout) as e:
                        logger.error(
                            f"Connection error: {e}. Retrying ({attempt + 1}/{max_retries})"
                        )
                        sleep(5)
                        continue
                    except Exception as exc:
                        logger.exception(f"Failed getting papers: {exc}")
        except Exception as exc:
            logger.exception(f"Failed getting papers: {exc}")
__init__(server: str, launch_date: str, api_base_url: str = 'https://api.biorxiv.org', max_retries: int = 10)

Initialize API class.

Parameters:

server (str): Name of the preprint server to access. Required.
launch_date (str): Launch date expressed as YYYY-MM-DD. Required.
api_base_url (str): Base URL for the API. Default: 'https://api.biorxiv.org'.
max_retries (int): Maximal number of retries for a request before an error is raised. Default: 10.
Source code in paperscraper/xrxiv/xrxiv_api.py
def __init__(
    self,
    server: str,
    launch_date: str,
    api_base_url: str = "https://api.biorxiv.org",
    max_retries: int = 10,
):
    """
    Initialize API class.

    Args:
        server (str): name of the preprint server to access.
        launch_date (str): launch date expressed as YYYY-MM-DD.
        api_base_url (str, optional): Base url for the API. Defaults to 'api.biorxiv.org'.
        max_retries (int, optional): Maximal number of retries for a request before an
            error is raised. Defaults to 10.
    """
    self.server = server
    self.api_base_url = api_base_url
    self.launch_date = launch_date
    self.launch_datetime = datetime.fromisoformat(self.launch_date)
    self.get_papers_url = (
        "{}/details/{}".format(self.api_base_url, self.server)
        + "/{start_date}/{end_date}/{cursor}"
    )
    self.max_retries = max_retries
get_papers(start_date: Optional[str] = None, end_date: Optional[str] = None, fields: List[str] = ['title', 'doi', 'authors', 'abstract', 'date', 'journal'], max_retries: int = 10) -> Generator

Get paper metadata.

Parameters:

start_date (Optional[str]): Begin date. Defaults to None, i.e., the server's launch date.
end_date (Optional[str]): End date. Defaults to None, i.e., today.
fields (List[str]): Fields to return per paper. Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].
max_retries (int): Number of retries on connection failure. Default: 10.

Yields:

Generator: A generator of paper metadata (dict) with the desired fields.

Source code in paperscraper/xrxiv/xrxiv_api.py
def get_papers(
    self,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    fields: List[str] = ["title", "doi", "authors", "abstract", "date", "journal"],
    max_retries: int = 10,
) -> Generator:
    """
    Get paper metadata.

    Args:
        start_date (Optional[str]): begin date. Defaults to None, a.k.a. launch date.
        end_date (Optional[str]): end date. Defaults to None, a.k.a. today.
        fields (List[str], optional): fields to return per paper.
            Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].
        max_retries (int): Number of retries on connection failure. Defaults to 10.

    Yields:
        Generator: a generator of paper metadata (dict) with the desired fields.
    """
    try:
        now_datetime = datetime.now()
        if start_date:
            start_datetime = datetime.fromisoformat(start_date)
            if start_datetime < self.launch_datetime:
                start_date = self.launch_date
        else:
            start_date = self.launch_date
        if end_date:
            end_datetime = datetime.fromisoformat(end_date)
            if end_datetime > now_datetime:
                end_date = now_datetime.strftime("%Y-%m-%d")
        else:
            end_date = now_datetime.strftime("%Y-%m-%d")
        do_loop = True
        cursor = 0
        while do_loop:
            papers = []
            for attempt in range(max_retries):
                try:
                    json_response = self.call_api(start_date, end_date, cursor)
                    do_loop = json_response["messages"][0]["status"] == "ok"
                    if do_loop:
                        cursor += json_response["messages"][0]["count"]
                        for paper in json_response["collection"]:
                            processed_paper = {
                                field: paper.get(field, "") for field in fields
                            }
                            papers.append(processed_paper)

                    if do_loop:
                        yield from papers
                        break
                except (ConnectionError, Timeout) as e:
                    logger.error(
                        f"Connection error: {e}. Retrying ({attempt + 1}/{max_retries})"
                    )
                    sleep(5)
                    continue
                except Exception as exc:
                    logger.exception(f"Failed getting papers: {exc}")
    except Exception as exc:
        logger.exception(f"Failed getting papers: {exc}")
BioRxivApi

Bases: XRXivApi

bioRxiv API.

Source code in paperscraper/xrxiv/xrxiv_api.py
class BioRxivApi(XRXivApi):
    """bioRxiv API."""

    def __init__(self, max_retries: int = 10):
        super().__init__(
            server="biorxiv",
            launch_date=launch_dates["biorxiv"],
            max_retries=max_retries,
        )
MedRxivApi

Bases: XRXivApi

medRxiv API.

Source code in paperscraper/xrxiv/xrxiv_api.py
class MedRxivApi(XRXivApi):
    """medRxiv API."""

    def __init__(self, max_retries: int = 10):
        super().__init__(
            server="medrxiv",
            launch_date=launch_dates["medrxiv"],
            max_retries=max_retries,
        )
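
As a usage sketch (the date range and the islice cap are illustrative only), metadata can be streamed from either server:

from itertools import islice

from paperscraper.xrxiv.xrxiv_api import MedRxivApi

api = MedRxivApi(max_retries=3)

# get_papers yields one dict per paper; islice keeps the example short.
for paper in islice(api.get_papers(start_date="2021-01-01", end_date="2021-01-07"), 5):
    print(paper["date"], paper["doi"], paper["title"][:60])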
retry_multi()

Retry a function several times

Source code in paperscraper/xrxiv/xrxiv_api.py
def retry_multi():
    """Retry a function several times"""

    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            num_retries = 0
            max_retries = getattr(self, "max_retries", 10)
            while num_retries <= max_retries:
                try:
                    ret = func(self, *args, **kwargs)
                    if ret is None:
                        time.sleep(5)
                        continue
                    break
                except HTTPError:
                    if num_retries == max_retries:
                        raise
                    num_retries += 1
                    time.sleep(5)
            return ret

        return wrapper

    return decorator
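
A minimal sketch of how the decorator is applied (the Flaky class is hypothetical); the decorated method must live on an object exposing a max_retries attribute, and a None return value triggers another attempt:

from paperscraper.xrxiv.xrxiv_api import retry_multi

class Flaky:
    max_retries = 3

    @retry_multi()
    def fetch(self, url):
        # A real method would issue a request and return None on timeout,
        # prompting the wrapper to sleep and retry.
        return {"url": url}

print(Flaky().fetch("https://api.biorxiv.org"))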

xrxiv_query

Query dumps from bioRxiv and medRxiv.

XRXivQuery

Query class.

Source code in paperscraper/xrxiv/xrxiv_query.py
class XRXivQuery:
    """Query class."""

    def __init__(
        self,
        dump_filepath: str,
        fields: List[str] = ["title", "doi", "authors", "abstract", "date", "journal"],
    ):
        """
        Initialize the query class.

        Args:
            dump_filepath (str): filepath to the dump to be queried.
            fields (List[str], optional): fields to contained in the dump per paper.
                Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].
        """
        self.dump_filepath = dump_filepath
        self.fields = fields
        self.errored = False

        try:
            self.df = pd.read_json(self.dump_filepath, lines=True)
            self.df["date"] = [date.strftime("%Y-%m-%d") for date in self.df["date"]]
        except ValueError as e:
            logger.warning(f"Problem in reading file {dump_filepath}: {e} - Skipping!")
            self.errored = True
        except KeyError as e:
            logger.warning(f"Key {e} missing in file from {dump_filepath} - Skipping!")
            self.errored = True

    def search_keywords(
        self,
        keywords: List[Union[str, List[str]]],
        fields: List[str] = None,
        output_filepath: str = None,
    ) -> pd.DataFrame:
        """
        Search for papers in the dump using keywords.

        Args:
            keywords (List[str, List[str]]): Items will be AND separated. If items
                are lists themselves, they will be OR separated.
            fields (List[str], optional): fields to be used in the query search.
                Defaults to None, a.k.a. search in all fields excluding date.
            output_filepath (str, optional): optional output filepath where to store
                the hits in JSONL format. Defaults to None, a.k.a., no export to a file.

        Returns:
            pd.DataFrame: A dataframe with one paper per row.
        """
        if fields is None:
            fields = self.fields
        fields = [field for field in fields if field != "date"]
        hits_per_field = []
        for field in fields:
            field_data = self.df[field].str.lower()
            hits_per_keyword = []
            for keyword in keywords:
                if isinstance(keyword, list):
                    query = "|".join([_.lower() for _ in keyword])
                else:
                    query = keyword.lower()
                hits_per_keyword.append(field_data.str.contains(query))
            if len(hits_per_keyword):
                keyword_hits = hits_per_keyword[0]
                for single_keyword_hits in hits_per_keyword[1:]:
                    keyword_hits &= single_keyword_hits
                hits_per_field.append(keyword_hits)
        if len(hits_per_field):
            hits = hits_per_field[0]
            for single_hits in hits_per_field[1:]:
                hits |= single_hits
        if output_filepath is not None:
            self.df[hits].to_json(output_filepath, orient="records", lines=True)
        return self.df[hits]
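
A brief usage sketch (the dump filename is a placeholder for a previously created JSONL dump):

from paperscraper.xrxiv.xrxiv_query import XRXivQuery

querier = XRXivQuery("medrxiv_dump.jsonl")

if not querier.errored:
    # ["covid-19", "sars-cov-2"] are OR-separated synonyms; "vaccine" is ANDed.
    hits = querier.search_keywords(
        [["covid-19", "sars-cov-2"], "vaccine"],
        output_filepath="covid_vaccine_hits.jsonl",
    )
    print(f"{len(hits)} matching papers")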
__init__(dump_filepath: str, fields: List[str] = ['title', 'doi', 'authors', 'abstract', 'date', 'journal'])

Initialize the query class.

Parameters:

dump_filepath (str): Filepath to the dump to be queried. Required.
fields (List[str]): Fields to be contained in the dump per paper. Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].
Source code in paperscraper/xrxiv/xrxiv_query.py
def __init__(
    self,
    dump_filepath: str,
    fields: List[str] = ["title", "doi", "authors", "abstract", "date", "journal"],
):
    """
    Initialize the query class.

    Args:
        dump_filepath (str): filepath to the dump to be queried.
        fields (List[str], optional): fields to contained in the dump per paper.
            Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].
    """
    self.dump_filepath = dump_filepath
    self.fields = fields
    self.errored = False

    try:
        self.df = pd.read_json(self.dump_filepath, lines=True)
        self.df["date"] = [date.strftime("%Y-%m-%d") for date in self.df["date"]]
    except ValueError as e:
        logger.warning(f"Problem in reading file {dump_filepath}: {e} - Skipping!")
        self.errored = True
    except KeyError as e:
        logger.warning(f"Key {e} missing in file from {dump_filepath} - Skipping!")
        self.errored = True
search_keywords(keywords: List[Union[str, List[str]]], fields: List[str] = None, output_filepath: str = None) -> pd.DataFrame

Search for papers in the dump using keywords.

Parameters:

keywords (List[Union[str, List[str]]]): Items will be AND separated. If items are lists themselves, they will be OR separated.
fields (List[str]): Fields to be used in the query search. Defaults to None, i.e., search in all fields excluding date.
output_filepath (str): Optional output filepath where to store the hits in JSONL format. Defaults to None, i.e., no export to a file.

Returns:

pd.DataFrame: A dataframe with one paper per row.

Source code in paperscraper/xrxiv/xrxiv_query.py
def search_keywords(
    self,
    keywords: List[Union[str, List[str]]],
    fields: List[str] = None,
    output_filepath: str = None,
) -> pd.DataFrame:
    """
    Search for papers in the dump using keywords.

    Args:
        keywords (List[str, List[str]]): Items will be AND separated. If items
            are lists themselves, they will be OR separated.
        fields (List[str], optional): fields to be used in the query search.
            Defaults to None, a.k.a. search in all fields excluding date.
        output_filepath (str, optional): optional output filepath where to store
            the hits in JSONL format. Defaults to None, a.k.a., no export to a file.

    Returns:
        pd.DataFrame: A dataframe with one paper per row.
    """
    if fields is None:
        fields = self.fields
    fields = [field for field in fields if field != "date"]
    hits_per_field = []
    for field in fields:
        field_data = self.df[field].str.lower()
        hits_per_keyword = []
        for keyword in keywords:
            if isinstance(keyword, list):
                query = "|".join([_.lower() for _ in keyword])
            else:
                query = keyword.lower()
            hits_per_keyword.append(field_data.str.contains(query))
        if len(hits_per_keyword):
            keyword_hits = hits_per_keyword[0]
            for single_keyword_hits in hits_per_keyword[1:]:
                keyword_hits &= single_keyword_hits
            hits_per_field.append(keyword_hits)
    if len(hits_per_field):
        hits = hits_per_field[0]
        for single_hits in hits_per_field[1:]:
            hits |= single_hits
    if output_filepath is not None:
        self.df[hits].to_json(output_filepath, orient="records", lines=True)
    return self.df[hits]