Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import scala.sys.process._
import java.util.Comparator
import org.apache.texera.common.config.PythonUtils
import org.apache.texera.dao.SqlServer
import com.typesafe.scalalogging.LazyLogging
import org.apache.commons.lang3.SystemUtils
import org.apache.texera.dao.jooq.generated.tables.daos.VirtualEnvironmentsDao
import org.apache.texera.dao.jooq.generated.tables.pojos.VirtualEnvironments
import org.jooq.JSONB
Expand All @@ -44,7 +46,7 @@ import org.jooq.JSONB
* /tmp/texera-pve/venvs/{cuid}/{pveName}/
*/

object PveManager {
object PveManager extends LazyLogging {

case class PvePackageResponse(
pveName: String,
Expand All @@ -67,8 +69,16 @@ object PveManager {
// Path of the "pve" entry beneath the directory that cuidDir returns for
// this (cuid, pveName) pair.
private def pveDir(cuid: Int, pveName: String): Path = {
  val parentDir = cuidDir(cuid, pveName)
  parentDir.resolve("pve")
}

// Locates the Python interpreter of the virtual environment rooted at
// `venvDir`. The layout depends on the OS: `<venv>/Scripts/python.exe` on
// Windows, `<venv>/bin/python` everywhere else.
private def venvPython(venvDir: Path): Path = {
  val (scriptsDir, interpreter) =
    if (SystemUtils.IS_OS_WINDOWS) ("Scripts", "python.exe")
    else ("bin", "python")
  venvDir.resolve(scriptsDir).resolve(interpreter)
}

private def pythonBinPath(cuid: Int, pveName: String): Path =
pveDir(cuid, pveName).resolve("bin").resolve("python")
venvPython(pveDir(cuid, pveName))

/*
* Validates the PVE name and returns the Python binary path if it exists,
Expand Down Expand Up @@ -103,26 +113,95 @@ object PveManager {
}
}

private def getSystemPath(isLocal: Boolean): Path = {
Paths.get(
if (isLocal) "amber/system-requirements-lock.txt"
else "/tmp/system-requirements-lock.txt"
)
}
private def locateRequirementsTxt(): Option[Path] =
Comment thread
Yicong-Huang marked this conversation as resolved.
Seq(Paths.get("/tmp", "requirements.txt"), Paths.get("amber", "requirements.txt"))
.find(Files.exists(_))
Comment thread
SarahAsad23 marked this conversation as resolved.

// Resolves the fully-pinned system package set by installing requirements.txt
// into a throwaway venv and running `pip freeze`.
private def resolveSystemPackages(): Seq[String] = {
val requirementsPath = locateRequirementsTxt() match {
case Some(p) => p
case None =>
logger.error("requirements.txt not found; system package set will be empty")
Comment thread
SarahAsad23 marked this conversation as resolved.
return Seq.empty
}

def getSystemPackages(isLocal: Boolean): Seq[String] = {
if (!Files.exists(getSystemPath(isLocal))) {
Seq()
} else {
Files
.readAllLines(getSystemPath(isLocal))
.asScala
.map(_.trim)
.filter(line => line.nonEmpty && !line.startsWith("#"))
.toSeq
val tempVenv = Files.createTempDirectory("texera-system-venv-")
Comment thread
Yicong-Huang marked this conversation as resolved.
try {
val python = venvPython(tempVenv).toString
val createCode =
Process(Seq(PythonUtils.getPythonExecutable, "-m", "venv", tempVenv.toString)).!
if (createCode != 0) {
logger.error(s"failed to create temp venv for system-package resolution (exit=$createCode)")
return Seq.empty
}

val installCode = Process(
Seq(
python,
Comment thread
Yicong-Huang marked this conversation as resolved.
"-u",
"-m",
"pip",
"install",
"--progress-bar",
"off",
"--no-input",
"-r",
requirementsPath.toString
),
None,
pipEnv.toSeq: _*
).!
if (installCode != 0) {
logger.error(s"failed to install requirements into temp venv (exit=$installCode)")
return Seq.empty
}

val collected = scala.collection.mutable.ListBuffer[String]()
val freezeCode = Process(Seq(python, "-m", "pip", "freeze")).!(
ProcessLogger(line => collected += line, _ => ())
)
if (freezeCode != 0) {
logger.error(s"pip freeze failed (exit=$freezeCode)")
return Seq.empty
}
collected.toSeq.map(_.trim).filter(line => line.nonEmpty && !line.startsWith("#"))
} finally {
try {
val stream = Files.walk(tempVenv)
try stream
.sorted(Comparator.reverseOrder())
.iterator()
.asScala
.foreach(Files.deleteIfExists)
finally stream.close()
} catch {
case _: Throwable => ()
}
}
}

// Cached for the JVM lifetime. The system Python + requirements.txt don't
// change without an app restart, so resolving once is sufficient.
// `lazy val` defers the resolveSystemPackages() call until first access and
// then keeps whatever that call returned.
// NOTE(review): resolveSystemPackages() returns an empty Seq on every failure
// path, so a failed first resolution appears to stay cached as "no system
// packages" until restart — confirm this is intended.
private lazy val systemPackages: Seq[String] = resolveSystemPackages()

// Lower-cased package names ("numpy", "pandas") derived from the frozen
// system package lines — used to reject user attempts to install or delete
// system packages.
//
// Frozen requirement lines come in more than one shape:
//   numpy==1.26.4               pinned release         -> "numpy"
//   mypkg @ file:///tmp/mypkg   direct reference       -> "mypkg"
//   -e git+https://...          pip option (editable)  -> skipped, no leading name
// Cutting only at "==" would keep the whole `name @ url` text as the "name",
// so such a system package would never match a user-supplied name.
private lazy val systemPackageNames: Set[String] =
  systemPackages
    .filterNot(_.startsWith("-"))
    // limit = 2: only the text before the first separator is needed, and a
    // positive limit guarantees split returns at least one element
    .map(_.split("==|@", 2)(0).trim.toLowerCase)
    .toSet

// Created on first use and reused afterwards: a temp file holding the frozen
// system requirements, one entry per line, handed to
// `pip install --constraint` so user installs respect the system pins.
// The file is registered for deletion when the JVM exits.
private lazy val systemConstraintFile: Path = {
  val constraintPath =
    Files.write(
      Files.createTempFile("texera-system-constraint-", ".txt"),
      systemPackages.asJava
    )
  constraintPath.toFile.deleteOnExit()
  constraintPath
}

// Public accessor: returns the cached `systemPackages` sequence unchanged.
// Declared without a parameter list, so callers write
// `PveManager.getSystemPackages`.
def getSystemPackages: Seq[String] = systemPackages

private def runPipInstall(
python: String,
args: Seq[String],
Expand Down Expand Up @@ -162,20 +241,15 @@ object PveManager {
def createNewPve(
cuid: Int,
queue: BlockingQueue[String],
pveName: String,
isLocal: Boolean
pveName: String
): Unit = {
queue.put(s"[PVE] Creating new PVE for cuid: $cuid with name: $pveName")

// NOTE: These paths are derived from computing-unit-master.dockerfile.
// If requirements.txt location changes, update these paths.
val requirementsPath =
if (isLocal) Paths.get("amber", "requirements.txt")
else Paths.get("/tmp", "requirements.txt")

if (!Files.exists(requirementsPath)) {
queue.put(s"[PVE][ERR] System requirements not found")
return
val requirementsPath = locateRequirementsTxt() match {
case Some(p) => p
case None =>
queue.put(s"[PVE][ERR] System requirements not found")
return
}

val venvDirPath = pveDir(cuid, pveName).toAbsolutePath
Expand Down Expand Up @@ -354,8 +428,7 @@ object PveManager {
packages: List[String],
cuid: Int,
queue: BlockingQueue[String],
pveName: String,
isLocal: Boolean
pveName: String
): Unit = {

val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString
Expand All @@ -369,27 +442,14 @@ object PveManager {

var installedPackages = readPackageFile(metadataPath).toSet

val systemPackages =
if (Files.exists(getSystemPath(isLocal))) {
Files
.readAllLines(getSystemPath(isLocal))
.asScala
.map(_.trim)
.filter(line => line.nonEmpty && !line.startsWith("#"))
.map(line => line.split("==")(0).trim.toLowerCase)
.toSet
} else {
Set[String]()
}

packages.foreach { pkg =>
val trimmedPkg = pkg.trim

if (trimmedPkg.nonEmpty) {

val userPackageName = trimmedPkg.split("==")(0).trim.toLowerCase

if (systemPackages.contains(userPackageName)) {
if (systemPackageNames.contains(userPackageName)) {
queue.put(
s"[PVE][ERR] $trimmedPkg is a system package and cannot be installed or modified by the user."
)
Expand All @@ -401,8 +461,8 @@ object PveManager {
val code = runPipInstall(
python,
Seq(
"--constraint", // check against system-requirements-lock
getSystemPath(isLocal).toString,
"--constraint", // pin to the runtime-resolved system set
systemConstraintFile.toString,
Comment thread
SarahAsad23 marked this conversation as resolved.
trimmedPkg
),
queue
Expand Down Expand Up @@ -440,35 +500,21 @@ object PveManager {
def deletePackages(
cuid: Int,
packageName: String,
pveName: String,
isLocal: Boolean
pveName: String
): List[String] = {
val python = pythonBinPath(cuid, pveName).toAbsolutePath.toString
val metadataPath = cuidDir(cuid, pveName).resolve("user-packages.txt")

if (!Files.exists(Paths.get(python))) {
val msg = s"[PVE][ERR] Python executable not found for PVE: $python"
println(msg)
logger.error(msg)
return List(msg)
}

val trimmedPackageName = packageName.trim
val normalizedPackageName = trimmedPackageName.split("==")(0).trim.toLowerCase

val systemPackages =
if (Files.exists(getSystemPath(isLocal))) {
Files
.readAllLines(getSystemPath(isLocal))
.asScala
.map(_.trim)
.filter(line => line.nonEmpty && !line.startsWith("#"))
.map(line => line.split("==")(0).trim.toLowerCase)
.toSet
} else {
Set[String]()
}

if (systemPackages.contains(normalizedPackageName)) {
if (systemPackageNames.contains(normalizedPackageName)) {
return List(
s"[PVE][ERR] $trimmedPackageName is a system package and cannot be deleted."
)
Expand All @@ -494,11 +540,11 @@ object PveManager {
val exitCode = command.!(
ProcessLogger(
out => {
println(s"[pip] $out")
logger.info(s"[pip] $out")
output += s"[pip] $out"
},
err => {
System.err.println(s"[pip][ERR] $err")
logger.error(s"[pip][ERR] $err")
output += s"[pip][ERR] $err"
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.typesafe.scalalogging.LazyLogging
import io.dropwizard.auth.Auth
import org.apache.texera.auth.SessionUser
import org.apache.texera.common.config.KubernetesConfig
import org.jooq.exception.DataAccessException

import javax.ws.rs._
Expand Down Expand Up @@ -55,10 +54,9 @@ class PveResource extends LazyLogging {
@Path("/system")
@Produces(Array(MediaType.APPLICATION_JSON))
def getSystemPackages: util.Map[String, util.List[String]] = {
val isLocal = !KubernetesConfig.kubernetesComputingUnitEnabled
try {
val systemPkgs =
PveManager.getSystemPackages(isLocal).toList.asJava
PveManager.getSystemPackages.toList.asJava

Map("system" -> systemPkgs).asJava
} catch {
Expand Down Expand Up @@ -208,12 +206,10 @@ class PveResource extends LazyLogging {
@PathParam("pveName") pveName: String,
@PathParam("packageName") packageName: String
): Response = {
val isLocal = !KubernetesConfig.kubernetesComputingUnitEnabled
val messages = PveManager.deletePackages(
cuid,
packageName,
pveName,
isLocal
pveName
)

if (messages.exists(_.contains("[PVE][ERR]"))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.texera.web.resource.pythonvirtualenvironment

import org.apache.texera.common.config.KubernetesConfig

import javax.websocket._
import javax.websocket.server.ServerEndpoint
import java.util.concurrent.LinkedBlockingQueue
Expand All @@ -43,7 +41,6 @@ class PveWebsocketResource {

val cuid = params.get("cuid").get(0).toInt
val pveName = params.get("pveName").get(0)
val isLocal = !KubernetesConfig.kubernetesComputingUnitEnabled
val action = params.getOrDefault("action", java.util.List.of("create")).get(0)

val queue = new LinkedBlockingQueue[String]()
Expand All @@ -52,7 +49,7 @@ class PveWebsocketResource {
try {
action match {
case "create" =>
PveManager.createNewPve(cuid, queue, pveName, isLocal)
PveManager.createNewPve(cuid, queue, pveName)

case "install" =>
val packages =
Expand All @@ -66,7 +63,7 @@ class PveWebsocketResource {
.map(_.replace("\"", "").trim)
.filter(_.nonEmpty)

PveManager.installUserPackages(packages, cuid, queue, pveName, isLocal)
PveManager.installUserPackages(packages, cuid, queue, pveName)

case _ =>
queue.put(s"[ERR] Unknown action: $action")
Expand Down
Loading
Loading