Rip out Downloader's non-UI functionality into private core API (#6706)

This commit is contained in:
Jakub Kuczys
2026-03-29 22:25:04 +02:00
committed by GitHub
parent e2acec0862
commit ee1db01a2f
20 changed files with 1120 additions and 896 deletions

View File

View File

@@ -0,0 +1,415 @@
import asyncio
import pathlib
from collections import namedtuple
from typing import Any, NamedTuple
from pathlib import Path
import pytest
from pytest_mock import MockFixture
from redbot.pytest.downloader import *
from redbot.core._downloader.repo_manager import Installable
from redbot.core._downloader.repo_manager import Candidate, ProcessFormatter, RepoManager, Repo
from redbot.core._downloader.errors import (
AmbiguousRevision,
ExistingGitRepo,
GitException,
UnknownRevision,
)
class FakeCompletedProcess(NamedTuple):
returncode: int
stdout: bytes = b""
stderr: bytes = b""
def _mock_run(
mocker: MockFixture, repo: Repo, returncode: int, stdout: bytes = b"", stderr: bytes = b""
):
return mocker.patch.object(
repo, "_run", autospec=True, return_value=FakeCompletedProcess(returncode, stdout, stderr)
)
def _mock_setup_repo(mocker: MockFixture, repo: Repo, commit: str):
def update_commit(*args, **kwargs):
repo.commit = commit
return mocker.DEFAULT
return mocker.patch.object(
repo, "_setup_repo", autospec=True, side_effect=update_commit, return_value=None
)
def test_existing_git_repo(tmp_path):
repo_folder = tmp_path / "repos" / "squid" / ".git"
repo_folder.mkdir(parents=True, exist_ok=True)
r = Repo(
url="https://github.com/tekulvw/Squid-Plugins",
name="squid",
branch="rewrite_cogs",
commit="6acb5decbb717932e5dc0cda7fca0eff452c47dd",
folder_path=repo_folder.parent,
)
exists, git_path = r._existing_git_repo()
assert exists is True
assert git_path == repo_folder
ancestor_rev = "c950fc05a540dd76b944719c2a3302da2e2f3090"
descendant_rev = "fb99eb7d2d5bed514efc98fe6686b368f8425745"
@pytest.mark.parametrize(
"maybe_ancestor_rev,descendant_rev,returncode,expected",
[(ancestor_rev, descendant_rev, 0, True), (descendant_rev, ancestor_rev, 1, False)],
)
async def test_is_ancestor(mocker, repo, maybe_ancestor_rev, descendant_rev, returncode, expected):
m = _mock_run(mocker, repo, returncode)
ret = await repo.is_ancestor(maybe_ancestor_rev, descendant_rev)
m.assert_called_once_with(
ProcessFormatter().format(
repo.GIT_IS_ANCESTOR,
path=repo.folder_path,
maybe_ancestor_rev=maybe_ancestor_rev,
descendant_rev=descendant_rev,
),
valid_exit_codes=(0, 1),
debug_only=True,
)
assert ret is expected
async def test_is_ancestor_object_raise(mocker, repo):
m = _mock_run(mocker, repo, 128, b"", b"fatal: Not a valid object name invalid1")
with pytest.raises(UnknownRevision):
await repo.is_ancestor("invalid1", "invalid2")
m.assert_called_once_with(
ProcessFormatter().format(
repo.GIT_IS_ANCESTOR,
path=repo.folder_path,
maybe_ancestor_rev="invalid1",
descendant_rev="invalid2",
),
valid_exit_codes=(0, 1),
debug_only=True,
)
async def test_is_ancestor_commit_raise(mocker, repo):
m = _mock_run(
mocker,
repo,
128,
b"",
b"fatal: Not a valid commit name 0123456789abcde0123456789abcde0123456789",
)
with pytest.raises(UnknownRevision):
await repo.is_ancestor(
"0123456789abcde0123456789abcde0123456789", "c950fc05a540dd76b944719c2a3302da2e2f3090"
)
m.assert_called_once_with(
ProcessFormatter().format(
repo.GIT_IS_ANCESTOR,
path=repo.folder_path,
maybe_ancestor_rev="0123456789abcde0123456789abcde0123456789",
descendant_rev="c950fc05a540dd76b944719c2a3302da2e2f3090",
),
valid_exit_codes=(0, 1),
debug_only=True,
)
async def test_get_file_update_statuses(mocker, repo):
old_rev = "c950fc05a540dd76b944719c2a3302da2e2f3090"
new_rev = "fb99eb7d2d5bed514efc98fe6686b368f8425745"
m = _mock_run(
mocker,
repo,
0,
b"A\x00added_file.txt\x00\t"
b"M\x00mycog/__init__.py\x00\t"
b"D\x00sample_file1.txt\x00\t"
b"D\x00sample_file2.txt\x00\t"
b"A\x00sample_file3.txt",
)
ret = await repo._get_file_update_statuses(old_rev, new_rev)
m.assert_called_once_with(
ProcessFormatter().format(
repo.GIT_DIFF_FILE_STATUS, path=repo.folder_path, old_rev=old_rev, new_rev=new_rev
)
)
assert ret == {
"added_file.txt": "A",
"mycog/__init__.py": "M",
"sample_file1.txt": "D",
"sample_file2.txt": "D",
"sample_file3.txt": "A",
}
async def test_is_module_modified(mocker, repo):
old_rev = "c950fc05a540dd76b944719c2a3302da2e2f3090"
new_rev = "fb99eb7d2d5bed514efc98fe6686b368f8425745"
FakeInstallable = namedtuple("Installable", "name commit")
module = FakeInstallable("mycog", new_rev)
m = mocker.patch.object(
repo,
"_get_file_update_statuses",
autospec=True,
return_value={
"added_file.txt": "A",
"mycog/__init__.py": "M",
"sample_file1.txt": "D",
"sample_file2.txt": "D",
"sample_file3.txt": "A",
},
)
ret = await repo._is_module_modified(module, old_rev)
m.assert_called_once_with(old_rev, new_rev)
assert ret is True
async def test_get_full_sha1_success(mocker, repo):
commit = "c950fc05a540dd76b944719c2a3302da2e2f3090"
m = _mock_run(mocker, repo, 0, commit.encode())
ret = await repo.get_full_sha1(commit)
m.assert_called_once_with(
ProcessFormatter().format(repo.GIT_GET_FULL_SHA1, path=repo.folder_path, rev=commit)
)
assert ret == commit
async def test_get_full_sha1_notfound(mocker, repo):
m = _mock_run(mocker, repo, 128, b"", b"fatal: Needed a single revision")
with pytest.raises(UnknownRevision):
await repo.get_full_sha1("invalid")
m.assert_called_once_with(
ProcessFormatter().format(repo.GIT_GET_FULL_SHA1, path=repo.folder_path, rev="invalid")
)
async def test_get_full_sha1_ambiguous(mocker, repo):
m = _mock_run(
mocker,
repo,
128,
b"",
b"error: short SHA1 c6f0 is ambiguous\n"
b"hint: The candidates are:\n"
b"hint: c6f028f tag ambiguous_tag_66387\n"
b"hint: c6f0e5e commit 2019-10-24 - Commit ambiguous with tag.\n"
b"fatal: Needed a single revision",
)
with pytest.raises(AmbiguousRevision) as exc_info:
await repo.get_full_sha1("c6f0")
m.assert_called_once_with(
ProcessFormatter().format(repo.GIT_GET_FULL_SHA1, path=repo.folder_path, rev="c6f0")
)
assert exc_info.value.candidates == [
Candidate("c6f028f", "tag", "ambiguous_tag_66387"),
Candidate("c6f0e5e", "commit", "2019-10-24 - Commit ambiguous with tag."),
]
def test_update_available_modules(repo):
module = repo.folder_path / "mycog" / "__init__.py"
submodule = module.parent / "submodule" / "__init__.py"
module.parent.mkdir(parents=True)
module.touch()
submodule.parent.mkdir()
submodule.touch()
ret = repo._update_available_modules()
assert (
ret
== repo.available_modules
== (Installable(location=module.parent, repo=repo, commit=repo.commit),)
)
async def test_checkout(mocker, repo):
commit = "c950fc05a540dd76b944719c2a3302da2e2f3090"
m = _mock_run(mocker, repo, 0)
_mock_setup_repo(mocker, repo, commit)
git_path = repo.folder_path / ".git"
git_path.mkdir()
await repo._checkout(commit)
assert repo.commit == commit
m.assert_called_once_with(
ProcessFormatter().format(repo.GIT_CHECKOUT, path=repo.folder_path, rev=commit)
)
async def test_checkout_ctx_manager(mocker, repo):
commit = "c950fc05a540dd76b944719c2a3302da2e2f3090"
m = mocker.patch.object(repo, "_checkout", autospec=True, return_value=None)
old_commit = repo.commit
async with repo.checkout(commit):
m.assert_called_with(commit, force_checkout=False)
m.return_value = None
m.assert_called_with(old_commit, force_checkout=False)
async def test_checkout_await(mocker, repo):
commit = "c950fc05a540dd76b944719c2a3302da2e2f3090"
m = mocker.patch.object(repo, "_checkout", autospec=True, return_value=None)
await repo.checkout(commit)
m.assert_called_once_with(commit, force_checkout=False)
async def test_clone_with_branch(mocker, repo):
branch = repo.branch = "dont_add_commits"
commit = "a0ccc2390883c85a361f5a90c72e1b07958939fa"
repo.commit = ""
m = _mock_run(mocker, repo, 0)
_mock_setup_repo(mocker, repo, commit)
await repo.clone()
assert repo.commit == commit
m.assert_called_once_with(
ProcessFormatter().format(
repo.GIT_CLONE, branch=branch, url=repo.url, folder=repo.folder_path
)
)
async def test_clone_without_branch(mocker, repo):
branch = "dont_add_commits"
commit = "a0ccc2390883c85a361f5a90c72e1b07958939fa"
repo.branch = None
repo.commit = ""
m = _mock_run(mocker, repo, 0)
_mock_setup_repo(mocker, repo, commit)
mocker.patch.object(repo, "current_branch", autospec=True, return_value=branch)
await repo.clone()
assert repo.commit == commit
m.assert_called_once_with(
ProcessFormatter().format(repo.GIT_CLONE_NO_BRANCH, url=repo.url, folder=repo.folder_path)
)
async def test_update(mocker, repo):
old_commit = repo.commit
new_commit = "a0ccc2390883c85a361f5a90c72e1b07958939fa"
m = _mock_run(mocker, repo, 0)
_mock_setup_repo(mocker, repo, new_commit)
mocker.patch.object(repo, "latest_commit", autospec=True, return_value=old_commit)
mocker.patch.object(repo, "hard_reset", autospec=True, return_value=None)
ret = await repo.update()
assert ret == (old_commit, new_commit)
m.assert_called_once_with(ProcessFormatter().format(repo.GIT_PULL, path=repo.folder_path))
# old tests
async def test_add_repo(monkeypatch, repo_manager):
monkeypatch.setattr("redbot.core._downloader.repo_manager.Repo._run", fake_run_noprint)
monkeypatch.setattr(
"redbot.core._downloader.repo_manager.Repo.current_commit", fake_current_commit
)
squid = await repo_manager.add_repo(
url="https://github.com/tekulvw/Squid-Plugins", name="squid", branch="rewrite_cogs"
)
assert squid.available_modules == ()
async def test_lib_install_requirements(monkeypatch, library_installable, repo, tmpdir):
monkeypatch.setattr("redbot.core._downloader.repo_manager.Repo._run", fake_run_noprint)
monkeypatch.setattr(
"redbot.core._downloader.repo_manager.Repo.available_libraries", (library_installable,)
)
lib_path = Path(str(tmpdir)) / "cog_data_path" / "lib"
sharedlib_path = lib_path / "cog_shared"
sharedlib_path.mkdir(parents=True, exist_ok=True)
installed, failed = await repo.install_libraries(
target_dir=sharedlib_path, req_target_dir=lib_path
)
assert len(installed) == 1
assert len(failed) == 0
async def test_remove_repo(monkeypatch, repo_manager):
monkeypatch.setattr("redbot.core._downloader.repo_manager.Repo._run", fake_run_noprint)
monkeypatch.setattr(
"redbot.core._downloader.repo_manager.Repo.current_commit", fake_current_commit
)
await repo_manager.add_repo(
url="https://github.com/tekulvw/Squid-Plugins", name="squid", branch="rewrite_cogs"
)
assert repo_manager.get_repo("squid") is not None
await repo_manager.delete_repo("squid")
assert repo_manager.get_repo("squid") is None
async def test_existing_repo(mocker, repo_manager):
repo_manager.does_repo_exist = mocker.MagicMock(return_value=True)
with pytest.raises(ExistingGitRepo):
await repo_manager.add_repo("http://test.com", "test")
repo_manager.does_repo_exist.assert_called_once_with("test")
def test_tree_url_parse(repo_manager):
cases = [
{
"input": ("https://github.com/Tobotimus/Tobo-Cogs", None),
"expected": ("https://github.com/Tobotimus/Tobo-Cogs", None),
},
{
"input": ("https://github.com/Tobotimus/Tobo-Cogs", "V3"),
"expected": ("https://github.com/Tobotimus/Tobo-Cogs", "V3"),
},
{
"input": ("https://github.com/Tobotimus/Tobo-Cogs/tree/V3", None),
"expected": ("https://github.com/Tobotimus/Tobo-Cogs", "V3"),
},
{
"input": ("https://github.com/Tobotimus/Tobo-Cogs/tree/V3", "V4"),
"expected": ("https://github.com/Tobotimus/Tobo-Cogs", "V4"),
},
]
for test_case in cases:
assert test_case["expected"] == repo_manager._parse_url(*test_case["input"])
def test_tree_url_non_github(repo_manager):
cases = [
{
"input": ("https://gitlab.com/Tobotimus/Tobo-Cogs", None),
"expected": ("https://gitlab.com/Tobotimus/Tobo-Cogs", None),
},
{
"input": ("https://my.usgs.gov/bitbucket/scm/Tobotimus/Tobo-Cogs", "V3"),
"expected": ("https://my.usgs.gov/bitbucket/scm/Tobotimus/Tobo-Cogs", "V3"),
},
]
for test_case in cases:
assert test_case["expected"] == repo_manager._parse_url(*test_case["input"])

