我有一个用 Go 编写的管道,想要用 Spark runner 执行,Spark Standalone 安装在我的本地机器上。

  • Apache Beam 2.56.0

  • Apache Spark 3.2.2

我在 Spark 安装目录下使用以下命令启动了 Spark master 和 worker:

# for master
./sbin/start-master.sh -h localhost

# for worker
./sbin/start-worker.sh spark://localhost:7077

然后我启动了 beam_spark3_job_server,并通过 -v 将宿主机的 /tmp 挂载到容器中:

docker run -v /tmp:/tmp --net=host apache/beam_spark3_job_server:2.56.0 \
        --spark-master-url=spark://localhost:7077

接着在 Go 项目目录下运行:

go run main.go --runner PortableRunner \
        --endpoint localhost:8099 \
        --environment_type LOOPBACK

当 environment_type 设置为 LOOPBACK 时,一切运行正常。

但是,如果我去掉这个参数再次运行脚本(此时 environment_type 默认为 DOCKER):

go run main.go --runner PortableRunner \
        --endpoint localhost:8099

控制台上就会出现以下错误:

java.lang.IllegalStateException: No container running for id xxxxx

(此前出现过另一种报错:在 /tmp/beam-artifact-staging 中找不到文件。在 docker run 命令中加上 -v 挂载 /tmp 之后,那个问题已经解决。)

但上面的 "No container running" 问题仍然存在。

以下是 Spark 的部分日志:

24/07/02 15:21:37 DEBUG DockerEnvironmentFactory: Creating Docker Container with ID 1-1
24/07/02 15:21:39 DEBUG DockerEnvironmentFactory: Created Docker Container with Container ID 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
24/07/02 15:21:39 INFO GrpcLoggingService: Beam Fn Logging client connected.
24/07/02 15:21:39 DEBUG : Initializing Go harness: /opt/apache/beam/boot --id=1-1 --provision_endpoint=localhost:39005
24/07/02 15:21:39 DEBUG LocalFileSystem: opening file /tmp/beam-artifact-staging/9b228b83e120b5aa87f4ce34788bacdf1c35d2f05311deb8efb494bfbea0ff0b/1-0:go-/tmp/worker-1-1719926483620669554
24/07/02 15:21:40 WARN GrpcLoggingService: Logging client failed unexpectedly.
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status.asRuntimeException(Status.java:529)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:370)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:359)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:910)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
24/07/02 15:21:42 DEBUG AwsRegionProviderChain: Unable to load region from software.amazon.awssdk.regions.providers.InstanceProfileRegionProvider@247a4d99:Unable to contact EC2 metadata service.
24/07/02 15:21:42 DEBUG LocalDiskShuffleMapOutputWriter: Writing shuffle index file for mapId 1 with length 8
24/07/02 15:21:42 DEBUG IndexShuffleBlockResolver: Shuffle index for mapId 1: [0,0,0,0,0,0,0,0]
24/07/02 15:21:42 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 6019 bytes result sent to driver
24/07/02 15:21:42 DEBUG ExecutorMetricsPoller: stageTCMP: (0, 0) -> 1
24/07/02 15:21:44 INFO DockerEnvironmentFactory: Still waiting for startup of environment apache/beam_go_sdk:2.56.0 for worker id 1-1
24/07/02 15:21:44 ERROR DockerEnvironmentFactory: Docker container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 logs:
2024/07/02 13:21:39 Provision info:
pipeline_options:{fields:{key:"beam:option:app_name:v1"  value:{string_value:"go-job-1-1719926483620667092"}}  fields:{key:"beam:option:experiments:v1"  value:{list_value:{values:{string_value:"beam_fn_api"}}}}  fields:{key:"beam:option:go_options:v1"  value:{struct_value:{fields:{key:"options"  value:{struct_value:{fields:{key:"endpoint"  value:{string_value:"localhost:8099"}}  fields:{key:"hookOrder"  value:{string_value:"[\"default_remote_logging\"]"}}  fields:{key:"hooks"  value:{string_value:"{\"default_remote_logging\":null}"}}  fields:{key:"job"  value:{string_value:"wordcount"}}  fields:{key:"runner"  value:{string_value:"spark"}}}}}}}}  fields:{key:"beam:option:job_name:v1"  value:{string_value:"go0job0101719926483620667092-root-0702132126-ff3f12ba"}}  fields:{key:"beam:option:options_id:v1"  value:{number_value:2}}  fields:{key:"beam:option:parallelism:v1"  value:{number_value:-1}}  fields:{key:"beam:option:retain_docker_containers:v1"  value:{bool_value:false}}  fields:{key:"beam:option:runner:v1"  value:{null_value:NULL_VALUE}}  fields:{key:"beam:option:spark_master:v1"  value:{string_value:"spark://localhost:7077"}}}  retrieval_token:"go-job-1-1719926483620667092_8d8b0d53-0d18-49dc-908b-a85d0be89cc5"  logging_endpoint:{url:"localhost:36449"}  artifact_endpoint:{url:"localhost:36373"}  control_endpoint:{url:"localhost:43091"}  dependencies:{type_urn:"beam:artifact:type:file:v1"  type_payload:"\n\x84\x01/tmp/beam-artifact-staging/9b228b83e120b5aa87f4ce34788bacdf1c35d2f05311deb8efb494bfbea0ff0b/1-0:go-/tmp/worker-1-1719926483620669554"  role_urn:"beam:artifact:role:go_worker_binary:v1"}  runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2024/07/02 13:21:40 Downloaded: /tmp/staged/1-worker-1-1719926483620669554 (sha256: 89580cb558dbc92138c20bdb88f8687d7c96386e9f6d0b717b07b68fe9327476, size: 122860883)
24/07/02 15:21:44 ERROR Executor: Exception in task 7.0 in stage 0.0 (TID 7)
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
    at org.apache.beam.runners.spark.translation.SparkExecutableStageFunction.call(SparkExecutableStageFunction.java:142)
    at org.apache.beam.runners.spark.translation.SparkExecutableStageFunction.call(SparkExecutableStageFunction.java:81)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: No container running for id 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
    at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
    ... 39 more
    Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4'. stderr: Error response from daemon: cannot kill container: 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4: container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 is not running
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
        at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
        ... 45 more

