1#[cfg(feature = "serde")]
2use alloc::collections::BTreeMap;
3#[cfg(feature = "serde")]
4use alloc::sync::Arc;
5use alloc::{
6 string::{String, ToString},
7 vec::Vec,
8};
9use core::fmt;
10
11use crate::error::{ReadError, ReadErrorKind};
12
13pub trait Read {
18 fn read(&mut self, buf: &mut Vec<u8>) -> Result<bool, ReadError>;
23}
24
25#[cfg(not(feature = "std"))]
26impl Read for &[u8] {
27 fn read(&mut self, buf: &mut Vec<u8>) -> Result<bool, ReadError> {
28 let chunk = 16384;
29 let to_copy = chunk.min(self.len());
30 if to_copy == 0 {
31 return Ok(false);
32 }
33 buf.extend_from_slice(&self[..to_copy]);
34 *self = &self[to_copy..];
35 Ok(true)
36 }
37}
38
/// Adapter that lets any [`std::io::Read`] act as a CSV input source.
#[cfg(feature = "std")]
impl<R: std::io::Read> Read for R {
    /// Performs one read of up to 16 KiB and appends the bytes received to `buf`.
    ///
    /// Returns `Ok(false)` when the underlying reader reports end of input (a read of
    /// zero bytes) and `Ok(true)` otherwise.
    ///
    /// # Errors
    ///
    /// Any I/O error other than [`std::io::ErrorKind::Interrupted`] is converted into a
    /// [`ReadError`]. Interrupted reads are retried, as the `std::io::Read` contract
    /// describes them as non-fatal.
    fn read(&mut self, buf: &mut Vec<u8>) -> Result<bool, ReadError> {
        let mut tmp = [0u8; 16384];
        let n = loop {
            match std::io::Read::read(self, &mut tmp) {
                Ok(n) => break n,
                // A signal arrived before any data was transferred; simply try again.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(ReadError::from(e)),
            }
        };
        if n == 0 {
            return Ok(false);
        }
        buf.extend_from_slice(&tmp[..n]);
        Ok(true)
    }
}
51
/// Where the row parser stands relative to the field it is scanning.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    /// At the first byte of a field; nothing of it has been consumed yet.
    StartOfField,
    /// Inside a field that did not begin with a double quote.
    InUnquoted,
    /// Inside a quoted field, past its opening quote.
    InQuoted,
    /// Immediately after the closing quote of a quoted field.
    AfterQuote,
}
59
/// Identifies which buffer a [`RawField`]'s byte range refers to.
#[derive(Debug, Clone, Copy)]
enum Src {
    /// The range indexes the reader's input buffer.
    Buf,
    /// The range indexes the scratch buffer that holds unescaped quoted fields.
    Scratch,
}
65
66#[derive(Clone, Copy, Debug)]
67struct RawField {
68 start: usize,
69 end: usize,
70 src: Src,
71}
72
/// Half-open byte range `start..end` locating one field inside a row's byte buffer.
///
/// Equality is total, so the type also implements `Eq` and `Hash` and can be used as a
/// set or map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FieldRange {
    /// Offset of the field's first byte.
    pub start: usize,
    /// Offset one past the field's last byte.
    pub end: usize,
}
78
79pub struct Reader<R: Read> {
141 buf: Vec<u8>,
143 scratch: Vec<u8>,
145 ranges: Vec<RawField>,
147 start: usize,
149 end: usize,
151 line: usize,
153 field_start: usize,
155 scratch_field_start: usize,
157 state: State,
159 delimiter: u8,
161 flexible: bool,
163 num_fields: Option<usize>,
165 row_size_hint: usize,
167 headers_parsed: bool,
169 pending_cr: bool,
171 headers: Vec<String>,
173 #[cfg(feature = "serde")]
174 header_map: Option<Arc<BTreeMap<String, usize>>>,
176 source: R,
178 eof: bool,
180}
181
182impl<R: Read> Reader<R> {
185 pub fn new(source: R) -> Self {
189 Reader {
190 buf: Vec::with_capacity(65536),
191 scratch: Vec::new(),
192 ranges: Vec::new(),
193 start: 0,
194 end: 0,
195 line: 1,
196 field_start: 0,
197 scratch_field_start: 0,
198 state: State::StartOfField,
199 delimiter: b',',
200 flexible: false,
201 num_fields: None,
202 row_size_hint: 0,
203 headers_parsed: false,
204 pending_cr: false,
205 headers: Vec::new(),
206 #[cfg(feature = "serde")]
207 header_map: None,
208 source,
209 eof: false,
210 }
211 }
212
213 pub fn set_delimiter(mut self, byte: u8) -> Self {
215 self.delimiter = byte;
216 self
217 }
218
219 pub fn set_flexible(mut self, yes: bool) -> Self {
221 self.flexible = yes;
222 self
223 }
224
225 pub fn set_headers(mut self, headers: Vec<String>) -> Self {
229 self.headers_parsed = true;
230 self.headers = headers;
231 #[cfg(feature = "serde")]
232 {
233 let map: BTreeMap<String, usize> = self
234 .headers
235 .iter()
236 .enumerate()
237 .map(|(i, name)| (name.clone(), i))
238 .collect();
239 self.header_map = Some(Arc::new(map));
240 }
241 self
242 }
243
244 pub fn headers(&self) -> Option<&[String]> {
246 if self.headers_parsed { Some(&self.headers) } else { None }
247 }
248
249 pub fn parse_headers(&mut self) -> Result<&[String], ReadError> {
255 let row = match self.read_row() {
256 Some(row) => row,
257 None => return Ok(&[]),
258 };
259 let headers: Vec<String> = row.to_vec()?.iter().map(|s| s.to_string()).collect();
260 self.headers_parsed = true;
261 self.num_fields = Some(headers.len());
262 self.headers = headers;
263 #[cfg(feature = "serde")]
264 {
265 let map: BTreeMap<String, usize> = self
266 .headers
267 .iter()
268 .enumerate()
269 .map(|(i, name)| (name.clone(), i))
270 .collect();
271 self.header_map = Some(Arc::new(map));
272 }
273 Ok(&self.headers)
274 }
275
276 pub fn rows(&mut self) -> Rows<'_, R> {
278 Rows {
279 reader: self,
280 }
281 }
282
283 pub fn rows_bytes(&mut self) -> BytesRows<'_, R> {
285 BytesRows {
286 reader: self,
287 }
288 }
289
290 fn fill_buf(&mut self) -> Result<bool, ReadError> {
295 if self.eof {
296 return Ok(false);
297 }
298 if self.source.read(&mut self.buf)? {
299 self.end = self.buf.len();
300 Ok(true)
301 } else {
302 self.eof = true;
303 Ok(false)
304 }
305 }
306
307 fn read_row(&mut self) -> Option<Row> {
313 self.compact();
314 self.ranges.clear();
315 self.scratch.clear();
316 self.state = State::StartOfField;
317
318 loop {
319 if self.start >= self.end {
320 match self.fill_buf() {
321 Err(e) => {
322 self.eof = true;
323 return match self.state {
324 State::InQuoted => Some(self.build_row(Some(e))),
325 _ => {
326 self.finalize_current_field();
327 Some(self.build_row(Some(e)))
328 }
329 };
330 }
331 Ok(false) => {
332 if self.ranges.is_empty() && self.state == State::StartOfField {
333 return None;
334 }
335 return match self.state {
336 State::InQuoted => Some(self.build_row(Some(ReadError::new(
337 ReadErrorKind::UnterminatedQuote,
338 self.line,
339 0,
340 )))),
341 _ => {
342 self.finalize_current_field();
343 Some(self.build_row(None))
344 }
345 };
346 }
347 Ok(true) => {}
348 }
349 }
350
351 if self.pending_cr {
352 if self.buf[self.start] == b'\n' {
353 self.start += 1;
354 }
355 self.pending_cr = false;
356 continue;
357 }
358
359 let byte = self.buf[self.start];
360
361 match self.state {
362 State::StartOfField => {
363 if byte == b'\r' || byte == b'\n' {
364 if !self.ranges.is_empty() {
365 self.ranges.push(RawField {
366 start: self.start,
367 end: self.start,
368 src: Src::Buf,
369 });
370 }
371 self.consume_newline();
372 if self.ranges.is_empty() {
373 continue;
374 }
375 return Some(self.build_row(None));
376 }
377 self.field_start = self.start;
378 if byte == self.delimiter {
379 self.ranges.push(RawField {
380 start: self.start,
381 end: self.start,
382 src: Src::Buf,
383 });
384 self.start += 1;
385 continue;
386 }
387 if byte == b'"' {
388 self.scratch_field_start = self.scratch.len();
389 self.start += 1;
390 self.state = State::InQuoted;
391 } else {
392 self.start += 1;
393 self.state = State::InUnquoted;
394 }
395 }
396
397 State::InUnquoted => {
398 let haystack = &self.buf[self.start..self.end];
399 match memchr::memchr3(self.delimiter, b'\r', b'\n', haystack) {
400 Some(offset) => {
401 let pos = self.start + offset;
402 self.ranges.push(RawField {
403 start: self.field_start,
404 end: pos,
405 src: Src::Buf,
406 });
407 let b = self.buf[pos];
408 if b == self.delimiter {
409 self.start = pos + 1;
410 self.state = State::StartOfField;
411 } else {
412 self.start = pos;
413 self.consume_newline();
414 return Some(self.build_row(None));
415 }
416 }
417 None => {
418 self.start = self.end;
419 }
420 }
421 }
422
423 State::InQuoted => {
424 let haystack = &self.buf[self.start..self.end];
425 match memchr::memchr(b'"', haystack) {
426 Some(offset) => {
427 let quote_pos = self.start + offset;
428 self.scratch.extend_from_slice(&self.buf[self.start..quote_pos]);
429 let after_quote = quote_pos + 1;
430 if after_quote < self.end && self.buf[after_quote] == b'"' {
431 self.scratch.push(b'"');
432 self.start = after_quote + 1;
433 } else if after_quote < self.end {
434 self.ranges.push(RawField {
435 start: self.scratch_field_start,
436 end: self.scratch.len(),
437 src: Src::Scratch,
438 });
439 self.start = after_quote;
440 self.state = State::AfterQuote;
441 } else {
442 match self.fill_buf() {
443 Ok(true) if self.buf[after_quote] == b'"' => {
444 self.scratch.push(b'"');
445 self.start = after_quote + 1;
446 }
447 Err(e) => {
448 self.ranges.push(RawField {
449 start: self.scratch_field_start,
450 end: self.scratch.len(),
451 src: Src::Scratch,
452 });
453 self.start = after_quote;
454 self.state = State::AfterQuote;
455 return Some(self.build_row(Some(e)));
456 }
457 _ => {
458 self.ranges.push(RawField {
459 start: self.scratch_field_start,
460 end: self.scratch.len(),
461 src: Src::Scratch,
462 });
463 self.start = after_quote;
464 self.state = State::AfterQuote;
465 }
466 }
467 }
468 }
469 None => {
470 self.scratch.extend_from_slice(&self.buf[self.start..self.end]);
471 self.start = self.end;
472 }
473 }
474 }
475
476 State::AfterQuote => {
477 if byte == self.delimiter {
478 self.start += 1;
479 self.state = State::StartOfField;
480 } else if is_newline(byte) {
481 self.consume_newline();
482 return Some(self.build_row(None));
483 } else {
484 return Some(self.build_row(Some(ReadError::new(
485 ReadErrorKind::TrailingContent,
486 self.line,
487 0,
488 ))));
489 }
490 }
491 }
492 }
493 }
494
495 fn compact(&mut self) {
497 if self.start > 0 {
498 let remaining = self.end - self.start;
499 if remaining > 0 {
500 self.buf.copy_within(self.start..self.end, 0);
501 }
502 self.buf.truncate(remaining);
503 self.end = remaining;
504 self.start = 0;
505 }
506 }
507
508 fn consume_newline(&mut self) {
513 if self.start < self.end && self.buf[self.start] == b'\r' {
514 self.start += 1;
515 if self.start >= self.end {
516 self.pending_cr = true;
517 }
518 }
519 if self.start < self.end && self.buf[self.start] == b'\n' {
520 self.start += 1;
521 }
522 self.line += 1;
523 }
524
525 fn finalize_current_field(&mut self) {
530 match self.state {
531 State::InUnquoted => {
532 self.ranges.push(RawField {
533 start: self.field_start,
534 end: self.start,
535 src: Src::Buf,
536 });
537 }
538 State::AfterQuote => {}
539 State::StartOfField => {
540 if !self.ranges.is_empty() {
541 self.ranges.push(RawField {
542 start: self.start,
543 end: self.start,
544 src: Src::Buf,
545 });
546 }
547 }
548 State::InQuoted => unreachable!(),
549 }
550 }
551
552 fn build_row(&mut self, error: Option<ReadError>) -> Row {
554 let bytes_row = self.build_bytes_row(error);
555 Row {
556 inner: bytes_row,
557 #[cfg(feature = "serde")]
558 header_map: self.header_map.clone(),
559 }
560 }
561
562 fn build_bytes_row(&mut self, error: Option<ReadError>) -> BytesRow {
564 let mut total: usize = 0;
565 for r in &self.ranges {
566 total += r.end - r.start;
567 }
568
569 let row_buf_capacity = if total > 0 { total } else { self.row_size_hint.max(64) };
570 let mut row_buf = Vec::with_capacity(row_buf_capacity);
571 let mut row_ranges = Vec::with_capacity(self.num_fields.unwrap_or(self.ranges.len()));
572
573 for r in &self.ranges {
574 let slice = match r.src {
575 Src::Buf => &self.buf[r.start..r.end],
576 Src::Scratch => &self.scratch[r.start..r.end],
577 };
578 let start = row_buf.len();
579 row_buf.extend_from_slice(slice);
580 let end = row_buf.len();
581 row_ranges.push(FieldRange {
582 start,
583 end,
584 });
585 }
586
587 let field_count = row_ranges.len();
588
589 let error = if !self.flexible {
590 error.or_else(|| match self.num_fields {
591 Some(expected) if field_count != expected => Some(ReadError::new(
592 ReadErrorKind::InconsistentFieldCount {
593 expected,
594 found: field_count,
595 },
596 self.line,
597 0,
598 )),
599 _ => None,
600 })
601 } else {
602 error
603 };
604
605 if self.num_fields.is_none() && field_count > 0 {
606 self.num_fields = Some(field_count);
607 }
608
609 if row_buf.len() > self.row_size_hint {
610 self.row_size_hint = row_buf.len();
611 }
612
613 BytesRow {
614 buf: row_buf,
615 ranges: row_ranges,
616 error,
617 line: self.line,
618 }
619 }
620}
621
/// Returns `true` if `b` is a line feed (`\n`) or a carriage return (`\r`).
fn is_newline(b: u8) -> bool {
    matches!(b, b'\n' | b'\r')
}
625
626pub struct BytesRow {
629 pub(crate) buf: Vec<u8>,
630 pub(crate) ranges: Vec<FieldRange>,
631 pub(crate) error: Option<ReadError>,
632 line: usize,
633}
634
635impl BytesRow {
636 pub fn error(&self) -> Option<&ReadError> {
637 self.error.as_ref()
638 }
639
640 pub fn len(&self) -> usize {
641 self.ranges.len()
642 }
643
644 pub fn is_empty(&self) -> bool {
645 self.ranges.is_empty()
646 }
647
648 pub fn get(&self, index: usize) -> Option<&[u8]> {
649 self.ranges.get(index).map(|r| &self.buf[r.start..r.end])
650 }
651
652 pub fn to_vec(&self) -> Result<Vec<&[u8]>, ReadError> {
653 if let Some(ref e) = self.error {
654 return Err(e.clone());
655 }
656 Ok(self.ranges.iter().map(|r| &self.buf[r.start..r.end]).collect())
657 }
658
659 pub fn iter(&self) -> BytesFields<'_> {
660 BytesFields {
661 buf: &self.buf,
662 iter: self.ranges.iter(),
663 }
664 }
665}
666
667impl fmt::Debug for BytesRow {
668 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
669 match &self.error {
670 Some(e) => write!(f, "BytesRow(Err({e}))"),
671 None => f
672 .debug_list()
673 .entries(self.ranges.iter().map(|r| &self.buf[r.start..r.end]))
674 .finish(),
675 }
676 }
677}
678
679pub struct BytesFields<'a> {
680 buf: &'a [u8],
681 iter: core::slice::Iter<'a, FieldRange>,
682}
683
684impl<'a> Iterator for BytesFields<'a> {
685 type Item = &'a [u8];
686 fn next(&mut self) -> Option<&'a [u8]> {
687 let r = self.iter.next()?;
688 Some(&self.buf[r.start..r.end])
689 }
690 fn size_hint(&self) -> (usize, Option<usize>) {
691 self.iter.size_hint()
692 }
693}
694
695impl<'a> ExactSizeIterator for BytesFields<'a> {}
696
697pub struct Row {
700 pub(crate) inner: BytesRow,
701 #[cfg(feature = "serde")]
702 pub(crate) header_map: Option<Arc<BTreeMap<String, usize>>>,
703}
704
705impl Row {
706 pub fn error(&self) -> Option<&ReadError> {
707 self.inner.error()
708 }
709 pub fn len(&self) -> usize {
710 self.inner.len()
711 }
712 pub fn is_empty(&self) -> bool {
713 self.inner.is_empty()
714 }
715
716 pub fn get(&self, index: usize) -> Option<Result<&str, ReadError>> {
717 self.inner.get(index).map(|bytes| {
718 core::str::from_utf8(bytes).map_err(|_| ReadError::new(ReadErrorKind::InvalidUtf8, self.inner.line, 0))
719 })
720 }
721
722 pub fn to_vec(&self) -> Result<Vec<&str>, ReadError> {
723 if let Some(ref e) = self.inner.error {
724 return Err(e.clone());
725 }
726 self.inner
727 .ranges
728 .iter()
729 .map(|r| {
730 core::str::from_utf8(&self.inner.buf[r.start..r.end])
731 .map_err(|_| ReadError::new(ReadErrorKind::InvalidUtf8, self.inner.line, 0))
732 })
733 .collect()
734 }
735
736 pub fn iter(&self) -> Fields<'_> {
737 Fields {
738 buf: &self.inner.buf,
739 iter: self.inner.ranges.iter(),
740 error: self.inner.error.as_ref().map(|e| &e.kind),
741 line: self.inner.line,
742 }
743 }
744}
745
746impl fmt::Debug for Row {
747 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
748 match &self.inner.error {
749 Some(e) => write!(f, "Row(Err({e}))"),
750 None => {
751 f.debug_list()
752 .entries(
753 self.inner.ranges.iter().map(|r| {
754 core::str::from_utf8(&self.inner.buf[r.start..r.end]).unwrap_or("<invalid utf-8>")
755 }),
756 )
757 .finish()
758 }
759 }
760 }
761}
762
763pub struct Fields<'a> {
764 buf: &'a [u8],
765 iter: core::slice::Iter<'a, FieldRange>,
766 error: Option<&'a ReadErrorKind>,
767 line: usize,
768}
769
770impl<'a> Iterator for Fields<'a> {
771 type Item = Result<&'a str, ReadError>;
772 fn next(&mut self) -> Option<Self::Item> {
773 let r = self.iter.next()?;
774 if let Some(kind) = self.error {
775 return Some(Err(ReadError::new(kind.clone(), self.line, 0)));
776 }
777 Some(
778 core::str::from_utf8(&self.buf[r.start..r.end])
779 .map_err(|_| ReadError::new(ReadErrorKind::InvalidUtf8, self.line, 0)),
780 )
781 }
782 fn size_hint(&self) -> (usize, Option<usize>) {
783 self.iter.size_hint()
784 }
785}
786
787impl<'a> ExactSizeIterator for Fields<'a> {}
788
789pub struct BytesRows<'r, R: Read> {
792 reader: &'r mut Reader<R>,
793}
794
795impl<'r, R: Read> Iterator for BytesRows<'r, R> {
796 type Item = BytesRow;
797 fn next(&mut self) -> Option<Self::Item> {
798 self.reader.read_row().map(|r| r.inner)
799 }
800}
801
802pub struct Rows<'r, R: Read> {
803 reader: &'r mut Reader<R>,
804}
805
806impl<'r, R: Read> Iterator for Rows<'r, R> {
807 type Item = Row;
808 fn next(&mut self) -> Option<Self::Item> {
809 self.reader.read_row()
810 }
811}