
Questions about threads #550

Closed
YueWu0301 opened this issue Apr 19, 2024 · 8 comments

Comments

@YueWu0301

After successfully running the pipeline once, I can no longer re-run my code, even after changing the pipeline name, the input data, and the related parameters. It raises the following error. What could be the cause?

EOFError
Exception in thread Thread-1 (_monitor):
Traceback (most recent call last):
  File "/root/miniconda3/envs/datagen/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/root/miniconda3/envs/datagen/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/root/miniconda3/envs/datagen/lib/python3.10/logging/handlers.py", line 1556, in _monitor
    record = self.dequeue(True)
  File "/root/miniconda3/envs/datagen/lib/python3.10/logging/handlers.py", line 1505, in dequeue
    return self.queue.get(block)
  File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/queues.py", line 103, in get
    res = self._recv_bytes()
  File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
/root/miniconda3/envs/datagen/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 3 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
@gabrielmbmb
Member

Hi @YueWu0301, could you share the code of your pipeline?

@tridungduong-unsw

Hi @gabrielmbmb, I'm facing the same problem. The code is as follows:

import json
import os
import pdb

import openai
from distilabel.llms import AzureOpenAILLM, OpenAILLM, vLLM
from distilabel.llms.mistral import MistralLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import (
    CombineColumns,
    KeepColumns,
    LoadDataFromDicts,
    LoadHubDataset,
    PreferenceToArgilla,
    TextGenerationToArgilla,
)
from distilabel.steps.tasks import TextGeneration, UltraFeedback
from distilabel.steps.tasks.text_generation import TextGeneration
from dotenv import load_dotenv

load_dotenv()


def read_jsonl_file(file_path):
    """
    Reads a .jsonl file where each line is a separate JSON object, and returns a list of dictionaries.

    :param file_path: str - The path to the .jsonl file.
    :return: list - A list containing dictionaries, each representing a JSON object from the file.
    """
    data = []
    try:
        with open(file_path, "r") as file:
            for line in file:

                json_object = json.loads(line.strip())
                json_object["instruction"] = json_object.pop("question")
                json_object["generations"] = json_object.pop("answer")
                data.append(json_object)
    except FileNotFoundError:
        print(f"Error: The file '{file_path}' does not exist.")
    except json.JSONDecodeError:
        print(f"Error: The file '{file_path}' contains invalid JSON.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return data


llm = AzureOpenAILLM(
    model=os.getenv("api_engine_gpt4"),
    base_url=os.getenv("api_base_gpt4"),
    api_key=os.getenv("api_key_gpt4"),
    api_version=os.getenv("api_version"),
)


with Pipeline(name="ultrafeedback-pipeline") as pipeline:

    data = read_jsonl_file(
        "data.json"
    )

    load_hub_dataset = LoadDataFromDicts(
        name="load_data",
        data=data,
        batch_size=1,
    )

    ultrafeedback = UltraFeedback(
        name="ultrafeedback_overall_rating",
        llm=llm,
        aspect="overall-rating",
        output_mappings={"model_name": "ultrafeedback_model"},
    )

    load_hub_dataset.connect(ultrafeedback)


dataset = pipeline.run(
    parameters={
        "ultrafeedback_overall_rating": {
            "generation_kwargs": {
                "max_new_tokens": 1024,
                "temperature": 0.7,
            },
        },
    }
)

@gabrielmbmb
Copy link
Member

Hi, I think the issue might be caused by the run method not being called within an if __name__ == "__main__": block. Could you update your script and check whether you still get the error?
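The guard matters because the run method starts worker processes, and with Python's spawn start method each worker re-imports the launching script; a module-level pipeline.run() would then fire again inside every worker, and the logging queue's monitor thread ends up reading from a closed pipe, which is what the EOFError traceback above shows. Here is a minimal sketch of the pattern with plain multiprocessing rather than distilabel (the work/run_demo names are hypothetical, just for illustration):

```python
import multiprocessing as mp


def work(queue):
    # Runs in a child process. With the spawn start method, the child
    # re-imports this module, so any unguarded module-level code would
    # execute again here.
    queue.put("done")


def run_demo():
    # Only safe to call from a guarded entry point: otherwise every
    # re-import of the module would spawn another child recursively.
    queue = mp.Queue()
    child = mp.Process(target=work, args=(queue,))
    child.start()
    result = queue.get(timeout=10)
    child.join()
    return result


if __name__ == "__main__":
    print(run_demo())
```

The same rule applies to any script that starts processes under the spawn or forkserver start methods (spawn is the default on macOS and Windows).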

@tridungduong-unsw

Hi @gabrielmbmb, I'm currently trying to modify it as follows:

import json
import os
import pdb

import openai
from distilabel.llms import AzureOpenAILLM, OpenAILLM, vLLM
from distilabel.llms.mistral import MistralLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import (
    CombineColumns,
    KeepColumns,
    LoadDataFromDicts,
    LoadHubDataset,
    PreferenceToArgilla,
    TextGenerationToArgilla,
)
from distilabel.steps.tasks import TextGeneration, UltraFeedback
from distilabel.steps.tasks.text_generation import TextGeneration
from dotenv import load_dotenv
import pandas as pd

load_dotenv()

def location_extraction(article):
    system_prompt = """
    You are an advanced Named Entity Recognition (NER) system specializing in disease-related information.
    Task: Identify geographical locations from a given list of entities.
    Instruction: 
    - Focus on identifying specific and recognized geographical locations in each paragraph.
    - LOCATION: Extract names of countries, cities, regions, and towns. Do not include vague or non-specific locations.
    - Present your findings for each entity in a clear, line-separated format. If an entity value includes a list or multiple components, separate these so that each item appears on its own line. 
    Example Output Format:
    - LOCATION: Mexico
    - LOCATION: Vietnam
    """
    prompt = f"""
    Article Content:
    ----------------
    {article}

    Analysis Task:
    --------------
    Please analyze the above article for the specified entities. If certain entities, like dates or locations, are not mentioned, indicate this by stating 'Not mentioned'. For example, 'DATE: Not mentioned'.
    """
    return system_prompt + prompt

llm = AzureOpenAILLM(
    model=os.getenv("api_engine_gpt4"),
    base_url=os.getenv("api_base_gpt4"),
    api_key=os.getenv("api_key_gpt4"),
    api_version=os.getenv("api_version"),
)


with Pipeline(name="ultrafeedback-pipeline") as pipeline:
    df=pd.read_csv('/g/data/ue03/duongd/ews-nlp-llm-inference/dataset/ner/collected/latest_ner.csv')
    df['instruction'] = [location_extraction(x) for x in df['summary']]
    df=df[['instruction', 'locations']]
    df=df.rename(columns={'locations':'generations'})
    df=df.loc[:3, :]
    data = df.to_dict(orient='records')
    load_hub_dataset = LoadDataFromDicts(
        name="load_data",
        data=data,
        batch_size=1,
    )
    ultrafeedback = UltraFeedback(
        name="ultrafeedback_overall_rating",
        llm=llm,
        aspect="overall-rating",
        output_mappings={"model_name": "ultrafeedback_model"},
    )
    load_hub_dataset.connect(ultrafeedback)

if __name__ == "__main__":
    dataset = pipeline.run(
        parameters={
            "ultrafeedback_overall_rating": {
                "generation_kwargs": {
                    "max_new_tokens": 1024,
                    "temperature": 0.7,
                },
            },
        }
    )

The error still occurs:

(Screenshot of the error output, 2024-04-21)

@gabrielmbmb
Member

Hi @tridungduong-unsw, thanks for the details! I'll try to reproduce the error and get back to you. You are using conda, right?

@tridungduong-unsw

Hi @gabrielmbmb, yes, I'm using a conda environment. By the way, I've got it running now after a small modification. Others with the same problem can try:

import json
import os
import pdb

import openai
from distilabel.llms import AzureOpenAILLM, OpenAILLM, vLLM
from distilabel.llms.mistral import MistralLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import (
    CombineColumns,
    KeepColumns,
    LoadDataFromDicts,
    LoadHubDataset,
    PreferenceToArgilla,
    TextGenerationToArgilla,
)
from distilabel.steps.tasks import TextGeneration, UltraFeedback
from distilabel.steps.tasks.text_generation import TextGeneration
from dotenv import load_dotenv
import pandas as pd

load_dotenv()

def location_extraction(article):
    # Prompt construction elided; same system_prompt and prompt as in the previous comment.
    return system_prompt + prompt

llm = AzureOpenAILLM(
    model=os.getenv("api_engine_gpt4"),
    base_url=os.getenv("api_base_gpt4"),
    api_key=os.getenv("api_key_gpt4"),
    api_version=os.getenv("api_version"),
)


with Pipeline(name="ultrafeedback-pipeline") as pipeline:
    df=pd.read_csv('data.csv')
    df['instruction'] = [location_extraction(x) for x in df['summary']]
    df=df[['instruction', 'locations']]
    df=df.rename(columns={'locations':'generations'})
    df=df.loc[:3, :]
    data = df.to_dict(orient='records')
    load_hub_dataset = LoadDataFromDicts(
        name="load_data",
        data=data,
        batch_size=1,
    )
    ultrafeedback = UltraFeedback(
        name="ultrafeedback_overall_rating",
        llm=llm,
        aspect="overall-rating",
        output_mappings={"model_name": "ultrafeedback_model"},
    )
    load_hub_dataset.connect(ultrafeedback)

if __name__ == "__main__":
    dataset = pipeline.run(
        parameters={
            "ultrafeedback_overall_rating": {
                "generation_kwargs": {
                    "max_new_tokens": 1024,
                    "temperature": 0.7,
                },
            },
        }
    )

@YueWu0301
Author

Hi @YueWu0301, could you share the code of your pipeline?

Sure, here is my code:

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        repo_id="xxxx",
        name="load_dataset2",
        # output_mappings={"input": "instruction"},
    )
    push_to_hub = PushToHub(
        name="push_to_hub1",
        repo_id="xxxx",
        token="xxxxxx",
    )
    llm1 = OpenAILLM(
        model="xxxx",
        api_key="xxxx",
        base_url="xxxx",
    )
    task = TextGeneration(name="text_generation1", llm=llm1)
    load_dataset.connect(task)
    task.connect(push_to_hub)


re = pipeline.run(
    parameters={
        "load_dataset2": {
            "repo_id": "xxxxxx",
        },
        "text_generation1": {
            "llm": {
                "generation_kwargs": {
                    "temperature": 0.9,
                }
            }
        },
        "push_to_hub1": {
            "repo_id": "xxxxxxx",
        },
    }
)

Thanks a lot

@gabrielmbmb
Member

Hi @YueWu0301, could you try running the following and see if it works for you too? (mind the if __name__ == "__main__":)

with Pipeline("pipe-name", description="My first pipe") as pipeline:
    load_dataset = LoadHubDataset(
        repo_id="xxxx",
        name="load_dataset2",
        # output_mappings={"input": "instruction"},
    )
    push_to_hub = PushToHub(
        name="push_to_hub1",
        repo_id="xxxx",
        token="xxxxxx",
    )
    llm1 = OpenAILLM(
        model="xxxx",
        api_key="xxxx",
        base_url="xxxx",
    )
    task = TextGeneration(name="text_generation1", llm=llm1)
    load_dataset.connect(task)
    task.connect(push_to_hub)

if __name__ == "__main__":
    re = pipeline.run(
        parameters={
            "load_dataset2": {
                "repo_id": "xxxxxx",
            },
            "text_generation1": {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.9,
                    }
                }
            },
            "push_to_hub1": {
                "repo_id": "xxxxxxx",
            },
        }
    )
