Skip to content

Commit

Permalink
GEOMESA-3343 Kafka integration test for GetGeoMesaKafkaRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Mar 19, 2024
1 parent 2bc9b32 commit d41946a
Show file tree
Hide file tree
Showing 5 changed files with 1,173 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class AccumuloNarIT extends Specification {

nifiContainer =
new NiFiContainer()
.withDefaultIngestFlow("accumulo21", classOf[AccumuloDataStoreService], Map("accumulo.catalog" -> catalog))
.withDefaultIngestFlow[AccumuloDataStoreService]("accumulo21", Map("accumulo.catalog" -> catalog))
.withFileSystemBind(accumuloClientProps, clientPropsMountPath, BindMode.READ_ONLY)
.withNetwork(network)
nifiContainer.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@ if [[ -f /logback.xml ]]; then
cp /logback.xml /opt/nifi/nifi-current/conf/
fi

# lower the administrative yield deadline, to decrease test time when failing due to e.g. feature types not yet being available
sed -i s'/nifi.administrative.yield.duration=.*/nifi.administrative.yield.duration=1 sec/' /opt/nifi/nifi-current/conf/nifi.properties

# delegate to the normal nifi entrypoint
exec /opt/nifi/scripts/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.testcontainers.utility.{DockerImageName, PathUtils}
import java.io.{ByteArrayInputStream, File, FileOutputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import scala.reflect.ClassTag

class NiFiContainer(image: DockerImageName) extends GenericContainer[NiFiContainer](image) {

Expand Down Expand Up @@ -58,15 +59,14 @@ class NiFiContainer(image: DockerImageName) extends GenericContainer[NiFiContain
* @param params parameters used to configure the data store service
* @return
*/
def withDefaultIngestFlow(
def withDefaultIngestFlow[T <: DataStoreService: ClassTag](
narName: String,
datastoreService: Class[_ <: DataStoreService],
params: Map[String, String]): NiFiContainer = {
val nar = findNar(narName)
withNarByPath(nar)
val flow = WithClose(getClass.getClassLoader.getResourceAsStream("docker/ingest-flow.json")) { is =>
IOUtils.toString(is, StandardCharsets.UTF_8)
.replace("STORE_CLASS", datastoreService.getName)
.replace("STORE_CLASS", implicitly[ClassTag[T]].runtimeClass.getName)
.replace("STORE_NAR", new File(nar).getName.replaceAll("-[0-9.]+(-SNAPSHOT)?\\.nar", ""))
.replace("STORE_PROPERTIES", new Gson().toJson(params.asJava))
}
Expand Down
Loading

0 comments on commit d41946a

Please sign in to comment.