其中最值得注意的是以下两行(容器日志在打印标题后就中断了):

24/07/02 15:21:44 INFO DockerEnvironmentFactory: Still waiting for startup of environment apache/beam_go_sdk:2.56.0 for worker id 1-1
24/07/02 15:21:44 ERROR DockerEnvironmentFactory: Docker container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 logs:

// On DOM ready, report clicks on inline "related question" entries to the
// StackExchange analytics tracker. Relies on the page globals `$` (jQuery)
// and `StackExchange`, which are not defined in this file.
// The scraped original used typographic quotes (“ ” ‘ ’) as string
// delimiters, which is a JavaScript syntax error; ASCII quotes are used here.
$(function () {
    $(".js-gps-inline-related-questions .spacer").on("click", function () {
        // jQuery's index() is 0-based; the tracker receives a 1-based position.
        fireRelatedEvent($(this).index() + 1, $(this).data("question-id"));
    });

    /**
     * Sends a "related_questions.click" event through StackExchange's "gps" module.
     *
     * @param {number} position - 1-based position of the clicked entry.
     * @param {*} questionId - value of the entry's data-question-id; coerced to a number with unary +.
     */
    function fireRelatedEvent(position, questionId) {
        StackExchange.using("gps", function () {
            StackExchange.gps.track("related_questions.click", {
                position: position,
                originQuestionId: 78697550,
                relatedQuestionId: +questionId,
                location: "inline",
                source: "Baseline_Fallback"
            });
        });
    }
});

/**
 * Switches the inline "related questions" widget between its collapsed and
 * expanded views by adding/removing the "d-none" class on four elements.
 *
 * When showMore is truthy, the collapsed list and the "see more" link are
 * hidden, and the expanded list and the "see less" link are shown. When it
 * is falsy, the opposite happens.
 *
 * The scraped original used typographic quotes (“ ”) as string delimiters,
 * which is a JavaScript syntax error; ASCII quotes are used here.
 *
 * @param {*} showMore - truthy to expand, falsy to collapse.
 */
function toggleInlineRelated(showMore) {
    var expand = Boolean(showMore);

    // Hide (hidden === true) or show the element with the given id. Ids not
    // present on the page are skipped instead of throwing on a null element.
    function setHidden(id, hidden) {
        var el = document.getElementById(id);
        if (el) {
            el.classList.toggle("d-none", hidden);
        }
    }

    // Visible only in the collapsed view.
    setHidden("inline_related_var_a_less", expand);
    setHidden("inline_related_see_more", expand);

    // Visible only in the expanded view.
    setHidden("inline_related_var_a_more", !expand);
    setHidden("inline_related_see_less", !expand);
}

0