Skip to content

ARROW-10882: [Python] Allow writing dataset from iterator of batches#9802

Closed
lidavidm wants to merge 9 commits into
apache:masterfrom
lidavidm:arrow-10882
Closed

ARROW-10882: [Python] Allow writing dataset from iterator of batches#9802
lidavidm wants to merge 9 commits into
apache:masterfrom
lidavidm:arrow-10882

Conversation

@lidavidm

Copy link
Copy Markdown
Member

This binds InMemoryDataset to Python, allowing us to create and write back out datasets from iterables of record batches and various other objects.

@github-actions

Copy link
Copy Markdown

@lidavidm

Copy link
Copy Markdown
Member Author

CC @jorisvandenbossche perhaps?

@jorisvandenbossche jorisvandenbossche left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really cool, thanks!

Added few minor inline comments.

I assume this is existing behaviour of the C++ InMemoryDataset, but trying it out now because you added bindings for it, and I am noticing some surprising behaviour (or at least different behaviour compared to other datasets) regarding multiple scans.

Listing fragments only works a single time:

In [26]: table = pa.table({"a": range(10), 'b': np.random.randn(10)})

In [27]: dataset = ds.dataset(table)

In [28]: list(dataset.get_fragments())
Out[28]: [<pyarrow._dataset.Fragment at 0x7fe157128d70>]

In [29]: list(dataset.get_fragments())
Out[29]: []

Scanning a fragment results in an empty table? (I had expected that it would work at least the first time)

In [30]: dataset = ds.dataset(table)

In [31]: fragment = list(dataset.get_fragments())[0]

In [32]: fragment.to_table().to_pandas()
Out[32]: 
Empty DataFrame
Columns: []
Index: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Scanning the dataset twice gives an empty table the second time:

In [33]: dataset = ds.dataset(table)

In [34]: dataset.to_table().to_pandas()
Out[34]: 
   a         b
0  0  0.114496
1  1 -0.402515
2  2 -0.581416
3  3 -0.919706
4  4 -1.335101
5  5  0.743333
6  6 -0.156003
7  7 -0.273285
8  8 -0.901662
9  9 -1.873260

In [35]: dataset.to_table().to_pandas()
Out[35]: 
Empty DataFrame
Columns: [a, b]
Index: []

I suppose some of this is the expected behaviour (eg since the iterator is only consumed a single time), but it might then be worth to more explicitly document this. As with FileSystemDatasets, you can happily do multiple scans (eg with different filters).
But an error instead of empty results could also be more user friendly IMO.

Comment thread python/pyarrow/_dataset.pyx Outdated

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
f'Item has schema {item.schema} which which does not '
f'Item has schema {item.schema} which does not '

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I would maybe add some \n around the inserted schema to improve readability of the message

Comment thread python/pyarrow/dataset.py Outdated
Comment on lines 735 to 738

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also pass this to an InMemoryDataset? Then _filesystemdataset_write could potentially be simplified to not have to deal with a list of batches/tables

@lidavidm

Copy link
Copy Markdown
Member Author

Hmm, I think the behavior is specifically because we're passing a reader, which of course is single-shot. In some cases we can't avoid this (e.g. if you pass in a Flight reader or an iterator). But InMemoryDataset allows passing a record batch iterator factory which we should take advantage of as much as possible, so at least in the case where we get a list of batches, that should be re-readable.

@lidavidm

Copy link
Copy Markdown
Member Author

Aha, the reason why scanning a fragment is empty is because it gets constructed with an empty schema due to a spot of undefined behavior.

InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
                                   Expression partition_expression)
    : InMemoryFragment(record_batches.empty() ? schema({}) : record_batches[0]->schema(),
                       std::move(record_batches), std::move(partition_expression)) {}

This might move record_batches before we evaluate empty(), resulting in an empty schema getting passed.

@lidavidm

Copy link
Copy Markdown
Member Author

This should be good (minus the known flaky integration test). As suggested I've changed _filesystemdataset_write such that it only needs to handle datasets now.

@jorisvandenbossche jorisvandenbossche left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updates look good!

@lidavidm

lidavidm commented Apr 5, 2021

Copy link
Copy Markdown
Member Author

@westonpace would it be easier if we held this until ARROW-7001 is through? If we think ARROW-7001 won't make it we can let this through then, or else rework it after ARROW-7001 lands.

@westonpace

Copy link
Copy Markdown
Member

Is the concern just that we will need to rebase this into ARROW-7001? It doesn't look like it should be too challenging to rebase so I'd say go for it. Or is there some other concern I'm missing?

@lidavidm

lidavidm commented Apr 5, 2021

Copy link
Copy Markdown
Member Author

Ah yeah, I just wanted to make sure it wasn't creating a lot of additional work for you in that PR.

@westonpace

Copy link
Copy Markdown
Member

Don't worry about it. Go for it.

@lidavidm

lidavidm commented Apr 6, 2021

Copy link
Copy Markdown
Member Author

@jorisvandenbossche any other comments?

@jorisvandenbossche

Copy link
Copy Markdown
Member

Nope, I approved above, so go ahead and merge!

@lidavidm lidavidm closed this in 3274d08 Apr 6, 2021
@westonpace

Copy link
Copy Markdown
Member

Postmortem comments now that I'm reviewing this in more detail to merge 😃

  • If you accept a record batch reader then "in-memory" could be misleading. There is nothing preventing you from passing an IPC reader of any kind. In the future we might want to rename this to something like ExternalDataset or PipedDataset or StreamingDataset.

  • Until we interface with Python async there is no way to really scan this asynchronously. I can either scan it on a background thread or use the CPU thread and simply pray that the reader doesn't block. For now I'll do the latter but going forwards maybe we should split this into two different dataset classes? An InMemoryDataset which wraps a list of batches (or table[s]) and a piped dataset which wraps a reader/iterator? The latter would be consumed by an I/O thread while the former would just get consumed on the CPU thread.

@lidavidm

lidavidm commented Apr 6, 2021

Copy link
Copy Markdown
Member Author

Both points sound good to me. I filed ARROW-12231 to keep track.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants