paperscraper.xrxiv

bioRxiv and medRxiv utilities.

xrxiv_api

API for bioRxiv and medRxiv.

XRXivApi

API class.

Source code in paperscraper/xrxiv/xrxiv_api.py
class XRXivApi:
    """API class."""

    def __init__(
        self,
        server: str,
        launch_date: str,
        api_base_url: str = "https://api.biorxiv.org",
        max_retries: int = 10,
        request_timeout: Tuple[float, float] = (5.0, 30.0),
        retry_backoff_seconds: float = 1.0,
        window_days: int = 365,
    ):
        """
        Initialize API class.

        Args:
            server: Name of the preprint server to access.
            launch_date: Launch date expressed as YYYY-MM-DD.
            api_base_url: Base url for the API.
            max_retries: Number of retries for transient failures per request.
            request_timeout: (connect timeout, read timeout), in seconds.
            retry_backoff_seconds: Initial backoff delay between retries.
            window_days: Date-window size for full-history scraping. Windows keep
                cursor offsets small and reduce deep-pagination slowdowns.
        """
        self.server = server
        self.api_base_url = api_base_url
        self.launch_date = launch_date
        self.launch_datetime = datetime.fromisoformat(self.launch_date)
        self.get_papers_url = (
            "{}/details/{}".format(self.api_base_url, self.server)
            + "/{start_date}/{end_date}/{cursor}"
        )
        self.max_retries = max(1, int(max_retries))
        self.request_timeout = request_timeout
        self.retry_backoff_seconds = max(0.0, retry_backoff_seconds)
        self.window_days = max(1, int(window_days))

        # Reuse TCP connections across requests to lower latency and avoid
        # repeated TLS setup in long-running dumps.
        self.session = requests.Session()
        adapter = HTTPAdapter(pool_connections=8, pool_maxsize=8)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    def _iter_date_windows(self, start_datetime: datetime, end_datetime: datetime):
        """Iterate over date windows using the instance window size.

        Args:
            start_datetime: Start of the overall date range.
            end_datetime: End of the overall date range.

        Yields:
            Tuple[datetime, datetime]: Inclusive window bounds.
        """
        current_start = start_datetime
        max_span = timedelta(days=self.window_days - 1)

        while current_start <= end_datetime:
            current_end = min(current_start + max_span, end_datetime)
            yield current_start, current_end
            current_start = current_end + timedelta(days=1)

    def _normalize_date_range(
        self, start_date: Optional[str], end_date: Optional[str]
    ) -> Tuple[datetime, datetime]:
        """Validate and normalize start/end date inputs.

        Args:
            start_date: Optional start date in YYYY-MM-DD format.
            end_date: Optional end date in YYYY-MM-DD format.

        Returns:
            Tuple[datetime, datetime]: Normalized start and end datetimes.

        Raises:
            ValueError: If the normalized start date is after the end date.
        """
        now_datetime = datetime.now()
        if start_date:
            start_datetime = datetime.fromisoformat(start_date)
            if start_datetime < self.launch_datetime:
                start_datetime = self.launch_datetime
        else:
            start_datetime = self.launch_datetime

        if end_date:
            end_datetime = datetime.fromisoformat(end_date)
            if end_datetime > now_datetime:
                end_datetime = now_datetime
        else:
            end_datetime = now_datetime

        if start_datetime > end_datetime:
            raise ValueError(
                f"start_date {start_datetime.strftime('%Y-%m-%d')} cannot be later than "
                f"end_date {end_datetime.strftime('%Y-%m-%d')}"
            )
        return start_datetime, end_datetime

    def _iter_date_windows_custom(
        self,
        start_datetime: datetime,
        end_datetime: datetime,
        window_days: int,
    ):
        """Iterate over date windows for an explicit window size.

        Args:
            start_datetime: Start of the overall date range.
            end_datetime: End of the overall date range.
            window_days: Number of days per yielded window.

        Yields:
            Tuple[datetime, datetime]: Inclusive window bounds.

        Raises:
            ValueError: If `window_days` is smaller than 1.
        """
        if window_days < 1:
            raise ValueError(f"window_days must be >= 1, got {window_days}")
        current_start = start_datetime
        max_span = timedelta(days=window_days - 1)

        while current_start <= end_datetime:
            current_end = min(current_start + max_span, end_datetime)
            yield current_start, current_end
            current_start = current_end + timedelta(days=1)

    def _worker_api(self, worker_window_days: int, worker_retries: int) -> "XRXivApi":
        """Create a dedicated API client for a parallel worker.

        Args:
            worker_window_days: Window size used by the worker client.
            worker_retries: Retry limit used by the worker client.

        Returns:
            XRXivApi: A new client configured like the parent instance.
        """
        # Use a dedicated client per worker to avoid shared-session contention
        # across threads and keep retry/backoff state isolated.
        return XRXivApi(
            server=self.server,
            launch_date=self.launch_date,
            api_base_url=self.api_base_url,
            max_retries=worker_retries,
            request_timeout=self.request_timeout,
            retry_backoff_seconds=self.retry_backoff_seconds,
            window_days=worker_window_days,
        )

    def _fetch_window_to_file(
        self,
        idx: int,
        start_date: str,
        end_date: str,
        output_dir: str,
        fields: List[str],
        max_retries: int,
    ) -> Tuple[int, str, int]:
        """Fetch one date window and persist it as a temporary JSONL chunk.

        Args:
            idx: Window index used for stable chunk ordering.
            start_date: Window start date in YYYY-MM-DD format.
            end_date: Window end date in YYYY-MM-DD format.
            output_dir: Directory where the temporary chunk is written.
            fields: Metadata fields to keep for each record.
            max_retries: Per-request retry limit for this fetch.

        Returns:
            Tuple[int, str, int]: Window index, chunk path, and written row count.
        """
        worker_window_days = (
            datetime.fromisoformat(end_date) - datetime.fromisoformat(start_date)
        ).days + 1
        api = self._worker_api(
            worker_window_days=worker_window_days,
            worker_retries=max_retries,
        )

        part_path = os.path.join(output_dir, f"window_{idx:04d}.jsonl")
        count = 0
        with open(part_path, "w", encoding="utf-8") as fp:
            for paper in api.get_papers(
                start_date=start_date,
                end_date=end_date,
                fields=fields,
                max_retries=max_retries,
            ):
                if count > 0:
                    fp.write(os.linesep)
                fp.write(json.dumps(paper))
                count += 1
        return idx, part_path, count

    @staticmethod
    def _merge_window_files(
        save_path: str,
        ordered_window_paths: List[str],
        deduplicate_dois: bool,
    ) -> int:
        """Merge temporary JSONL chunks into a single output file.

        Args:
            save_path: Final JSONL output path.
            ordered_window_paths: Chunk files sorted by window order.
            deduplicate_dois: Whether to drop duplicate DOI entries.

        Returns:
            int: Number of rows written to `save_path`.
        """
        seen_dois = set()
        written = 0

        with open(save_path, "w", encoding="utf-8") as out_fp:
            for window_path in ordered_window_paths:
                with open(window_path, "r", encoding="utf-8") as in_fp:
                    for line in in_fp:
                        line = line.strip()
                        if not line:
                            continue

                        if deduplicate_dois:
                            paper = json.loads(line)
                            doi = str(paper.get("doi", "")).strip().lower()
                            if doi and doi in seen_dois:
                                continue
                            if doi:
                                seen_dois.add(doi)

                        if written > 0:
                            out_fp.write(os.linesep)
                        out_fp.write(line)
                        written += 1

        return written

    def dump_papers(
        self,
        save_path: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        fields: Optional[List[str]] = None,
        max_retries: Optional[int] = None,
        max_workers: int = 1,
        window_days: Optional[int] = None,
        deduplicate_dois: bool = False,
        show_progress: bool = True,
    ) -> int:
        """Dump papers to JSONL, optionally in parallel using date windows.

        Args:
            save_path: Output JSONL path.
            start_date: Begin date, YYYY-MM-DD.
            end_date: End date, YYYY-MM-DD.
            fields: Fields to include per paper.
            max_retries: Optional per-request retry override.
            max_workers: Number of parallel window workers.
            window_days: Window size used for partitioning date ranges.
            deduplicate_dois: If True, drop repeated DOIs while merging windows.
            show_progress: If True, render tqdm progress bars.

        Returns:
            Number of records written to `save_path`.
        """
        if fields is None:
            fields = ["title", "doi", "authors", "abstract", "date", "journal"]
        if max_workers < 1:
            raise ValueError(f"max_workers must be >= 1, got {max_workers}")
        worker_retries = max_retries if max_retries is not None else self.max_retries
        worker_retries = max(1, int(worker_retries))
        span_days = max(1, int(window_days if window_days is not None else self.window_days))

        start_datetime, end_datetime = self._normalize_date_range(start_date, end_date)
        start_text = start_datetime.strftime("%Y-%m-%d")
        end_text = end_datetime.strftime("%Y-%m-%d")

        output_dir = os.path.dirname(os.path.abspath(save_path)) or "."
        os.makedirs(output_dir, exist_ok=True)

        if max_workers == 1:
            iterator = self.get_papers(
                start_date=start_text,
                end_date=end_text,
                fields=fields,
                max_retries=worker_retries,
            )
            if show_progress:
                iterator = tqdm(iterator, desc=f"{self.server} dump")

            written = 0
            seen_dois = set()
            with open(save_path, "w", encoding="utf-8") as fp:
                for paper in iterator:
                    if deduplicate_dois:
                        # For sequential mode, track DOIs inline.
                        doi = str(paper.get("doi", "")).strip().lower()
                        if doi and doi in seen_dois:
                            continue
                        if doi:
                            seen_dois.add(doi)

                    if written > 0:
                        fp.write(os.linesep)
                    fp.write(json.dumps(paper))
                    written += 1

            return written

        windows = list(
            self._iter_date_windows_custom(
                start_datetime=start_datetime,
                end_datetime=end_datetime,
                window_days=span_days,
            )
        )
        if not windows:
            with open(save_path, "w", encoding="utf-8"):
                pass
            return 0

        # Keep worker chunks outside server_dumps so interrupted runs
        # cannot be mistaken for dump files during module import.
        tmp_dir = mkdtemp(prefix=f"{self.server}_windows_")
        try:
            part_files: Dict[int, str] = {}
            futures = []
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                for idx, (win_start, win_end) in enumerate(windows):
                    fut = executor.submit(
                        self._fetch_window_to_file,
                        idx,
                        win_start.strftime("%Y-%m-%d"),
                        win_end.strftime("%Y-%m-%d"),
                        tmp_dir,
                        fields,
                        worker_retries,
                    )
                    futures.append(fut)

                completion_iter = as_completed(futures)
                if show_progress:
                    completion_iter = tqdm(
                        completion_iter,
                        total=len(futures),
                        desc=f"{self.server} windows",
                    )

                for fut in completion_iter:
                    idx, part_path, _ = fut.result()
                    part_files[idx] = part_path

            ordered_window_paths = [part_files[idx] for idx in sorted(part_files)]
            return self._merge_window_files(
                save_path=save_path,
                ordered_window_paths=ordered_window_paths,
                deduplicate_dois=deduplicate_dois,
            )
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)

    def call_api(
        self,
        start_date: str,
        end_date: str,
        cursor: int,
        max_retries: Optional[int] = None,
    ) -> dict:
        """Call the x-rxiv details endpoint with retry and backoff.

        Args:
            start_date: Query start date in YYYY-MM-DD format.
            end_date: Query end date in YYYY-MM-DD format.
            cursor: Cursor offset for paginated retrieval.
            max_retries: Optional retry override for this call.

        Returns:
            dict: Parsed JSON response payload from the endpoint.

        Raises:
            requests.HTTPError: If a non-retryable HTTP error occurs.
            requests.Timeout: If retries are exhausted after timeout failures.
            requests.ConnectionError: If retries are exhausted after connection failures.
            RuntimeError: If the request loop exits unexpectedly without a result.
        """
        max_attempts = max_retries if max_retries is not None else self.max_retries
        max_attempts = max(1, int(max_attempts))
        backoff = self.retry_backoff_seconds
        transient_status = {408, 429, 500, 502, 503, 504}
        url = self.get_papers_url.format(
            start_date=start_date,
            end_date=end_date,
            cursor=cursor,
        )
        last_error = None

        for attempt in range(1, max_attempts + 1):
            try:
                response = self.session.get(url, timeout=self.request_timeout)
                if response.status_code in transient_status:
                    logger.info(
                        f"{self.server} transient HTTP {response.status_code} at cursor {cursor} "
                        f"for {start_date}..{end_date} (attempt {attempt}/{max_attempts}); retrying"
                    )
                    if attempt == max_attempts:
                        response.raise_for_status()
                    if backoff:
                        sleep(backoff)
                        backoff = min(60.0, max(backoff * 2, backoff + 0.1))
                    continue

                response.raise_for_status()
                return response.json()
            except (Timeout, ConnectionError) as exc:
                last_error = exc
                logger.info(
                    f"{self.server} request failed ({exc.__class__.__name__}) at cursor {cursor} "
                    f"for {start_date}..{end_date} (attempt {attempt}/{max_attempts}); retrying"
                )
                if attempt == max_attempts:
                    raise
                if backoff:
                    sleep(backoff)
                    backoff = min(60.0, max(backoff * 2, backoff + 0.1))
            except (JSONDecodeError, ValueError) as exc:
                last_error = exc
                logger.info(
                    f"{self.server} JSON decode failed at cursor {cursor} for {start_date}..{end_date} "
                    f"(attempt {attempt}/{max_attempts}); retrying"
                )
                if attempt == max_attempts:
                    raise
                if backoff:
                    sleep(backoff)
                    backoff = min(60.0, max(backoff * 2, backoff + 0.1))
            except HTTPError as exc:
                last_error = exc
                # Non-transient errors should fail fast.
                raise

        if last_error is not None:
            raise last_error
        raise RuntimeError("Failed to query x-rxiv API for unknown reasons")

    def get_papers(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        fields: Optional[List[str]] = None,
        max_retries: Optional[int] = None,
    ) -> Generator:
        """
        Get paper metadata.

        Args:
            start_date: Begin date. Defaults to launch date.
            end_date: End date. Defaults to today.
            fields: Fields to return per paper.
            max_retries: Optional per-request retry override.

        Yields:
            A generator of paper metadata dicts.
        """
        if fields is None:
            fields = ["title", "doi", "authors", "abstract", "date", "journal"]

        start_datetime, end_datetime = self._normalize_date_range(start_date, end_date)

        for window_start, window_end in self._iter_date_windows(
            start_datetime, end_datetime
        ):
            start_date_text = window_start.strftime("%Y-%m-%d")
            end_date_text = window_end.strftime("%Y-%m-%d")
            cursor = 0

            while True:
                json_response = self.call_api(
                    start_date=start_date_text,
                    end_date=end_date_text,
                    cursor=cursor,
                    max_retries=max_retries,
                )

                messages = json_response.get("messages", [])
                message = messages[0] if messages else {}
                if message.get("status") != "ok":
                    break

                collection = json_response.get("collection", [])
                if not collection:
                    break

                for paper in collection:
                    yield {field: paper.get(field, "") for field in fields}

                returned_count = len(collection)
                cursor += returned_count

                # API pages are capped at 100 items. If we received fewer than that,
                # we have reached the end of this window and no further request is needed.
                if returned_count < 100:
                    break

                total = _to_int(message.get("total"), default=0)
                if total and cursor >= total:
                    break
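
The class splits any requested date range into inclusive windows before paginating, which keeps the API cursor small. The sketch below reproduces that window arithmetic on its own, purely as an illustration of how _iter_date_windows and _iter_date_windows_custom partition a range; the dates and the 30-day window size are arbitrary examples.

from datetime import datetime, timedelta

def split_into_windows(start, end, window_days):
    # Same inclusive-bound arithmetic as the private helpers above.
    span = timedelta(days=window_days - 1)
    current = start
    while current <= end:
        window_end = min(current + span, end)
        yield current, window_end
        current = window_end + timedelta(days=1)

for lo, hi in split_into_windows(datetime(2024, 1, 1), datetime(2024, 3, 10), 30):
    print(lo.date(), "->", hi.date())
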
__init__(server: str, launch_date: str, api_base_url: str = 'https://api.biorxiv.org', max_retries: int = 10, request_timeout: Tuple[float, float] = (5.0, 30.0), retry_backoff_seconds: float = 1.0, window_days: int = 365)

Initialize API class.

Parameters:

Name Type Description Default
server str

Name of the preprint server to access.

required
launch_date str

Launch date expressed as YYYY-MM-DD.

required
api_base_url str

Base url for the API.

'https://api.biorxiv.org'
max_retries int

Number of retries for transient failures per request.

10
request_timeout Tuple[float, float]

(connect timeout, read timeout), in seconds.

(5.0, 30.0)
retry_backoff_seconds float

Initial backoff delay between retries.

1.0
window_days int

Date-window size for full-history scraping. Windows keep cursor offsets small and reduce deep-pagination slowdowns.

365
Source code in paperscraper/xrxiv/xrxiv_api.py
def __init__(
    self,
    server: str,
    launch_date: str,
    api_base_url: str = "https://api.biorxiv.org",
    max_retries: int = 10,
    request_timeout: Tuple[float, float] = (5.0, 30.0),
    retry_backoff_seconds: float = 1.0,
    window_days: int = 365,
):
    """
    Initialize API class.

    Args:
        server: Name of the preprint server to access.
        launch_date: Launch date expressed as YYYY-MM-DD.
        api_base_url: Base url for the API.
        max_retries: Number of retries for transient failures per request.
        request_timeout: (connect timeout, read timeout), in seconds.
        retry_backoff_seconds: Initial backoff delay between retries.
        window_days: Date-window size for full-history scraping. Windows keep
            cursor offsets small and reduce deep-pagination slowdowns.
    """
    self.server = server
    self.api_base_url = api_base_url
    self.launch_date = launch_date
    self.launch_datetime = datetime.fromisoformat(self.launch_date)
    self.get_papers_url = (
        "{}/details/{}".format(self.api_base_url, self.server)
        + "/{start_date}/{end_date}/{cursor}"
    )
    self.max_retries = max(1, int(max_retries))
    self.request_timeout = request_timeout
    self.retry_backoff_seconds = max(0.0, retry_backoff_seconds)
    self.window_days = max(1, int(window_days))

    # Reuse TCP connections across requests to lower latency and avoid
    # repeated TLS setup in long-running dumps.
    self.session = requests.Session()
    adapter = HTTPAdapter(pool_connections=8, pool_maxsize=8)
    self.session.mount("https://", adapter)
    self.session.mount("http://", adapter)
dump_papers(save_path: str, start_date: Optional[str] = None, end_date: Optional[str] = None, fields: Optional[List[str]] = None, max_retries: Optional[int] = None, max_workers: int = 1, window_days: Optional[int] = None, deduplicate_dois: bool = False, show_progress: bool = True) -> int

Dump papers to JSONL, optionally in parallel using date windows.

Parameters:

Name Type Description Default
save_path str

Output JSONL path.

required
start_date Optional[str]

Begin date, YYYY-MM-DD.

None
end_date Optional[str]

End date, YYYY-MM-DD.

None
fields Optional[List[str]]

Fields to include per paper.

None
max_retries Optional[int]

Optional per-request retry override.

None
max_workers int

Number of parallel window workers.

1
window_days Optional[int]

Window size used for partitioning date ranges.

None
deduplicate_dois bool

If True, drop repeated DOIs while merging windows.

False
show_progress bool

If True, render tqdm progress bars.

True

Returns:

Type Description
int

Number of records written to save_path.

Source code in paperscraper/xrxiv/xrxiv_api.py
def dump_papers(
    self,
    save_path: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    fields: Optional[List[str]] = None,
    max_retries: Optional[int] = None,
    max_workers: int = 1,
    window_days: Optional[int] = None,
    deduplicate_dois: bool = False,
    show_progress: bool = True,
) -> int:
    """Dump papers to JSONL, optionally in parallel using date windows.

    Args:
        save_path: Output JSONL path.
        start_date: Begin date, YYYY-MM-DD.
        end_date: End date, YYYY-MM-DD.
        fields: Fields to include per paper.
        max_retries: Optional per-request retry override.
        max_workers: Number of parallel window workers.
        window_days: Window size used for partitioning date ranges.
        deduplicate_dois: If True, drop repeated DOIs while merging windows.
        show_progress: If True, render tqdm progress bars.

    Returns:
        Number of records written to `save_path`.
    """
    if fields is None:
        fields = ["title", "doi", "authors", "abstract", "date", "journal"]
    if max_workers < 1:
        raise ValueError(f"max_workers must be >= 1, got {max_workers}")
    worker_retries = max_retries if max_retries is not None else self.max_retries
    worker_retries = max(1, int(worker_retries))
    span_days = max(1, int(window_days if window_days is not None else self.window_days))

    start_datetime, end_datetime = self._normalize_date_range(start_date, end_date)
    start_text = start_datetime.strftime("%Y-%m-%d")
    end_text = end_datetime.strftime("%Y-%m-%d")

    output_dir = os.path.dirname(os.path.abspath(save_path)) or "."
    os.makedirs(output_dir, exist_ok=True)

    if max_workers == 1:
        iterator = self.get_papers(
            start_date=start_text,
            end_date=end_text,
            fields=fields,
            max_retries=worker_retries,
        )
        if show_progress:
            iterator = tqdm(iterator, desc=f"{self.server} dump")

        written = 0
        seen_dois = set()
        with open(save_path, "w", encoding="utf-8") as fp:
            for paper in iterator:
                if deduplicate_dois:
                    # For sequential mode, track DOIs inline.
                    doi = str(paper.get("doi", "")).strip().lower()
                    if doi and doi in seen_dois:
                        continue
                    if doi:
                        seen_dois.add(doi)

                if written > 0:
                    fp.write(os.linesep)
                fp.write(json.dumps(paper))
                written += 1

        return written

    windows = list(
        self._iter_date_windows_custom(
            start_datetime=start_datetime,
            end_datetime=end_datetime,
            window_days=span_days,
        )
    )
    if not windows:
        with open(save_path, "w", encoding="utf-8"):
            pass
        return 0

    # Keep worker chunks outside server_dumps so interrupted runs
    # cannot be mistaken for dump files during module import.
    tmp_dir = mkdtemp(prefix=f"{self.server}_windows_")
    try:
        part_files: Dict[int, str] = {}
        futures = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for idx, (win_start, win_end) in enumerate(windows):
                fut = executor.submit(
                    self._fetch_window_to_file,
                    idx,
                    win_start.strftime("%Y-%m-%d"),
                    win_end.strftime("%Y-%m-%d"),
                    tmp_dir,
                    fields,
                    worker_retries,
                )
                futures.append(fut)

            completion_iter = as_completed(futures)
            if show_progress:
                completion_iter = tqdm(
                    completion_iter,
                    total=len(futures),
                    desc=f"{self.server} windows",
                )

            for fut in completion_iter:
                idx, part_path, _ = fut.result()
                part_files[idx] = part_path

        ordered_window_paths = [part_files[idx] for idx in sorted(part_files)]
        return self._merge_window_files(
            save_path=save_path,
            ordered_window_paths=ordered_window_paths,
            deduplicate_dois=deduplicate_dois,
        )
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)
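
A usage sketch for a parallel dump, assuming the BioRxivApi subclass documented below; the output path, date range, and tuning values are examples only.

from paperscraper.xrxiv.xrxiv_api import BioRxivApi

api = BioRxivApi()
# Four workers, each fetching ~90-day windows, merged into one JSONL file.
n_written = api.dump_papers(
    save_path="biorxiv_2023.jsonl",
    start_date="2023-01-01",
    end_date="2023-12-31",
    max_workers=4,
    window_days=90,
    deduplicate_dois=True,
)
print(f"wrote {n_written} records")
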
call_api(start_date: str, end_date: str, cursor: int, max_retries: Optional[int] = None) -> dict

Call the x-rxiv details endpoint with retry and backoff.

Parameters:

Name Type Description Default
start_date str

Query start date in YYYY-MM-DD format.

required
end_date str

Query end date in YYYY-MM-DD format.

required
cursor int

Cursor offset for paginated retrieval.

required
max_retries Optional[int]

Optional retry override for this call.

None

Returns:

Name Type Description
dict dict

Parsed JSON response payload from the endpoint.

Raises:

Type Description
HTTPError

If a non-retryable HTTP error occurs.

Timeout

If retries are exhausted after timeout failures.

ConnectionError

If retries are exhausted after connection failures.

RuntimeError

If the request loop exits unexpectedly without a result.

Source code in paperscraper/xrxiv/xrxiv_api.py
def call_api(
    self,
    start_date: str,
    end_date: str,
    cursor: int,
    max_retries: Optional[int] = None,
) -> dict:
    """Call the x-rxiv details endpoint with retry and backoff.

    Args:
        start_date: Query start date in YYYY-MM-DD format.
        end_date: Query end date in YYYY-MM-DD format.
        cursor: Cursor offset for paginated retrieval.
        max_retries: Optional retry override for this call.

    Returns:
        dict: Parsed JSON response payload from the endpoint.

    Raises:
        requests.HTTPError: If a non-retryable HTTP error occurs.
        requests.Timeout: If retries are exhausted after timeout failures.
        requests.ConnectionError: If retries are exhausted after connection failures.
        RuntimeError: If the request loop exits unexpectedly without a result.
    """
    max_attempts = max_retries if max_retries is not None else self.max_retries
    max_attempts = max(1, int(max_attempts))
    backoff = self.retry_backoff_seconds
    transient_status = {408, 429, 500, 502, 503, 504}
    url = self.get_papers_url.format(
        start_date=start_date,
        end_date=end_date,
        cursor=cursor,
    )
    last_error = None

    for attempt in range(1, max_attempts + 1):
        try:
            response = self.session.get(url, timeout=self.request_timeout)
            if response.status_code in transient_status:
                logger.info(
                    f"{self.server} transient HTTP {response.status_code} at cursor {cursor} "
                    f"for {start_date}..{end_date} (attempt {attempt}/{max_attempts}); retrying"
                )
                if attempt == max_attempts:
                    response.raise_for_status()
                if backoff:
                    sleep(backoff)
                    backoff = min(60.0, max(backoff * 2, backoff + 0.1))
                continue

            response.raise_for_status()
            return response.json()
        except (Timeout, ConnectionError) as exc:
            last_error = exc
            logger.info(
                f"{self.server} request failed ({exc.__class__.__name__}) at cursor {cursor} "
                f"for {start_date}..{end_date} (attempt {attempt}/{max_attempts}); retrying"
            )
            if attempt == max_attempts:
                raise
            if backoff:
                sleep(backoff)
                backoff = min(60.0, max(backoff * 2, backoff + 0.1))
        except (JSONDecodeError, ValueError) as exc:
            last_error = exc
            logger.info(
                f"{self.server} JSON decode failed at cursor {cursor} for {start_date}..{end_date} "
                f"(attempt {attempt}/{max_attempts}); retrying"
            )
            if attempt == max_attempts:
                raise
            if backoff:
                sleep(backoff)
                backoff = min(60.0, max(backoff * 2, backoff + 0.1))
        except HTTPError as exc:
            last_error = exc
            # Non-transient errors should fail fast.
            raise

    if last_error is not None:
        raise last_error
    raise RuntimeError("Failed to query x-rxiv API for unknown reasons")
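
A sketch of a single low-level call, assuming a BioRxivApi instance; the dates are examples. The caller owns pagination: advance the cursor by the number of returned items (pages hold at most 100) and call again until the collection comes back short or empty.

from paperscraper.xrxiv.xrxiv_api import BioRxivApi

api = BioRxivApi()
payload = api.call_api(start_date="2024-01-01", end_date="2024-01-31", cursor=0)
messages = payload.get("messages", [])
message = messages[0] if messages else {}
collection = payload.get("collection", [])
print(message.get("status"), message.get("total"), len(collection))
next_cursor = len(collection)  # pass this as `cursor` to fetch the next page
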
get_papers(start_date: Optional[str] = None, end_date: Optional[str] = None, fields: Optional[List[str]] = None, max_retries: Optional[int] = None) -> Generator

Get paper metadata.

Parameters:

Name Type Description Default
start_date Optional[str]

Begin date. Defaults to launch date.

None
end_date Optional[str]

End date. Defaults to today.

None
fields Optional[List[str]]

Fields to return per paper.

None
max_retries Optional[int]

Optional per-request retry override.

None

Yields:

Type Description
Generator

A generator of paper metadata dicts.

Source code in paperscraper/xrxiv/xrxiv_api.py
def get_papers(
    self,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    fields: Optional[List[str]] = None,
    max_retries: Optional[int] = None,
) -> Generator:
    """
    Get paper metadata.

    Args:
        start_date: Begin date. Defaults to launch date.
        end_date: End date. Defaults to today.
        fields: Fields to return per paper.
        max_retries: Optional per-request retry override.

    Yields:
        A generator of paper metadata dicts.
    """
    if fields is None:
        fields = ["title", "doi", "authors", "abstract", "date", "journal"]

    start_datetime, end_datetime = self._normalize_date_range(start_date, end_date)

    for window_start, window_end in self._iter_date_windows(
        start_datetime, end_datetime
    ):
        start_date_text = window_start.strftime("%Y-%m-%d")
        end_date_text = window_end.strftime("%Y-%m-%d")
        cursor = 0

        while True:
            json_response = self.call_api(
                start_date=start_date_text,
                end_date=end_date_text,
                cursor=cursor,
                max_retries=max_retries,
            )

            messages = json_response.get("messages", [])
            message = messages[0] if messages else {}
            if message.get("status") != "ok":
                break

            collection = json_response.get("collection", [])
            if not collection:
                break

            for paper in collection:
                yield {field: paper.get(field, "") for field in fields}

            returned_count = len(collection)
            cursor += returned_count

            # API pages are capped at 100 items. If we received fewer than that,
            # we have reached the end of this window and no further request is needed.
            if returned_count < 100:
                break

            total = _to_int(message.get("total"), default=0)
            if total and cursor >= total:
                break
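
A sketch of lazy iteration over metadata, again assuming BioRxivApi; the week-long range and the early break only keep the example small.

from paperscraper.xrxiv.xrxiv_api import BioRxivApi

api = BioRxivApi()
papers = api.get_papers(
    start_date="2024-03-01",
    end_date="2024-03-07",
    fields=["title", "doi", "date"],
)
for i, paper in enumerate(papers):
    print(paper["date"], paper["doi"], paper["title"][:80])
    if i >= 4:  # the generator is lazy, so breaking early avoids further requests
        break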

BioRxivApi

Bases: XRXivApi

bioRxiv API.

Source code in paperscraper/xrxiv/xrxiv_api.py
class BioRxivApi(XRXivApi):
    """bioRxiv API."""

    def __init__(
        self,
        max_retries: int = 10,
        request_timeout: Tuple[float, float] = (5.0, 30.0),
        retry_backoff_seconds: float = 1.0,
        window_days: int = 365,
    ):
        super().__init__(
            server="biorxiv",
            launch_date=launch_dates["biorxiv"],
            max_retries=max_retries,
            request_timeout=request_timeout,
            retry_backoff_seconds=retry_backoff_seconds,
            window_days=window_days,
        )

MedRxivApi

Bases: XRXivApi

medRxiv API.

Source code in paperscraper/xrxiv/xrxiv_api.py
class MedRxivApi(XRXivApi):
    """medRxiv API."""

    def __init__(
        self,
        max_retries: int = 10,
        request_timeout: Tuple[float, float] = (5.0, 30.0),
        retry_backoff_seconds: float = 1.0,
        window_days: int = 365,
    ):
        super().__init__(
            server="medrxiv",
            launch_date=launch_dates["medrxiv"],
            max_retries=max_retries,
            request_timeout=request_timeout,
            retry_backoff_seconds=retry_backoff_seconds,
            window_days=window_days,
        )
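
Both subclasses only pin the server name and launch date; every XRXivApi tuning knob stays available. A small sketch (the overrides shown are arbitrary examples):

from paperscraper.xrxiv.xrxiv_api import BioRxivApi, MedRxivApi

biorxiv = BioRxivApi(max_retries=5, window_days=180)
medrxiv = MedRxivApi(request_timeout=(5.0, 60.0))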

xrxiv_query

Query dumps from bioRxiv and medRxiv.

XRXivQuery

Query class.

Source code in paperscraper/xrxiv/xrxiv_query.py
class XRXivQuery:
    """Query class."""

    def __init__(
        self,
        dump_filepath: str,
        fields: List[str] = ["title", "doi", "authors", "abstract", "date", "journal"],
    ):
        """
        Initialize the query class.

        Args:
            dump_filepath (str): filepath to the dump to be queried.
            fields (List[str], optional): fields to be contained in the dump per paper.
                Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].
        """
        self.dump_filepath = dump_filepath
        self.fields = fields
        self.errored = False

        try:
            self.df = pd.read_json(self.dump_filepath, lines=True)
            self.df["date"] = [date.strftime("%Y-%m-%d") for date in self.df["date"]]
        except ValueError as e:
            logger.warning(f"Problem in reading file {dump_filepath}: {e} - Skipping!")
            self.errored = True
        except KeyError as e:
            logger.warning(f"Key {e} missing in file from {dump_filepath} - Skipping!")
            self.errored = True

    def search_keywords(
        self,
        keywords: List[Union[str, List[str]]],
        fields: List[str] = None,
        output_filepath: str = None,
    ) -> pd.DataFrame:
        """
        Search for papers in the dump using keywords.

        Args:
            keywords (List[Union[str, List[str]]]): Items will be AND separated. If items
                are lists themselves, they will be OR separated.
            fields (List[str], optional): fields to be used in the query search.
                Defaults to None, i.e., search in all fields excluding date.
            output_filepath (str, optional): optional output filepath where to store
                the hits in JSONL format. Defaults to None, i.e., no export to a file.

        Returns:
            pd.DataFrame: A dataframe with one paper per row.
        """
        if fields is None:
            fields = self.fields
        fields = [field for field in fields if field != "date"]
        hits_per_field = []
        for field in fields:
            field_data = self.df[field].str.lower()
            hits_per_keyword = []
            for keyword in keywords:
                if isinstance(keyword, list):
                    query = "|".join([_.lower() for _ in keyword])
                else:
                    query = keyword.lower()
                hits_per_keyword.append(field_data.str.contains(query))
            if len(hits_per_keyword):
                keyword_hits = hits_per_keyword[0]
                for single_keyword_hits in hits_per_keyword[1:]:
                    keyword_hits &= single_keyword_hits
                hits_per_field.append(keyword_hits)
        if len(hits_per_field):
            hits = hits_per_field[0]
            for single_hits in hits_per_field[1:]:
                hits |= single_hits
        else:
            # No field produced a match mask (e.g. empty `fields`): fall back to an
            # all-False selection so `hits` is always defined below.
            hits = pd.Series(False, index=self.df.index)
        if output_filepath is not None:
            self.df[hits].to_json(output_filepath, orient="records", lines=True)
        return self.df[hits]
__init__(dump_filepath: str, fields: List[str] = ['title', 'doi', 'authors', 'abstract', 'date', 'journal'])

Initialize the query class.

Parameters:

Name Type Description Default
dump_filepath str

filepath to the dump to be queried.

required
fields List[str]

fields to be contained in the dump per paper. Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].

['title', 'doi', 'authors', 'abstract', 'date', 'journal']
Source code in paperscraper/xrxiv/xrxiv_query.py
def __init__(
    self,
    dump_filepath: str,
    fields: List[str] = ["title", "doi", "authors", "abstract", "date", "journal"],
):
    """
    Initialize the query class.

    Args:
        dump_filepath (str): filepath to the dump to be queried.
        fields (List[str], optional): fields to be contained in the dump per paper.
            Defaults to ['title', 'doi', 'authors', 'abstract', 'date', 'journal'].
    """
    self.dump_filepath = dump_filepath
    self.fields = fields
    self.errored = False

    try:
        self.df = pd.read_json(self.dump_filepath, lines=True)
        self.df["date"] = [date.strftime("%Y-%m-%d") for date in self.df["date"]]
    except ValueError as e:
        logger.warning(f"Problem in reading file {dump_filepath}: {e} - Skipping!")
        self.errored = True
    except KeyError as e:
        logger.warning(f"Key {e} missing in file from {dump_filepath} - Skipping!")
        self.errored = True
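
A loading sketch, assuming a JSONL dump such as the one produced by dump_papers above; the filename is an example. Checking `errored` avoids querying a dump that failed to parse.

from paperscraper.xrxiv.xrxiv_query import XRXivQuery

querier = XRXivQuery("biorxiv_2023.jsonl")  # example path to a JSONL dump
if not querier.errored:
    print(len(querier.df), "papers loaded")
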
search_keywords(keywords: List[Union[str, List[str]]], fields: List[str] = None, output_filepath: str = None) -> pd.DataFrame

Search for papers in the dump using keywords.

Parameters:

Name Type Description Default
keywords List[Union[str, List[str]]]

Items will be AND separated. If items are lists themselves, they will be OR separated.

required
fields List[str]

fields to be used in the query search. Defaults to None, a.k.a. search in all fields excluding date.

None
output_filepath str

optional output filepath where to store the hits in JSONL format. Defaults to None, a.k.a., no export to a file.

None

Returns:

Type Description
DataFrame

pd.DataFrame: A dataframe with one paper per row.

Source code in paperscraper/xrxiv/xrxiv_query.py
def search_keywords(
    self,
    keywords: List[Union[str, List[str]]],
    fields: List[str] = None,
    output_filepath: str = None,
) -> pd.DataFrame:
    """
    Search for papers in the dump using keywords.

    Args:
        keywords (List[Union[str, List[str]]]): Items will be AND separated. If items
            are lists themselves, they will be OR separated.
        fields (List[str], optional): fields to be used in the query search.
            Defaults to None, i.e., search in all fields excluding date.
        output_filepath (str, optional): optional output filepath where to store
            the hits in JSONL format. Defaults to None, i.e., no export to a file.

    Returns:
        pd.DataFrame: A dataframe with one paper per row.
    """
    if fields is None:
        fields = self.fields
    fields = [field for field in fields if field != "date"]
    hits_per_field = []
    for field in fields:
        field_data = self.df[field].str.lower()
        hits_per_keyword = []
        for keyword in keywords:
            if isinstance(keyword, list):
                query = "|".join([_.lower() for _ in keyword])
            else:
                query = keyword.lower()
            hits_per_keyword.append(field_data.str.contains(query))
        if len(hits_per_keyword):
            keyword_hits = hits_per_keyword[0]
            for single_keyword_hits in hits_per_keyword[1:]:
                keyword_hits &= single_keyword_hits
            hits_per_field.append(keyword_hits)
    if len(hits_per_field):
        hits = hits_per_field[0]
        for single_hits in hits_per_field[1:]:
            hits |= single_hits
    else:
        # No field produced a match mask (e.g. empty `fields`): fall back to an
        # all-False selection so `hits` is always defined below.
        hits = pd.Series(False, index=self.df.index)
    if output_filepath is not None:
        self.df[hits].to_json(output_filepath, orient="records", lines=True)
    return self.df[hits]
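
A query sketch illustrating the AND/OR semantics, assuming the `querier` from the loading example above; keyword lists are OR-ed, top-level items are AND-ed, and the output path is only an example.

hits = querier.search_keywords(
    [["covid", "sars-cov-2"], "vaccine"],  # (covid OR sars-cov-2) AND vaccine
    fields=["title", "abstract"],
    output_filepath="covid_vaccine_hits.jsonl",  # optional JSONL export
)
print(len(hits), "matching papers")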