View File

@@ -0,0 +1,529 @@
from pathlib import Path
import subprocess as sp
import pytest
from redbot.core._downloader.repo_manager import ProcessFormatter, Repo
from redbot.pytest.downloader import (
GIT_VERSION,
cloned_git_repo,
git_repo,
git_repo_with_remote,
_session_git_repo,
)
async def test_git_clone_nobranch(git_repo, tmp_path):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CLONE_NO_BRANCH,
url=git_repo.folder_path,
folder=tmp_path / "cloned_repo_test",
)
)
assert p.returncode == 0
async def test_git_clone_branch(git_repo, tmp_path):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CLONE,
branch="master",
url=git_repo.folder_path,
folder=tmp_path / "cloned_repo_test",
)
)
assert p.returncode == 0
async def test_git_clone_non_existent_branch(git_repo, tmp_path):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CLONE,
branch="non-existent-branch",
url=git_repo.folder_path,
folder=tmp_path / "cloned_repo_test",
)
)
assert p.returncode == 128
async def test_git_clone_notgit_repo(git_repo, tmp_path):
notgit_repo = tmp_path / "test_clone_folder"
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CLONE, branch=None, url=notgit_repo, folder=tmp_path / "cloned_repo_test"
)
)
assert p.returncode == 128
async def test_git_current_branch_master(git_repo):
p = await git_repo._run(
ProcessFormatter().format(git_repo.GIT_CURRENT_BRANCH, path=git_repo.folder_path)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == "master"
async def test_git_current_branch_detached(git_repo):
await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CHECKOUT,
path=git_repo.folder_path,
rev="c950fc05a540dd76b944719c2a3302da2e2f3090",
)
)
p = await git_repo._run(
ProcessFormatter().format(git_repo.GIT_CURRENT_BRANCH, path=git_repo.folder_path)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == "fatal: ref HEAD is not a symbolic ref"
async def test_git_current_commit_on_branch(git_repo):
# HEAD on dont_add_commits (a0ccc2390883c85a361f5a90c72e1b07958939fa)
# setup
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CHECKOUT, path=git_repo.folder_path, rev="dont_add_commits"
)
)
assert p.returncode == 0
p = await git_repo._run(
ProcessFormatter().format(git_repo.GIT_CURRENT_COMMIT, path=git_repo.folder_path)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == "a0ccc2390883c85a361f5a90c72e1b07958939fa"
async def test_git_current_commit_detached(git_repo):
# detached HEAD state (c950fc05a540dd76b944719c2a3302da2e2f3090)
await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CHECKOUT,
path=git_repo.folder_path,
rev="c950fc05a540dd76b944719c2a3302da2e2f3090",
)
)
p = await git_repo._run(
ProcessFormatter().format(git_repo.GIT_CURRENT_COMMIT, path=git_repo.folder_path)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == "c950fc05a540dd76b944719c2a3302da2e2f3090"
async def test_git_latest_commit(git_repo):
# HEAD on dont_add_commits (a0ccc2390883c85a361f5a90c72e1b07958939fa)
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_LATEST_COMMIT, path=git_repo.folder_path, branch="dont_add_commits"
)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == "a0ccc2390883c85a361f5a90c72e1b07958939fa"
async def test_git_hard_reset(cloned_git_repo, tmp_path):
staged_file = cloned_git_repo.folder_path / "staged_file.txt"
staged_file.touch()
git_dirparams = ("git", "-C", str(cloned_git_repo.folder_path))
sp.run((*git_dirparams, "add", "staged_file.txt"), check=True)
assert staged_file.exists() is True
p = await cloned_git_repo._run(
ProcessFormatter().format(
cloned_git_repo.GIT_HARD_RESET, path=cloned_git_repo.folder_path, branch="master"
)
)
assert p.returncode == 0
assert staged_file.exists() is False
async def test_git_pull(git_repo_with_remote, tmp_path):
# setup
staged_file = Path(git_repo_with_remote.url) / "staged_file.txt"
staged_file.touch()
git_dirparams = ("git", "-C", git_repo_with_remote.url)
sp.run((*git_dirparams, "add", "staged_file.txt"), check=True)
sp.run(
(*git_dirparams, "commit", "-m", "test commit", "--no-gpg-sign", "--no-verify"), check=True
)
assert not (git_repo_with_remote.folder_path / "staged_file.txt").exists()
p = await git_repo_with_remote._run(
ProcessFormatter().format(
git_repo_with_remote.GIT_PULL, path=git_repo_with_remote.folder_path
)
)
assert p.returncode == 0
assert (git_repo_with_remote.folder_path / "staged_file.txt").exists()
async def test_git_diff_file_status(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_DIFF_FILE_STATUS,
path=git_repo.folder_path,
old_rev="c950fc05a540dd76b944719c2a3302da2e2f3090",
new_rev="fb99eb7d2d5bed514efc98fe6686b368f8425745",
)
)
assert p.returncode == 0
stdout = p.stdout.strip(b"\t\n\x00 ").decode()
assert stdout == (
"A\x00added_file.txt\x00\t"
"M\x00mycog/__init__.py\x00\t"
"D\x00sample_file1.txt\x00\t"
"D\x00sample_file2.txt\x00\t"
"A\x00sample_file3.txt"
)
# might need to add test for test_git_log, but it's unused method currently
async def test_git_discover_remote_url(cloned_git_repo, tmp_path):
p = await cloned_git_repo._run(
ProcessFormatter().format(
cloned_git_repo.GIT_DISCOVER_REMOTE_URL, path=cloned_git_repo.folder_path
)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == cloned_git_repo.url
async def test_git_checkout_detached_head(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CHECKOUT,
path=git_repo.folder_path,
rev="c950fc05a540dd76b944719c2a3302da2e2f3090",
)
)
assert p.returncode == 0
async def test_git_checkout_branch(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CHECKOUT, path=git_repo.folder_path, rev="dont_add_commits"
)
)
assert p.returncode == 0
async def test_git_checkout_non_existent_branch(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CHECKOUT, path=git_repo.folder_path, rev="non-existent-branch"
)
)
assert p.returncode == 1
async def test_git_get_full_sha1_from_branch_name(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="dont_add_commits"
)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == "a0ccc2390883c85a361f5a90c72e1b07958939fa"
async def test_git_get_full_sha1_from_full_hash(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1,
path=git_repo.folder_path,
rev="c950fc05a540dd76b944719c2a3302da2e2f3090",
)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == "c950fc05a540dd76b944719c2a3302da2e2f3090"
async def test_git_get_full_sha1_from_short_hash(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="c950"
)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == "c950fc05a540dd76b944719c2a3302da2e2f3090"
async def test_git_get_full_sha1_from_too_short_hash(git_repo):
p = await git_repo._run(
ProcessFormatter().format(git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="c95")
)
assert p.returncode == 128
assert p.stderr.decode().strip() == "fatal: Needed a single revision"
async def test_git_get_full_sha1_from_lightweight_tag(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="lightweight"
)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == "fb99eb7d2d5bed514efc98fe6686b368f8425745"
async def test_git_get_full_sha1_from_annotated_tag(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="annotated"
)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == "a7120330cc179396914e0d6af80cfa282adc124b"
async def test_git_get_full_sha1_from_invalid_ref(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="invalid"
)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == "fatal: Needed a single revision"
@pytest.mark.skipif(
GIT_VERSION < (2, 31), reason="This is test for output from Git 2.31 and newer."
)
async def test_git_get_full_sha1_from_ambiguous_commits(git_repo):
# 2 ambiguous refs:
# branch ambiguous_1 - 95da0b576271cb5bee5f3e075074c03ee05fed05
# branch ambiguous_2 - 95da0b57a416d9c8ce950554228d1fc195c30b43
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="95da0b57"
)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == (
"error: short object ID 95da0b57 is ambiguous\n"
"hint: The candidates are:\n"
"hint: 95da0b576 commit 2019-10-22 - Ambiguous commit 16955\n"
"hint: 95da0b57a commit 2019-10-22 - Ambiguous commit 44414\n"
"fatal: Needed a single revision"
)
@pytest.mark.skipif(
GIT_VERSION < (2, 36), reason="This is test for output from Git 2.36 and newer."
)
async def test_git_get_full_sha1_from_ambiguous_tag_and_commit(git_repo):
# 2 ambiguous refs:
# branch ambiguous_with_tag - c6f0e5ec04d99bdf8c6c78ff20d66d286eecb3ea
# tag ambiguous_tag_66387 - c6f0e5ec04d99bdf8c6c78ff20d66d286eecb3ea
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="c6f0"
)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == (
"error: short object ID c6f0 is ambiguous\n"
"hint: The candidates are:\n"
"hint: c6f028f tag 2019-10-24 - ambiguous_tag_66387\n"
"hint: c6f0e5e commit 2019-10-24 - Commit ambiguous with tag.\n"
"fatal: Needed a single revision"
)
@pytest.mark.skipif(
not ((2, 31) <= GIT_VERSION < (2, 36)), reason="This is test for output from Git >=2.31,<2.36."
)
async def test_git_get_full_sha1_from_ambiguous_tag_and_commit_pre_2_36(git_repo):
# 2 ambiguous refs:
# branch ambiguous_with_tag - c6f0e5ec04d99bdf8c6c78ff20d66d286eecb3ea
# tag ambiguous_tag_66387 - c6f0e5ec04d99bdf8c6c78ff20d66d286eecb3ea
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="c6f0"
)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == (
"error: short object ID c6f0 is ambiguous\n"
"hint: The candidates are:\n"
"hint: c6f028f tag ambiguous_tag_66387\n"
"hint: c6f0e5e commit 2019-10-24 - Commit ambiguous with tag.\n"
"fatal: Needed a single revision"
)
@pytest.mark.skipif(
GIT_VERSION >= (2, 31), reason="This is test for output from Git older than 2.31."
)
async def test_git_get_full_sha1_from_ambiguous_commits_pre_2_31(git_repo):
# 2 ambiguous refs:
# branch ambiguous_1 - 95da0b576271cb5bee5f3e075074c03ee05fed05
# branch ambiguous_2 - 95da0b57a416d9c8ce950554228d1fc195c30b43
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="95da0b57"
)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == (
"error: short SHA1 95da0b57 is ambiguous\n"
"hint: The candidates are:\n"
"hint: 95da0b576 commit 2019-10-22 - Ambiguous commit 16955\n"
"hint: 95da0b57a commit 2019-10-22 - Ambiguous commit 44414\n"
"fatal: Needed a single revision"
)
@pytest.mark.skipif(
GIT_VERSION >= (2, 31), reason="This is test for output from Git older than 2.31."
)
async def test_git_get_full_sha1_from_ambiguous_tag_and_commit_pre_2_31(git_repo):
# 2 ambiguous refs:
# branch ambiguous_with_tag - c6f0e5ec04d99bdf8c6c78ff20d66d286eecb3ea
# tag ambiguous_tag_66387 - c6f0e5ec04d99bdf8c6c78ff20d66d286eecb3ea
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_FULL_SHA1, path=git_repo.folder_path, rev="c6f0"
)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == (
"error: short SHA1 c6f0 is ambiguous\n"
"hint: The candidates are:\n"
"hint: c6f028f tag ambiguous_tag_66387\n"
"hint: c6f0e5e commit 2019-10-24 - Commit ambiguous with tag.\n"
"fatal: Needed a single revision"
)
async def test_git_is_ancestor_true(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_IS_ANCESTOR,
path=git_repo.folder_path,
maybe_ancestor_rev="c950fc05a540dd76b944719c2a3302da2e2f3090",
descendant_rev="fb99eb7d2d5bed514efc98fe6686b368f8425745",
)
)
assert p.returncode == 0
async def test_git_is_ancestor_false(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_IS_ANCESTOR,
path=git_repo.folder_path,
maybe_ancestor_rev="fb99eb7d2d5bed514efc98fe6686b368f8425745",
descendant_rev="c950fc05a540dd76b944719c2a3302da2e2f3090",
)
)
assert p.returncode == 1
async def test_git_is_ancestor_invalid_object(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_IS_ANCESTOR,
path=git_repo.folder_path,
maybe_ancestor_rev="invalid1",
descendant_rev="invalid2",
)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == "fatal: Not a valid object name invalid1"
async def test_git_is_ancestor_invalid_commit(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_IS_ANCESTOR,
path=git_repo.folder_path,
maybe_ancestor_rev="0123456789abcde0123456789abcde0123456789",
descendant_rev="c950fc05a540dd76b944719c2a3302da2e2f3090",
)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == (
"fatal: Not a valid commit name 0123456789abcde0123456789abcde0123456789"
)
async def test_git_check_if_module_exists_true(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CHECK_IF_MODULE_EXISTS,
path=git_repo.folder_path,
rev="fb99eb7d2d5bed514efc98fe6686b368f8425745",
module_name="mycog",
)
)
assert p.returncode == 0
@pytest.mark.skipif(
GIT_VERSION < (2, 36), reason="This is test for output from Git 2.36 and newer."
)
async def test_git_check_if_module_exists_false(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CHECK_IF_MODULE_EXISTS,
path=git_repo.folder_path,
rev="a7120330cc179396914e0d6af80cfa282adc124b",
module_name="mycog",
)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == (
"fatal: path 'mycog/__init__.py' does not exist in 'a7120330cc179396914e0d6af80cfa282adc124b'"
)
@pytest.mark.skipif(
GIT_VERSION >= (2, 36), reason="This is test for output from Git older than 2.31."
)
async def test_git_check_if_module_exists_false_pre_2_36(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_CHECK_IF_MODULE_EXISTS,
path=git_repo.folder_path,
rev="a7120330cc179396914e0d6af80cfa282adc124b",
module_name="mycog",
)
)
assert p.returncode == 128
assert p.stderr.decode().strip() == (
"fatal: Not a valid object name a7120330cc179396914e0d6af80cfa282adc124b:mycog/__init__.py"
)
async def test_git_find_last_occurrence_existent(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_LAST_MODULE_OCCURRENCE_COMMIT,
path=git_repo.folder_path,
descendant_rev="2db662c1d341b1db7d225ccc1af4019ba5228c70",
module_name="mycog",
)
)
assert p.returncode == 0
# the command gives a commit after last occurrence
assert p.stdout.decode().strip() == "a7120330cc179396914e0d6af80cfa282adc124b"
async def test_git_find_last_occurrence_non_existent(git_repo):
p = await git_repo._run(
ProcessFormatter().format(
git_repo.GIT_GET_LAST_MODULE_OCCURRENCE_COMMIT,
path=git_repo.folder_path,
descendant_rev="c950fc05a540dd76b944719c2a3302da2e2f3090",
module_name="mycog",
)
)
assert p.returncode == 0
assert p.stdout.decode().strip() == ""

