背景:我想从文本文件中读取一些数据到polars
数据框中。数据从包含字符串 的行开始foo
,并在之后的第一个空行处停止。示例文件test.txt
:
stuff to skip
more stuff to skip
skip me too
foo bar foobar
1 2 A
4 5 B
7 8 C
other stuff
stuff
pl.read_csv
有 argsskip_rows
和n_rows
.因此,如果我可以找到行号foo
和第一个空行的行号,我应该能够将数据读入polars
数据帧。我怎样才能做到这一点?我能够找到skip_rows
:
from pathlib import Path
file_path = Path('test.txt')
with open(file_path, 'r') as file:
skip_rows = 0
n_rows = 0
for line_number, line in enumerate(file, 1):
if 'foo' in line:
skip_rows = line_number - 1
但我怎样才能n_rows
在不再次扫描文件的情况下找到它呢?此外,解决方案必须处理没有包含 的行的情况foo
,例如
stuff to skip
more stuff to skip
skip me too
1 2 A
4 5 B
7 8 C
other stuff
stuff
在这种情况下,我想返回一个指示foo
未找到的值,或者引发异常,以便调用者知道出了问题(也许是ValueError
异常?)。
编辑:我忘记了一个边缘情况。有时数据可能会持续到文件末尾:
stuff to skip
more stuff to skip
skip me too
foo bar foobar
1 2 A
4 5 B
7 8 C
0
5 个回答
5
你可以试试:
start, end = None, None
with open("your_file.txt", "r") as f_in:
for line_no, line in enumerate(map(str.strip, f_in)):
if line.startswith("foo"): # or use `if "foo" in line:`
start = line_no
elif start is not None and line == "":
end = line_no
break
else:
# no break, but we found `foo`
if start is not None:
end = line_no
else:
print("foo not found!")
if start is not None:
print(f"{start=} {end=}")
打印(包含问题的第一个输入):
start=6 end=10
4
-
“数据从包含字符串 foo 的行开始” – 它没有说明具体以“foo”开头。另外,如果所有数据都位于文件末尾并且没有空行怎么办?您的示例将声称未找到 foo 。
–
-
if 'food network'.startswith("foo"):
另外if "foo" in "foot fetish":
~你看到问题了吗?
– -
@OysterShucker,我留给OP – 他/她可以使用正则表达式,str.split等(如果需要)
– -
但你的答案是与我的竞争,而我的答案会处理你“留给OP”的所有问题。这是对你自己的侮辱。你在这上面花费了时间,但这并不是为了争夺奖励。我并不是想缠扰你。
–
|
使用可能是一个有用的模式。
如果没有产生任何值,则会next()
引发StopIteration
异常,您可以捕获该异常并将其报告给调用者。
with open("test.txt") as f:
f = enumerate(f)
try:
skip_rows = next(n for n, line in f if "foo" in line)
except StopIteration:
raise ValueError("Start line not found.")
for n, line in f:
if line.strip() == "":
n -= 1
break
n_rows = n - skip_rows
print(f"{skip_rows=}")
print(f"{n_rows=}")
skip_rows=6
n_rows=3
2
-
有趣的!但我不精确 – 如果数据一直到文本文件末尾就可以了。换句话说,我认为没有
ValueError("End line not found.")
必要。您能否修改您的答案,以便当数据部分持续到文件末尾时它也可以工作?我在问题中添加了一个相关的测试用例。
– -
啊好吧。在这种情况下,
next()
只能用于起始行条件。我已经编辑了答案,但与常规的 for 循环方法相比,它可能不再有用。
–
|
这是一个可能的解决方案。该解决方案涵盖了一些边缘情况。
- 它不会在“foobar”中找到“foo”
- 它将找到“Foo”、“fOO”、“FOO”等
- 它停在第一个空行或文件末尾,以先到者为准
try:
with open('test.txt', 'r') as lines:
for row, line in enumerate(lines):
# maybe "foo" is present but mixed or uppercase
# split on space so we find exactly "foo" and not "foo" in "footage"
if 'foo' in line.lower().split(' '):
for n_row, ln in enumerate(lines, row+1):
if not ln.strip():
break
else:
# end of file, this line doesn't actually exist
# which is fine if you use this number for `stop` with range or splice
n_row += 1
break
else:
raise Exception
except:
print("foo was not found")
else:
print(row, n_row) #6, 10
您可能需要考虑到,获取行号可能意味着您需要再次检查数据。只需很少的修改,您就可以简单地获取数据,并且仍然可以获得行号。
try:
with open('test.txt', 'r') as lines:
for row, line in enumerate(lines):
if 'foo' in line.lower().split(' '):
data = [line.strip()]
for n_row, ln in enumerate(lines, row+1):
if not (line := ln.strip()):
break
data.append(line)
else:
n_row += 1
break
else:
raise Exception
except:
print("foo was not found")
else:
print(row, n_row)
print(*data, sep='\n')
输出
6 10
foo bar foobar
1 2 A
4 5 B
7 8 C
这是一个跳过所有行骗局并直接解析数据并将其转换为数据帧的版本。
import polars
from io import StringIO
try:
with open('test.txt', 'r') as lines:
for line in lines:
if 'foo' in line.lower().split(' '):
data = line
for ln in lines:
if not ln.strip(): break
data += ln
break
else:
raise Exception
except:
print("foo was not found")
else:
print(polars.read_csv(StringIO(data)))
输出
shape: (3, 1)
┌────────────────┐
│ foo bar foobar │
│ --- │
│ str │
╞════════════════╡
│ 1 2 A │
│ 4 5 B │
│ 7 8 C │
└────────────────┘
3
-
我不明白这一行:
if not (line := ln.strip()):
。你为什么改变它if not ln.strip():
?
–
-
@DeltaIV – 因为在我的第一个中我们没有保存该行。在我的第二个例子中,我们正在保存线路。我将剥离的行分配给变量 (
line
),这样我就不必再次剥离它来保存剥离的版本。海象 (:=
) 允许我分配一个变量并具有条件,所有这些都在一行中。
– -
@DeltaIV – 我用一个简单地获取数据并将其转换为数据框的版本更新了我的答案。
–
|
这是两个全极性解决方案,每个解决方案都依赖于固定长度界定来区分什么是数据或不是数据。
扫描一切方法
csv_scan = (
pl.scan_csv("ex.txt",
separator="\0", # null character as sep so just one column
has_header=False,
schema={"a": pl.String} # name column here
)
.filter(pl.col('a').is_not_null())
)
dfdata = (
csv_scan
# this is kind of like a row id but only increments at unique values
.group_by(z=pl.col("a").str.len_chars().rle_id(), maintain_order=True)
.agg("a", len=pl.len())
# Assume that data rows have the same string length, that the data
# rows will be continuous, and that the meta rows won't have as many
# same str length rows in a row as the data
.with_columns(z1=pl.col("z").filter(pl.col("len").max() == pl.col("len")))
.filter(
(pl.col("z") == pl.col('z1'))
| (pl.col("z") + 1 == pl.col('z1'))
)
# extract all based on \S regex, can't use extract_groups b/c we
# don't know how many proper columns there are
.select(
pl.col("a")
.explode()
.str.extract_all(r"(\S)+")
.list.to_struct()
)
.unnest("a")
.collect()
.pipe(
lambda df: (
# promote 0th row to header and discard it from data itself
df[1:].rename(
{f"field_{x}": y
for x, y in enumerate(next(df.iter_rows()))})
)
)
# TODO: cast columns to intended datatypes (see below)
)
扫描偏移方法然后重读
csv_scan = pl.scan_csv(
"ex.txt",
separator="\0", # null character as sep so just one column
has_header=False,
schema={"a": pl.String}, # name column here
)
offsets = (
csv_scan.with_columns(i=pl.int_range(pl.len()))
.group_by(z=pl.col("a").str.len_chars().rle_id(), maintain_order=True)
.agg("a", "i", len=pl.len())
.filter(
(pl.col("z") == pl.col("z").filter(pl.col("len").max() == pl.col("len")))
| (pl.col("z") + 1 == pl.col("z").filter(pl.col("len").max() == pl.col("len")))
)
.collect()
)
df = pl.read_csv(
"ex.txt",
skip_rows=offsets["i"][0][0] + 1,
has_header=False,
separator=" ",
n_rows=offsets["len"][-1],
).pipe(lambda df: (
df.select(
df.melt()
.filter(pl.col('value').is_not_null())
.get_column('variable')
.unique(maintain_order=True)
.to_list()
)
))
df.columns=offsets['a'][0].str.replace_all("(\s+)", " ").str.split(" ")[0]
第二种方法的好处是read_csv
可以推断数据类型,但是当我运行它时, foo 列仍然被推断为字符串,也许第一种方法最适合手动 dtype 转换。
尝试将字符串转换为数字
for col in dfdata.columns:
try:
dfdata=dfdata.with_columns(pl.col(col).cast(pl.Int64))
continue
except:
pass
try:
dfdata=dfdata.with_columns(pl.col(col).cast(pl.Float64))
continue
except:
pass
|
这是一个简单的方法:
- 将 csv 读取为单列 csv 文件
- 使用 Polars 字符串匹配来查找偏移量
- 用于
write_csv
将正确的slice
数据帧写入StringIO
缓冲区 - 使用
read_csv
缓冲区StringIO
作为源来根据需要解析 csv。
def read_received_csv(path_to_file: str) -> DataFrame | None:
"""
Read CSV file from the first occurrence of 'foo' until the next blank line or end of file.
If 'foo' is not found, return None.
"""
df_initial_read = pl.read_csv(
path_to_file,
has_header=False,
separator="\0", # no separator
row_index_name="index",
missing_utf8_is_empty_string=True,
new_columns=["index", "unparsed_string"],
)
# Find the offset of the first occurrence of "foo".
df_foo = df_initial_read.filter(pl.col("unparsed_string").str.contains(r"foo"))
if df_foo.is_empty():
return None
slice_offset = df_foo.get_column("index").item(0)
# Starting with the offset above, find the first blank line that follows "foo", if any
df_next_blank = df_initial_read.slice(slice_offset).filter(
pl.col("unparsed_string").str.strip_chars() == ""
)
if df_next_blank.is_empty():
slice_length = None
else:
slice_length = df_next_blank.get_column("index").item(0) - slice_offset
# Take a "slice" of the initial unparsed csv, write it out to a string buffer, and then
# allow Polars to parse the string buffer per usual.
return pl.read_csv(
StringIO(
df_initial_read.select("unparsed_string")
.slice(slice_offset, slice_length)
.write_csv(include_header=False)
),
separator="\t",
)
对于上面的三种情况,这将返回:(带有foo
一个空行,并假设制表符作为分隔符):
>>> read_received_csv("data/raw/so_78196632/test1.txt")
shape: (3, 3)
foo bar foobar
--- --- ---
i64 i64 str
1 2 A
4 5 B
7 8 C
(没有出现foo
)
>>> read_received_csv("data/raw/so_78196632/test2.txt")
(foo
发生,但后面没有空行)
>>> read_received_csv("data/raw/so_78196632/test3.txt")
shape: (3, 3)
foo bar foobar
--- --- ---
i64 i64 str
1 2 A
4 5 B
7 8 C
这是一个相对简单的方法。也许不适合非常大的文件(因为它将文件的全部内容读入 RAM)。
|
|