View File

@@ -0,0 +1,58 @@
import json
from pathlib import Path
import pytest
from redbot.pytest.downloader import *
from redbot.core._downloader.installable import Installable, InstallableType
from redbot.core import VersionInfo
def test_process_info_file(installable):
for k, v in INFO_JSON.items():
if k == "type":
assert installable.type is InstallableType.COG
elif k in ("min_bot_version", "max_bot_version"):
assert getattr(installable, k) == VersionInfo.from_str(v)
else:
assert getattr(installable, k) == v
def test_process_lib_info_file(library_installable):
for k, v in LIBRARY_INFO_JSON.items():
if k == "type":
assert library_installable.type is InstallableType.SHARED_LIBRARY
elif k in ("min_bot_version", "max_bot_version"):
assert getattr(library_installable, k) == VersionInfo.from_str(v)
elif k == "hidden":
# libraries are always hidden, even if False
assert library_installable.hidden is True
else:
assert getattr(library_installable, k) == v
# noinspection PyProtectedMember
def test_location_is_dir(installable):
assert installable._location.exists()
assert installable._location.is_dir()
# noinspection PyProtectedMember
def test_info_file_is_file(installable):
assert installable._info_file.exists()
assert installable._info_file.is_file()
def test_name(installable):
assert installable.name == "test_cog"
def test_repo_name(installable):
assert installable.repo_name == "test_repo"
def test_serialization(installed_cog):
data = installed_cog.to_json()
cog_name = data["module_name"]
assert cog_name == "test_installed_cog"